code: mafs

Download patch

ref: eef0fe67ee4bb8c254f0f9e61c24ef5d34d05a4f
parent: 9073565b4482a85a72d82508adb76496975734d6
author: 9ferno <gophone2015@gmail.com>
date: Sun Oct 23 11:19:05 EDT 2022

implemented sync and write throttling

--- a/9p.c
+++ b/9p.c
@@ -758,7 +758,8 @@
 	QLock lck;
 	Work works[Nworks];
 	u16 hd, tl, nworks;
-	Rendez isfull, isempty;
+	Rendez isfull;	/* throttling */
+	Rendez isempty; /* workers do not have to keep polling to find work */
 };
 
 Buffer buf;
--- a/TODO
+++ b/TODO
@@ -1,6 +1,3 @@
-implement sync
-throttling  when the write queue becomes too much
-	show the writer queue when reading /adm/ctl
 profiling
 read only mode
 mafswrite():
@@ -20,5 +17,3 @@
 	cat /adm/ctl
 make docs/mafs.ms more interesting?
 test -A announce
-
-prepare tests and merge chktag.c and tests/sizes.c
--- a/ctl.c
+++ b/ctl.c
@@ -98,11 +98,14 @@
 
 	free = nfrees(&frees);
 	used = config.nblocks - free;
+
+	n = snprint(buf, 1024, "pending writes %llud blocks\n", pendingwrites());
 	if(config.size > TiB)
-	n = snprint(buf, 1024, "(blocks) free %ulld, used %ulld, total %ulld\n"
+	n += snprint(buf+n, 1024-n, "(blocks) free %ulld, used %ulld, total %ulld\n"
 						"(MiB) free %ulld, used %ulld, total %ulld\n"
 						"(GiB) free %ulld, used %ulld, total %ulld\n"
-						"(TiB) free %ulld, used %ulld, total %ulld\n",
+						"(TiB) free %ulld, used %ulld, total %ulld\n"
+						"pending writes %llud\n",
 						free, used, config.nblocks,
 						free * Rawblocksize / MiB,
 						used * Rawblocksize / MiB,
@@ -112,10 +115,11 @@
 						config.size / GiB,
 						free * Rawblocksize / TiB,
 						used * Rawblocksize / TiB,
-						config.size / TiB
+						config.size / TiB,
+						pendingwrites()
 						);
 	else if(config.size > GiB)
-	n = snprint(buf, 1024, "(blocks) free %ulld, used %ulld, total %ulld\n"
+	n += snprint(buf+n, 1024-n, "(blocks) free %ulld, used %ulld, total %ulld\n"
 						"(MiB) free %ulld, used %ulld, total %ulld\n"
 						"(GiB) free %ulld, used %ulld, total %ulld\n",
 						free, used, config.nblocks,
@@ -127,7 +131,7 @@
 						config.size / GiB
 						);
 	else if(config.size > MiB)
-	n = snprint(buf, 1024, "(blocks) free %ulld, used %ulld, total %ulld\n"
+	n += snprint(buf+n, 1024-n, "(blocks) free %ulld, used %ulld, total %ulld\n"
 						"(MiB) free %ulld, used %ulld, total %ulld\n",
 						free, used, config.nblocks,
 						free * Rawblocksize / MiB,
@@ -135,7 +139,7 @@
 						config.size / MiB
 						);
 	else
-	n = snprint(buf, 1024, "(blocks) free %ulld, used %ulld, total %ulld\n"
+	n += snprint(buf+n, 1024-n, "(blocks) free %ulld, used %ulld, total %ulld\n"
 						"(KiB) free %ulld, used %ulld, total %ulld\n",
 						free, used, config.nblocks,
 						free * Rawblocksize / KiB,
--- a/dat.h
+++ b/dat.h
@@ -37,6 +37,7 @@
 	 */
 	Nbuckets = 256*KiB,	/* number of Hiob */
 	Ncollisions = 4,	/* soft limit after which we start reusing older Iobuf's */
+	Npendingwrites	= MiB,	/* write throttling */
 
 	/* Qpnone is the Tag.path of free blocks/extents(Tfree),
 		and zero'ed out dentry blocks */
--- a/docs/mafs.ms
+++ b/docs/mafs.ms
@@ -763,7 +763,8 @@
 	QLock lck;		/* controls access to this queue */
 	Wbuf *head, *tail;	/* linked list of dirty blocks yet to be written to the disk */
 	s32 n;
-	Rendez isempty;
+	Rendez isfull;		/* write throttling */
+	Rendez isempty; 	/* writer does not have to keep polling to find work */
 } drts = {0};
 
 struct Wbuf
@@ -784,7 +785,9 @@
 .sp
 Free'd blocks are not written to the disk to avoid writing blanks to a disk.
 .sp
+The writer throttles input when there are more than Npendingwrites waiting to be written. This can be adjusted with the -w parameter.
 .sp
+.sp
 .ne 4
 .ft B
 Free blocks - Extents
@@ -1226,7 +1229,7 @@
 .sp
 	echo sync >> /n/mafs_myservice/adm/ctl
 .sp
-Stop Mafs.
+Stop Mafs. This command does not return until all the writes are written to the disk. So, could take a long time if you have a long writer queue.
 .sp
 	echo halt >> /n/mafs_myservice/adm/ctl
 .sp
--- a/mafs.c
+++ b/mafs.c
@@ -19,6 +19,7 @@
 u8	noauth = 0;
 u8	readonly = 0;
 u8	shuttingdown = 0;
+extern u64 npendingwrites;		/* write throttling */
 
 int	writeallow;	/* never on; for compatibility with fs */
 int	wstatallow;
@@ -32,7 +33,7 @@
 static void
 usage(void)
 {
-	fprint(2, "usage: mafs [-Ds] [-r service] [-n service] [-h nbuckets] [-a announce-string]... fsfile\n");
+	fprint(2, "usage: mafs [-Ds] [-r service] [-n service] [-h nbuckets] [-w npendingwrites] [-a announce-string]... fsfile\n");
 	exits("usage");
 }
 
@@ -55,6 +56,7 @@
 	rfork(RFNAMEG|RFNOTEG|RFREND);
 
 	nbuckets = Nbuckets;
+	npendingwrites = Npendingwrites;
 	sfd = -1;
 	doream = stdio = netc = 0;
 
@@ -69,6 +71,7 @@
 	case 'D':	chatty9p++; break;
 	case 'f':	devfile = ARGF(); break;
 	case 'h':	nbuckets = atoi(EARGF(usage())); break;
+	case 'w':	npendingwrites = atoi(EARGF(usage())); break;
 	case 'r':
 		doream = 1;
 		/* fall through */
@@ -100,6 +103,9 @@
 
 	if (access(devfile, AREAD|AWRITE) == -1)
 		sysfatal("%s cannot access device", devfile);
+
+	if(npendingwrites == 0)
+		npendingwrites = KiB;
 
 	formatinit();
 
--- a/sub.c
+++ b/sub.c
@@ -185,12 +185,6 @@
 	putbuf(sb);
 }
 
-/* almost obsolete as all writes except for the free extents are synchronous */
-void
-sync(void)
-{
-}
-
 void
 closefd(int fd)
 {
--- a/writer.c
+++ b/writer.c
@@ -1,10 +1,5 @@
 #include "all.h"
 
-enum
-{
-	Npendingwrites	= 32,
-};
-
 /* Only 1 writer to maintain the sequence of writes.
    Else, the directory entry could get written before the directory
    content by another write process.
@@ -20,7 +15,8 @@
 	QLock lck;			/* controls access to this queue */
 	Wbuf *head, *tail;	/* linked list of dirty blocks yet to be written to the disk */
 	s32 n;
-	Rendez isempty;
+	Rendez isfull;		/* write throttling */
+	Rendez isempty; 	/* writer does not have to keep polling to find work */
 } drts = {0};
 
 struct Wbuf
@@ -34,6 +30,7 @@
 	};
 };
 
+u64 npendingwrites;		/* write throttling */
 u8 stopwrites = 0;
 static void stats(void);
 
@@ -93,9 +90,10 @@
 	}
 	/* using canwlock() here as getbuf() could
 		have wlock()'ed the Iobuf too */
-	if(drts.head != nil)
+	if(drts.head != nil){
 		b = pluck(drts.head);
-	else
+		rwakeup(&drts.isfull);
+	}else
 		b = nil;
 	if(chatty9p > 4 && b!=nil){
 		dprint("getwrite done b->blkno %llud\n", b->blkno);
@@ -116,12 +114,8 @@
 		dprint("putwrite start p->blkno %llud\n", b->blkno);
 		stats();
 	}
-	if(drts.n > Npendingwrites){
-		if(drts.n < 0)
-			panic("putwrite drts.n < 0\n");
-		if(chatty9p > 4)
-			stats();
-	}
+	if(drts.n > Npendingwrites)
+		rsleep(&drts.isfull);
 	w = emalloc9p(sizeof(Wbuf)+Rawblocksize);
 	w->blkno = b->blkno;
 	memcpy(&w->io, b->io, Rawblocksize);
@@ -188,6 +182,7 @@
 
 	// set the locks used by the Rendezes
 	drts.isempty.l = &drts.lck;
+	drts.isfull.l = &drts.lck;
 
 	switch(rfork(RFPROC|RFMEM)){
 	case -1:
@@ -253,4 +248,11 @@
 		stats();
 	qunlock(&drts.lck);
 	return n;
+}
+
+void
+sync(void)
+{
+	while(drts.n > 0)
+		sleep(1000);
 }