git: 9front

ref: 882ec4ba3d42f15e630119e1f43e5400123539b5
dir: /sys/src/cmd/venti/srv/wrarena.c/

View raw version
#include "stdinc.h"
#include "dat.h"
#include "fns.h"
#include "whack.h"

#define NWORKERS 0
#define NSENDERS 4
#define	U32GET(p)	((u32int)(((p)[0]<<24)|((p)[1]<<16)|((p)[2]<<8)|(p)[3]))
#define	U64GET(p)	(((u64int)U32GET(p)<<32)|(u64int)U32GET((p)+4))

char *host;
int readonly = 1;	/* for part.c */
int mainstacksize = 256*1024;
Channel *c;
VtConn *z;
Arena *arena;
uchar *data;
Channel *fin;
u64int arenaend;
int haveaoffset;
int maxwrites = -1;
int verbose;

typedef struct ZClump ZClump;
struct ZClump
{
	uchar *data;
	Clump cl;
	u64int aa;
};

static int
fastreadclumpinfo(u64int aa, Clump *cl)
{
	u32int n = ClumpSize;
	if(aa + n > arenaend){
		werrstr("clump extends past arena");
		return 0;
	}
	if(unpackclump(cl, &data[aa], arena->clumpmagic) < 0)
		return 0;
	return 1;
}

static uchar*
readcompressedclump(u64int aa, Clump *cl)
{
	Unwhack uw;
	int nunc;
	uchar *buf;
	buf = malloc(cl->info.uncsize);
	unwhackinit(&uw);
	nunc = unwhack(&uw, buf, cl->info.uncsize, &data[aa + ClumpSize], cl->info.size);
	if(nunc != cl->info.uncsize){
		if(nunc < 0)
			sysfatal("decompression of %llud failed: %s", aa, uw.err);
		else
			sysfatal("decompression of %llud gave partial block: %d/%d\n", aa, nunc, cl->info.uncsize);
	}
	return buf;
}

static void
verifyclump(u64int aa, uchar *buf, Clump *cl)
{
	u8int bh[VtScoreSize];
	scoremem(bh, buf, cl->info.uncsize);
	if(scorecmp(cl->info.score, bh) != 0)
		sysfatal("clump corrupt at %s %llud; expected=%V got=%V", arena->name, aa, cl->info.score, bh);
	if(vttypevalid(cl->info.type) < 0)
		sysfatal("loading lump at %s %llud: invalid lump type %d", arena->name, aa, cl->info.type);
}

static void
ensurenotcorrupt(u64int aa, Clump *cl)
{
	if(cl->info.type == VtCorruptType)
		sysfatal("clump %lld corrupt", aa);
}

static uchar*
fastloadclump(u64int aa, Clump *cl, u8int *score)
{
	uchar *buf = nil;
	fastreadclumpinfo(aa, cl);
	ensurenotcorrupt(aa, cl);
	scorecp(score, cl->info.score);
	if(cl->encoding == ClumpECompress)
		buf = readcompressedclump(aa, cl);
	else if (cl->encoding == ClumpENone)
		buf = &data[aa + ClumpSize];
	else
		sysfatal("Unrecognized encoding %d for clump %llx", cl->encoding, aa);
	verifyclump(aa, buf, cl);
	return buf;
}

void
usage(void)
{
	fprint(2, "usage: wrarena [-h host] arenafile [offset]\n");
	threadexitsall("usage");
}

void
vtsendthread(void *v)
{
	ZClump zcl;

	USED(v);
	while(recv(c, &zcl) == 1){
		if(vtwrite(z, zcl.cl.info.score, zcl.cl.info.type, zcl.data, zcl.cl.info.uncsize) < 0)
			sysfatal("failed writing clump %llud: %r", zcl.aa);
		if(zcl.cl.encoding == ClumpECompress)
			free(zcl.data);
		if(verbose)
			fprint(2, "%V\n", zcl.cl.info.score);
	}
	send(fin, nil);
	threadexits(nil);
}

static int
directreadci(Arena *arena, int clump, ClumpInfo *ci)
{
	u32int block, off;
	if(clump >= arena->memstats.clumps){
		seterr(EOk, "clump out of range");
		return -1;
	}
	block = clump / arena->clumpmax;
	off = (clump - (block * arena->clumpmax)) * ClumpInfoSize;
	unpackclumpinfo(ci, &data[arena->size - (block+1)*arena->blocksize + off]);
	return 0;
}

static void
rdarenanew(u64int offset)
{
	int i;
	u64int a, aa, e;
	ClumpInfo ci;
	ZClump zcl;
	uchar score[VtScoreSize];

	fprint(2, "wrarena: copying %s to venti\n", arena->name);
	printarena(2, arena);

	a = arena->base;
	e = arena->base + arena->size;
	if(offset != ~(u64int)0) {
		if(offset >= e - a)
			sysfatal("bad offset %#llx >= %#llx", offset, e - a);
		aa = offset;
	} else
		aa = 0;

	i = 0;
	for(a = 0; i < arena->memstats.clumps;
	    a += ClumpSize + ci.size){
		if(directreadci(arena, i++, &ci) < 0)
			break;
		if(a < aa)
			continue;
		zcl.data = fastloadclump(a, &zcl.cl, score);
		if(z && zcl.data)
			send(c, &zcl);
		if(ci.type == VtCorruptType){
			fprint(2, "%s: corrupt clump read at %#llx: +%d\n",
					argv0, a, ClumpSize+ci.size);
			continue;
		}
		if(maxwrites > 0)
			--maxwrites;
	}
	if(a > aa)
		aa = a;
	if(haveaoffset)
		print("%#llx", aa);
}

void
threadmain(int argc, char *argv[])
{
	int i;
	char *file;
	u64int offset, aoffset;
	Part *part;
	uchar buf[2][512];
	ArenaHead head;

	aoffset = 0;
	ARGBEGIN{
	case 'f':
		ventidoublechecksha1 = 0;
		break;
	case 'h':
		host = EARGF(usage());
		break;
	case 'o':
		haveaoffset = 1;
		aoffset = strtoull(EARGF(usage()), 0, 0);
		break;
	case 'M':
		maxwrites = atoi(EARGF(usage()));
		break;
	case 'v':
		verbose = 1;
		break;
	default:
		usage();
		break;
	}ARGEND

	offset = ~(u64int)0;
	switch(argc) {
	default:
		usage();
	case 2:
		offset = strtoull(argv[1], 0, 0);
		/* fall through */
	case 1:
		file = argv[0];
	}

	part = initpart(file, OREAD);
	if(part == nil)
		sysfatal("can't open file %s: %r", file);
	if(readpart(part, aoffset, buf[0], 512) < 0)
		sysfatal("can't read file %s: %r", file);

	head.blocksize = U32GET(buf[0] + 2 * U32Size + ANameSize);
	head.size = U64GET(buf[0] + 3 * U32Size + ANameSize);
	namecp(head.name, (char*)buf[0] + 2 * U32Size);
	fprint(2, "processing %s...\n", head.name);

	if(aoffset+head.size > part->size)
		sysfatal("arena is truncated: want %llud bytes have %llud", head.size, part->size);
	
	if(readpart(part, aoffset - head.blocksize + head.size, buf[1], 512) >= 0){
		u32int version = U32GET(buf[1] + U32Size);
		u8int *addr = buf[1] + 6 * U32Size + ANameSize;
		if(version == 5)
			addr += U32Size;
		if(offset == U64GET(addr))
			threadexitsall(0);
	}

	ventifmtinstall();
	fmtinstall('V', vtscorefmt);

	if(unpackarenahead(&head, buf[0]) < 0)
		sysfatal("corrupted arena header: %r");

	arena = initarena(part, aoffset, head.size, head.blocksize);
	if(arena == nil)
		sysfatal("initarena: %r");
	arenaend = arena->size - arenadirsize(arena, arena->memstats.clumps);

	data = malloc(arena->size);
	if(data == nil)
		sysfatal("malloc failed");
	uvlong before = nsec();
	if(readpart(part, arena->base, data, arena->size) < 0){
		sysfatal("failed to cache arena in memory");
	}
	uvlong after = nsec() - before;
	fprint(2, "cached arena in %fs\n", ((double)after)/1000000000);

	z = nil;
	if(host==nil || strcmp(host, "/dev/null") != 0){
		z = vtdial(host);
		if(z == nil)
			sysfatal("could not connect to server: %r");
		if(vtconnect(z) < 0)
			sysfatal("vtconnect: %r");
	}
	
	c = chancreate(sizeof(ZClump), NSENDERS * 100);
	fin = chancreate(1, NWORKERS + NSENDERS);
	for(i=0; i<NSENDERS; i++)
		vtproc(vtsendthread, nil);

	rdarenanew(offset);
	for(i = 0; i < NWORKERS; i += 1)
		recv(fin, nil);
	chanclose(c);
	for(i = 0; i < NSENDERS; i += 1)
		recv(fin, nil);
	fprint(2, "syncing... ");
	if(vtsync(z) < 0)
		sysfatal("executing sync: %r");
	fprint(2, " hanging up... ");
	if(z)
		vthangup(z);
	
	fprint(2, "done!\n");

	threadexitsall(0);
}