ref: babf901b4a508c3ec5d1f89655f10377bbdf9637
dir: /appl/lib/iobuf.b/
implement IOBuf;
include "sys.m";
sys: Sys;
sprint: import sys;
include "iobuf.m";
LF: array of byte;
init()
{
sys = load Sys Sys->PATH;
LF = array[] of { byte '\n' };
}
ReadBuf.new(fd: ref Sys->FD, bufsize: int): ref ReadBuf
{
r := ref ReadBuf;
r.buf = array[bufsize] of byte;
r.s = 0;
r.e = 0;
r.setsep("\n", 1);
r.fd = fd;
r.reader = sysread;
r.is_eof = 0;
return r;
}
ReadBuf.newc(queuesize, bufsize: int): ref ReadBuf
{
r := ReadBuf.new(nil, bufsize);
r.queue = chan[queuesize] of array of byte;
r.pending = chan[1] of (array of byte, Sys->Rwrite);
r.is_pending = chan[1] of int;
r.reader = chanread;
return r;
}
ReadBuf.setsep(r: self ref ReadBuf, sep: string, strip: int)
{
if(sep == nil)
raise "iobuf:empty separator";
r.sep = array of byte sep;
r.strip = strip;
}
ReadBuf.reads(r: self ref ReadBuf): array of byte
{
if(len r.sep != 1)
raise "iobuf:multibyte separator not implemented yet";
c := r.sep[0];
for(;;){
if(r.is_eof)
if(r.s == r.e)
return nil;
else{
s := r.s;
r.s = r.e;
return r.buf[s:r.e];
}
for(i := r.s; i < r.e; i++)
if(r.buf[i] == c){
s := r.s;
r.s = i+1;
return r.buf[s:i + 1 * !r.strip];
}
if(r.s != 0){
r.buf[0:] = r.buf[r.s:r.e];
r.e -= r.s;
r.s = 0;
}
if(r.e == len r.buf)
raise "iobuf:no separator found in full buffer";
if(r.reader(r) == 0)
r.is_eof = 1;
}
}
sysread(r: ref ReadBuf): int
{
n := sys->read(r.fd, r.buf[r.e:], len r.buf - r.e);
if(n < 0)
raise sprint("iobuf:%r");
r.e += n;
return n;
}
bufread(r: ref ReadBuf, buf: array of byte): int
{
n := len buf;
if(len r.buf - r.e < n)
n = len r.buf - r.e;
r.buf[r.e:] = buf[0:n];
r.e += n;
if(len buf > n)
r.leftover = buf[n:];
else
r.leftover = nil;
return n;
}
chanread(r: ref ReadBuf): int
{
if(r.leftover != nil)
return bufread(r, r.leftover);
alt{
buf := <-r.queue =>
if(buf == nil)
return 0;
else
return bufread(r, buf);
(buf, wc) := <-r.pending =>
n := len buf;
alt{
buf2 := <-r.queue =>
r.queue <-= buf;
buf = buf2;
* =>
;
}
<-r.is_pending;
if(wc != nil)
wc <-= (n, nil);
if(buf == nil)
return 0;
else
return bufread(r, buf);
}
}
ReadBuf.readn(r: self ref ReadBuf, n: int): array of byte
{
if(r.is_eof)
return nil;
if(r.e - r.s >= n){
s := r.s;
r.s += n;
return r.buf[s:r.s];
}
oldbuf : array of byte;
if(len r.buf >= n){
if(len r.buf - r.s < n){
r.buf[0:] = r.buf[r.s:r.e];
r.e -= r.s;
r.s = 0;
}
}
else{
oldbuf = r.buf;
r.buf = array[n] of byte;
r.buf[0:] = oldbuf[r.s:r.e];
r.e -= r.s;
r.s = 0;
}
while(r.e - r.s < n)
if(r.reader(r) == 0){
r.is_eof = 1;
n = r.e - r.s;
}
if(oldbuf == nil){
s := r.s;
r.s += n;
return r.buf[s:r.s];
}
else{
tmp := r.buf;
r.buf = oldbuf;
r.s = r.e = 0;
return tmp[:n];
}
}
ReadBuf.fill(r: self ref ReadBuf, data: array of byte, wc: Sys->Rwrite)
{
alt{
r.is_pending <-= 1 =>
<-r.is_pending;
alt{
r.queue <-= data =>
if(wc != nil)
wc <-= (len data, nil);
* =>
r.is_pending <-= 1;
r.pending <-= (data, wc);
}
* =>
if(wc != nil)
wc <-= (0, "concurrent writes not supported");
}
}
#
WriteBuf.new(fd: ref Sys->FD, bufsize: int): ref WriteBuf
{
w := ref WriteBuf;
w.buf = array[bufsize] of byte;
w.s = 0;
w.e = 0;
w.fd = fd;
w.writer = syswrite;
return w;
}
WriteBuf.newc(bufsize: int): ref WriteBuf
{
w := WriteBuf.new(nil, bufsize);
w.pending = chan[1] of (int, Sys->Rread);
w.writer = chanwrite;
return w;
}
WriteBuf.write(w: self ref WriteBuf, buf: array of byte)
{
n := 0;
if(w.e != 0){
n = len w.buf - w.e;
if(n > len buf)
n = len buf;
w.buf[w.e:] = buf[:n];
w.e += n;
if(len w.buf == w.e)
w.flush();
}
if(len buf > n){
n2 := int((len buf - n) / len w.buf) * len w.buf;
if(n2 > 0){
tmp := w.buf;
w.buf = buf[n:n + n2];
w.s = 0;
w.e = n2;
w.flush();
w.buf = tmp;
n += n2;
}
w.buf[0:] = buf[n:];
w.e = len buf - n;
}
if(w.fd == nil && w.s != w.e)
optchanwrite(w);
}
WriteBuf.writeln(w: self ref WriteBuf, buf: array of byte)
{
w.write(buf);
w.write(LF);
}
syswrite(w: ref WriteBuf)
{
n := sys->write(w.fd, w.buf[w.s:w.e], w.e - w.s);
if(n != w.e - w.s)
raise sprint("iobuf:%r");
w.s = 0;
w.e = 0;
}
chanwrite(w: ref WriteBuf)
{
(n, rc) := <-w.pending;
if(rc == nil)
raise "iobuf:broken pipe";
if(n > w.e - w.s)
n = w.e - w.s;
buf := array[n] of byte;
buf[0:] = w.buf[w.s:w.s + n];
rc <-= (buf, nil);
w.s += n;
}
optchanwrite(w: ref WriteBuf)
{
alt{
(n, rc) := <-w.pending =>
if(rc == nil)
raise "iobuf:broken pipe";
if(n > w.e - w.s)
n = w.e - w.s;
buf := array[n] of byte;
buf[0:] = w.buf[w.s:w.s + n];
rc <-= (buf, nil);
w.s += n;
* =>
;
}
}
WriteBuf.flush(w: self ref WriteBuf)
{
while(w.s != w.e)
w.writer(w);
w.s = w.e = 0;
}
WriteBuf.eof(w: self ref WriteBuf)
{
w.flush();
if(w.fd != nil)
return;
for(;;){
(nil, rc) := <-w.pending;
if(rc == nil)
break;
rc <-= (nil, nil);
}
}
WriteBuf.request(w: self ref WriteBuf, n: int, rc: Sys->Rread)
{
if(rc == nil)
alt{
<-w.pending => ;
* => ;
}
alt{
w.pending <-= (n, rc) =>;
* => rc <-= (nil, "concurrent reads not supported");
}
}