ref: babf901b4a508c3ec5d1f89655f10377bbdf9637
dir: /appl/collab/servers/mpx.b/
implement Service;
#
# 1 to many and many to 1 multiplexor
#
include "sys.m";
sys: Sys;
Qid: import Sys;
include "styx.m";
styx: Styx;
Tmsg, Rmsg: import styx;
include "styxservers.m";
styxservers: Styxservers;
Styxserver, Navigator: import styxservers;
nametree: Nametree;
Tree: import nametree;
include "service.m";
include "messages.m";
messages: Messages;
Msg, Msglist, Readreq, User: import messages;
Qdir, Qroot, Qusers, Qleaf: con iota;
srv: ref Styxserver;
clientidgen := 0;
Einactive: con "not currently active";
toleaf: ref Msglist;
toroot: ref Msglist;
userlist: list of ref User;
user := "inferno";
dir(name: string, perm: int, path: int): Sys->Dir
{
d := sys->zerodir;
d.name = name;
d.uid = user;
d.gid = user;
d.qid.path = big path;
if(perm & Sys->DMDIR)
d.qid.qtype = Sys->QTDIR;
else
d.qid.qtype = Sys->QTFILE;
d.mode = perm;
return d;
}
init(nil: list of string): (string, string, ref Sys->FD)
{
sys = load Sys Sys->PATH;
styx = load Styx Styx->PATH;
if(styx == nil)
return (sys->sprint("can't load %s: %r", Styx->PATH), nil, nil);
styxservers = load Styxservers Styxservers->PATH;
if(styxservers == nil)
return (sys->sprint("can't load %s: %r", Styxservers->PATH), nil, nil);
nametree = load Nametree Nametree->PATH;
if(nametree == nil)
return (sys->sprint("can't load %s: %r", Nametree->PATH), nil, nil);
styx->init();
styxservers->init(styx);
styxservers->traceset(1);
nametree->init();
messages = load Messages Messages->PATH;
if(messages == nil)
return (sys->sprint("can't load %s: %r", Messages->PATH), nil, nil);
(tree, treeop) := nametree->start();
tree.create(big Qdir, dir(".", Sys->DMDIR|8r555, Qdir));
tree.create(big Qdir, dir("leaf", 8r666, Qleaf));
tree.create(big Qdir, dir("root", 8r666, Qroot));
tree.create(big Qdir, dir("users", 8r444, Qusers));
p := array [2] of ref Sys->FD;
if (sys->pipe(p) < 0){
tree.quit();
return (sys->sprint("can't create pipe: %r"), nil, nil);
}
toleaf = Msglist.new();
toroot = Msglist.new();
tc: chan of ref Tmsg;
(tc, srv) = Styxserver.new(p[1], Navigator.new(treeop), big Qdir);
spawn mpx(tc, tree);
return (nil, "/", p[0]);
}
mpx(tc: chan of ref Tmsg, tree: ref Tree)
{
root: ref User;
while((tmsg := <-tc) != nil){
pick tm := tmsg {
Readerror =>
break;
Open =>
c := srv.getfid(tm.fid);
if(c == nil || c.isopen){
srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
continue;
}
case int c.path {
Qroot =>
if(root != nil){
srv.reply(ref Rmsg.Error(tm.tag, sys->sprint("interaction already directed by %s", root.name)));
continue;
}
c = srv.open(tm);
if (c == nil)
continue;
root = ref User(0, tm.fid, c.uname, nil);
root.initqueue(toroot);
Qleaf =>
if(root == nil){
srv.reply(ref Rmsg.Error(tm.tag, Einactive));
continue;
}
c = srv.open(tm);
if (c == nil)
continue;
userarrives(tm.fid, c.uname);
# mpxdir[1].qid.vers++; # TO DO
* =>
srv.open(tm);
}
Read =>
c := srv.getfid(tm.fid);
if (c == nil || !c.isopen) {
srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
continue;
}
case int c.path {
Qdir =>
srv.read(tm);
Qroot =>
tm.offset = big 0;
m := qread(toroot, root, tm, 1);
if(m != nil)
srv.reply(ref Rmsg.Read(tm.tag, m.data));
Qleaf =>
u := fid2user(tm.fid);
if (u == nil) {
srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
continue;
}
tm.offset = big 0;
m := qread(toleaf, u, tm, 0);
if(m == nil){
if(root == nil)
srv.reply(ref Rmsg.Read(tm.tag, nil));
else
qread(toleaf, u, tm, 1); # put us on the wait queue
}else
srv.reply(ref Rmsg.Read(tm.tag, m.data));
Qusers =>
srv.reply(styxservers->readstr(tm, usernames()));
* =>
srv.reply(ref Rmsg.Error(tm.tag, "phase error -- bad path"));
}
Write =>
c := srv.getfid(tm.fid);
if (c == nil || !c.isopen) {
srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Ebadfid));
continue;
}
case int c.path {
Qroot =>
qwrite(toleaf, msg(root, 'M', tm.data));
srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
Qleaf =>
u := fid2user(tm.fid);
if(u == nil) {
srv.reply(ref Rmsg.Error(tm.tag, "internal error -- lost user"));
continue;
}
if(root == nil){
srv.reply(ref Rmsg.Error(tm.tag, Einactive));
continue;
}
qwrite(toroot, msg(u, 'm', tm.data));
srv.reply(ref Rmsg.Write(tm.tag, len tm.data));
* =>
srv.reply(ref Rmsg.Error(tm.tag, Styxservers->Eperm));
}
Flush =>
cancelpending(tm.tag);
srv.reply(ref Rmsg.Flush(tm.tag));
Clunk =>
c := srv.getfid(tm.fid);
if(c.isopen){
case int c.path {
Qroot =>
# shut down?
qwrite(toleaf, msg(root, 'L', nil));
root = nil;
Qleaf =>
userleaves(tm.fid);
# mpxdir[1].qid.vers++; # TO DO
}
}
* =>
srv.default(tmsg);
}
}
tree.quit();
sys->print("mpx exit\n");
}
mpxseqgen := 0;
time(): int
{
return ++mpxseqgen; # server time; assumes 2^31-1 is large enough
}
userarrives(fid: int, name: string)
{
u := User.new(fid, name);
qwrite(toroot, msg(u, 'a', nil));
u.initqueue(toleaf); # sees leaf messages from now on
userlist = u :: userlist;
}
fid2user(fid: int): ref User
{
for(ul := userlist; ul != nil; ul = tl ul)
if((u := hd ul).fid == fid)
return u;
return nil;
}
userleaves(fid: int)
{
ul := userlist;
userlist = nil;
u: ref User;
for(; ul != nil; ul = tl ul)
if((hd ul).fid != fid)
userlist = hd ul :: userlist;
else
u = hd ul;
if(u != nil)
qwrite(toroot, msg(u, 'l', nil));
}
usernames(): string
{
s := "";
for(ul := userlist; ul != nil; ul = tl ul){
u := hd ul;
s += string u.id+" "+u.name+"\n";
}
return s;
}
qwrite(msgs: ref Msglist, m: ref Msg)
{
pending := msgs.write(m);
for(; pending != nil; pending = tl pending){
(u, req) := hd pending;
m = u.read(); # must succeed, or the code is wrong
data := m.data;
if(req.count < len data)
data = data[0:req.count];
srv.reply(ref Rmsg.Read(req.tag, data));
}
}
qread(msgs: ref Msglist, u: ref User, tm: ref Tmsg.Read, wait: int): ref Msg
{
m := u.read();
if(m != nil){
if(tm.count < len m.data)
m.data = m.data[0:tm.count];
}else if(wait)
msgs.wait(u, ref Readreq(tm.tag, tm.fid, tm.count, tm.offset));
return m;
}
cancelpending(tag: int)
{
toroot.flushtag(tag);
toleaf.flushtag(tag);
}
msg(u: ref User, op: int, data: array of byte): ref Msg
{
a := sys->aprint("%ud %d %c %s ", time(), u.id, op, u.name);
m := ref Msg(u, array[len a + len data] of byte, nil);
m.data[0:] = a;
m.data[len a:] = data;
return m;
}