ref: 7152244a1be2ce903bf0680a1756ddd7c574610d
parent: 81469d4229199e64b3e3fd233d4856d1f5a6a7de
author: cinap_lenrek <cinap_lenrek@gmx.de>
date: Wed Aug 7 21:07:01 EDT 2013
cwfs: fix out of order replies. using a shared reply queue and a pool of worker procs can result in replies being sent out of order under some conditions. the symptoms are mnt errors when interrupting requests (Rflush arriving before the original request's response). this change gives each connection its own reply queue and its own srvo process. so now a connection consists of one reply queue, a srvi process reading the connection's file descriptor and a srvo process reading the reply queue and writing replies to the connection's file descriptor. the srvi processes live as long as the connection is established. the srvo processes live forever and are attached to the chan (which gets reused). to avoid excessive process creation, we limit the number of connections to 30. srvchan() returns nil when all 30 network channels are in use.
--- a/sys/src/cmd/cwfs/net.c
+++ b/sys/src/cmd/cwfs/net.c
@@ -56,17 +56,20 @@
net = v;
for(;;) { if((lisfd = listen(net->anndir, net->lisdir)) < 0){- fprint(2, "listen %s failed: %r\n", net->anndir);
+ fprint(2, "%s: listen %s failed: %r\n", argv0, net->anndir);
break;
}
/* got new call on lisfd */
if((accfd = accept(lisfd, net->lisdir)) < 0){- fprint(2, "accept %d (from %s) failed: %r\n", lisfd, net->lisdir);
+ fprint(2, "%s: accept %d (from %s) failed: %r\n", argv0, lisfd, net->lisdir);
close(lisfd);
continue;
}
nci = getnetconninfo(net->lisdir, accfd);
- srvchan(accfd, nci->raddr);
+ if(srvchan(accfd, nci->raddr) == nil){+ fprint(2, "%s: srvchan failed for: %s\n", argv0, nci->raddr);
+ close(accfd);
+ }
freenetconninfo(nci);
}
}
--- a/sys/src/cmd/cwfs/srv.c
+++ b/sys/src/cmd/cwfs/srv.c
@@ -4,9 +4,8 @@
#include <thread.h>
enum {- Maxfdata = 8192,
- Nqueue = 200, /* queue size (tunable) */
- Nsrvo = 8, /* number of write workers */
+ Nqueue = 100, /* reply queue size per connection (tunable) */
+ Nchans = 30, /* maximum number of connections */
};
typedef struct Srv Srv;
@@ -22,8 +21,6 @@
Chan *hd;
} freechans;
-static Queue *srvoq;
-
void
chanhangup(Chan *chan, char *msg)
{@@ -69,14 +66,16 @@
}
static void
-srvo(void *)
+srvo(void *aux)
{+ Chan *chan;
Srv *srv;
Msgbuf *mb;
char buf[ERRMAX];
+ chan = aux;
for(;;){- mb = fs_recv(srvoq, 0);
+ mb = fs_recv(chan->reply, 0);
if(mb == nil)
continue;
if(mb->data == nil){@@ -84,12 +83,12 @@
mbfree(mb);
continue;
}
- srv = (Srv*)mb->param;
+ srv = chan->pdata;
while(write(srv->fd, mb->data, mb->count) != mb->count){rerrstr(buf, sizeof(buf));
if(strstr(buf, "interrupt"))
continue;
- chanhangup(srv->chan, buf);
+ chanhangup(chan, buf);
break;
}
mbfree(mb);
@@ -100,13 +99,17 @@
static void
srvi(void *aux)
{- Srv *srv = aux;
+ Chan *chan;
+ Srv *srv;
Msgbuf *mb, *ms;
uchar *b, *p, *e;
int n, m;
char buf[ERRMAX];
- if((mb = mballoc(IOHDRSZ+Maxfdata, srv->chan, Mbeth1)) == nil)
+ chan = aux;
+ srv = chan->pdata;
+
+ if((mb = mballoc(IOHDRSZ+MAXDAT, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed");b = mb->data;
p = b;
@@ -126,12 +129,12 @@
goto Read;
}
if(m <= SMALLBUF){- if((ms = mballoc(m, srv->chan, Mbeth1)) == nil)
+ if((ms = mballoc(m, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed");memmove(ms->data, b, m);
} else {ms = mb;
- if((mb = mballoc(mb->count, srv->chan, Mbeth1)) == nil)
+ if((mb = mballoc(mb->count, chan, Mbeth1)) == nil)
panic("srvi: mballoc failed");ms->count = m;
}
@@ -141,8 +144,7 @@
p = b + n;
incref(srv);
- ms->param = (uint)srv;
- fs_send(serveq, ms);
+ fs_send(chan->send, ms);
}
e = b + mb->count;
}
@@ -152,7 +154,7 @@
if(strstr(buf, "interrupt"))
goto Read;
- chanhangup(srv->chan, buf);
+ chanhangup(chan, buf);
srvput(srv);
mbfree(mb);
@@ -166,18 +168,15 @@
Srv *srv;
lock(&freechans);
- if(chan = freechans.hd){- srv = chan->pdata;
- freechans.hd = srv->chan;
+ chan = freechans.hd;
+ if(chan == nil){unlock(&freechans);
- } else {- unlock(&freechans);
- chan = fs_chaninit(1, sizeof(*srv));
- srv = chan->pdata;
+ return nil;
}
- chan->reply = srvoq;
- if(chan->send == nil)
- chan->send = serveq;
+ srv = chan->pdata;
+ freechans.hd = srv->chan;
+ unlock(&freechans);
+
chan->protocol = nil;
chan->msize = 0;
chan->whotime = 0;
@@ -187,8 +186,16 @@
incref(srv);
srv->chan = chan;
srv->fd = fd;
+
+ if(chan->reply == nil){+ chan->reply = newqueue(Nqueue, "srvoq");
+ newproc(srvo, chan, "srvo");
+ }
+
+ if(chan->send == nil)
+ chan->send = serveq;
snprint(buf, sizeof(buf), "srvi %s", name);
- newproc(srvi, srv, buf);
+ newproc(srvi, chan, buf);
return chan;
}
@@ -196,12 +203,17 @@
void
srvinit(void)
{+ Chan *chan;
+ Srv *srv;
int i;
- if(srvoq != nil)
+ if(freechans.hd != nil)
return;
-
- srvoq = newqueue(Nqueue, "srvoq");
- for(i=0; i<Nsrvo; i++)
- newproc(srvo, nil, "srvo");
+ for(i=0; i<Nchans; i++){+ chan = fs_chaninit(1, sizeof(Srv));
+ srv = chan->pdata;
+ srv->fd = -1;
+ srv->chan = freechans.hd;
+ freechans.hd = chan;
+ }
}
--
⑨