code: mafs

ref: b3ccded71dc0330ba63fdb7d7a0e6fc6ded1657e
dir: /writer.c/

View raw version
#include "all.h"

/* There is only one writer process, so that blocks reach the disk in
   the order in which they were queued.
   Otherwise, another writer process could write a directory entry
   before the directory's content.
   That would make recovery after a crash messy.
 */

/* below is from nemo's Pg 252 */
/*
 * Forward typedefs: struct Dirties holds Wbuf pointers before
 * struct Wbuf is defined, and Wbuf points to itself.
 */
typedef	struct	Dirties	Dirties;
typedef	struct	Wbuf	Wbuf;

/*
 * The queue of pending disk writes, and its single instance drts.
 * putwrite() appends at tail, getwrite() removes from head.
 */
struct Dirties
{
	QLock lck;			/* held while the list is inspected or changed */
	Wbuf *head;			/* oldest queued write, nil when the list is empty */
	Wbuf *tail;			/* newest queued write, nil when the list is empty */
	s32 n;				/* number of Wbufs currently on the list */
	Rendez isfull;		/* write throttling: putwrite() sleeps here */
	Rendez isempty; 	/* getwrite() sleeps here instead of polling for work */
} drts = {0};

/*
 * One queued write: a private copy of a block's contents (made by
 * putwrite()) plus the bookkeeping needed to write it and release it.
 */
struct Wbuf
{
	u64	blkno;		/* block number on the disk, primary key */
	Wbuf *prev;		/* neighbour towards drts.head, nil for the first entry */
	Wbuf *next;		/* neighbour towards drts.tail, nil for the last entry */
	Iobuf *iobuf;	/* buffer cache entry this copy was taken from */
	union{
		u8	*payload;	/* "real" contents: the copied block */
		Content *io;	/* the same memory, viewed as Content */
	};
};

/*
 * npendingwrites is the queue length at which putwrite() starts to
 * sleep; it is not assigned in this file, so it is presumably set by
 * the caller before initwriter() -- TODO confirm.
 * stopwrites is set to 1 by stopwriter() to make the writer process
 * drain the queue and exit.
 */
u64 npendingwrites;		/* write throttling */
u8 stopwrites = 0;
static void stats(void);

/*
 * Unlink b from the dirty list and return it.
 * Returns nil when b is nil.
 *
 * b is taken to be on the list: a nil prev means b is the head and
 * a nil next means b is the tail.  On return b->prev and b->next
 * are nil and drts.n has been decremented; a negative count panics.
 *
 * drts.lck is not taken here; getwrite() calls this with it held.
 */
static Wbuf *
pluck(Wbuf *b)
{
	if(b == nil)
		return nil;

	/* detach from the predecessor, or move the head forward */
	if(b->prev != nil)
		b->prev->next = b->next;
	else
		drts.head = b->next;

	/* detach from the successor, or move the tail back */
	if(b->next != nil)
		b->next->prev = b->prev;
	else
		drts.tail = b->prev;

	b->prev = b->next = nil;

	drts.n--;
	if(drts.n < 0)
		panic("drts.n < 0\n");
	return b;
}

/*
 * Remove and return the oldest queued write, or nil.
 *
 * With an empty queue this returns nil at once when stopwrites is
 * set; otherwise it sleeps on drts.isempty until woken.  It can
 * still return nil after waking: when the queue is empty and
 * stopwrites is set, or when there is simply no head to take.
 *
 * After removing an entry, one putwrite() sleeping on drts.isfull is
 * woken if the queue has just dropped to npendingwrites-1 entries.
 * The returned Wbuf is no longer on the list; dowrite() frees it.
 */
Wbuf *
getwrite(void)
{
	Wbuf *w;

	w = nil;
	qlock(&drts.lck);
	if(drts.n == 0){
		if(stopwrites)
			goto Out;
		rsleep(&drts.isempty);
		if(drts.n == 0 && stopwrites)
			goto Out;
	}
	if(drts.head == nil)
		goto Out;

	w = pluck(drts.head);
	if(drts.n == npendingwrites-1)
		rwakeup(&drts.isfull);
	if(chatty9p > 4 && w != nil){
		dprint("getwrite done b->blkno %llud\n", w->blkno);
		stats();
	}
Out:
	qunlock(&drts.lck);
	return w;
}

/*
 * Queue the current contents of b for writing to the disk.
 * The Iobuf should be wlock()'ed at entry; it is released here
 * through chkwunlock(), and a zero return from that panics.
 *
 * A private copy of the block (Rawblocksize bytes of b->xiobuf) is
 * taken and b->dirties is incremented before the lock is released;
 * dowrite() decrements it again once the copy is on the disk.
 *
 * Sleeps on drts.isfull while the queue holds npendingwrites or more
 * entries.  The condition is rechecked after every wake-up so the
 * limit holds even if the queue was refilled in the meantime.
 *
 * NOTE(review): the Iobuf is unlocked before the copy is appended to
 * the queue, so two putwrite() calls for the same block could in
 * principle queue their copies in the opposite order to the one in
 * which they were taken -- confirm callers cannot interleave so.
 */
void
putwrite(Iobuf *b)
{
	Wbuf *w;

	w = emalloc9p(sizeof(Wbuf));
	w->blkno = b->blkno;
	w->payload = allocmemunit();
	memcpy(w->payload, b->xiobuf, Rawblocksize);
	incref(&b->dirties);
	w->iobuf = b;
	if(chkwunlock(b) == 0){
		showbuf(b);
		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
	}

	qlock(&drts.lck);
	if(chatty9p > 4){
		dprint("putwrite start p->blkno %llud\n", b->blkno);
		stats();
	}

	/* write throttling: wait for the writer to make room */
	while(drts.n >= npendingwrites)
		rsleep(&drts.isfull);

	/* append at the tail so writes go out in arrival order */
	w->next = nil;
	w->prev = drts.tail;
	if(drts.head == nil)
		drts.head = w;
	else
		drts.tail->next = w;
	drts.tail = w;
	drts.n++;

	/* the queue was empty: the writer may be asleep in getwrite() */
	if(drts.n == 1)
		rwakeup(&drts.isempty);
	if(chatty9p > 4){
		dprint("putwrite done b->blkno %llud\n", b->blkno);
		stats();
	}
	qunlock(&drts.lck);
}

/*
	Write one queued block to the disk with devwrite(), then drop
	the reference it held on its Iobuf and free the copy and the Wbuf.
	Panics unless devwrite() reports Rawblocksize.

	dirties is decremented without a wlock() on the buffer here.
	Using a wlock() in dowrite() deadlocks with putwrite().
	getbuf() guarantees that even a free'ed block cannot be
	stolen until dirties == 0. This keeps dirty blocks from
	being stolen by other block numbers.
	incref(dirties) only happens with a wlock() held, in putwrite().
 */
void
dowrite(Wbuf *p)
{
	u64 nwritten, blkno;

	if(chatty9p > 4){
		dprint("dowrite p->blkno %llud\n", p->blkno);
		stats();
		dprint("dowrite p->blkno %llud locked\n", p->blkno);
	}

	nwritten = devwrite(p->blkno, p->payload);
	if(nwritten != Rawblocksize){
		dprint("%s\n", errstring[Esystem]);
		panic("error writing block %llud: %llud bytes: %r\n",
				p->blkno, nwritten);
	}

	blkno = p->blkno;
	if(chatty9p > 4)
		dprint("dowrite %llud wunlock()'ed\n", blkno);

	/* the copy is on the disk: release the Iobuf, then the copy itself */
	decref(&p->iobuf->dirties);
	freememunit(p->payload);
	free(p);
}

/*
 * Start the writer process.
 *
 * Points both Rendezes at the queue lock, then rfork()s.  The caller
 * returns as soon as the fork succeeds.  The new process names
 * itself "<service> writer" and writes out queued blocks until
 * stopwrites is set and the queue is empty, then exits.
 */
void
initwriter(void)
{
	Wbuf *w;
	char name[Namelen];

	/* rsleep()/rwakeup() on either Rendez happen under drts.lck */
	drts.isfull.l = &drts.lck;
	drts.isempty.l = &drts.lck;

	switch(rfork(RFPROC|RFMEM)){
	case -1:
		panic("can't fork");
	case 0:
		if(chatty9p > 4)
			dprint("writer started\n");
		break;
	default:
		return;
	}

	/* only the new process gets here */
	procname = name;
	snprint(name, Namelen, "%s writer", service);
	procsetname(name);

	for(;;){
		if(stopwrites != 0 && drts.n <= 0)
			break;
		w = getwrite();
		if(w == nil)
			continue;	/* woken without work; recheck the exit test */
		dowrite(w);
	}

	if(chatty9p > 4)
		dprint("%s process exited\n", name);
	exits(nil);
}

/*
 * Ask the writer process to finish and wait until the queue is empty.
 *
 * Sets stopwrites, then samples the queue length under the lock once
 * a second.  When the length reaches zero the writer is woken, in
 * case it is asleep on drts.isempty, and this function returns.
 */
void
stopwriter(void)
{
	u64 left;

	stopwrites = 1;
	for(;;){
		qlock(&drts.lck);
		if(chatty9p > 4)
			dprint("stopwriter drts.n %d\n", drts.n);
		left = drts.n;
		if(left == 0)
			rwakeup(&drts.isempty);
		qunlock(&drts.lck);

		if(left == 0)
			break;
		sleep(1000);
	}
}

static void
stats(void)
{
	dprint("dirties nwrites %d hd %llud tl %llud\n",
			drts.n,
			drts.head == nil ? 0 : drts.head->blkno,
			drts.tail == nil ? 0 : drts.tail->blkno);
}

/*
 * Return the number of writes currently queued, read under the
 * queue lock.  Also prints the queue state when chatty9p > 4.
 */
u64
pendingwrites(void)
{
	u64 pending;

	qlock(&drts.lck);
	if(chatty9p > 4)
		stats();
	pending = drts.n;
	qunlock(&drts.lck);

	return pending;
}

void
sync(void)
{
	while(drts.n > 0)
		sleep(1000);
}