#!python # # Send inquiries to an active task manager # (see Scientific.DistributedComputing) # Written by Konrad Hinsen # last revision: 2006-12-12 # from Scientific.DistributedComputing import TaskManager import Pyro.core import Pyro.naming import Pyro.errors from optparse import OptionParser import os, sys Pyro.core.initClient(banner=False) commands = {} arg_count = {} def getTaskManager(label): if options.master is None: try: return Pyro.core.getProxyForURI("PYRONAME://TaskManager.%s" % label) except Pyro.errors.NamingError: print "No name server or no task manager with label %s" % label raise SystemExit else: try: uri = "PYROLOC://%s/TaskManager.%s" % (options.master, label) return Pyro.core.getProxyForURI(uri) except Pyro.errors.URIError: print "No host %s" % options.master raise SystemExit except Pyro.errors.ProtocolError: print "No task manager with label %s" % label raise SystemExit def showInfo(label): task_manager = getTaskManager(label) active_processes = task_manager.numberOfActiveProcesses() waiting, running, finished = task_manager.numberOfTasks() print "%d active processe(s)" % active_processes if options.verbose: for i in range(active_processes): print " %d: %s" % (i, task_manager.activeProcessInfo(i)) print "%d waiting tasks" % sum(waiting.values()) if options.verbose: for tag, count in waiting.items(): print " %s: %d" % (tag, count) print "%d running tasks" % sum(running.values()) if options.verbose: for tag, count in running.items(): print " %s: %d" % (tag, count) print "%d results waiting to be retrieved" % sum(finished.values()) if options.verbose: for tag, count in finished.items(): print " %s: %d" % (tag, count) commands["show"] = showInfo arg_count["show"] = 2 def listTaskManagers(): if options.master is not None: print "Command 'list' requires a name server" raise SystemExit pyro_ns = Pyro.naming.NameServerLocator().getNS() for label, type in pyro_ns.list("TaskManager"): if type == 1: print label if options.verbose: task_manager = getTaskManager(label) active_processes = task_manager.numberOfActiveProcesses() print " %d active processe(s)" % active_processes commands["list"] = listTaskManagers arg_count["list"] = 1 def runSlave(label): task_manager = getTaskManager(label) try: slave_code = task_manager.retrieveData("slave_code") directory = task_manager.retrieveData("cwd") except KeyError: print "No slave code available for %s" % label raise SystemExit namespace = {} sys.modules["__main__"].SLAVE_PROCESS_LABEL = label sys.modules["__main__"].SLAVE_NAMESPACE = namespace os.chdir(directory) exec slave_code in namespace commands["slave"] = runSlave arg_count["slave"] = 2 # Parse command line and execute command parser = OptionParser(usage="Usage: %prog [options] command [label]") parser.add_option("-m", "--master", help="hostname of the machine running the master process") parser.add_option("-v", "--verbose", action="store_true", help="verbose output") options, args = parser.parse_args() if len(args) < 1: parser.error("incorrect number of arguments") try: command = commands[args[0]] except KeyError: parser.error("unknown command %s" % args[0]) if len(args) != arg_count[args[0]]: parser.error("incorrect number of arguments") command(*args[1:])