ref: af95fef24ea28b23f80390d630dd357680283ac0
parent: d0e61ce9e62bfa1c6b3dad44d3d860b301cb65a9
author: aiju <aiju@phicode.de>
date: Tue Aug 16 18:00:34 EDT 2011
lib9p: added toilet queues
--- a/sys/include/9p.h
+++ b/sys/include/9p.h
@@ -27,10 +27,28 @@
typedef struct Tree Tree;
typedef struct Readdir Readdir;
typedef struct Srv Srv;
+typedef struct Reqqueue Reqqueue;
+typedef struct Queueelem Queueelem;
#pragma incomplete Filelist
#pragma incomplete Readdir
+struct Queueelem
+{+ Queueelem *prev, *next;
+ void (*f)(Req *);
+};
+
+struct Reqqueue
+{+ QLock;
+ Rendez;
+ Queueelem;
+ int pid;
+ Req *cur;
+ jmp_buf flush;
+};
+
struct Fid
{ulong fid;
@@ -60,6 +78,8 @@
Fid* afid;
Fid* newfid;
Srv* srv;
+
+ Queueelem qu;
/* below is implementation-specific; don't use */
QLock lk;
@@ -255,3 +275,7 @@
extern void (*_forker)(void (*)(void*), void*, int);
+Reqqueue* reqqueuecreate(void);
+void reqqueuepush(Reqqueue*, Req*, void (*)(Req *));
+void reqqueueflush(Reqqueue*, Req*);
+int reqqueueflushed(void);
--- a/sys/src/lib9p/mkfile
+++ b/sys/src/lib9p/mkfile
@@ -12,6 +12,7 @@
req.$O\
parse.$O\
post.$O\
+ queue.$O\
rfork.$O\
srv.$O\
thread.$O\
--- /dev/null
+++ b/sys/src/lib9p/queue.c
@@ -1,0 +1,113 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <fcall.h>
+#include <9p.h>
+
+static int
+_reqqueuenote(void *uregs, char *note)
+{+ Reqqueue *q;
+
+ if(strcmp(note, "flush") != 0)
+ return 0;
+ q = *threaddata();
+ if(q != nil){+ q->cur = nil;
+ notejmp(uregs, q->flush, 1);
+ }
+ return 1;
+}
+
+static void
+_reqqueueproc(void *v)
+{+ Reqqueue *q;
+ Req *r;
+ void (*f)(Req *);
+
+ q = v;
+ *threaddata() = q;
+ rfork(RFNOTEG);
+ threadnotify(_reqqueuenote, 1);
+ for(;;){+ qlock(q);
+ q->cur = nil;
+ while(q->next == q)
+ rsleep(q);
+ r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ f = r->qu.f;
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ q->cur = r;
+ if(setjmp(q->flush)){+ respond(r, "interrupted");
+ continue;
+ }
+ qunlock(q);
+ f(r);
+ }
+}
+
+Reqqueue *
+reqqueuecreate(void)
+{+ Reqqueue *q;
+
+ q = emalloc9p(sizeof(*q));
+ memset(q, 0, sizeof(*q));
+ q->l = q;
+ q->next = q->prev = q;
+ q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
+ print("%d\n", q->pid);+ return q;
+}
+
+void
+reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
+{+ qlock(q);
+ r->qu.f = f;
+ r->qu.next = q;
+ r->qu.prev = q->prev;
+ q->prev->next = &r->qu;
+ q->prev = &r->qu;
+ rwakeupall(q);
+ qunlock(q);
+}
+
+void
+reqqueueflush(Reqqueue *q, Req *r)
+{+ qlock(q);
+ if(q->cur == r){+ postnote(PNPROC, q->pid, "flush");
+ qunlock(q);
+ }else{+ if(r->qu.next != nil){+ r->qu.next->prev = r->qu.prev;
+ r->qu.prev->next = r->qu.next;
+ }
+ qlock(&r->lk);
+ memset(&r->qu, 0, sizeof(r->qu));
+ qunlock(&r->lk);
+ qunlock(q);
+ respond(r, "interrupted");
+ }
+}
+
+int
+reqqueueflushed(void)
+{+ Reqqueue *q;
+
+ q = *threaddata();
+ qlock(q);
+ if(setjmp(q->flush))
+ return 1;
+ qunlock(q);
+ return 0;
+}
--
⑨