code: plan9front

Download patch

ref: f321298c551e4333fcf2819eabf7ce67ea443e20
parent: 5e20e8f963482a2008ed70cc8fa5973078248aed
author: Ori Bernstein <ori@eigenstate.org>
date: Sat Jan 23 06:03:05 EST 2021

upas/runq: support parallel queue processing, drop -a

When running a mail queue, it's useful to run it with limited
parallelism. This helps mailing lists process messages in a
reasonable time.

At the same time, remove the load balancing (-a, -s, -l)
from runq: the kinds of systems on which it mattered no
longer exist, and running multiple queues at once is
better done through xargs.

--- a/sys/man/8/qer
+++ b/sys/man/8/qer
@@ -15,7 +15,7 @@
 .br
 .B runq
 [
-.B -adsER
+.B -dER
 ]
 [
 .B -f
@@ -26,10 +26,6 @@
 .I subdir
 ]
 [
-.B -l
-.I load
-]
-[
 .B -t
 .I time
 ]
@@ -39,7 +35,7 @@
 ]
 [
 .B -n
-.I nprocs
+.I njobs
 ]
 .I root cmd
 .SH DESCRIPTION
@@ -84,10 +80,7 @@
 .I Runq
 processes the files queued by
 .IR qer .
-Without the
-.B -a
-option,
-.I runq
+.I Runq
 processes all requests in the directory
 .IR root / subdir ,
 where
@@ -96,9 +89,6 @@
 .B -q
 if present, else the contents of
 .BR /dev/user .
-With the
-.B -a
-it processes all requests.
 Each request is processed by executing the command
 .I cmd
 with the contents of the control file as its arguments,
@@ -172,31 +162,12 @@
 .I -q
 flag.
 .P
-The
-.BR -s ,
-.BR -n ,
-and
-.B -l
-flags are only meaningful with the
-.B -a
-flag.  They control amount of parallelism that
-is used when sweeping all of the queues.  The argument following the
+The argument following the
 .B -n
-flag specifies the number of queues that are swept
-in parallel; the default is 50.  The argument following the
-.B -l
-flag specifies the total number of queues that are being swept.
-By default, there is no limit.  The number of active sweeps
-is cumulative over all active executions of
-.IR runq .
-The
-.B -s
-flag forces each queue directory to be processed by exactly
-one instance of
-.IR runq .
-This is useful on systems that connect to slow
-external systems and prevents all the queue sweeps from
-piling up trying to process a few slow systems.
+flag specifies the maximum number of queued jobs
+processed in parallel; the default is 1.
+This lets a large queue be processed with a bounded
+amount of parallelism.
 .PP
 .I Runq
 is often called from
--- a/sys/src/cmd/upas/q/runq.c
+++ b/sys/src/cmd/upas/q/runq.c
@@ -1,9 +1,24 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;	/* next running job in the list */
+	int	pid;	/* pid of the child running cmd */
+	int	ac;	/* entries in av before the terminating nil */
+	int	dfd;	/* data file, cmd's stdin; -1 if not open */
+	char	**av;	/* argument vector for cmd */
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* control file entry; not owned */
+	Mlock	*l;	/* lock keepalive, nil if none */
+	Biobuf	*b;	/* control file, released with sysunlockfile */
+};
+
-void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -17,7 +32,6 @@
 char	*root;
 int	debug;
 int	giveup = 2*24*60*60;
-int	load;
 int	limit;
 
 /* the current directory */
@@ -28,12 +42,9 @@
 
 char *runqlog = "runq";
 
-int	*pidlist;
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
-int	npid = 50;
-int	sflag;			/* single thread per directory */
-int	aflag;			/* all directories */
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	Eflag;			/* ignore E.xxxxxx dates */
 int	Rflag;			/* no giving up, ever */
 
@@ -40,26 +52,18 @@
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
-	exits("");
+	fprint(2, "usage: runq [-dER] [-q dir] [-t time] [-r nfiles] [-n njobs] q-root cmd\n");
+	exits("usage");
 }
 
 void
 main(int argc, char **argv)
 {
-	char *qdir, *x;
+	char *qdir;
 
 	qdir = 0;
 
 	ARGBEGIN{
-	case 'l':
-		x = ARGF();
-		if(x == 0)
-			usage();
-		load = atoi(x);
-		if(load < 0)
-			load = 0;
-		break;
 	case 'E':
 		Eflag++;
 		break;
@@ -66,29 +70,21 @@
 	case 'R':	/* no giving up -- just leave stuff in the queue */
 		Rflag++;
 		break;
-	case 'a':
-		aflag++;
-		break;
 	case 'd':
 		debug++;
 		break;
 	case 'r':
-		limit = atoi(ARGF());
+		limit = atoi(EARGF(usage()));
 		break;
-	case 's':
-		sflag++;
-		break;
 	case 't':
-		giveup = 60*60*atoi(ARGF());
+		giveup = 60*60*atoi(EARGF(usage()));
 		break;
 	case 'q':
-		qdir = ARGF();
-		if(qdir == 0)
-			usage();
+		qdir = EARGF(usage());
 		break;
 	case 'n':
-		npid = atoi(ARGF());
-		if(npid == 0)
+		njob = atoi(EARGF(usage()));
+		if(njob <= 0)
 			usage();
 		break;
 	}ARGEND;
@@ -96,15 +92,10 @@
 	if(argc != 2)
 		usage();
 
-	pidlist = malloc(npid*sizeof(*pidlist));
-	if(pidlist == 0)
-		error("can't malloc", 0);
-
-	if(aflag == 0 && qdir == 0) {
+	if(qdir == nil)
 		qdir = getuser();
-		if(qdir == 0)
-			error("unknown user", 0);
-	}
+	if(qdir == nil)
+		error("unknown user", 0);
 	root = argv[0];
 	cmd = argv[1];
 
@@ -111,12 +102,7 @@
 	if(chdir(root) < 0)
 		error("can't cd to %s", root);
 
-	doload(1);
-	if(aflag)
-		doalldirs();
-	else
-		dodir(qdir);
-	doload(0);
+	dodir(qdir);
 	exits(0);
 }
 
@@ -142,75 +128,7 @@
 	return 0;
 }
 
-int
-forkltd(void)
-{
-	int i;
-	int pid;
-
-	for(i = 0; i < npid; i++){
-		if(pidlist[i] <= 0)
-			break;
-	}
-
-	while(i >= npid){
-		pid = waitpid();
-		if(pid < 0){
-			syslog(0, runqlog, "forkltd confused");
-			exits(0);
-		}
-
-		for(i = 0; i < npid; i++)
-			if(pidlist[i] == pid)
-				break;
-	}
-	pidlist[i] = fork();
-	return pidlist[i];
-}
-
 /*
- *  run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
-	Dir *db;
-	int fd;
-	long i, n;
-
-
-	fd = open(".", OREAD);
-	if(fd == -1){
-		warning("reading %s", root);
-		return;
-	}
-	n = dirreadall(fd, &db);
-	if(n > 0){
-		for(i=0; i<n; i++){
-			if(db[i].qid.type & QTDIR){
-				if(emptydir(db[i].name))
-					continue;
-				switch(forkltd()){
-				case -1:
-					syslog(0, runqlog, "out of procs");
-					doload(0);
-					exits(0);
-				case 0:
-					if(sysdetach() < 0)
-						error("%r", 0);
-					dodir(db[i].name);
-					exits(0);
-				default:
-					break;
-				}
-			}
-		}
-		free(db);
-	}
-	close(fd);
-}
-
-/*
  *  cd to a user directory and run it
  */
 void
@@ -234,30 +152,63 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
-	if(aflag && sflag)
-		fd = sysopenlocked(".", OREAD);
-	else
-		fd = open(".", OREAD);
+	fd = open(".", OREAD);
 	if(fd == -1){
 		warning("reading %s", name);
 		return;
 	}
+	fidx = 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 || fidx < nfiles){
+		/* launch jobs until njob are running or the files run out */
+		while(fidx < nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		/* every file has been tried and nothing is running */
+		if(nlive == 0)
+			break;
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, runqlog, "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		/* not one of our jobs; keep waiting */
+		if(!found)
+			goto rescan;
 	}
-	if(aflag && sflag)
-		sysunlockfile(fd);
-	else
-		close(fd);
+	/* jobs are only left over if wait failed; release their locks */
+	while(hd != nil){
+		j = hd->next;
+		freejob(hd);
+		hd = j;
+	}
+	free(dirbuf);
+	close(fd);
 }
 
 /*
@@ -314,17 +259,16 @@
 }
 
 /*
- *  try a message
+ *  Start an attempt to deliver a message. Returns a Job
+ *  tracking the running child, or nil if none was started.
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp, **nav;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,7 +281,7 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
@@ -344,7 +288,7 @@
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +302,36 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == nil) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		freejob(j);
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +339,40 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == nil){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		/* keep the old vector on failure so freejob can release it */
+		nav = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(nav == nil){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
+			freejob(j);
+			return nil;
 		}
-		av[ac] = cp;
+		j->av = nav;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +382,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +401,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
 			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
+			for(i = 0; j->av[i]; i++)
+				fprint(2, " %s", j->av[i]);
 			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +436,80 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+		return j;
+	}
+done:
+	freejob(j);
+	return nil;
+}
 
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
+/*
+ * Handle the completion of job j, whose child
+ * exited with status wm: remove, return, or defer
+ * its message, then free the job.
+ * Returns the job that followed j in the list.
+ */
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+	char *s, **bs;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
 		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
+			/* add sys to bad list and try again later */
+			if(j->ac > 3 && (s = strdup(j->av[3])) != nil){
+				bs = realloc(badsys, (nbad+1)*sizeof(char*));
+				if(bs == nil){
+					free(s);
+				} else {
+					badsys = bs;
+					badsys[nbad++] = s;
+				}
+			}
 		}
-		free(wm);
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
 
+/*
+ * Release resources associated with
+ * a job.
+ */
+void
+freejob(Job *j)
+{
+	/* drop the lock keepalive first, as dofile's fork failure path does */
+	if(j->l != nil)
+		sysunlock(j->l);
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
 	}
-done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	if(j->dfd != -1)
+		close(j->dfd);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 
@@ -690,72 +662,4 @@
 		n += m + 3;
 	}
 	syslog(0, runqlog, "%s", buf);
-}
-
-char *loadfile = ".runqload";
-
-/*
- *  load balancing
- */
-void
-doload(int start)
-{
-	int fd;
-	char buf[32];
-	int i, n;
-	Mlock *l;
-	Dir *d;
-
-	if(load <= 0)
-		return;
-
-	if(chdir(root) < 0){
-		load = 0;
-		return;
-	}
-
-	l = syslock(loadfile);
-	fd = open(loadfile, ORDWR);
-	if(fd < 0){
-		fd = create(loadfile, 0666, ORDWR);
-		if(fd < 0){
-			load = 0;
-			sysunlock(l);
-			return;
-		}
-	}
-
-	/* get current load */
-	i = 0;
-	n = read(fd, buf, sizeof(buf)-1);
-	if(n >= 0){
-		buf[n] = 0;
-		i = atoi(buf);
-	}
-	if(i < 0)
-		i = 0;
-
-	/* ignore load if file hasn't been changed in 30 minutes */
-	d = dirfstat(fd);
-	if(d != nil){
-		if(d->mtime + 30*60 < time(0))
-			i = 0;
-		free(d);
-	}
-
-	/* if load already too high, give up */
-	if(start && i >= load){
-		sysunlock(l);
-		exits(0);
-	}
-
-	/* increment/decrement load */
-	if(start)
-		i++;
-	else
-		i--;
-	seek(fd, 0, 0);
-	fprint(fd, "%d\n", i);
-	sysunlock(l);
-	close(fd);
 }