# File: class/Extras/Code/pp3e/threadtools.py

#########################################################
# system-wide thread interface utilities for GUIs
# single thread queue and checker shared by all windows
# never block GUI - just verify operations and quits
# worker threads can overlap with main, other workers
#
# using a queue of callback functions and argument is
# more useful than a simple data queue if there can be
# many kinds of threads running at the same time - each
# kind may have a different implied exit action
#
# because GUI API is not completely thread-safe,
# instead of calling GUI update callbacks directly
# after thread exit, place them on a shared queue,
# to be run from a timer loop in the main thread;
# may be useful even if GUI is thread safe, to make
# GUI update points less random and unpredictable;
# to use, split modal action into pre and post-thread
# parts, plus threaded action and error handler;
#
# assumes thread action raises exception on failure,
# and thread action has a 'progress' callback argument
# if it supports progress updates;  also assumes
# that the queue will contain callback functions for use
# in a GUI app: needs widget object and 'after' event;
# use queue directly for other producer/consumer apps;
#
# caveat: this loop could be disabled if no threads running,
# but system-wide CPU utilization stayed at 1% on my machine
# before and after this program was launched, and this program's
# process took 0% or 1% of the CPU itself (usually 0%, and only
# occasionally 1% while worker threads were running) - the
# overhead of thread checker loop is trivial at 250msecs,
# 4x per second;
#
# see also: threadtools-nongui.py - fails if call update
##########################################################

#????????
# caveat: as coded, progress updates only reported 10x
# per second: if updates come faster than they can be taken
# off the queue, GUI will fall behind and not do exit action
# at true thread exit; queue only supports get/put, not
# changes; better alts may be to consume entire queue on
# each timer callback (but may block rest of gui if many
# actions on queue), or signal progress without putting
# items on queue for each update - could make mail xfer loops
# smarter and only call progress every N times; this may or
# may not be a realistic concern: how many network connections
# can respond to more than 10 top or retr requests per second?
# only an issue if very many messages in mbox
# bumped up timer to 10x per sec, cpu usage at 0% usually, 1% occasionally


# run if no threads
try:                                     # use real threads if available;
    import thread                        # raise ImportError here instead
except ImportError:                      # to run with the GUI blocking
    class fakeLock:
        """
        stand-in for a thread lock when no thread module exists;
        the fallback runs every action synchronously in the caller,
        so there is never a second thread to wait for: acquire
        always succeeds at once and only records the held state
        """
        def __init__(self):
            self.held = False
        def acquire(self, waitflag=1):
            self.held = True
            return True
        def release(self):
            self.held = False
        def locked(self):
            return self.held

    class fakeThread:
        """
        minimal replacement for the thread module: provides only
        the two calls this file uses, start_new_thread and
        allocate_lock (the latter is needed by ThreadCounter)
        """
        def start_new_thread(self, func, args, kwargs=None):
            # no threads: run the action now, blocking the caller
            func(*args, **(kwargs or {}))
        def allocate_lock(self):
            return fakeLock()
    thread = fakeThread()

import Queue, sys
threadQueue = Queue.Queue(maxsize=0)              # infinite size


def threadChecker(widget, delayMsecs=100):        # 10x per second
    """
    in main thread: periodically check thread completions queue;
    do implied GUI actions on queue in this main GUI thread;
    one consumer (GUI), multiple producers (load,del,send);
    a simple list may suffice: list.append/pop are atomic;
    one action at a time here: a loop may block GUI temporarily;

    widget:     object whose after(msecs, func) method schedules
                the next queue check
    delayMsecs: pause between checks; passed along on every
                reschedule, so a caller's value is used for the
                whole life of the loop, not just the first check

    at most one (callback, args) item is taken off threadQueue per
    call and run as callback(*args); if that callback raises, the
    exception still propagates to the caller, but the next check
    is scheduled first so one bad callback cannot stop the loop
    """
    try:
        try:
            (callback, args) = threadQueue.get(block=False)
        except Queue.Empty:
            pass                                  # nothing finished yet
        else:
            callback(*args)
    finally:
        # always reschedule, and keep the same delay for later checks
        widget.after(delayMsecs, lambda: threadChecker(widget, delayMsecs))


def threaded(action, args, context, onExit, onFail, onProgress):
    """
    body of a worker thread: call action, then report the outcome
    by putting a (callback, args) pair on threadQueue - callbacks
    are never run here, only queued for whoever drains the queue;

    action:     callable run as action(*args); when onProgress is
                true it is also passed a progress=... keyword
    args:       positional arguments for action
    context:    tuple appended to the arguments of every callback
    onExit:     queued with context if action returns normally
    onFail:     queued with (sys.exc_info(),) + context if action
                raises anything at all
    onProgress: if true, each progress(*values) call made by the
                action queues onProgress with values + context

    the action needs no knowledge of threads or of the queue;
    counters should not be updated here: an operation is not
    finished until its callback has been taken off the queue
    """
    def queueProgress(*values):
        # runs in the worker thread: only queues, never calls onProgress
        threadQueue.put((onProgress, values + context))

    try:
        if onProgress:
            action(progress=queueProgress, *args)
        else:
            action(*args)            # assume raises exception if fails
    except:
        # deliberately catches everything: any failure is reported
        failInfo = (sys.exc_info(),) + context
        threadQueue.put((onFail, failInfo))
    else:
        threadQueue.put((onExit, context))

def startThread(action, args, context, onExit, onFail, onProgress=None):
    """
    start threaded() through thread.start_new_thread, handing it
    all six arguments unchanged; onProgress defaults to None,
    in which case the action is run without a progress callback
    """
    workerArgs = (action, args, context, onExit, onFail, onProgress)
    thread.start_new_thread(threaded, workerArgs)


class ThreadCounter:
    """
    a thread-safe counter or flag: incr and decr change the count
    while holding a lock obtained from thread.allocate_lock;
    len(counter) returns the current count, so a counter tests
    false when its count is zero
    """
    def __init__(self):
        self.count = 0
        self.mutex = thread.allocate_lock()     # or use Threading.semaphore

    def _shift(self, delta):
        # apply one change to the count with the lock held
        self.mutex.acquire()
        self.count = self.count + delta
        self.mutex.release()

    def incr(self):
        self._shift(+1)

    def decr(self):
        self._shift(-1)

    def __len__(self):                          # True/False if used as a flag
        return self.count


if __name__ == '__main__':                      # self-test code when run
    import time, ScrolledText

    def threadaction(ident, reps, progress):
        """
        what each worker thread does: sleep one second reps times,
        calling progress (if given) after each sleep, then fail by
        raising Exception when ident is odd
        """
        for step in range(reps):
            time.sleep(1)
            if progress:
                progress(step)                  # progress callback: queued
        if ident % 2 == 1:
            raise Exception                     # odd numbered: fail

    def mainaction(i):
        """spawn one worker thread, with its name as callback context"""
        context = ('thread-%s' % i,)
        startThread(threadaction, (i, 3), context,
                    threadexit, threadfail, threadprogress)

    def show(line):
        """append a line to the text widget and scroll it into view"""
        root.insert('end', line)
        root.see('end')

    # thread callbacks: dispatched off queue in main thread
    def threadexit(myname):
        show('%s\texit\n' % myname)

    def threadfail(exc_info, myname):
        show('%s\tfail\t%s\n' % (myname, exc_info[0]))

    def threadprogress(count, myname):
        show('%s\tprog\t%s\n' % (myname, count))
        root.update()   # works here: run in main thread

    def spawnBatch(event):
        """mouse click handler: start a batch of six workers"""
        for i in range(6):
            mainaction(i)

    # make enclosing GUI
    # spawn batch of worker threads on each mouse click: may overlap
    root = ScrolledText.ScrolledText()
    root.pack()
    threadChecker(root)                 # start thread loop in main thread
    root.bind('<Button-1>', spawnBatch)
    root.mainloop()                     # popup window, enter tk event loop



# [Home page] Books Code Blog Python Author Train Find (c) M.Lutz