ref: 157fdba88b1a0fa00c7c0f044483fe860e7f9533
dir: /sys/src/9/port/qio.c/
#include "u.h"
#include "../port/lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "../port/error.h"
/* debug checks are compiled out; change to if(1) to run the checkb() calls */
#define QDEBUG if(0)
/*
* IO queues
*/
typedef struct Queue Queue;
struct Queue
{
Lock; /* protects the fields below; taken with ilock()/iunlock() */
int state; /* Q* flag bits (Qstarve, Qmsg, Qclosed, Qflow, ...) */
int dlen; /* data length in bytes */
uint rp, wp; /* read/write position (counting BALLOC() bytes); wp-rp is compared as int so wraparound is harmless */
int limit; /* max BALLOC() bytes in queue */
int inilim; /* initial limit, restored by qreopen */
uchar noblock; /* true if writes return immediately when q full */
uchar eof; /* number of eofs returned to readers since the close */
Block* bfirst; /* first queued block, nil when empty */
Block* blast; /* last queued block; only meaningful while bfirst != nil */
void* arg; /* argument to kick and bypass */
void (*kick)(void*); /* restart output */
void (*bypass)(void*, Block*); /* bypass queue altogether */
QLock rlock; /* mutex for reading processes */
Rendez rr; /* process waiting to read */
QLock wlock; /* mutex for writing processes */
Rendez wr; /* process waiting to write */
char err[ERRMAX]; /* error reported to readers/writers once closed */
};
enum
{
Maxatomic = 64*1024, /* largest chunk qwrite/qiwrite put in a single block */
};
/* exported copy of Maxatomic; not referenced in this file -- presumably read elsewhere, verify */
uint qiomaxatomic = Maxatomic;
/*
 *  release every block of the chain that starts at b,
 *  detaching each block from its successor before freeing it
 */
void
freeblist(Block *b)
{
	Block *cur;

	while((cur = b) != nil){
		b = cur->next;
		cur->next = nil;
		freeb(cur);
	}
}
/*
 *  total number of data bytes (sum of BLEN) in a chain of blocks
 */
int
blocklen(Block *bp)
{
	int total;

	for(total = 0; bp != nil; bp = bp->next)
		total += BLEN(bp);
	return total;
}
/*
 *  copy up to n bytes from a chain of blocks into p,
 *  starting o bytes into the chain's data.  the chain
 *  itself is not modified.  returns the number of bytes
 *  actually copied.
 */
long
readblist(Block *b, uchar *p, long n, ulong o)
{
	ulong avail, copied;

	copied = 0;
	for(; b != nil && n > 0; b = b->next){
		avail = BLEN(b);
		if(o >= avail){
			/* the offset lies past this block: skip it */
			o -= avail;
			continue;
		}
		avail -= o;
		if(avail > n)
			avail = n;
		memmove(p, b->rp + o, avail);
		p += avail;
		copied += avail;
		n -= avail;
		o = 0;
	}
	return copied;
}
/*
 *  collapse a chain of blocks into a single block holding
 *  all of its data; the original blocks are consumed.
 *  a lone block is returned as is.
 */
Block*
concatblock(Block *bp)
{
	if(bp->next != nil)
		bp = pullupblock(bp, blocklen(bp));
	return bp;
}
/*
* make sure the first block has at least n bytes.
*
* data is moved forward from the blocks that follow into
* the first block; any block drained completely is unlinked
* and freed.  when the first block cannot hold n bytes from
* its rp (lim - rp < n), a new n-byte block is allocated and
* linked in front of the list.
*
* returns the (possibly new) head of the list, or nil when
* the whole list holds fewer than n bytes -- in that case
* every block of the list has been freed.
*/
Block*
pullupblock(Block *bp, int n)
{
Block *nbp;
int i;
assert(n >= 0);
/*
* this should almost always be true, it's
* just to avoid every caller checking.
*/
if(BLEN(bp) >= n)
return bp;
/*
* if not enough room in the first block,
* add another to the front of the list.
*/
if(bp->lim - bp->rp < n){
nbp = allocb(n);
nbp->next = bp;
bp = nbp;
}
/*
* copy bytes from the trailing blocks into the first;
* from here on n counts the bytes still missing.
*/
n -= BLEN(bp);
while((nbp = bp->next) != nil){
i = BLEN(nbp);
if(i > n) {
/* next block has more than enough: take only its first n bytes */
memmove(bp->wp, nbp->rp, n);
bp->wp += n;
nbp->rp += n;
QDEBUG checkb(bp, "pullupblock 1");
return bp;
} else {
/* shouldn't happen but why crash if it does */
if(i < 0){
print("pullup negative length packet, called from %#p\n",
getcallerpc(&bp));
i = 0;
}
/* absorb the whole next block, then unlink and free it */
memmove(bp->wp, nbp->rp, i);
bp->wp += i;
bp->next = nbp->next;
nbp->next = nil;
freeb(nbp);
n -= i;
if(n == 0){
QDEBUG checkb(bp, "pullupblock 2");
return bp;
}
}
}
/* ran out of blocks before n bytes were gathered; bp->next is nil here */
freeb(bp);
return nil;
}
/*
 *  make sure the first block of q has at least n bytes,
 *  coalescing data from the following blocks as needed
 *  (see pullupblock).  returns the new first block, or nil
 *  when the queue is empty or holds fewer than n bytes; in
 *  the latter case every queued block has been freed.
 *
 *  pullupblock may prepend a freshly allocated block and
 *  free the blocks it drains, which changes the total
 *  BALLOC() size of the queue.  q->rp is moved by the
 *  difference so that q->wp - q->rp keeps matching the
 *  allocation actually queued.  otherwise the flow control
 *  accounting drifts for the lifetime of the queue.
 *
 *  NOTE(review): this function takes no lock itself; callers are
 *  presumed to serialize access to q -- confirm against callers.
 */
Block*
pullupqueue(Queue *q, int n)
{
	Block *b;
	uint before, after;

	assert(n >= 0);

	if(q->bfirst == nil)
		return nil;
	if(BLEN(q->bfirst) >= n)
		return q->bfirst;

	before = 0;
	for(b = q->bfirst; b != nil; b = b->next)
		before += BALLOC(b);

	q->bfirst = pullupblock(q->bfirst, n);

	/* recount the allocation and find the new tail */
	after = 0;
	q->blast = nil;
	for(b = q->bfirst; b != nil; b = b->next){
		after += BALLOC(b);
		q->blast = b;
	}

	/* keep wp - rp equal to the BALLOC() bytes still queued */
	q->rp += before - after;

	/* on failure pullupblock freed the whole list */
	if(q->bfirst == nil)
		q->dlen = 0;

	return q->bfirst;
}
/*
* trim to len bytes starting at offset
*
* keeps only the len bytes that begin offset bytes into the
* block list bp.  blocks lying entirely before offset are
* freed, the block in which the range ends is shortened,
* and every block after it is freed.
*
* returns the new head of the list; bp itself, untouched, when
* offset is 0 and len covers the whole list; or nil, with the
* whole list freed, when the list is shorter than offset+len.
*/
Block *
trimblock(Block *bp, int offset, int len)
{
ulong l;
Block *nb, *startb;
assert(len >= 0);
assert(offset >= 0);
QDEBUG checkb(bp, "trimblock 1");
l = blocklen(bp);
if(offset == 0 && len == l)
return bp;
if(l < offset+len) {
freeblist(bp);
return nil;
}
/* drop leading blocks that end before offset */
while((l = BLEN(bp)) < offset) {
offset -= l;
nb = bp->next;
bp->next = nil;
freeb(bp);
bp = nb;
}
startb = bp;
/* a block exactly offset bytes long is kept here, now empty */
bp->rp += offset;
/* walk to the block holding the last wanted byte */
while((l = BLEN(bp)) < len) {
len -= l;
bp = bp->next;
}
/* cut that block down to the remaining len bytes ... */
bp->wp -= (BLEN(bp) - len);
/* ... and free everything after it */
if(bp->next != nil) {
freeblist(bp->next);
bp->next = nil;
}
return startb;
}
/*
 *  make room for size bytes of headroom in front of the data
 *  (size >= 0) or -size bytes of tailroom behind it (size < 0).
 *  when bp already has the space it is returned, with rp moved
 *  back for headroom.  otherwise the data is copied into a new
 *  block that takes bp's place in the chain; bp is freed and
 *  the new block returned.
 */
Block*
padblock(Block *bp, int size)
{
	int len;
	Block *nbp;

	QDEBUG checkb(bp, "padblock 0");
	if(size < 0){
		size = -size;
		if(bp->lim - bp->wp >= size)
			return bp;
		len = BLEN(bp);
		nbp = allocb(len+size);
		memmove(nbp->wp, bp->rp, len);
		nbp->wp += len;
	} else {
		if(bp->rp - bp->base >= size){
			bp->rp -= size;
			return bp;
		}
		len = BLEN(bp);
		nbp = allocb(size+len);
		/* leave size bytes in front of the copied data */
		nbp->wp = nbp->rp + size;
		memmove(nbp->wp, bp->rp, len);
		nbp->wp += len;
	}
	nbp->next = bp->next;
	freeb(bp);
	QDEBUG checkb(nbp, "padblock 1");
	return nbp;
}
/*
 *  return a new block holding the first count bytes of the
 *  chain bp, zero-filled when the chain is shorter.  a block
 *  from bp's pool is used when one is available and large
 *  enough, otherwise allocb.  bp's flags except BINTR and
 *  BFREE carry over.  the source chain is left intact.
 */
Block*
copyblock(Block *bp, int count)
{
	int chunk;
	Block *nbp;

	assert(count >= 0);

	QDEBUG checkb(bp, "copyblock 0");
	nbp = nil;
	if(bp->pool != nil && count <= bp->pool->size)
		nbp = iallocbp(bp->pool);
	if(nbp == nil)
		nbp = allocb(count);
	nbp->flag |= bp->flag & ~(BINTR|BFREE);
	while(count > 0 && bp != nil){
		chunk = BLEN(bp);
		if(chunk > count)
			chunk = count;
		memmove(nbp->wp, bp->rp, chunk);
		nbp->wp += chunk;
		count -= chunk;
		bp = bp->next;
	}
	if(count > 0){
		memset(nbp->wp, 0, count);
		nbp->wp += count;
	}
	QDEBUG checkb(nbp, "copyblock 1");
	return nbp;
}
/*
 *  set the data length of bp to exactly len bytes.  growth is
 *  zero-filled in place when the block has room.  otherwise the
 *  chain is copied into a new block (see copyblock) and freed.
 *  a negative len frees bp and returns nil.
 */
Block*
adjustblock(Block* bp, int len)
{
	int have;
	Block *nbp;

	if(len < 0){
		freeb(bp);
		return nil;
	}
	if(bp->rp+len <= bp->lim){
		have = BLEN(bp);
		if(len > have)
			memset(bp->wp, 0, len-have);
		bp->wp = bp->rp+len;
		QDEBUG checkb(bp, "adjustblock 2");
		return bp;
	}
	nbp = copyblock(bp, len);
	freeblist(bp);
	QDEBUG checkb(nbp, "adjustblock 1");
	return nbp;
}
/*
 *  replace every block whose data uses less than a quarter of
 *  its allocation with a tightly sized copy, so that mostly
 *  empty buffers are not kept around.  returns the new head.
 */
Block*
packblock(Block *bp)
{
	Block **link, *old, *nbp;
	int n;

	for(link = &bp; (old = *link) != nil; link = &(*link)->next){
		n = BLEN(old);
		if((n<<2) >= BALLOC(old))
			continue;
		nbp = allocb(n);
		memmove(nbp->wp, old->rp, n);
		nbp->wp += n;
		nbp->next = old->next;
		*link = nbp;
		freeb(old);
	}
	return bp;
}
/*
 *  discard up to count bytes from the front of the chain *bph,
 *  freeing blocks that become empty and advancing *bph past
 *  them.  returns the number of bytes discarded.
 */
int
pullblock(Block **bph, int count)
{
	Block *bp;
	int take, total;

	if(bph == nil)
		return 0;
	total = 0;
	while(count > 0 && (bp = *bph) != nil){
		QDEBUG checkb(bp, "pullblock ");
		take = BLEN(bp);
		if(take > count)
			take = count;
		bp->rp += take;
		count -= take;
		total += take;
		if(BLEN(bp) == 0){
			*bph = bp->next;
			bp->next = nil;
			freeb(bp);
		}
	}
	return total;
}
/*
 *  unlink and return the first block of q (nil when empty),
 *  updating the data count and the read position.
 *  caller holds the queue lock.
 */
Block*
qremove(Queue *q)
{
	Block *b;

	if((b = q->bfirst) != nil){
		QDEBUG checkb(b, "qremove");
		q->bfirst = b->next;
		b->next = nil;
		q->dlen -= BLEN(b);
		q->rp += BALLOC(b);
	}
	return b;
}
/*
 *  push b back onto the head of q, undoing the accounting
 *  done by qremove.  caller holds the queue lock.
 */
void
qputback(Queue *q, Block *b)
{
	QDEBUG checkb(b, "qputback");
	if(q->bfirst == nil)
		q->blast = b;
	b->next = q->bfirst;
	q->bfirst = b;
	q->rp -= BALLOC(b);
	q->dlen += BLEN(b);
}
/*
 *  finish a removal from q: clear Qflow once the queued
 *  allocation is back within the limit, drop the ilock and
 *  wake flow-controlled writers if Qflow was set.
 *  returns the state as it was before the update.
 *  usable at interrupt level.
 */
static int
iunlock_consumer(Queue *q)
{
	int s;

	s = q->state;
	if((int)(q->wp - q->rp) <= q->limit)
		q->state = s & ~Qflow;
	iunlock(q);

	/*
	 * wake writers even if Qflow is still set: each writer
	 * decides by its own queue position, not by the total.
	 */
	if((s & Qflow) != 0)
		wakeup(&q->wr);
	return s;
}
/*
 *  process-level variant of iunlock_consumer: in addition,
 *  kick the output side when the queue had been flow
 *  controlled.  returns the previous state.
 */
static int
iunlock_reader(Queue *q)
{
	int s;

	s = iunlock_consumer(q);
	if((s & Qflow) != 0 && q->kick != nil)
		(*q->kick)(q->arg);
	return s;
}
/*
 *  finish an insertion into q: set Qflow when the queued
 *  allocation exceeds the limit, clear Qstarve, drop the ilock
 *  and wake a starved reader.  the caller is rescheduled when
 *  the woken reader has a higher priority and we may sleep.
 *  returns the state including a newly set Qflow.
 *  usable at interrupt level.
 */
static int
iunlock_producer(Queue *q)
{
	Proc *p;
	int s;

	s = q->state;
	if((int)(q->wp - q->rp) > q->limit)
		s |= Qflow;
	q->state = s & ~Qstarve;
	iunlock(q);

	if((s & Qstarve) == 0)
		return s;
	p = wakeup(&q->rr);
	if(p != nil && up != nil && p->priority > up->priority && islo())
		sched();
	return s;
}
/*
 *  process-level variant of iunlock_producer: in addition,
 *  kick the output side when the reader had been starved or
 *  the queue asks for a kick on every write (Qkick).
 *  returns the state reported by iunlock_producer.
 */
static int
iunlock_writer(Queue *q)
{
	int s;

	s = iunlock_producer(q);
	if((s & (Qstarve|Qkick)) != 0 && q->kick != nil)
		(*q->kick)(q->arg);
	return s;
}
/*
 *  non-sleeping removal of the next block; returns nil and
 *  marks the queue starved when nothing is queued.
 *  usable at interrupt level.
 */
Block*
qget(Queue *q)
{
	Block *b;

	ilock(q);
	b = qremove(q);
	if(b != nil){
		iunlock_consumer(q);
		return b;
	}
	q->state |= Qstarve;
	iunlock(q);
	return nil;
}
/*
* Interrupt level copy out of a queue, return # bytes copied.
*
* copies at most len bytes from the first non-empty block into
* vp without sleeping (the queue is only ilocked).  zero-length
* blocks found at the head are discarded.  when the queue is
* empty, Qstarve is set and -1 is returned.  on message queues
* (Qmsg) the block is dropped after the copy even if it was only
* partly read; otherwise a partly read block stays at the head.
* discarded blocks are collected on tofree and freed only after
* the queue has been unlocked.
*/
int
qconsume(Queue *q, void *vp, int len)
{
Block *b, *tofree = nil;
int n;
assert(len >= 0);
ilock(q);
for(;;) {
b = q->bfirst;
if(b == nil){
q->state |= Qstarve;
len = -1;
goto out;
}
QDEBUG checkb(b, "qconsume 1");
n = BLEN(b);
if(n > 0)
break;
/* get rid of zero-length blocks */
q->bfirst = b->next;
q->rp += BALLOC(b);
/* remember to free this */
b->next = tofree;
tofree = b;
};
if(n < len)
len = n;
memmove(vp, b->rp, len);
b->rp += len;
q->dlen -= len;
/* discard the block if we're done with it */
if((q->state & Qmsg) || len == n){
q->bfirst = b->next;
q->rp += BALLOC(b);
/* also account for any unread tail dropped with a message block */
q->dlen -= BLEN(b);
/* remember to free this */
b->next = tofree;
tofree = b;
}
out:
iunlock_consumer(q);
freeblist(tofree);
return len;
}
/*
 *  append the block chain b to the tail of q, updating the
 *  data count, the write position and blast.  caller holds
 *  the queue lock.  returns the number of data bytes added.
 */
int
qaddlist(Queue *q, Block *b)
{
	int len;

	QDEBUG checkb(b, "qaddlist 1");
	if(q->bfirst == nil)
		q->bfirst = b;
	else
		q->blast->next = b;

	len = 0;
	for(;;){
		len += BLEN(b);
		q->wp += BALLOC(b);
		if(b->next == nil)
			break;
		b = b->next;
		QDEBUG checkb(b, "qaddlist 2");
	}
	q->dlen += len;
	q->blast = b;
	return len;
}
/*
 *  queue the chain b without sleeping.  the chain is freed
 *  instead when the queue is closed (returns 0) or flow
 *  controlled (returns -1); otherwise returns bytes queued.
 */
int
qpass(Queue *q, Block *b)
{
	int len, s;

	ilock(q);
	s = q->state;
	if((s & (Qclosed|Qflow)) != 0){
		iunlock(q);
		freeblist(b);
		return (s & Qclosed) != 0 ? 0 : -1;
	}
	len = qaddlist(q, b);
	iunlock_producer(q);
	return len;
}
/*
 *  like qpass but ignores flow control: the chain is only
 *  rejected (freed, returning 0) when the queue is closed.
 */
int
qpassnolim(Queue *q, Block *b)
{
	int len;

	ilock(q);
	if((q->state & Qclosed) == 0){
		len = qaddlist(q, b);
		iunlock_producer(q);
		return len;
	}
	iunlock(q);
	freeblist(b);
	return 0;
}
/*
 *  non-sleeping write of len bytes from vp: the data is copied
 *  into an iallocb block and handed to qpass.  returns 0 when
 *  no block could be allocated, else qpass's result.
 */
int
qproduce(Queue *q, void *vp, int len)
{
	Block *b;

	assert(len >= 0);

	if((b = iallocb(len)) == nil)
		return 0;
	memmove(b->wp, vp, len);
	b->wp += len;
	return qpass(q, b);
}
/*
 *  return a new block holding up to len bytes copied from the
 *  queued data starting offset bytes in; the queue itself is
 *  left unchanged.
 */
Block*
qcopy(Queue *q, int len, ulong offset)
{
	Block *b;
	long n;

	assert(len >= 0);

	b = allocb(len);
	ilock(q);
	n = readblist(q->bfirst, b->wp, len, offset);
	iunlock(q);
	b->wp += n;
	return b;
}
/*
 *  allocate an empty queue with the given BALLOC() limit.
 *  msg is or-ed into the initial state (e.g. Qmsg), and kick(arg)
 *  restarts output.  the queue starts out starved.  returns nil
 *  when memory is exhausted.  called by non-interrupt code.
 */
Queue*
qopen(int limit, int msg, void (*kick)(void*), void *arg)
{
	Queue *q;

	assert(limit >= 0);

	if((q = malloc(sizeof(Queue))) == nil)
		return nil;
	q->state = msg | Qstarve;
	q->limit = limit;
	q->inilim = limit;
	q->rp = 0;
	q->wp = 0;
	q->dlen = 0;
	q->kick = kick;
	q->arg = arg;
	q->noblock = 0;
	q->eof = 0;
	return q;
}
/*
 *  allocate a queue whose writes (see qbwrite) go straight to
 *  bypass(arg, block) instead of being queued.  returns nil
 *  when memory is exhausted.
 */
Queue*
qbypass(void (*bypass)(void*, Block*), void *arg)
{
	Queue *q;

	if((q = malloc(sizeof(Queue))) == nil)
		return nil;
	q->bypass = bypass;
	q->arg = arg;
	q->state = 0;
	q->limit = 0;
	q->rp = 0;
	q->wp = 0;
	q->dlen = 0;
	q->noblock = 0;
	q->eof = 0;
	return q;
}
/*
 *  sleep condition for readers: data is queued or q is closed
 */
static int
notempty(void *a)
{
	Queue *q;

	q = a;
	if(q->state & Qclosed)
		return 1;
	return q->bfirst != nil;
}
/*
* wait for the queue to be non-empty or closed.
* called with q ilocked; returns with q ilocked.
*
* returns 1 when at least one block is queued.  on a closed,
* empty queue returns 0 (end of file) and counts it in q->eof,
* or -1 once three eofs have been returned or when q->err holds
* an error other than Ehungup.
* q is unlocked while sleeping, so an error raised out of
* sleep() leaves the caller without the ilock.
*/
static int
qwait(Queue *q)
{
/* wait for data */
for(;;){
if(q->bfirst != nil)
break;
if(q->state & Qclosed){
if(q->eof >= 3 || *q->err && strcmp(q->err, Ehungup) != 0)
return -1;
q->eof++;
return 0;
}
q->state |= Qstarve; /* flag requesting producer to wake me */
iunlock(q);
sleep(&q->rr, notempty, q);
ilock(q);
}
return 1;
}
/*
 *  cut off n bytes from the end of *h.  return a block holding
 *  that tail and leave *h referring to a block with the head.
 *  whichever part is smaller is the one copied into a newly
 *  allocated block; the larger part stays in the original.
 */
static Block*
splitblock(Block **h, int n)
{
	Block *a, *b;
	int m;

	a = *h;
	m = BLEN(a) - n;
	if(m >= n){
		/* tail is the smaller part: copy it out */
		b = allocb(n);
		a->wp -= n;
		memmove(b->wp, a->wp, n);
		b->wp += n;
		return b;
	}
	/* head is the smaller part: copy it and hand back a as the tail */
	b = allocb(m);
	memmove(b->wp, a->rp, m);
	b->wp += m;
	a->rp += m;
	*h = b;
	return a;
}
/*
* get next block from a queue (up to a limit)
*
* removes the first block and returns it holding at most len
* bytes.  on ordinary queues the excess is split off and put
* back at the head; on message queues (Qmsg) it is dropped.
* readers are serialized by q->rlock.  returns nil at end of
* file on a closed queue; raises q->err when qwait reports -1
* (repeated eofs, or a hangup carrying an error message).
*/
Block*
qbread(Queue *q, int len)
{
Block *b;
int n;
assert(len >= 0);
eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
ilock(q);
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(q);
qunlock(&q->rlock);
poperror();
return nil;
case -1:
/* multiple reads on a closed queue */
iunlock(q);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
b = qremove(q);
n = BLEN(b);
/* split block if it's too big and this is not a message queue */
if(n > len){
n -= len;
if((q->state & Qmsg) == 0)
qputback(q, splitblock(&b, n));
else
b->wp -= n;
}
/* drop the ilock, wake flow-controlled writers, maybe kick output */
iunlock_reader(q);
qunlock(&q->rlock);
poperror();
return b;
}
/*
* read up to len bytes from a queue into vp, sleeping in
* qwait() while it is empty.  returns the number of bytes
* copied, or 0 at end of file on a closed queue; raises
* q->err when qwait reports -1.
*
* on Qcoalesce queues several blocks may be combined into one
* read and zero-length blocks are skipped.  on ordinary queues
* the unread part of the last block is put back; on message
* queues (Qmsg) it is discarded.  the copy into vp is done
* after all queue locks have been released.
*/
long
qread(Queue *q, void *vp, int len)
{
Block *b, *first, **last;
int m, n;
assert(len >= 0);
eqlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
ilock(q);
again:
switch(qwait(q)){
case 0:
/* queue closed */
iunlock(q);
qunlock(&q->rlock);
poperror();
return 0;
case -1:
/* multiple reads on a closed queue */
iunlock(q);
error(q->err);
}
/* if we get here, there's at least one block in the queue */
last = &first;
if(q->state & Qcoalesce){
/* when coalescing, 0 length blocks just go away */
b = q->bfirst;
m = BLEN(b);
if(m <= 0){
freeb(qremove(q));
goto again;
}
/*
* grab the first block plus as many
* following blocks as will partially
* fit in the read.
*/
n = 0;
for(;;) {
*last = qremove(q);
n += m;
if(n >= len || q->bfirst == nil)
break;
/* b was just removed with next == nil: chain the next block behind it */
last = &b->next;
b = q->bfirst;
m = BLEN(b);
}
} else {
first = qremove(q);
n = BLEN(first);
}
/* split last block if it's too big and this is not a message queue */
if(n > len && (q->state & Qmsg) == 0)
qputback(q, splitblock(last, n - len));
iunlock_reader(q);
qunlock(&q->rlock);
poperror();
/* copy out with no locks held; free the blocks if the copy raises */
if(waserror()){
freeblist(first);
nexterror();
}
n = readblist(first, vp, len, 0);
freeblist(first);
poperror();
return n;
}
/*
* a Flow represents a flow controlled
* writer on queue q with position p: the value of q->wp
* right after that writer's data was queued.  the writer may
* proceed once q->rp has advanced to within q->limit of p
* (or the queue is non-blocking or closed).
*/
typedef struct {
Queue* q;
uint p;
} Flow;
/*
 *  sleep condition for flow-controlled writers: the queue is
 *  non-blocking or closed, or the reader has caught up to
 *  within q->limit bytes of this writer's position.
 */
static int
unblocked(void *a)
{
	Flow *f;
	Queue *q;

	f = a;
	q = f->q;
	if(q->noblock || (q->state & Qclosed))
		return 1;
	return (int)(f->p - q->rp) <= q->limit;
}
/*
* flow control, wait for queue to drain back to the limit
*
* sleeps on q->wr until unblocked(f) holds.  waiting writers
* are serialized by q->wlock, which is released again if
* sleep() raises an error; the error is then propagated.
*/
static void
qflow(Flow *f)
{
Queue *q = f->q;
while(!unblocked(f)){
eqlock(&q->wlock);
if(waserror()){
qunlock(&q->wlock);
nexterror();
}
sleep(&q->wr, unblocked, f);
qunlock(&q->wlock);
poperror();
}
}
/*
* add a block to a queue obeying flow control
*
* takes ownership of the block list b.  when the queue has a
* bypass function, b is handed straight to it.  otherwise b is
* appended to the queue; it is freed if an error is raised
* before that (q->err when the queue is closed).  when the
* queue is flow controlled and non-blocking, b is silently
* discarded.  on return, the result is the number of data
* bytes in b in every case.
*/
long
qbwrite(Queue *q, Block *b)
{
Flow flow;
int len;
if(q->bypass != nil){
len = blocklen(b);
(*q->bypass)(q->arg, b);
return len;
}
if(waserror()){
freeblist(b);
nexterror();
}
ilock(q);
/* give up if the queue is closed */
if(q->state & Qclosed){
iunlock(q);
error(q->err);
}
/*
* if the queue is full,
* silently discard when non-blocking
*/
if(q->state & Qflow && q->noblock){
iunlock(q);
poperror();
len = blocklen(b);
freeblist(b);
return len;
}
len = qaddlist(q, b);
/* b now belongs to the queue; stop freeing it on error */
poperror();
/*
* save our current position in queue
* for flow control below.
*/
flow.q = q;
flow.p = q->wp;
if(iunlock_writer(q) & Qflow){
/*
* flow control, before allowing the process to continue and
* queue more. We do this here so that postnote can only
* interrupt us after the data has been queued. This means that
* things like 9p flushes and ssl messages will not be disrupted
* by software interrupts.
*/
qflow(&flow);
}
return len;
}
/*
* block here uninterruptable until queue drains.
*
* waits (via qflow) until the reader catches up with the
* current write position.  errors raised while waiting are
* caught and the wait is resumed, unless the process is being
* told to exit (procctl Proc_exitme or Proc_exitbig), in which
* case Egreg is raised.
*/
static void
qbloated(Queue *q)
{
Flow flow;
flow.q = q;
flow.p = q->wp;
/* each caught error re-arms waserror and retries qflow */
while(waserror()){
if(up->procctl == Proc_exitme || up->procctl == Proc_exitbig)
error(Egreg);
}
qflow(&flow);
poperror();
}
/*
* write to a queue. only Maxatomic bytes at a time is atomic.
*
* the data is copied into blocks of at most Maxatomic bytes,
* each queued with qbwrite (which may sleep for flow control).
* on message queues (Qmsg) only the first chunk is written and
* the rest is dropped.  returns len whenever it returns normally.
*/
int
qwrite(Queue *q, void *vp, int len)
{
int n, sofar;
Block *b;
uchar *p = vp;
assert(len >= 0);
QDEBUG if(!islo())
print("qwrite hi %#p\n", getcallerpc(&q));
/*
* when the queue length grew over twice the limit,
* block here before allocating more blocks.
* this can happen when qflow() is getting
* interrupted by notes, preventing effective
* flow control.
* (the state is peeked without the lock; this is only a hint.)
*/
if(q->state & Qflow && (int)(q->wp - q->rp)/2 > q->limit)
qbloated(q);
sofar = 0;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;
b = allocb(n);
/* the copy from vp may raise (presumably user memory); free b if so */
if(waserror()){
freeb(b);
nexterror();
}
memmove(b->wp, p+sofar, n);
poperror();
b->wp += n;
sofar += qbwrite(q, b);
} while(sofar < len && (q->state & Qmsg) == 0);
return len;
}
/*
* used by print() to write to a queue. Since we may be splhi or not in
* a process, don't qlock.
*
* never sleeps: blocks come from iallocb and the queue is only
* ilocked.  stops early, dropping the remaining data, when an
* allocation fails or the queue is flow controlled or closed.
* returns the number of bytes actually queued.
*/
int
qiwrite(Queue *q, void *vp, int len)
{
int n, sofar;
Block *b;
uchar *p = vp;
assert(len >= 0);
sofar = 0;
do {
n = len-sofar;
if(n > Maxatomic)
n = Maxatomic;
b = iallocb(n);
if(b == nil)
break;
memmove(b->wp, p+sofar, n);
b->wp += n;
ilock(q);
if(q->state & (Qflow|Qclosed)){
iunlock(q);
freeb(b);
break;
}
sofar += qaddlist(q, b);
iunlock_writer(q);
} while(sofar < len && (q->state & Qmsg) == 0);
return sofar;
}
/*
 *  throw away the next len bytes queued in q.  whole blocks
 *  are unlinked and freed after unlocking; a block only partly
 *  covered is trimmed in place.  returns the number of bytes
 *  discarded, which is less than len when the queue runs dry.
 */
int
qdiscard(Queue *q, int len)
{
	Block *b, *tofree;
	int n, sofar;

	assert(len >= 0);

	tofree = nil;
	sofar = 0;
	ilock(q);
	while(sofar < len && (b = q->bfirst) != nil){
		QDEBUG checkb(b, "qdiscard");
		n = BLEN(b);
		if(n > len - sofar){
			/* only part of this block is wanted: trim its front */
			n = len - sofar;
			b->rp += n;
		} else {
			q->bfirst = b->next;
			q->rp += BALLOC(b);
			b->next = tofree;
			tofree = b;
		}
		q->dlen -= n;
		sofar += n;
	}
	iunlock_consumer(q);
	freeblist(tofree);
	return sofar;
}
/*
 *  drop everything queued in q (the queue stays open) and
 *  release flow-controlled writers.  the blocks are freed
 *  after the queue has been unlocked.
 */
void
qflush(Queue *q)
{
	Block *dead;

	ilock(q);
	dead = q->bfirst;
	q->bfirst = nil;
	q->dlen = 0;
	q->rp = q->wp;
	iunlock_consumer(q);
	freeblist(dead);
}
/*
 *  mark q closed with the Ehungup error, clear flow control
 *  and blocking mode, wake all sleeping readers and writers and
 *  free every queued block.  no further IO is permitted.
 *  a nil q is ignored.
 */
void
qclose(Queue *q)
{
	Block *dead;

	if(q == nil)
		return;

	ilock(q);
	q->state = (q->state | Qclosed) & ~(Qflow|Qstarve);
	kstrcpy(q->err, Ehungup, ERRMAX);
	q->noblock = 0;
	dead = q->bfirst;
	q->bfirst = nil;
	q->dlen = 0;
	q->rp = q->wp;
	iunlock(q);

	wakeup(&q->rr);
	wakeup(&q->wr);
	freeblist(dead);
}
/*
* be extremely careful when calling this,
* as there is no reference accounting
*
* closes q with qclose (waking sleepers, freeing queued blocks)
* and then frees the Queue itself; anyone still holding q
* afterwards refers to freed memory.
*/
void
qfree(Queue *q)
{
qclose(q);
free(q);
}
/*
 *  mark q closed with error msg (Ehungup when msg is nil or
 *  empty) and wake all sleeping readers and writers.  queued
 *  blocks are kept so readers can still drain them.
 */
void
qhangup(Queue *q, char *msg)
{
	char *why;

	why = msg;
	if(why == nil || why[0] == '\0')
		why = Ehungup;

	ilock(q);
	q->state |= Qclosed;
	kstrcpy(q->err, why, ERRMAX);
	iunlock(q);

	wakeup(&q->rr);
	wakeup(&q->wr);
}
/*
* return non-zero if the q is hungup (Qclosed set);
* the state is read without taking the lock.
*/
int
qisclosed(Queue *q)
{
return q->state & Qclosed;
}
/*
 *  make a closed queue usable again: clear Qclosed, mark it
 *  starved, reset the eof count and restore the initial limit.
 *  q->err is left as it was.
 */
void
qreopen(Queue *q)
{
	ilock(q);
	q->state = (q->state & ~Qclosed) | Qstarve;
	q->limit = q->inilim;
	q->eof = 0;
	iunlock(q);
}
/*
* return bytes queued (data bytes, not BALLOC() space);
* read without the lock, so the value may already be stale.
*/
int
qlen(Queue *q)
{
return q->dlen;
}
/*
* return true if we can read without blocking
* (at least one block is queued; unlocked snapshot).
*/
int
qcanread(Queue *q)
{
return q->bfirst != nil;
}
/*
* return non-zero when the queue is full
* (Qflow is set; unlocked snapshot).
*/
int
qfull(Queue *q)
{
return q->state & Qflow;
}
/*
* change queue limit
*/
void
qsetlimit(Queue *q, int limit)
{
assert(limit >= 0);
ilock(q);
q->limit = limit;
/* re-evaluate Qflow against the new limit; wakes writers if it was set */
iunlock_consumer(q);
}
/*
* set blocking/nonblocking
*/
void
qnoblock(Queue *q, int onoff)
{
ilock(q);
q->noblock = onoff;
/* if Qflow was set, wake waiting writers so they re-check noblock */
iunlock_consumer(q);
}