code: mafs

ref: c947228f7cf50bbbeb3f1e1b1808280e9ae9247f
dir: /writer.c/

View raw version
#include "all.h"

/*
 * Npendingwrites: dirty-list length that putwrite() compares drts.n
 * against; above it putwrite() only emits an extra stats() line when
 * chatty9p > 4.
 */
enum { Npendingwrites = 32 };

/* There is only one writer process, so that blocks are written in the
   order in which they were queued.
   With more than one writer, a directory entry could get written by
   one writer process before another has written the directory content.
   That would leave a mess to recover from after a crash.
 */

/* below is from nemo's Pg 252 */
/*
 * The queue of dirty Iobufs waiting to be written.
 * It is a doubly linked list threaded through Iobuf.prevdirty and
 * Iobuf.nextdirty: putwrite() appends at the tail, getwrite() takes
 * from the head and rmwrite() can remove any entry.
 */
typedef	struct	Dirties	Dirties;

struct Dirties
{
	QLock lck;		/* held by rmwrite/getwrite/putwrite/stopwriter while touching the list */
	Iobuf *head, *tail;	/* both nil when the list is empty */
	s32 n;			/* number of Iobufs on the list */
	Rendez isempty;		/* getwrite() sleeps here while n == 0; tied to lck by initwriter() */
} drts = {0};

/*
 * Set to 1 by stopwriter().  Once set, getwrite() returns nil instead
 * of sleeping on an empty list, and the writer loop in initwriter()
 * ends as soon as drts.n is no longer above zero.
 */
u8 stopwrites = 0;
/* debug dump of the dirty list; defined further down in this file */
static void stats(void);

/*
 * Unlink b from the dirty list and return it.
 *
 * Returns nil, leaving the list and drts.n untouched, when b is nil or
 * when b is not on the list (for example because it has already been
 * written out).  Panics if the count would go negative.
 * The callers in this file (rmwrite, getwrite) hold drts.lck.
 */
static Iobuf *
pluck(Iobuf *b)
{
	if(b == nil)
		return nil;

	if(b->prevdirty == nil && b->nextdirty == nil){
		/*
		 * No links at all: b is either the only entry or is not
		 * queued.  Only the only entry may empty the list;
		 * treating an unqueued buffer as the sole entry would
		 * drop every other pending write and skew drts.n.
		 */
		if(drts.head != b)
			return nil;	/* too late, b was written already */
		drts.head = drts.tail = nil;
	}else if(b->prevdirty == nil){
		/* first in the linked list */
		drts.head = b->nextdirty;
		drts.head->prevdirty = nil;
		b->nextdirty = nil;
	}else if(b->nextdirty == nil){
		/* last in the linked list */
		drts.tail = b->prevdirty;
		drts.tail->nextdirty = nil;
		b->prevdirty = nil;
	}else{
		/* somewhere in the middle */
		b->nextdirty->prevdirty = b->prevdirty;
		b->prevdirty->nextdirty = b->nextdirty;
		b->prevdirty = b->nextdirty = nil;
	}

	drts.n--;
	if(drts.n < 0)
		panic("drts.n < 0\n");
	return b;
}

/*
 * Take b off the dirty list, under drts.lck, so the writer will not
 * pick it up.  Returns what pluck() returned: b when it was unlinked,
 * nil when nothing was removed.
 * With chatty9p > 4 the list state is logged before and after.
 *
 * the Iobuf should be wlock()'ed at entry
 */
Iobuf *
rmwrite(Iobuf *b)
{
	Iobuf *removed;

	qlock(&drts.lck);

	if(chatty9p > 4){
		dprint("rmwrite start b->blkno %llud\n", b->blkno);
		stats();
	}

	removed = pluck(b);

	if(removed != nil && chatty9p > 4){
		dprint("rmwrite removed p->blkno %llud\n", removed->blkno);
		stats();
	}

	qunlock(&drts.lck);
	return removed;
}

/*
 * Fetch the next Iobuf to write: the head of the dirty list, unlinked
 * and write-locked via canwlock().
 *
 * When the list is empty this sleeps on drts.isempty unless stopwrites
 * is set.  Returns nil when
 *	- the list is empty and stopwrites is set (before or after the sleep),
 *	- the list is still empty after being woken, or
 *	- the head could not be locked by canwlock().
 * The caller is expected to cope with nil and call again.
 */
Iobuf *
getwrite(void)
{
	Iobuf *next;

	next = nil;
	qlock(&drts.lck);

	if(drts.n == 0){
		if(stopwrites)
			goto Out;
		rsleep(&drts.isempty);
		if(drts.n == 0 && stopwrites)
			goto Out;
	}

	/*
	 * canwlock() rather than wlock(): getbuf() could
	 * have wlock()'ed the Iobuf too.
	 */
	if(drts.head != nil && canwlock(drts.head))
		next = pluck(drts.head);

	if(next != nil && chatty9p > 4){
		dprint("getwrite done b->blkno %llud\n", next->blkno);
		stats();
	}

Out:
	qunlock(&drts.lck);
	return next;
}

/*
 * Mark b dirty, append it to the tail of the dirty list and hand it
 * over to the writer: the list count is bumped, the writer is woken if
 * the list was empty, and chkwunlock(b) is called on the buffer.
 * A zero return from chkwunlock() is fatal.
 *
 * the Iobuf should be wlock()'ed at entry
 */
void
putwrite(Iobuf *b)
{
	qlock(&drts.lck);

	if(chatty9p > 4){
		dprint("putwrite start p->blkno %llud\n", b->blkno);
		stats();
	}

	if(Npendingwrites < drts.n){
		/* NOTE(review): drts.n cannot be negative in here; check kept as is */
		if(drts.n < 0)
			panic("putwrite drts.n < 0\n");
		if(chatty9p > 4)
			stats();
	}

	b->dirty = 1;
	if(drts.head != nil){
		/* link after the current tail */
		b->prevdirty = drts.tail;
		drts.tail->nextdirty = b;
		drts.tail = b;
	}else
		drts.head = drts.tail = b;

	/* the list just went from empty to one entry: wake the writer */
	if(++drts.n == 1)
		rwakeup(&drts.isempty);

	if(!chkwunlock(b)){
		showbuf(b);
		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
	}

	if(b != nil && chatty9p > 4){
		dprint("putwrite done b->blkno %llud\n", b->blkno);
		stats();
	}
	qunlock(&drts.lck);
}

/*
 * Write the Iobuf p to the device and mark it clean.
 *
 * devwrite() must report p->len*Rawblocksize bytes, otherwise this
 * panics.  On success the dirty flags on p->io and p are cleared,
 * devwritedirtyclear() is called for the block, and chkwunlock(p) is
 * called on the buffer (a zero return is fatal).
 * stats() is called here without drts.lck being taken in this function.
 */
void
dowrite(Iobuf *p)
{
	u64 nwritten, blkno;

	if(chatty9p > 4){
		dprint("dowrite p->blkno %llud\n", p->blkno);
		stats();
		dprint("dowrite p->blkno %llud locked\n", p->blkno);
	}

	p->io->dirty = 1;
	nwritten = devwrite(p->blkno, p->io, p->len);
	if(nwritten != p->len*Rawblocksize){
		dprint("%s\n", errstring[Esystem]);
		panic("error writing block %llud: %llud bytes: %r\n",
				p->blkno, nwritten);
	}
	p->io->dirty = 0;
	devwritedirtyclear(p->blkno);
	p->dirty = 0;

	/* remember the block number: p is not touched after the unlock */
	blkno = p->blkno;
	if(chkwunlock(p) == 0){
		showbuf(p);
		panic("dowrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
	}
	if(chatty9p > 4)
		dprint("dowrite %llud wunlock()'ed\n", blkno);
}

/*
 * Start the writer process.
 *
 * Ties the isempty Rendez to drts.lck, then rfork()s with
 * RFPROC|RFMEM.  The parent returns at once.  The child names itself
 * "<service> writer" and keeps writing what getwrite() hands out until
 * stopwrites is set and drts.n is no longer above zero, then exits.
 * drts.n is read here without taking drts.lck.
 */
void
initwriter(void)
{
	Iobuf *dirty;
	char name[Namelen];
	int pid;

	// set the locks used by the Rendezes
	drts.isempty.l = &drts.lck;

	pid = rfork(RFPROC|RFMEM);
	if(pid == -1)
		panic("can't fork");
	else if(pid != 0)
		return;		/* parent */

	/* writer process from here on */
	if(chatty9p > 4)
		dprint("writer started\n");
	procname = name;
	snprint(name, Namelen, "%s writer", service);
	procsetname(name);

	for(;;){
		if(stopwrites != 0 && drts.n <= 0)
			break;
		dirty = getwrite();
		if(dirty != nil)
			dowrite(dirty);
	}

	if(chatty9p > 4)
		dprint("%s process exited\n", name);
	_exits(nil);
}

/*
 * Ask the writer to stop and wait for the dirty list to drain.
 *
 * Sets stopwrites, then checks drts.n under drts.lck once a second
 * (sleep(1000)).  When the count is zero it wakes a writer that may be
 * sleeping on isempty and returns.
 * NOTE(review): this only waits for the list to empty; whether the
 * writer has finished its last dowrite() by then is not checked here.
 */
void
stopwriter(void)
{
	u64 pending;

	stopwrites = 1;
	for(;;){
		qlock(&drts.lck);
		if(chatty9p > 4)
			dprint("stopwriter drts.n %d\n", drts.n);
		pending = drts.n;
		if(pending == 0)
			rwakeup(&drts.isempty);
		qunlock(&drts.lck);

		if(pending == 0)
			return;
		sleep(1000);
	}
}

static void
stats(void)
{
	dprint("dirties nwrites %d hd %llud tl %llud\n",
			drts.n,
			drts.head == nil ? 0 : drts.head->blkno,
			drts.tail == nil ? 0 : drts.tail->blkno);
}