git: 9front

ref: ee8b0d40248c6a6dcd16cfedb2bcba41f16df0ad
dir: /sys/src/9/port/devloopback.c/

View raw version
#include	"u.h"
#include	"../port/lib.h"
#include	"mem.h"
#include	"dat.h"
#include	"fns.h"
#include	"../port/error.h"

typedef struct Link	Link;
typedef struct Loop	Loop;

struct Link
{
	Lock;

	int	ref;

	long	packets;	/* total number of packets sent */
	long	bytes;		/* total number of bytes sent */
	int	indrop;		/* enable dropping on iq overflow */
	long	soverflows;	/* packets dropped because iq overflowed */
	long	droprate;	/* drop 1/droprate packets in tq */
	long	drops;		/* packets deliberately dropped */

	vlong	delay0ns;	/* nanosec of delay in the link */
	long	delaynns;	/* nanosec of delay per byte */

	Block	*tq;		/* transmission queue */
	Block	*tqtail;
	vlong	tout;		/* time the last packet in tq is really out */
	vlong	tin;		/* time the head packet in tq enters the remote side  */

	long	limit;		/* queue buffering limit */
	Queue	*oq;		/* output queue from other side & packets in the link */
	Queue	*iq;

	Timer	ci;		/* time to move packets from  next packet from oq */
};

struct Loop
{
	QLock;
	int	ref;
	int	minmtu;		/* smallest block transmittable */
	Loop	*next;
	ulong	path;
	Link	link[2];
};

static struct
{
	Lock;
	ulong	path;
} loopbackalloc;

enum
{
	Qtopdir=	1,		/* top level directory */

	Qloopdir,			/* loopback* directory */

	Qportdir,			/* directory each end of the loop */
	Qctl,
	Qstatus,
	Qstats,
	Qdata,

	MaxQ,

	Nloopbacks	= 5,

	Statelen	= 23*1024,	/* status buffer size */

	Tmsize		= 8,
	Delayn 		= 10000,	/* default delays in ns */
	Delay0 		= 2500000,

	Loopqlim	= 32*1024,	/* default size of queues */
};

static Dirtab loopportdir[] =
{
	"ctl",		{Qctl},		0,			0222,
	"status",	{Qstatus},	0,			0444,
	"stats",	{Qstats},	0,			0444,
	"data",		{Qdata},	0,			0666,
};
static Dirtab loopdirs[MaxQ];

static Loop	loopbacks[Nloopbacks];

#define TYPE(x) 	(((ulong)(x))&0xff)
#define ID(x) 		(((ulong)(x))>>8)
#define QID(x,y) 	((((ulong)(x))<<8)|((ulong)(y)))

static void	looper(Loop *lb);
static long	loopoput(Loop *lb, Link *link, Block *bp);
static void	ptime(uchar *p, vlong t);
static vlong	gtime(uchar *p);
static void	closelink(Link *link, int dofree);
static void	pushlink(Link *link, vlong now);
static void	freelb(Loop *lb);
static void	linkintr(Ureg*, Timer *ci);

static void
loopbackinit(void)
{
	int i;

	for(i = 0; i < Nloopbacks; i++)
		loopbacks[i].path = i;

	/* invert directory tables for non-directory entries */
	for(i=0; i<nelem(loopportdir); i++)
		loopdirs[loopportdir[i].qid.path] = loopportdir[i];
}

static Chan*
loopbackattach(char *spec)
{
	Loop *volatile lb;
	Queue *q;
	Chan *c;
	int chan;
	ulong dev;

	dev = strtoul(spec, nil, 10);
	if(dev >= Nloopbacks)
		error(Enodev);

	c = devattach(loopbackdevtab.dc, spec);
	if(waserror()){
		chanfree(c);
		nexterror();
	}

	lb = &loopbacks[dev];
	qlock(lb);
	if(waserror()){
		lb->ref--;
		qunlock(lb);
		nexterror();
	}

	lb->ref++;
	if(lb->ref == 1){
		for(chan = 0; chan < 2; chan++){
			lb->link[chan].ci.mode = Trelative;
			lb->link[chan].ci.a = &lb->link[chan];
			lb->link[chan].ci.f = linkintr;
			lb->link[chan].limit = Loopqlim;
			q = qopen(lb->link[chan].limit, 0, 0, 0);
			lb->link[chan].iq = q;
			if(q == nil){
				freelb(lb);
				exhausted("memory");
			}
			q = qopen(lb->link[chan].limit, 0, 0, 0);
			lb->link[chan].oq = q;
			if(q == nil){
				freelb(lb);
				exhausted("memory");
			}
			lb->link[chan].indrop = 1;

			lb->link[chan].delaynns = Delayn;
			lb->link[chan].delay0ns = Delay0;
		}
	}
	poperror();
	qunlock(lb);

	poperror();

	mkqid(&c->qid, QID(0, Qtopdir), 0, QTDIR);
	c->aux = lb;
	c->dev = dev;
	return c;
}

static int
loopbackgen(Chan *c, char*, Dirtab*, int, int i, Dir *dp)
{
	Dirtab *tab;
	int len, type;
	Qid qid;

	type = TYPE(c->qid.path);
	if(i == DEVDOTDOT){
		switch(type){
		case Qtopdir:
		case Qloopdir:
			snprint(up->genbuf, sizeof(up->genbuf), "#λ%ld", c->dev);
			mkqid(&qid, QID(0, Qtopdir), 0, QTDIR);
			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
			break;
		case Qportdir:
			snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
			mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
			devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
			break;
		default:
			panic("loopbackgen %llux", c->qid.path);
		}
		return 1;
	}

	switch(type){
	case Qtopdir:
		if(i != 0)
			return -1;
		snprint(up->genbuf, sizeof(up->genbuf), "loopback%ld", c->dev);
		mkqid(&qid, QID(0, Qloopdir), 0, QTDIR);
		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
		return 1;
	case Qloopdir:
		if(i >= 2)
			return -1;
		snprint(up->genbuf, sizeof(up->genbuf), "%d", i);
		mkqid(&qid, QID(i, QID(0, Qportdir)), 0, QTDIR);
		devdir(c, qid, up->genbuf, 0, eve, 0555, dp);
		return 1;
	case Qportdir:
		if(i >= nelem(loopportdir))
			return -1;
		tab = &loopportdir[i];
		mkqid(&qid, QID(ID(c->qid.path), tab->qid.path), 0, QTFILE);
		devdir(c, qid, tab->name, tab->length, eve, tab->perm, dp);
		return 1;
	default:
		/* non directory entries end up here; must be in lowest level */
		if(c->qid.type & QTDIR)
			panic("loopbackgen: unexpected directory");	
		if(i != 0)
			return -1;
		tab = &loopdirs[type];
		if(tab == nil)
			panic("loopbackgen: unknown type: %d", type);
		len = tab->length;
		devdir(c, c->qid, tab->name, len, eve, tab->perm, dp);
		return 1;
	}
}


static Walkqid*
loopbackwalk(Chan *c, Chan *nc, char **name, int nname)
{
	Walkqid *wq;
	Loop *lb;

	wq = devwalk(c, nc, name, nname, nil, 0, loopbackgen);
	if(wq != nil && wq->clone != nil && wq->clone != c){
		lb = c->aux;
		qlock(lb);
		lb->ref++;
		if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata)
			lb->link[ID(c->qid.path)].ref++;
		qunlock(lb);
	}
	return wq;
}

static int
loopbackstat(Chan *c, uchar *db, int n)
{
	return devstat(c, db, n, nil, 0, loopbackgen);
}

/*
 *  if the stream doesn't exist, create it
 */
static Chan*
loopbackopen(Chan *c, int omode)
{
	Loop *lb;

	if(c->qid.type & QTDIR){
		if(omode != OREAD)
			error(Ebadarg);
		c->mode = omode;
		c->flag |= COPEN;
		c->offset = 0;
		return c;
	}

	lb = c->aux;
	qlock(lb);
	if(TYPE(c->qid.path) == Qdata){
		if(lb->link[ID(c->qid.path)].ref){
			qunlock(lb);
			error(Einuse);
		}
		lb->link[ID(c->qid.path)].ref++;
	}
	qunlock(lb);

	c->mode = openmode(omode);
	c->flag |= COPEN;
	c->offset = 0;
	c->iounit = qiomaxatomic;
	return c;
}

static void
loopbackclose(Chan *c)
{
	Loop *lb;
	int ref, chan;

	lb = c->aux;

	qlock(lb);

	/*
	 * closing either side hangs up the stream
	 */
	if((c->flag & COPEN) && TYPE(c->qid.path) == Qdata){
		chan = ID(c->qid.path);
		if(--lb->link[chan].ref == 0){
			qhangup(lb->link[chan ^ 1].oq, nil);
			looper(lb);
		}
	}


	/*
	 *  if both sides are closed, they are reusable
	 */
	if(lb->link[0].ref == 0 && lb->link[1].ref == 0){
		for(chan = 0; chan < 2; chan++){
			closelink(&lb->link[chan], 0);
			qreopen(lb->link[chan].iq);
			qreopen(lb->link[chan].oq);
			qsetlimit(lb->link[chan].oq, lb->link[chan].limit);
			qsetlimit(lb->link[chan].iq, lb->link[chan].limit);
		}
	}
	ref = --lb->ref;
	if(ref == 0)
		freelb(lb);
	qunlock(lb);
}

static void
freelb(Loop *lb)
{
	int chan;

	for(chan = 0; chan < 2; chan++)
		closelink(&lb->link[chan], 1);
}

/*
 * called with the Loop qlocked,
 * so only pushlink can mess with the queues
 */
static void
closelink(Link *link, int dofree)
{
	Queue *iq, *oq;
	Block *bp;

	ilock(link);
	iq = link->iq;
	oq = link->oq;
	bp = link->tq;
	link->tq = nil;
	link->tqtail = nil;
	link->tout = 0;
	link->tin = 0;
	timerdel(&link->ci);
	iunlock(link);
	if(iq != nil){
		qclose(iq);
		if(dofree){
			ilock(link);
			free(iq);
			link->iq = nil;
			iunlock(link);
		}
	}
	if(oq != nil){
		qclose(oq);
		if(dofree){
			ilock(link);
			free(oq);
			link->oq = nil;
			iunlock(link);
		}
	}
	freeblist(bp);
}

static long
loopbackread(Chan *c, void *va, long n, vlong offset)
{
	Loop *lb;
	Link *link;
	char *buf;
	long rv;

	lb = c->aux;
	switch(TYPE(c->qid.path)){
	default:
		error(Eperm);
		return -1;	/* not reached */
	case Qtopdir:
	case Qloopdir:
	case Qportdir:
		return devdirread(c, va, n, nil, 0, loopbackgen);
	case Qdata:
		return qread(lb->link[ID(c->qid.path)].iq, va, n);
	case Qstatus:
		link = &lb->link[ID(c->qid.path)];
		buf = smalloc(Statelen);
		rv = snprint(buf, Statelen, "delay %lld %ld\n", link->delay0ns, link->delaynns);
		rv += snprint(buf+rv, Statelen-rv, "limit %ld\n", link->limit);
		rv += snprint(buf+rv, Statelen-rv, "indrop %d\n", link->indrop);
		snprint(buf+rv, Statelen-rv, "droprate %ld\n", link->droprate);
		rv = readstr(offset, va, n, buf);
		free(buf);
		break;
	case Qstats:
		link = &lb->link[ID(c->qid.path)];
		buf = smalloc(Statelen);
		rv = snprint(buf, Statelen, "packets: %ld\n", link->packets);
		rv += snprint(buf+rv, Statelen-rv, "bytes: %ld\n", link->bytes);
		rv += snprint(buf+rv, Statelen-rv, "dropped: %ld\n", link->drops);
		snprint(buf+rv, Statelen-rv, "soft overflows: %ld\n", link->soverflows);
		rv = readstr(offset, va, n, buf);
		free(buf);
		break;
	}
	return rv;
}

static Block*
loopbackbread(Chan *c, long n, ulong offset)
{
	Loop *lb;

	lb = c->aux;
	if(TYPE(c->qid.path) == Qdata)
		return qbread(lb->link[ID(c->qid.path)].iq, n);

	return devbread(c, n, offset);
}

static long
loopbackbwrite(Chan *c, Block *bp, ulong off)
{
	Loop *lb;

	lb = c->aux;
	if(TYPE(c->qid.path) == Qdata)
		return loopoput(lb, &lb->link[ID(c->qid.path) ^ 1], bp);
	return devbwrite(c, bp, off);
}

static long
loopbackwrite(Chan *c, void *va, long n, vlong off)
{
	Loop *lb;
	Link *link;
	Cmdbuf *volatile cb;
	Block *volatile bp;
	vlong d0, d0ns;
	long dn, dnns;

	switch(TYPE(c->qid.path)){
	case Qdata:
		bp = allocb(n);
		if(waserror()){
			freeb(bp);
			nexterror();
		}
		memmove(bp->wp, va, n);
		poperror();
		bp->wp += n;
		return loopbackbwrite(c, bp, off);
	case Qctl:
		lb = c->aux;
		link = &lb->link[ID(c->qid.path)];
		cb = parsecmd(va, n);
		if(waserror()){
			free(cb);
			nexterror();
		}
		if(cb->nf < 1)
			error("short control request");
		if(strcmp(cb->f[0], "delay") == 0){
			if(cb->nf != 3)
				error("usage: delay latency bytedelay");
			d0ns = strtoll(cb->f[1], nil, 10);
			dnns = strtol(cb->f[2], nil, 10);

			/*
			 * it takes about 20000 cycles on a pentium ii
			 * to run pushlink; perhaps this should be accounted.
			 */

			ilock(link);
			link->delay0ns = d0ns;
			link->delaynns = dnns;
			iunlock(link);
		}else if(strcmp(cb->f[0], "indrop") == 0){
			if(cb->nf != 2)
				error("usage: indrop [01]");
			ilock(link);
			link->indrop = strtol(cb->f[1], nil, 0) != 0;
			iunlock(link);
		}else if(strcmp(cb->f[0], "droprate") == 0){
			if(cb->nf != 2)
				error("usage: droprate ofn");
			ilock(link);
			link->droprate = strtol(cb->f[1], nil, 0);
			iunlock(link);
		}else if(strcmp(cb->f[0], "limit") == 0){
			if(cb->nf != 2)
				error("usage: limit maxqsize");
			ilock(link);
			link->limit = strtol(cb->f[1], nil, 0);
			qsetlimit(link->oq, link->limit);
			qsetlimit(link->iq, link->limit);
			iunlock(link);
		}else if(strcmp(cb->f[0], "reset") == 0){
			if(cb->nf != 1)
				error("usage: reset");
			ilock(link);
			link->packets = 0;
			link->bytes = 0;
			link->indrop = 0;
			link->soverflows = 0;
			link->drops = 0;
			iunlock(link);
		}else
			error("unknown control request");
		poperror();
		free(cb);
		break;
	default:
		error(Eperm);
	}

	return n;
}

static long
loopoput(Loop *lb, Link *link, Block *bp)
{
	long n = BLEN(bp);

	bp = padblock(bp, Tmsize);
	if(BLEN(bp) < lb->minmtu)
		bp = adjustblock(bp, lb->minmtu);
	ptime(bp->rp, todget(nil, nil));

	link->packets++;
	link->bytes += n;

	qbwrite(link->oq, bp);

	looper(lb);
	return n;
}

static void
looper(Loop *lb)
{
	vlong t;
	int chan;

	t = todget(nil);
	for(chan = 0; chan < 2; chan++)
		pushlink(&lb->link[chan], t);
}

static void
linkintr(Ureg*, Timer *ci)
{
	Link *link;

	link = ci->a;
	pushlink(link, ci->ns);
}

/*
 * move blocks between queues if they are ready.
 * schedule an interrupt for the next interesting time.
 *
 * must be called with the link ilocked.
 */
static void
pushlink(Link *link, vlong now)
{
	Block *bp;
	vlong tout, tin;

	/*
	 * put another block in the link queue
	 */
	ilock(link);
	if(link->iq == nil || link->oq == nil){
		iunlock(link);
		return;

	}
	timerdel(&link->ci);

	/*
	 * put more blocks into the xmit queue
	 * use the time the last packet was supposed to go out
	 * as the start time for the next packet, rather than
	 * the current time.  this more closely models a network
	 * device which can queue multiple output packets.
	 */
	tout = link->tout;
	if(!tout)
		tout = now;
	while(tout <= now){
		bp = qget(link->oq);
		if(bp == nil){
			tout = 0;
			break;
		}

		/*
		 * can't send the packet before it gets queued
		 */
		tin = gtime(bp->rp);
		if(tin > tout)
			tout = tin;
		tout = tout + (BLEN(bp) - Tmsize) * link->delayn;

		/*
		 * drop packets
		 */
		if(link->droprate && nrand(link->droprate) == 0)
			link->drops++;
		else{
			ptime(bp->rp, tout + link->delay0ns);
			if(link->tq == nil)
				link->tq = bp;
			else
				link->tqtail->next = bp;
			link->tqtail = bp;
		}
	}

	/*
	 * record the next time a packet can be sent,
	 * but don't schedule an interrupt if none is waiting
	 */
	link->tout = tout;
	if(!qcanread(link->oq))
		tout = 0;

	/*
	 * put more blocks into the receive queue
	 */
	tin = 0;
	while(bp = link->tq){
		tin = gtime(bp->rp);
		if(tin > now)
			break;
		bp->rp += Tmsize;
		link->tq = bp->next;
		bp->next = nil;
		if(!link->indrop)
			qpassnolim(link->iq, bp);
		else if(qpass(link->iq, bp) < 0)
			link->soverflows++;
		tin = 0;
	}
	if(bp == nil && qisclosed(link->oq) && !qcanread(link->oq) && !qisclosed(link->iq))
		qhangup(link->iq, nil);
	link->tin = tin;
	if(!tin || tin > tout && tout)
		tin = tout;

	link->ci.ns = tin - now;
	if(tin){
		if(tin < now)
			panic("loopback unfinished business");
		timeradd(&link->ci);
	}
	iunlock(link);
}

static void
ptime(uchar *p, vlong t)
{
	ulong tt;

	tt = t >> 32;
	p[0] = tt >> 24;
	p[1] = tt >> 16;
	p[2] = tt >> 8;
	p[3] = tt;
	tt = t;
	p[4] = tt >> 24;
	p[5] = tt >> 16;
	p[6] = tt >> 8;
	p[7] = tt;
}

static vlong
gtime(uchar *p)
{
	ulong t1, t2;

	t1 = (p[0] << 24) | (p[1] << 16) | (p[2] << 8) | p[3];
	t2 = (p[4] << 24) | (p[5] << 16) | (p[6] << 8) | p[7];
	return ((vlong)t1 << 32) | t2;
}

Dev loopbackdevtab = {
	L'λ',
	"loopback",

	devreset,
	loopbackinit,
	devshutdown,
	loopbackattach,
	loopbackwalk,
	loopbackstat,
	loopbackopen,
	devcreate,
	loopbackclose,
	loopbackread,
	loopbackbread,
	loopbackwrite,
	loopbackbwrite,
	devremove,
	devwstat,
};