code: mafs

Download patch

ref: 5076a214bf64da3c52f49e242003d91a17940308
parent: 8ba26cf58ea5df0dbd29149ed04794fff0c8bf87
author: 9ferno <gophone2015@gmail.com>
date: Tue Oct 18 18:25:25 EDT 2022

use a single, separate writer process instead of a pool of writers

--- a/9p.c
+++ b/9p.c
@@ -856,7 +856,7 @@
 	a = 0;
 	for(i = 0; i<Nprocs; i++){
 		if(worker[i].w.f == nil){
-			if(worker[i].pid > 0){
+			if(worker[i].pid > 0 && worker[i].pid != getpid()){
 				// rwakeup(&buf.isempty); TODO why can't I get this to work?
 				postnote(PNPROC, worker[i].pid, "interrupt");
 				worker[i].pid = 0;
@@ -888,7 +888,7 @@
 void
 shutdown(void)
 {
-	int i, a, b;
+	int i, a;
 /*	User *u, *v; */
 	char srvfilename[Namelen];
 
@@ -903,9 +903,9 @@
 //	rwakeupall(&buf.isempty);
 	for(i = 0; i< 10; i++){
 		a = stopworkers();
-		b = stopwriters();
-		sleep((a>b?a:b)*1000);
+		sleep(a*1000);
 	}
+	stopwriter();
 	if(a > 1){ /* 1 for this running process */
 		dprint("%d processes still running\n", a);
 		for(i = 0; i<Nprocs; i++){
--- a/all.h
+++ b/all.h
@@ -78,13 +78,15 @@
 Iobuf*	getbufchk(u64 blkno, u16 len, u8 readonly, int tag, u64 qpath);
 void	iobufinit(void);
 void	putbuf(Iobuf *p);
+void	putbuffree(Iobuf *p);
 void	settag(Iobuf *p, int tag, u64 qpath);
 void	showbuf(Iobuf *p);
 
 /* writer functions */
-void	initwriters(u8 nws);
+void	initwriter(void);
 void	putwrite(Iobuf *b);
-int	stopwriters(void);
+Iobuf *rmwrite(Iobuf *b);
+void	stopwriter(void);
 
 /* routines to manipulate the contents */
 Iobuf*	allocblocks(u16 len, int tag, u64 qpath);
--- a/iobuf.c
+++ b/iobuf.c
@@ -105,6 +105,8 @@
 				wlock(p);
 				if(chatty9p > 4)
 					dprint("	after wlock() blkno %llud\n", blkno);
+				if(p->dirty)
+					panic("p->dirty with p->len != len\n");
 				free(p->xiobuf);
 				p->xiobuf = emalloc9p(len*Rawblocksize);
 				p->len = len;
@@ -126,6 +128,13 @@
 				wlock(p);
 				if(chatty9p > 4)
 					dprint("	after wlock() blkno %llud\n", blkno);
+				/* as we lock from top down, this should ensure that the
+					lower blocks in the hierarchy are removed from the write
+					queue only after the top blocks have been removed or erased.
+				 */
+				if(p->dirty)
+					rmwrite(p); /* remove the pending write as it will be
+									written by putbuf() anyway */
 			}
 			decref(p);
 			return p;
@@ -140,9 +149,14 @@
 		Ncollisions is a soft limit.
 	 */
 	if(ncollisions >= Ncollisions){
+Another:
 		do{
 			p = s->back;
 			if(p->ref == 0 && p->dirty == 0 && canwlock(p)){
+				if(p->dirty){
+					wunlock(p);
+					goto Another;
+				}
 				if(p->len != len){
 					free(p->xiobuf);
 					p->xiobuf = emalloc9p(len*Rawblocksize);
@@ -301,6 +315,50 @@
 		}else if(srcbno == config.root.srcbno){
 			for(i=0; i<Nbkp; i++)
 				bkp(srcbno, buf, config.root.dest[i], Qproot0+i*3);
+		}
+	}
+}
+
+/* Unlock a buffer without queueing it for a disk write.
+	The only caller is freeblockbuf(): the block was just freed,
+	so its contents need not reach the disk and putwrite() is skipped.
+ */
+void
+putbuffree(Iobuf *p)
+{
+	if(p == nil){
+		panic("putbuffree p == nil called by %#p\n", getcallerpc(&p));
+		dprint("%s\n", errstring[Ephase]);
+		return;
+	}
+	if(p->io == nil){
+		showbuf(p);
+		panic("putbuffree p->io == nil by %#p\n", getcallerpc(&p));
+		dprint("%s\n", errstring[Ephase]);
+		return;
+	}
+	if(p->len == 0){
+		showbuf(p);
+		panic("putbuffree p->len == 0 by %#p\n", getcallerpc(&p));
+		dprint("%s\n", errstring[Ephase]);
+		return;
+	}
+
+	if(chatty9p > 4)
+		dprint("putbuffree p->blkno %llud\n", p->blkno);
+
+	if(p->readers){	/* p->readers != 0: release through the read-unlock path */
+		chkrunlock(p);
+		if(chatty9p > 4)
+		dprint(" .. runlock()'ed\n");
+	}else{
+		if(canwlock(p)){	/* the lock was free, so the caller never held it */
+			panic("putbuffree: buffer not locked %llud\n", p->blkno);
+		}
+		p->dirty = 0;	/* cleared before unlocking; nothing is queued for this block */
+		if(chkwunlock(p) == 0){
+			showbuf(p);
+			panic("putbuffree chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
 		}
 	}
 }
--- a/mafs.c
+++ b/mafs.c
@@ -42,7 +42,7 @@
 	static char *nets[8];
 	int doream, stdio, netc;
 	char buf[Namelen];
-	int pid, ctl, nwriters;
+	int pid, ctl;
 
 	progname = "mafs";
 	procname = "init";
@@ -55,7 +55,6 @@
 	rfork(RFNAMEG|RFNOTEG|RFREND);
 
 	nbuckets = Nbuckets;
-	nwriters = Nwriters;
 	sfd = -1;
 	doream = stdio = netc = 0;
 
@@ -70,7 +69,6 @@
 	case 'D':	chatty9p++; break;
 	case 'f':	devfile = ARGF(); break;
 	case 'h':	nbuckets = atoi(EARGF(usage())); break;
-	case 'w':	nwriters = atoi(EARGF(usage())); break;
 	case 'r':
 		doream = 1;
 		/* fall through */
@@ -106,7 +104,7 @@
 	formatinit();
 
 	tlocks = emalloc9p(NTLOCK * sizeof *tlocks);
-	initwriters(nwriters);
+	initwriter();
 	iobufinit();
 
 	/*
--- a/sub.c
+++ b/sub.c
@@ -124,7 +124,7 @@
 	/* clear the buf to avoid leaks on reuse */
 	memset(buf->io, 0, buf->len*Rawblocksize);
 	bfree(&frees, buf->blkno, buf->len);
-	putbuf(buf);
+	putbuffree(buf);
 }
 
 /* add the block to the extents used to manage free blocks */
--- a/writer.c
+++ b/writer.c
@@ -5,45 +5,46 @@
 	Npendingwrites	= 32,
 };
 
+/* Only 1 writer, to preserve the order of writes.
+   Otherwise one writer process could write a directory entry
+   before another has written the directory's content,
+   which leaves a mess to recover from after a crash.
+ */
+
 /* below is from nemo's Pg 252 */
 typedef	struct	Dirties	Dirties;
-typedef	struct	Writer	Writer;
 
 struct Dirties
 {
 	QLock lck;
 	Iobuf *head, *tail;
-	u32 n;
+	s32 n;
 	Rendez isempty;
-} drts;
+} drts = {0};
 
-struct Writer
-{
-	u32 pid;
-	u64 blkno;
-};	/* keeps track of running procs to flush */
+u8 stopwrites = 0;
 
-Writer *writers; /* array of writer proc pids */
-u16 nwriters;
-
 static Iobuf *
 pluck(Iobuf *b)
 {
-	if(b->prevdirty != nil && b->nextdirty != nil){
-		/* somewhere in the middle */
-		b->nextdirty->prevdirty = b->prevdirty;
-		b->prevdirty->nextdirty = b->nextdirty;
-		b->prevdirty = b->nextdirty = nil;
-		goto Done;
-	}else if(b->prevdirty == nil && b->nextdirty == nil){
-		drts.head = drts.tail = nil;
+	/* caller holds drts.lck. nil, or not queued (the writer already plucked it): nothing to unlink */
+	if(b == nil || (b != drts.head && b->prevdirty == nil))
+		return nil;
+	else if(b->prevdirty == nil && b->nextdirty == nil){
 		/* only one */
+		drts.head = drts.tail = nil;
 		goto Done;
 	}else if(b->prevdirty == nil){
 		/* first in the linked list */
 		drts.head = b->nextdirty;
 		b->nextdirty = nil;
+		drts.head->prevdirty = nil;
 		goto Done;
+	}else if(b->prevdirty != nil && b->nextdirty != nil){	/* somewhere in the middle */
+		b->nextdirty->prevdirty = b->prevdirty;
+		b->prevdirty->nextdirty = b->nextdirty;
+		b->prevdirty = b->nextdirty = nil;
+		goto Done;
 	}else if(b->nextdirty == nil){
 		/* last in the linked list */
 		drts.tail = b->prevdirty;
@@ -51,9 +52,12 @@
 		b->prevdirty = nil;
 		goto Done;
 	}
-	panic("pluck: blkno %llud should not be here\n", b->blkno);
+	panic("pluck should not be here\n");
+	return nil; /* only reached if panic() returns */
 Done:
 	drts.n--;
+	if(drts.n < 0)
+		panic("drts.n < 0\n");
 	return b;
 }
 
@@ -61,10 +65,12 @@
 Iobuf *
 rmwrite(Iobuf *b)
 {
+	Iobuf *p;	/* what pluck() hands back; nil when it removed nothing */
+
 	qlock(&drts.lck);
-	pluck(b);
+	p = pluck(b);	/* unlink b from the dirty queue while holding drts.lck */
 	qunlock(&drts.lck);
-	return b;
+	return p;	/* pluck()'s result rather than b, so a caller can tell whether b was removed */
 }
 
 Iobuf *
@@ -73,23 +79,18 @@
 	Iobuf *b;
 
 	qlock(&drts.lck);
-Again:
 	if(drts.n == 0){
+		if(stopwrites){
+			qunlock(&drts.lck);
+			return nil;
+		}
 		rsleep(&drts.isempty);
-		if(shuttingdown)
+		if(drts.n == 0 && stopwrites){
+			qunlock(&drts.lck);
 			return nil;
-	}
-	for(b=drts.head; b->nextdirty != nil; b=b->nextdirty){
-		if(canwlock(b)){
-			pluck(b);
-			break;
-		}else{
-			dprint("getwrite: could not lock block %llud\n", b->blkno);
-			continue;
 		}
 	}
-	if(b == nil)
-		goto Again; /* only when the putbuf() is still adding to dirties */
+	b = pluck(drts.head);
 	qunlock(&drts.lck);
 	return b;
 }
@@ -100,29 +101,30 @@
 void
 putwrite(Iobuf *b)
 {
+	if(chatty9p > 4)	/* putwrite: queue b as dirty at the tail, then drop its write lock */
+		dprint("putwrite p->blkno %llud\n", b->blkno);
 	qlock(&drts.lck);
+	if(drts.n < 0)	/* must precede the test below: n > Npendingwrites excludes n < 0 */
+		panic("putwrite drts.n < 0\n");
 	if(drts.n > Npendingwrites){
 		stats();
 	}
 	b->dirty = 1;
 	if(drts.head == nil){
 		drts.head = drts.tail = b;
-		rwakeup(&drts.isempty);
 	}else{
 		drts.tail->nextdirty = b;
 		b->prevdirty = drts.tail;
 		drts.tail = b;
 	}
-	drts.n++;
+	if(drts.head == drts.tail)	/* b is the only entry: the writer may be asleep in getwrite() */
+		rwakeup(&drts.isempty);
 	if(chkwunlock(b) == 0){
 		showbuf(b);
-		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
+		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
 	}
-	if(chatty9p > 4)
-		dprint(" .. wunlock()'ed\n");
+	drts.n++;	/* still under drts.lck, so list and count change together */
 	qunlock(&drts.lck);
-	if(chatty9p > 1)
-		stats();
 }
 
 void
@@ -130,9 +132,11 @@
 {
 	u64 n;
 
-	if(canwlock(p)){
-		panic("putbuf: buffer not locked %llud\n", p->blkno);
-	}
+	if(chatty9p > 4)
+		dprint("dowrite p->blkno %llud\n", p->blkno);
+	wlock(p);
+	if(chatty9p > 4)
+		dprint("dowrite p->blkno %llud locked\n", p->blkno);
 	p->io->dirty = 1;
 	if((n = devwrite(p->blkno, p->io, p->len)) != p->len*Rawblocksize){
 		dprint("%s\n", errstring[Esystem]);
@@ -142,90 +146,74 @@
 	p->io->dirty = 0;
 	devwritedirtyclear(p->blkno);
 	p->dirty = 0;
+	n = p->blkno;
 	if(chkwunlock(p) == 0){
 		showbuf(p);
-		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
+		panic("dowrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
 	}
 	if(chatty9p > 4)
-	dprint(" .. wunlock()'ed\n");
+	dprint("dowrite %llud wunlock()'ed\n", n);
 }
 
 void
-startwriter(int id)
+initwriter(void)
 {
-	char name[16];
 	Iobuf *b;
+	char name[Namelen];	/* process name; only filled in by the child */
 
-	switch(rfork(RFPROC|RFMEM|RFFDG)){
+	// isempty is slept on and woken with drts.lck held (getwrite, stopwriter)
+	drts.isempty.l = &drts.lck;
+
+	switch(rfork(RFPROC|RFMEM)){	/* child becomes the writer; the parent returns below */
 	case -1:
 		panic("can't fork");
 	case 0:
-		if(chatty9p > 1)
-		dprint("child %d pid: %d\n", id, getpid());
+		if(chatty9p > 4)
+		dprint("writer started\n");
 		break;
 	default:
 		return;
 	}
-	writers[id].pid = id;
 	procname = name;
-	snprint(name, 16, "mafs writer %d", id);
+	snprint(name, Namelen, "%s writer", service);
 	procsetname(name);
-	while((b=getwrite()) != nil)
-		dowrite(b);
+	while(stopwrites == 0 || drts.n > 0){	/* once stopwrites is set, keep going until the queue drains */
+		b=getwrite();
+		if(b != nil)	/* getwrite() returns nil when stopping with nothing queued */
+			dowrite(b);
+	}
+	if(chatty9p > 4)
 	dprint("%s process exited\n", name);
 	_exits(nil);
 }
 
 void
-initwriters(u8 nws)
+stopwriter(void)
 {
-	u8 i;
+	u64 n;	/* snapshot of drts.n taken under the lock */
 
-	nwriters = nws;
-	// release all locks, set everything to null values
-	memset(&drts, 0, sizeof(drts));
-	writers = malloc(sizeof(Writer)*nwriters);
-
-	// set the locks used by the Rendezes
-	drts.isempty.l = &drts.lck;
-
-	for(i = 0; i < nwriters; i++)
-		startwriter(i);
+	stopwrites = 1;	/* getwrite() returns nil on an empty queue once this is set */
+	do{
+		qlock(&drts.lck);
+		if(chatty9p > 4)
+		dprint("stopwriter drts.n %d\n", drts.n);
+		if(drts.n == 0)
+			rwakeup(&drts.isempty);	/* release a writer asleep in getwrite() */
+		n = drts.n;
+		qunlock(&drts.lck);
+		if(n == 0)
+			return;	/* NOTE(review): queue is empty, but the writer may still be inside dowrite() - confirm */
+		else
+			sleep(1000);	/* writes still queued: wait a second and look again */
+	}while(n > 0);
 }
 
-int
-stopwriters(void)
-{
-	int i, a;
-
-	a = 0;
-	for(i = 0; i<nwriters; i++){
-		if(writers[i].blkno == 0){
-			// rwakeup(&buf.isempty); TODO why can't I get this to work?
-			postnote(PNPROC, writers[i].pid, "interrupt");
-			writers[i].pid = 0;
-		}else
-			a++;
-	}
-	dprint("%d processes still writing, %llud writes still pending\n", a, drts.n);
-	return a;
-}
-
 static void
 stats(void)
 {
-	int n, w, inv, i;
-
-	n = w = inv = 0;
-	for(i = 0; i<Nprocs; i++){
-		if(writers[i].pid == 0)
-			inv++;
-		else if(writers[i].blkno == 0)
-			n++;
-		else if(writers[i].blkno > 0)
-			w++;
-	}
-	dprint("Nprocs %d inv %d idle %d working %d dirties nworks %d hd %p tl %p\n",
-			nwriters, inv, n, w, drts.n, drts.head, drts.tail);
+	dprint("dirties nwrites %d hd %llud tl %llud\n",	/* one-line summary of the dirty queue */
+			drts.n,	/* read without locking here; the visible caller, putwrite(), holds drts.lck */
+			drts.head == nil ? 0 : drts.head->blkno,	/* block number at the head; 0 when the queue is empty */
+			drts.tail == nil ? 0 : drts.tail->blkno);	/* block number at the tail; 0 when the queue is empty */
 }