code: mafs

ref: 5f15f2246d0fdac0fa98a0a910d40827edfaf68f
dir: /writer.c/

View raw version
#include "all.h"

/* There is only one writer process, to preserve the order of writes.
   Otherwise, another writer process could write a directory entry
   before the directory content it refers to,
   which creates a mess to recover from after a crash.
 */

/* below is from nemo's Pg 252 */
typedef	struct	Dirties	Dirties;
typedef	struct	Wbuf	Wbuf;

/*
	The queue of dirty blocks waiting to be written to the disk.
	There is a single instance, drts. Producers (putwrite(),
	putwrites()) append at the tail; dowrite() removes from the head.
 */
struct Dirties
{
	QLock lck;			/* controls access to this queue; also the lock of both Rendezes (set in initwriter()) */
	Wbuf *head, *tail;	/* linked list of dirty blocks yet to be written to the disk */
	s32 n;			/* number of dirty blocks in this linked list */
	Rendez isfull;		/* write throttling: producers sleep here while n >= npendingwrites */
	Rendez isempty; 	/* writer does not have to keep polling to find work: it sleeps here while n == 0 */
} drts = {0};

/*
	One queued write. putwrite()/putwrites() allocate it, copy
	Rawblocksize bytes of the Iobuf into payload and incref() the
	Iobuf's dirties count; dowrite() writes payload to the disk,
	decref()'s dirties and frees both payload and the Wbuf.
 */
struct Wbuf
{
	u64	blkno;		/* block number on the disk, primary key */
	Wbuf *prev, *next;	/* writer queue */
	Iobuf *iobuf;	/* pointer to the used Iobuf in the buffer cache */
	union{
		u8	*payload;	/* "real" contents, obtained from allocmemunit() */
		Content *io;	/* cast'able to contents */
	};
};

/* queue length at which producers start sleeping on drts.isfull; not assigned in this file */
u64 npendingwrites;		/* write throttling */
/* set to 1 by stopwriter(); the writer process exits once this is set and the queue is empty */
u8 stopwrites = 0;
/* debug dump of the queue state, defined further down in this file */
static void stats(void);

/*
	Unlink b from the drts queue and return it.
	Returns nil when b is nil. The neighbours (or drts.head/drts.tail
	when b sits at an end of the list) are stitched together, b's own
	links are cleared and drts.n is decremented.
	Panics if drts.n drops below zero.
	The caller is expected to hold drts.lck.
 */
static Wbuf *
pluck(Wbuf *b)
{
	Wbuf *before, *after;

	if(b == nil)
		return nil;

	before = b->prev;
	after = b->next;

	/* whatever pointed at b from the front now points at its successor */
	if(before == nil)
		drts.head = after;
	else
		before->next = after;

	/* whatever pointed at b from the back now points at its predecessor */
	if(after == nil)
		drts.tail = before;
	else
		after->prev = before;

	b->prev = b->next = nil;

	drts.n--;
	if(drts.n < 0)
		panic("drts.n < 0\n");
	return b;
}

/*
	Queue a copy of the Iobuf b for the writer process.

	The Iobuf should be wlock()'ed at entry and stays so until its
	copy has been placed in the writer queue; chkwunlock() releases
	it while drts.lck is held. This stops another process from
	queueing the same block ahead of us, so the write order is
	maintained and a newer write cannot be overwritten by an
	older one.

	Sleeps on drts.isfull while the queue holds npendingwrites or
	more blocks (write throttling) and wakes the writer when the
	queue goes from empty to non-empty.
	Panics if chkwunlock() reports 0.
 */
void
putwrite(Iobuf *b)
{
	Wbuf *w;

	if(chatty9p > 4){
		dprint("putwrite start p->blkno %llud\n", b->blkno);
		/* stats() walks drts.head/tail, which the writer frees: hold the lock */
		qlock(&drts.lck);
		stats();
		qunlock(&drts.lck);
	}
	/* emalloc9p() is relied on to leave w->prev and w->next nil */
	w = emalloc9p(sizeof(Wbuf));
	w->blkno = b->blkno;
	w->payload = allocmemunit();
	memcpy(w->payload, b->xiobuf, Rawblocksize);
	incref(&b->dirties);
	w->iobuf = b;

	qlock(&drts.lck);
	/* re-check after every wake-up instead of trusting a single rsleep() */
	while(drts.n >= npendingwrites)
		rsleep(&drts.isfull);
	if(chkwunlock(b) == 0){
		showbuf(b);
		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
	}
	if(drts.head == nil){
		drts.head = drts.tail = w;
	}else{
		drts.tail->next = w;
		w->prev = drts.tail;
		drts.tail = w;
	}
	drts.n++;
	if(drts.n == 1)
		rwakeup(&drts.isempty);
	if(chatty9p > 4){
		/* b is no longer locked by us; report from our own queued copy */
		dprint("putwrite done b->blkno %llud\n", w->blkno);
		stats();
	}
	qunlock(&drts.lck);
}
/*
	Queue copies of the len Iobufs in bs for the writer process, in
	array order, under a single hold of drts.lck.

	Each Iobuf is handled as in putwrite(): its contents are copied,
	its dirties count is incref()'ed, and it is released with
	chkwunlock() as its copy is appended to the queue.
	Does nothing when len is 0.
	Sleeps on drts.isfull while the queue holds npendingwrites or
	more blocks; the whole batch is then appended at once, so the
	queue may grow past npendingwrites.
	Panics if chkwunlock() reports 0 for any buffer.
 */
void
putwrites(Iobuf **bs, u64 len)
{
	Wbuf *w, **ws, *onstack[Ntogether];
	u8 wasempty;
	u64 i;
	Iobuf *b;

	if(len == 0)
		return;
	/*
		The on-stack array only has room for Ntogether entries;
		a longer batch gets a heap array instead of running off
		the end of the stack array.
	 */
	ws = onstack;
	if(len > Ntogether)
		ws = emalloc9p(len*sizeof(Wbuf*));

	/* do the allocating and copying before taking the queue lock */
	for(i = 0; i < len; i++){
		b = bs[i];
		ws[i] = w = emalloc9p(sizeof(Wbuf));
		w->blkno = b->blkno;
		w->payload = allocmemunit();
		memcpy(w->payload, b->xiobuf, Rawblocksize);
		incref(&b->dirties);
		w->iobuf = b;
	}

	qlock(&drts.lck);
	/* re-check after every wake-up instead of trusting a single rsleep() */
	while(drts.n >= npendingwrites)
		rsleep(&drts.isfull);
	wasempty = drts.n == 0;
	for(i = 0; i < len; i++){
		if(chkwunlock(bs[i]) == 0){
			showbuf(bs[i]);
			panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&bs));
		}
		if(drts.head == nil){
			drts.head = drts.tail = ws[i];
		}else{
			drts.tail->next = ws[i];
			ws[i]->prev = drts.tail;
			drts.tail = ws[i];
		}
		drts.n++;
	}
	/* the writer only sleeps on an empty queue, so wake it only then */
	if(wasempty)
		rwakeup(&drts.isempty);
	qunlock(&drts.lck);

	if(ws != onstack)
		free(ws);
}

/*
	dirties is decremented without a wlock() on the buffer in dowrite().
	Using a wlock() in dowrite() deadlocks with putwrite().
	getbuf() guarantees that even a free'ed block cannot be
	stolen until the dirties == 0. This avoids dirty blocks
	being stolen by other block numbers.
	incref(dirties) only happens with a wlock() in putwrite().
 */
void
dowrite(void)
{
	Wbuf *b, *blks[Ntogether];
	u64 prevblkno, startblkno, n, wn, i;
	u8 full, *jumbo;

	full = 0;
	qlock(&drts.lck);
	if(drts.n == 0){
		if(stopwrites){
			qunlock(&drts.lck);
			return;
		}
		rsleep(&drts.isempty);
		if(drts.n == 0 && stopwrites){
			qunlock(&drts.lck);
			return;
		}
	}
	// dprint("dowrite: drts.n %llud\n", drts.n);
	if(drts.head == nil){
		qunlock(&drts.lck);
		return;
	}

	if(drts.n >= npendingwrites)
		full = 1;
	if(drts.n > 1){
		/* trying to write consecutive blocks with a write() call */
		n = 1;
		prevblkno = startblkno = drts.head->blkno;
		for(b = drts.head->next;
				n <= drts.n && b != nil && b->blkno == prevblkno+1 && n < Ntogether;
				b = b->next){
			prevblkno=b->blkno;
			n++;
		}
		if(n > 1){
			for(i = 0; i < n; i++)
				blks[i] = pluck(drts.head);
			if(full && drts.n < npendingwrites)
				rwakeup(&drts.isfull);
			// dprint("dowrite: at return drts.n %llud\n", drts.n);
			qunlock(&drts.lck);

			if(chatty9p > 4){
				if(b != nil)
				dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud next %llud\n",
						drts.n, n, startblkno, b->blkno);
				else
				dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud\n",
						drts.n, n, startblkno);
			}
			jumbo = emalloc9p(n*Rawblocksize);
			for(i = 0; i < n; i++){
				b = blks[i];
				memcpy(jumbo+(i*Rawblocksize), b->payload, Rawblocksize);
				if(chatty9p > 4 && b!=nil)
					dprint("getwrite b->blkno %llud\n", b->blkno);
			}
			if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
				dprint("%s\n", errstring[Esystem]);
				panic("error writing jumbo block %llud: %llud bytes, written %llud: %r\n",
						startblkno, n, wn);
			}
			free(jumbo);
			for(i = 0; i < n; i++){
				b = blks[i];
				if(chatty9p > 4)
					dprint("dowrite %llud wunlock()'ed\n", b->blkno);
				decref(&b->iobuf->dirties);
				freememunit(b->payload);
				free(b);
				blks[i] = 0;
			}
			return;
		}else
			goto single;
	}else{
single:
		b = pluck(drts.head);
		if(full && drts.n < npendingwrites)
			rwakeup(&drts.isfull);
		qunlock(&drts.lck);

		if(chatty9p > 4 && b!=nil)
			dprint("getwrite done b->blkno %llud\n", b->blkno);
		if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
			dprint("%s\n", errstring[Esystem]);
			panic("error writing block %llud: %llud bytes: %r\n",
					b->blkno, n);
		}
		if(chatty9p > 4)
			dprint("dowrite %llud wunlock()'ed\n", b->blkno);
		decref(&b->iobuf->dirties);
		freememunit(b->payload);
		free(b);
	}
	if(chatty9p > 4 && b!=nil)
		stats();
}

/*
	Start the writer process.
	Ties both Rendezes to the queue lock, then rfork()s with
	RFPROC|RFMEM. The parent returns immediately; the child names
	itself "<service> writer" and calls dowrite() until stopwrites
	is set and the queue is empty, then exits.
	Panics if rfork() fails.
 */
void
initwriter(void)
{
	char name[Namelen];
	int pid;

	/* both rendezvous points sleep and wake under the queue lock */
	drts.isempty.l = &drts.lck;
	drts.isfull.l = &drts.lck;

	pid = rfork(RFPROC|RFMEM);
	if(pid == -1)
		panic("can't fork");
	else if(pid != 0)
		return;		/* parent: nothing more to do */

	/* from here on we are the writer process */
	if(chatty9p > 4)
		dprint("writer started\n");
	procname = name;
	snprint(name, Namelen, "%s writer", service);
	procsetname(name);

	for(;;){
		/* keep draining after a stop request until nothing is queued */
		if(stopwrites != 0 && drts.n <= 0)
			break;
		dowrite();
	}

	if(chatty9p > 4)
		dprint("%s process exited\n", name);
	exits(nil);
}

/*
	Ask the writer process to stop and wait for the queue to drain.
	Sets stopwrites, then checks the queue length once a second.
	When the queue is empty the writer is woken from drts.isempty
	so that it can notice stopwrites, and this function returns.
 */
void
stopwriter(void)
{
	u64 pending;

	stopwrites = 1;
	for(;;){
		qlock(&drts.lck);
		if(chatty9p > 4)
			dprint("stopwriter drts.n %d\n", drts.n);
		if(drts.n == 0)
			rwakeup(&drts.isempty);
		pending = drts.n;
		qunlock(&drts.lck);

		if(pending == 0)
			return;
		sleep(1000);
	}
}

static void
stats(void)
{
	dprint("dirties nwrites %d hd %llud tl %llud\n",
			drts.n,
			drts.head == nil ? 0 : drts.head->blkno,
			drts.tail == nil ? 0 : drts.tail->blkno);
}

/*
	Return the number of blocks currently in the writer queue,
	read while holding drts.lck. Also dumps the queue state when
	chatty9p > 4.
 */
u64
pendingwrites(void)
{
	u64 pending;

	qlock(&drts.lck);
	if(chatty9p > 4)
		stats();
	pending = drts.n;
	qunlock(&drts.lck);

	return pending;
}

/*
	Block until the writer queue is empty, checking once a second.
	The queue length is read without taking drts.lck.
 */
void
sync(void)
{
	for(;;){
		if(drts.n <= 0)
			return;
		sleep(1000);
	}
}