code: plan9front

Download patch

ref: ad9748422e740128174f51090d7ca776681569ac
parent: 0f9c172712e834b8f45630a9665f2e271161692c
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sun Mar 24 16:37:01 EDT 2024

kernel: Fix qio flow control

There is a pathological case with qio that triggers
a dead-lock for single threaded servers and
multiple requesters that can be reproduced like this:

int pfd[2];

void
main(int argc, char *argv[])
{
	char buf[0x10000];
	int i, n;

	ARGBEGIN {
	} ARGEND;

	if(pipe(pfd) < 0)
		sysfatal("pipe: %r");

	if(fork() == 0){
		while((n = read(pfd[0], buf, sizeof(buf))) > 0){
			sleep(10);
			write(pfd[0], buf, n);
		}
		exits(nil);
	}

	for(i = 0; i < PROCS; i++){
		if(fork() == 0){
			buf[0] = i;
			for(;;){
				write(pfd[1], buf, sizeof(buf));
				if(read(pfd[1], buf, sizeof(buf)) <= 0)
					break;
				print("%d %d\n", i, buf[0]);
			}
			exits(nil);
		}
	}
	waitpid();
}

The problem is how the reader decides to wake up the writer,
which was based only on the global queue length, but it should
really depend on the local queuing position of the writers
and their distance to the reader position.

Otherwise, a writer can be blocked even tho its message
has already been consumed by the reader.
When the reader tries to reply, it can get blocked himself
on writing the reply.

The new qio code basically makes sure that writers get
unblocked in order avoiding the issue.

The qio block statistics and qwindow() are gone now as
they where mostly unused.

--- a/sys/lib/acid/kernel
+++ b/sys/lib/acid/kernel
@@ -55,15 +55,6 @@
 	}
 }
 
-defn qiostats() {
-	print ("padblockcnt=", *padblockcnt\D, "\n");
-	print ("concatblockcnt=", *concatblockcnt\D, "\n");
-	print ("pullupblockcnt=", *pullupblockcnt\D, "\n");
-	print ("copyblockcnt=", *copyblockcnt\D, "\n");
-	print ("consumecnt=", *consumecnt\D, "\n");
-	print ("producecnt=", *producecnt\D, "\n");
-}
-
 // dump channels
 defn chan(c) {
 	local d, q;
--- a/sys/src/9/port/devpipe.c
+++ b/sys/src/9/port/devpipe.c
@@ -370,8 +370,6 @@
 	n = convM2D(dp, n, &d, nil);
 	if(n == 0)
 		error(Eshortstat);
-	if(d.length < 1 || d.length > conf.pipeqsize)
-		error(Ebadarg);
 
 	p = c->aux;
 	switch(NETTYPE(c->qid.path)){
@@ -379,6 +377,8 @@
 		error(Eperm);
 	case Qdata0:
 	case Qdata1:
+		if((uvlong)d.length > conf.pipeqsize)
+			error(Ebadarg);
 		qsetlimit(p->q[0], d.length);
 		qsetlimit(p->q[1], d.length);
 		break;
--- a/sys/src/9/port/portfns.h
+++ b/sys/src/9/port/portfns.h
@@ -301,7 +301,6 @@
 void		qreopen(Queue*);
 void		qsetlimit(Queue*, int);
 void		qunlock(QLock*);
-int		qwindow(Queue*);
 int		qwrite(Queue*, void*, int);
 void		qnoblock(Queue*, int);
 void		randominit(void);
--- a/sys/src/9/port/qio.c
+++ b/sys/src/9/port/qio.c
@@ -5,41 +5,34 @@
 #include	"fns.h"
 #include	"../port/error.h"
 
-static ulong padblockcnt;
-static ulong concatblockcnt;
-static ulong pullupblockcnt;
-static ulong copyblockcnt;
-static ulong consumecnt;
-static ulong producecnt;
+#define QDEBUG	if(1)
 
-#define QDEBUG	if(0)
-
 /*
  *  IO queues
  */
 typedef struct Queue	Queue;
-
 struct Queue
 {
 	Lock;
 
+	int	state;
+	int	dlen;		/* data length in bytes */
+	uint	rp, wp;		/* read/write position (counting BALLOC() bytes) */
+	int	limit;		/* max BALLOC() bytes in queue */
+	int	inilim;		/* initial limit */
+	uchar	noblock;	/* true if writes return immediately when q full */
+	uchar	eof;		/* number of eofs read by user */
+
 	Block*	bfirst;		/* buffer */
 	Block*	blast;
 
-	int	len;		/* bytes allocated to queue */
-	int	dlen;		/* data bytes in queue */
-	int	limit;		/* max bytes in queue */
-	int	inilim;		/* initial limit */
-	int	state;
-	int	noblock;	/* true if writes return immediately when q full */
-	int	eof;		/* number of eofs read by user */
-
+	void*	arg;		/* argument to kick and bypass */
 	void	(*kick)(void*);	/* restart output */
 	void	(*bypass)(void*, Block*);	/* bypass queue altogether */
-	void*	arg;		/* argument to kick */
 
 	QLock	rlock;		/* mutex for reading processes */
 	Rendez	rr;		/* process waiting to read */
+
 	QLock	wlock;		/* mutex for writing processes */
 	Rendez	wr;		/* process waiting to write */
 
@@ -69,44 +62,6 @@
 }
 
 /*
- *  pad a block to the front (or the back if size is negative)
- */
-Block*
-padblock(Block *bp, int size)
-{
-	int n;
-	Block *nbp;
-
-	QDEBUG checkb(bp, "padblock 0");
-	if(size >= 0){
-		if(bp->rp - bp->base >= size){
-			bp->rp -= size;
-			return bp;
-		}
-		n = BLEN(bp);
-		nbp = allocb(size+n);
-		nbp->rp += size;
-		nbp->wp = nbp->rp;
-		memmove(nbp->wp, bp->rp, n);
-		nbp->wp += n;
-		nbp->rp -= size;
-	} else {
-		size = -size;
-		if(bp->lim - bp->wp >= size)
-			return bp;
-		n = BLEN(bp);
-		nbp = allocb(n+size);
-		memmove(nbp->wp, bp->rp, n);
-		nbp->wp += n;
-	}
-	nbp->next = bp->next;
-	freeb(bp);
-	padblockcnt++;
-	QDEBUG checkb(nbp, "padblock 1");
-	return nbp;
-}
-
-/*
  *  return count of bytes in a string of blocks
  */
 int
@@ -123,19 +78,33 @@
 }
 
 /*
- * return count of space in blocks
+ *  copy the contents of a string of blocks into
+ *  memory from an offset. blocklist kept unchanged.
+ *  return number of copied bytes.
  */
-int
-blockalloclen(Block *bp)
+long
+readblist(Block *b, uchar *p, long n, ulong o)
 {
-	int len;
+	ulong m, r;
 
-	len = 0;
-	while(bp != nil) {
-		len += BALLOC(bp);
-		bp = bp->next;
+	r = 0;
+	while(n > 0 && b != nil){
+		m = BLEN(b);
+		if(o >= m)
+			o -= m;
+		else {
+			m -= o;
+			if(n < m)
+				m = n;
+			memmove(p, b->rp + o, m);
+			p += m;
+			r += m;
+			n -= m;
+			o = 0;
+		}
+		b = b->next;
 	}
-	return len;
+	return r;
 }
 
 /*
@@ -150,7 +119,6 @@
 	if(bp->next == nil)
 		return bp;
 	len = blocklen(bp);
-	concatblockcnt += len;
 	return pullupblock(bp, len);
 }
 
@@ -163,6 +131,8 @@
 	Block *nbp;
 	int i;
 
+	assert(n >= 0);
+
 	/*
 	 *  this should almost always be true, it's
 	 *  just to avoid every caller checking.
@@ -185,7 +155,6 @@
 	 */
 	n -= BLEN(bp);
 	while((nbp = bp->next) != nil){
-		pullupblockcnt++;
 		i = BLEN(nbp);
 		if(i > n) {
 			memmove(bp->wp, nbp->rp, n);
@@ -224,6 +193,8 @@
 {
 	Block *b;
 
+	assert(n >= 0);
+
 	if(BLEN(q->bfirst) >= n)
 		return q->bfirst;
 	q->bfirst = pullupblock(q->bfirst, n);
@@ -242,6 +213,9 @@
 	ulong l;
 	Block *nb, *startb;
 
+	assert(len >= 0);
+	assert(offset >= 0);
+
 	QDEBUG checkb(bp, "trimblock 1");
 	l = blocklen(bp);
 	if(offset == 0 && len == l)
@@ -278,6 +252,43 @@
 }
 
 /*
+ *  pad a block to the front (or the back if size is negative)
+ */
+Block*
+padblock(Block *bp, int size)
+{
+	int n;
+	Block *nbp;
+
+	QDEBUG checkb(bp, "padblock 0");
+	if(size >= 0){
+		if(bp->rp - bp->base >= size){
+			bp->rp -= size;
+			return bp;
+		}
+		n = BLEN(bp);
+		nbp = allocb(size+n);
+		nbp->rp += size;
+		nbp->wp = nbp->rp;
+		memmove(nbp->wp, bp->rp, n);
+		nbp->wp += n;
+		nbp->rp -= size;
+	} else {
+		size = -size;
+		if(bp->lim - bp->wp >= size)
+			return bp;
+		n = BLEN(bp);
+		nbp = allocb(n+size);
+		memmove(nbp->wp, bp->rp, n);
+		nbp->wp += n;
+	}
+	nbp->next = bp->next;
+	freeb(bp);
+	QDEBUG checkb(nbp, "padblock 1");
+	return nbp;
+}
+
+/*
  *  copy 'count' bytes into a new block
  */
 Block*
@@ -286,6 +297,8 @@
 	int l;
 	Block *nbp;
 
+	assert(count >= 0);
+
 	QDEBUG checkb(bp, "copyblock 0");
 	nbp = allocb(count);
 	for(; count > 0 && bp != nil; bp = bp->next){
@@ -300,7 +313,6 @@
 		memset(nbp->wp, 0, count);
 		nbp->wp += count;
 	}
-	copyblockcnt++;
 	QDEBUG checkb(nbp, "copyblock 1");
 
 	return nbp;
@@ -334,7 +346,30 @@
 	return bp;
 }
 
+/*
+ *  if the allocated space is way out of line with the used
+ *  space, reallocate to a smaller block
+ */
+Block*
+packblock(Block *bp)
+{
+	Block **l, *nbp;
+	int n;
 
+	for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
+		n = BLEN(nbp);
+		if((n<<2) < BALLOC(nbp)){
+			*l = allocb(n);
+			memmove((*l)->wp, nbp->rp, n);
+			(*l)->wp += n;
+			(*l)->next = nbp->next;
+			freeb(nbp);
+		}
+	}
+
+	return bp;
+}
+
 /*
  *  throw away up to count bytes from a
  *  list of blocks.  Return count of bytes
@@ -350,8 +385,8 @@
 	if(bph == nil)
 		return 0;
 
-	while(*bph != nil && count != 0) {
-		bp = *bph;
+	while((bp = *bph) != nil && count > 0) {
+		QDEBUG checkb(bp, "pullblock ");
 		n = BLEN(bp);
 		if(count < n)
 			n = count;
@@ -358,7 +393,6 @@
 		bytes += n;
 		count -= n;
 		bp->rp += n;
-		QDEBUG checkb(bp, "pullblock ");
 		if(BLEN(bp) == 0) {
 			*bph = bp->next;
 			bp->next = nil;
@@ -369,100 +403,146 @@
 }
 
 /*
- *  get next block from a queue, return null if nothing there
+ *  remove a block from the front of the queue
  */
 Block*
-qget(Queue *q)
+qremove(Queue *q)
 {
-	int dowakeup;
 	Block *b;
 
-	/* sync with qwrite */
-	ilock(q);
-
 	b = q->bfirst;
-	if(b == nil){
-		q->state |= Qstarve;
-		iunlock(q);
+	if(b == nil)
 		return nil;
-	}
-	QDEBUG checkb(b, "qget");
+	QDEBUG checkb(b, "qremove");
 	q->bfirst = b->next;
 	b->next = nil;
-	q->len -= BALLOC(b);
 	q->dlen -= BLEN(b);
+	q->rp += BALLOC(b);
+	return b;
+}
 
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+/*
+ *  put a block back to the front of the queue
+ */
+void
+qputback(Queue *q, Block *b)
+{
+	QDEBUG checkb(b, "qputback");
+	b->next = q->bfirst;
+	if(q->bfirst == nil)
+		q->blast = b;
+	q->bfirst = b;
+	q->dlen += BLEN(b);
+	q->rp -= BALLOC(b);
+}
 
+/*
+ *  after removing data from the queue,
+ *  unlock queue and wakeup blocked writer.
+ *  called at interrupt level.
+ */
+static int
+iunlock_consumer(Queue *q)
+{
+	int s = q->state;
+
+	/* stop flow control when back at or below the limit */
+	if((int)(q->wp - q->rp) <= q->limit)
+		q->state = s & ~Qflow;
+
 	iunlock(q);
 
-	if(dowakeup)
+	if(s & Qflow){
+		/*
+		 * wakeup flow controlled writers.
+		 * note that this is done even when q->state
+		 * still has Qflow set, as the unblocking
+		 * condition depends on the writers local queuing
+		 * position, not on the global queue length.
+		 */
 		wakeup(&q->wr);
+	}
+	return s;
+}
 
-	return b;
+/*
+ *  after removing data from the queue,
+ *  unlock queue and wakeup blocked writer.
+ *  get output going again when it was blocked.
+ *  called at process level.
+ */
+static int
+iunlock_reader(Queue *q)
+{
+	int s = iunlock_consumer(q);
+
+	if(q->kick != nil && s & Qflow)
+		(*q->kick)(q->arg);
+
+	return s;
 }
 
 /*
- *  throw away the next 'len' bytes in the queue
+ *  after inserting into queue,
+ *  unlock queue and wakeup starved reader.
+ *  called at interrupt level.
  */
-int
-qdiscard(Queue *q, int len)
+static int
+iunlock_producer(Queue *q)
 {
-	Block *b, *tofree = nil;
-	int dowakeup, n, sofar;
+	int s = q->state;
 
-	ilock(q);
-	for(sofar = 0; sofar < len; sofar += n){
-		b = q->bfirst;
-		if(b == nil)
-			break;
-		QDEBUG checkb(b, "qdiscard");
-		n = BLEN(b);
-		if(n <= len - sofar){
-			q->bfirst = b->next;
-			q->len -= BALLOC(b);
-			q->dlen -= BLEN(b);
+	/* start flow control when above the limit */
+	if((int)(q->wp - q->rp) > q->limit)
+		s |= Qflow;
 
-			/* remember to free this */
-			b->next = tofree;
-			tofree = b;
-		} else {
-			n = len - sofar;
-			b->rp += n;
-			q->dlen -= n;
-		}
+	q->state = s & ~Qstarve;
+	iunlock(q);
+
+	if(s & Qstarve){
+		Proc *p = wakeup(&q->rr);
+
+		/* if we just wokeup a higher priority process, let it run */
+		if(p != nil && up != nil && p->priority > up->priority && islo())
+			sched();
 	}
+	return s;
+}
 
-	/*
-	 *  if writer flow controlled, restart
-	 *
-	 *  This used to be
-	 *	q->len < q->limit/2
-	 *  but it slows down tcp too much for certain write sizes.
-	 *  I really don't understand it completely.  It may be
-	 *  due to the queue draining so fast that the transmission
-	 *  stalls waiting for the app to produce more data.  - presotto
-	 */
-	if((q->state & Qflow) && q->len < q->limit){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+/*
+ *  unlock queue and wakeup starved reader.
+ *  get output going again when it was starved.
+ *  called at process level.
+ */
+static int
+iunlock_writer(Queue *q)
+{
+	int s = iunlock_producer(q);
 
-	iunlock(q);
+	if(q->kick != nil && s & (Qstarve|Qkick))
+		(*q->kick)(q->arg);
 
-	if(dowakeup)
-		wakeup(&q->wr);
+	return s;
+}
 
-	if(tofree != nil)
-		freeblist(tofree);
+/*
+ *  get next block from a queue, return null if nothing there
+ *  called at interrupt level.
+ */
+Block*
+qget(Queue *q)
+{
+	Block *b;
 
-	return sofar;
+	ilock(q);
+	if((b = qremove(q)) == nil){
+		q->state |= Qstarve;
+		iunlock(q);
+		return nil;
+	}
+	iunlock_consumer(q);
+
+	return b;
 }
 
 /*
@@ -472,12 +552,11 @@
 qconsume(Queue *q, void *vp, int len)
 {
 	Block *b, *tofree = nil;
-	int n, dowakeup;
-	uchar *p = vp;
+	int n;
 
-	/* sync with qwrite */
-	ilock(q);
+	assert(len >= 0);
 
+	ilock(q);
 	for(;;) {
 		b = q->bfirst;
 		if(b == nil){
@@ -490,8 +569,10 @@
 		n = BLEN(b);
 		if(n > 0)
 			break;
+
+		/* get rid of zero-length blocks */
 		q->bfirst = b->next;
-		q->len -= BALLOC(b);
+		q->rp += BALLOC(b);
 
 		/* remember to free this */
 		b->next = tofree;
@@ -498,10 +579,9 @@
 		tofree = b;
 	};
 
-	consumecnt += n;
 	if(n < len)
 		len = n;
-	memmove(p, b->rp, len);
+	memmove(vp, b->rp, len);
 	b->rp += len;
 	q->dlen -= len;
 
@@ -508,7 +588,7 @@
 	/* discard the block if we're done with it */
 	if((q->state & Qmsg) || len == n){
 		q->bfirst = b->next;
-		q->len -= BALLOC(b);
+		q->rp += BALLOC(b);
 		q->dlen -= BLEN(b);
 
 		/* remember to free this */
@@ -515,23 +595,40 @@
 		b->next = tofree;
 		tofree = b;
 	}
-
 out:
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	} else
-		dowakeup = 0;
+	iunlock_consumer(q);
 
-	iunlock(q);
+	freeblist(tofree);
 
-	if(dowakeup)
-		wakeup(&q->wr);
+	return len;
+}
 
-	if(tofree != nil)
-		freeblist(tofree);
+/*
+ *  add a block list to a queue, return bytes added
+ */
+int
+qaddlist(Queue *q, Block *b)
+{
+	int len;
 
+	QDEBUG checkb(b, "qaddlist 1");
+
+	/* queue the block */
+	if(q->bfirst != nil)
+		q->blast->next = b;
+	else
+		q->bfirst = b;
+
+	len = BLEN(b);
+	q->wp += BALLOC(b);
+	while(b->next != nil){
+		b = b->next;
+		QDEBUG checkb(b, "qaddlist 2");
+		len += BLEN(b);
+		q->wp += BALLOC(b);
+	}
+	q->dlen += len;
+	q->blast = b;
 	return len;
 }
 
@@ -538,36 +635,22 @@
 int
 qpass(Queue *q, Block *b)
 {
-	int len, dowakeup;
+	int len;
 
-	/* sync with qread */
-	dowakeup = 0;
 	ilock(q);
-	if(q->len >= q->limit){
+	if(q->state & Qclosed){
 		iunlock(q);
 		freeblist(b);
-		return -1;
+		return 0;
 	}
-	if(q->state & Qclosed){
+	if(q->state & Qflow){
 		iunlock(q);
 		freeblist(b);
-		return 0;
+		return -1;
 	}
-
 	len = qaddlist(q, b);
+	iunlock_producer(q);
 
-	if(q->len >= q->limit/2)
-		q->state |= Qflow;
-
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(q);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
 	return len;
 }
 
@@ -574,101 +657,36 @@
 int
 qpassnolim(Queue *q, Block *b)
 {
-	int len, dowakeup;
+	int len;
 
-	/* sync with qread */
-	dowakeup = 0;
 	ilock(q);
-
 	if(q->state & Qclosed){
 		iunlock(q);
 		freeblist(b);
 		return 0;
 	}
-
 	len = qaddlist(q, b);
+	iunlock_producer(q);
 
-	if(q->len >= q->limit/2)
-		q->state |= Qflow;
-
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(q);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
 	return len;
 }
 
-/*
- *  if the allocated space is way out of line with the used
- *  space, reallocate to a smaller block
- */
-Block*
-packblock(Block *bp)
-{
-	Block **l, *nbp;
-	int n;
-
-	for(l = &bp; (nbp = *l) != nil; l = &(*l)->next){
-		n = BLEN(nbp);
-		if((n<<2) < BALLOC(nbp)){
-			*l = allocb(n);
-			memmove((*l)->wp, nbp->rp, n);
-			(*l)->wp += n;
-			(*l)->next = nbp->next;
-			freeb(nbp);
-		}
-	}
-
-	return bp;
-}
-
 int
 qproduce(Queue *q, void *vp, int len)
 {
 	Block *b;
-	int dowakeup;
-	uchar *p = vp;
 
+	assert(len >= 0);
+
 	b = iallocb(len);
 	if(b == nil)
 		return 0;
 
-	/* sync with qread */
-	dowakeup = 0;
-	ilock(q);
-
-	/* no waiting receivers, room in buffer? */
-	if(q->len >= q->limit){
-		q->state |= Qflow;
-		iunlock(q);
-		freeb(b);
-		return -1;
-	}
-	producecnt += len;
-
 	/* save in buffer */
-	memmove(b->wp, p, len);
+	memmove(b->wp, vp, len);
 	b->wp += len;
-	qaddlist(q, b);
 
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-
-	if(q->len >= q->limit)
-		q->state |= Qflow;
-	iunlock(q);
-
-	if(dowakeup)
-		wakeup(&q->rr);
-
-	return len;
+	return qpass(q, b);
 }
 
 /*
@@ -679,6 +697,8 @@
 {
 	Block *b;
 
+	assert(len >= 0);
+
 	b = allocb(len);
 	ilock(q);
 	b->wp += readblist(q->bfirst, b->wp, len, offset);
@@ -694,16 +714,18 @@
 {
 	Queue *q;
 
+	assert(limit >= 0);
+
 	q = malloc(sizeof(Queue));
 	if(q == nil)
 		return nil;
 
+	q->dlen = 0;
+	q->wp = q->rp = 0;
 	q->limit = q->inilim = limit;
 	q->kick = kick;
 	q->arg = arg;
-	q->state = msg;
-	
-	q->state |= Qstarve;
+	q->state = msg | Qstarve;
 	q->eof = 0;
 	q->noblock = 0;
 
@@ -720,10 +742,14 @@
 	if(q == nil)
 		return nil;
 
+	q->dlen = 0;
+	q->wp = q->rp = 0;
 	q->limit = 0;
 	q->arg = arg;
 	q->bypass = bypass;
 	q->state = 0;
+	q->eof = 0;
+	q->noblock = 0;
 
 	return q;
 }
@@ -733,7 +759,7 @@
 {
 	Queue *q = a;
 
-	return (q->state & Qclosed) || q->bfirst != nil;
+	return q->bfirst != nil || (q->state & Qclosed);
 }
 
 /*
@@ -749,10 +775,9 @@
 			break;
 
 		if(q->state & Qclosed){
-			if(++q->eof > 3)
+			if(q->eof >= 3 || *q->err && strcmp(q->err, Ehungup) != 0)
 				return -1;
-			if(*q->err && strcmp(q->err, Ehungup) != 0)
-				return -1;
+			q->eof++;
 			return 0;
 		}
 
@@ -765,101 +790,6 @@
 }
 
 /*
- * add a block list to a queue, return bytes added
- */
-int
-qaddlist(Queue *q, Block *b)
-{
-	int len, dlen;
-
-	QDEBUG checkb(b, "qaddlist 1");
-
-	/* queue the block */
-	if(q->bfirst != nil)
-		q->blast->next = b;
-	else
-		q->bfirst = b;
-
-	len = BALLOC(b);
-	dlen = BLEN(b);
-	while(b->next != nil){
-		b = b->next;
-		QDEBUG checkb(b, "qaddlist 2");
-
-		len += BALLOC(b);
-		dlen += BLEN(b);
-	}
-	q->blast = b;
-	q->len += len;
-	q->dlen += dlen;
-	return dlen;
-}
-
-/*
- *  called with q ilocked
- */
-Block*
-qremove(Queue *q)
-{
-	Block *b;
-
-	b = q->bfirst;
-	if(b == nil)
-		return nil;
-	QDEBUG checkb(b, "qremove");
-	q->bfirst = b->next;
-	b->next = nil;
-	q->dlen -= BLEN(b);
-	q->len -= BALLOC(b);
-	return b;
-}
-
-/*
- *  copy the contents of a string of blocks into
- *  memory from an offset. blocklist kept unchanged.
- *  return number of copied bytes.
- */
-long
-readblist(Block *b, uchar *p, long n, ulong o)
-{
-	ulong m, r;
-
-	r = 0;
-	while(n > 0 && b != nil){
-		m = BLEN(b);
-		if(o >= m)
-			o -= m;
-		else {
-			m -= o;
-			if(n < m)
-				m = n;
-			memmove(p, b->rp + o, m);
-			p += m;
-			r += m;
-			n -= m;
-			o = 0;
-		}
-		b = b->next;
-	}
-	return r;
-}
-
-/*
- *  put a block back to the front of the queue
- *  called with q ilocked
- */
-void
-qputback(Queue *q, Block *b)
-{
-	b->next = q->bfirst;
-	if(q->bfirst == nil)
-		q->blast = b;
-	q->bfirst = b;
-	q->len += BALLOC(b);
-	q->dlen += BLEN(b);
-}
-
-/*
  *  cut off n bytes from the end of *h. return a new
  *  block with the tail and change *h to refer to the
  *  head.
@@ -889,31 +819,6 @@
 }
 
 /*
- *  flow control, get producer going again
- *  called with q ilocked
- */
-static void
-qwakeup_iunlock(Queue *q)
-{
-	int dowakeup = 0;
-
-	/* if writer flow controlled, restart */
-	if((q->state & Qflow) && q->len < q->limit/2){
-		q->state &= ~Qflow;
-		dowakeup = 1;
-	}
-
-	iunlock(q);
-
-	/* wakeup flow controlled writers */
-	if(dowakeup){
-		if(q->kick != nil)
-			q->kick(q->arg);
-		wakeup(&q->wr);
-	}
-}
-
-/*
  *  get next block from a queue (up to a limit)
  */
 Block*
@@ -922,6 +827,8 @@
 	Block *b;
 	int n;
 
+	assert(len >= 0);
+
 	eqlock(&q->rlock);
 	if(waserror()){
 		qunlock(&q->rlock);
@@ -954,10 +861,8 @@
 		else
 			b->wp -= n;
 	}
+	iunlock_reader(q);
 
-	/* restart producer */
-	qwakeup_iunlock(q);
-
 	qunlock(&q->rlock);
 	poperror();
 
@@ -974,6 +879,8 @@
 	Block *b, *first, **last;
 	int m, n;
 
+	assert(len >= 0);
+
 	eqlock(&q->rlock);
 	if(waserror()){
 		qunlock(&q->rlock);
@@ -1005,8 +912,8 @@
 			freeb(qremove(q));
 			goto again;
 		}
-
-		/*  grab the first block plus as many
+		/*
+		 *  grab the first block plus as many
 		 *  following blocks as will partially
 		 *  fit in the read.
 		 */
@@ -1029,8 +936,7 @@
 	if(n > len && (q->state & Qmsg) == 0)
 		qputback(q, splitblock(last, n - len));
 
-	/* restart producer */
-	qwakeup_iunlock(q);
+	iunlock_reader(q);
 
 	qunlock(&q->rlock);
 	poperror();
@@ -1046,34 +952,39 @@
 	return n;
 }
 
+/*
+ *  a Flow represens a flow controlled
+ *  writer on queue q with position p.
+ */
+typedef struct {
+	Queue*	q;
+	uint	p;
+} Flow;
+
 static int
-qnotfull(void *a)
+unblocked(void *a)
 {
-	Queue *q = a;
+	Flow *f = a;
+	Queue *q = f->q;
 
-	return q->len < q->limit || (q->state & Qclosed);
+	return q->noblock || (int)(f->p - q->rp) <= q->limit || (q->state & Qclosed);
 }
 
 /*
- *  flow control, wait for queue to get below the limit
+ *  flow control, wait for queue to drain back to the limit
  */
 static void
-qflow(Queue *q)
+qflow(Flow *f)
 {
-	for(;;){
-		if(q->noblock || qnotfull(q))
-			break;
+	Queue *q = f->q;
 
-		ilock(q);
-		q->state |= Qflow;
-		iunlock(q);
-
+	while(!unblocked(f)){
 		eqlock(&q->wlock);
 		if(waserror()){
 			qunlock(&q->wlock);
 			nexterror();
 		}
-		sleep(&q->wr, qnotfull, q);
+		sleep(&q->wr, unblocked, f);
 		qunlock(&q->wlock);
 		poperror();
 	}
@@ -1085,8 +996,8 @@
 long
 qbwrite(Queue *q, Block *b)
 {
-	int len, dowakeup;
-	Proc *p;
+	Flow flow;
+	int len;
 
 	if(q->bypass != nil){
 		len = blocklen(b);
@@ -1094,7 +1005,6 @@
 		return len;
 	}
 
-	dowakeup = 0;
 	if(waserror()){
 		freeblist(b);
 		nexterror();
@@ -1106,9 +1016,11 @@
 		iunlock(q);
 		error(q->err);
 	}
-
-	/* don't queue over the limit */
-	if(q->len >= q->limit && q->noblock){
+	/*
+	 * if the queue is full,
+	 * silently discard when non-blocking
+	 */
+	if(q->state & Qflow && q->noblock){
 		iunlock(q);
 		poperror();
 		len = blocklen(b);
@@ -1115,43 +1027,48 @@
 		freeblist(b);
 		return len;
 	}
-
 	len = qaddlist(q, b);
-
-	/* make sure other end gets awakened */
-	if(q->state & Qstarve){
-		q->state &= ~Qstarve;
-		dowakeup = 1;
-	}
-	iunlock(q);
 	poperror();
 
-	/*  get output going again */
-	if(q->kick != nil && (dowakeup || (q->state&Qkick)))
-		q->kick(q->arg);
-
-	/* wakeup anyone consuming at the other end */
-	if(dowakeup){
-		p = wakeup(&q->rr);
-
-		/* if we just wokeup a higher priority process, let it run */
-		if(p != nil && p->priority > up->priority)
-			sched();
-	}
-
 	/*
-	 *  flow control, before allowing the process to continue and
-	 *  queue more. We do this here so that postnote can only
-	 *  interrupt us after the data has been queued.  This means that
-	 *  things like 9p flushes and ssl messages will not be disrupted
-	 *  by software interrupts.
+	 * save our current position in queue
+	 * for flow control below.
 	 */
-	qflow(q);
+	flow.q = q;
+	flow.p = q->wp;
+	if(iunlock_writer(q) & Qflow){
+		/*
+		 *  flow control, before allowing the process to continue and
+		 *  queue more. We do this here so that postnote can only
+		 *  interrupt us after the data has been queued.  This means that
+		 *  things like 9p flushes and ssl messages will not be disrupted
+		 *  by software interrupts.
+		 */
+		qflow(&flow);
+	}
 
 	return len;
 }
 
 /*
+ *  block here uninterruptable until queue drains.
+ */
+static void
+qbloated(Queue *q)
+{
+	Flow flow;
+
+	flow.q = q;
+	flow.p = q->wp;
+	while(waserror()){
+		if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
+			error(Egreg);
+	}
+	qflow(&flow);
+	poperror();
+}
+
+/*
  *  write to a queue.  only Maxatomic bytes at a time is atomic.
  */
 int
@@ -1161,8 +1078,11 @@
 	Block *b;
 	uchar *p = vp;
 
+	assert(len >= 0);
+
 	QDEBUG if(!islo())
 		print("qwrite hi %#p\n", getcallerpc(&q));
+
 	/*
 	 * when the queue length grew over twice the limit,
 	 * block here before allocating more blocks.
@@ -1170,14 +1090,8 @@
 	 * interrupted by notes, preventing effective
 	 * flow control.
 	 */
-	if(q->len/2 >= q->limit && q->noblock == 0 && q->bypass == nil){
-		while(waserror()){
-			if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
-				error(Egreg);
-		}
-		qflow(q);
-		poperror();
-	}
+	if(q->state & Qflow && (int)(q->wp - q->rp)/2 > q->limit)
+		qbloated(q);
 
 	sofar = 0;
 	do {
@@ -1207,11 +1121,11 @@
 int
 qiwrite(Queue *q, void *vp, int len)
 {
-	int n, sofar, dowakeup;
+	int n, sofar;
 	Block *b;
 	uchar *p = vp;
 
-	dowakeup = 0;
+	assert(len >= 0);
 
 	sofar = 0;
 	do {
@@ -1226,43 +1140,72 @@
 		b->wp += n;
 
 		ilock(q);
-
-		if((q->state & Qclosed) != 0 || q->len >= q->limit){
+		if(q->state & (Qflow|Qclosed)){
 			iunlock(q);
 			freeb(b);
 			break;
 		}
+		sofar += qaddlist(q, b);
+		iunlock_writer(q);
+	} while(sofar < len && (q->state & Qmsg) == 0);
 
-		qaddlist(q, b);
+	return sofar;
+}
 
-		if(q->state & Qstarve){
-			q->state &= ~Qstarve;
-			dowakeup = 1;
-		}
+/*
+ *  throw away the next 'len' bytes in the queue
+ */
+int
+qdiscard(Queue *q, int len)
+{
+	Block *b, *tofree = nil;
+	int n, sofar;
 
-		iunlock(q);
+	assert(len >= 0);
 
-		if(dowakeup){
-			if(q->kick != nil)
-				q->kick(q->arg);
-			wakeup(&q->rr);
+	ilock(q);
+	for(sofar = 0; sofar < len; sofar += n){
+		b = q->bfirst;
+		if(b == nil)
+			break;
+		QDEBUG checkb(b, "qdiscard");
+		n = BLEN(b);
+		if(n <= len - sofar){
+			q->bfirst = b->next;
+			q->rp += BALLOC(b);
+
+			/* remember to free this */
+			b->next = tofree;
+			tofree = b;
+		} else {
+			n = len - sofar;
+			b->rp += n;
 		}
+		q->dlen -= n;
+	}
+	iunlock_reader(q);
 
-		sofar += n;
-	} while(sofar < len && (q->state & Qmsg) == 0);
+	freeblist(tofree);
 
 	return sofar;
 }
 
 /*
- *  be extremely careful when calling this,
- *  as there is no reference accounting
+ *  flush the output queue
  */
 void
-qfree(Queue *q)
+qflush(Queue *q)
 {
-	qclose(q);
-	free(q);
+	Block *tofree;
+
+	ilock(q);
+	tofree = q->bfirst;
+	q->bfirst = nil;
+	q->rp = q->wp;
+	q->dlen = 0;
+	iunlock_reader(q);
+
+	freeblist(tofree);
 }
 
 /*
@@ -1272,32 +1215,42 @@
 void
 qclose(Queue *q)
 {
-	Block *bfirst;
+	Block *tofree;
 
 	if(q == nil)
 		return;
 
-	/* mark it */
 	ilock(q);
 	q->state |= Qclosed;
 	q->state &= ~(Qflow|Qstarve);
 	kstrcpy(q->err, Ehungup, ERRMAX);
-	bfirst = q->bfirst;
+	tofree = q->bfirst;
 	q->bfirst = nil;
-	q->len = 0;
+	q->rp = q->wp;
 	q->dlen = 0;
 	q->noblock = 0;
 	iunlock(q);
 
-	/* free queued blocks */
-	freeblist(bfirst);
-
 	/* wake up readers/writers */
 	wakeup(&q->rr);
 	wakeup(&q->wr);
+
+	/* free queued blocks */
+	freeblist(tofree);
 }
 
 /*
+ *  be extremely careful when calling this,
+ *  as there is no reference accounting
+ */
+void
+qfree(Queue *q)
+{
+	qclose(q);
+	free(q);
+}
+
+/*
  *  Mark a queue as closed.  Wakeup any readers.  Don't remove queued
  *  blocks.
  */
@@ -1304,7 +1257,6 @@
 void
 qhangup(Queue *q, char *msg)
 {
-	/* mark it */
 	ilock(q);
 	q->state |= Qclosed;
 	if(msg == nil || *msg == '\0')
@@ -1350,26 +1302,21 @@
 }
 
 /*
- * return space remaining before flow control
+ *  return true if we can read without blocking
  */
 int
-qwindow(Queue *q)
+qcanread(Queue *q)
 {
-	int l;
-
-	l = q->limit - q->len;
-	if(l < 0)
-		l = 0;
-	return l;
+	return q->bfirst != nil;
 }
 
 /*
- *  return true if we can read without blocking
+ *  return non-zero when the queue is full
  */
 int
-qcanread(Queue *q)
+qfull(Queue *q)
 {
-	return q->bfirst != nil;
+	return q->state & Qflow;
 }
 
 /*
@@ -1378,7 +1325,11 @@
 void
 qsetlimit(Queue *q, int limit)
 {
+	assert(limit >= 0);
+
+	ilock(q);
 	q->limit = limit;
+	iunlock_consumer(q);
 }
 
 /*
@@ -1387,34 +1338,7 @@
 void
 qnoblock(Queue *q, int onoff)
 {
-	q->noblock = onoff;
-}
-
-/*
- *  flush the output queue
- */
-void
-qflush(Queue *q)
-{
-	Block *bfirst;
-
-	/* mark it */
 	ilock(q);
-	bfirst = q->bfirst;
-	q->bfirst = nil;
-	q->len = 0;
-	q->dlen = 0;
-	iunlock(q);
-
-	/* free queued blocks */
-	freeblist(bfirst);
-
-	/* wake up writers */
-	wakeup(&q->wr);
-}
-
-int
-qfull(Queue *q)
-{
-	return q->state & Qflow;
+	q->noblock = onoff;
+	iunlock_consumer(q);
 }