ref: 6e2e8c58342a6b3bba516d0ee42107a54d4c7717
parent: 194a2072349c964f18ad8dff41cc97fb6d9b087b
author: 9ferno <gophone2015@gmail.com>
date: Wed Nov 9 03:21:21 EST 2022
batch all the Tdata writes of an Iounit
--- a/9p.c
+++ b/9p.c
@@ -1264,9 +1264,10 @@
s32
append(Dentry *d, u64 dblkno, char *wbuf, s32 wbufsize)
{
- Iobuf *buf;
+ Iobuf *buf, *bufs[(Iounit/Blocksize)+1];
s32 howmuch;
- u64 blkno, lastblksize;
+ u64 blkno, lastblksize, written, i, nblocks, startreli;
+ u64 blknos[(Iounit/Blocksize)+1];
if(d == nil || wbuf == nil || wbufsize == 0)
return 0;
@@ -1280,19 +1281,43 @@
" d->size/Blocksize %llud\n",
d->name, d->size, d->size/Blocksize);
- buf = allocblock(Tdata, d->qid.path);
- if(buf == nil)
- return -1;
- blkno = buf->blkno;
- howmuch = min(Blocksize, wbufsize);
- memcpy(buf->io->buf, wbuf, howmuch);
- putbuf(buf);
+ /* can I write multiple blocks at once? */
+ if((nblocks = wbufsize/Blocksize) > 1){
+ startreli = d->size/Blocksize;
+ if(allocblocks(Tdata, d->qid.path, nblocks, bufs) == 0)
+ return -1;
+ written = 0;
+ for(i = 0; i<nblocks; i++){
+ howmuch = min(Blocksize, wbufsize-written);
+ memcpy(bufs[i]->io->buf, wbuf+written, howmuch);
+ written += howmuch;
+ blknos[i] = bufs[i]->blkno;
+ }
+ putbufs(bufs, nblocks);
+ for(i = 0; i<nblocks; i++){
+ if(addrelative(d, dblkno, startreli+i, blknos[i]) == 0){
+ panic("could not write Tdata block\n");
+ return -2;
+ }
+ }
+ return written;
+ }else{
+ /* write single block */
+ howmuch = min(Blocksize, wbufsize);
+ buf = allocblock(Tdata, d->qid.path);
+ if(buf == nil)
+ return -1;
+ blkno = buf->blkno;
+ memcpy(buf->io->buf, wbuf, howmuch);
+ putbuf(buf);
- if(addrelative(d, dblkno, d->size/Blocksize, blkno) == 0){
- freeblock(blkno, Tdata, d->qid.path);
- return -2;
+ if(addrelative(d, dblkno, d->size/Blocksize, blkno) == 0){
+ panic("could not write Tdata block\n");
+ freeblock(blkno, Tdata, d->qid.path);
+ return -2;
+ }
+ return howmuch;
}
- return howmuch;
}else{
/* last block is partially full, fill it up */
/*
@@ -1418,7 +1443,7 @@
if(chatty9p > 1)
dprint("writefile(): append\n");
n = append(d, dblkno, wbuf+written, /* from where */
- wbufsize-written /* how much */);
+ wbufsize-written /* how much */);
if(chatty9p > 1)
dprint("writefile(): append returned %d\n", n);
if(n<0){
--- a/all.h
+++ b/all.h
@@ -105,6 +105,7 @@
Iobuf* getbufchk(u64 blkno, u8 readonly, int tag, u64 qpath);
void iobufinit(void);
void putbuf(Iobuf *p);
+void putbufs(Iobuf **ps, u64 len);
void putbuffree(Iobuf *p);
void settag(Iobuf *p, int tag, u64 qpath);
void showbuf(Iobuf *p);
@@ -112,11 +113,13 @@
/* writer functions */
void initwriter(void);
void putwrite(Iobuf *b);
+void putwrites(Iobuf **bs, u64 len);
void stopwriter(void);
u64 pendingwrites(void);
/* routines to manipulate the contents */
Iobuf* allocblock(int tag, u64 qpath);
+u64 allocblocks(int tag, u64 qpath, u64 len, Iobuf **bufs);
void freeblockbuf(Iobuf *buf);
void freeblock(u64 blkno, u16 tag, u64 qpath);
void fsok(int ok);
--- a/docs/mafs.ms
+++ b/docs/mafs.ms
@@ -772,7 +772,7 @@
};
.fi
.sp
-A writer process takes the blocks from the Dirties linked list on a FIFO (first-in-first-out) basis and writes them to the disk. putbuf() adds blocks to the end of this linked list.
+A single writer process takes the blocks from the Dirties linked list on a FIFO (first-in-first-out) basis and writes them to the disk. putbuf() adds blocks to the end of this linked list.
.sp
The dirty blocks not yet written to the disk remain in the buffer cache and cannot be stolen when a need for new Iobuf arises.
.sp
@@ -779,6 +779,8 @@
Free'd blocks are not written to the disk to avoid writing blanks to a disk.
.sp
The writer throttles input when there are more than Npendingwrites waiting to be written. This can be adjusted with the -w parameter.
+.sp
+The alternative to having a single writer process is for each worker to write to the disk itself (synchronous writes). Synchronous writes make writes happen at disk write speed. With asynchronous writes, memory holds the data until it is written to the disk, which increases write throughput until memory fills up. After memory fills up, writes happen at disk speed. Asynchronous writes also have the side effect of funnelling all disk writes through a single write queue.
.sp
.sp
.ne 4
--- a/iobuf.c
+++ b/iobuf.c
@@ -316,6 +316,12 @@
}
}
+/* batched putbuf(): every Iobuf in ps[0..len-1] goes to putwrites() in one call */
+void
+putbufs(Iobuf **ps, u64 len)
+{
+	putwrites(ps, len);
+}
/* only caller is freeblockbuf().
These blocks do not need to be written to the disk.
Hence, avoiding putwrite().
--- a/mafs.c
+++ b/mafs.c
@@ -106,15 +106,17 @@
panic("null size %s", devfile);
/* 2/3rds of the memory for the pending writes
- and 1/3rd for the buffer cache */
+ and 1/3rd for the buffer cache
+ leaving 4*(Iounit/Blocksize) for jumbo writes
+ */
if(nmemunits == 0)
nmemunits = size/Rawblocksize > 8*MiB ? 8*MiB : size/Rawblocksize;
if(nmemunits < KiB)
nmemunits = KiB;
if(npendingwrites == 0)
- npendingwrites = 2*nmemunits/3;
+ npendingwrites = 2*(nmemunits-(4*(Iounit/Blocksize)))/3;
if(nbuckets == 0)
- nbuckets = nmemunits/(3*Ncollisions);
+ nbuckets = (nmemunits-(4*(Iounit/Blocksize)))/(3*Ncollisions);
if(chatty9p){
dprint("\nPlan 9 %d-bit file server with %d-deep indirect blocks\n",
--- a/sub.c
+++ b/sub.c
@@ -114,6 +114,30 @@
return buf;
}
+/* allocate len consecutive blocks from frees and return them in
+   bufs[0..len-1] as zeroed, tagged Iobufs fetched with Bwritable.
+   Returns len, or 0 when balloc() cannot satisfy the request. */
+u64
+allocblocks(int tag, u64 qpath, u64 len, Iobuf **bufs)
+{
+	u64 start, n;
+	Iobuf *p;
+
+	start = balloc(&frees, len);
+	if(start == 0)
+		return 0; /* the caller should trigger an Efull message */
+	if(chatty9p > 1)
+		dprint("alloc %llud\n", start);
+
+	/* getbuf(), not getbufchk(): that would need the whole disk reamed at start */
+	for(n = 0; n < len; n++){
+		bufs[n] = p = getbuf(start+n, Bwritable, Bfreshalloc);
+		memset(p->xiobuf, 0, Rawblocksize);	/* no stale data leaks on reuse */
+		settag(p, tag, qpath);
+	}
+	return len;
+}
+
/* the buf should have been wlock()'ed */
void
freeblockbuf(Iobuf *buf)
--- a/writer.c
+++ b/writer.c
@@ -71,12 +71,24 @@
return b;
}
-/* the Iobuf should be wlock()'ed at entry */
+/*
+	the Iobuf should be wlock()'ed at entry and stays
+	wlock()'ed until drts.lck is held to queue it.
+	It is unlocked only while drts.lck is held, so that
+	another process cannot put a newer copy of it in the
+	writer queue before us. This maintains the write order:
+	a newer write is never overwritten
+	by an older write.
+ */
void
putwrite(Iobuf *b)
{
Wbuf *w;
+ if(chatty9p > 4){
+ dprint("putwrite start p->blkno %llud\n", b->blkno);
+ stats();
+ }
w = emalloc9p(sizeof(Wbuf));
w->blkno = b->blkno;
w->payload = allocmemunit();
@@ -83,18 +95,14 @@
memcpy(w->payload, b->xiobuf, Rawblocksize);
incref(&b->dirties);
w->iobuf = b;
+
+ qlock(&drts.lck);
+ if(drts.n >= npendingwrites)
+ rsleep(&drts.isfull);
if(chkwunlock(b) == 0){
showbuf(b);
panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
}
-
- qlock(&drts.lck);
- if(chatty9p > 4){
- dprint("putwrite start p->blkno %llud\n", b->blkno);
- stats();
- }
- if(drts.n == npendingwrites)
- rsleep(&drts.isfull);
if(drts.head == nil){
drts.head = drts.tail = w;
}else{
@@ -105,10 +113,53 @@
drts.n++;
if(drts.n == 1)
rwakeup(&drts.isempty);
+ qunlock(&drts.lck);
if(chatty9p > 4 && b!=nil){
dprint("putwrite done b->blkno %llud\n", b->blkno);
stats();
}
+}
+/* batched putwrite(): queue len wlock()'ed Iobufs in one critical
+   section so that their blocks stay adjacent in the writer queue */
+void
+putwrites(Iobuf **bs, u64 len)
+{
+	Wbuf *w, *ws[(Iounit/Blocksize)+1];
+	u8 empty;
+	u64 i;
+
+	if(len == 0)
+		return;
+	if(len > sizeof(ws)/sizeof(ws[0]))
+		panic("putwrites: %llud blocks overflow ws[]\n", len);
+	for(i = 0; i < len; i++){	/* copy the payloads before taking drts.lck */
+		ws[i] = w = emalloc9p(sizeof(Wbuf));
+		w->blkno = bs[i]->blkno;
+		w->payload = allocmemunit();
+		memcpy(w->payload, bs[i]->xiobuf, Rawblocksize);
+		incref(&bs[i]->dirties);
+		w->iobuf = bs[i];
+	}
+	qlock(&drts.lck);
+	while(drts.n >= npendingwrites)	/* recheck after waking; another putwrite may have refilled it */
+		rsleep(&drts.isfull);
+	empty = drts.n == 0;	/* like putwrite(): wake isempty on the 0 -> n transition */
+	for(i = 0; i < len; i++){
+		if(chkwunlock(bs[i]) == 0){
+			showbuf(bs[i]);
+			panic("putwrites chkwunlock(bs[%llud]) == 0 called by %#p\n", i, getcallerpc(&bs));
+		}
+		if(drts.head == nil){
+			drts.head = drts.tail = ws[i];
+		}else{
+			drts.tail->next = ws[i];
+			ws[i]->prev = drts.tail;
+			drts.tail = ws[i];
+		}
+		drts.n++;
+	}
+	if(empty)
+		rwakeup(&drts.isempty);
 	qunlock(&drts.lck);
 }
@@ -140,79 +191,87 @@
return;
}
}
- /* using canwlock() here as getbuf() could
- have wlock()'ed the Iobuf too */
- if(drts.head != nil){
- if(drts.n == npendingwrites-1)
- full = 1;
- if(drts.n > 1){
- /* trying to write consecutive blocks with a write() call */
- n = 1;
- prevblkno = startblkno = drts.head->blkno;
- for(b = drts.head->next;
- n <= drts.n && b != nil && b->blkno == prevblkno+1 && n < 128;
- b = b->next){
- prevblkno=b->blkno;
- n++;
+ // dprint("dowrite: drts.n %llud\n", drts.n);
+ if(drts.head == nil){
+ qunlock(&drts.lck);
+ return;
+ }
+
+ if(drts.n >= npendingwrites)
+ full = 1;
+ if(drts.n > 1){
+ /* trying to write consecutive blocks with a write() call */
+ n = 1;
+ prevblkno = startblkno = drts.head->blkno;
+ for(b = drts.head->next;
+ n <= drts.n && b != nil && b->blkno == prevblkno+1 && n < 128;
+ b = b->next){
+ prevblkno=b->blkno;
+ n++;
+ }
+ if(n > 1){
+ for(i = 0; i < n; i++)
+ blks[i] = pluck(drts.head);
+ if(full && drts.n < npendingwrites)
+ rwakeup(&drts.isfull);
+ // dprint("dowrite: at return drts.n %llud\n", drts.n);
+ qunlock(&drts.lck);
+
+ if(chatty9p > 4){
+ if(b != nil)
+ dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud next %llud\n",
+ drts.n, n, startblkno, b->blkno);
+ else
+ dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud\n",
+ drts.n, n, startblkno);
}
- if(n > 1){
- if(chatty9p > 4){
- if(b != nil)
- dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud next %llud\n",
- drts.n, n, startblkno, b->blkno);
- else
- dprint("getwrite jumbo drts.n %llud > 1 n %llud start %llud\n",
- drts.n, n, startblkno);
- }
- jumbo = emalloc9p(n*Rawblocksize);
- for(i = 0; i < n; i++){
- b = pluck(drts.head);
- memcpy(jumbo+(i*Rawblocksize), b->payload, Rawblocksize);
- blks[i] = b;
- if(chatty9p > 4 && b!=nil)
- dprint("getwrite done b->blkno %llud\n", b->blkno);
- }
- if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
- dprint("%s\n", errstring[Esystem]);
- panic("error writing block %llud: %llud bytes, written %llud: %r\n",
- b->blkno, n, wn);
- }
- for(i = 0; i < n; i++){
- b = blks[i];
- blks[i] = 0;
- if(chatty9p > 4)
- dprint("dowrite %llud wunlock()'ed\n", b->blkno);
- decref(&b->iobuf->dirties);
- freememunit(b->payload);
- free(b);
- }
- free(jumbo);
- qunlock(&drts.lck);
- return;
- }else
- goto single;
- }else{
-single:
- b = pluck(drts.head);
- if(chatty9p > 4 && b!=nil)
- dprint("getwrite done b->blkno %llud\n", b->blkno);
- if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
+ jumbo = emalloc9p(n*Rawblocksize);
+ for(i = 0; i < n; i++){
+ b = blks[i];
+ memcpy(jumbo+(i*Rawblocksize), b->payload, Rawblocksize);
+ if(chatty9p > 4 && b!=nil)
+ dprint("getwrite b->blkno %llud\n", b->blkno);
+ }
+ if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
dprint("%s\n", errstring[Esystem]);
- panic("error writing block %llud: %llud bytes: %r\n",
- b->blkno, n);
+ panic("error writing block %llud: %llud bytes, written %llud: %r\n",
+ b->blkno, n, wn);
}
- if(chatty9p > 4)
- dprint("dowrite %llud wunlock()'ed\n", b->blkno);
- decref(&b->iobuf->dirties);
- freememunit(b->payload);
- free(b);
- }
- if(full)
+ free(jumbo);
+ for(i = 0; i < n; i++){
+ b = blks[i];
+ if(chatty9p > 4)
+ dprint("dowrite %llud wunlock()'ed\n", b->blkno);
+ decref(&b->iobuf->dirties);
+ freememunit(b->payload);
+ free(b);
+ blks[i] = 0;
+ }
+ return;
+ }else
+ goto single;
+ }else{
+single:
+ b = pluck(drts.head);
+ if(full && drts.n < npendingwrites)
rwakeup(&drts.isfull);
+ qunlock(&drts.lck);
+
if(chatty9p > 4 && b!=nil)
- stats();
+ dprint("getwrite done b->blkno %llud\n", b->blkno);
+ if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
+ dprint("%s\n", errstring[Esystem]);
+ panic("error writing block %llud: %llud bytes: %r\n",
+ b->blkno, n);
+ }
+ if(chatty9p > 4)
+ dprint("dowrite %llud wunlock()'ed\n", b->blkno);
+ decref(&b->iobuf->dirties);
+ freememunit(b->payload);
+ free(b);
}
- qunlock(&drts.lck);
+ if(chatty9p > 4 && b!=nil)
+ stats();
}
void