ref: babf901b4a508c3ec5d1f89655f10377bbdf9637
dir: /appl/cmd/logfile.b/
implement Logfile;
#
# Copyright © 1999 Vita Nuova Limited. All rights reserved.
#
include "sys.m";
sys: Sys;
stderr: ref Sys->FD;
include "draw.m";
Logfile: module {
init: fn(nil: ref Draw->Context, argv: list of string);
};
Fidrec: adt {
fid: int; # fid of read
rq: list of (int, Sys->Rread); # outstanding read requests
pos: int; # current position in the logfile
};
Circbuf: adt {
start: int;
data: array of byte;
new: fn(size: int): ref Circbuf;
put: fn(b: self ref Circbuf, d: array of byte): int;
get: fn(b: self ref Circbuf, s, n: int): (int, array of byte);
};
Fidhash: adt
{
table: array of list of ref Fidrec;
get: fn(ht: self ref Fidhash, fid: int): ref Fidrec;
put: fn(ht: self ref Fidhash, fidrec: ref Fidrec);
del: fn(ht: self ref Fidhash, fidrec: ref Fidrec);
new: fn(): ref Fidhash;
};
usage()
{
sys->fprint(stderr, "usage: logfile [-size] file\n");
raise "fail: usage";
}
init(nil: ref Draw->Context, argv: list of string)
{
sys = load Sys Sys->PATH;
stderr = sys->fildes(2);
bufsize := Sys->ATOMICIO * 4;
if (argv != nil)
argv = tl argv;
if (argv != nil && len hd argv && (hd argv)[0] == '-' && len hd argv > 1) {
if ((bufsize = int ((hd argv)[1:])) <= 0) {
sys->fprint(stderr, "logfile: can't have a zero buffer size\n");
usage();
}
argv = tl argv;
}
if (argv == nil || tl argv != nil)
usage();
path := hd argv;
(dir, f) := pathsplit(path);
if (sys->bind("#s", dir, Sys->MBEFORE) == -1) {
sys->fprint(stderr, "logfile: bind #s failed: %r\n");
return;
}
fio := sys->file2chan(dir, f);
if (fio == nil) {
sys->fprint(stderr, "logfile: couldn't make %s: %r\n", path);
return;
}
spawn logserver(fio, bufsize);
}
logserver(fio: ref Sys->FileIO, bufsize: int)
{
waitlist: list of ref Fidrec;
readers := Fidhash.new();
availcount := 0;
availchan := chan of int;
workchan := chan of (Sys->Rread, array of byte);
buf := Circbuf.new(bufsize);
for (;;) alt {
<-availchan =>
availcount++;
(nil, count, fid, rc) := <-fio.read =>
r := readers.get(fid);
if (rc == nil) {
if (r != nil)
readers.del(r);
continue;
}
if (r == nil) {
r = ref Fidrec(fid, nil, buf.start);
if (r.pos < len buf.data)
r.pos = len buf.data; # first buffer's worth is garbage
readers.put(r);
}
(s, d) := buf.get(r.pos, count);
r.pos = s + len d;
if (d != nil) {
rc <-= (d, nil);
} else {
if (r.rq == nil)
waitlist = r :: waitlist;
r.rq = (count, rc) :: r.rq;
}
(nil, data, nil, wc) := <-fio.write =>
if (wc == nil)
continue;
if ((n := buf.put(data)) < len data)
wc <-= (n, "write too long for buffer");
else
wc <-= (n, nil);
wl := waitlist;
for (waitlist = nil; wl != nil; wl = tl wl) {
r := hd wl;
if (availcount == 0) {
spawn worker(workchan, availchan);
availcount++;
}
(count, rc) := hd r.rq;
r.rq = tl r.rq;
# optimisation: if the read request wants exactly the data provided
# in the write request, then use the original data buffer.
s: int;
d: array of byte;
if (count >= n && r.pos == buf.start + len buf.data - n)
(s, d) = (r.pos, data);
else
(s, d) = buf.get(r.pos, count);
r.pos = s + len d;
workchan <-= (rc, d);
availcount--;
if (r.rq != nil)
waitlist = r :: waitlist;
d = nil;
}
data = nil;
wl = nil;
}
}
worker(work: chan of (Sys->Rread, array of byte), ready: chan of int)
{
for (;;) {
(rc, data) := <-work; # blocks forever if the reading process is killed
rc <-= (data, nil);
(rc, data) = (nil, nil);
ready <-= 1;
}
}
Circbuf.new(size: int): ref Circbuf
{
return ref Circbuf(0, array[size] of byte);
}
# return number of bytes actually written
Circbuf.put(b: self ref Circbuf, d: array of byte): int
{
blen := len b.data;
# if too big to fit in buffer, truncate the write.
if (len d > blen)
d = d[0:blen];
dlen := len d;
offset := b.start % blen;
if (offset + dlen <= blen) {
b.data[offset:] = d;
} else {
b.data[offset:] = d[0:blen - offset];
b.data[0:] = d[blen - offset:];
}
b.start += dlen;
return dlen;
}
# return (start, data)
Circbuf.get(b: self ref Circbuf, s, n: int): (int, array of byte)
{
# if the beginning's been overrun, start from the earliest place we can.
# we could put some indication of elided bytes in the buffer.
if (s < b.start)
s = b.start;
blen := len b.data;
if (s + n > b.start + blen)
n = b.start + blen - s;
if (n <= 0)
return (s, nil);
o := s % blen;
d := array[n] of byte;
if (o + n <= blen)
d[0:] = b.data[o:o+n];
else {
d[0:] = b.data[o:];
d[blen - o:] = b.data[0:o+n-blen];
}
return (s, d);
}
FIDHASHSIZE: con 32;
Fidhash.new(): ref Fidhash
{
return ref Fidhash(array[FIDHASHSIZE] of list of ref Fidrec);
}
# put an entry in the hash table.
# assumes there is no current entry for the fid.
Fidhash.put(ht: self ref Fidhash, f: ref Fidrec)
{
slot := f.fid & (FIDHASHSIZE-1);
ht.table[slot] = f :: ht.table[slot];
}
Fidhash.get(ht: self ref Fidhash, fid: int): ref Fidrec
{
for (l := ht.table[fid & (FIDHASHSIZE-1)]; l != nil; l = tl l)
if ((hd l).fid == fid)
return hd l;
return nil;
}
Fidhash.del(ht: self ref Fidhash, f: ref Fidrec)
{
slot := f.fid & (FIDHASHSIZE-1);
nl: list of ref Fidrec;
for (l := ht.table[slot]; l != nil; l = tl l)
if ((hd l).fid != f.fid)
nl = (hd l) :: nl;
ht.table[slot] = nl;
}
pathsplit(p: string): (string, string)
{
for (i := len p - 1; i >= 0; i--)
if (p[i] != '/')
break;
if (i < 0)
return (p, nil);
p = p[0:i+1];
for (i = len p - 1; i >=0; i--)
if (p[i] == '/')
break;
if (i < 0)
return (".", p);
return (p[0:i+1], p[i+1:]);
}