diff options
author | 2012-06-12 19:07:23 +0200 | |
---|---|---|
committer | 2012-06-12 19:07:23 +0200 | |
commit | adb14edc11a41eea30f1897e807c0cedfb80aa99 (patch) | |
tree | 89d29df0b5f9a33691db999128e579e690829a68 /roverlay/depres/depresolver.py | |
parent | depres listeners, basic threading, misc fixes (diff) | |
download | R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.tar.gz R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.tar.bz2 R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.zip |
dependency resolution
* added a 'negative' result cache that stores unresolvable deps
* improved threading: channels now use a queue to wait for results from the
resolver instead of inefficient looping until all deps are done
* removed unused code
modified: roverlay/depres/channels.py
modified: roverlay/depres/communication.py
modified: roverlay/depres/depenv.py
modified: roverlay/depres/depresolver.py
modified: roverlay/depres/simpledeprule.py
Diffstat (limited to 'roverlay/depres/depresolver.py')
-rw-r--r-- | roverlay/depres/depresolver.py | 221 |
1 files changed, 121 insertions, 100 deletions
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py index a7f9dfb..299d3c6 100644 --- a/roverlay/depres/depresolver.py +++ b/roverlay/depres/depresolver.py @@ -2,8 +2,6 @@ # Copyright 2006-2012 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -# todo depres result cache - import logging import threading @@ -14,39 +12,17 @@ except ImportError: import Queue as queue -from roverlay import config -from roverlay.depres import simpledeprule, communication, events -from roverlay.depres.worker import DepResWorker +from roverlay import config +from roverlay.depres import simpledeprule, communication, events #from roverlay.depres.depenv import DepEnv (implicit) -class PseudoAtomicCounter ( object ): - def __init__ ( self, number=0 ): - self.nlock = threading.Lock() - self._number = number - # --- end of __init__ (...) --- - - def increment_and_get ( self, step=1 ): - with self.nlock: - self._number += step - ret = self._number - return ret - # --- end of increment_and_get (...) 
--- - - def get ( self ): return self._number - - def __ge__ ( self, other_int ): return self._number >= other_int - def __gt__ ( self, other_int ): return self._number > other_int - def __le__ ( self, other_int ): return self._number <= other_int - def __lt__ ( self, other_int ): return self._number < other_int - - class DependencyResolver ( object ): """Main object for dependency resolution.""" LOGGER = logging.getLogger ( "DependencyResolver" ) - NUMTHREADS = config.get ( "DEPRES.jobcount", 0 ) + NUMTHREADS = config.get ( "DEPRES.jobcount", 2 ) def __init__ ( self ): """Initializes a DependencyResolver.""" @@ -61,8 +37,13 @@ class DependencyResolver ( object ): 'RESOLVED', 'UNRESOLVABLE' ) - self.runlock = threading.Lock() - self._threads = None + # this lock tells whether a dep res 'master' thread is running (locked) + self.runlock = threading.Lock() + # the dep res main thread + self._mainthread = None + # the dep res worker threads + self._threads = None + # the list of registered listener modules self.listeners = list () @@ -76,12 +57,16 @@ class DependencyResolver ( object ): # or marked as unresolvable self._depqueue_failed = queue.Queue() - # map: channel identifier -> number of done deps (resolved/unresolvable) + # the 'negative' result cache, stores unresolvable deps + # has to be (selectively?) cleared when + # new dep rule found / new rulepool etc. 
+ self._dep_unresolvable = set () + + # map: channel identifier -> queue of done deps (resolved/unresolvable) # this serves two purposes: - # (a) obviously: the number of resolved deps which is useful for channels - # (b) the keys of this dict is the list of known channels - # - self._depdone = dict () + # (a) channels can do a blocking call on this queue + # (b) the keys of this dict are the list of known channels + self._depqueue_done = dict () # list of rule pools that have been created from reading files self.static_rule_pools = list () @@ -103,6 +88,7 @@ class DependencyResolver ( object ): * pool_type -- ignored. """ self.static_rule_pools.append ( rulepool ) + self._dep_unresolvable.clear() self._sort() # --- end of add_rulepool (...) --- @@ -191,37 +177,39 @@ class DependencyResolver ( object ): returns: channel """ - if channel in self._depdone: + if channel in self._depqueue_done: raise Exception ( "channel is already registered." ) - if channel._depres_master is None: - channel._depres_master = self + # register channel and allocate a queue in depqueue_done + self._depqueue_done [channel.ident] = queue.Queue() - # register channel and allocate a result counter in depdone - self._depdone [channel.ident] = PseudoAtomicCounter (0) + channel.set_resolver ( + self, channel_queue=self._depqueue_done [channel.ident] + ) return channel # --- end of register_channel (...) --- def channel_closed ( self, channel_id ): - # TODO + """Tells the dependency resolver that a channel has been closed. + It will then unregister the channel; this operation does not fail if the + channel is not registered with this resolver. + + arguments: + * channel_id -- identifier of the closed channel + + returns: None (implicit) + """ # not removing channel_id's DepEnvs from the queues # 'cause this costs time - del self._depdone [channel_id] - # --- end of channel_closed (...) 
--- + try: + del self._depqueue_done [channel_id] + except KeyError as expected: + # ok + pass - def get_worker ( self, max_dep_resolve=0 ): - """Returns a dep resolver worker (thread). - -- Threading is not implemented, this method is just a reminder. - - arguments: - * max_dep_resolve -- if > 0 : worker stops after resolving # deps - if 0 : worker stops when queue is empty - else : worker does not stop unless told to do so - """ - raise Exception ( "DependencyResolver.get_worker(...) is TODO!" ) - # --- end of get_worker (...) --- + # --- end of channel_closed (...) --- def _queue_previously_failed ( self ): """Inserts all previously failed dependency lookups into the queue @@ -232,34 +220,50 @@ class DependencyResolver ( object ): while not self._depqueue_failed.empty(): # it has to be guaranteed that no items are removed from # _depqueue_failed while calling this method, - # else Queue.Empty will be raised + # else queue.Empty will be raised self._depqueue.put ( self._depqueue_failed.get_nowait() ) # --- end of _queue_previously_failed (...) --- - def start ( self ): - """Tells the resolver to run.""" if not self.runlock.acquire ( False ): # already running return True # -- + if DependencyResolver.NUMTHREADS > 0: + # no need to wait for the old thread + self._mainthread = threading.Thread ( target=self._thread_run_main ) + self._mainthread.start() + else: + self._thread_run_main() + + # self.runlock is released when _thread_run_main is done + # --- end of start (...) --- + + def _thread_run_main ( self ): + """Tells the resolver to run.""" + jobcount = DependencyResolver.NUMTHREADS if jobcount < 1: - if jobcount < 0: - self.logger.warning ( "Running in sequential mode." ) - else: - self.logger.debug ( "Running in sequential mode." ) - self.thread_run () + ( self.logger.warning if jobcount < 0 else self.logger.debug ) ( + "Running in sequential mode." 
+ ) + self._thread_run_resolve() else: + + # wait for old threads + if not self._threads is None: + self.logger.warning ( "Waiting for old threads..." ) + for t in self._threads: t.join() + self.logger.warning ( "Running in concurrent mode with %i jobs." % jobcount ) # create threads, self._threads = [ - threading.Thread ( target=self.thread_run ) + threading.Thread ( target=self._thread_run_resolve ) for n in range (jobcount) ] # run them @@ -273,18 +277,33 @@ class DependencyResolver ( object ): # iterate over _depqueue_failed and report unresolved - while not ( self._depqueue_failed.empty() ): + ## todo can thread this + while not self._depqueue_failed.empty(): + try: + channel_id, dep_env = self._depqueue_failed.get_nowait() - channel_id, dep_env = self._depqueue_failed.get_nowait() - dep_env.set_unresolvable() - self._depdone [channel_id].increment_and_get() + dep_env.set_unresolvable() - self._report_event ( 'UNRESOLVABLE', dep_env ) + self._report_event ( 'UNRESOLVABLE', dep_env ) + + if channel_id in self._depqueue_done: + ## todo/fixme/whatever: this 'if' can filter out channels that have + ## been added again + self._depqueue_done [channel_id].put ( dep_env ) + + except queue.Empty: + # race cond empty() <-> get_nowait() + break + except KeyError: + # channel has been closed before calling put, ignore this err + pass + # release the lock self.runlock.release() - # --- end of start (...) --- - def thread_run ( self ): + # --- end of _thread_run_main (...) --- + + def _thread_run_resolve ( self ): """Resolves dependencies (thread target). 
returns: None (implicit) @@ -293,7 +312,7 @@ class DependencyResolver ( object ): while not self._depqueue.empty(): try: - to_resolve = self._depqueue.get_nowait() + to_resolve = self._depqueue.get_nowait() except queue.Empty: # this thread is done when the queue is empty, so this is # no error, but just the result of the race condition between @@ -302,49 +321,64 @@ class DependencyResolver ( object ): channel_id, dep_env = to_resolve - if channel_id in self._depdone: + if channel_id in self._depqueue_done: # else channel has been closed, drop dep self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str ) #have_new_rule = False - # resolved can be None, so use a bool for checking resolved = None - is_resolved = False + # resolved can be None, so use a tri-state int for checking + # 0 -> unresolved, but resolvable + # 1 -> unresolved and (currently, new rules may change this) + # not resolvable + # 2 -> resolved + is_resolved = 0 - # search for a match in the rule pools - for rulepool in self.static_rule_pools: - result = rulepool.matches ( dep_env ) - if not result is None and result [0] > 0: - resolved = result [1] - is_resolved = True - break + # TODO: + # (threading: could search the pools in parallel) +# if dep_env.dep_str_low in self._dep_unresolvable: +# # cannot resolve +# is_resolved = 1 + if is_resolved == 0: + # search for a match in the rule pools + for rulepool in self.static_rule_pools: + result = rulepool.matches ( dep_env ) + if not result is None and result [0] > 0: + resolved = result [1] + is_resolved = 2 + break - if is_resolved: - dep_env.set_resolved ( resolved, append=False ) - self._depdone [channel_id].increment_and_get() + + if is_resolved == 2: + dep_env.set_resolved ( resolved, append=False ) self._report_event ( 'RESOLVED', dep_env ) + self._depqueue_done [channel_id].put ( dep_env ) else: self._depqueue_failed.put ( to_resolve ) +# # does not work when adding new rules is possible +# self._dep_unresolvable.add ( 
dep_env.dep_str_low ) + """ ## only useful if new rules can be created # new rule found, requeue all previously failed dependency searches if have_new_rule: self._queue_previously_failed + self._dep_unresolvable.clear() #? """ - # --- end if channel_id in self._depdone + # --- end if channel_id in self._depqueue_done self._depqueue.task_done() # --- end while - # --- end of thread_run (...) --- + # --- end of _thread_run_resolve (...) --- def enqueue ( self, dep_env, channel_id ): """Adds a DepEnv to the queue of deps to resolve. @@ -359,22 +393,9 @@ class DependencyResolver ( object ): # --- end of enqueue (...) --- - def done ( self, channel_id, numdeps ): - """Returns True if channel_id exists in depdone and at least numdeps - dependencies have been resolved for that channel. - - arguments: - * channel_id -- - * numdeps -- - """ - - if channel_id in self._depdone: - return self._depdone [channel_id] >= numdeps - else: - return False - # --- end of done (...) --- - def close ( self ): + if isinstance ( self._mainthread, threading.Thread ): + self._mainthread.join() for lis in self.listeners: lis.close() del self.listeners # --- end of close (...) --- |