ref: d21f86bba4333494e3e772f031a5ac81f0f47d0f
dir: /sys/src/cmd/venti/srv/lumpqueue.c/
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
/* Forward typedefs so the struct tags defined below can be used without the `struct` keyword. */
typedef struct LumpQueue	LumpQueue;
typedef struct WLump		WLump;
/*
 * Size of each write queue's ring buffer.
 * Indices are wrapped with `& (MaxLumpQ - 1)`, which only works for a
 * power of two.  A queue counts as full when advancing w would make it
 * equal to r, and as empty when w == r, so at most MaxLumpQ - 1 lumps
 * are pending in a queue at any time.
 */
enum
{
	MaxLumpQ	= 1 << 3	/* max. lumps on a single write queue, must be pow 2 */
};
/*
 * One pending write: a slot in a LumpQueue's ring buffer.
 * Filled in by queuewrite and consumed by queueproc.
 */
struct WLump
{
	Lump	*u;	/* lump to write; queueproc calls putlump on it after the write */
	Packet	*p;	/* packet data handed to writeqlump together with u */
	int	creator;	/* passed through unchanged to writeqlump */
	int	gen;	/* value of the global gen when the entry was queued; compared in flushqueue */
	uint	ms;	/* passed through unchanged to writeqlump */
};
/*
 * A bounded write queue: a ring buffer of WLump entries with one
 * lock and three rendezvous points that all use that lock
 * (wired up in initlumpqueues).
 */
struct LumpQueue
{
	QLock	lock;	/* held while q, w and r are read or changed */
	Rendez 	flush;	/* flushqueue sleeps here; queueproc wakes all sleepers after each dequeue */
	Rendez	full;	/* queuewrite sleeps here when the ring is full; queueproc wakes one after each dequeue */
	Rendez	empty;	/* queueproc sleeps here when the ring is empty; queuewrite wakes it after each enqueue */
	WLump	q[MaxLumpQ];	/* ring buffer storage */
	int	w;	/* index of the next slot to fill (advanced by queuewrite) */
	int	r;	/* index of the next slot to drain (advanced by queueproc) */
};
static LumpQueue	*lumpqs;	/* array of nqs queues, allocated by initlumpqueues */
static int		nqs;	/* number of queues in lumpqs */
static QLock		glk;	/* held while flushqueue increments gen */
static int		gen;	/* flush generation: bumped by every flushqueue call, stamped on every queued lump */
/* body of the proc started for each queue; vq is the LumpQueue it drains */
static void	queueproc(void *vq);
/*
 * Allocate nq write queues and hand each one to vtproc with
 * queueproc as its entry point.  All three Rendez structures of a
 * queue are tied to that queue's own lock.
 *
 * Returns 0 on success.  If vtproc fails, the error is recorded
 * with seterr and -1 is returned; queues set up before the failure
 * are left in place.
 */
int
initlumpqueues(int nq)
{
	LumpQueue *lq;
	int n;

	nqs = nq;
	lumpqs = MKNZ(LumpQueue, nq);

	n = 0;
	while(n < nq){
		lq = lumpqs + n;

		/* every rendezvous point sleeps and wakes under the queue lock */
		lq->flush.l = &lq->lock;
		lq->empty.l = &lq->lock;
		lq->full.l = &lq->lock;

		if(vtproc(queueproc, lq) < 0){
			seterr(EOk, "can't start write queue slave: %r");
			return -1;
		}
		n++;
	}
	return 0;
}
/*
 * Queue a lump and its packet data for writing.
 *
 * The target queue is chosen by indexsect(mainindex, u->score).
 * If that queue's ring is full, the caller sleeps on q->full until
 * queueproc releases a slot.  The entry is stamped with the current
 * flush generation so flushqueue can tell old entries from new ones.
 *
 * Returns 0 once the entry is queued, or -1 (after seterr) when the
 * index section is outside the range of allocated queues.
 */
int
queuewrite(Lump *u, Packet *p, int creator, uint ms)
{
	LumpQueue *lq;
	WLump *slot;
	int sect, next;

	trace(TraceProc, "queuewrite");

	sect = indexsect(mainindex, u->score);
	if(sect < 0 || sect >= nqs){
		seterr(EBug, "internal error: illegal index section in queuewrite");
		return -1;
	}
	lq = lumpqs + sect;

	qlock(&lq->lock);

	/*
	 * w may move while we sleep (other writers share the queue),
	 * so the successor index is recomputed on every pass.
	 */
	for(;;){
		next = (lq->w + 1) & (MaxLumpQ - 1);
		if(next != lq->r)
			break;
		trace(TraceProc, "queuewrite sleep");
		rsleep(&lq->full);
	}

	slot = &lq->q[lq->w];
	slot->u = u;
	slot->p = p;
	slot->creator = creator;
	slot->ms = ms;
	slot->gen = gen;
	lq->w = next;

	trace(TraceProc, "queuewrite wakeup");
	rwakeup(&lq->empty);
	qunlock(&lq->lock);

	return 0;
}
void
flushqueue(void)
{
	int i;
	LumpQueue *q;
	if(!lumpqs)
		return;
	trace(TraceProc, "flushqueue");
	qlock(&glk);
	gen++;
	qunlock(&glk);
	for(i=0; i<mainindex->nsects; i++){
		q = &lumpqs[i];
		qlock(&q->lock);
		while(q->w != q->r && gen - q->q[q->r].gen > 0){
			trace(TraceProc, "flushqueue sleep q%d", i);
			rsleep(&q->flush);
		}
		qunlock(&q->lock);
	}
}
	
/*
 * Consumer loop for one write queue.  vq is the LumpQueue to drain;
 * the loop never exits.
 *
 * For each entry: sleep on q->empty until something is queued, copy
 * the entry out and advance r under the queue lock, wake all
 * flushers and one blocked writer, then drop the lock and perform
 * the write with writeqlump.  The lump is released with putlump
 * whether or not the write succeeded; a failed write is only
 * reported on file descriptor 2.
 *
 * NOTE(review): flushers are woken before writeqlump runs, so
 * flushqueue can return while this write is still in progress --
 * confirm that callers of flushqueue tolerate this.
 */
static void
queueproc(void *vq)
{
	LumpQueue *q;
	Lump *u;
	Packet *p;
	int creator;
	uint ms;

	threadsetname("queueproc");

	q = vq;
	for(;;){
		qlock(&q->lock);
		while(q->w == q->r){
			trace(TraceProc, "queueproc sleep empty");
			rsleep(&q->empty);
		}

		/* copy the entry out so the slot can be reused while we write */
		u = q->q[q->r].u;
		p = q->q[q->r].p;
		creator = q->q[q->r].creator;
		ms = q->q[q->r].ms;
		q->r = (q->r + 1) & (MaxLumpQ - 1);

		trace(TraceProc, "queueproc wakeup flush");
		rwakeupall(&q->flush);

		trace(TraceProc, "queueproc wakeup full");
		rwakeup(&q->full);
		qunlock(&q->lock);

		/* the write itself happens without the queue lock held */
		trace(TraceProc, "queueproc writelump %V", u->score);
		if(writeqlump(u, p, creator, ms) < 0)
			/* fprint adds no newline of its own; terminate the line so messages do not run together */
			fprint(2, "failed to write lump for %V: %r\n", u->score);
		trace(TraceProc, "queueproc wrotelump %V", u->score);

		putlump(u);
	}
}