ref: f916f6b77e7bc5716093850b66946ddbdc94c6a8
parent: 2dbddd386595eda64f97b1fae3747451c47dd4f2
author: cinap_lenrek <cinap_lenrek@felloff.net>
date: Sun Oct 19 08:32:51 EDT 2025
tcp: splice local tcp conversations together, bypassing the ip stack. When we are creating a connection to ourselves, we can skip tcp and ip completely and just make the conversations into a pair of pipes. This makes loopback tcp connections as fast as pipes and saves memory, as no retransmission is needed. For this, a new qsetbypass() function was added that changes a (transmission) queue into a bypass at runtime. It drains whatever is in the queue first and then sets the bypass function pointer. A queue can be un-bypassed again by passing nil as the bypass function.
--- a/sys/src/9/ip/tcp.c
+++ b/sys/src/9/ip/tcp.c
@@ -200,6 +200,7 @@
typedef struct Tcpctl Tcpctl;
struct Tcpctl
{+ Conv *bypass; /* The other when bypassing */
uchar state; /* Connection state */
uchar type; /* Listening or active connection */
uchar code; /* Icmp code */
@@ -251,6 +252,7 @@
ulong rttseq; /* Round trip sequence */
int srtt; /* Smoothed round trip */
int mdev; /* Mean deviation of round trip */
+
union {Tcp4hdr tcp4hdr;
Tcp6hdr tcp6hdr;
@@ -412,6 +414,10 @@
qclose(s->rq);
qclose(s->wq);
qclose(s->eq);
+ if(tcb->bypass != nil){+ tcb->bypass = nil;
+ qsetbypass(s->wq, nil);
+ }
break;
case Close_wait: /* Remote closes */
@@ -523,8 +529,14 @@
case Syn_sent:
localclose(c, nil);
break;
- case Syn_received:
case Established:
+ if(tcb->bypass != nil){+ qhangup(tcb->bypass->rq, nil);
+ localclose(c, nil);
+ break;
+ }
+ /* wet floor */
+ case Syn_received:
tcb->flgcnt++;
tcb->snd.nxt++;
tcpsetstate(c, Finwait1);
@@ -547,9 +559,12 @@
qlock(s);
switch(tcb->state) {+ case Established:
+ if(tcb->bypass != nil)
+ break;
+ /* wet floor */
case Syn_sent:
case Syn_received:
- case Established:
case Close_wait:
/*
* Push data
@@ -623,7 +638,7 @@
Tcpctl *tcb = (Tcpctl*)s->ptcl;
qlock(s);
- if(tcb->state != Closed){+ if(tcb->state != Closed && tcb->bypass == nil){tcb->flags |= FORCE;
tcpoutput(s);
}
@@ -1325,7 +1340,7 @@
{Tcpctl *tcb = (Tcpctl*)s->ptcl;
- if(ipcmp(s->raddr, IPnoaddr) != 0 && tcb->state != Closed) {+ if(ipcmp(s->raddr, IPnoaddr) != 0 && tcb->state != Closed && tcb->bypass == nil) {Block *bp;
Tcp seg;
@@ -1565,7 +1580,60 @@
tcb->mdev = rtt<<(LOGDGAIN-1);
}
+static void
+tcpbypass(void *arg, Block *b)
{+ Conv *c = (Conv*)arg;
+ Tcpctl *tcb = (Tcpctl*)c->ptcl;
+ Conv *o = tcb->bypass;
+ /* wq bypass: forward b from c's wq straight into the spliced peer o's rq */
+ if(o == nil || ((Tcpctl*)o->ptcl)->bypass != c){+ freeblist(b); /* not (or no longer) mutually spliced: discard the data */
+ return;
+ }
+ qbwrite(o->rq, b); /* no tcp/ip processing: the block goes directly to the peer */
+}
+
/*
+ * splice two local tcp conversations together,
+ * making each end's wq bypass directly into
+ * the other end's rq.
+ * does nothing unless old is Established.
+ * new, old and proto must all be locked.
+ */
+static void
+tcpsplice(Conv *new, Conv *old)
{+ Tcppriv *tpriv;
+ Tcpctl *ntcb, *otcb;
+
+ ntcb = (Tcpctl*)new->ptcl;
+ otcb = (Tcpctl*)old->ptcl;
+ /* only splice to a peer that has completed its handshake */
+ if(otcb->state != Established)
+ return;
+
+ ntcb->bypass = old;
+ otcb->bypass = new;
+ /* give the rqs pipe-sized limits, then route each wq into the peer's rq */
+ qsetlimit(new->rq, conf.pipeqsize);
+ qsetlimit(old->rq, conf.pipeqsize);
+ qsetbypass(new->wq, tcpbypass);
+ qsetbypass(old->wq, tcpbypass);
+
+ /* stop timers and dump resequencing queue */
+ tpriv = (Tcppriv*)new->p->priv;
+ tcphalt(tpriv, &ntcb->timer);
+ tcphalt(tpriv, &otcb->timer);
+ tcphalt(tpriv, &ntcb->acktimer);
+ tcphalt(tpriv, &otcb->acktimer);
+ tcphalt(tpriv, &ntcb->katimer);
+ tcphalt(tpriv, &otcb->katimer);
+ dumpreseq(ntcb);
+ dumpreseq(otcb);
+}
+
+/*
* come here when we finally get an ACK to our SYN-ACK.
* lookup call in limbo. if found, create a new conversation
*
@@ -1575,6 +1643,7 @@
tcpincoming(Conv *s, Tcp *seg, uchar *src, uchar *dst, int version)
{Tcppriv *tpriv;
+ Route *rp;
Limbo *lp, **l;
Conv *new;
Tcpctl *tcb;
@@ -1630,7 +1699,8 @@
tcpsynackrtt(tcb);
/* the same as what we sent in SYN,ACK */
- tcb->mss = tcpmtu(v6lookup(s->p->f, src, dst, new), &tcb->scale, version);
+ rp = v6lookup(s->p->f, src, dst, new);
+ tcb->mss = tcpmtu(rp, &tcb->scale, version);
tcpsetmss(new, lp->mss);
tcpsetscale(new, lp->ws);
@@ -1666,8 +1736,23 @@
panic("tcpincoming: version %d", new->ipversion);}
+ qlock(new);
tcpsetstate(new, Established);
iphtadd(&tpriv->ht, new);
+ if(rp != nil && (rp->type & Runi) != 0){+ /* see if we can find the local conversation and splice them together */
+ Iphash *iph = iphtlook(&tpriv->ht, new->laddr, new->lport, new->raddr, new->rport);
+ if(iph != nil && iph->trans == 0){+ Conv *old = iphconv(iph);
+ if(old != new){+ qlock(old);
+ tcpsplice(new, old);
+ qunlock(old);
+ }
+ }
+ }
+ qunlock(new);
+
return new;
}
@@ -2144,6 +2229,9 @@
tcpsetstate(s, Established);
break;
case Established:
+ if(tcb->bypass != nil)
+ goto raise;
+ /* wet floor */
case Close_wait:
case Finwait2:
update(s, &seg);
@@ -2513,7 +2601,7 @@
Tcpctl *tcb = (Tcpctl*)s->ptcl;
qlock(s);
- if(tcb->state != Closed){+ if(tcb->state != Closed && tcb->bypass == nil){ if(++(tcb->kato) > MAX_KAT) {localclose(s, Etimedout);
} else if(tcpsendka(s) < 0) {@@ -2536,6 +2624,9 @@
if(tcb->state != Established)
return "connection must be in Establised state";
+
+ if(tcb->bypass != nil)
+ return nil;
x = DEF_KAT;
if(n > 1){--- a/sys/src/9/port/portfns.h
+++ b/sys/src/9/port/portfns.h
@@ -288,6 +288,7 @@
Block* qbread(Queue*, int);
long qbwrite(Queue*, Block*);
Queue* qbypass(void (*)(void*, Block*), void*);
+void qsetbypass(Queue*, void (*)(void*, Block*));
int qcanread(Queue*);
void qclose(Queue*);
int qconsume(Queue*, void*, int);
--- a/sys/src/9/port/qio.c
+++ b/sys/src/9/port/qio.c
@@ -759,6 +759,24 @@
return q;
}
+void
+qsetbypass(Queue *q, void (*bypass)(void*, Block*))
{+ Block *b;
+ /* install bypass on q at runtime; passing nil removes the bypass again */
+ ilock(q);
+ if(bypass != nil){+ while((b = qremove(q)) != nil){+ iunlock(q);
+ (*bypass)(q->arg, b); /* drain queued blocks through bypass, called without the queue lock */
+ ilock(q);
+ }
+ q->state |= Qstarve; /* NOTE(review): presumably marks q as empty/starved after the drain — confirm */
+ }
+ q->bypass = bypass; /* set only once the queue is empty, with the lock held */
+ iunlock_consumer(q);
+}
+
static int
notempty(void *a)
{--
⑨