code: plan9front

ref: 5622b0bbd878dbc34045cc6fd37cffa64461eabe
dir: /sys/src/cmd/webfs/buq.c/

View raw version
#include <u.h>
#include <libc.h>
#include <ctype.h>
#include <fcall.h>
#include <thread.h>
#include <9p.h>

#include "dat.h"
#include "fns.h"

static void
kickwqr(Buq *q, Req *r)
{
	Buf **bb, *b;
	int l;

	for(bb = &q->bh; q->nwq > 0; bb = &b->next){
		if((b = *bb) == nil)
			break;
		if(b->wreq == nil || (b->wreq != r && r != nil))
			continue;
		l = b->ep - b->rp;
		b = realloc(b, sizeof(*b) + l);
		*bb = b;
		if(b->next == nil)
			q->bt = &b->next;
		memmove(b->end, b->rp, l);
		b->rp = b->end;
		b->ep = b->rp + l;
		b->wreq->ofcall.count = b->wreq->ifcall.count;
		respond(b->wreq, q->error);
		b->wreq = nil;
		q->nwq--;
	}
}

static void
matchreq(Buq *q)
{
	Req *r;
	Buf *b;
	int l;

	while(r = q->rh){
		if((b = q->bh) == nil){
			if(q->closed){
				if((q->rh = r->aux) == nil)
					q->rt = &q->rh;
				if(r->ifcall.type == Tread)
					r->ofcall.count = 0;
				respond(r, q->error);
				continue;
			}
			break;
		}
		if((q->rh = r->aux) == nil)
			q->rt = &q->rh;
		if(r->ifcall.type == Topen){
			respond(r, nil);
			continue;
		}
		l = b->ep - b->rp;
		if(l > r->ifcall.count)
			l = r->ifcall.count;
		memmove(r->ofcall.data, b->rp, l);
		r->ofcall.count = l;
		respond(r, nil);
		b->rp += l;
		q->size -= l;
		if(b->rp >= b->ep){
			if((q->bh = b->next) == nil)
				q->bt = &q->bh;
			if(r = b->wreq){
				r->ofcall.count = r->ifcall.count;
				respond(r, nil);
				q->nwq--;
			}
			free(b);
		}
	}
	if(q->closed && q->nwq > 0)
		kickwqr(q, nil);
	rwakeupall(&q->rz);
}

int
buread(Buq *q, void *v, int l)
{
	Req *r;
	Buf *b;

	qlock(q);
	while((b = q->bh) == nil){
		if(q->closed){
			l = 0;
			if(q->error){
				werrstr("%s", q->error);
				l = -1;
			}
			qunlock(q);
			return l;
		}
		rsleep(&q->rz);
	}
	if(l > (b->ep - b->rp))
		l = b->ep - b->rp;
	memmove(v, b->rp, l);
	b->rp += l;
	q->size -= l;
	rwakeup(&q->rz);
	if(b->rp < b->ep){
		qunlock(q);
		return l;
	}
	if((q->bh = b->next) == nil)
		q->bt = &q->bh;
	qunlock(q);
	if(r = b->wreq){
		r->ofcall.count = r->ifcall.count;
		respond(r, nil);
	}
	free(b);
	return l;
}

int
buwrite(Buq *q, void *v, int l)
{
	Buf *b;

	b = emalloc(sizeof(*b) + l);
	b->wreq = nil;
	b->rp = b->end;
	b->ep = b->rp + l;
	memmove(b->rp, v, l);
	b->next = nil;
	qlock(q);
	if(q->closed){
		l = 0;
		if(q->error){
			werrstr("%s", q->error);
			l = -1;
		}
		qunlock(q);
		free(b);
		return l;
	}
	*q->bt = b;
	q->bt = &b->next;
	q->size += l;
	matchreq(q);
	while(!q->closed && q->size >= q->limit)
		rsleep(&q->rz);
	qunlock(q);
	return l;
}

void
buclose(Buq *q, char *error)
{
	if(q == nil)
		return;
	qlock(q);
	if(!q->closed){
		if(error)
			q->error = estrdup9p(error);
		q->closed = 1;
	}
	matchreq(q);
	qunlock(q);
}

Buq*
bualloc(int limit)
{
	Buq *q;

	q = emalloc(sizeof(*q));
	q->limit = limit;
	q->rt = &q->rh;
	q->bt = &q->bh;
	q->rz.l = q;
	incref(q);
	return q;
}

void
bufree(Buq *q)
{
	Buf *b;
	Key *k;

	if(q == nil || decref(q))
		return;
	while(b = q->bh){
		q->bh = b->next;
		free(b);
	}
	freeurl(q->url);
	while(k = q->hdr){
		q->hdr = k->next;
		free(k);
	}
	free(q->error);
	free(q);
}

void
bureq(Buq *q, Req *r)
{
	Buf *b;
	int l;

	switch(r->ifcall.type){
	default:
		respond(r, "bug in bureq");
		return;
	case Twrite:
		l = r->ifcall.count;
		if(!q->closed && (q->size + l) < q->limit){
			r->ofcall.count = buwrite(q, r->ifcall.data, r->ifcall.count);
			respond(r, nil);
			return;
		}
		b = emalloc(sizeof(*b));
		b->wreq = r;
		b->rp = (uchar*)r->ifcall.data;
		b->ep = b->rp + l;
		b->next = nil;
		qlock(q);
		*q->bt = b;
		q->bt = &b->next;
		q->size += l;
		q->nwq++;
		break;
	case Tread:
	case Topen:
		r->aux = nil;
		qlock(q);
		*q->rt = r;
		q->rt = (Req**)&r->aux;
		break;
	}
	matchreq(q);
	qunlock(q);
}

void
buflushreq(Buq *q, Req *r)
{
	Req **rr;

	switch(r->ifcall.type){
	default:
		respond(r, "bug in bufflushreq");
		return;
	case Twrite:
		qlock(q);
		kickwqr(q, r);
		break;
	case Topen:
	case Tread:
		qlock(q);
		for(rr = &q->rh; *rr; rr = (Req**)&((*rr)->aux)){
			if(*rr != r)
				continue;
			if((*rr = r->aux) == nil)
				q->rt = rr;
			respond(r, "interrupted");
			break;
		}
		break;
	}
	qunlock(q);
}