code: mafs

Download patch

ref: 6e2e8c58342a6b3bba516d0ee42107a54d4c7717
parent: 194a2072349c964f18ad8dff41cc97fb6d9b087b
author: 9ferno <gophone2015@gmail.com>
date: Wed Nov 9 03:21:21 EST 2022

batch all the Tdata writes of an Iounit

--- a/9p.c
+++ b/9p.c
@@ -1264,9 +1264,10 @@
 s32
 append(Dentry *d, u64 dblkno, char *wbuf, s32 wbufsize)
 {
-	Iobuf *buf;
+	Iobuf *buf, *bufs[(Iounit/Blocksize)+1];
 	s32 howmuch;
-	u64 blkno, lastblksize;
+	u64 blkno, lastblksize, written, i, nblocks, startreli;
+	u64 blknos[(Iounit/Blocksize)+1];
 
 	if(d == nil || wbuf == nil || wbufsize == 0)
 		return 0;
@@ -1280,19 +1281,43 @@
 				" d->size/Blocksize %llud\n",
 				d->name, d->size, d->size/Blocksize);
 
-		buf = allocblock(Tdata, d->qid.path);
-		if(buf == nil)
-			return -1;
-		blkno = buf->blkno;
-		howmuch = min(Blocksize, wbufsize);
-		memcpy(buf->io->buf, wbuf, howmuch);
-		putbuf(buf);
+		/* can I write multiple blocks at once? */
+		if((nblocks = wbufsize/Blocksize) > 1){
+			startreli = d->size/Blocksize;
+			if(allocblocks(Tdata, d->qid.path, nblocks, bufs) == 0)
+				return -1;
+			written = 0;
+			for(i = 0; i<nblocks; i++){
+				howmuch = min(Blocksize, wbufsize-written);
+				memcpy(bufs[i]->io->buf, wbuf+written, howmuch);
+				written += howmuch;
+				blknos[i] = bufs[i]->blkno;
+			}
+			putbufs(bufs, nblocks);
+			for(i = 0; i<nblocks; i++){
+				if(addrelative(d, dblkno, startreli+i, blknos[i]) == 0){
+					panic("could not write Tdata block\n");
+					return -2;
+				}
+			}
+			return written;
+		}else{
+			/* write single block */
+			howmuch = min(Blocksize, wbufsize);
+			buf = allocblock(Tdata, d->qid.path);
+			if(buf == nil)
+				return -1;
+			blkno = buf->blkno;
+			memcpy(buf->io->buf, wbuf, howmuch);
+			putbuf(buf);
 
-		if(addrelative(d, dblkno, d->size/Blocksize, blkno) == 0){
-			freeblock(blkno, Tdata, d->qid.path);
-			return -2;
+			if(addrelative(d, dblkno, d->size/Blocksize, blkno) == 0){
+				panic("could not write Tdata block\n");
+				freeblock(blkno, Tdata, d->qid.path);
+				return -2;
+			}
+			return howmuch;
 		}
-		return howmuch;
 	}else{
 		/* last block is partially full, fill it up */
 		/*
@@ -1418,7 +1443,7 @@
 			if(chatty9p > 1)
 				dprint("writefile(): append\n");
 			n = append(d, dblkno, wbuf+written, /* from where */
-							wbufsize-written		/* how much */);
+							wbufsize-written	/* how much */);
 			if(chatty9p > 1)
 				dprint("writefile(): append returned %d\n", n);
 			if(n<0){
--- a/all.h
+++ b/all.h
@@ -105,6 +105,7 @@
 Iobuf*	getbufchk(u64 blkno, u8 readonly, int tag, u64 qpath);
 void	iobufinit(void);
 void	putbuf(Iobuf *p);
+void	putbufs(Iobuf **ps, u64 len);
 void	putbuffree(Iobuf *p);
 void	settag(Iobuf *p, int tag, u64 qpath);
 void	showbuf(Iobuf *p);
@@ -112,11 +113,13 @@
 /* writer functions */
 void	initwriter(void);
 void	putwrite(Iobuf *b);
+void	putwrites(Iobuf **bs, u64 len);
 void	stopwriter(void);
 u64		pendingwrites(void);
 
 /* routines to manipulate the contents */
 Iobuf*	allocblock(int tag, u64 qpath);
+u64	allocblocks(int tag, u64 qpath, u64 len, Iobuf **bufs);
 void	freeblockbuf(Iobuf *buf);
 void	freeblock(u64 blkno, u16 tag, u64 qpath);
 void	fsok(int ok);
--- a/docs/mafs.ms
+++ b/docs/mafs.ms
@@ -772,7 +772,7 @@
 };
 .fi
 .sp
-A writer process takes the blocks from the Dirties linked list on a FIFO (first-in-first-out) basis and writes them to the disk. putbuf() adds blocks to the end of this linked list.
+A single writer process takes the blocks from the Dirties linked list on a FIFO (first-in-first-out) basis and writes them to the disk. putbuf() and putbufs() add blocks to the end of this linked list.
 .sp
 The dirty blocks not yet written to the disk remain in the buffer cache and cannot be stolen when a need for new Iobuf arises.
 .sp
@@ -779,6 +779,8 @@
 Free'd blocks are not written to the disk to avoid writing blanks to a disk.
 .sp
 The writer throttles input when there are more than Npendingwrites waiting to be written. This can be adjusted with the -w parameter.
+.sp
+The alternative to having a single writer process is to have each worker write to the disk itself (synchronous writes). Synchronous writes happen at disk write speed. With asynchronous writes, memory holds the data until it is written to the disk. This increases write throughput until memory fills up; after that, writes happen at disk speed. A side effect of asynchronous writes is that all disk writes go through a single write queue.
 .sp
 .sp
 .ne 4
--- a/iobuf.c
+++ b/iobuf.c
@@ -316,6 +316,12 @@
 	}
 }
 
+void
+putbufs(Iobuf **ps, u64 len)
+{
+	putwrites(ps, len);
+}
+
 /* only caller is freeblockbuf().
 	These blocks do not need to be written to the disk.
 	Hence, avoiding putwrite().
--- a/mafs.c
+++ b/mafs.c
@@ -106,15 +106,17 @@
 		panic("null size %s", devfile);
 
 	/* 2/3rds of the memory for the pending writes
-		and 1/3rd for the buffer cache */
+		and 1/3rd for the buffer cache, after first setting aside
+		4*(Iounit/Blocksize) memory units for jumbo (multi-block) writes
+	 */
 	if(nmemunits == 0)
 		nmemunits = size/Rawblocksize > 8*MiB ? 8*MiB : size/Rawblocksize;
 	if(nmemunits < KiB)
 		nmemunits = KiB;
 	if(npendingwrites == 0)
-		npendingwrites = 2*nmemunits/3;
+		npendingwrites = 2*(nmemunits-(4*(Iounit/Blocksize)))/3;
 	if(nbuckets == 0)
-		nbuckets = nmemunits/(3*Ncollisions);
+		nbuckets = (nmemunits-(4*(Iounit/Blocksize)))/(3*Ncollisions);
 
 	if(chatty9p){
 		dprint("\nPlan 9 %d-bit file server with %d-deep indirect blocks\n",
--- a/sub.c
+++ b/sub.c
@@ -114,6 +114,30 @@
 	return buf;
 }
 
+u64
+allocblocks(int tag, u64 qpath, u64 len, Iobuf **bufs)
+{
+	u64 blkno, i;
+	Iobuf *buf;
+
+	blkno = balloc(&frees, len);
+	if(blkno == 0)
+		return 0;	/* the caller should trigger an Efull message */
+
+	if(chatty9p > 1)
+		dprint("alloc %llud\n", blkno);
+
+	for(i = 0; i < len; i++){
+		/* cannot do getbufchk() unless we ream the whole disk at start */
+		bufs[i] = buf = getbuf(blkno+i, Bwritable, Bfreshalloc);
+		/* clear the buf to avoid leaks on reuse */
+		memset(buf->xiobuf, 0, Rawblocksize);
+		settag(buf, tag, qpath);
+	}
+
+	return len;
+}
+
 /* the buf should have been wlock()'ed */
 void
 freeblockbuf(Iobuf *buf)
--- a/writer.c
+++ b/writer.c
@@ -71,12 +71,24 @@
 	return b;
 }
 
-/* the Iobuf should be wlock()'ed at entry */
+/*
+	The Iobuf must be wlock()'ed at entry and stay locked until
+	it is queued for the writer.
+	It is unlocked only while drts.lck is held, immediately before
+	being appended to the writer queue, so that another process cannot
+	queue a newer copy of the block ahead of ours. This preserves the
+	write order and ensures that a newer write is not overwritten
+	by an older write.
+ */
 void
 putwrite(Iobuf *b)
 {
 	Wbuf *w;
 
+	if(chatty9p > 4){
+		dprint("putwrite start p->blkno %llud\n", b->blkno);
+		stats();
+	}
 	w = emalloc9p(sizeof(Wbuf));
 	w->blkno = b->blkno;
 	w->payload = allocmemunit();
@@ -83,18 +95,14 @@
 	memcpy(w->payload, b->xiobuf, Rawblocksize);
 	incref(&b->dirties);
 	w->iobuf = b;
+
+	qlock(&drts.lck);
+	if(drts.n >= npendingwrites)
+		rsleep(&drts.isfull);
 	if(chkwunlock(b) == 0){
 		showbuf(b);
 		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
 	}
-
-	qlock(&drts.lck);
-	if(chatty9p > 4){
-		dprint("putwrite start p->blkno %llud\n", b->blkno);
-		stats();
-	}
-	if(drts.n == npendingwrites)
-		rsleep(&drts.isfull);
 	if(drts.head == nil){
 		drts.head = drts.tail = w;
 	}else{
@@ -105,10 +113,53 @@
 	drts.n++;
 	if(drts.n == 1)
 		rwakeup(&drts.isempty);
+	qunlock(&drts.lck);
 	if(chatty9p > 4 && b!=nil){
 		dprint("putwrite done b->blkno %llud\n", b->blkno);
 		stats();
 	}
+}
+void
+putwrites(Iobuf **bs, u64 len)
+{
+	Wbuf *w, *ws[(Iounit/Blocksize)+1];
+	u8 empty;
+	u64 i;
+	Iobuf *b;
+
+	if(len == 0)
+		return;
+	empty = 0;
+	for(i = 0; i < len; i++){
+		b = bs[i];
+		ws[i] = w = emalloc9p(sizeof(Wbuf));
+		w->blkno = b->blkno;
+		w->payload = allocmemunit();
+		memcpy(w->payload, b->xiobuf, Rawblocksize);
+		incref(&b->dirties);
+		w->iobuf = b;
+	}
+	qlock(&drts.lck);
+	if(drts.n >= npendingwrites)
+		rsleep(&drts.isfull);
+	if(drts.n == 0)
+		empty = 1;
+	for(i = 0; i < len; i++){
+		if(chkwunlock(bs[i]) == 0){
+			showbuf(bs[i]);
+			panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&bs));
+		}
+		if(drts.head == nil){
+			drts.head = drts.tail = ws[i];
+		}else{
+			drts.tail->next = ws[i];
+			ws[i]->prev = drts.tail;
+			drts.tail = ws[i];
+		}
+		drts.n++;
+	}
+	if(empty)
+		rwakeup(&drts.isempty);
 	qunlock(&drts.lck);
 }
 
@@ -140,79 +191,87 @@
 			return;
 		}
 	}
-	/* using canwlock() here as getbuf() could
-		have wlock()'ed the Iobuf too */
-	if(drts.head != nil){
-		if(drts.n == npendingwrites-1)
-			full = 1;
-		if(drts.n > 1){
-			/* trying to write consecutive blocks with a write() call */
-			n = 1;
-			prevblkno = startblkno = drts.head->blkno;
-			for(b = drts.head->next;
-					n <= drts.n && b != nil && b->blkno == prevblkno+1 && n < 128;
-					b = b->next){
-				prevblkno=b->blkno;
-				n++;
+	// dprint("dowrite: drts.n %llud\n", drts.n);
+	if(drts.head == nil){
+		qunlock(&drts.lck);
+		return;
+	}
+
+	if(drts.n >= npendingwrites)
+		full = 1;
+	if(drts.n > 1){
+		/* trying to write consecutive blocks with a write() call */
+		n = 1;
+		prevblkno = startblkno = drts.head->blkno;
+		for(b = drts.head->next;
+				n <= drts.n && b != nil && b->blkno == prevblkno+1 && n < 128;
+				b = b->next){
+			prevblkno=b->blkno;
+			n++;
+		}
+		if(n > 1){
+			for(i = 0; i < n; i++)
+				blks[i] = pluck(drts.head);
+			if(full && drts.n < npendingwrites)
+				rwakeup(&drts.isfull);
+			// dprint("dowrite: at return drts.n %llud\n", drts.n);
+			qunlock(&drts.lck);
+
+			if(chatty9p > 4){
+				if(b != nil)
+				dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud next %llud\n",
+						drts.n, n, startblkno, b->blkno);
+				else
+				dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud\n",
+						drts.n, n, startblkno);
 			}
-			if(n > 1){
-				if(chatty9p > 4){
-					if(b != nil)
-					dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud next %llud\n",
-							drts.n, n, startblkno, b->blkno);
-					else
-					dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud\n",
-							drts.n, n, startblkno);
-				}
-				jumbo = emalloc9p(n*Rawblocksize);
-				for(i = 0; i < n; i++){
-					b = pluck(drts.head);
-					memcpy(jumbo+(i*Rawblocksize), b->payload, Rawblocksize);
-					blks[i] = b;
-					if(chatty9p > 4 && b!=nil)
-						dprint("getwrite done b->blkno %llud\n", b->blkno);
-				}
-				if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
-					dprint("%s\n", errstring[Esystem]);
-					panic("error writing block %llud: %llud bytes, written %llud: %r\n",
-							b->blkno, n, wn);
-				}
-				for(i = 0; i < n; i++){
-					b = blks[i];
-					blks[i] = 0;
-					if(chatty9p > 4)
-						dprint("dowrite %llud wunlock()'ed\n", b->blkno);
-					decref(&b->iobuf->dirties);
-					freememunit(b->payload);
-					free(b);
-				}
-				free(jumbo);
-				qunlock(&drts.lck);
-				return;
-			}else
-				goto single;
-		}else{
-single:
-			b = pluck(drts.head);
-			if(chatty9p > 4 && b!=nil)
-				dprint("getwrite done b->blkno %llud\n", b->blkno);
-			if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
+			jumbo = emalloc9p(n*Rawblocksize);
+			for(i = 0; i < n; i++){
+				b = blks[i];
+				memcpy(jumbo+(i*Rawblocksize), b->payload, Rawblocksize);
+				if(chatty9p > 4 && b!=nil)
+					dprint("getwrite b->blkno %llud\n", b->blkno);
+			}
+			if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
 				dprint("%s\n", errstring[Esystem]);
-				panic("error writing block %llud: %llud bytes: %r\n",
-						b->blkno, n);
+				panic("error writing block %llud: %llud bytes, written %llud: %r\n",
+						b->blkno, n, wn);
 			}
-			if(chatty9p > 4)
-				dprint("dowrite %llud wunlock()'ed\n", b->blkno);
-			decref(&b->iobuf->dirties);
-			freememunit(b->payload);
-			free(b);
-		}
-		if(full)
+			free(jumbo);
+			for(i = 0; i < n; i++){
+				b = blks[i];
+				if(chatty9p > 4)
+					dprint("dowrite %llud wunlock()'ed\n", b->blkno);
+				decref(&b->iobuf->dirties);
+				freememunit(b->payload);
+				free(b);
+				blks[i] = 0;
+			}
+			return;
+		}else
+			goto single;
+	}else{
+single:
+		b = pluck(drts.head);
+		if(full && drts.n < npendingwrites)
 			rwakeup(&drts.isfull);
+		qunlock(&drts.lck);
+
 		if(chatty9p > 4 && b!=nil)
-			stats();
+			dprint("getwrite done b->blkno %llud\n", b->blkno);
+		if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
+			dprint("%s\n", errstring[Esystem]);
+			panic("error writing block %llud: %llud bytes: %r\n",
+					b->blkno, n);
+		}
+		if(chatty9p > 4)
+			dprint("dowrite %llud wunlock()'ed\n", b->blkno);
+		decref(&b->iobuf->dirties);
+		freememunit(b->payload);
+		free(b);
 	}
-	qunlock(&drts.lck);
+	if(chatty9p > 4 && b!=nil)
+		stats();
 }
 
 void