ref: 5f15f2246d0fdac0fa98a0a910d40827edfaf68f
dir: /writer.c/
#include "all.h"

/*
	Only 1 writer to maintain the sequence of writes.
	Else, the directory entry could get written before the directory
	content by another write process.
	This creates a mess to recover on a crash.
 */
/* below is from nemo's Pg 252 */

typedef struct Dirties Dirties;
typedef struct Wbuf Wbuf;

struct Dirties
{
	QLock lck;			/* controls access to this queue */
	Wbuf *head, *tail;	/* linked list of dirty blocks yet to be written to the disk */
	s32 n;				/* number of dirty blocks in this linked list */
	Rendez isfull;		/* write throttling */
	Rendez isempty;		/* writer does not have to keep polling to find work */
} drts = {0};

struct Wbuf
{
	u64 blkno;			/* block number on the disk, primary key */
	Wbuf *prev, *next;	/* writer queue */
	Iobuf *iobuf;		/* pointer to the used Iobuf in the buffer cache */
	union{
		u8 *payload;	/* "real" contents */
		Content *io;	/* cast'able to contents */
	};
};

u64 npendingwrites;	/* write throttling */
u8 stopwrites = 0;

static void stats(void);

/*
	Unlink b from the dirty queue and account for it in drts.n.
	Must be called with drts.lck held.
	Returns b, or nil when b is nil.
 */
static Wbuf *
pluck(Wbuf *b)
{
	if(b == nil)
		return nil;

	if(b->prev == nil)
		drts.head = b->next;	/* b was the first (or the only) entry */
	else
		b->prev->next = b->next;
	if(b->next == nil)
		drts.tail = b->prev;	/* b was the last (or the only) entry */
	else
		b->next->prev = b->prev;
	b->prev = b->next = nil;

	drts.n--;
	if(drts.n < 0)
		panic("drts.n < 0\n");
	return b;
}

/*
	Snapshot the contents of b into a freshly allocated Wbuf and take
	a dirties reference on b. The caller must hold the wlock on b.
 */
static Wbuf *
wbufnew(Iobuf *b)
{
	Wbuf *w;

	w = emalloc9p(sizeof(Wbuf));
	w->blkno = b->blkno;
	w->payload = allocmemunit();
	memcpy(w->payload, b->xiobuf, Rawblocksize);
	incref(&b->dirties);
	w->iobuf = b;
	return w;
}

/*
	Append w at the tail of the dirty queue and count it.
	Must be called with drts.lck held.
 */
static void
wbufappend(Wbuf *w)
{
	w->next = nil;
	w->prev = drts.tail;
	if(drts.head == nil)
		drts.head = w;
	else
		drts.tail->next = w;
	drts.tail = w;
	drts.n++;
}

/*
	Drop the dirties reference taken by wbufnew() and release the
	Wbuf together with its payload copy.
 */
static void
wbuffree(Wbuf *w)
{
	decref(&w->iobuf->dirties);
	freememunit(w->payload);
	free(w);
}

/*
	the Iobuf should be wlock()'ed at entry and until it is placed
	in the writer queue. It is unlocked after it is placed in the
	writer queue to avoid another process putting it in the writer
	queue before us.
	This ensures that the write order is maintained and a newer
	write is not overwritten by an older write.
 */
void
putwrite(Iobuf *b)
{
	Wbuf *w;
	u64 blkno;

	/* b may be reused once it is unlocked below, so remember the number */
	blkno = b->blkno;
	if(chatty9p > 4){
		dprint("putwrite start p->blkno %llud\n", blkno);
		stats();
	}
	w = wbufnew(b);

	qlock(&drts.lck);
	/*
		re-check after every wakeup: another producer may have
		refilled the queue before we got the lock back.
	 */
	while(drts.n >= npendingwrites)
		rsleep(&drts.isfull);
	if(chkwunlock(b) == 0){
		showbuf(b);
		panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&b));
	}
	wbufappend(w);
	if(drts.n == 1)
		rwakeup(&drts.isempty);	/* queue was empty, the writer may be asleep */
	qunlock(&drts.lck);

	if(chatty9p > 4){
		dprint("putwrite done b->blkno %llud\n", blkno);
		stats();
	}
}

/*
	Queue len wlock()'ed Iobufs in one go, preserving their order.
	len must not exceed Ntogether.
	The throttle is only checked once before queueing, so the queue
	can exceed npendingwrites by up to len-1 entries.
 */
void
putwrites(Iobuf **bs, u64 len)
{
	Wbuf *ws[Ntogether];
	u8 empty;
	u64 i;

	if(len == 0)
		return;
	/* ws[] lives on the stack: refuse to run past its end */
	if(len > Ntogether)
		panic("putwrites: too many blocks %llud called by %#p\n",
			len, getcallerpc(&bs));

	for(i = 0; i < len; i++)
		ws[i] = wbufnew(bs[i]);

	qlock(&drts.lck);
	while(drts.n >= npendingwrites)
		rsleep(&drts.isfull);
	empty = drts.n == 0;
	for(i = 0; i < len; i++){
		if(chkwunlock(bs[i]) == 0){
			showbuf(bs[i]);
			panic("putwrite chkwunlock(p) == 0 called by %#p\n", getcallerpc(&bs));
		}
		wbufappend(ws[i]);
	}
	if(empty)
		rwakeup(&drts.isempty);
	qunlock(&drts.lck);
}

/*
	dirties is decremented without a wlock() on the buffer in dowrite().
	Using a wlock() in dowrite() deadlocks with putwrite().
	getbuf() guarantees that even a free'ed block cannot be stolen
	until the dirties == 0. This avoids dirty blocks being stolen
	by other block numbers.
	incref(dirties) only happens with a wlock() in putwrite().

	One pass of the writer: wait for work, take the block at the head
	of the queue (plus up to Ntogether-1 following blocks when their
	block numbers are consecutive) and write them to the disk.
	Returns without writing when the queue is empty and stopwrites is set.
 */
void
dowrite(void)
{
	Wbuf *b, *blks[Ntogether];
	u64 prevblkno, startblkno, nextblkno, n, wn, i;
	s32 nleft;
	u8 hasnext, *jumbo;

	qlock(&drts.lck);
	if(drts.n == 0){
		if(stopwrites){
			qunlock(&drts.lck);
			return;
		}
		rsleep(&drts.isempty);
		if(drts.n == 0 && stopwrites){
			qunlock(&drts.lck);
			return;
		}
	}
	if(drts.head == nil){
		qunlock(&drts.lck);
		return;
	}

	/* trying to write consecutive blocks with a write() call */
	n = 1;
	prevblkno = startblkno = drts.head->blkno;
	for(b = drts.head->next;
			b != nil && n < Ntogether && b->blkno == prevblkno+1;
			b = b->next){
		prevblkno = b->blkno;
		n++;
	}

	if(n > 1){
		/* b is the first block left behind, remember it for the trace */
		hasnext = b != nil;
		nextblkno = hasnext ? b->blkno : 0;
		for(i = 0; i < n; i++)
			blks[i] = pluck(drts.head);
		/*
			several slots may have been freed and several producers
			may be waiting; they re-check the limit themselves.
		 */
		if(drts.n < npendingwrites)
			rwakeupall(&drts.isfull);
		nleft = drts.n;
		qunlock(&drts.lck);

		if(chatty9p > 4){
			if(hasnext)
				dprint("getwrite jumbo drts.n %d > 1 n %llud start %llud next %llud\n",
					nleft, n, startblkno, nextblkno);
			else
				dprint("getwrite jumbo drts.n %d > 1 n %llud start %llud\n",
					nleft, n, startblkno);
		}

		jumbo = emalloc9p(n*Rawblocksize);
		for(i = 0; i < n; i++){
			memcpy(jumbo+(i*Rawblocksize), blks[i]->payload, Rawblocksize);
			if(chatty9p > 4)
				dprint("getwrite b->blkno %llud\n", blks[i]->blkno);
		}
		if((wn = devwrites(startblkno, jumbo, n)) != n*Rawblocksize){
			dprint("%s\n", errstring[Esystem]);
			panic("error writing jumbo block %llud: %llud bytes, written %llud: %r\n",
				startblkno, (u64)(n*Rawblocksize), wn);
		}
		free(jumbo);

		for(i = 0; i < n; i++){
			if(chatty9p > 4)
				dprint("dowrite %llud wunlock()'ed\n", blks[i]->blkno);
			wbuffree(blks[i]);
			blks[i] = nil;
		}
		return;
	}

	/* a single block: drts.head is not nil, so neither is b */
	b = pluck(drts.head);
	if(drts.n < npendingwrites)
		rwakeupall(&drts.isfull);
	qunlock(&drts.lck);

	if(chatty9p > 4)
		dprint("getwrite done b->blkno %llud\n", b->blkno);
	if((n = devwrite(b->blkno, b->payload)) != Rawblocksize){
		dprint("%s\n", errstring[Esystem]);
		panic("error writing block %llud: %llud bytes: %r\n", b->blkno, n);
	}
	if(chatty9p > 4)
		dprint("dowrite %llud wunlock()'ed\n", b->blkno);
	wbuffree(b);

	if(chatty9p > 4)
		stats();
}

/*
	Fork the writer process (sharing memory with the caller).
	The parent returns immediately; the child loops in dowrite()
	until stopwrites is set and the queue has been drained.
 */
void
initwriter(void)
{
	char name[Namelen];

	// set the locks used by the Rendezes
	drts.isempty.l = &drts.lck;
	drts.isfull.l = &drts.lck;
	switch(rfork(RFPROC|RFMEM)){
	case -1:
		panic("can't fork");
	case 0:
		if(chatty9p > 4)
			dprint("writer started\n");
		break;
	default:
		return;
	}
	procname = name;
	snprint(name, Namelen, "%s writer", service);
	procsetname(name);

	while(stopwrites == 0 || drts.n > 0){
		dowrite();
	}
	if(chatty9p > 4)
		dprint("%s process exited\n", name);
	exits(nil);
}

/*
	Ask the writer to stop: set stopwrites, then poll once a second
	until the queue is empty, waking the writer in case it is asleep
	waiting for work.
 */
void
stopwriter(void)
{
	u64 n;

	stopwrites = 1;
	for(;;){
		qlock(&drts.lck);
		if(chatty9p > 4)
			dprint("stopwriter drts.n %d\n", drts.n);
		if(drts.n == 0)
			rwakeup(&drts.isempty);
		n = drts.n;
		qunlock(&drts.lck);
		if(n == 0)
			return;
		sleep(1000);
	}
}

/* debug trace of the queue length and the head and tail block numbers */
static void
stats(void)
{
	dprint("dirties nwrites %d hd %llud tl %llud\n",
		drts.n,
		drts.head == nil ? 0 : drts.head->blkno,
		drts.tail == nil ? 0 : drts.tail->blkno);
}

/* number of blocks still in the dirty queue, read under the queue lock */
u64
pendingwrites(void)
{
	u64 n;

	qlock(&drts.lck);
	n = drts.n;
	if(chatty9p>4)
		stats();
	qunlock(&drts.lck);
	return n;
}

/*
	Poll once a second until the dirty queue is empty.
	NOTE(review): drts.n is decremented when a block is plucked, which
	is before devwrite()/devwrites() is called, so the last write may
	still be in flight when this returns.
 */
void
sync(void)
{
	while(drts.n > 0)
		sleep(1000);
}