git: 9front

ref: 5afb6d1c1439fa549ba8bb3586c51178faef8022
dir: /sys/src/libthread/channel.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include "threadimpl.h"

/* Value to indicate the channel is closed */
enum {
	CHANCLOSD = 0xc105ed,
};

static char errcl[] = "channel was closed";
static Lock chanlock;		/* central channel access lock */

static void enqueue(Alt*, Channel**);
static void dequeue(Alt*);
static int canexec(Alt*);
static int altexec(Alt*, int);

#define Closed	((void*)CHANCLOSD)
#define Intred	((void*)~0)		/* interrupted */

static void
_chanfree(Channel *c)
{
	int i, inuse;

	if(c->closed == 1)			/* chanclose is ongoing */
		inuse = 1;
	else{
		inuse = 0;
		for(i = 0; i < c->nentry; i++)	/* alt ongoing */
			if(c->qentry[i])
				inuse = 1;
	}
	if(inuse)
		c->freed = 1;
	else{
		if(c->qentry)
			free(c->qentry);
		free(c);
	}
}

void
chanfree(Channel *c)
{
	lock(&chanlock);
	_chanfree(c);
	unlock(&chanlock);
}

Channel*
chancreate(int elemsize, int elemcnt)
{
	Channel *c;

	if(elemcnt < 0 || elemsize <= 0)
		return nil;
	c = _threadmalloc(sizeof(Channel)+elemsize*elemcnt, 1);
	c->e = elemsize;
	c->s = elemcnt;
	_threaddebug(DBGCHAN, "chancreate %p", c);
	return c;
}

static int
isopenfor(Channel *c, int op)
{
	return c->closed == 0 || (op == CHANRCV && c->n > 0);
}

int
alt(Alt *alts)
{
	Alt *a, *xa, *ca;
	Channel volatile *c;
	int n, s, waiting, allreadycl;
	void* r;
	Thread *t;

	/*
	 * The point of going splhi here is that note handlers
	 * might reasonably want to use channel operations,
	 * but that will hang if the note comes while we hold the
	 * chanlock.  Instead, we delay the note until we've dropped
	 * the lock.
	 */
	t = _threadgetproc()->thread;
	if(t->moribund || _threadexitsallstatus)
		yield();	/* won't return */
	s = _procsplhi();
	lock(&chanlock);
	t->alt = alts;
	t->chan = Chanalt;

	/* test whether any channels can proceed */
	n = 0;
	a = nil;

	for(xa=alts; xa->op!=CHANEND && xa->op!=CHANNOBLK; xa++){
		xa->entryno = -1;
		if(xa->op == CHANNOP)
			continue;

		c = xa->c;
		if(c==nil){
			unlock(&chanlock);
			_procsplx(s);
			t->chan = Channone;
			return -1;
		}

		if(isopenfor(c, xa->op) && canexec(xa))
			if(nrand(++n) == 0)
				a = xa;
	}


	if(a==nil){
		/* nothing can proceed */
		if(xa->op == CHANNOBLK){
			unlock(&chanlock);
			_procsplx(s);
			t->chan = Channone;
			if(xa->op == CHANNOBLK)
				return xa - alts;
		}

		/* enqueue on all channels open for us. */
		c = nil;
		ca = nil;
		waiting = 0;
		allreadycl = 0;
		for(xa=alts; xa->op!=CHANEND; xa++)
			if(xa->op==CHANNOP)
				continue;
			else if(isopenfor(xa->c, xa->op)){
				waiting = 1;
				enqueue(xa, &c);
			} else if(xa->err != errcl)
				ca = xa;
			else
				allreadycl = 1;

		if(waiting == 0)
			if(ca != nil){
				/* everything was closed, select last channel */
				ca->err = errcl;
				unlock(&chanlock);
				_procsplx(s);
				t->chan = Channone;
				return ca - alts;
			} else if(allreadycl){
				/* everything was already closed */
				unlock(&chanlock);
				_procsplx(s);
				t->chan = Channone;
				return -1;
			}
		/*
		 * wait for successful rendezvous.
		 * we can't just give up if the rendezvous
		 * is interrupted -- someone else might come
		 * along and try to rendezvous with us, so
		 * we need to be here.
		 * if the channel was closed, the op is done
		 * and we flag an error for the entry.
		 */
	    Again:
		unlock(&chanlock);
		_procsplx(s);
		r = _threadrendezvous(&c, 0);
		s = _procsplhi();
		lock(&chanlock);

		if(r==Intred){		/* interrupted */
			if(c!=nil)	/* someone will meet us; go back */
				goto Again;
			c = (Channel*)~0;	/* so no one tries to meet us */
		}

		/* dequeue from channels, find selected one */
		a = nil;
		for(xa=alts; xa->op!=CHANEND; xa++){
			if(xa->op==CHANNOP)
				continue;
			if(xa->c == c){
				a = xa;
				a->err = nil;
				if(r == Closed)
					a->err = errcl;
			}
			dequeue(xa);
		}
		unlock(&chanlock);
		_procsplx(s);
		if(a == nil){	/* we were interrupted */
			assert(c==(Channel*)~0);
			return -1;
		}
	}else
		altexec(a, s);	/* unlocks chanlock, does splx */
	_sched();
	t->chan = Channone;
	return a - alts;
}

int
chanclose(Channel *c)
{
	Alt *a;
	int i, s;

	s = _procsplhi();	/* note handlers; see :/^alt */
	lock(&chanlock);
	if(c->closed){
		/* Already close; we fail but it's ok. don't print */
		unlock(&chanlock);
		_procsplx(s);
		return -1;
	}
	c->closed = 1;		/* Being closed */
	/*
	 * Locate entries that will fail due to close
	 * (send, and receive if nothing buffered) and wake them up.
	 * the situation cannot change because all queries
	 * should be committed by now and new ones will find the channel
	 * closed.  We still need to take the lock during the iteration
	 * because we can wake threads on qentrys we have not seen yet
	 * as in alt and there would be a race in the access to *a.
	 */
	for(i = 0; i < c->nentry; i++){
		if((a = c->qentry[i]) == nil || *a->tag != nil)
			continue;

		if(a->op != CHANSND && (a->op != CHANRCV || c->n != 0))
			continue;
		*a->tag = c;
		unlock(&chanlock);
		_procsplx(s);
		while(_threadrendezvous(a->tag, Closed) == Intred)
			;
		s = _procsplhi();
		lock(&chanlock);
	}

	c->closed = 2;		/* Fully closed */
	if(c->freed)
		_chanfree(c);
	unlock(&chanlock);
	_procsplx(s);
	return 0;
}

int
chanclosing(Channel *c)
{
	int n, s;

	s = _procsplhi();	/* note handlers; see :/^alt */
	lock(&chanlock);
	if(c->closed == 0)
		n = -1;
	else
		n = c->n;
	unlock(&chanlock);
	_procsplx(s);
	return n;
}

/*
 * superseded by chanclosing
int
chanisclosed(Channel *c)
{
	return chanisclosing(c) >= 0;
}
 */

static int
runop(int op, Channel *c, void *v, int nb)
{
	int r;
	Alt a[2];

	/*
	 * we could do this without calling alt,
	 * but the only reason would be performance,
	 * and i'm not convinced it matters.
	 */
	a[0].op = op;
	a[0].c = c;
	a[0].v = v;
	a[0].err = nil;
	a[1].op = CHANEND;
	if(nb)
		a[1].op = CHANNOBLK;
	switch(r=alt(a)){
	case -1:	/* interrupted */
		return -1;
	case 1:	/* nonblocking, didn't accomplish anything */
		assert(nb);
		return 0;
	case 0:
		/*
		 * Okay, but return -1 if the op is done because of a close.
		 */
		if(a[0].err != nil)
			return -1;
		return 1;
	default:
		fprint(2, "ERROR: channel alt returned %d\n", r);
		abort();
	}
}

int
recv(Channel *c, void *v)
{
	return runop(CHANRCV, c, v, 0);
}

int
nbrecv(Channel *c, void *v)
{
	return runop(CHANRCV, c, v, 1);
}

int
send(Channel *c, void *v)
{
	return runop(CHANSND, c, v, 0);
}

int
nbsend(Channel *c, void *v)
{
	return runop(CHANSND, c, v, 1);
}

#define channelsize(c, sz)	assert(c->e == sz)

int
sendul(Channel *c, ulong v)
{
	channelsize(c, sizeof(ulong));
	return send(c, &v);
}

ulong
recvul(Channel *c)
{
	ulong v;

	channelsize(c, sizeof(ulong));
	if(recv(c, &v) < 0)
		return ~0;
	return v;
}

int
sendp(Channel *c, void *v)
{
	channelsize(c, sizeof(void*));
	return send(c, &v);
}

void*
recvp(Channel *c)
{
	void *v;

	channelsize(c, sizeof(void*));
	if(recv(c, &v) < 0)
		return nil;
	return v;
}

int
nbsendul(Channel *c, ulong v)
{
	channelsize(c, sizeof(ulong));
	return nbsend(c, &v);
}

ulong
nbrecvul(Channel *c)
{
	ulong v;

	channelsize(c, sizeof(ulong));
	if(nbrecv(c, &v) == 0)
		return 0;
	return v;
}

int
nbsendp(Channel *c, void *v)
{
	channelsize(c, sizeof(void*));
	return nbsend(c, &v);
}

void*
nbrecvp(Channel *c)
{
	void *v;

	channelsize(c, sizeof(void*));
	if(nbrecv(c, &v) == 0)
		return nil;
	return v;
}

static int
emptyentry(Channel *c)
{
	int i, extra;

	assert((c->nentry==0 && c->qentry==nil) || (c->nentry && c->qentry));

	for(i=0; i<c->nentry; i++)
		if(c->qentry[i]==nil)
			return i;

	extra = 16;
	c->nentry += extra;
	c->qentry = realloc((void*)c->qentry, c->nentry*sizeof(c->qentry[0]));
	if(c->qentry == nil)
		sysfatal("realloc channel entries: %r");
	memset(&c->qentry[i], 0, extra*sizeof(c->qentry[0]));
	return i;
}

static void
enqueue(Alt *a, Channel **c)
{
	int i;

	_threaddebug(DBGCHAN, "Queuing alt %p on channel %p", a, a->c);
	a->tag = c;
	i = emptyentry(a->c);
	a->c->qentry[i] = a;
}

static void
dequeue(Alt *a)
{
	int i;
	Channel *c;

	c = a->c;
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i]==a){
			_threaddebug(DBGCHAN, "Dequeuing alt %p from channel %p", a, a->c);
			c->qentry[i] = nil;
			/* release if freed and not closing */
			if(c->freed && c->closed != 1)
				_chanfree(c);
			return;
		}
}

static int
canexec(Alt *a)
{
	int i, otherop;
	Channel *c;

	c = a->c;
	/* are there senders or receivers blocked? */
	otherop = (CHANSND+CHANRCV) - a->op;
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil){
			_threaddebug(DBGCHAN, "can rendez alt %p chan %p", a, c);
			return 1;
		}

	/* is there room in the channel? */
	if((a->op==CHANSND && c->n < c->s)
	|| (a->op==CHANRCV && c->n > 0)){
		_threaddebug(DBGCHAN, "can buffer alt %p chan %p", a, c);
		return 1;
	}

	return 0;
}

static void*
altexecbuffered(Alt *a, int willreplace)
{
	uchar *v;
	Channel *c;

	c = a->c;
	/* use buffered channel queue */
	if(a->op==CHANRCV && c->n > 0){
		_threaddebug(DBGCHAN, "buffer recv alt %p chan %p", a, c);
		v = c->v + c->e*(c->f%c->s);
		if(!willreplace)
			c->n--;
		c->f++;
		return v;
	}
	if(a->op==CHANSND && c->n < c->s){
		_threaddebug(DBGCHAN, "buffer send alt %p chan %p", a, c);
		v = c->v + c->e*((c->f+c->n)%c->s);
		if(!willreplace)
			c->n++;
		return v;
	}
	abort();
}

static void
altcopy(void *dst, void *src, int sz)
{
	if(dst){
		if(src)
			memmove(dst, src, sz);
		else
			memset(dst, 0, sz);
	}
}

static int
altexec(Alt *a, int spl)
{
	volatile Alt *b;
	int i, n, otherop;
	Channel *c;
	void *me, *waiter, *buf;

	c = a->c;

	/* rendezvous with others */
	otherop = (CHANSND+CHANRCV) - a->op;
	n = 0;
	b = nil;
	me = a->v;
	for(i=0; i<c->nentry; i++)
		if(c->qentry[i] && c->qentry[i]->op==otherop && *c->qentry[i]->tag==nil)
			if(nrand(++n) == 0)
				b = c->qentry[i];
	if(b != nil){
		_threaddebug(DBGCHAN, "rendez %s alt %p chan %p alt %p", a->op==CHANRCV?"recv":"send", a, c, b);
		waiter = b->v;
		if(c->s && c->n){
			/*
			 * if buffer is full and there are waiters
			 * and we're meeting a waiter,
			 * we must be receiving.
			 *
			 * we use the value in the channel buffer,
			 * copy the waiter's value into the channel buffer
			 * on behalf of the waiter, and then wake the waiter.
			 */
			if(a->op!=CHANRCV)
				abort();
			buf = altexecbuffered(a, 1);
			altcopy(me, buf, c->e);
			altcopy(buf, waiter, c->e);
		}else{
			if(a->op==CHANRCV)
				altcopy(me, waiter, c->e);
			else
				altcopy(waiter, me, c->e);
		}
		*b->tag = c;	/* commits us to rendezvous */
		_threaddebug(DBGCHAN, "unlocking the chanlock");
		unlock(&chanlock);
		_procsplx(spl);
		_threaddebug(DBGCHAN, "chanlock is %lud", *(ulong*)&chanlock);
		while(_threadrendezvous(b->tag, 0) == Intred)
			;
		return 1;
	}

	buf = altexecbuffered(a, 0);
	if(a->op==CHANRCV)
		altcopy(me, buf, c->e);
	else
		altcopy(buf, me, c->e);

	unlock(&chanlock);
	_procsplx(spl);
	return 1;
}