code: 9ferno

ref: 6d69f6fba35087686f79adb2ea0d67944a62ca7b
dir: /appl/lib/styxflush.b/

View raw version
implement Styxflush;
include "sys.m";
	sys: Sys;
include "tables.m";
	tables: Tables;
	Table: import tables;
include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import styx;
include "styxflush.m";

reqs: ref Table[ref Req];
Req: adt {
	m: ref Tmsg;
	flushc: chan of (int, chan of int);
	oldreq: cyclic ref Req;
	flushes: cyclic ref Req;		# flushes queued on this req.
	nextflush: cyclic ref Req;		# (flush only) next req in flush queue.
	flushready: chan of int;		# (flush only) wait for flush attempt.
	flushing: int;				# request is subject of a flush.
	finished: chan of int;			# [1]; signals finish to late flushers.
	responded: int;
};

init()
{
	sys = load Sys Sys->PATH;
	tables = load Tables Tables->PATH;
	styx = load Styx Styx->PATH;
	styx->init();

	reqs = Table[ref Req].new(11, nil);
}

tmsg(gm: ref Styx->Tmsg, flushc: chan of (int, chan of int), reply: chan of ref Styx->Rmsg): (int, ref Rmsg)
{
	req := ref Req(
		gm,
		flushc,				# flushc
		nil,					# oldreq
		nil,					# flushes
		nil,					# nextflush
		nil,					# flushready
		0,					# flushing
		chan[1] of int,			# finished
		0					# responded
	);
	if(reqs.add(gm.tag, req) == 0)
		return (1, ref Rmsg.Error(gm.tag, "duplicate tag"));
	pick m := gm {
	Flush =>
		req.oldreq = reqs.find(m.oldtag);
		if(req.oldreq == nil)
			return (1, ref Rmsg.Flush(m.tag));
		addflush(req);
		req.flushc = chan of (int, chan of int);
		spawn flushreq(req, reply);
		return (1, nil);
	}
	return (0, nil);
}

rmsg(rm: ref Styx->Rmsg): int
{
	req := reqs.find(rm.tag);
	if(req == nil){
		complain("req has disappeared, reply "+rm.text());
		return 0;
	}
	reqs.del(rm.tag);
	if(tagof rm == tagof Rmsg.Flush)
		delflush(req);
	if(req.flushing)
		req.finished <-= 1;
	req.responded = 1;
	pick m := rm {
	Error =>
		if(m.ename == Einterrupted){
			if(!req.flushing)
				complain("interrupted reply but no flush "+req.m.text());
			return 0;
		}
	}
	return 1;
}

addflush(req: ref Req)
{
	o := req.oldreq;
	for(r := o.flushes; r != nil; r = r.nextflush)
		if(r.nextflush == nil)
			break;
	if(r == nil){
		o.flushes = req;
		req.flushready = nil;
	}else{
		r.nextflush = req;
		req.flushready = chan of int;
	}
	o.flushing = 1;
}

# remove req (a flush request) from the list of flushes pending
# for req.oldreq. if it was at the head of the list, then give
# the next req a go.
delflush(req: ref Req)
{
	oldreq := req.oldreq;
	prev: ref Req;
	for(r := oldreq.flushes; r != nil; r = r.nextflush){
		if(r == req)
			break;
		prev = r;
	}
	if(prev == nil){
		oldreq.flushes = r.nextflush;
		if(oldreq.flushes != nil)
			oldreq.flushes.flushready <-= 1;
	}else
		prev.nextflush = r.nextflush;
	r.nextflush = nil;
}

flushreq(req: ref Req, reply: chan of ref Styx->Rmsg)
{
	o := req.oldreq;
	# if we're queued up, wait our turn.
	if(req.flushready != nil)
		<-req.flushready;
	rc := chan of int;
	alt{
	o.flushc <-= (req.m.tag, rc) =>
		<-rc;
		reply <-= ref Rmsg.Flush(req.m.tag);
		# old request must have responded before sending on rc,
		# but be defensive because it's easy to forget.
		if(!o.responded){
			complain("flushed request not responded to: "+o.m.text());
			o.responded = 1;		# race but better than nothing.
		}
	(nil, nrc)  := <-req.flushc =>
		reply <-= ref Rmsg.Error(req.m.tag, Einterrupted);
		nrc <-= 1;
	<-o.finished =>
		o.finished <-= 1;
		reply <-= ref Rmsg.Flush(req.m.tag);
	}
}

complain(e: string)
{
	sys->fprint(sys->fildes(2), "styxflush: warning: %s\n", e);
}