code: plan9front

Download patch

ref: 51319cc5b56924b03d9c6188f6a1e189a497d42c
parent: 41f85d46f866c57a618b8df85c0e59cebc04f095
author: Ori Bernstein <ori@eigenstate.org>
date: Sat Jan 23 11:05:21 EST 2021

upas/runq: bring back -a

Turns out -a is useful in crontab, so bring
back a simplified version of it. This only
iterates through directories one at a time.

--- a/sys/man/8/qer
+++ b/sys/man/8/qer
@@ -15,7 +15,7 @@
 .br
 .B runq
 [
-.B -dER
+.B -adER
 ]
 [
 .B -f
@@ -50,13 +50,7 @@
 The data file contains the standard input to
 .IR qer .
 The files are created in the directory
-.IR root / subdir ,
-where
-.I subdir
-is the argument to
-.B -q
-if present, else the contents of
-.BR /dev/user .
+.IR root / subdir .
 The names of the control and data files differ only
 in the first character which is `C' and `D' respectively.
 .IR Mktemp (2)
@@ -77,6 +71,18 @@
 only in the first character.  The first one
 starts with 'F', the second 'G', etc.
 .P
+Qer takes the following arguments:
+.TP
+.B -q subdir
+Specifies the queue subdirectory to use. If
+unspecified, the contents of
+.B /dev/user
+are used.
+.TP
+.B -f file
+Specifies the files to copy into the queue
+directory, in the manner described above.
+.P
 .I Runq
 processes the files queued by
 .IR qer .
@@ -124,32 +130,34 @@
 error file exists and was last modified within the preceding 10 minutes.
 A data file older than one hour will not be processed if its error
 file exists and was last modified within the preceding hour.
-The 
+.PP
+The following flags are accepted by runq:
+.TP
+.B -a
+Causes runq to process all user directories in sequence, instead
+of only the directory of the current user.
+.TP
 .B -E
-flag causes all files to be reprocessed regardless of
+Causes all files to be reprocessed regardless of
 the file times.
-.P
-The
+.TP
 .B -R
-flag instructs
+Instructs
 .I runq
 never to give up on a failed queue job, instead leaving
 it in the queue to be retried.
-.P
-The
+.TP
 .B -d
-option causes debugging output on standard error
+Causes debugging output on standard error
 describing the progress through the queues.
-.P
-The
+.TP
 .B -t
-flags specifies the number of hours
+Specifies the number of hours
 that retries will continue after a send
 failure.  The default is 48 hours.
-.P
-The
+.TP
 .BR -r
-flag limits the number of files that are processed in a single pass of a queue.
+Limits the number of files that are processed in a single pass of a queue.
 .I Runq
 accumulates the entire directory containing a queue before processing any
 files.  When a queue contains many files and the system does not
@@ -161,10 +169,9 @@
 be drained incrementally.  It is most useful in combination with the
 .I -q
 flag.
-.P
-The argument following the
+.TP
 .B -n
-flag specifies the number of queued jobs that are processed
+Specifies the number of queued jobs that are processed
 in parallel from the queue; the default is 1.
 This is useful for a large queue to be processed with a bounded
 amount of parallelism.
--- a/sys/src/cmd/upas/q/runq.c
+++ b/sys/src/cmd/upas/q/runq.c
@@ -2,7 +2,14 @@
 #include <ctype.h>
 
 typedef struct Job Job;
+typedef struct Wdir Wdir;
 
+struct Wdir {
+	Dir	*d;
+	int	nd;
+	char	*name;
+};
+
 struct Job {
 	Job	*next;
 	int	pid;
@@ -10,6 +17,7 @@
 	int	dfd;
 	char	**av;
 	char	*buf;	/* backing for av */
+	Wdir	*wdir;	/* work dir */
 	Dir	*dp;	/* not owned */
 	Mlock	*l;
 	Biobuf	*b;
@@ -17,7 +25,7 @@
 
 void	doalldirs(void);
 void	dodir(char*);
-Job*	dofile(Dir*);
+Job*	dofile(Wdir*, Dir*);
 Job*	donefile(Job*, Waitmsg*);
 void	freejob(Job*);
 void	rundir(char*);
@@ -24,11 +32,10 @@
 char*	file(char*, char);
 void	warning(char*, void*);
 void	error(char*, void*);
-int	returnmail(char**, char*, char*);
-void	logit(char*, char*, char**);
+int	returnmail(char**, Wdir*, char*, char*);
+void	logit(char*, Wdir*, char*, char**);
 void	doload(int);
 
-#define HUNK 32
 char	*cmd;
 char	*root;
 int	debug;
@@ -35,12 +42,6 @@
 int	giveup = 2*24*60*60;
 int	limit;
 
-/* the current directory */
-Dir	*dirbuf;
-long	ndirbuf = 0;
-int	nfiles;
-char	*curdir;
-
 char *runqlog = "runq";
 
 char	**badsys;		/* array of recalcitrant systems */
@@ -48,6 +49,7 @@
 int	njob = 1;		/* number of concurrent jobs to invoke */
 int	Eflag;			/* ignore E.xxxxxx dates */
 int	Rflag;			/* no giving up, ever */
+int	aflag;			/* do all dirs */
 
 void
 usage(void)
@@ -82,20 +84,27 @@
 	case 'q':
 		qdir = EARGF(usage());
 		break;
+	case 'a':
+		aflag++;
+		break;
 	case 'n':
 		njob = atoi(EARGF(usage()));
 		if(njob == 0)
 			usage();
 		break;
+	default:
+		usage();
+		break;
 	}ARGEND;
 
 	if(argc != 2)
 		usage();
 
-	if(qdir == nil) 
+	if(!aflag && qdir == nil){
 		qdir = getuser();
-	if(qdir == nil)
-		error("unknown user", 0);
+		if(qdir == nil)
+			error("unknown user", 0);
+	}
 	root = argv[0];
 	cmd = argv[1];
 
@@ -102,7 +111,10 @@
 	if(chdir(root) < 0)
 		error("can't cd to %s", root);
 
-	dodir(qdir);
+	if(aflag)
+		doalldirs();
+	else
+		dodir(qdir);
 	exits(0);
 }
 
@@ -129,13 +141,41 @@
 }
 
 /*
+ *  run all user directories, must be bootes (or root on unix) to do this
+ */
+void
+doalldirs(void)
+{
+	Dir *db;
+	int fd;
+	long i, n;
+
+
+	if((fd = open(".", OREAD)) == -1){
+		warning("opening %s", root);
+		return;
+	}
+	if((n = dirreadall(fd, &db)) == -1){
+		warning("reading %s: ", root);
+		return;
+	}
+	for(i=0; i<n; i++){
+		if((db[i].qid.type & QTDIR) == 0)
+			continue;
+		if(emptydir(db[i].name))
+			continue;
+		dodir(db[i].name);
+	}
+	free(db);
+	close(fd);
+}
+
+/*
  *  cd to a user directory and run it
  */
 void
 dodir(char *name)
 {
-	curdir = name;
-
 	if(chdir(name) < 0){
 		warning("cd to %s", name);
 		return;
@@ -152,9 +192,10 @@
 void
 rundir(char *name)
 {
-	Job *hd, *j, **p;
 	int nlive, fidx, fd, found;
+	Job *hd, *j, **p;
 	Waitmsg *w;
+	Wdir wd;
 
 	fd = open(".", OREAD);
 	if(fd == -1){
@@ -164,14 +205,15 @@
 	fidx= 0;
 	hd = nil;
 	nlive = 0;
-	nfiles = dirreadall(fd, &dirbuf);
-	while(nlive > 0 ||  fidx< nfiles){
-		while(fidx< nfiles && nlive < njob){
-			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+	wd.name = name;
+	wd.nd = dirreadall(fd, &wd.d);
+	while(nlive > 0 ||  fidx< wd.nd){
+		while(fidx< wd.nd && nlive < njob){
+			if(strncmp(wd.d[fidx].name, "C.", 2) != 0){
 				fidx++;
 				continue;
 			}
-			if((j = dofile(&dirbuf[fidx])) != nil){
+			if((j = dofile(&wd, &wd.d[fidx])) != nil){
 				nlive++;
 				j->next = hd;
 				hd = j;
@@ -201,7 +243,7 @@
 			goto rescan;
 	}
 	assert(hd == nil);
-	free(dirbuf);
+	free(wd.d);
 	close(fd);
 }
 
@@ -209,15 +251,15 @@
  *  free files matching name in the current directory
  */
 void
-remmatch(char *name)
+remmatch(Wdir *w, char *name)
 {
 	long i;
 
-	syslog(0, runqlog, "removing %s/%s", curdir, name);
+	syslog(0, runqlog, "removing %s/%s", w->name, name);
 
-	for(i=0; i<nfiles; i++){
-		if(strcmp(&dirbuf[i].name[1], &name[1]) == 0)
-			remove(dirbuf[i].name);
+	for(i=0; i<w->nd; i++){
+		if(strcmp(&w->d[i].name[1], &name[1]) == 0)
+			remove(w->d[i].name);
 	}
 
 	/* error file (may have) appeared after we read the directory */
@@ -263,7 +305,7 @@
  *  tracks the running pid.
  */
 Job*
-dofile(Dir *dp)
+dofile(Wdir *w, Dir *dp)
 {
 	int dtime, efd, i, etime;
 	Job *j;
@@ -280,13 +322,13 @@
 	d = dirstat(file(dp->name, 'D'));
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
-		remmatch(dp->name);
+		remmatch(w, dp->name);
 		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
-			remmatch(dp->name);
+			remmatch(w, dp->name);
 		}
 		return nil;
 	}
@@ -338,7 +380,7 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-
+	j->wdir = w;
 	j->buf = malloc(dp->length+1);
 	if(j->buf == nil){
 		warning("buffer allocation", 0);
@@ -381,9 +423,9 @@
 	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(j->av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, j->av);
-		remmatch(dp->name);
+		if(returnmail(j->av, w, dp->name, "Giveup") != 0)
+			logit("returnmail failed", w, dp->name, j->av);
+		remmatch(w, dp->name);
 		goto done;
 	}
 
@@ -415,7 +457,7 @@
 				fprint(2, " %s", j->av[i]);
 			fprint(2, "\n");
 		}
-		logit("execing", dp->name, j->av);
+		logit("execing", w, dp->name, j->av);
 		close(0);
 		dup(j->dfd, 0);
 		close(j->dfd);
@@ -461,9 +503,9 @@
 			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
 		if(!Rflag && strstr(wm->msg, "Retry")==0){
 			/* return the message and remove it */
-			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
-				logit("returnmail failed", j->dp->name, j->av);
-			remmatch(j->dp->name);
+			if(returnmail(j->av, j->wdir, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->wdir, j->dp->name, j->av);
+			remmatch(j->wdir, j->dp->name);
 		} else {
 			/* add sys to bad list and try again later */
 			nbad++;
@@ -472,7 +514,7 @@
 		}
 	} else {
 		/* it worked remove the message */
-		remmatch(j->dp->name);
+		remmatch(j->wdir, j->dp->name);
 	}
 	n = j->next;
 	freejob(j);
@@ -520,7 +562,7 @@
  *  return 0 if successful
  */
 int
-returnmail(char **av, char *name, char *msg)
+returnmail(char **av, Wdir *w, char *name, char *msg)
 {
 	char buf[256], attachment[Pathlen], *sender;
 	int i, fd, pfd[2];
@@ -529,7 +571,7 @@
 	String *s;
 
 	if(av[1] == 0 || av[2] == 0){
-		logit("runq - dumping bad file", name, av);
+		logit("runq - dumping bad file", w, name, av);
 		return 0;
 	}
 
@@ -537,21 +579,21 @@
 	sender = s_to_c(s);
 
 	if(!returnable(sender) || strcmp(sender, "postmaster") == 0) {
-		logit("runq - dumping p to p mail", name, av);
+		logit("runq - dumping p to p mail", w, name, av);
 		return 0;
 	}
 
 	if(pipe(pfd) < 0){
-		logit("runq - pipe failed", name, av);
+		logit("runq - pipe failed", w, name, av);
 		return -1;
 	}
 
 	switch(rfork(RFFDG|RFPROC|RFENVG)){
 	case -1:
-		logit("runq - fork failed", name, av);
+		logit("runq - fork failed", w, name, av);
 		return -1;
 	case 0:
-		logit("returning", name, av);
+		logit("returning", w, name, av);
 		close(pfd[1]);
 		close(0);
 		dup(pfd[0], 0);
@@ -592,7 +634,7 @@
 	wm = wait();
 	if(wm == nil){
 		syslog(0, "runq", "wait: %r");
-		logit("wait failed", name, av);
+		logit("wait failed", w, name, av);
 		return -1;
 	}
 	i = 0;
@@ -599,7 +641,7 @@
 	if(wm->msg[0]){
 		i = -1;
 		syslog(0, "runq", "returnmail child: %s", wm->msg);
-		logit("returnmail child failed", name, av);
+		logit("returnmail child failed", w, name, av);
 	}
 	free(wm);
 	return i;
@@ -635,12 +677,12 @@
 }
 
 void
-logit(char *msg, char *file, char **av)
+logit(char *msg, Wdir *w, char *file, char **av)
 {
 	int n, m;
 	char buf[256];
 
-	n = snprint(buf, sizeof(buf), "%s/%s: %s", curdir, file, msg);
+	n = snprint(buf, sizeof(buf), "%s/%s: %s", w->name, file, msg);
 	for(; *av; av++){
 		m = strlen(*av);
 		if(n + m + 4 > sizeof(buf))