Job queue using xmlrpc and threading

  • psaffrey@googlemail.com

    Job queue using xmlrpc and threading

    I'm trying to implement an application that will listen for requests,
    run them when they are present, and also accept new requests while it
    is running. I've tried to do this using the thread and xmlrpc
    modules. The idea is that an object exposed over XML-RPC tells the
    queue thread object to add a job. If there are no jobs running, it
    creates a file, adds the new job to the end, and then a separate
    consumption thread starts working through the jobs in the file. New
    jobs coming in are just added to the end of the file by the queue
    thread.

    Unfortunately, I can't get it to work. The problem is that the
    consumption thread seems to read the job queue before it gets written,
    even though I've used a lock. I've also had the application get to the
    stage where it ignores ctrl-c, which is a little worrying - I fear it
    doesn't bode well for future stability... I don't have a lot of
    experience with multi-threaded applications, so I may well have chosen
    a poor approach.

    I've posted the code below. It's in three parts: the job queue, the
    manager that listens for new requests, and an application to add jobs
    to the queue. Sorry for the long listings...

    Any guidance gratefully received,

    Peter

    ===
    testqueue.py:

    import thread
    import time
    import shutil
    import os

    class JobQueue:

        def __init__(self, filename):
            self.queuefile = filename
            self.jobthread = 0
            # lock for the jobfile queue file
            self.jfqlock = thread.allocate_lock()

        def addJob(self, jobfileuri, email):
            self.jfqlock.acquire()
            if not self.jobthread:
                print "starting jobfile consumption thread"
                if os.access(self.queuefile, os.R_OK):
                    print "cleaning stale jobfile queue file"
                    try:
                        os.remove(self.queuefile)
                    except:
                        print "problem removing jobfile queue file"
                        raise
                self.jobthread = thread.start_new_thread(self.main, ())
            else:
                print "using existing jobfile consumption thread in file", self.queuefile
            fh = open(self.queuefile, "a")
            # choose "::::" as a delimiter
            print >>fh, jobfileuri + "::::" + email
            self.jfqlock.release()
            return 1

        def main(self):
            while 1:
                self.jfqlock.acquire()
                rfh = open(self.queuefile, "r")
                # breakpoint()
                finput = rfh.readline()
                print "found:", finput
                if not finput:
                    print "finished jobfiles. Closing thread"
                    rfh.close()
                    self.jobthread = 0
                    self.jfqlock.release()
                    return
                else:
                    print "found jobfile in queue: attempting to run"
                    # most of this is to shift up the lines in the file
                    tmpname = self.queuefile + ".tmp"
                    wfh = open(tmpname, "w")
                    line = rfh.readline()
                    while line:
                        wfh.write(line)
                        line = rfh.readline()
                    wfh.close()
                    rfh.close()
                    shutil.move(tmpname, self.queuefile)
                    self.jfqlock.release()
                    # lop off the trailing line break
                    print
                    print "***run Starting***"
                    try:
                        self.runJob(finput[:-1])
                        print "***run finished***"
                    except:
                        print "***run failed***"
                    print

        def runJob(self, job):
            time.sleep(2.0)
            print "running job", job
            if not report:
                print "some problem with run. Cannot mail out report"
                return


    ===
    queuemanager.py:

    from testqueue import JobQueue
    from SimpleXMLRPCServer import *


    class QM:
        def __init__(self, filename):
            self.jq = JobQueue("queue.txt")

        def addJob(self, jobname):
            self.jq.addJob(jobname, "tester@testdomain")

    if __name__=="__main__":
        qm = QM("jobqueue.txt")
        rpcserver = SimpleXMLRPCServer(("localhost", 8000))
        rpcserver.register_instance(qm)
        rpcserver.serve_forever()

    ===
    addjob.py:

    import xmlrpclib
    import sys

    server = "localhost"
    port = 8000

    serveradd = "http://%s:%s" % (server, port)
    manager = xmlrpclib.ServerProxy(serveradd)

    jobname = sys.argv[1]

    manager.addJob(jobname)
  • Jeff

    #2
    Re: Job queue using xmlrpc and threading

    Try using the Queue module - http://docs.python.org/lib/module-Queue.html.
    Here is a tutorial with it - http://www.artfulcode.net/articles/m...eading-python/.
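
For reference, here is a minimal sketch of what the Queue-based approach suggested above could look like. It is written for Python 3, where the module is named `queue` (it is `Queue` in Python 2), and it keeps jobs in memory instead of in a queue file, which is an assumption that only holds if jobs do not need to survive a restart. The names `JobQueue`, `addJob`, `main` and `runJob` mirror the original post; the `finished` list is a hypothetical attribute added purely for illustration.

```python
import queue      # this module is named "Queue" in Python 2
import threading
import time


class JobQueue:
    """In-memory job queue drained by one background worker thread."""

    def __init__(self):
        self.jobs = queue.Queue()   # thread-safe FIFO that does its own locking
        self.finished = []          # hypothetical: completed jobs, for inspection
        worker = threading.Thread(target=self.main)
        worker.daemon = True        # worker should not keep the process alive
        worker.start()

    def addJob(self, jobfileuri, email):
        # put() can be called safely from the thread handling the XML-RPC request
        self.jobs.put((jobfileuri, email))
        return 1

    def main(self):
        while True:
            jobfileuri, email = self.jobs.get()   # blocks until a job arrives
            try:
                self.runJob(jobfileuri, email)
            except Exception as exc:
                print("***run failed***", exc)
            finally:
                self.jobs.task_done()             # lets join() know the job is done

    def runJob(self, jobfileuri, email):
        time.sleep(0.1)             # stand-in for the real work
        self.finished.append((jobfileuri, email))


if __name__ == "__main__":
    jq = JobQueue()
    jq.addJob("job1", "tester@testdomain")
    jq.addJob("job2", "tester@testdomain")
    jq.jobs.join()                  # wait until both jobs have been processed
    print(jq.finished)
```

Because `get()` blocks until an item is available, the worker cannot read the queue before a job has been written to it, and no explicit lock or queue file is needed. Marking the worker as a daemon thread means it should not keep the process alive once the main thread exits.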
