git: 9front

Download patch

ref: 9ac01353657e112f9b9edb8dbbc4dbb0c8ce9604
parent: 3399b03de98a64353796820dfdd74a08fa246be5
author: Noam Preil <noam@pixelhero.dev>
date: Fri Nov 14 04:08:32 EST 2025

venti/wrarena: overkill performance!

Old data is now processed insanely fast - wrarena will detect fully
synced, sealed arenas immediately, and will exit after reading 1KiB
of data.

New data is also a bit faster - a total ~25-30x uplift over 9front
upstream due to the parallelization and reduction of overhead. Most
of the improvement is just with old data, though.

Since ventis can be synced regularly, this speedup means they can now
reasonably be synced every minute instead of every day, so long as the
venti holds less than about 20TiB of old data.

--- a/sys/src/cmd/venti/srv/wrarena.c
+++ b/sys/src/cmd/venti/srv/wrarena.c
@@ -1,14 +1,26 @@
 #include "stdinc.h"
 #include "dat.h"
 #include "fns.h"
+#include "whack.h"
 
-QLock godot;
+enum
+{
+	NWORKERS = 0,	/* procs that post to fin before c is closed; none are started */
+	NSENDERS = 4,	/* vtsendthread procs writing clumps to the venti server */
+};
+/* big-endian field extraction from raw on-disk arena blocks */
+#define	U32GET(p)	((u32int)(((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3]))
+#define	U64GET(p)	(((u64int)U32GET(p)<<32)|(u64int)U32GET((p)+4))
+
 char *host;
 int readonly = 1;	/* for part.c */
 int mainstacksize = 256*1024;
 Channel *c;
 VtConn *z;
-int fast;	/* and a bit unsafe; only for benchmarking */
+Arena *arena;		/* the arena being copied */
+uchar *data;		/* the entire arena, read into memory at startup */
+Channel *fin;		/* each sender posts here once c is exhausted */
+u64int arenaend;	/* clump data ends here; the clump directory follows */
 int haveaoffset;
 int maxwrites = -1;
 int verbose;
@@ -16,11 +24,110 @@
 typedef struct ZClump ZClump;
 struct ZClump
 {
-	ZBlock *lump;
+	uchar *data;	/* malloc'd for ClumpECompress clumps, else points into data[] */
 	Clump cl;
 	u64int aa;
 };
 
+/*
+ * Unpack the header of the clump at offset aa of the in-memory
+ * arena into cl.  Returns 1 on success, 0 on failure: either the
+ * header or the payload it describes would extend past arenaend
+ * (errstr set), or unpackclump rejected the header.
+ */
+static int
+fastreadclumpinfo(u64int aa, Clump *cl)
+{
+	u32int n = ClumpSize;
+	if(aa + n > arenaend){
+		werrstr("clump extends past arena");
+		return 0;
+	}
+	if(unpackclump(cl, &data[aa], arena->clumpmagic) < 0)
+		return 0;
+	/* info.size comes from disk; keep the payload inside the data area */
+	if(aa + n + cl->info.size > arenaend){
+		werrstr("clump data extends past arena");
+		return 0;
+	}
+	return 1;
+}
+
+/*
+ * Decompress the payload of the clump at aa into a new malloc'd
+ * buffer of cl->info.uncsize bytes; the caller frees it.
+ * Allocation or decompression failure is fatal.
+ */
+static uchar*
+readcompressedclump(u64int aa, Clump *cl)
+{
+	Unwhack uw;
+	int nunc;
+	uchar *buf;
+	buf = malloc(cl->info.uncsize);
+	if(buf == nil)
+		sysfatal("decompression of %llud: out of memory", aa);
+	unwhackinit(&uw);
+	nunc = unwhack(&uw, buf, cl->info.uncsize, &data[aa + ClumpSize], cl->info.size);
+	if(nunc != cl->info.uncsize){
+		if(nunc < 0)
+			sysfatal("decompression of %llud failed: %s", aa, uw.err);
+		else
+			sysfatal("decompression of %llud gave partial block: %d/%d", aa, nunc, cl->info.uncsize);
+	}
+	return buf;
+}
+
+/*
+ * Exit unless buf (cl->info.uncsize bytes) hashes to the score
+ * recorded in cl and cl's lump type is valid.
+ */
+static void
+verifyclump(u64int aa, uchar *buf, Clump *cl)
+{
+	u8int bh[VtScoreSize];
+	scoremem(bh, buf, cl->info.uncsize);
+	if(scorecmp(cl->info.score, bh) != 0)
+		sysfatal("clump corrupt at %s %llud; expected=%V got=%V", arena->name, aa, cl->info.score, bh);
+	if(vttypevalid(cl->info.type) < 0)
+		sysfatal("loading lump at %s %llud: invalid lump type %d", arena->name, aa, cl->info.type);
+}
+
+/* Exit if the clump header marks the clump as corrupt. */
+static void
+ensurenotcorrupt(u64int aa, Clump *cl)
+{
+	if(cl->info.type == VtCorruptType)
+		sysfatal("clump %llud corrupt", aa);
+}
+
+/*
+ * Load and verify the clump at aa, copying its score into score.
+ * Returns its uncompressed contents: a malloc'd buffer for
+ * ClumpECompress clumps, otherwise a pointer into data[].
+ * Every failure is fatal.
+ */
+static uchar*
+fastloadclump(u64int aa, Clump *cl, u8int *score)
+{
+	uchar *buf = nil;
+	if(!fastreadclumpinfo(aa, cl))
+		sysfatal("reading clump %llud: %r", aa);
+	ensurenotcorrupt(aa, cl);
+	scorecp(score, cl->info.score);
+	if(cl->encoding == ClumpECompress)
+		buf = readcompressedclump(aa, cl);
+	else if(cl->encoding == ClumpENone){
+		/* verifyclump hashes uncsize bytes in place, so all must be stored */
+		if(cl->info.size != cl->info.uncsize)
+			sysfatal("clump %llud: stored size %d != uncompressed size %d", aa, cl->info.size, cl->info.uncsize);
+		buf = &data[aa + ClumpSize];
+	}else
+		sysfatal("Unrecognized encoding %d for clump %llx", cl->encoding, aa);
+	verifyclump(aa, buf, cl);
+	return buf;
+}
+
 void
 usage(void)
 {
@@ -35,38 +109,48 @@
 
 	USED(v);
 	while(recv(c, &zcl) == 1){
-		if(zcl.lump == nil)
-			break;
-		if(vtwrite(z, zcl.cl.info.score, zcl.cl.info.type, zcl.lump->data, zcl.cl.info.uncsize) < 0)
+		if(vtwrite(z, zcl.cl.info.score, zcl.cl.info.type, zcl.data, zcl.cl.info.uncsize) < 0)
 			sysfatal("failed writing clump %llud: %r", zcl.aa);
+		/* only decompressed clumps own their buffer; the rest point into data[] */
+		if(zcl.cl.encoding == ClumpECompress)
+			free(zcl.data);
 		if(verbose)
-			print("%V\n", zcl.cl.info.score);
-		freezblock(zcl.lump);
+			fprint(2, "%V\n", zcl.cl.info.score);
 	}
-	/*
-	 * All the send threads try to exit right when
-	 * threadmain is calling threadexitsall.  
-	 * Either libthread or the Linux NPTL pthreads library
-	 * can't handle this condition (I suspect NPTL but have
-	 * not confirmed this) and we get a seg fault in exit.
-	 * I spent a day tracking this down with no success,
-	 * so we're going to work around it instead by just
-	 * sitting here and waiting for the threadexitsall to
-	 * take effect.
-	 */
-	qlock(&godot);
+	/* recv stops returning 1 once the main thread closes c: report this sender done */
+	send(fin, nil);
+	threadexits(nil);
 }
 
+/*
+ * Unpack directory entry number clump straight from the in-memory
+ * arena copy.  Directory blocks run backwards from the end of the
+ * arena, clumpmax entries per block.  Returns 0 on success, or -1
+ * with the error set if clump is not below memstats.clumps.
+ */
+static int
+directreadci(Arena *arena, int clump, ClumpInfo *ci)
+{
+	u32int blk, idx;
+
+	if(clump >= arena->memstats.clumps){
+		seterr(EOk, "clump out of range");
+		return -1;
+	}
+	blk = clump / arena->clumpmax;
+	idx = clump - blk*arena->clumpmax;
+	unpackclumpinfo(ci, &data[arena->size - (blk+1)*arena->blocksize + idx*ClumpInfoSize]);
+	return 0;
+}
+
 static void
-rdarena(Arena *arena, u64int offset)
+rdarenanew(u64int offset)
 {
 	int i;
 	u64int a, aa, e;
-	uchar score[VtScoreSize];
-	Clump cl;
 	ClumpInfo ci;
-	ZBlock *lump;
 	ZClump zcl;
+	uchar score[VtScoreSize];
 
 	fprint(2, "wrarena: copying %s to venti\n", arena->name);
 	printarena(2, arena);
@@ -81,41 +156,23 @@
 		aa = 0;
 
 	i = 0;
 	for(a = 0; maxwrites != 0 && i < arena->memstats.clumps;
 	    a += ClumpSize + ci.size){
-		if(readclumpinfo(arena, i++, &ci) < 0)
+		if(directreadci(arena, i++, &ci) < 0)
 			break;
 		if(a < aa || ci.type == VtCorruptType){
 			if(ci.type == VtCorruptType)
 				fprint(2, "%s: corrupt clump read at %#llx: +%d\n",
 					argv0, a, ClumpSize+ci.size);
 			continue;
 		}
-		lump = loadclump(arena, a, 0, &cl, score, 0);
-		if(lump == nil) {
-			fprint(2, "clump %#llx failed to read: %r\n", a);
-			continue;
-		}
-		if(!fast && cl.info.type != VtCorruptType) {
-			scoremem(score, lump->data, cl.info.uncsize);
-			if(scorecmp(cl.info.score, score) != 0) {
-				fprint(2, "clump %#llx has mismatched score\n",
-					a);
-				break;
-			}
-			if(vttypevalid(cl.info.type) < 0) {
-				fprint(2, "clump %#llx has bad type %d\n",
-					a, cl.info.type);
-				break;
-			}
-		}
-		if(z && cl.info.type != VtCorruptType){
-			zcl.cl = cl;
-			zcl.lump = lump;
-			zcl.aa = a;
-			send(c, &zcl);
-		}else
-			freezblock(lump);
+		/* clumps before aa and ones the directory marks corrupt were skipped above */
+		zcl.data = fastloadclump(a, &zcl.cl, score);
+		zcl.aa = a;	/* reported by vtsendthread if the write fails */
+		if(z)
+			send(c, &zcl);	/* the sender frees decompressed buffers */
+		else if(zcl.cl.encoding == ClumpECompress)
+			free(zcl.data);	/* not sent, so nobody else will */
 		if(maxwrites > 0)
 			--maxwrites;
 	}
@@ -122,7 +176,7 @@
 	if(a > aa)
 		aa = a;
 	if(haveaoffset)
-		print("end offset %#llx\n", aa);
+		fprint(1, "%#llx", aa);	/* end offset as bare hex, no newline */
 }
 
 void
@@ -130,19 +184,14 @@
 {
 	int i;
 	char *file;
-	Arena *arena;
 	u64int offset, aoffset;
 	Part *part;
-	uchar buf[8192];
+	uchar buf[2][512];	/* [0]: start of the arena's header block, [1]: start of its last (trailer) block */
 	ArenaHead head;
-	ZClump zerocl;
 
-	ventifmtinstall();
-	qlock(&godot);
 	aoffset = 0;
 	ARGBEGIN{
 	case 'f':
-		fast = 1;
 		ventidoublechecksha1 = 0;
 		break;
 	case 'h':
@@ -174,30 +223,67 @@
 		file = argv[0];
 	}
 
-	fmtinstall('V', vtscorefmt);
-
-	statsinit();
-
 	part = initpart(file, OREAD);
 	if(part == nil)
 		sysfatal("can't open file %s: %r", file);
-	if(readpart(part, aoffset, buf, sizeof buf) < 0)
+	if(readpart(part, aoffset, buf[0], sizeof buf[0]) < 0)
 		sysfatal("can't read file %s: %r", file);
 
-	if(unpackarenahead(&head, buf) < 0)
-		sysfatal("corrupted arena header: %r");
+	/* pull out just enough of the header to find the trailer; unpackarenahead validates it below */
+	head.blocksize = U32GET(buf[0] + 2 * U32Size + ANameSize);
+	head.size = U64GET(buf[0] + 3 * U32Size + ANameSize);
+	namecp(head.name, (char*)buf[0] + 2 * U32Size);
+	fprint(2, "processing %s...\n", head.name);
 
 	if(aoffset+head.size > part->size)
-		sysfatal("arena is truncated: want %llud bytes have %llud",
-			head.size, part->size);
+		sysfatal("arena is truncated: want %llud bytes have %llud", head.size, part->size);
+
+	/*
+	 * Fast path: the trailer in the arena's last block holds a
+	 * 64-bit field after its name and six 32-bit words (seven in
+	 * version 5).  If that field equals offset there is nothing new
+	 * to copy, so report offset as the end offset (when haveaoffset
+	 * is set) and exit without reading the arena.
+	 * NOTE(review): the field is presumably the arena's used byte
+	 * count; confirm against the trailer layout.  The magic and
+	 * version checks stop a garbage block from triggering the exit.
+	 */
+	if(readpart(part, aoffset - head.blocksize + head.size, buf[1], sizeof buf[1]) >= 0
+	&& U32GET(buf[1]) == ArenaMagic){
+		u32int version = U32GET(buf[1] + U32Size);
+		u8int *addr = buf[1] + 6 * U32Size + ANameSize;
+		if(version == 5)
+			addr += U32Size;
+		if((version == 4 || version == 5) && offset == U64GET(addr)){
+			if(haveaoffset)
+				print("%#llx", offset);
+			threadexitsall(0);
+		}
+	}
 
-	partblocksize(part, head.blocksize);
-	initdcache(8 * MaxDiskBlock);
+	ventifmtinstall();
+	fmtinstall('V', vtscorefmt);
 
+	if(unpackarenahead(&head, buf[0]) < 0)
+		sysfatal("corrupted arena header: %r");
+
 	arena = initarena(part, aoffset, head.size, head.blocksize);
 	if(arena == nil)
 		sysfatal("initarena: %r");
+	/* clump data lies below arenaend; the clump directory fills the rest */
+	arenaend = arena->size - arenadirsize(arena, arena->memstats.clumps);
 
+	/* read the whole arena once; clumps are then parsed from memory */
+	data = malloc(arena->size);
+	if(data == nil)
+		sysfatal("malloc of %llud-byte arena failed", arena->size);
+	uvlong before = nsec();
+	if(readpart(part, arena->base, data, arena->size) < 0){
+		sysfatal("failed to cache arena in memory: %r");
+	}
+	uvlong after = nsec() - before;
+	fprint(2, "cached arena in %fs\n", ((double)after)/1000000000);
+
 	z = nil;
 	if(host==nil || strcmp(host, "/dev/null") != 0){
 		z = vtdial(host);
@@ -207,19 +276,27 @@
 			sysfatal("vtconnect: %r");
 	}
 	
-	c = chancreate(sizeof(ZClump), 0);
-	for(i=0; i<12; i++)
+	/* c buffers up to NSENDERS*100 clumps; fin collects one message per finished proc */
+	c = chancreate(sizeof(ZClump), NSENDERS * 100);
+	fin = chancreate(1, NWORKERS + NSENDERS);
+	for(i=0; i<NSENDERS; i++)
 		vtproc(vtsendthread, nil);
 
-	rdarena(arena, offset);
+	rdarenanew(offset);
+	/* wait for the workers, close c to end the senders' recv loops, then wait for the senders */
+	for(i = 0; i < NWORKERS; i++)
+		recv(fin, nil);
+	chanclose(c);
+	for(i = 0; i < NSENDERS; i++)
+		recv(fin, nil);
+	fprint(2, "syncing... ");
 	if(vtsync(z) < 0)
 		sysfatal("executing sync: %r");
-
-	memset(&zerocl, 0, sizeof zerocl);
-	for(i=0; i<12; i++)
-		send(c, &zerocl);
-	if(z){
+	fprint(2, " hanging up... ");
+	if(z)
 		vthangup(z);
-	}
+
+	fprint(2, "done!\n");
+
 	threadexitsall(0);
 }
--