ref: 8ba26cf58ea5df0dbd29149ed04794fff0c8bf87
parent: ff8a41e99aed12a29b676c4c2d161eeae5ed652e
author: 9ferno <gophone2015@gmail.com>
date: Tue Oct 18 15:22:25 EDT 2022
removed synchronous writes
--- a/9p.c
+++ b/9p.c
@@ -759,6 +759,25 @@
}
/* below is from nemo's Pg 252 */
+typedef struct Buffer Buffer;
+typedef struct Work Work;
+
+struct Work
+{
+ void (*f)(Req *r);
+ Req *r;
+};
+
+struct Buffer
+{
+ QLock lck;
+ Work works[Nworks];
+ u16 hd, tl, nworks;
+ Rendez isfull, isempty;
+};
+
+Buffer buf;
+
Work
get(Buffer *b)
{
@@ -784,7 +803,7 @@
return w;
}
-void stats(void);
+static void stats(void);
void
put(Buffer *b, void (*f)(Req *r), Req *r)
{
@@ -813,7 +832,6 @@
stats();
}
-Buffer buf;
struct
{
u32 pid;
@@ -849,7 +867,7 @@
return a;
}
-void
+static void
stats(void)
{
int n, w, inv, i;
@@ -870,7 +888,7 @@
void
shutdown(void)
{
- int i, a;
+ int i, a, b;
/* User *u, *v; */
char srvfilename[Namelen];
@@ -885,7 +903,8 @@
// rwakeupall(&buf.isempty);
for(i = 0; i< 10; i++){
a = stopworkers();
- sleep(a*1000);
+ b = stopwriters();
+ sleep((a>b?a:b)*1000);
}
if(a > 1){ /* 1 for this running process */
dprint("%d processes still running\n", a);
@@ -931,6 +950,7 @@
w = get(b);
}
worker[id].pid = 0;
+ if(chatty9p > 1)
dprint("worker %d exiting\n", id);
exits(nil);
}
--- a/all.h
+++ b/all.h
@@ -10,25 +10,9 @@
#include "dat.h"
#include "extents.h"
-typedef struct Buffer Buffer;
typedef struct Hiob Hiob;
typedef struct Iobuf Iobuf;
-typedef struct Work Work;
-struct Work
-{
- void (*f)(Req *r);
- Req *r;
-};
-
-struct Buffer
-{
- QLock lck;
- Work works[Nworks];
- u16 hd, tl, nworks;
- Rendez isfull, isempty;
-};
-
struct Hiob /* Hash bucket */
{
Iobuf* link; /* least recently used of the circular list */
@@ -78,6 +62,9 @@
u8 *xiobuf; /* "real" buffer pointer */
Content *io; /* cast'able to contents */
};
+
+ u8 dirty; /* to identify buffers which are yet to be written */
+ Iobuf *prevdirty, *nextdirty;
};
extern u32 nbuckets; /* n hash buckets for i/o */
@@ -93,6 +80,11 @@
void putbuf(Iobuf *p);
void settag(Iobuf *p, int tag, u64 qpath);
void showbuf(Iobuf *p);
+
+/* writer functions */
+void initwriters(u8 nws);
+void putwrite(Iobuf *b);
+int stopwriters(void);
/* routines to manipulate the contents */
Iobuf* allocblocks(u16 len, int tag, u64 qpath);
--- a/dat.h
+++ b/dat.h
@@ -23,6 +23,7 @@
MiB = KiB*KiB, /* Mibibytes */
GiB = KiB*MiB, /* Gibibytes */
TiB = KiB*GiB, /* Tibibytes */
+ Nwriters = 10, /* max. number of writer processes */
/*
https://cs.stackexchange.com/questions/11029/why-is-it-best-to-use-a-prime-number-as-a-mod-in-a-hashing-function
--- a/extents.c
+++ b/extents.c
@@ -364,7 +364,7 @@
return e;
}
-u64
+static u64
pluck(Extents *es, Extent *e)
{
Extent *dlow, *dsmall, *fhigh, *fbig;
--- a/iobuf.c
+++ b/iobuf.c
@@ -1,7 +1,5 @@
-#include "all.h"
+#include "all.h"
-#define DEBUG 0
-
u32 nbuckets = 0; /* nbuckets derived from -m or Nbuckets */
Hiob *hiob = nil; /* array of nbuckets */
Extents frees = {0};
@@ -144,7 +142,7 @@
if(ncollisions >= Ncollisions){
do{
p = s->back;
- if(p->ref == 0 && canwlock(p)){
+ if(p->ref == 0 && p->dirty == 0 && canwlock(p)){
if(p->len != len){
free(p->xiobuf);
p->xiobuf = emalloc9p(len*Rawblocksize);
@@ -229,18 +227,13 @@
*/
void
-bkp(Iobuf *p, u64 bno, u64 qpath)
+bkp(u64 srcbno, u8 *contents, u64 bno, u64 qpath)
{
Iobuf *buf;
- if(p == nil){
- dprint("bpk: p is nil invalid backup location %d, qpath %llud\n",
- bno, qpath);
- return;
- }
if(bno == 0){
- dprint("bkp %d: invalid backup location %d, qpath %llud\n",
- p->blkno, bno, qpath);
+ dprint("bkp %llud: invalid backup location %llud, qpath %llud\n",
+ srcbno, bno, qpath);
return;
}
@@ -248,7 +241,7 @@
if(buf == nil){
panic("bkp: buf == nil\n");
}
- memcpy(buf->io->buf, p->io->buf, Blocksize);
+ memcpy(buf->io->buf, contents, Blocksize);
if(qpath == Qproot0 || qpath == Qproot1){
buf->io->d.mode &= ~DMDIR; /* to avoid recursive du -a */
}
@@ -258,8 +251,9 @@
void
putbuf(Iobuf *p)
{
- u32 n;
s8 i;
+ u8 buf[Blocksize];
+ u64 srcbno;
if(p == nil){
panic("putbuf p == nil called by %#p\n", getcallerpc(&p));
@@ -287,33 +281,27 @@
if(chatty9p > 4)
dprint(" .. runlock()'ed\n");
}else{
+
if(canwlock(p)){
panic("putbuf: buffer not locked %llud\n", p->blkno);
}
- p->io->dirty = 1;
- if((n = devwrite(p->blkno, p->io, p->len)) != p->len*Rawblocksize){
- dprint("%s\n", errstring[Esystem]);
- panic("error writing block %llud: %d bytes: %r\n",
- p->blkno, n);
+ srcbno = p->blkno;
+ if(p->blkno == config.config.srcbno ||
+ p->blkno == config.super.srcbno ||
+ p->blkno == config.root.srcbno){
+ memcpy(buf, p->io->buf, Blocksize);
}
- p->io->dirty = 0;
- devwritedirtyclear(p->blkno);
- if(p->blkno == config.config.srcbno){
+ putwrite(p);
+ if(srcbno == config.config.srcbno){
for(i=0; i<Nbkp; i++)
- bkp(p, config.config.dest[i], Qpconfig0+i*3);
- }else if(p->blkno == config.super.srcbno){
+ bkp(srcbno, buf, config.config.dest[i], Qpconfig0+i*3);
+ }else if(srcbno == config.super.srcbno){
for(i=0; i<Nbkp; i++)
- bkp(p, config.super.dest[i], Qpsuper0+i*3);
- }else if(p->blkno == config.root.srcbno){
+ bkp(srcbno, buf, config.super.dest[i], Qpsuper0+i*3);
+ }else if(srcbno == config.root.srcbno){
for(i=0; i<Nbkp; i++)
- bkp(p, config.root.dest[i], Qproot0+i*3);
+ bkp(srcbno, buf, config.root.dest[i], Qproot0+i*3);
}
- if(chkwunlock(p) == 0){
- showbuf(p);
- panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
- }
- if(chatty9p > 4)
- dprint(" .. wunlock()'ed\n");
}
}
--- a/mafs.c
+++ b/mafs.c
@@ -42,7 +42,7 @@
static char *nets[8];
int doream, stdio, netc;
char buf[Namelen];
- int pid, ctl;
+ int pid, ctl, nwriters;
progname = "mafs";
procname = "init";
@@ -55,6 +55,7 @@
rfork(RFNAMEG|RFNOTEG|RFREND);
nbuckets = Nbuckets;
+ nwriters = Nwriters;
sfd = -1;
doream = stdio = netc = 0;
@@ -69,6 +70,7 @@
case 'D': chatty9p++; break;
case 'f': devfile = ARGF(); break;
case 'h': nbuckets = atoi(EARGF(usage())); break;
+ case 'w': nwriters = atoi(EARGF(usage())); break;
case 'r':
doream = 1;
/* fall through */
@@ -104,6 +106,7 @@
formatinit();
tlocks = emalloc9p(NTLOCK * sizeof *tlocks);
+ initwriters(nwriters);
iobufinit();
/*
--- a/mkfile
+++ b/mkfile
@@ -16,6 +16,7 @@
sub.$O\
tag.$O\
user.$O\
+ writer.$O\
HFILES=\
all.h\
--- /dev/null
+++ b/writer.c
@@ -1,0 +1,231 @@
+#include "all.h"
+
+enum
+{
+ Npendingwrites = 32, /* putwrite() prints stats() once more dirty buffers than this are queued */
+};
+
+/* below is from nemo's Pg 252 */
+typedef struct Dirties Dirties;
+typedef struct Writer Writer;
+
+struct Dirties
+{
+ QLock lck; /* held around every change to head, tail and n; initwriters() also makes it the lock of isempty */
+ Iobuf *head, *tail; /* ends of the list chained through Iobuf.prevdirty/nextdirty; putwrite() appends at tail */
+ u32 n; /* entries in the list: incremented by putwrite(), decremented by pluck() */
+ Rendez isempty; /* getwrite() rsleep()s here while n == 0; putwrite() rwakeup()s it when the list was empty */
+} drts;
+
+struct Writer
+{
+ u32 pid; /* written by startwriter(); stopwriters() zeroes it after posting its note; stats() counts 0 as unused */
+ u64 blkno; /* 0 is counted as idle by stats() and stopwriters(); nothing in writer.c assigns it */
+}; /* keeps track of running procs to flush */
+
+Writer *writers; /* array of writer proc pids */
+u16 nwriters; /* number of entries in writers, set by initwriters() */
+
+/*
+ * Unlink b from the dirty list and decrement drts.n; returns b.
+ * The caller must hold drts.lck (rmwrite() and getwrite() do).
+ */
+static Iobuf *
+pluck(Iobuf *b)
+{
+	if(b->prevdirty != nil && b->nextdirty != nil){
+		/* somewhere in the middle */
+		b->nextdirty->prevdirty = b->prevdirty;
+		b->prevdirty->nextdirty = b->nextdirty;
+	}else if(b->prevdirty == nil && b->nextdirty == nil){
+		/* only one */
+		drts.head = drts.tail = nil;
+	}else if(b->prevdirty == nil){
+		/*
+		 * first in the linked list: clear the new head's back link,
+		 * else it is later taken for a middle element
+		 */
+		drts.head = b->nextdirty;
+		drts.head->prevdirty = nil;
+	}else{
+		/* last in the linked list */
+		drts.tail = b->prevdirty;
+		drts.tail->nextdirty = nil;
+	}
+	b->prevdirty = b->nextdirty = nil;
+	drts.n--;
+	return b;
+}
+
+/* unlink b from the dirty list under drts.lck; b should be wlock()'ed at entry */
+Iobuf *
+rmwrite(Iobuf *b)
+{
+	qlock(&drts.lck);
+	b = pluck(b);
+	qunlock(&drts.lck);
+	return b;
+}
+
+/* returns a dirty buffer, wlock()'ed and unlinked from the list; nil when shutting down */
+Iobuf *
+getwrite(void)
+{
+	Iobuf *b;
+
+Again:
+	qlock(&drts.lck);
+	while(drts.n == 0){
+		rsleep(&drts.isempty);	/* comes back with drts.lck held */
+		if(shuttingdown){
+			qunlock(&drts.lck);
+			return nil;
+		}
+	}
+	for(b=drts.head; b != nil && !canwlock(b); b=b->nextdirty)	/* tail included */
+		dprint("getwrite: could not lock block %llud\n", b->blkno);
+	if(b == nil){
+		qunlock(&drts.lck);	/* all busy: let the holders reach drts.lck */
+		sleep(0);
+		goto Again;
+	}
+	pluck(b);
+	qunlock(&drts.lck);
+	return b;
+}
+
+static void stats(void);
+
+/* queue b, which should be wlock()'ed at entry, on the dirty list, then chkwunlock() it */
+void
+putwrite(Iobuf *b)
+{
+	qlock(&drts.lck);
+	if(drts.n > Npendingwrites)
+		stats();
+	b->dirty = 1;
+	if(drts.head != nil){
+		b->prevdirty = drts.tail;
+		drts.tail->nextdirty = b;
+		drts.tail = b;
+	}else{
+		/* first entry: a writer may be asleep in getwrite() */
+		drts.head = drts.tail = b;
+		rwakeup(&drts.isempty);
+	}
+	drts.n++;
+	if(chkwunlock(b) == 0){
+		showbuf(b);
+		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
+	}
+	if(chatty9p > 4)
+		dprint(" .. wunlock()'ed\n");
+	qunlock(&drts.lck);
+	if(chatty9p > 1)
+		stats();
+}
+
+/* hand p, held wlock()'ed by the caller, to devwrite(), clear its dirty marks and chkwunlock() it */
+void
+dowrite(Iobuf *p)
+{
+	u64 nwritten;
+
+	if(canwlock(p))
+		panic("putbuf: buffer not locked %llud\n", p->blkno);
+	p->io->dirty = 1;
+	nwritten = devwrite(p->blkno, p->io, p->len);
+	if(nwritten != p->len*Rawblocksize){
+		dprint("%s\n", errstring[Esystem]);
+		panic("error writing block %llud: %llud bytes: %r\n", p->blkno, nwritten);
+	}
+	p->io->dirty = 0;
+	devwritedirtyclear(p->blkno);
+	p->dirty = 0;
+	if(chkwunlock(p) == 0){
+		showbuf(p);
+		panic("putbuf chkwunlock(p) == 0 called by %#p\n", getcallerpc(&p));
+	}
+	if(chatty9p > 4)
+		dprint(" .. wunlock()'ed\n");
+}
+
+void
+startwriter(int id)
+{
+	char name[16];
+	Iobuf *b;
+	/* fork writer process id; the parent returns, the child loops until getwrite() yields nil */
+	switch(rfork(RFPROC|RFMEM|RFFDG)){
+	case -1:
+		panic("can't fork");
+	case 0:
+		if(chatty9p > 1)
+			dprint("child %d pid: %d\n", id, getpid());
+		break;
+	default:
+		return;
+	}
+	writers[id].pid = getpid();	/* not id: stopwriters() postnote()s this pid */
+	procname = name;
+	snprint(name, 16, "mafs writer %d", id);
+	procsetname(name);
+	while((b=getwrite()) != nil)
+		dowrite(b);
+	dprint("%s process exited\n", name);
+	_exits(nil);
+}
+
+/* reset the dirty list, build a zeroed writer table and start nws writer processes */
+void
+initwriters(u8 nws)
+{
+	u8 i;
+
+	nwriters = nws;
+	// release all locks, set everything to null values
+	memset(&drts, 0, sizeof(drts));
+	writers = emalloc9p(sizeof(Writer)*nwriters);	/* does not return nil */
+	memset(writers, 0, sizeof(Writer)*nwriters);	/* pid, blkno start at 0 */
+	// set the locks used by the Rendezes
+	drts.isempty.l = &drts.lck;
+	for(i = 0; i < nwriters; i++)
+		startwriter(i);
+}
+
+/* note each idle writer once; returns the number of writers whose blkno is non-zero */
+int
+stopwriters(void)
+{
+	int i, a;
+
+	for(a = i = 0; i<nwriters; i++)
+		if(writers[i].blkno != 0)
+			a++;
+		else if(writers[i].pid != 0){	/* skip slots already noted */
+			// rwakeup(&buf.isempty); TODO why can't I get this to work?
+			postnote(PNPROC, writers[i].pid, "interrupt");
+			writers[i].pid = 0;
+		}
+	dprint("%d processes still writing, %ud writes still pending\n", a, drts.n);
+	return a;
+}
+
+/* print how many writer slots are unused, idle or writing, plus the dirty list state */
+static void
+stats(void)
+{
+	int n, w, inv, i;
+
+	n = w = inv = 0;
+	for(i = 0; i<nwriters; i++)	/* writers[] has nwriters entries, not Nprocs */
+		if(writers[i].pid == 0)
+			inv++;
+		else if(writers[i].blkno == 0)
+			n++;
+		else
+			w++;
+	dprint("Nprocs %d inv %d idle %d working %d dirties nworks %d hd %p tl %p\n",
+		nwriters, inv, n, w, drts.n, drts.head, drts.tail);
+}
+