1
2
3 """
4 A module that starts a bunch of subprocesses and distributes
5 work amongst them, then collects the results.
6
7 Subprocesses must follow a protocol: they must listen for commands
8 on the standard input (commands are encoded with C{cPickle}),
9 and they must produce C{cPickle}d tuples on their standard output.
10 NOTE: THEY CANNOT PRINT ANYTHING ELSE! (But it's OK for subprocesses
11 to produce stuff on the standard error output.)
12
13 PROTOCOL:
14
1. Exec'ing state: C{spread_jobs} starts (execs) a group of subprocesses. You have
full control of their argument lists.
17
18 2. Preparation state: C{spread_jobs} will send a list of C{cPickled} things,
19 one at a time to the subprocess. No explicit terminator
20 is added, so the subprocess must either know how many things
21 are coming or the list should contain some terminator.
22 (E.g. the last item of the list could be L{None}, and the
23 subprocess would wait for it.)
24 In this state, the subprocess must not produce anything on
25 the standard output.
26
27 3. Running state: C{spread_jobs} will send one C{cPickled} thing to the
28 subprocess and then wait for a C{cPickled} C{tuple} to come
29 back.
30
The request to the subprocess is a single C{cPickled} task: the corresponding
item of the C{todo} sequence given to C{main}. Task-ID numbers are tracked by the
main process, so the subprocess neither sees nor returns them.
34
35 The subprocess responds with a 3-L{tuple}. The first element of the
36 tuple is either:
37
38 - An instance of L{TooBusy}. This causes the main process to
39 put the task back on the queue and ignore this subprocess for
40 a while. The second element is printed; the third is ignored.
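- An instance of L{RemoteException}. This stops the worker that received it:
the exception object is returned on the output list in place of that task's
result, and the tasks that worker subsequently takes off the queue are returned
as L{notComputed} instead of being run.
The other two elements of the tuple are printed.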
44 - Anything else. In that case, the first element is returned on the
45 output list and the other two elements are printed.
46
47 The subprocess loops in the running state.
48 Normally, it should terminate when its standard input is closed.
49 (It can terminate itself if it wishes by simply closing the standard output and exiting.)
50
4. Shutdown state: The subprocess can produce anything it wants. This output is
collected (as a list of lines per subprocess) and passed to the C{tail_callback}
argument of C{spread_jobs.main}, if one was given.
53
You can use this to run certain normal Linux commands by not sending anything
in the preparatory state or the running state. Their standard output will then
be handed to C{tail_callback} as a list of strings.
57
58 Normally, the action happens in the running state.
59
Normally, the subprocess looks much like this::

    import cPickle
    from sys import stdin, stdout
    while True:
        try:
            control = cPickle.load(stdin)
        except EOFError:
            break
        d, so, se = compute(control)
        cPickle.dump((d, so, se), stdout, cPickle.HIGHEST_PROTOCOL)
        stdout.flush()
    stdout.close()
72
73 @sort: main, replace, Replace, append
74 """
75
76
77 from __future__ import with_statement
78
79 import re
80 import sys
81 import math
82 import time
83 import random
84 import cPickle as CP
85 import threading
86 import subprocess
87 import StringIO
88
89 from gmisclib import die
90 from gmisclib import gpkmisc
91 from gmisclib import dictops
92 from gyropy import g_mailbox as MB
93
94
95
97 """A singleton marker for values that haven't been computed."""
98 pass
99
100
101
105
106
108 """An exception that corresponds to one raised by a subprocess.
109 This is raised in the parent process.
110 """
115
117 return '<%s.RemoteException: %s>' % (__name__, repr(self.args))
118
121
122
126
127
128
152
153
157
159 """This class represents a connection from the master process down to one of the slaves.
160 It also keeps track of how often the slave reports that it is too busy to work.
161 """
162
163 BUSYFAC1 = 0.85
164 BUSYFAC2 = 1-BUSYFAC1
165 ConnectError = ()
166 SendError = (IOError, ValueError)
167 GetError = (EOFError,)
168
170 """
171 @param arglist: an argument list to execute to start a subprocess.
172 @type arglist: a sequence of anything that can be converted to strings.
173 @note: This is where the arglist is finally converted to a list of strings.
174 @except OSError: when connection cannot be set up.
175 """
176 self.arglist = [str(q) for q in arglist ]
177 self.lock = threading.Lock()
178 self.uness = 0.0
179
def send(self, todo):
    """Transmit one item to the slave.

    This is a placeholder: concrete connection classes must override it.

    @param todo: the item to transmit.
    @except IOError: Trouble sending.
    @except RuntimeError: always, because this base-class version has no channel.
    """
    raise RuntimeError("Virtual Method")
183
185 """@return: (answer, standard_output, standard_error) or None.
186 @except EOFError: No data available from slave.
187 """
188 raise RuntimeError, "Virtual Method"
189
191 """Closes the channel to the slave."""
192 raise RuntimeError, "Virtual Method"
193
195 """Waits for the slave to terminate and closes the channel from the slave.
196 @return: any final output.
197 @rtype: list(str)
198 """
199 raise RuntimeError, "Virtual Method"
200
202 return ' '.join(self.arglist)
203
204
206 with self.lock:
207 assert 0 <= self.uness <= 1.0
208 return self.uness
209
213
215 with self.lock:
216 if state == "running":
217 self.uness = 0.5
218 else:
219 self.uness = 0.0
220
225
226
228 """This is a L{Connection} via stdin/stdout to a subprocess."""
229 GetError = (EOFError, CP.UnpicklingError, ValueError)
230 ConnectError = (OSError,)
231
233 Connection.__init__(self, arglist)
234 try:
235 self.p = subprocess.Popen(self.arglist, stdin=subprocess.PIPE,
236 stdout=subprocess.PIPE, stderr=sys.stderr,
237 close_fds=True
238 )
239 except self.ConnectError, ex:
240 raise CannotCreateConnection(str(ex), *(ex.args + ("subprocess.Popen",) + tuple(self.arglist)))
241
def send(self, todo):
    """Pickle C{todo} onto the subprocess's standard input.

    The pipe is flushed afterwards, so the subprocess can read the item
    immediately rather than when the buffer fills.

    @param todo: any picklable object.
    @except IOError: Trouble sending.
    @except ValueError: presumably when the pipe has already been closed
        (ValueError is listed in L{Connection.SendError}); confirm.
    """
    pipe = self.p.stdin
    CP.dump(todo, pipe)
    pipe.flush()
245
246
247
249 while True:
250 try:
251 rv = CP.load(self.p.stdout)
252 return rv
253 except CP.UnpicklingError, y:
254 die.warn("spread_jobs: Junk response: %s" % repr(y))
255 die.info("spread_jobs: Junk remainder: %s" % self.p.stdout.readline())
256 die.info("spread_jobs: Junk arglist: %s" % self.argstring())
257 raise
258 return None
259
260
264
266
267 tmp = self.p.stdout.readlines()
268 self.p.stdout.close()
269
270 self.p.wait()
271 self.p = None
272 return tmp
273
274
275
276
277
278
279
281 return max(0.01, min(1000.0, float(x)))
282
283
284
286
287 - def __init__(self, iqueue, oqueue, p, stdin, solock, wss):
288 """@param stdin: something to send at the start of the subprocess
289 to get it going. This is before the main processing starts.
290 @type stdin: any iterable that yields something that can be
291 given to L{cPickle.dumps}.
292 @param p: The process to run. It's already been started,
293 but no input/output has occurred.
294 @type p: L{Connection}
295 @param solock: a lock to serialize the standard output
296 @type solock: threading.Lock
297 """
298 threading.Thread.__init__(self, name='spread_jobs%s' % id(self))
299 self.iqueue = iqueue
300 self.oqueue = oqueue
301 self.wss = wss
302 assert isinstance(p, Connection)
303 self.p = p
304 self.solock = solock
305 try:
306 for x in stdin:
307 self.p.send(x)
308 except self.p.SendError, x:
309 die.info("I/O error in thread start-up: %s" % str(x))
310 self.p.close1()
311
312
314
315
316
317
318
319
320
321
322 na, nlive = self.wss.num_active()
323 return (na > 3+1.3*(len(self.iqueue)+1) and
324 nlive*self.p.usefulness() < na
325 )
326
327
329 """The reason we have the dependence on nw and nq is that we want to
330 shorten delays as the queue empties. Basically, we don't want any
331 processes sleeping when the queue is empty. That would just waste
332 time to no purpose.
333
334 The reason we have the dependence on delta_t is that we want to limit
335 the number of CPU cycles that are wasted in polling other machines to
336 see if they are too busy.
337 """
338 nw = self.wss.num_active()[0]
339 nq = len(self.iqueue)
340 delay = delay_sanitize(qdelay)
341 fac1 = math.exp(1.0-self.p.usefulness())
342 fac2 = max(0.1, float(nq+1)/float(nq+nw+1))
343 delay *= min(fac1, fac2)
344 return math.sqrt(delta_t*delay) + delay
345
346
348 self.p.mystate("running")
349 while True:
350
351 try:
352 i, todo = self.iqueue.get()
353 except MB.EOF:
354
355 self.p.mystate("iqueue EOF")
356 break
357 t0 = time.time()
358 try:
359 self.p.send(todo)
360 except self.p.SendError, x:
361 die.warn("IO Error on send task %d to worker: %s" % (i, str(x)))
362 self.iqueue.put((i, todo))
363 self.p.mystate("SendError")
364 break
365 try:
366 q, so, se = self.p.get()
367 except self.p.GetError, x:
368 die.warn("Exception %s when trying to read worker %s" % (x, self.p.argstring()))
369 self.iqueue.put((i, todo))
370 self.p.mystate("BadRead")
371 break
372 t2 = time.time()
373 if isinstance(q, TooBusy):
374 self.iqueue.put((i, todo))
375 if self.want_shutdown():
376 die.info("TooBusy: giving up on %s" % str(so))
377 self.p.mystate("giving up")
378 break
379 else:
380 tsleep = self.compute_delay(q.delay, t2-t0)
381 die.info("TooBusy: sleeping %.3f for %s" % (tsleep, str(so)))
382 self.p.I_am_working(0)
383 time.sleep(tsleep)
384 continue
385 self.p.I_am_working(1)
386
387 self.oqueue.put((i, t0, t2, q))
388 with self.solock:
389 if so:
390 sys.stdout.writelines('#slot so%d ------\n' % i)
391 sys.stdout.writelines(so)
392 sys.stdout.flush()
393 if se:
394 sys.stderr.writelines('#slot se%d ------\n' % i)
395 sys.stderr.writelines(se)
396 sys.stderr.flush()
397 if isinstance(q, RemoteException):
398 die.info("Remote Exception info: %s" % str(q.args))
399 die.warn("Exception from remote job (index=%d): %s" % (i, str(q)))
400 q.index = "index=%d" % i
401 q.comment = "so=%s # se=%s" % (gpkmisc.truncate(';'.join(so), 40),
402 gpkmisc.truncate(';'.join(se), 40)
403 )
404
405 while True:
406 try:
407 j, todo = self.iqueue.get()
408 except MB.EOF:
409 self.p.mystate("RemoteException")
410 break
411 self.oqueue.put( (j, t0, t2, notComputed) )
412
413 self.p.close1()
414
415
def join(self, timeout=None):
    """Collect the slave's final output, then wait for this thread to end.

    The connection's C{close2()} is called first. For L{Connection_subprocess}
    this reads the slave's standard output to end-of-file and waits for the
    process to exit. After that the thread is joined and the reference to
    the worker group is dropped.

    NOTE(review): C{self.wss} is cleared even if C{timeout} expires while the
    thread is still running, and the thread's back-off logic dereferences
    C{self.wss}. Confirm that callers never pass a timeout.

    @param timeout: passed through to L{threading.Thread.join}.
    @return: whatever C{close2()} returned (the slave's final output).
    @rtype: list(str)
    """
    leftover = self.p.close2()
    threading.Thread.join(self, timeout=timeout)
    self.wss = None
    return leftover
421
422
423
424
425
426
427
428
# Module-level default for main()'s past_performance argument. Every call
# to main() that does not pass its own object shares this one instance.
_past_performance = PastPerformance()
430
def main(todo, list_of_args, connection_factory=Connection_subprocess,
         stdin=None, verbose=False, timing_callback=None, tail_callback=None,
         past_performance=_past_performance):
    """Pass a bunch of work to other processes.

    One worker thread is started per argument list in C{list_of_args} that
    can be connected to. Tasks from C{todo} are handed to whichever slave is
    free, and the answers are collected back in C{todo} order.

    @param todo: a sequence of tasks to do
    @type todo: sequence(whatever)
    @param list_of_args: one argument list per slave to start.
    @type list_of_args: list(list(str))
    @param connection_factory: called with one argument list to create the
        L{Connection} to a slave.
    @param stdin: a list of stuff to send to the other processes before the computation is
        properly commenced.
    @type stdin: list(whatever)
    @param verbose: log progress messages if true.
    @param timing_callback: if given, called as C{timing_callback(ts, te)} for
        every result. C{ts} is the time the task was sent and C{te} the time
        its answer was received.
    @param tail_callback: if given, handed to the worker group (L{workers_c}),
        which passes it the final output of each slave when joining it.
    @param past_performance: a L{PastPerformance} object if you want the system to remember which
        machines were more/less successful last time and to start jobs on the more successful
        machines first. L{None} if you don't want any memory. The default is to have memory.
    @type past_performance: None or L{PastPerformance}.
    @rtype: list(whatever)
    @return: a list with one entry per item of C{todo}, in the same order.
        Each entry is what a slave sent back as the first element of its
        reply. If a slave reported a L{RemoteException}, the exception object
        appears at that task's position, and tasks abandoned as a consequence
        hold L{notComputed}. (Each slave's remaining standard output is not
        returned; it goes to C{tail_callback}.)
    @except RuntimeError: if no subprocess could be started, or if the
        result queue is closed before every task has been answered.
    """
    if stdin is None:
        stdin = []
    solock = threading.Lock()
    iqueue = MB.maillist(enumerate(todo))
    ntodo = len(iqueue)
    oqueue = MB.mailbox()
    ths = workers_c(connection_factory, list_of_args, iqueue, oqueue, stdin, solock,
                    tail_callback=tail_callback, verbose=verbose,
                    past_performance=past_performance
                    )
    if verbose:
        die.info("%d jobs started" % len(ths))
    if not ths:
        raise RuntimeError("No subprocesses started.")

    oi = 0
    rv = [notComputed] * ntodo
    try:
        while oi < ntodo:
            try:
                i, ts, te, ans = oqueue.get()
            except MB.EOF:
                raise RuntimeError("spread_jobs.main: result queue closed after %d of %d results"
                                   % (oi, ntodo))
            if timing_callback:
                timing_callback(ts, te)
            # Each task index must be answered exactly once.
            assert rv[i] is notComputed
            rv[i] = ans
            oi += 1
    finally:
        # Closing the queues is what lets worker threads (blocked in
        # iqueue.get()) see EOF and shut their slaves down. Do it even if
        # collection was aborted by an exception, so the workers do not wait
        # forever.
        iqueue.putclose()
        oqueue.putclose()

    if verbose:
        die.info("Joining %d jobs" % len(ths))
    ths.join()
    if past_performance is not None:
        ths.pass_performance(past_performance)
    return rv
492
493
494
496 """This creates a group of worker threads that take tasks from the iqueue and put the
497 answers on the oqueue.
498 """
499 - def __init__(self, connection_factory, list_of_args, iqueue, oqueue, stdin, solock,
500 verbose=False, tail_callback=None, past_performance=None):
501
502 self.tail_callback = tail_callback
503 self.args = list_of_args
504 self.ths = []
505 self.verbose = verbose
506
507 for args in sorted(list_of_args, key=past_performance):
508
509
510
511
512
513
514 if self.verbose:
515 die.info("Args= %s" % str(args))
516 try:
517 p = connection_factory(args)
518 except CannotCreateConnection, x:
519 die.warn("Cannot create connection: %s on %s" % (x, args))
520 continue
521 t = _ThreadJob(iqueue, oqueue, p, stdin, solock, self)
522 self.ths.append(t)
523 t.start()
524
526 nj = 0
527 for t in self.ths:
528 oo = t.join()
529 nj += 1
530 if self.tail_callback:
531 self.tail_callback(t.arglist, oo)
532 if self.verbose:
533 die.info("Joined %d jobs" % nj)
534
539
542
543
544
546 """@return: total usefulness of all workers and the number of live workers
547 @rtype: (float, int)
548 """
549 EPS = 1e-6
550 na = 0.0
551 nlive = 0
552
553 for t in self.ths:
554 tmp = t.p.usefulness()
555 if tmp > EPS:
556
557
558 na += tmp
559 nlive += 1
560 return (na, nlive)
561
562
563
565 if x > 0 and random.random()<0.3:
566 sys.exit(random.randrange(2))
567 sys.stderr.write('test_worker starting\n')
568 while True:
569 sys.stderr.write('test_worker loop\n')
570 try:
571 txt = CP.load(sys.stdin)
572 except EOFError:
573 sys.stderr.write('test_worker got EOF\n')
574 break
575 if random.random() < 0.1:
576 sys.stderr.write("Sending TooBusy")
577 CP.dump((TooBusy(0.1), None, None), sys.stdout, CP.HIGHEST_PROTOCOL)
578 sys.stdout.flush()
579 continue
580 if random.random() < 0.5:
581 time.sleep(random.expovariate(30.0))
582 sys.stderr.write('test worker control=%s\n' % txt)
583 if txt is None:
584 sys.stderr.write('test_worker got stop\n')
585 break
586 sys.stderr.write('test_worker dump %s\n' % txt)
587 CP.dump((txt, ['stdout:%s\n' % txt], ['stderr\n']), sys.stdout, CP.HIGHEST_PROTOCOL)
588 sys.stdout.flush()
589 sys.stderr.write('test_worker finished\n')
590 sys.stdout.close()
591
592
594 for np in range(1, 5):
595 print 'NP=%d' % np
596 for i in range(1, 6):
597 print 'ntasks=%d' % (i*5)
598 x = ['a', 'b', 'c', 'd', 'e'] * i
599 arglist = [ ['python', script, 'worker', str(j)] for j in range(np) ]
600 y = list(main(x, arglist, verbose=True))
601 assert x == y
602
603
604
606 """For testing.
607 """
608
611
613 self.seek(0, 0)
614 while True:
615 try:
616 d, so, se = CP.load(self)
617 except EOFError:
618 break
619 sys.stdout.write('STDOUT:\n')
620 sys.stdout.writelines(so)
621 sys.stdout.write('STDERR:\n')
622 sys.stdout.writelines(se)
623 sys.stdout.write('d=%s\n' % str(d))
624 sys.stdout.flush()
625
626
628 stdin = StringIO.StringIO()
629 CP.dump(input, stdin)
630 stdin.flush()
631 stdin.seek(0, 0)
632 stdout = unpickled_pseudofile()
633 return (stdin, stdout)
634
635
636
638
639 frc = []
640 while fr:
641 frc.append( (re.compile(fr[0]), fr[1]) )
642 fr = fr[2:]
643 o = []
644 for l in list_of_lists:
645 assert isinstance(l, (tuple, list)), "List of lists contains %s within %s" % (repr(l), list_of_lists)
646 tmp = []
647 for t in l:
648 for (find, repl) in frc:
649 assert isinstance(t, str), "whoops! t=%s" % str(t)
650 t = find.sub(repl, t)
651 tmp.append(t)
652 o.append(tmp)
653 return o
654
655
def Replace(list_of_lists, pat, length, replacement):
    """Splice C{replacement} into each list wherever an element matches C{pat}.

    For every list in C{list_of_lists}, each element matched by the regular
    expression C{pat} (tested with C{re.match}, so anchored at the start of
    the element) is removed together with the following C{length}-1
    elements. The elements of C{replacement} are inserted in their place.
    Scanning then resumes just after the inserted elements.

    @param list_of_lists: a sequence of argument lists.
    @type list_of_lists: sequence(list(str) or tuple(str))
    @param pat: a regular expression.
    @type pat: str
    @param length: how many elements to remove at each match. Fewer are
        removed if the list ends first.
    @type length: int > 0
    @param replacement: the elements to insert at each match.
    @type replacement: list
    @return: new lists; the inputs are not modified.
    @rtype: list(list)
    """
    assert isinstance(replacement, list)
    assert length > 0
    cpat = re.compile(pat)
    nrep = len(replacement)
    o = []
    for seq in list_of_lists:
        assert isinstance(seq, (tuple, list)), "List of lists contains %s within %s" % (repr(seq), list_of_lists)
        tmp = list(seq)
        pos = 0
        while pos < len(tmp):
            if cpat.match(tmp[pos]):
                tmp[pos:pos+length] = list(replacement)
                # Skip past the inserted elements. Re-scanning them (as a
                # restart from index 0 would) never terminates when an
                # element of replacement itself matches pat.
                pos += nrep
            else:
                pos += 1
        o.append(tmp)
    return o
676
677
def append(list_of_lists, *a):
    """Add the extra arguments C{a} to the end of every list in C{list_of_lists}.

    Each element is converted to a tuple before the extra items are added,
    so the result is a new list of tuples. The inputs are not modified.
    """
    return [tuple(args) + a for args in list_of_lists]
684
685
686
if __name__ == '__main__':
    # "python <script> worker N" makes this process a test slave. Any other
    # invocation runs the self-test, which starts its slaves by re-running
    # this same script (sys.argv[0]) with the "worker" argument.
    if len(sys.argv) != 3 or sys.argv[1] != 'worker':
        test_(sys.argv[0])
    else:
        test_worker(int(sys.argv[2]))
692