ref: babf901b4a508c3ec5d1f89655f10377bbdf9637
dir: /appl/cmd/iostats.b/
implement Iostats;
#
# iostats - gather file system access statistics
#
include "sys.m";
sys: Sys;
Qid: import sys;
include "draw.m";
include "styx.m";
styx: Styx;
Tmsg, Rmsg, NOFID, NOTAG: import styx;
include "workdir.m";
workdir: Workdir;
include "sh.m";
include "arg.m";
Iostats: module
{
init: fn(nil: ref Draw->Context, nil: list of string);
};
Maxmsg: con 128*1024+Styx->IOHDRSZ;
Ns2ms: con big 1000000;
Rpc: adt
{
name: string;
count: big;
time: big;
lo: big;
hi: big;
bin: big;
bout: big;
};
Stats: adt
{
totread: big;
totwrite: big;
nrpc: int;
nproto: int;
rpc: array of ref Rpc; # Maxrpc
};
Fid: adt {
nr: int; # fid number
path: ref Path; # path used to open Fid
qid: Qid;
mode: int;
nread: big;
nwrite: big;
bread: big;
bwrite: big;
offset: big; # for directories
};
Path: adt {
parent: cyclic ref Path;
name: string;
};
Frec: adt
{
op: ref Path; # first name?
qid: Qid;
nread: big;
nwrite: big;
bread: big;
bwrite: big;
opens: int;
};
Tag: adt {
m: ref Tmsg;
fid: ref Fid;
stime: big;
next: cyclic ref Tag;
};
NTAGHASH: con 1<<4; # power of 2
NFIDHASH: con 1<<4; # power of 2
tags := array[NTAGHASH] of ref Tag;
fids := array[NFIDHASH] of list of ref Fid;
dbg := 0;
stats: Stats;
frecs: list of ref Frec;
replymap := array[tagof Rmsg.Stat+1] of {
tagof Rmsg.Version => tagof Tmsg.Version,
tagof Rmsg.Auth => tagof Tmsg.Auth,
tagof Rmsg.Attach => tagof Tmsg.Attach,
tagof Rmsg.Flush => tagof Tmsg.Flush,
tagof Rmsg.Clunk => tagof Tmsg.Clunk,
tagof Rmsg.Remove => tagof Tmsg.Remove,
tagof Rmsg.Wstat => tagof Tmsg.Wstat,
tagof Rmsg.Walk => tagof Tmsg.Walk,
tagof Rmsg.Create => tagof Tmsg.Create,
tagof Rmsg.Open => tagof Tmsg.Open,
tagof Rmsg.Read => tagof Tmsg.Read,
tagof Rmsg.Write => tagof Tmsg.Write,
tagof Rmsg.Stat => tagof Tmsg.Stat,
* => -1,
};
init(ctxt: ref Draw->Context, args: list of string)
{
sys = load Sys Sys->PATH;
workdir = load Workdir Workdir->PATH;
sh := load Sh Sh->PATH;
styx = load Styx Styx->PATH;
styx->init();
wd := workdir->init();
dbfile := "iostats.out";
arg := load Arg Arg->PATH;
arg->init(args);
arg->setusage("iostats [-d] [-f debugfile] cmds [args ...]");
while((o := arg->opt()) != 0)
case o {
'd' => dbg++;
'f' => dbfile = arg->earg();
* => arg->usage();
}
args = arg->argv();
if(args == nil)
arg->usage();
arg = nil;
sys->pctl(Sys->FORKFD|Sys->FORKNS|Sys->NEWPGRP|Sys->FORKENV, nil);
if(dbg){
fd := sys->create(dbfile, Sys->OWRITE, 8r666);
if(fd == nil)
fatal(sys->sprint("can't create %q: %r", dbfile));
sys->dup(fd.fd, 2);
}
if(sys->chdir("/") < 0)
fatal(sys->sprint("chdir /: %r"));
stats.totread = big 0;
stats.totwrite = big 0;
stats.nrpc = 0;
stats.nproto = 0;
stats.rpc = array[tagof Tmsg.Wstat + 1] of ref Rpc;
stats.rpc[tagof Tmsg.Version] = mkrpc("version");
stats.rpc[tagof Tmsg.Auth] = mkrpc("auth");
stats.rpc[tagof Tmsg.Flush] = mkrpc("flush");
stats.rpc[tagof Tmsg.Attach] = mkrpc("attach");
stats.rpc[tagof Tmsg.Walk] = mkrpc("walk");
stats.rpc[tagof Tmsg.Open] = mkrpc("open");
stats.rpc[tagof Tmsg.Create] = mkrpc("create");
stats.rpc[tagof Tmsg.Clunk] = mkrpc("clunk");
stats.rpc[tagof Tmsg.Read] = mkrpc("read");
stats.rpc[tagof Tmsg.Write] = mkrpc("write");
stats.rpc[tagof Tmsg.Remove] = mkrpc("remove");
stats.rpc[tagof Tmsg.Stat] = mkrpc("stat");
stats.rpc[tagof Tmsg.Wstat] = mkrpc("wstat");
mpipe := array[2] of ref Sys->FD;
if(sys->pipe(mpipe) < 0)
fatal(sys->sprint("can't create pipe: %r"));
pids := chan of int;
cmddone := chan of int;
spawn cmd(sh, ctxt, args, wd, mpipe[0], pids, cmddone);
<-pids;
mpipe[0] = nil;
epipe := array[2] of ref Sys->FD;
if(sys->pipe(epipe) < 0)
fatal(sys->sprint("can't create pipe: %r"));
spawn export(epipe[1], pids);
<-pids;
epipe[1] = nil;
iodone := chan of int;
spawn iostats(epipe[0], mpipe[1], pids, iodone);
<-pids;
epipe[0] = mpipe[1] = nil;
<-cmddone;
<-iodone;
results();
}
cmd(sh: Sh, ctxt: ref Draw->Context, args: list of string, wdir: string, fsfd: ref Sys->FD, pids: chan of int, done: chan of int)
{
{
pids <-= sys->pctl(Sys->FORKNS|Sys->FORKFD, nil);
if(sys->mount(fsfd, nil, "/", Sys->MREPL, "") < 0)
fatal(sys->sprint("can't mount /: %r"));
fsfd = nil;
sys->bind("#e", "/env", Sys->MREPL | Sys->MCREATE);
sys->bind("#d", "/fd", Sys->MREPL); # better than nothing
if(sys->chdir(wdir) < 0)
fatal(sys->sprint("can't chdir to %s: %r", wdir));
sh->run(ctxt, args);
}exception{
"fail:*" =>
; # don't mention it
* =>
raise; # cause the fault
}
done <-= 1;
}
iostats(expfd: ref Sys->FD, mountfd: ref Sys->FD, pids: chan of int, done: chan of int)
{
pids <-= sys->pctl(Sys->NEWFD|Sys->NEWPGRP, 1 :: 2 :: expfd.fd :: mountfd.fd :: nil);
timefd := sys->open("/dev/time", Sys->OREAD);
if(timefd == nil)
fatal(sys->sprint("can't open /dev/time: %r"));
tmsgs := chan of (int, ref Tmsg);
spawn Treader(mountfd, expfd, tmsgs);
(tpid, nil) := <-tmsgs;
rmsgs := chan of (int, ref Rmsg);
spawn Rreader(expfd, mountfd, rmsgs);
(rpid, nil) := <-rmsgs;
expfd = mountfd = nil;
stderr := sys->fildes(2);
Run:
for(;;)alt{
(n, t) := <-tmsgs => # n.b.: received on tmsgs before it goes to server
if(t == nil || tagof t == tagof Tmsg.Readerror)
break Run; # TO DO?
if(dbg)
sys->fprint(stderr, "->%s\n", t.text());
tag := newtag(t, nsec(timefd));
stats.nrpc++;
stats.nproto += n;
rpc := stats.rpc[tagof t];
if(rpc == nil){
sys->fprint(stderr, "iostats: unexpected T-msg %d\n", tagof t);
continue;
}
rpc.count++;
rpc.bin += big n;
pick pt := t {
Auth =>
tag.fid = newfid(pt.afid);
Attach =>
tag.fid = newfid(pt.fid);
Walk =>
tag.fid = findfid(pt.fid);
Open =>
tag.fid = findfid(pt.fid);
Create =>
tag.fid = findfid(pt.fid);
Read =>
tag.fid = findfid(pt.fid);
Write =>
tag.fid = findfid(pt.fid);
pt.data = nil; # don't need to keep data
Clunk or
Stat or
Remove =>
tag.fid = findfid(pt.fid);
Wstat =>
tag.fid = findfid(pt.fid);
}
(n, r) := <-rmsgs =>
if(r == nil || tagof r == tagof Rmsg.Readerror){
break Run; # TO DO
}
if(dbg)
sys->fprint(stderr, "<-%s\n", r.text());
stats.nproto += n;
tag := findtag(r.tag, 1);
if(tag == nil)
continue; # client or server error TO DO: account for flush
if(tagof r < len replymap && (tt := replymap[tagof r]) >= 0 && (rpc := stats.rpc[tt]) != nil){
update(rpc, nsec(timefd)-tag.stime);
rpc.bout += big n;
}
fid := tag.fid;
pick pr := r {
Error =>
pick m := tag.m {
Auth =>
if(fid != nil){
if(fid.nread != big 0 || fid.nwrite != big 0)
fidreport(fid);
freefid(fid);
}
}
Version =>
# could pick up message size
# flush fids/tags
tags = array[len tags] of ref Tag;
fids = array[len fids] of list of ref Fid;
Auth =>
# afid from fid.t, qaid from auth
if(fid != nil){
fid.qid = pr.aqid;
fid.path = ref Path(nil, "#auth");
}
Attach =>
if(fid != nil){
fid.qid = pr.qid;
fid.path = ref Path(nil, "/");
}
Walk =>
pick m := tag.m {
Walk =>
if(len pr.qids != len m.names)
break; # walk failed, no change
if(fid == nil)
break;
if(m.newfid != m.fid){
nf := newfid(m.newfid);
nf.path = fid.path;
fid = nf; # walk new fid
}
for(i := 0; i < len m.names; i++){
fid.qid = pr.qids[i];
if(m.names[i] == ".."){
if(fid.path.parent != nil)
fid.path = fid.path.parent;
}else
fid.path = ref Path(fid.path, m.names[i]);
}
}
Open or
Create =>
if(fid != nil)
fid.qid = pr.qid;
Read =>
fid.nread++;
nr := big len pr.data;
fid.bread += nr;
stats.totread += nr;
Write =>
# count
fid.nwrite++;
fid.bwrite += big pr.count;
stats.totwrite += big pr.count;
Flush =>
pick m := tag.m {
Flush =>
findtag(m.oldtag, 1); # discard if there
}
Clunk or
Remove =>
if(fid != nil){
if(fid.nread != big 0 || fid.nwrite != big 0)
fidreport(fid);
freefid(fid);
}
}
}
kill(rpid, "kill");
kill(tpid, "kill");
done <-= 1;
}
results()
{
stderr := sys->fildes(2);
rpc := stats.rpc[tagof Tmsg.Read];
brpsec := real stats.totread / ((real rpc.time/1.0e9)+.000001);
rpc = stats.rpc[tagof Tmsg.Write];
bwpsec := real stats.totwrite / ((real rpc.time/1.0e9)+.000001);
ttime := big 0;
for(n := 0; n < len stats.rpc; n++){
rpc = stats.rpc[n];
if(rpc == nil || rpc.count == big 0)
continue;
ttime += rpc.time;
}
bppsec := real stats.nproto / ((real ttime/1.0e9)+.000001);
sys->fprint(stderr, "\nread %bud bytes, %g Kb/sec\n", stats.totread, brpsec/1024.0);
sys->fprint(stderr, "write %bud bytes, %g Kb/sec\n", stats.totwrite, bwpsec/1024.0);
sys->fprint(stderr, "protocol %ud bytes, %g Kb/sec\n", stats.nproto, bppsec/1024.0);
sys->fprint(stderr, "rpc %ud count\n\n", stats.nrpc);
sys->fprint(stderr, "%-10s %5s %5s %5s %5s %5s T R\n",
"Message", "Count", "Low", "High", "Time", " Avg");
for(n = 0; n < len stats.rpc; n++){
rpc = stats.rpc[n];
if(rpc == nil || rpc.count == big 0)
continue;
sys->fprint(stderr, "%-10s %5bud %5bud %5bud %5bud %5bud ms %8bud %8bud bytes\n",
rpc.name,
rpc.count,
rpc.lo/Ns2ms,
rpc.hi/Ns2ms,
rpc.time/Ns2ms,
rpc.time/Ns2ms/rpc.count,
rpc.bin,
rpc.bout);
}
# unclunked fids
for(n = 0; n < NFIDHASH; n++)
for(fl := fids[n]; fl != nil; fl = tl fl){
fid := hd fl;
if(fid.nread != big 0 || fid.nwrite != big 0)
fidreport(fid);
}
if(frecs == nil)
exit;
sys->fprint(stderr, "\nOpens Reads (bytes) Writes (bytes) File\n");
for(frl := frecs; frl != nil; frl = tl frl){
fr := hd frl;
case s := makepath(fr.op) {
"/fd/0" => s = "(stdin)";
"/fd/1" => s = "(stdout)";
"/fd/2" => s = "(stderr)";
"" => s = "/.";
}
sys->fprint(stderr, "%5ud %8bud %8bud %8bud %8bud %s\n", fr.opens, fr.nread, fr.bread,
fr.nwrite, fr.bwrite, s);
}
}
Treader(fd: ref Sys->FD, ofd: ref Sys->FD, out: chan of (int, ref Tmsg))
{
out <-= (sys->pctl(0, nil), nil);
fd = sys->fildes(fd.fd);
ofd = sys->fildes(ofd.fd);
for(;;){
(a, err) := styx->readmsg(fd, Maxmsg);
if(err != nil){
out <-= (0, ref Tmsg.Readerror(0, err));
break;
}
if(a == nil){
out <-= (0, nil);
break;
}
(nil, m) := Tmsg.unpack(a);
if(m == nil){
out <-= (0, ref Tmsg.Readerror(0, "bad Styx T-message format"));
break;
}
out <-= (len a, m);
sys->write(ofd, a, len a); # TO DO: errors
}
}
Rreader(fd: ref Sys->FD, ofd: ref Sys->FD, out: chan of (int, ref Rmsg))
{
out <-= (sys->pctl(0, nil), nil);
fd = sys->fildes(fd.fd);
ofd = sys->fildes(ofd.fd);
for(;;){
(a, err) := styx->readmsg(fd, Maxmsg);
if(err != nil){
out <-= (0, ref Rmsg.Readerror(0, err));
break;
}
if(a == nil){
out <-= (0, nil);
break;
}
(nil, m) := Rmsg.unpack(a);
if(m == nil){
out <-= (0, ref Rmsg.Readerror(0, "bad Styx R-message format"));
break;
}
out <-= (len a, m);
sys->write(ofd, a, len a); # TO DO: errors
}
}
reply(fd: ref Sys->FD, m: ref Rmsg)
{
d := m.pack();
sys->write(fd, d, len d);
}
mkrpc(s: string): ref Rpc
{
return ref Rpc(s, big 0, big 0, big 1 << 40, big 0, big 0, big 0);
}
newfid(nr: int): ref Fid
{
h := nr%NFIDHASH;
for(fl := fids[h]; fl != nil; fl = tl fl)
if((hd fl).nr == nr)
return hd fl; # shouldn't happen: faulty client
fid := ref Fid;
fid.nr = nr;
fid.nread = big 0;
fid.nwrite = big 0;
fid.bread = big 0;
fid.bwrite = big 0;
fid.qid = Qid(big 0, 0, -1);
fids[h] = fid :: fids[h];
return fid;
}
findfid(nr: int): ref Fid
{
for(fl := fids[nr%NFIDHASH]; fl != nil; fl = tl fl)
if((hd fl).nr == nr)
return hd fl;
return nil;
}
freefid(fid: ref Fid)
{
h := fid.nr%NFIDHASH;
nl: list of ref Fid;
for(fl := fids[h]; fl != nil; fl = tl fl)
if((hd fl).nr != fid.nr)
nl = hd fl :: nl;
fids[h] = nl;
}
makepath(p: ref Path): string
{
nl: list of string;
for(; p != nil; p = p.parent)
if(p.name != "/")
nl = p.name :: nl;
s := "";
for(; nl != nil; nl = tl nl)
if(s != nil)
s += "/" + hd nl;
else
s = hd nl;
return "/"+s;
}
fatal(s: string)
{
sys->fprint(sys->fildes(2), "iostats: %s: %r\n", s);
raise "fatal:error";
}
nsec(fd: ref Sys->FD): big
{
buf := array[100] of byte;
n := sys->pread(fd, buf, len buf, big 0);
if(n <= 0)
return big 0;
return big string buf[0:n];
}
fidreport(f: ref Fid)
{
for(fl := frecs; fl != nil; fl = tl fl){
fr := hd fl;
if(eqqid(f.qid, fr.qid)){
# could put f.path in list of paths if aliases were interesting
fr.nread += f.nread;
fr.nwrite += f.nwrite;
fr.bread += f.bread;
fr.bwrite += f.bwrite;
fr.opens++;
return;
}
}
fr := ref Frec;
fr.op = f.path;
fr.qid = f.qid;
fr.nread = f.nread;
fr.nwrite = f.nwrite;
fr.bread = f.bread;
fr.bwrite = f.bwrite;
fr.opens = 1;
frecs = fr :: frecs;
}
update(rpc: ref Rpc, t: big)
{
if(t < big 0)
t = big 0;
rpc.time += t;
if(t < rpc.lo)
rpc.lo = t;
if(t > rpc.hi)
rpc.hi = t;
}
newtag(m: ref Tmsg, t: big): ref Tag
{
slot := m.tag & (NTAGHASH - 1);
tag := ref Tag(m, nil, t, tags[slot]);
tags[slot] = tag;
return tag;
}
findtag(tag: int, destroy: int): ref Tag
{
slot := tag & (NTAGHASH - 1);
prev: ref Tag;
for(t := tags[slot]; t != nil; t = t.next){
if(t.m.tag == tag)
break;
prev = t;
}
if(t == nil || !destroy)
return t;
if(prev == nil)
tags[slot] = t.next;
else
prev.next = t.next;
return t;
}
eqqid(a, b: Qid): int
{
return a.path == b.path && a.qtype == b.qtype;
}
export(fd: ref Sys->FD, pid: chan of int)
{
pid <-= sys->pctl(Sys->NEWFD|Sys->FORKNS, fd.fd::0::1::2::nil);
sys->export(fd, "/", Sys->EXPWAIT);
}
kill(pid: int, what: string)
{
fd := sys->open("#p/"+string pid+"/ctl", Sys->OWRITE);
if(fd != nil)
sys->fprint(fd, "%s", what);
}