ref: eef0fe67ee4bb8c254f0f9e61c24ef5d34d05a4f
dir: /writer.c/
#include "all.h" /* Only 1 writer to maintain the sequence of writes. Else, the directory entry could get written before the directory content by another write process. This creates a mess to recover on a crash. */ /* below is from nemo's Pg 252 */ typedef struct Dirties Dirties; typedef struct Wbuf Wbuf; struct Dirties { QLock lck; /* controls access to this queue */ Wbuf *head, *tail; /* linked list of dirty blocks yet to be written to the disk */ s32 n; Rendez isfull; /* write throttling */ Rendez isempty; /* writer does not have to keep polling to find work */ } drts = {0}; struct Wbuf { u64 blkno; /* block number on the disk, primary key */ Wbuf *prev, *next; Iobuf *iobuf; /* pointer to the used Iobuf in the buffer cache */ union{ u8 payload; /* "real" contents */ Content io; /* cast'able to contents */ }; }; u64 npendingwrites; /* write throttling */ u8 stopwrites = 0; static void stats(void); static Wbuf * pluck(Wbuf *b) { if(b == nil) return nil; else if(b->prev == nil && b->next == nil){ /* only one */ drts.head = drts.tail = nil; goto Done; }else if(b->prev == nil){ /* first in the linked list */ drts.head = b->next; b->next = nil; drts.head->prev = nil; goto Done; }else if(b->prev != nil && b->next != nil){ /* somewhere in the middle */ b->next->prev = b->prev; b->prev->next = b->next; b->prev = b->next = nil; goto Done; }else if(b->next == nil){ /* last in the linked list */ drts.tail = b->prev; b->prev->next = nil; b->prev = nil; goto Done; } panic("pluck should not be here\n"); return nil; // too late, b was written already Done: drts.n--; if(drts.n < 0) panic("drts.n < 0\n"); return b; } Wbuf * getwrite(void) { Wbuf *b; qlock(&drts.lck); if(drts.n == 0){ if(stopwrites){ qunlock(&drts.lck); return nil; } rsleep(&drts.isempty); if(drts.n == 0 && stopwrites){ qunlock(&drts.lck); return nil; } } /* using canwlock() here as getbuf() could have wlock()'ed the Iobuf too */ if(drts.head != nil){ b = pluck(drts.head); rwakeup(&drts.isfull); }else b = nil; if(chatty9p > 4 && b!=nil){ dprint("getwrite done b->blkno %llud\n", b->blkno); stats(); } qunlock(&drts.lck); return b; } /* the Iobuf should be wlock()'ed at entry */ void putwrite(Iobuf *b) { Wbuf *w; qlock(&drts.lck); if(chatty9p > 4){ dprint("putwrite start p->blkno %llud\n", b->blkno); stats(); } if(drts.n > Npendingwrites) rsleep(&drts.isfull); w = emalloc9p(sizeof(Wbuf)+Rawblocksize); w->blkno = b->blkno; memcpy(&w->io, b->io, Rawblocksize); if(drts.head == nil){ drts.head = drts.tail = w; }else{ drts.tail->next = w; w->prev = drts.tail; drts.tail = w; } drts.n++; if(drts.n == 1) rwakeup(&drts.isempty); b->dirties++; w->iobuf = b; if(chkwunlock(b) == 0){ showbuf(b); panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b)); } if(chatty9p > 4 && b!=nil){ dprint("putwrite done b->blkno %llud\n", b->blkno); stats(); } qunlock(&drts.lck); } void dowrite(Wbuf *p) { u64 n; if(chatty9p > 4){ dprint("dowrite p->blkno %llud\n", p->blkno); stats(); } if(chatty9p > 4) dprint("dowrite p->blkno %llud locked\n", p->blkno); p->io.dirty = 1; if((n = devwrite(p->blkno, &p->payload)) != Rawblocksize){ dprint("%s\n", errstring[Esystem]); panic("error writing block %llud: %llud bytes: %r\n", p->blkno, n); } p->io.dirty = 0; devwritedirtyclear(p->blkno); n = p->blkno; if(chatty9p > 4) dprint("dowrite %llud wunlock()'ed\n", n); wlock(p->iobuf); p->iobuf->dirties--; if(p->iobuf->tofree && p->iobuf->dirties == 0){ p->iobuf->tofree = 0; bfree(&frees, p->iobuf->blkno, 1); } wunlock(p->iobuf); free(p); } void initwriter(void) { Wbuf *b; char name[Namelen]; // set the locks used by the Rendezes drts.isempty.l = &drts.lck; drts.isfull.l = &drts.lck; switch(rfork(RFPROC|RFMEM)){ case -1: panic("can't fork"); case 0: if(chatty9p > 4) dprint("writer started\n"); break; default: return; } procname = name; snprint(name, Namelen, "%s writer", service); procsetname(name); while(stopwrites == 0 || drts.n > 0){ b=getwrite(); if(b != nil) dowrite(b); } if(chatty9p > 4) dprint("%s process exited\n", name); exits(nil); } void stopwriter(void) { u64 n; stopwrites = 1; do{ qlock(&drts.lck); if(chatty9p > 4) dprint("stopwriter drts.n %d\n", drts.n); if(drts.n == 0) rwakeup(&drts.isempty); n = drts.n; qunlock(&drts.lck); if(n == 0) return; else sleep(1000); }while(n > 0); } static void stats(void) { dprint("dirties nwrites %d hd %llud tl %llud\n", drts.n, drts.head == nil ? 0 : drts.head->blkno, drts.tail == nil ? 0 : drts.tail->blkno); } u64 pendingwrites(void) { u64 n; qlock(&drts.lck); n = drts.n; if(chatty9p>4) stats(); qunlock(&drts.lck); return n; } void sync(void) { while(drts.n > 0) sleep(1000); }