code: mafs

Download patch

ref: 8ba26cf58ea5df0dbd29149ed04794fff0c8bf87
parent: ff8a41e99aed12a29b676c4c2d161eeae5ed652e
author: 9ferno <gophone2015@gmail.com>
date: Tue Oct 18 15:22:25 EDT 2022

removed synchronous writes

--- a/9p.c
+++ b/9p.c
@@ -759,6 +759,25 @@
 }
 
 /* below is from nemo's Pg 252 */
+typedef	struct	Buffer	Buffer;
+typedef	struct	Work	Work;
+
+struct Work
+{
+	void (*f)(Req *r);
+	Req *r;
+};
+
+struct Buffer
+{
+	QLock lck;
+	Work works[Nworks];
+	u16 hd, tl, nworks;
+	Rendez isfull, isempty;
+};
+
+Buffer buf;
+
 Work
 get(Buffer *b)
 {
@@ -784,7 +803,7 @@
 	return w;
 }
 
-void stats(void);
+static void stats(void);
 void
 put(Buffer *b, void (*f)(Req *r), Req *r)
 {
@@ -813,7 +832,6 @@
 		stats();
 }
 
-Buffer buf;
 struct
 {
 	u32 pid;
@@ -849,7 +867,7 @@
 	return a;
 }
 
-void
+static void
 stats(void)
 {
 	int n, w, inv, i;
@@ -870,7 +888,7 @@
 void
 shutdown(void)
 {
-	int i, a;
+	int i, a, b;
 /*	User *u, *v; */
 	char srvfilename[Namelen];
 
@@ -885,7 +903,8 @@
 //	rwakeupall(&buf.isempty);
 	for(i = 0; i< 10; i++){
 		a = stopworkers();
-		sleep(a*1000);
+		b = stopwriters();
+		sleep((a>b?a:b)*1000);
 	}
 	if(a > 1){ /* 1 for this running process */
 		dprint("%d processes still running\n", a);
@@ -931,6 +950,7 @@
 		w = get(b);
 	}
 	worker[id].pid = 0;
+	if(chatty9p > 1)
 	dprint("worker %d exiting\n", id);
 	exits(nil);
 }
--- a/all.h
+++ b/all.h
@@ -10,25 +10,9 @@
 #include "dat.h"
 #include "extents.h"
 
-typedef	struct	Buffer	Buffer;
 typedef	struct	Hiob	Hiob;
 typedef	struct	Iobuf	Iobuf;
-typedef	struct	Work	Work;
 
-struct Work
-{
-	void (*f)(Req *r);
-	Req *r;
-};
-
-struct Buffer
-{
-	QLock lck;
-	Work works[Nworks];
-	u16 hd, tl, nworks;
-	Rendez isfull, isempty;
-};
-
 struct Hiob		/* Hash bucket */
 {
 	Iobuf*	link;	/* least recently used of the circular list */
@@ -78,6 +62,9 @@
 		u8	*xiobuf;	/* "real" buffer pointer */
 		Content *io;	/* cast'able to contents */
 	};
+
+	u8 dirty;		/* to identify buffers which are yet to be written */
+	Iobuf *prevdirty, *nextdirty;
 };
 
 extern	u32	nbuckets;		/* n hash buckets for i/o */
@@ -93,6 +80,11 @@
 void	putbuf(Iobuf *p);
 void	settag(Iobuf *p, int tag, u64 qpath);
 void	showbuf(Iobuf *p);
+
+/* writer functions */
+void	initwriters(u8 nws);
+void	putwrite(Iobuf *b);
+int	stopwriters(void);
 
 /* routines to manipulate the contents */
 Iobuf*	allocblocks(u16 len, int tag, u64 qpath);
--- a/dat.h
+++ b/dat.h
@@ -23,6 +23,7 @@
 	MiB	= KiB*KiB,	/* Mibibytes */
 	GiB	= KiB*MiB,	/* Gibibytes */
 	TiB	= KiB*GiB,	/* Tibibytes */
+	Nwriters = 10,	/* max. number of writer processes */
 
 	/*
 	https://cs.stackexchange.com/questions/11029/why-is-it-best-to-use-a-prime-number-as-a-mod-in-a-hashing-function
--- a/extents.c
+++ b/extents.c
@@ -364,7 +364,7 @@
 	return e;
 }
 
-u64
+static u64
 pluck(Extents *es, Extent *e)
 {
 	Extent *dlow, *dsmall, *fhigh, *fbig;
--- a/iobuf.c
+++ b/iobuf.c
@@ -1,7 +1,5 @@
-#include	"all.h"
+#include "all.h"
 
-#define	DEBUG	0
-
 u32  nbuckets = 0;	/* nbuckets derived from -m or Nbuckets */
 Hiob *hiob = nil;	/* array of nbuckets */
 Extents frees = {0};
@@ -144,7 +142,7 @@
 	if(ncollisions >= Ncollisions){
 		do{
 			p = s->back;
-			if(p->ref == 0 && canwlock(p)){
+			if(p->ref == 0 && p->dirty == 0 && canwlock(p)){
 				if(p->len != len){
 					free(p->xiobuf);
 					p->xiobuf = emalloc9p(len*Rawblocksize);
@@ -229,18 +227,13 @@
 
  */
 void
-bkp(Iobuf *p, u64 bno, u64 qpath)
+bkp(u64 srcbno, u8 *contents, u64 bno, u64 qpath)
 {
 	Iobuf *buf;
 
-	if(p == nil){
-		dprint("bpk: p is nil invalid backup location %d, qpath %llud\n",
-				bno, qpath);
-		return;
-	}
 	if(bno == 0){
-		dprint("bkp %d: invalid backup location %d, qpath %llud\n",
-				p->blkno, bno, qpath);
+		dprint("bkp %llud: invalid backup location %llud, qpath %llud\n",
+				srcbno, bno, qpath);
 		return;
 	}
 
@@ -248,7 +241,7 @@
 	if(buf == nil){
 		panic("bkp: buf == nil\n");
 	}
-	memcpy(buf->io->buf, p->io->buf, Blocksize);
+	memcpy(buf->io->buf, contents, Blocksize);
 	if(qpath == Qproot0 || qpath == Qproot1){
 		buf->io->d.mode &= ~DMDIR; /* to avoid recursive du -a */
 	}
@@ -258,8 +251,9 @@
 void
 putbuf(Iobuf *p)
 {
-	u32 n;
 	s8 i;
+	u8 buf[Blocksize];
+	u64 srcbno;
 
 	if(p == nil){
 		panic("putbuf p == nil called by %#p\n", getcallerpc(&p));
@@ -287,33 +281,27 @@
 		if(chatty9p > 4)
 		dprint(" .. runlock()'ed\n");
 	}else{
+
 		if(canwlock(p)){
 			panic("putbuf: buffer not locked %llud\n", p->blkno);
 		}
-		p->io->dirty = 1;
-		if((n = devwrite(p->blkno, p->io, p->len)) != p->len*Rawblocksize){
-			dprint("%s\n", errstring[Esystem]);
-			panic("error writing block %llud: %d bytes: %r\n",
-					p->blkno, n);
+		srcbno = p->blkno;
+		if(p->blkno == config.config.srcbno ||
+			p->blkno == config.super.srcbno ||
+			p->blkno == config.root.srcbno){
+			memcpy(buf, p->io->buf, Blocksize);
 		}
-		p->io->dirty = 0;
-		devwritedirtyclear(p->blkno);
-		if(p->blkno == config.config.srcbno){
+		putwrite(p);
+		if(srcbno == config.config.srcbno){
 			for(i=0; i<Nbkp; i++)
-				bkp(p, config.config.dest[i], Qpconfig0+i*3);
-		}else if(p->blkno == config.super.srcbno){
+				bkp(srcbno, buf, config.config.dest[i], Qpconfig0+i*3);
+		}else if(srcbno == config.super.srcbno){
 			for(i=0; i<Nbkp; i++)
-				bkp(p, config.super.dest[i], Qpsuper0+i*3);
-		}else if(p->blkno == config.root.srcbno){
+				bkp(srcbno, buf, config.super.dest[i], Qpsuper0+i*3);
+		}else if(srcbno == config.root.srcbno){
 			for(i=0; i<Nbkp; i++)
-				bkp(p, config.root.dest[i], Qproot0+i*3);
+				bkp(srcbno, buf, config.root.dest[i], Qproot0+i*3);
 		}
-		if(chkwunlock(p) == 0){
-			showbuf(p);
-			panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
-		}
-		if(chatty9p > 4)
-		dprint(" .. wunlock()'ed\n");
 	}
 }
 
--- a/mafs.c
+++ b/mafs.c
@@ -42,7 +42,7 @@
 	static char *nets[8];
 	int doream, stdio, netc;
 	char buf[Namelen];
-	int pid, ctl;
+	int pid, ctl, nwriters;
 
 	progname = "mafs";
 	procname = "init";
@@ -55,6 +55,7 @@
 	rfork(RFNAMEG|RFNOTEG|RFREND);
 
 	nbuckets = Nbuckets;
+	nwriters = Nwriters;
 	sfd = -1;
 	doream = stdio = netc = 0;
 
@@ -69,6 +70,7 @@
 	case 'D':	chatty9p++; break;
 	case 'f':	devfile = ARGF(); break;
 	case 'h':	nbuckets = atoi(EARGF(usage())); break;
+	case 'w':	nwriters = atoi(EARGF(usage())); break;
 	case 'r':
 		doream = 1;
 		/* fall through */
@@ -104,6 +106,7 @@
 	formatinit();
 
 	tlocks = emalloc9p(NTLOCK * sizeof *tlocks);
+	initwriters(nwriters);
 	iobufinit();
 
 	/*
--- a/mkfile
+++ b/mkfile
@@ -16,6 +16,7 @@
 	sub.$O\
 	tag.$O\
 	user.$O\
+	writer.$O\
 
 HFILES=\
 	all.h\
--- /dev/null
+++ b/writer.c
@@ -1,0 +1,231 @@
+#include "all.h"
+
+enum
+{
+	Npendingwrites	= 32,
+};
+
+/* below is from nemo's Pg 252 */
+typedef	struct	Dirties	Dirties;
+typedef	struct	Writer	Writer;
+
+struct Dirties
+{
+	QLock lck;
+	Iobuf *head, *tail;
+	u32 n;
+	Rendez isempty;
+} drts;
+
+struct Writer
+{
+	u32 pid;
+	u64 blkno;
+};	/* keeps track of running procs to flush */
+
+Writer *writers; /* array of writer proc pids */
+u16 nwriters;
+
+static Iobuf *
+pluck(Iobuf *b)
+{
+	if(b->prevdirty != nil && b->nextdirty != nil){
+		/* somewhere in the middle */
+		b->nextdirty->prevdirty = b->prevdirty;
+		b->prevdirty->nextdirty = b->nextdirty;
+		b->prevdirty = b->nextdirty = nil;
+		goto Done;
+	}else if(b->prevdirty == nil && b->nextdirty == nil){
+		drts.head = drts.tail = nil;
+		/* only one */
+		goto Done;
+	}else if(b->prevdirty == nil){
+		/* first in the linked list */
+		drts.head = b->nextdirty;
+		b->nextdirty = nil;
+		goto Done;
+	}else if(b->nextdirty == nil){
+		/* last in the linked list */
+		drts.tail = b->prevdirty;
+		b->prevdirty->nextdirty = nil;
+		b->prevdirty = nil;
+		goto Done;
+	}
+	panic("pluck: blkno %llud should not be here\n", b->blkno);
+Done:
+	drts.n--;
+	return b;
+}
+
+/* the Iobuf should be wlock()'ed at entry */
+Iobuf *
+rmwrite(Iobuf *b)
+{
+	qlock(&drts.lck);
+	pluck(b);
+	qunlock(&drts.lck);
+	return b;
+}
+
+Iobuf *
+getwrite(void)
+{
+	Iobuf *b;
+
+	qlock(&drts.lck);
+Again:
+	if(drts.n == 0){
+		rsleep(&drts.isempty);
+		if(shuttingdown)
+			return nil;
+	}
+	for(b=drts.head; b->nextdirty != nil; b=b->nextdirty){
+		if(canwlock(b)){
+			pluck(b);
+			break;
+		}else{
+			dprint("getwrite: could not lock block %llud\n", b->blkno);
+			continue;
+		}
+	}
+	if(b == nil)
+		goto Again; /* only when the putbuf() is still adding to dirties */
+	qunlock(&drts.lck);
+	return b;
+}
+
+static void stats(void);
+
+/* the Iobuf should be wlock()'ed at entry */
+void
+putwrite(Iobuf *b)
+{
+	qlock(&drts.lck);
+	if(drts.n > Npendingwrites){
+		stats();
+	}
+	b->dirty = 1;
+	if(drts.head == nil){
+		drts.head = drts.tail = b;
+		rwakeup(&drts.isempty);
+	}else{
+		drts.tail->nextdirty = b;
+		b->prevdirty = drts.tail;
+		drts.tail = b;
+	}
+	drts.n++;
+	if(chkwunlock(b) == 0){
+		showbuf(b);
+		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
+	}
+	if(chatty9p > 4)
+		dprint(" .. wunlock()'ed\n");
+	qunlock(&drts.lck);
+	if(chatty9p > 1)
+		stats();
+}
+
+void
+dowrite(Iobuf *p)
+{
+	u64 n;
+
+	if(canwlock(p)){
+		panic("putbuf: buffer not locked %llud\n", p->blkno);
+	}
+	p->io->dirty = 1;
+	if((n = devwrite(p->blkno, p->io, p->len)) != p->len*Rawblocksize){
+		dprint("%s\n", errstring[Esystem]);
+		panic("error writing block %llud: %llud bytes: %r\n",
+				p->blkno, n);
+	}
+	p->io->dirty = 0;
+	devwritedirtyclear(p->blkno);
+	p->dirty = 0;
+	if(chkwunlock(p) == 0){
+		showbuf(p);
+		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
+	}
+	if(chatty9p > 4)
+	dprint(" .. wunlock()'ed\n");
+}
+
+void
+startwriter(int id)
+{
+	char name[16];
+	Iobuf *b;
+
+	switch(rfork(RFPROC|RFMEM|RFFDG)){
+	case -1:
+		panic("can't fork");
+	case 0:
+		if(chatty9p > 1)
+		dprint("child %d pid: %d\n", id, getpid());
+		break;
+	default:
+		return;
+	}
+	writers[id].pid = id;
+	procname = name;
+	snprint(name, 16, "mafs writer %d", id);
+	procsetname(name);
+	while((b=getwrite()) != nil)
+		dowrite(b);
+	dprint("%s process exited\n", name);
+	_exits(nil);
+}
+
+void
+initwriters(u8 nws)
+{
+	u8 i;
+
+	nwriters = nws;
+	// release all locks, set everything to null values
+	memset(&drts, 0, sizeof(drts));
+	writers = malloc(sizeof(Writer)*nwriters);
+
+	// set the locks used by the Rendezes
+	drts.isempty.l = &drts.lck;
+
+	for(i = 0; i < nwriters; i++)
+		startwriter(i);
+}
+
+int
+stopwriters(void)
+{
+	int i, a;
+
+	a = 0;
+	for(i = 0; i<nwriters; i++){
+		if(writers[i].blkno == 0){
+			// rwakeup(&buf.isempty); TODO why can't I get this to work?
+			postnote(PNPROC, writers[i].pid, "interrupt");
+			writers[i].pid = 0;
+		}else
+			a++;
+	}
+	dprint("%d processes still writing, %llud writes still pending\n", a, drts.n);
+	return a;
+}
+
+static void
+stats(void)
+{
+	int n, w, inv, i;
+
+	n = w = inv = 0;
+	for(i = 0; i<Nprocs; i++){
+		if(writers[i].pid == 0)
+			inv++;
+		else if(writers[i].blkno == 0)
+			n++;
+		else if(writers[i].blkno > 0)
+			w++;
+	}
+	dprint("Nprocs %d inv %d idle %d working %d dirties nworks %d hd %p tl %p\n",
+			nwriters, inv, n, w, drts.n, drts.head, drts.tail);
+}
+