diff options
Diffstat (limited to 'Lib/multiprocessing/managers.py')
-rw-r--r-- | Lib/multiprocessing/managers.py | 44 |
1 files changed, 23 insertions, 21 deletions
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py index 36cd650e2e9..f580e9ec1e0 100644 --- a/Lib/multiprocessing/managers.py +++ b/Lib/multiprocessing/managers.py @@ -19,11 +19,15 @@ import threading import array import queue -from traceback import format_exc -from multiprocessing import Process, current_process, active_children, Pool, util, connection -from multiprocessing.process import AuthenticationString -from multiprocessing.forking import Popen, ForkingPickler from time import time as _time +from traceback import format_exc + +from . import connection +from . import pool +from . import process +from . import popen +from . import reduction +from . import util # # Register some things for pickling @@ -31,16 +35,14 @@ from time import time as _time def reduce_array(a): return array.array, (a.typecode, a.tobytes()) -ForkingPickler.register(array.array, reduce_array) +reduction.register(array.array, reduce_array) view_types = [type(getattr({}, name)()) for name in ('items','keys','values')] if view_types[0] is not list: # only needed in Py3.0 def rebuild_as_list(obj): return list, (list(obj),) for view_type in view_types: - ForkingPickler.register(view_type, rebuild_as_list) - import copyreg - copyreg.pickle(view_type, rebuild_as_list) + reduction.register(view_type, rebuild_as_list) # # Type for identifying shared objects @@ -130,7 +132,7 @@ class Server(object): def __init__(self, registry, address, authkey, serializer): assert isinstance(authkey, bytes) self.registry = registry - self.authkey = AuthenticationString(authkey) + self.authkey = process.AuthenticationString(authkey) Listener, Client = listener_client[serializer] # do authentication later @@ -146,7 +148,7 @@ class Server(object): Run the server forever ''' self.stop_event = threading.Event() - current_process()._manager_server = self + process.current_process()._manager_server = self try: accepter = threading.Thread(target=self.accepter) accepter.daemon = True @@ -438,9 +440,9 @@ class BaseManager(object): def __init__(self, address=None, authkey=None, serializer='pickle'): if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey self._address = address # XXX not final address if eg ('', 0) - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) self._state = State() self._state.value = State.INITIAL self._serializer = serializer @@ -476,7 +478,7 @@ class BaseManager(object): reader, writer = connection.Pipe(duplex=False) # spawn process which runs a server - self._process = Process( + self._process = process.Process( target=type(self)._run_server, args=(self._registry, self._address, self._authkey, self._serializer, writer, initializer, initargs), @@ -691,11 +693,11 @@ class BaseProxy(object): self._Client = listener_client[serializer][1] if authkey is not None: - self._authkey = AuthenticationString(authkey) + self._authkey = process.AuthenticationString(authkey) elif self._manager is not None: self._authkey = self._manager._authkey else: - self._authkey = current_process().authkey + self._authkey = process.current_process().authkey if incref: self._incref() @@ -704,7 +706,7 @@ class BaseProxy(object): def _connect(self): util.debug('making connection to manager') - name = current_process().name + name = process.current_process().name if threading.current_thread().name != 'MainThread': name += '|' + threading.current_thread().name conn = self._Client(self._token.address, authkey=self._authkey) @@ -798,7 +800,7 @@ class BaseProxy(object): def __reduce__(self): kwds = {} - if Popen.thread_is_spawning(): + if popen.get_spawning_popen() is not None: kwds['authkey'] = self._authkey if getattr(self, '_isauto', False): @@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer, kwds): If possible the shared object is returned, or otherwise a proxy for it. ''' - server = getattr(current_process(), '_manager_server', None) + server = getattr(process.current_process(), '_manager_server', None) if server and server.address == token.address: return server.id_to_obj[token.id][0] else: incref = ( kwds.pop('incref', True) and - not getattr(current_process(), '_inheriting', False) + not getattr(process.current_process(), '_inheriting', False) ) return func(token, serializer, incref=incref, **kwds) @@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None, if authkey is None and manager is not None: authkey = manager._authkey if authkey is None: - authkey = current_process().authkey + authkey = process.current_process().authkey ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed) proxy = ProxyType(token, serializer, manager=manager, authkey=authkey, @@ -1109,7 +1111,7 @@ SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore, AcquirerProxy) SyncManager.register('Condition', threading.Condition, ConditionProxy) SyncManager.register('Barrier', threading.Barrier, BarrierProxy) -SyncManager.register('Pool', Pool, PoolProxy) +SyncManager.register('Pool', pool.Pool, PoolProxy) SyncManager.register('list', list, ListProxy) SyncManager.register('dict', dict, DictProxy) SyncManager.register('Value', Value, ValueProxy) |