ref: e82f565015fd0b75085d8d2a4f1aaca53e875f64
dir: /writer.c/
#include "all.h"

/*
 * Write-behind queue and the single writer process that drains it.
 *
 * There is only 1 writer, to maintain the sequence of writes.
 * Otherwise a directory entry could get written before the directory
 * content by another write process, which creates a mess to recover
 * from after a crash.
 *
 * The queue/rendezvous arrangement below is from nemo's Pg 252.
 */

typedef struct Dirties Dirties;
typedef struct Wbuf Wbuf;

/*
 * The queue of blocks waiting to be written.  Every field is
 * protected by lck; both Rendez's are tied to lck in initwriter().
 */
struct Dirties
{
	QLock lck;		/* controls access to this queue */
	Wbuf *head, *tail;	/* linked list of dirty blocks yet to be written to the disk */
	s32 n;			/* number of Wbuf's on the list: ++ in putwrite(), -- in pluck() */
	Rendez isfull;		/* write throttling: putwrite() sleeps here */
	Rendez isempty;		/* writer does not have to keep polling to find work */
} drts = {0};

/*
 * One queued write: a private copy of the block contents plus a
 * back pointer to the Iobuf it was copied from.
 */
struct Wbuf
{
	u64 blkno;		/* block number on the disk, primary key */
	Wbuf *prev, *next;
	Iobuf *iobuf;		/* pointer to the used Iobuf in the buffer cache */
	union{
		u8 *payload;	/* "real" contents */
		Content *io;	/* cast'able to contents */
	};
};

u64 npendingwrites;	/* write throttling: queue length at which putwrite() sleeps */
u8 stopwrites = 0;	/* set by stopwriter() to make the writer drain and exit */

static void stats(void);

/*
 * Unlink b from the dirties list and decrement drts.n.
 * The caller must hold drts.lck.
 * Returns b with prev and next cleared, or nil when b is nil.
 * Panics if the count goes negative.
 */
static Wbuf *
pluck(Wbuf *b)
{
	Wbuf *before, *after;

	if(b == nil)
		return nil;

	before = b->prev;
	after = b->next;

	/* bit 1: has a predecessor, bit 0: has a successor */
	switch(((before != nil) << 1) | (after != nil)){
	case 0:		/* only one */
		drts.head = nil;
		drts.tail = nil;
		break;
	case 1:		/* first in the linked list */
		drts.head = after;
		after->prev = nil;
		b->next = nil;
		break;
	case 3:		/* somewhere in the middle */
		after->prev = before;
		before->next = after;
		b->prev = nil;
		b->next = nil;
		break;
	case 2:		/* last in the linked list */
		drts.tail = before;
		before->next = nil;
		b->prev = nil;
		break;
	default:	/* the four cases above are exhaustive */
		panic("pluck should not be here\n");
		return nil;	/* too late, b was written already */
	}

	drts.n--;
	if(drts.n < 0)
		panic("drts.n < 0\n");
	return b;
}

/*
 * Take the oldest queued write off the list.
 *
 * Sleeps on drts.isempty when the queue is empty, unless stopwrites
 * is set.  Returns nil when there is nothing to write (queue empty
 * and stopwrites set, or the list head is nil after waking up).
 * Wakes one putwrite() sleeper when the pluck brings the queue to
 * npendingwrites-1 entries.
 *
 * The returned Wbuf is owned by the caller; dowrite() frees it.
 */
Wbuf *
getwrite(void)
{
	Wbuf *w;

	w = nil;
	qlock(&drts.lck);

	if(drts.n == 0){
		if(stopwrites)
			goto Out;
		rsleep(&drts.isempty);
		/* woken by putwrite() (work arrived) or by stopwriter() */
		if(drts.n == 0 && stopwrites)
			goto Out;
	}

	/*
	 * NOTE(review): the original comment here read "using
	 * canwlock() here as getbuf() could have wlock()'ed the Iobuf
	 * too", but no canwlock() call is made in this function;
	 * presumably stale -- confirm.
	 */
	if(drts.head == nil)
		goto Out;

	w = pluck(drts.head);
	if(drts.n == npendingwrites-1)
		rwakeup(&drts.isfull);
	if(w != nil && chatty9p > 4){
		dprint("getwrite done b->blkno %llud\n", w->blkno);
		stats();
	}

Out:
	qunlock(&drts.lck);
	return w;
}

/*
 * Queue a copy of b's contents for writing to disk.
 *
 * The Iobuf should be wlock()'ed at entry; the function panics when
 * chkwunlock(b) returns 0.  Rawblocksize bytes of b->xiobuf are
 * copied into a fresh memunit, so b may change after this returns.
 * b->dirties is incremented here and decremented by dowrite().
 *
 * Sleeps on drts.isfull while the queue holds exactly npendingwrites
 * entries (write throttling), and wakes the writer when the queue
 * goes from empty to one entry.
 */
void
putwrite(Iobuf *b)
{
	Wbuf *w;

	/* the empty-queue case below relies on prev/next starting as nil;
	   presumably emalloc9p() zeroes the allocation -- confirm */
	w = emalloc9p(sizeof *w);
	w->blkno = b->blkno;
	w->payload = allocmemunit();
	memcpy(w->payload, b->xiobuf, Rawblocksize);
	incref(&b->dirties);
	w->iobuf = b;

	if(chkwunlock(b) == 0){
		showbuf(b);
		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
	}

	qlock(&drts.lck);
	if(chatty9p > 4){
		dprint("putwrite start p->blkno %llud\n", b->blkno);
		stats();
	}

	if(drts.n == npendingwrites)
		rsleep(&drts.isfull);

	/* append at the tail */
	if(drts.head != nil){
		w->prev = drts.tail;
		drts.tail->next = w;
	}else
		drts.head = w;
	drts.tail = w;
	drts.n++;

	if(drts.n == 1)
		rwakeup(&drts.isempty);

	if(b != nil && chatty9p > 4){
		dprint("putwrite done b->blkno %llud\n", b->blkno);
		stats();
	}
	qunlock(&drts.lck);
}

/*
 * Write one queued block to disk and release it.
 *
 * dirties is decremented without a wlock() on the buffer in
 * dowrite().  Using a wlock() in dowrite() deadlocks with
 * putwrite().  getbuf() guarantees that even a free'ed block cannot
 * be stolen until the dirties == 0.  This avoids dirty blocks being
 * stolen by other block numbers.  incref(dirties) only happens with
 * a wlock() in putwrite().
 *
 * Panics when devwrite() does not return Rawblocksize.
 * Frees both p->payload and p; p must not be used afterwards.
 */
void
dowrite(Wbuf *p)
{
	u64 nwritten, blkno;

	if(chatty9p > 4){
		dprint("dowrite p->blkno %llud\n", p->blkno);
		stats();
		dprint("dowrite p->blkno %llud locked\n", p->blkno);
	}

	nwritten = devwrite(p->blkno, p->payload);
	if(nwritten != Rawblocksize){
		dprint("%s\n", errstring[Esystem]);
		panic("error writing block %llud: %llud bytes: %r\n", p->blkno, nwritten);
	}

	/* remember the block number for the trace below */
	blkno = p->blkno;
	if(chatty9p > 4)
		dprint("dowrite %llud wunlock()'ed\n", blkno);

	decref(&p->iobuf->dirties);
	freememunit(p->payload);
	free(p);
}

/*
 * Start the writer process.
 *
 * Ties both Rendez's to drts.lck, then rfork()s with RFPROC|RFMEM.
 * The parent returns right after the fork.  The child names itself
 * "<service> writer", loops over getwrite()/dowrite() until
 * stopwrites is set and the queue is empty, and then exits; it never
 * returns to the caller.
 */
void
initwriter(void)
{
	Wbuf *b;
	char name[Namelen];
	int pid;

	/* set the locks used by the Rendezes */
	drts.isempty.l = &drts.lck;
	drts.isfull.l = &drts.lck;

	pid = rfork(RFPROC|RFMEM);
	if(pid != 0 && pid != -1)
		return;		/* parent */
	if(pid == -1)
		panic("can't fork");
	if(chatty9p > 4)
		dprint("writer started\n");

	procname = name;
	snprint(name, Namelen, "%s writer", service);
	procsetname(name);

	for(;;){
		/* drts.n is read without drts.lck here */
		if(stopwrites != 0 && drts.n <= 0)
			break;
		b = getwrite();
		if(b != nil)
			dowrite(b);
	}

	if(chatty9p > 4)
		dprint("%s process exited\n", name);
	exits(nil);
}

/*
 * Ask the writer to stop and wait for the queue to drain.
 *
 * Sets stopwrites, then checks drts.n under the lock once a second.
 * When the queue is empty it wakes the writer (which may be asleep
 * in getwrite()) and returns.
 *
 * NOTE(review): pluck() decrements drts.n before dowrite() calls
 * devwrite(), so the last block may still be in flight when this
 * returns -- confirm callers tolerate that.
 */
void
stopwriter(void)
{
	u64 left;

	stopwrites = 1;
	for(;;){
		qlock(&drts.lck);
		if(chatty9p > 4)
			dprint("stopwriter drts.n %d\n", drts.n);
		if(drts.n == 0)
			rwakeup(&drts.isempty);
		left = drts.n;
		qunlock(&drts.lck);

		if(left == 0)
			return;
		sleep(1000);
	}
}

/*
 * Debug trace of the queue: entry count and the block numbers at
 * the head and tail (0 when the corresponding pointer is nil).
 * Does not take drts.lck itself.
 */
static void
stats(void)
{
	u64 hd, tl;

	hd = 0;
	if(drts.head != nil)
		hd = drts.head->blkno;
	tl = 0;
	if(drts.tail != nil)
		tl = drts.tail->blkno;
	dprint("dirties nwrites %d hd %llud tl %llud\n", drts.n, hd, tl);
}

/*
 * Return the number of writes currently on the queue, read under
 * drts.lck.  A block already plucked by getwrite() is not counted.
 */
u64
pendingwrites(void)
{
	u64 queued;

	qlock(&drts.lck);
	if(chatty9p > 4)
		stats();
	queued = drts.n;
	qunlock(&drts.lck);
	return queued;
}

/*
 * Wait, polling once a second, until the queue is empty.
 * drts.n is read without drts.lck.
 *
 * NOTE(review): drts.n drops in pluck(), before dowrite() has
 * written the block, so one write may still be in progress when
 * this returns -- confirm callers tolerate that.
 */
void
sync(void)
{
	for(;;){
		if(drts.n <= 0)
			break;
		sleep(1000);
	}
}