File: class/Extras/Code/pp3e/threadtools.py
#########################################################
# system-wide thread interface utilities for GUIs
# single thread queue and checker shared by all windows
# never block GUI - just verify operations and quits
# worker threads can overlap with main, other workers
#
# using a queue of callback functions and argument is
# more useful than a simple data queue if there can be
# many kinds of threads running at the same time - each
# kind may have a different implied exit action
#
# because GUI API is not completely thread-safe,
# instead of calling GUI update callbacks directly
# after thread exit, place them on a shared queue,
# to be run from a timer loop in the main thread;
# may be useful even if GUI is thread safe, to make
# GUI update points less random and unpredictable;
# to use, split modal action into pre and post-thread
# parts, plus threaded action and error handler;
#
# assumes thread action raises exception on failure,
# and thread action has a 'progress' callback argument
# if it supports progress updates; also assumes
# that queue will contain callback functions for use
# in a GUI app: needs widget object and 'after' event;
# use queue directly for other producer/consumer apps;
#
# caveat: this loop could be disabled if no threads running,
# but system-wide CPU utilization stayed at 1% on my machine
# before and after this program was launched, and this program's
# process took 0% or 1% of the CPU itself (usually 0%, and only
# occasionally 1% while worker threads were running) - the
# overhead of thread checker loop is trivial at 250msecs,
# 4x per second;
#
# see also: threadtools-nongui.py - fails if call update
##########################################################
#????????
# caveat: as coded, progress updates only reported 10x
# per second: if updates come faster than they can be taken
# off the queue, GUI will fall behind and not do exit action
# at true thread exit; queue only supports get/put, not
# changes; better alts may be to consume entire queue on
# each timer callback (but may block rest of gui if many
# actions on queue), or signal progress without putting
# items on queue for each update - could make mail xfer loops
# smarter and only call progress every N times; this may or
# may not be a realistic concern: how many network connections
# can respond to more than 10 top or retr requests per second?
# only an issue if very many messages in mbox;
# bumped up timer to 10x per sec, cpu usage at 0% usually, 1% occasionally
# run if no threads
try:                                       # raise ImportError to
    import thread                          # run with gui blocking
except ImportError:                        # if threads not available
    class fakeLock:
        """
        no-op stand-in for a thread lock, with the same acquire,
        release and locked calls; only tracks a held flag, and
        never blocks: the fallback runs everything in one thread
        """
        def __init__(self):
            self.held = False
        def acquire(self, waitflag=1):
            self.held = True
            return True                    # always "got" the lock
        def release(self):
            self.held = False
        def locked(self):
            return self.held

    class fakeThread:
        """
        stand-in for the thread module when it cannot be imported:
        start_new_thread runs func(*args) immediately in the caller
        (the GUI blocks until it returns); allocate_lock is provided
        so that code creating locks still works in this mode
        """
        def start_new_thread(self, func, args):
            func(*args)
        def allocate_lock(self):
            return fakeLock()
    thread = fakeThread()
import Queue, sys
# the one queue shared by all threads and windows in the process;
# maxsize is left at its default of 0, which means no size limit
threadQueue = Queue.Queue()
def threadChecker(widget, delayMsecs=100):        # 10x per second
    """
    in main thread: periodically check thread completions queue;
    do implied GUI actions on queue in this main GUI thread;
    one consumer (GUI), multiple producers (load,del,send);
    a simple list may suffice: list.append/pop are atomic;
    one action at a time here: a loop may block GUI temporarily;

    widget:     object with an after(msecs, func) method, used
                to schedule the next check;
    delayMsecs: milliseconds between checks; passed along to
                every rescheduled check, not just the first one;

    an exception raised by a queued callback still propagates to
    the caller, but only after the next check has been scheduled,
    so one failing callback cannot stop the checker loop for good;
    """
    try:                                          # nested try: also runs
        try:                                      # on pythons without a
            (callback, args) = threadQueue.get(block=False)
        except Queue.Empty:                       # combined except/finally
            pass                                  # nothing to do this time
        else:
            callback(*args)
    finally:
        # keep caller's delay: rescheduling without it reverts to default
        widget.after(delayMsecs, lambda: threadChecker(widget, delayMsecs))
def threaded(action, args, context, onExit, onFail, onProgress):
    """
    body of a worker thread: run action(*args) here, and report
    its outcome by putting (callback, arguments) on threadQueue
    instead of calling the callback in this thread;

    action returns normally:  queues (onExit, context);
    action raises anything:   queues (onFail, (sys.exc_info(),)+context);
    if onProgress is true, action is also passed a 'progress'
    keyword argument; each progress(*any) call made by action
    queues (onProgress, any+context);

    args and context are tuples; action need not know that it
    is being run in a thread; no counters are updated here: an
    action is not finished until its callback is taken off queue
    """
    extras = {}
    if onProgress:
        def progress(*any):
            threadQueue.put((onProgress, any + context))
        extras['progress'] = progress             # else: no extra argument

    try:
        action(*args, **extras)                   # assume raises exception if fails
    except:                                       # bare on purpose: any failure
        threadQueue.put((onFail, (sys.exc_info(),) + context))
    else:
        threadQueue.put((onExit, context))
def startThread(action, args, context, onExit, onFail, onProgress=None):
    """
    start threaded() through thread.start_new_thread, handing it
    these six arguments unchanged; onProgress may be omitted if
    the action reports no progress; returns nothing
    """
    threadedArgs = (action, args, context, onExit, onFail, onProgress)
    thread.start_new_thread(threaded, threadedArgs)
class ThreadCounter:
    """
    a thread-safe counter or flag: count starts at 0 and is
    changed only while holding a lock made by thread.allocate_lock;
    len(counter) gives the count, so an instance tests false
    while the count is 0
    """
    def __init__(self):
        self.count = 0
        self.mutex = thread.allocate_lock()       # or use Threading.semaphore

    def _adjust(self, delta):
        # add delta to count with the lock held; always let go of lock
        self.mutex.acquire()
        try:
            self.count = self.count + delta
        finally:
            self.mutex.release()

    def incr(self):
        self._adjust(+1)

    def decr(self):
        self._adjust(-1)

    def __len__(self):
        return self.count                         # True/False if used as a flag
if __name__ == '__main__':                        # self-test code when run
    import time, ScrolledText

    def threadaction(ident, reps, progress):
        # what each thread does: one second per rep, reporting each rep
        for done in range(reps):
            time.sleep(1)
            if progress:
                progress(done)                    # progress callback: queued
        if ident % 2 == 1:                        # odd numbered: fail
            raise Exception

    def mainaction(i):
        # code that spawns a thread; its name is the callbacks' context
        startThread(threadaction, (i, 3), ('thread-%s' % i,),
                    threadexit, threadfail, threadprogress)

    def showline(text):
        # add text at end of the window, and scroll to make it visible
        root.insert('end', text)
        root.see('end')

    # thread callbacks: dispatched off queue in main thread

    def threadexit(myname):
        showline('%s\texit\n' % myname)

    def threadfail(exc_info, myname):
        showline('%s\tfail\t%s\n' % (myname, exc_info[0]))

    def threadprogress(count, myname):
        showline('%s\tprog\t%s\n' % (myname, count))
        root.update()                             # works here: run in main thread

    def onclick(event):
        # spawn batch of worker threads on each mouse click: may overlap
        for i in range(6):
            mainaction(i)

    # make enclosing GUI
    root = ScrolledText.ScrolledText()
    root.pack()
    threadChecker(root)                           # start thread loop in main thread
    root.bind('<Button-1>', onclick)
    root.mainloop()                               # popup window, enter tk event loop