about summary refs log tree commit diff
diff options
context:
space:
mode:
author André Erdmann <dywi@mailerd.de> 2012-06-12 19:07:23 +0200
committer André Erdmann <dywi@mailerd.de> 2012-06-12 19:07:23 +0200
commit adb14edc11a41eea30f1897e807c0cedfb80aa99 (patch)
tree 89d29df0b5f9a33691db999128e579e690829a68 /roverlay/depres/depresolver.py
parent depres listeners, basic threading, misc fixes (diff)
download R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.tar.gz
R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.tar.bz2
R_overlay-adb14edc11a41eea30f1897e807c0cedfb80aa99.zip
dependency resolution
* added a 'negative' result cache that stores unresolvable deps * improved threading: channels now use a queue to wait for results from the resolver instead of inefficient looping until all deps are done * remove unused code modified: roverlay/depres/channels.py modified: roverlay/depres/communication.py modified: roverlay/depres/depenv.py modified: roverlay/depres/depresolver.py modified: roverlay/depres/simpledeprule.py
Diffstat (limited to 'roverlay/depres/depresolver.py')
-rw-r--r-- roverlay/depres/depresolver.py 221
1 file changed, 121 insertions, 100 deletions
diff --git a/roverlay/depres/depresolver.py b/roverlay/depres/depresolver.py
index a7f9dfb..299d3c6 100644
--- a/roverlay/depres/depresolver.py
+++ b/roverlay/depres/depresolver.py
@@ -2,8 +2,6 @@
# Copyright 2006-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
-# todo depres result cache
-
import logging
import threading
@@ -14,39 +12,17 @@ except ImportError:
import Queue as queue
-from roverlay import config
-from roverlay.depres import simpledeprule, communication, events
-from roverlay.depres.worker import DepResWorker
+from roverlay import config
+from roverlay.depres import simpledeprule, communication, events
#from roverlay.depres.depenv import DepEnv (implicit)
-class PseudoAtomicCounter ( object ):
- def __init__ ( self, number=0 ):
- self.nlock = threading.Lock()
- self._number = number
- # --- end of __init__ (...) ---
-
- def increment_and_get ( self, step=1 ):
- with self.nlock:
- self._number += step
- ret = self._number
- return ret
- # --- end of increment_and_get (...) ---
-
- def get ( self ): return self._number
-
- def __ge__ ( self, other_int ): return self._number >= other_int
- def __gt__ ( self, other_int ): return self._number > other_int
- def __le__ ( self, other_int ): return self._number <= other_int
- def __lt__ ( self, other_int ): return self._number < other_int
-
-
class DependencyResolver ( object ):
"""Main object for dependency resolution."""
LOGGER = logging.getLogger ( "DependencyResolver" )
- NUMTHREADS = config.get ( "DEPRES.jobcount", 0 )
+ NUMTHREADS = config.get ( "DEPRES.jobcount", 2 )
def __init__ ( self ):
"""Initializes a DependencyResolver."""
@@ -61,8 +37,13 @@ class DependencyResolver ( object ):
'RESOLVED', 'UNRESOLVABLE'
)
- self.runlock = threading.Lock()
- self._threads = None
+ # this lock tells whether a dep res 'master' thread is running (locked)
+ self.runlock = threading.Lock()
+ # the dep res main thread
+ self._mainthread = None
+ # the dep res worker threads
+ self._threads = None
+
# the list of registered listener modules
self.listeners = list ()
@@ -76,12 +57,16 @@ class DependencyResolver ( object ):
# or marked as unresolvable
self._depqueue_failed = queue.Queue()
- # map: channel identifier -> number of done deps (resolved/unresolvable)
+ # the 'negative' result cache, stores unresolvable deps
+ # has to be (selectively?) cleared when
+ # new dep rule found / new rulepool etc.
+ self._dep_unresolvable = set ()
+
+ # map: channel identifier -> queue of done deps (resolved/unresolvable)
# this serves two purposes:
- # (a) obviously: the number of resolved deps which is useful for channels
- # (b) the keys of this dict is the list of known channels
- #
- self._depdone = dict ()
+ # (a) channels can do a blocking call on this queue
+ # (b) the keys of this dict are the list of known channels
+ self._depqueue_done = dict ()
# list of rule pools that have been created from reading files
self.static_rule_pools = list ()
@@ -103,6 +88,7 @@ class DependencyResolver ( object ):
* pool_type -- ignored.
"""
self.static_rule_pools.append ( rulepool )
+ self._dep_unresolvable.clear()
self._sort()
# --- end of add_rulepool (...) ---
@@ -191,37 +177,39 @@ class DependencyResolver ( object ):
returns: channel
"""
- if channel in self._depdone:
+ if channel in self._depqueue_done:
raise Exception ( "channel is already registered." )
- if channel._depres_master is None:
- channel._depres_master = self
+ # register channel and allocate a queue in depqueue_done
+ self._depqueue_done [channel.ident] = queue.Queue()
- # register channel and allocate a result counter in depdone
- self._depdone [channel.ident] = PseudoAtomicCounter (0)
+ channel.set_resolver (
+ self, channel_queue=self._depqueue_done [channel.ident]
+ )
return channel
# --- end of register_channel (...) ---
def channel_closed ( self, channel_id ):
- # TODO
+ """Tells the dependency resolver that a channel has been closed.
+ It will then unregister the channel; this operation does not fail if the
+ channel is not registered with this resolver.
+
+ arguments:
+ * channel_id -- identifier of the closed channel
+
+ returns: None (implicit)
+ """
# not removing channel_id's DepEnvs from the queues
# 'cause this costs time
- del self._depdone [channel_id]
- # --- end of channel_closed (...) ---
+ try:
+ del self._depqueue_done [channel_id]
+ except KeyError as expected:
+ # ok
+ pass
- def get_worker ( self, max_dep_resolve=0 ):
- """Returns a dep resolver worker (thread).
- -- Threading is not implemented, this method is just a reminder.
-
- arguments:
- * max_dep_resolve -- if > 0 : worker stops after resolving # deps
- if 0 : worker stops when queue is empty
- else : worker does not stop unless told to do so
- """
- raise Exception ( "DependencyResolver.get_worker(...) is TODO!" )
- # --- end of get_worker (...) ---
+ # --- end of channel_closed (...) ---
def _queue_previously_failed ( self ):
"""Inserts all previously failed dependency lookups into the queue
@@ -232,34 +220,50 @@ class DependencyResolver ( object ):
while not self._depqueue_failed.empty():
# it has to be guaranteed that no items are removed from
# _depqueue_failed while calling this method,
- # else Queue.Empty will be raised
+ # else queue.Empty will be raised
self._depqueue.put ( self._depqueue_failed.get_nowait() )
# --- end of _queue_previously_failed (...) ---
-
def start ( self ):
- """Tells the resolver to run."""
if not self.runlock.acquire ( False ):
# already running
return True
# --
+ if DependencyResolver.NUMTHREADS > 0:
+ # no need to wait for the old thread
+ self._mainthread = threading.Thread ( target=self._thread_run_main )
+ self._mainthread.start()
+ else:
+ self._thread_run_main()
+
+ # self.runlock is released when _thread_run_main is done
+ # --- end of start (...) ---
+
+ def _thread_run_main ( self ):
+ """Tells the resolver to run."""
+
jobcount = DependencyResolver.NUMTHREADS
if jobcount < 1:
- if jobcount < 0:
- self.logger.warning ( "Running in sequential mode." )
- else:
- self.logger.debug ( "Running in sequential mode." )
- self.thread_run ()
+ ( self.logger.warning if jobcount < 0 else self.logger.debug ) (
+ "Running in sequential mode."
+ )
+ self._thread_run_resolve()
else:
+
+ # wait for old threads
+ if not self._threads is None:
+ self.logger.warning ( "Waiting for old threads..." )
+ for t in self._threads: t.join()
+
self.logger.warning (
"Running in concurrent mode with %i jobs." % jobcount
)
# create threads,
self._threads = [
- threading.Thread ( target=self.thread_run )
+ threading.Thread ( target=self._thread_run_resolve )
for n in range (jobcount)
]
# run them
@@ -273,18 +277,33 @@ class DependencyResolver ( object ):
# iterate over _depqueue_failed and report unresolved
- while not ( self._depqueue_failed.empty() ):
+ ## todo can thread this
+ while not self._depqueue_failed.empty():
+ try:
+ channel_id, dep_env = self._depqueue_failed.get_nowait()
- channel_id, dep_env = self._depqueue_failed.get_nowait()
- dep_env.set_unresolvable()
- self._depdone [channel_id].increment_and_get()
+ dep_env.set_unresolvable()
- self._report_event ( 'UNRESOLVABLE', dep_env )
+ self._report_event ( 'UNRESOLVABLE', dep_env )
+
+ if channel_id in self._depqueue_done:
+ ## todo/fixme/whatever: this 'if' can filter out channels that have
+ ## been added again
+ self._depqueue_done [channel_id].put ( dep_env )
+
+ except queue.Empty:
+ # race cond empty() <-> get_nowait()
+ break
+ except KeyError:
+ # channel has been closed before calling put, ignore this err
+ pass
+ # release the lock
self.runlock.release()
- # --- end of start (...) ---
- def thread_run ( self ):
+ # --- end of _thread_run_main (...) ---
+
+ def _thread_run_resolve ( self ):
"""Resolves dependencies (thread target).
returns: None (implicit)
@@ -293,7 +312,7 @@ class DependencyResolver ( object ):
while not self._depqueue.empty():
try:
- to_resolve = self._depqueue.get_nowait()
+ to_resolve = self._depqueue.get_nowait()
except queue.Empty:
# this thread is done when the queue is empty, so this is
# no error, but just the result of the race condition between
@@ -302,49 +321,64 @@ class DependencyResolver ( object ):
channel_id, dep_env = to_resolve
- if channel_id in self._depdone:
+ if channel_id in self._depqueue_done:
# else channel has been closed, drop dep
self.logger.debug ( "Trying to resolve '%s'." % dep_env.dep_str )
#have_new_rule = False
- # resolved can be None, so use a bool for checking
resolved = None
- is_resolved = False
+ # resolved can be None, so use a tri-state int for checking
+ # 0 -> unresolved, but resolvable
+ # 1 -> unresolved and (currently, new rules may change this)
+ # not resolvable
+ # 2 -> resolved
+ is_resolved = 0
- # search for a match in the rule pools
- for rulepool in self.static_rule_pools:
- result = rulepool.matches ( dep_env )
- if not result is None and result [0] > 0:
- resolved = result [1]
- is_resolved = True
- break
+ # TODO:
+ # (threading: could search the pools in parallel)
+# if dep_env.dep_str_low in self._dep_unresolvable:
+# # cannot resolve
+# is_resolved = 1
+ if is_resolved == 0:
+ # search for a match in the rule pools
+ for rulepool in self.static_rule_pools:
+ result = rulepool.matches ( dep_env )
+ if not result is None and result [0] > 0:
+ resolved = result [1]
+ is_resolved = 2
+ break
- if is_resolved:
- dep_env.set_resolved ( resolved, append=False )
- self._depdone [channel_id].increment_and_get()
+
+ if is_resolved == 2:
+ dep_env.set_resolved ( resolved, append=False )
self._report_event ( 'RESOLVED', dep_env )
+ self._depqueue_done [channel_id].put ( dep_env )
else:
self._depqueue_failed.put ( to_resolve )
+# # does not work when adding new rules is possible
+# self._dep_unresolvable.add ( dep_env.dep_str_low )
+
"""
## only useful if new rules can be created
# new rule found, requeue all previously failed dependency searches
if have_new_rule:
self._queue_previously_failed
+ self._dep_unresolvable.clear() #?
"""
- # --- end if channel_id in self._depdone
+ # --- end if channel_id in self._depqueue_done
self._depqueue.task_done()
# --- end while
- # --- end of thread_run (...) ---
+ # --- end of _thread_run_resolve (...) ---
def enqueue ( self, dep_env, channel_id ):
"""Adds a DepEnv to the queue of deps to resolve.
@@ -359,22 +393,9 @@ class DependencyResolver ( object ):
# --- end of enqueue (...) ---
- def done ( self, channel_id, numdeps ):
- """Returns True if channel_id exists in depdone and at least numdeps
- dependencies have been resolved for that channel.
-
- arguments:
- * channel_id --
- * numdeps --
- """
-
- if channel_id in self._depdone:
- return self._depdone [channel_id] >= numdeps
- else:
- return False
- # --- end of done (...) ---
-
def close ( self ):
+ if isinstance ( self._mainthread, threading.Thread ):
+ self._mainthread.join()
for lis in self.listeners: lis.close()
del self.listeners
# --- end of close (...) ---