git: 9front

Download patch

ref: af95fef24ea28b23f80390d630dd357680283ac0
parent: d0e61ce9e62bfa1c6b3dad44d3d860b301cb65a9
author: aiju <aiju@phicode.de>
date: Tue Aug 16 18:00:34 EDT 2011

lib9p: added toilet queues

--- a/sys/include/9p.h
+++ b/sys/include/9p.h
@@ -27,10 +27,28 @@
 typedef struct Tree		Tree;
 typedef struct Readdir	Readdir;
 typedef struct Srv Srv;
+typedef struct Reqqueue Reqqueue;
+typedef struct Queueelem Queueelem;
 
 #pragma incomplete Filelist
 #pragma incomplete Readdir
 
+struct Queueelem
+{
+	Queueelem *prev, *next;
+	void (*f)(Req *);
+};
+
+struct Reqqueue
+{
+	QLock;
+	Rendez;
+	Queueelem;
+	int pid;
+	Req *cur;
+	jmp_buf flush;
+};
+
 struct Fid
 {
 	ulong	fid;
@@ -60,6 +78,8 @@
 	Fid*		afid;
 	Fid*		newfid;
 	Srv*		srv;
+	
+	Queueelem qu;
 
 /* below is implementation-specific; don't use */
 	QLock	lk;
@@ -255,3 +275,7 @@
 
 extern void (*_forker)(void (*)(void*), void*, int);
 
+Reqqueue*	reqqueuecreate(void);
+void		reqqueuepush(Reqqueue*, Req*, void (*)(Req *));
+void		reqqueueflush(Reqqueue*, Req*);
+int		reqqueueflushed(void);
--- a/sys/src/lib9p/mkfile
+++ b/sys/src/lib9p/mkfile
@@ -12,6 +12,7 @@
 	req.$O\
 	parse.$O\
 	post.$O\
+	queue.$O\
 	rfork.$O\
 	srv.$O\
 	thread.$O\
--- /dev/null
+++ b/sys/src/lib9p/queue.c
@@ -1,0 +1,113 @@
+#include <u.h>
+#include <libc.h>
+#include <thread.h>
+#include <fcall.h>
+#include <9p.h>
+
+static int
+_reqqueuenote(void *uregs, char *note)
+{
+	Reqqueue *q;
+
+	if(strcmp(note, "flush") != 0)
+		return 0;
+	q = *threaddata();
+	if(q != nil){
+		q->cur = nil;
+		notejmp(uregs, q->flush, 1);
+	}
+	return 1;
+}
+
+static void
+_reqqueueproc(void *v)
+{
+	Reqqueue *q;
+	Req *r;
+	void (*f)(Req *);
+	
+	q = v;
+	*threaddata() = q;
+	rfork(RFNOTEG);
+	threadnotify(_reqqueuenote, 1);
+	for(;;){
+		qlock(q);
+		q->cur = nil;
+		while(q->next == q)
+			rsleep(q);
+		r = (Req*)(((char*)q->next) - ((char*)&((Req*)0)->qu));
+		r->qu.next->prev = r->qu.prev;
+		r->qu.prev->next = r->qu.next;
+		f = r->qu.f;
+		qlock(&r->lk);
+		memset(&r->qu, 0, sizeof(r->qu));
+		qunlock(&r->lk);
+		q->cur = r;
+		if(setjmp(q->flush)){
+			respond(r, "interrupted");
+			continue;
+		}
+		qunlock(q);
+		f(r);
+	}
+}
+
+Reqqueue *
+reqqueuecreate(void)
+{
+	Reqqueue *q;
+
+	q = emalloc9p(sizeof(*q));
+	memset(q, 0, sizeof(*q));
+	q->l = q;
+	q->next = q->prev = q;
+	q->pid = threadpid(proccreate(_reqqueueproc, q, mainstacksize));
+	print("%d\n", q->pid);
+	return q;
+}
+
+void
+reqqueuepush(Reqqueue *q, Req *r, void (*f)(Req *))
+{
+	qlock(q);
+	r->qu.f = f;
+	r->qu.next = q;
+	r->qu.prev = q->prev;
+	q->prev->next = &r->qu;
+	q->prev = &r->qu;
+	rwakeupall(q);
+	qunlock(q);
+}
+
+void
+reqqueueflush(Reqqueue *q, Req *r)
+{
+	qlock(q);
+	if(q->cur == r){
+		postnote(PNPROC, q->pid, "flush");
+		qunlock(q);
+	}else{
+		if(r->qu.next != nil){
+			r->qu.next->prev = r->qu.prev;
+			r->qu.prev->next = r->qu.next;
+		}
+		qlock(&r->lk);
+		memset(&r->qu, 0, sizeof(r->qu));
+		qunlock(&r->lk);
+		qunlock(q);
+		respond(r, "interrupted");
+	}
+}
+
+int
+reqqueueflushed(void)
+{
+	Reqqueue *q;
+	
+	q = *threaddata();
+	qlock(q);
+	if(setjmp(q->flush))
+		return 1;
+	qunlock(q);
+	return 0;
+}
--