git: 9front

Download patch

ref: cc97df963884fe0e451db38fcf88f39dca68a032
parent: d0e7264790211a8372e30b9512a66f82ce7fae39
author: cinap_lenrek <cinap_lenrek@centraldogma>
date: Sun Dec 11 13:35:03 EST 2011

listen: add process limit option

--- a/sys/man/8/listen
+++ b/sys/man/8/listen
@@ -10,6 +10,8 @@
 .IR trustsrvdir ]
 .RB [ -n
 .IR namespace ]
+.RB [ -p
+.IR maxprocs ]
 .RI [ net ]
 .PP
 .B aux/listen1
@@ -56,6 +58,18 @@
 .B -n
 selects an alternate
 .IR namespace .
+The
+.B -p
+option limits the number of processes that
+.I listen
+spawns to service the connections. If the
+.I maxprocs
+limit is reached,
+.I listen
+will log the event and delay servicing until the number
+of connection processes drops below the limit again. A
+.I maxprocs
+less than or equal to zero means no limit (default).
 Option
 .B -q
 suppresses affirmative log information.
--- a/sys/src/cmd/aux/listen.c
+++ b/sys/src/cmd/aux/listen.c
@@ -22,7 +22,7 @@
 };
 
 int	readstr(char*, char*, char*, int);
-void	dolisten(char*, char*, int, char*, char*);
+void	dolisten(char*, char*, int, char*, char*, long*);
 void	newcall(int, char*, char*, Service*);
 int 	findserv(char*, char*, Service*, char*);
 int	getserv(char*, char*, Service*);
@@ -33,6 +33,8 @@
 
 char	listenlog[] = "listen";
 
+long	procs;
+long	maxprocs;
 int	quiet;
 int	immutable;
 char	*cpu;
@@ -45,7 +47,7 @@
 void
 usage(void)
 {
-	error("usage: aux/listen [-q] [-n namespace] [-d servdir] [-t trustdir]"
+	error("usage: aux/listen [-q] [-n namespace] [-d servdir] [-t trustdir] [-p maxprocs]"
 		" [proto]");
 }
 
@@ -88,6 +90,7 @@
 	quiet = 0;
 	immutable = 0;
 	argv0 = argv[0];
+	maxprocs = 0;
 	cpu = getenv("cputype");
 	if(cpu == 0)
 		error("can't get cputype");
@@ -105,6 +108,9 @@
 	case 'n':
 		namespace = EARGF(usage());
 		break;
+	case 'p':
+		maxprocs = atoi(EARGF(usage()));
+		break;
 	case 'i':
 		/*
 		 * fixed configuration, no periodic
@@ -161,9 +167,11 @@
 listendir(char *protodir, char *srvdir, int trusted)
 {
 	int ctl, pid, start;
-	char dir[40], err[128];
+	char dir[40], err[128], ds[128];
+	long childs;
 	Announce *a;
 	Waitmsg *wm;
+	int whined;
 
 	if (srvdir == 0)
 		return;
@@ -201,20 +209,25 @@
 
 			sleep((pid*10)%200);
 
+			/* copy to stack */
+			strncpy(ds, a->a, sizeof(ds));
+			whined = a->whined;
+
 			/* a process per service */
-			switch(pid = rfork(RFFDG|RFPROC)){
+			switch(pid = rfork(RFFDG|RFPROC|RFMEM)){
 			case -1:
-				syslog(1, listenlog, "couldn't fork for %s", a->a);
+				syslog(1, listenlog, "couldn't fork for %s", ds);
 				break;
 			case 0:
+				childs = 0;
 				for(;;){
-					ctl = announce(a->a, dir);
+					ctl = announce(ds, dir);
 					if(ctl < 0) {
 						errstr(err, sizeof err);
-						if (!a->whined)
+						if (!whined)
 							syslog(1, listenlog,
 							   "giving up on %s: %r",
-							a->a);
+							ds);
 						if(strstr(err, "address in use")
 						    != nil)
 							exits("addr-in-use");
@@ -221,7 +234,7 @@
 						else
 							exits("ctl");
 					}
-					dolisten(proto, dir, ctl, srvdir, a->a);
+					dolisten(proto, dir, ctl, srvdir, ds, &childs);
 					close(ctl);
 				}
 			default:
@@ -377,14 +390,57 @@
 }
 
 void
-dolisten(char *proto, char *dir, int ctl, char *srvdir, char *dialstr)
+dolisten(char *proto, char *dir, int ctl, char *srvdir, char *dialstr, long *pchilds)
 {
 	Service s;
-	char ndir[40];
-	int nctl, data;
+	char ndir[40], wbuf[64];
+	int nctl, data, wfd, nowait;
 
 	procsetname("%s %s", dir, dialstr);
+
+	wfd = -1;
+	nowait = RFNOWAIT;
+	if(pchilds && maxprocs > 0){
+		snprint(wbuf, sizeof(wbuf), "/proc/%d/wait", getpid());
+		if((wfd = open(wbuf, OREAD)) >= 0)
+			nowait = 0;
+	}
+
 	for(;;){
+		if(!nowait){
+			static int hit = 0;
+			Dir *d;
+
+			/*
+			 *  check for exited subprocesses
+			 */
+			if(procs >= maxprocs || (*pchilds % 8) == 0)
+				while(*pchilds > 0){
+					d = dirfstat(wfd);
+					if(d == nil || d->length == 0){
+						free(d);
+						break;
+					}
+					free(d);
+					if(read(wfd, wbuf, sizeof(wbuf)) > 0){
+						adec(&procs);
+						pchilds[0]--;
+					}
+				}
+
+			if(procs >= maxprocs){
+				if(!quiet && !hit)
+					syslog(1, listenlog, "%s: process limit of %ld reached",
+						proto, maxprocs);
+				if(hit < 8)
+					hit++;
+				sleep(10<<hit);
+				continue;
+			} 
+			if(hit > 0)
+				hit--;
+		}
+
 		/*
 		 *  wait for a call (or an error)
 		 */
@@ -392,6 +448,8 @@
 		if(nctl < 0){
 			if(!quiet)
 				syslog(1, listenlog, "listen: %r");
+			if(wfd >= 0)
+				close(wfd);
 			return;
 		}
 
@@ -398,7 +456,7 @@
 		/*
 		 *  start a subprocess for the connection
 		 */
-		switch(rfork(RFFDG|RFPROC|RFNOWAIT|RFENVG|RFNAMEG|RFNOTEG)){
+		switch(rfork(RFFDG|RFPROC|RFMEM|RFENVG|RFNAMEG|RFNOTEG|nowait)){
 		case -1:
 			reject(nctl, ndir, "host overloaded");
 			close(nctl);
@@ -423,10 +481,16 @@
 			fprint(nctl, "keepalive");
 			close(ctl);
 			close(nctl);
+			if(wfd >= 0)
+				close(wfd);
 			newcall(data, proto, ndir, &s);
 			exits(0);
 		default:
 			close(nctl);
+			if(nowait)
+				break;
+			ainc(&procs);
+			pchilds[0]++;
 			break;
 		}
 	}
--