git: 9front

Download patch

ref: f916f6b77e7bc5716093850b66946ddbdc94c6a8
parent: 2dbddd386595eda64f97b1fae3747451c47dd4f2
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sun Oct 19 08:32:51 EDT 2025

tcp: splice local tcp conversations together, bypassing ip stack

When we are creating a connection to ourselves, we can skip
tcp and ip completely and just turn the two conversations
into a pair of pipes.

This makes loopback tcp connections as fast as pipes
and saves memory as no retransmission is needed.

For this, a new qsetbypass() function was added that turns
a (transmission) queue into a bypass at runtime.

It first drains whatever is in the queue and then sets the
bypass function pointer. A queue can be un-bypassed again
by passing nil as the bypass function.

--- a/sys/src/9/ip/tcp.c
+++ b/sys/src/9/ip/tcp.c
@@ -200,6 +200,7 @@
 typedef struct Tcpctl Tcpctl;
 struct Tcpctl
 {
+	Conv	*bypass;		/* The other when bypassing */
 	uchar	state;			/* Connection state */
 	uchar	type;			/* Listening or active connection */
 	uchar	code;			/* Icmp code */
@@ -251,6 +252,7 @@
 	ulong	rttseq;			/* Round trip sequence */
 	int	srtt;			/* Smoothed round trip */
 	int	mdev;			/* Mean deviation of round trip */
+
 	union {
 		Tcp4hdr	tcp4hdr;
 		Tcp6hdr	tcp6hdr;
@@ -412,6 +414,10 @@
 		qclose(s->rq);
 		qclose(s->wq);
 		qclose(s->eq);
+		if(tcb->bypass != nil){
+			tcb->bypass = nil;
+			qsetbypass(s->wq, nil);
+		}
 		break;
 
 	case Close_wait:		/* Remote closes */
@@ -523,8 +529,14 @@
 	case Syn_sent:
 		localclose(c, nil);
 		break;
-	case Syn_received:
 	case Established:
+		if(tcb->bypass != nil){
+			qhangup(tcb->bypass->rq, nil);
+			localclose(c, nil);
+			break;
+		}
+		/* wet floor */
+	case Syn_received:
 		tcb->flgcnt++;
 		tcb->snd.nxt++;
 		tcpsetstate(c, Finwait1);
@@ -547,9 +559,12 @@
 
 	qlock(s);
 	switch(tcb->state) {
+	case Established:
+		if(tcb->bypass != nil)
+			break;
+		/* wet floor */
 	case Syn_sent:
 	case Syn_received:
-	case Established:
 	case Close_wait:
 		/*
 		 * Push data
@@ -623,7 +638,7 @@
 	Tcpctl *tcb = (Tcpctl*)s->ptcl;
 
 	qlock(s);
-	if(tcb->state != Closed){
+	if(tcb->state != Closed && tcb->bypass == nil){
 		tcb->flags |= FORCE;
 		tcpoutput(s);
 	}
@@ -1325,7 +1340,7 @@
 {
 	Tcpctl *tcb = (Tcpctl*)s->ptcl;
 
-	if(ipcmp(s->raddr, IPnoaddr) != 0 && tcb->state != Closed) {
+	if(ipcmp(s->raddr, IPnoaddr) != 0 && tcb->state != Closed && tcb->bypass == nil) {
 		Block *bp;
 		Tcp seg;
 
@@ -1565,7 +1580,60 @@
 	tcb->mdev = rtt<<(LOGDGAIN-1);
 }
 
+static void
+tcpbypass(void *arg, Block *b)	/* wq bypass: hand blocks written on c to the spliced peer */
+{
+	Conv *c = (Conv*)arg;
+	Tcpctl *tcb = (Tcpctl*)c->ptcl;
+	Conv *o = tcb->bypass;		/* the other end; nil once c's splice is torn down */
+
+	if(o == nil || ((Tcpctl*)o->ptcl)->bypass != c){	/* not mutually spliced: drop. NOTE(review): writer gets no error */
+		freeblist(b);
+		return;
+	}
+	qbwrite(o->rq, b);		/* straight into the peer's receive queue, skipping tcp/ip */
+}
+
 /*
+ *  splice two local tcp conversations together,
+ *  having the wq of each bypass directly into
+ *  the other end's rq.
+ *
+ *  new, old and the proto are all locked by the caller.
+ */
+static void
+tcpsplice(Conv *new, Conv *old)
+{
+	Tcppriv *tpriv;
+	Tcpctl *ntcb, *otcb;
+
+	ntcb = (Tcpctl*)new->ptcl;
+	otcb = (Tcpctl*)old->ptcl;
+
+	if(otcb->state != Established)	/* other end not established: leave both as plain tcp */
+		return;
+
+	ntcb->bypass = old;	/* link both ends before installing the bypass; */
+	otcb->bypass = new;	/* tcpbypass() only forwards when each points at the other */
+
+	qsetlimit(new->rq, conf.pipeqsize);	/* receive queues sized like pipes */
+	qsetlimit(old->rq, conf.pipeqsize);
+	qsetbypass(new->wq, tcpbypass);	/* also drains data already queued in wq into the peer's rq */
+	qsetbypass(old->wq, tcpbypass);
+
+	/* stop timers and dump resequencing queue; unused once spliced */
+	tpriv = (Tcppriv*)new->p->priv;
+	tcphalt(tpriv, &ntcb->timer);
+	tcphalt(tpriv, &otcb->timer);
+	tcphalt(tpriv, &ntcb->acktimer);
+	tcphalt(tpriv, &otcb->acktimer);
+	tcphalt(tpriv, &ntcb->katimer);
+	tcphalt(tpriv, &otcb->katimer);
+	dumpreseq(ntcb);
+	dumpreseq(otcb);
+}
+
+/*
  *  come here when we finally get an ACK to our SYN-ACK.
  *  lookup call in limbo.  if found, create a new conversation
  *
@@ -1575,6 +1643,7 @@
 tcpincoming(Conv *s, Tcp *seg, uchar *src, uchar *dst, int version)
 {
 	Tcppriv *tpriv;
+	Route *rp;
 	Limbo *lp, **l;
 	Conv *new;
 	Tcpctl *tcb;
@@ -1630,7 +1699,8 @@
 	tcpsynackrtt(tcb);
 
 	/* the same as what we sent in SYN,ACK */
-	tcb->mss = tcpmtu(v6lookup(s->p->f, src, dst, new), &tcb->scale, version);
+	rp = v6lookup(s->p->f, src, dst, new);
+	tcb->mss = tcpmtu(rp, &tcb->scale, version);
 
 	tcpsetmss(new, lp->mss);
 	tcpsetscale(new, lp->ws);
@@ -1666,8 +1736,23 @@
 		panic("tcpincoming: version %d", new->ipversion);
 	}
 
+	qlock(new);
 	tcpsetstate(new, Established);
 	iphtadd(&tpriv->ht, new);
+	if(rp != nil && (rp->type & Runi) != 0){
+		/* see if we can find the local conversation and splice them together */
+		Iphash *iph = iphtlook(&tpriv->ht, new->laddr, new->lport, new->raddr, new->rport);
+		if(iph != nil && iph->trans == 0){
+			Conv *old = iphconv(iph);
+			if(old != new){
+				qlock(old);
+				tcpsplice(new, old);
+				qunlock(old);
+			}
+		}
+	}
+	qunlock(new);
+
 	return new;
 }
 
@@ -2144,6 +2229,9 @@
 			tcpsetstate(s, Established);
 			break;
 		case Established:
+			if(tcb->bypass != nil)
+				goto raise;
+			/* wet floor */
 		case Close_wait:
 		case Finwait2:
 			update(s, &seg);
@@ -2513,7 +2601,7 @@
 	Tcpctl *tcb = (Tcpctl*)s->ptcl;
 
 	qlock(s);
-	if(tcb->state != Closed){
+	if(tcb->state != Closed && tcb->bypass == nil){
 		if(++(tcb->kato) > MAX_KAT) {
 			localclose(s, Etimedout);
 		} else if(tcpsendka(s) < 0) {
@@ -2536,6 +2624,9 @@
 
 	if(tcb->state != Established)
 		return "connection must be in Establised state";
+
+	if(tcb->bypass != nil)
+		return nil;
 
 	x = DEF_KAT;
 	if(n > 1){
--- a/sys/src/9/port/portfns.h
+++ b/sys/src/9/port/portfns.h
@@ -288,6 +288,7 @@
 Block*		qbread(Queue*, int);
 long		qbwrite(Queue*, Block*);
 Queue*		qbypass(void (*)(void*, Block*), void*);
+void		qsetbypass(Queue*, void (*)(void*, Block*));
 int		qcanread(Queue*);
 void		qclose(Queue*);
 int		qconsume(Queue*, void*, int);
--- a/sys/src/9/port/qio.c
+++ b/sys/src/9/port/qio.c
@@ -759,6 +759,24 @@
 	return q;
 }
 
+void
+qsetbypass(Queue *q, void (*bypass)(void*, Block*))	/* switch q into (or, with nil, out of) bypass mode */
+{
+	Block *b;
+
+	ilock(q);
+	if(bypass != nil){
+		while((b = qremove(q)) != nil){	/* drain whatever is queued through bypass before switching */
+			iunlock(q);		/* don't hold the ilock across the callout */
+			(*bypass)(q->arg, b);
+			ilock(q);
+		}
+		q->state |= Qstarve;	/* NOTE(review): intent not evident here; confirm against Qstarve handling in qio.c */
+	}
+	q->bypass = bypass;	/* NOTE(review): set only once q is empty; if qbwrite tests q->bypass without the ilock, a racing writer can strand a block in q */
+	iunlock_consumer(q);	/* presumably wakes blocked readers/writers; confirm */
+}
+
 static int
 notempty(void *a)
 {
--