aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Doc/includes/mp_benchmarks.py239
-rw-r--r--Doc/includes/mp_newtype.py14
-rw-r--r--Doc/includes/mp_pool.py337
-rw-r--r--Doc/includes/mp_synchronize.py278
-rw-r--r--Doc/includes/mp_webserver.py70
-rw-r--r--Doc/includes/mp_workers.py13
-rw-r--r--Doc/library/multiprocessing.rst240
-rw-r--r--Doc/whatsnew/3.4.rst13
-rw-r--r--Lib/multiprocessing/__init__.py107
-rw-r--r--Lib/multiprocessing/connection.py61
-rw-r--r--Lib/multiprocessing/dummy/__init__.py4
-rw-r--r--Lib/multiprocessing/forking.py477
-rw-r--r--Lib/multiprocessing/forkserver.py238
-rw-r--r--Lib/multiprocessing/heap.py56
-rw-r--r--Lib/multiprocessing/managers.py44
-rw-r--r--Lib/multiprocessing/pool.py84
-rw-r--r--Lib/multiprocessing/popen.py78
-rw-r--r--Lib/multiprocessing/popen_fork.py87
-rw-r--r--Lib/multiprocessing/popen_forkserver.py75
-rw-r--r--Lib/multiprocessing/popen_spawn_posix.py75
-rw-r--r--Lib/multiprocessing/popen_spawn_win32.py102
-rw-r--r--Lib/multiprocessing/process.py60
-rw-r--r--Lib/multiprocessing/queues.py36
-rw-r--r--Lib/multiprocessing/reduction.py363
-rw-r--r--Lib/multiprocessing/resource_sharer.py158
-rw-r--r--Lib/multiprocessing/semaphore_tracker.py135
-rw-r--r--Lib/multiprocessing/sharedctypes.py7
-rw-r--r--Lib/multiprocessing/spawn.py258
-rw-r--r--Lib/multiprocessing/synchronize.py73
-rw-r--r--Lib/multiprocessing/util.py70
-rw-r--r--Lib/test/_test_multiprocessing.py (renamed from Lib/test/test_multiprocessing.py)481
-rw-r--r--Lib/test/mp_fork_bomb.py5
-rwxr-xr-xLib/test/regrtest.py2
-rw-r--r--Lib/test/test_multiprocessing_fork.py7
-rw-r--r--Lib/test/test_multiprocessing_forkserver.py7
-rw-r--r--Lib/test/test_multiprocessing_spawn.py7
-rw-r--r--Makefile.pre.in4
-rw-r--r--Modules/_multiprocessing/multiprocessing.c1
-rw-r--r--Modules/_multiprocessing/multiprocessing.h1
-rw-r--r--Modules/_multiprocessing/semaphore.c78
40 files changed, 2433 insertions, 2012 deletions
diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py
deleted file mode 100644
index 3763ea94484..00000000000
--- a/Doc/includes/mp_benchmarks.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# Simple benchmarks for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import multiprocessing
-import threading
-import queue
-import gc
-
-_timer = time.perf_counter
-
-delta = 1
-
-
-#### TEST_QUEUESPEED
-
-def queuespeed_func(q, c, iterations):
- a = '0' * 256
- c.acquire()
- c.notify()
- c.release()
-
- for i in range(iterations):
- q.put(a)
-
- q.put('STOP')
-
-def test_queuespeed(Process, q, c):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- p = Process(target=queuespeed_func, args=(q, c, iterations))
- c.acquire()
- p.start()
- c.wait()
- c.release()
-
- result = None
- t = _timer()
-
- while result != 'STOP':
- result = q.get()
-
- elapsed = _timer() - t
-
- p.join()
-
- print(iterations, 'objects passed through the queue in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_PIPESPEED
-
-def pipe_func(c, cond, iterations):
- a = '0' * 256
- cond.acquire()
- cond.notify()
- cond.release()
-
- for i in range(iterations):
- c.send(a)
-
- c.send('STOP')
-
-def test_pipespeed():
- c, d = multiprocessing.Pipe()
- cond = multiprocessing.Condition()
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- p = multiprocessing.Process(target=pipe_func,
- args=(d, cond, iterations))
- cond.acquire()
- p.start()
- cond.wait()
- cond.release()
-
- result = None
- t = _timer()
-
- while result != 'STOP':
- result = c.recv()
-
- elapsed = _timer() - t
- p.join()
-
- print(iterations, 'objects passed through connection in',elapsed,'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_SEQSPEED
-
-def test_seqspeed(seq):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- t = _timer()
-
- for i in range(iterations):
- a = seq[5]
-
- elapsed = _timer() - t
-
- print(iterations, 'iterations in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_LOCK
-
-def test_lockspeed(l):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- t = _timer()
-
- for i in range(iterations):
- l.acquire()
- l.release()
-
- elapsed = _timer() - t
-
- print(iterations, 'iterations in', elapsed, 'seconds')
- print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_CONDITION
-
-def conditionspeed_func(c, N):
- c.acquire()
- c.notify()
-
- for i in range(N):
- c.wait()
- c.notify()
-
- c.release()
-
-def test_conditionspeed(Process, c):
- elapsed = 0
- iterations = 1
-
- while elapsed < delta:
- iterations *= 2
-
- c.acquire()
- p = Process(target=conditionspeed_func, args=(c, iterations))
- p.start()
-
- c.wait()
-
- t = _timer()
-
- for i in range(iterations):
- c.notify()
- c.wait()
-
- elapsed = _timer() - t
-
- c.release()
- p.join()
-
- print(iterations * 2, 'waits in', elapsed, 'seconds')
- print('average number/sec:', iterations * 2 / elapsed)
-
-####
-
-def test():
- manager = multiprocessing.Manager()
-
- gc.disable()
-
- print('\n\t######## testing Queue.Queue\n')
- test_queuespeed(threading.Thread, queue.Queue(),
- threading.Condition())
- print('\n\t######## testing multiprocessing.Queue\n')
- test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
- multiprocessing.Condition())
- print('\n\t######## testing Queue managed by server process\n')
- test_queuespeed(multiprocessing.Process, manager.Queue(),
- manager.Condition())
- print('\n\t######## testing multiprocessing.Pipe\n')
- test_pipespeed()
-
- print()
-
- print('\n\t######## testing list\n')
- test_seqspeed(list(range(10)))
- print('\n\t######## testing list managed by server process\n')
- test_seqspeed(manager.list(list(range(10))))
- print('\n\t######## testing Array("i", ..., lock=False)\n')
- test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=False))
- print('\n\t######## testing Array("i", ..., lock=True)\n')
- test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=True))
-
- print()
-
- print('\n\t######## testing threading.Lock\n')
- test_lockspeed(threading.Lock())
- print('\n\t######## testing threading.RLock\n')
- test_lockspeed(threading.RLock())
- print('\n\t######## testing multiprocessing.Lock\n')
- test_lockspeed(multiprocessing.Lock())
- print('\n\t######## testing multiprocessing.RLock\n')
- test_lockspeed(multiprocessing.RLock())
- print('\n\t######## testing lock managed by server process\n')
- test_lockspeed(manager.Lock())
- print('\n\t######## testing rlock managed by server process\n')
- test_lockspeed(manager.RLock())
-
- print()
-
- print('\n\t######## testing threading.Condition\n')
- test_conditionspeed(threading.Thread, threading.Condition())
- print('\n\t######## testing multiprocessing.Condition\n')
- test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
- print('\n\t######## testing condition managed by a server process\n')
- test_conditionspeed(multiprocessing.Process, manager.Condition())
-
- gc.enable()
-
-if __name__ == '__main__':
- multiprocessing.freeze_support()
- test()
diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py
index 729174363ed..1c796a56404 100644
--- a/Doc/includes/mp_newtype.py
+++ b/Doc/includes/mp_newtype.py
@@ -1,11 +1,3 @@
-#
-# This module shows how to use arbitrary callables with a subclass of
-# `BaseManager`.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
@@ -27,12 +19,10 @@ def baz():
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
- _exposed_ = ('next', '__next__')
+ _exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
- return self._callmethod('next')
- def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
@@ -90,8 +80,6 @@ def test():
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
- print('op.getslice(range(10), 2, 6) =', op.getslice(list(range(10)), 2, 6))
- print('op.repeat(range(5), 3) =', op.repeat(list(range(5)), 3))
print('op._exposed_ =', op._exposed_)
##
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py
index 15784987540..11101e1a909 100644
--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -1,10 +1,3 @@
-#
-# A test of `multiprocessing.Pool` class
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import multiprocessing
import time
import random
@@ -46,269 +39,115 @@ def noop(x):
#
def test():
- print('cpu_count() = %d\n' % multiprocessing.cpu_count())
-
- #
- # Create pool
- #
-
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
- pool = multiprocessing.Pool(PROCESSES)
- print('pool = %s' % pool)
- print()
-
- #
- # Tests
- #
-
- TASKS = [(mul, (i, 7)) for i in range(10)] + \
- [(plus, (i, 8)) for i in range(10)]
-
- results = [pool.apply_async(calculate, t) for t in TASKS]
- imap_it = pool.imap(calculatestar, TASKS)
- imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
-
- print('Ordered results using pool.apply_async():')
- for r in results:
- print('\t', r.get())
- print()
-
- print('Ordered results using pool.imap():')
- for x in imap_it:
- print('\t', x)
- print()
-
- print('Unordered results using pool.imap_unordered():')
- for x in imap_unordered_it:
- print('\t', x)
- print()
-
- print('Ordered results using pool.map() --- will block till complete:')
- for x in pool.map(calculatestar, TASKS):
- print('\t', x)
- print()
-
- #
- # Simple benchmarks
- #
-
- N = 100000
- print('def pow3(x): return x**3')
- t = time.time()
- A = list(map(pow3, range(N)))
- print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
- (N, time.time() - t))
+ with multiprocessing.Pool(PROCESSES) as pool:
+ #
+ # Tests
+ #
- t = time.time()
- B = pool.map(pow3, range(N))
- print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
- (N, time.time() - t))
+ TASKS = [(mul, (i, 7)) for i in range(10)] + \
+ [(plus, (i, 8)) for i in range(10)]
- t = time.time()
- C = list(pool.imap(pow3, range(N), chunksize=N//8))
- print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
- ' seconds' % (N, N//8, time.time() - t))
+ results = [pool.apply_async(calculate, t) for t in TASKS]
+ imap_it = pool.imap(calculatestar, TASKS)
+ imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
- assert A == B == C, (len(A), len(B), len(C))
- print()
+ print('Ordered results using pool.apply_async():')
+ for r in results:
+ print('\t', r.get())
+ print()
- L = [None] * 1000000
- print('def noop(x): pass')
- print('L = [None] * 1000000')
+ print('Ordered results using pool.imap():')
+ for x in imap_it:
+ print('\t', x)
+ print()
- t = time.time()
- A = list(map(noop, L))
- print('\tmap(noop, L):\n\t\t%s seconds' % \
- (time.time() - t))
+ print('Unordered results using pool.imap_unordered():')
+ for x in imap_unordered_it:
+ print('\t', x)
+ print()
- t = time.time()
- B = pool.map(noop, L)
- print('\tpool.map(noop, L):\n\t\t%s seconds' % \
- (time.time() - t))
+ print('Ordered results using pool.map() --- will block till complete:')
+ for x in pool.map(calculatestar, TASKS):
+ print('\t', x)
+ print()
- t = time.time()
- C = list(pool.imap(noop, L, chunksize=len(L)//8))
- print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
- (len(L)//8, time.time() - t))
+ #
+ # Test error handling
+ #
- assert A == B == C, (len(A), len(B), len(C))
- print()
+ print('Testing error handling:')
- del A, B, C, L
-
- #
- # Test error handling
- #
-
- print('Testing error handling:')
-
- try:
- print(pool.apply(f, (5,)))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from pool.apply()')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- try:
- print(pool.map(f, list(range(10))))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from pool.map()')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- try:
- print(list(pool.imap(f, list(range(10)))))
- except ZeroDivisionError:
- print('\tGot ZeroDivisionError as expected from list(pool.imap())')
- else:
- raise AssertionError('expected ZeroDivisionError')
-
- it = pool.imap(f, list(range(10)))
- for i in range(10):
try:
- x = next(it)
+ print(pool.apply(f, (5,)))
except ZeroDivisionError:
- if i == 5:
- pass
- except StopIteration:
- break
+ print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
- if i == 5:
- raise AssertionError('expected ZeroDivisionError')
-
- assert i == 9
- print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
- print()
+ raise AssertionError('expected ZeroDivisionError')
- #
- # Testing timeouts
- #
-
- print('Testing ApplyResult.get() with timeout:', end=' ')
- res = pool.apply_async(calculate, TASKS[0])
- while 1:
- sys.stdout.flush()
try:
- sys.stdout.write('\n\t%s' % res.get(0.02))
- break
- except multiprocessing.TimeoutError:
- sys.stdout.write('.')
- print()
- print()
+ print(pool.map(f, list(range(10))))
+ except ZeroDivisionError:
+ print('\tGot ZeroDivisionError as expected from pool.map()')
+ else:
+ raise AssertionError('expected ZeroDivisionError')
- print('Testing IMapIterator.next() with timeout:', end=' ')
- it = pool.imap(calculatestar, TASKS)
- while 1:
- sys.stdout.flush()
try:
- sys.stdout.write('\n\t%s' % it.next(0.02))
- except StopIteration:
- break
- except multiprocessing.TimeoutError:
- sys.stdout.write('.')
- print()
- print()
-
- #
- # Testing callback
- #
-
- print('Testing callback:')
-
- A = []
- B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
-
- r = pool.apply_async(mul, (7, 8), callback=A.append)
- r.wait()
-
- r = pool.map_async(pow3, list(range(10)), callback=A.extend)
- r.wait()
-
- if A == B:
- print('\tcallbacks succeeded\n')
- else:
- print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
-
- #
- # Check there are no outstanding tasks
- #
-
- assert not pool._cache, 'cache = %r' % pool._cache
-
- #
- # Check close() methods
- #
-
- print('Testing close():')
-
- for worker in pool._pool:
- assert worker.is_alive()
-
- result = pool.apply_async(time.sleep, [0.5])
- pool.close()
- pool.join()
-
- assert result.get() is None
-
- for worker in pool._pool:
- assert not worker.is_alive()
-
- print('\tclose() succeeded\n')
-
- #
- # Check terminate() method
- #
-
- print('Testing terminate():')
-
- pool = multiprocessing.Pool(2)
- DELTA = 0.1
- ignore = pool.apply(pow3, [2])
- results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
- pool.terminate()
- pool.join()
-
- for worker in pool._pool:
- assert not worker.is_alive()
-
- print('\tterminate() succeeded\n')
-
- #
- # Check garbage collection
- #
-
- print('Testing garbage collection:')
-
- pool = multiprocessing.Pool(2)
- DELTA = 0.1
- processes = pool._pool
- ignore = pool.apply(pow3, [2])
- results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-
- results = pool = None
-
- time.sleep(DELTA * 2)
-
- for worker in processes:
- assert not worker.is_alive()
-
- print('\tgarbage collection succeeded\n')
+ print(list(pool.imap(f, list(range(10)))))
+ except ZeroDivisionError:
+ print('\tGot ZeroDivisionError as expected from list(pool.imap())')
+ else:
+ raise AssertionError('expected ZeroDivisionError')
+
+ it = pool.imap(f, list(range(10)))
+ for i in range(10):
+ try:
+ x = next(it)
+ except ZeroDivisionError:
+ if i == 5:
+ pass
+ except StopIteration:
+ break
+ else:
+ if i == 5:
+ raise AssertionError('expected ZeroDivisionError')
+
+ assert i == 9
+ print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
+ print()
+
+ #
+ # Testing timeouts
+ #
+
+ print('Testing ApplyResult.get() with timeout:', end=' ')
+ res = pool.apply_async(calculate, TASKS[0])
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % res.get(0.02))
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print()
+ print()
+
+ print('Testing IMapIterator.next() with timeout:', end=' ')
+ it = pool.imap(calculatestar, TASKS)
+ while 1:
+ sys.stdout.flush()
+ try:
+ sys.stdout.write('\n\t%s' % it.next(0.02))
+ except StopIteration:
+ break
+ except multiprocessing.TimeoutError:
+ sys.stdout.write('.')
+ print()
+ print()
if __name__ == '__main__':
multiprocessing.freeze_support()
-
- assert len(sys.argv) in (1, 2)
-
- if len(sys.argv) == 1 or sys.argv[1] == 'processes':
- print(' Using processes '.center(79, '-'))
- elif sys.argv[1] == 'threads':
- print(' Using threads '.center(79, '-'))
- import multiprocessing.dummy as multiprocessing
- else:
- print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
- raise SystemExit(2)
-
test()
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
deleted file mode 100644
index 81dbc387fc4..00000000000
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the `multiprocessing` package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
- random.seed()
- time.sleep(random.random()*4)
-
- mutex.acquire()
- print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
- running.value -= 1
- mutex.release()
-
-def test_value():
- TASKS = 10
- running = multiprocessing.Value('i', TASKS)
- mutex = multiprocessing.Lock()
-
- for i in range(TASKS):
- p = multiprocessing.Process(target=value_func, args=(running, mutex))
- p.start()
-
- while running.value > 0:
- time.sleep(0.08)
- mutex.acquire()
- print(running.value, end=' ')
- sys.stdout.flush()
- mutex.release()
-
- print()
- print('No more running processes')
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
- for i in range(30):
- time.sleep(0.5 * random.random())
- queue.put(i*i)
- queue.put('STOP')
-
-def test_queue():
- q = multiprocessing.Queue()
-
- p = multiprocessing.Process(target=queue_func, args=(q,))
- p.start()
-
- o = None
- while o != 'STOP':
- try:
- o = q.get(timeout=0.3)
- print(o, end=' ')
- sys.stdout.flush()
- except Empty:
- print('TIMEOUT')
-
- print()
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
- cond.acquire()
- print('\t' + str(cond))
- time.sleep(2)
- print('\tchild is notifying')
- print('\t' + str(cond))
- cond.notify()
- cond.release()
-
-def test_condition():
- cond = multiprocessing.Condition()
-
- p = multiprocessing.Process(target=condition_func, args=(cond,))
- print(cond)
-
- cond.acquire()
- print(cond)
- cond.acquire()
- print(cond)
-
- p.start()
-
- print('main is waiting')
- cond.wait()
- print('main has woken up')
-
- print(cond)
- cond.release()
- print(cond)
- cond.release()
-
- p.join()
- print(cond)
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
- sema.acquire()
-
- mutex.acquire()
- running.value += 1
- print(running.value, 'tasks are running')
- mutex.release()
-
- random.seed()
- time.sleep(random.random()*2)
-
- mutex.acquire()
- running.value -= 1
- print('%s has finished' % multiprocessing.current_process())
- mutex.release()
-
- sema.release()
-
-def test_semaphore():
- sema = multiprocessing.Semaphore(3)
- mutex = multiprocessing.RLock()
- running = multiprocessing.Value('i', 0)
-
- processes = [
- multiprocessing.Process(target=semaphore_func,
- args=(sema, mutex, running))
- for i in range(10)
- ]
-
- for p in processes:
- p.start()
-
- for p in processes:
- p.join()
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
- print('\tchild sleeping')
- time.sleep(5.5)
- print('\n\tchild terminating')
-
-def test_join_timeout():
- p = multiprocessing.Process(target=join_timeout_func)
- p.start()
-
- print('waiting for process to finish')
-
- while 1:
- p.join(timeout=1)
- if not p.is_alive():
- break
- print('.', end=' ')
- sys.stdout.flush()
-
-
-#### TEST_EVENT
-
-def event_func(event):
- print('\t%r is waiting' % multiprocessing.current_process())
- event.wait()
- print('\t%r has woken up' % multiprocessing.current_process())
-
-def test_event():
- event = multiprocessing.Event()
-
- processes = [multiprocessing.Process(target=event_func, args=(event,))
- for i in range(5)]
-
- for p in processes:
- p.start()
-
- print('main is sleeping')
- time.sleep(2)
-
- print('main is setting event')
- event.set()
-
- for p in processes:
- p.join()
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
- for i in range(len(values)):
- v = values[i][1]
- sv = shared_values[i].value
- assert v == sv
-
- for i in range(len(values)):
- a = arrays[i][1]
- sa = list(shared_arrays[i][:])
- assert a == sa
-
- print('Tests passed')
-
-def test_sharedvalues():
- values = [
- ('i', 10),
- ('h', -2),
- ('d', 1.25)
- ]
- arrays = [
- ('i', list(range(100))),
- ('d', [0.25 * i for i in range(100)]),
- ('H', list(range(1000)))
- ]
-
- shared_values = [multiprocessing.Value(id, v) for id, v in values]
- shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
-
- p = multiprocessing.Process(
- target=sharedvalues_func,
- args=(values, arrays, shared_values, shared_arrays)
- )
- p.start()
- p.join()
-
- assert p.exitcode == 0
-
-
-####
-
-def test(namespace=multiprocessing):
- global multiprocessing
-
- multiprocessing = namespace
-
- for func in [test_value, test_queue, test_condition,
- test_semaphore, test_join_timeout, test_event,
- test_sharedvalues]:
-
- print('\n\t######## %s\n' % func.__name__)
- func()
-
- ignore = multiprocessing.active_children() # cleanup any old processes
- if hasattr(multiprocessing, '_debug_info'):
- info = multiprocessing._debug_info()
- if info:
- print(info)
- raise ValueError('there should be no positive refcounts left')
-
-
-if __name__ == '__main__':
- multiprocessing.freeze_support()
-
- assert len(sys.argv) in (1, 2)
-
- if len(sys.argv) == 1 or sys.argv[1] == 'processes':
- print(' Using processes '.center(79, '-'))
- namespace = multiprocessing
- elif sys.argv[1] == 'manager':
- print(' Using processes and a manager '.center(79, '-'))
- namespace = multiprocessing.Manager()
- namespace.Process = multiprocessing.Process
- namespace.current_process = multiprocessing.current_process
- namespace.active_children = multiprocessing.active_children
- elif sys.argv[1] == 'threads':
- print(' Using threads '.center(79, '-'))
- import multiprocessing.dummy as namespace
- else:
- print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
- raise SystemExit(2)
-
- test(namespace)
diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py
deleted file mode 100644
index 651024d9393..00000000000
--- a/Doc/includes/mp_webserver.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Example where a pool of http servers share a single listening socket
-#
-# On Windows this module depends on the ability to pickle a socket
-# object so that the worker processes can inherit a copy of the server
-# object. (We import `multiprocessing.reduction` to enable this pickling.)
-#
-# Not sure if we should synchronize access to `socket.accept()` method by
-# using a process-shared lock -- does not seem to be necessary.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import os
-import sys
-
-from multiprocessing import Process, current_process, freeze_support
-from http.server import HTTPServer
-from http.server import SimpleHTTPRequestHandler
-
-if sys.platform == 'win32':
- import multiprocessing.reduction # make sockets pickable/inheritable
-
-
-def note(format, *args):
- sys.stderr.write('[%s]\t%s\n' % (current_process().name, format % args))
-
-
-class RequestHandler(SimpleHTTPRequestHandler):
- # we override log_message() to show which process is handling the request
- def log_message(self, format, *args):
- note(format, *args)
-
-def serve_forever(server):
- note('starting server')
- try:
- server.serve_forever()
- except KeyboardInterrupt:
- pass
-
-
-def runpool(address, number_of_processes):
- # create a single server object -- children will each inherit a copy
- server = HTTPServer(address, RequestHandler)
-
- # create child processes to act as workers
- for i in range(number_of_processes - 1):
- Process(target=serve_forever, args=(server,)).start()
-
- # main process also acts as a worker
- serve_forever(server)
-
-
-def test():
- DIR = os.path.join(os.path.dirname(__file__), '..')
- ADDRESS = ('localhost', 8000)
- NUMBER_OF_PROCESSES = 4
-
- print('Serving at http://%s:%d using %d worker processes' % \
- (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES))
- print('To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32'])
-
- os.chdir(DIR)
- runpool(ADDRESS, NUMBER_OF_PROCESSES)
-
-
-if __name__ == '__main__':
- freeze_support()
- test()
diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py
index e66d97bd50b..3b92269f897 100644
--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -1,16 +1,3 @@
-#
-# Simple example which uses a pool of workers to carry out some tasks.
-#
-# Notice that the results will probably not come out of the output
-# queue in the same in the same order as the corresponding tasks were
-# put on the input queue. If it is important to get the results back
-# in the original order then consider using `Pool.map()` or
-# `Pool.imap()` (which will save on the amount of code needed anyway).
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
import time
import random
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
index f659eaccbd3..f58c3081a82 100644
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -93,11 +93,80 @@ To show the individual process IDs involved, here is an expanded example::
p.start()
p.join()
-For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
+For an explanation of why the ``if __name__ == '__main__'`` part is
necessary, see :ref:`multiprocessing-programming`.
+Start methods
+~~~~~~~~~~~~~
+
+Depending on the platform, :mod:`multiprocessing` supports three ways
+to start a process. These *start methods* are
+
+ *spawn*
+ The parent process starts a fresh python interpreter process. The
+ child process will only inherit those resources necessary to run
+ the process objects :meth:`~Process.run` method. In particular,
+ unnecessary file descriptors and handles from the parent process
+ will not be inherited. Starting a process using this method is
+ rather slow compared to using *fork* or *forkserver*.
+
+ Available on Unix and Windows. The default on Windows.
+
+ *fork*
+ The parent process uses :func:`os.fork` to fork the Python
+ interpreter. The child process, when it begins, is effectively
+ identical to the parent process. All resources of the parent are
+ inherited by the child process. Note that safely forking a
+ multithreaded process is problematic.
+
+ Available on Unix only. The default on Unix.
+
+ *forkserver*
+ When the program starts and selects the *forkserver* start method,
+ a server process is started. From then on, whenever a new process
+ is need the parent process connects to the server and requests
+ that it fork a new process. The fork server process is single
+ threaded so it is safe for it to use :func:`os.fork`. No
+ unnecessary resources are inherited.
+
+ Available on Unix platforms which support passing file descriptors
+ over unix pipes.
+
+Before Python 3.4 *fork* was the only option available on Unix. Also,
+prior to Python 3.4, child processes would inherit all the parents
+inheritable handles on Windows.
+
+On Unix using the *spawn* or *forkserver* start methods will also
+start a *semaphore tracker* process which tracks the unlinked named
+semaphores created by processes of the program. When all processes
+have exited the semaphore tracker unlinks any remaining semaphores.
+Usually there should be none, but if a process was killed by a signal
+there may some "leaked" semaphores. (Unlinking the named semaphores
+is a serious matter since the system allows only a limited number, and
+they will not be automatically unlinked until the next reboot.)
+
+To select the a start method you use the :func:`set_start_method` in
+the ``if __name__ == '__main__'`` clause of the main module. For
+example::
+
+ import multiprocessing as mp
+
+ def foo():
+ print('hello')
+
+ if __name__ == '__main__':
+ mp.set_start_method('spawn')
+ p = mp.Process(target=foo)
+ p.start()
+ p.join()
+
+:func:`set_start_method` should not be used more than once in the
+program.
+
+
+
Exchanging objects between processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -274,15 +343,31 @@ processes in a few different ways.
For example::
from multiprocessing import Pool
+ from time import sleep
def f(x):
return x*x
if __name__ == '__main__':
- with Pool(processes=4) as pool: # start 4 worker processes
- result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
- print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
- print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
+ # start 4 worker processes
+ with Pool(processes=4) as pool:
+
+ # print "[0, 1, 4,..., 81]"
+ print(pool.map(f, range(10)))
+
+ # print same numbers in arbitrary order
+ for i in pool.imap_unordered(f, range(10)):
+ print(i)
+
+ # evaluate "f(10)" asynchronously
+ res = pool.apply_async(f, [10])
+ print(res.get(timeout=1)) # prints "100"
+
+ # make worker sleep for 10 secs
+ res = pool.apply_async(sleep, 10)
+ print(res.get(timeout=1)) # raises multiprocessing.TimeoutError
+
+ # exiting the 'with'-block has stopped the pool
Note that the methods of a pool should only ever be used by the
process which created it.
@@ -763,6 +848,24 @@ Miscellaneous
If the module is being run normally by the Python interpreter then
:func:`freeze_support` has no effect.
+.. function:: get_all_start_methods()
+
+ Returns a list of the supported start methods, the first of which
+ is the default. The possible start methods are ``'fork'``,
+ ``'spawn'`` and ``'forkserver'``. On Windows only ``'spawn'`` is
+ available. On Unix ``'fork'`` and ``'spawn'`` are always
+ supported, with ``'fork'`` being the default.
+
+ .. versionadded:: 3.4
+
+.. function:: get_start_method()
+
+ Return the current start method. This can be ``'fork'``,
+ ``'spawn'`` or ``'forkserver'``. ``'fork'`` is the default on
+ Unix, while ``'spawn'`` is the default on Windows.
+
+ .. versionadded:: 3.4
+
.. function:: set_executable()
Sets the path of the Python interpreter to use when starting a child process.
@@ -771,8 +874,21 @@ Miscellaneous
set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
- before they can create child processes. (Windows only)
+ before they can create child processes.
+
+ .. versionchanged:: 3.4
+ Now supported on Unix when the ``'spawn'`` start method is used.
+
+.. function:: set_start_method(method)
+
+ Set the method which should be used to start child processes.
+ *method* can be ``'fork'``, ``'spawn'`` or ``'forkserver'``.
+
+ Note that this should be called at most once, and it should be
+ protected inside the ``if __name__ == '__main__'`` clause of the
+ main module.
+ .. versionadded:: 3.4
.. note::
@@ -2175,43 +2291,8 @@ Below is an example session with logging turned on::
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
-In addition to having these two logging functions, the multiprocessing also
-exposes two additional logging level attributes. These are :const:`SUBWARNING`
-and :const:`SUBDEBUG`. The table below illustrates where theses fit in the
-normal level hierarchy.
-
-+----------------+----------------+
-| Level | Numeric value |
-+================+================+
-| ``SUBWARNING`` | 25 |
-+----------------+----------------+
-| ``SUBDEBUG`` | 5 |
-+----------------+----------------+
-
For a full table of logging levels, see the :mod:`logging` module.
-These additional logging levels are used primarily for certain debug messages
-within the multiprocessing module. Below is the same example as above, except
-with :const:`SUBDEBUG` enabled::
-
- >>> import multiprocessing, logging
- >>> logger = multiprocessing.log_to_stderr()
- >>> logger.setLevel(multiprocessing.SUBDEBUG)
- >>> logger.warning('doomed')
- [WARNING/MainProcess] doomed
- >>> m = multiprocessing.Manager()
- [INFO/SyncManager-...] child process calling self.run()
- [INFO/SyncManager-...] created temp directory /.../pymp-...
- [INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
- >>> del m
- [SUBDEBUG/MainProcess] finalizer calling ...
- [INFO/MainProcess] sending shutdown message to manager
- [DEBUG/SyncManager-...] manager received shutdown message
- [SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
- [SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
- [SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
- [SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
- [INFO/SyncManager-...] manager exiting with exitcode 0
The :mod:`multiprocessing.dummy` module
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2232,8 +2313,10 @@ There are certain guidelines and idioms which should be adhered to when using
:mod:`multiprocessing`.
-All platforms
-~~~~~~~~~~~~~
+All start methods
+~~~~~~~~~~~~~~~~~
+
+The following applies to all start methods.
Avoid shared state
@@ -2266,11 +2349,13 @@ Joining zombie processes
Better to inherit than pickle/unpickle
- On Windows many types from :mod:`multiprocessing` need to be picklable so
- that child processes can use them. However, one should generally avoid
- sending shared objects to other processes using pipes or queues. Instead
- you should arrange the program so that a process which needs access to a
- shared resource created elsewhere can inherit it from an ancestor process.
+ When using the *spawn* or *forkserver* start methods many types
+ from :mod:`multiprocessing` need to be picklable so that child
+ processes can use them. However, one should generally avoid
+ sending shared objects to other processes using pipes or queues.
+ Instead you should arrange the program so that a process which
+ needs access to a shared resource created elsewhere can inherit it
+ from an ancestor process.
Avoid terminating processes
@@ -2314,15 +2399,17 @@ Joining processes that use queues
Explicitly pass resources to child processes
- On Unix a child process can make use of a shared resource created in a
- parent process using a global resource. However, it is better to pass the
- object as an argument to the constructor for the child process.
+ On Unix using the *fork* start method, a child process can make
+ use of a shared resource created in a parent process using a
+ global resource. However, it is better to pass the object as an
+ argument to the constructor for the child process.
- Apart from making the code (potentially) compatible with Windows this also
- ensures that as long as the child process is still alive the object will not
- be garbage collected in the parent process. This might be important if some
- resource is freed when the object is garbage collected in the parent
- process.
+ Apart from making the code (potentially) compatible with Windows
+ and the other start methods this also ensures that as long as the
+ child process is still alive the object will not be garbage
+ collected in the parent process. This might be important if some
+ resource is freed when the object is garbage collected in the
+ parent process.
So for instance ::
@@ -2381,17 +2468,19 @@ Beware of replacing :data:`sys.stdin` with a "file like object"
For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`
-Windows
-~~~~~~~
+The *spawn* and *forkserver* start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Since Windows lacks :func:`os.fork` it has a few extra restrictions:
+There are a few extra restriction which don't apply to the *fork*
+start method.
More picklability
- Ensure that all arguments to :meth:`Process.__init__` are picklable. This
- means, in particular, that bound or unbound methods cannot be used directly
- as the ``target`` argument on Windows --- just define a function and use
- that instead.
+ Ensure that all arguments to :meth:`Process.__init__` are
+ picklable. This means, in particular, that bound or unbound
+ methods cannot be used directly as the ``target`` (unless you use
+ the *fork* start method) --- just define a function and use that
+ instead.
Also, if you subclass :class:`Process` then make sure that instances will be
picklable when the :meth:`Process.start` method is called.
@@ -2411,7 +2500,8 @@ Safe importing of main module
interpreter without causing unintended side effects (such a starting a new
process).
- For example, under Windows running the following module would fail with a
+ For example, using the *spawn* or *forkserver* start method
+ running the following module would fail with a
:exc:`RuntimeError`::
from multiprocessing import Process
@@ -2425,13 +2515,14 @@ Safe importing of main module
Instead one should protect the "entry point" of the program by using ``if
__name__ == '__main__':`` as follows::
- from multiprocessing import Process, freeze_support
+ from multiprocessing import Process, freeze_support, set_start_method
def foo():
print('hello')
if __name__ == '__main__':
freeze_support()
+ set_start_method('spawn')
p = Process(target=foo)
p.start()
@@ -2462,26 +2553,7 @@ Using :class:`Pool`:
:language: python3
-Synchronization types like locks, conditions and queues:
-
-.. literalinclude:: ../includes/mp_synchronize.py
- :language: python3
-
-
An example showing how to use queues to feed tasks to a collection of worker
processes and collect the results:
.. literalinclude:: ../includes/mp_workers.py
-
-
-An example of how a pool of worker processes can each run a
-:class:`~http.server.SimpleHTTPRequestHandler` instance while sharing a single
-listening socket.
-
-.. literalinclude:: ../includes/mp_webserver.py
-
-
-Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
-
-.. literalinclude:: ../includes/mp_benchmarks.py
-
diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst
index 937da00c57e..1c30d431994 100644
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -108,6 +108,8 @@ Significantly Improved Library Modules:
* Single-dispatch generic functions (:pep:`443`)
* SHA-3 (Keccak) support for :mod:`hashlib`.
* TLSv1.1 and TLSv1.2 support for :mod:`ssl`.
+* :mod:`multiprocessing` now has option to avoid using :func:`os.fork`
+ on Unix (:issue:`8713`).
Security improvements:
@@ -254,6 +256,17 @@ mmap objects can now be weakref'ed.
(Contributed by Valerie Lambert in :issue:`4885`.)
+multiprocessing
+---------------
+
+On Unix two new *start methods* have been added for starting processes
+using :mod:`multiprocessing`. These make the mixing of processes with
+threads more robust. See :issue:`8713`.
+
+Also, except when using the old *fork* start method, child processes
+will no longer inherit unneeded handles/file descriptors from their parents.
+
+
poplib
------
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
index b42613f8c2d..fd75839f580 100644
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@ __all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+ 'set_executable', 'set_start_method', 'get_start_method',
+ 'get_all_start_methods', 'set_forkserver_preload'
]
#
@@ -30,8 +32,14 @@ __all__ = [
import os
import sys
-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25
#
# Alias for main module -- will be reset by bootstrapping child processes
@@ -56,8 +64,6 @@ class TimeoutError(ProcessError):
class AuthenticationError(ProcessError):
pass
-import _multiprocessing
-
#
# Definitions not depending on native semaphores
#
@@ -69,7 +75,7 @@ def Manager():
The managers methods such as `Lock()`, `Condition()` and `Queue()`
can be used to create shared objects.
'''
- from multiprocessing.managers import SyncManager
+ from .managers import SyncManager
m = SyncManager()
m.start()
return m
@@ -78,7 +84,7 @@ def Pipe(duplex=True):
'''
Returns two connection object connected by a pipe
'''
- from multiprocessing.connection import Pipe
+ from .connection import Pipe
return Pipe(duplex)
def cpu_count():
@@ -97,21 +103,21 @@ def freeze_support():
If so then run code specified by commandline and exit.
'''
if sys.platform == 'win32' and getattr(sys, 'frozen', False):
- from multiprocessing.forking import freeze_support
+ from .spawn import freeze_support
freeze_support()
def get_logger():
'''
Return package logger -- if it does not already exist then it is created
'''
- from multiprocessing.util import get_logger
+ from .util import get_logger
return get_logger()
def log_to_stderr(level=None):
'''
Turn on logging and add a handler which prints to stderr
'''
- from multiprocessing.util import log_to_stderr
+ from .util import log_to_stderr
return log_to_stderr(level)
def allow_connection_pickling():
@@ -120,7 +126,7 @@ def allow_connection_pickling():
'''
# This is undocumented. In previous versions of multiprocessing
# its only effect was to make socket objects inheritable on Windows.
- import multiprocessing.connection
+ from . import connection
#
# Definitions depending on native semaphores
@@ -130,120 +136,151 @@ def Lock():
'''
Returns a non-recursive lock object
'''
- from multiprocessing.synchronize import Lock
+ from .synchronize import Lock
return Lock()
def RLock():
'''
Returns a recursive lock object
'''
- from multiprocessing.synchronize import RLock
+ from .synchronize import RLock
return RLock()
def Condition(lock=None):
'''
Returns a condition object
'''
- from multiprocessing.synchronize import Condition
+ from .synchronize import Condition
return Condition(lock)
def Semaphore(value=1):
'''
Returns a semaphore object
'''
- from multiprocessing.synchronize import Semaphore
+ from .synchronize import Semaphore
return Semaphore(value)
def BoundedSemaphore(value=1):
'''
Returns a bounded semaphore object
'''
- from multiprocessing.synchronize import BoundedSemaphore
+ from .synchronize import BoundedSemaphore
return BoundedSemaphore(value)
def Event():
'''
Returns an event object
'''
- from multiprocessing.synchronize import Event
+ from .synchronize import Event
return Event()
def Barrier(parties, action=None, timeout=None):
'''
Returns a barrier object
'''
- from multiprocessing.synchronize import Barrier
+ from .synchronize import Barrier
return Barrier(parties, action, timeout)
def Queue(maxsize=0):
'''
Returns a queue object
'''
- from multiprocessing.queues import Queue
+ from .queues import Queue
return Queue(maxsize)
def JoinableQueue(maxsize=0):
'''
Returns a queue object
'''
- from multiprocessing.queues import JoinableQueue
+ from .queues import JoinableQueue
return JoinableQueue(maxsize)
def SimpleQueue():
'''
Returns a queue object
'''
- from multiprocessing.queues import SimpleQueue
+ from .queues import SimpleQueue
return SimpleQueue()
def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
'''
Returns a process pool object
'''
- from multiprocessing.pool import Pool
+ from .pool import Pool
return Pool(processes, initializer, initargs, maxtasksperchild)
def RawValue(typecode_or_type, *args):
'''
Returns a shared object
'''
- from multiprocessing.sharedctypes import RawValue
+ from .sharedctypes import RawValue
return RawValue(typecode_or_type, *args)
def RawArray(typecode_or_type, size_or_initializer):
'''
Returns a shared array
'''
- from multiprocessing.sharedctypes import RawArray
+ from .sharedctypes import RawArray
return RawArray(typecode_or_type, size_or_initializer)
def Value(typecode_or_type, *args, lock=True):
'''
Returns a synchronized shared object
'''
- from multiprocessing.sharedctypes import Value
+ from .sharedctypes import Value
return Value(typecode_or_type, *args, lock=lock)
def Array(typecode_or_type, size_or_initializer, *, lock=True):
'''
Returns a synchronized shared array
'''
- from multiprocessing.sharedctypes import Array
+ from .sharedctypes import Array
return Array(typecode_or_type, size_or_initializer, lock=lock)
#
#
#
-if sys.platform == 'win32':
+def set_executable(executable):
+ '''
+ Sets the path to a python.exe or pythonw.exe binary used to run
+ child processes instead of sys.executable when using the 'spawn'
+ start method. Useful for people embedding Python.
+ '''
+ from .spawn import set_executable
+ set_executable(executable)
+
+def set_start_method(method):
+ '''
+ Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
+ '''
+ from .popen import set_start_method
+ set_start_method(method)
+
+def get_start_method():
+ '''
+ Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
+ '''
+ from .popen import get_start_method
+ return get_start_method()
- def set_executable(executable):
- '''
- Sets the path to a python.exe or pythonw.exe binary used to run
- child processes on Windows instead of sys.executable.
- Useful for people embedding Python.
- '''
- from multiprocessing.forking import set_executable
- set_executable(executable)
+def get_all_start_methods():
+ '''
+ Get list of availables start methods, default first.
+ '''
+ from .popen import get_all_start_methods
+ return get_all_start_methods()
- __all__ += ['set_executable']
+def set_forkserver_preload(module_names):
+ '''
+ Set list of module names to try to load in the forkserver process
+ when it is started. Properly chosen this can significantly reduce
+ the cost of starting a new process using the forkserver method.
+ The default list is ['__main__'].
+ '''
+ try:
+ from .forkserver import set_forkserver_preload
+ except ImportError:
+ pass
+ else:
+ set_forkserver_preload(module_names)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
index 1093d9fb97f..443fa7e138f 100644
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@ import tempfile
import itertools
import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
try:
import _winapi
from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -71,7 +75,7 @@ def arbitrary_address(family):
if family == 'AF_INET':
return ('localhost', 0)
elif family == 'AF_UNIX':
- return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+ return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
(os.getpid(), next(_mmap_counter)))
@@ -505,7 +509,7 @@ if sys.platform != 'win32':
c1 = Connection(s1.detach())
c2 = Connection(s2.detach())
else:
- fd1, fd2 = os.pipe()
+ fd1, fd2 = util.pipe()
c1 = Connection(fd1, writable=False)
c2 = Connection(fd2, readable=False)
@@ -577,7 +581,7 @@ class SocketListener(object):
self._last_accepted = None
if family == 'AF_UNIX':
- self._unlink = Finalize(
+ self._unlink = util.Finalize(
self, os.unlink, args=(address,), exitpriority=0
)
else:
@@ -625,8 +629,8 @@ if sys.platform == 'win32':
self._handle_queue = [self._new_handle(first=True)]
self._last_accepted = None
- sub_debug('listener created with address=%r', self._address)
- self.close = Finalize(
+ util.sub_debug('listener created with address=%r', self._address)
+ self.close = util.Finalize(
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
@@ -668,7 +672,7 @@ if sys.platform == 'win32':
@staticmethod
def _finalize_pipe_listener(queue, address):
- sub_debug('closing listener with address=%r', address)
+ util.sub_debug('closing listener with address=%r', address)
for handle in queue:
_winapi.CloseHandle(handle)
@@ -919,15 +923,32 @@ else:
#
if sys.platform == 'win32':
- from . import reduction
- ForkingPickler.register(socket.socket, reduction.reduce_socket)
- ForkingPickler.register(Connection, reduction.reduce_connection)
- ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+ def reduce_connection(conn):
+ handle = conn.fileno()
+ with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+ from . import resource_sharer
+ ds = resource_sharer.DupSocket(s)
+ return rebuild_connection, (ds, conn.readable, conn.writable)
+ def rebuild_connection(ds, readable, writable):
+ sock = ds.detach()
+ return Connection(sock.detach(), readable, writable)
+ reduction.register(Connection, reduce_connection)
+
+ def reduce_pipe_connection(conn):
+ access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+ (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+ dh = reduction.DupHandle(conn.fileno(), access)
+ return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+ def rebuild_pipe_connection(dh, readable, writable):
+ handle = dh.detach()
+ return PipeConnection(handle, readable, writable)
+ reduction.register(PipeConnection, reduce_pipe_connection)
+
else:
- try:
- from . import reduction
- except ImportError:
- pass
- else:
- ForkingPickler.register(socket.socket, reduction.reduce_socket)
- ForkingPickler.register(Connection, reduction.reduce_connection)
+ def reduce_connection(conn):
+ df = reduction.DupFd(conn.fileno())
+ return rebuild_connection, (df, conn.readable, conn.writable)
+ def rebuild_connection(df, readable, writable):
+ fd = df.detach()
+ return Connection(fd, readable, writable)
+ reduction.register(Connection, reduce_connection)
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
index 20ae957b803..97f7af737ea 100644
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@ import sys
import weakref
import array
-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
from threading import Lock, RLock, Semaphore, BoundedSemaphore
from threading import Event, Condition, Barrier
from queue import Queue
@@ -113,7 +113,7 @@ def shutdown():
pass
def Pool(processes=None, initializer=None, initargs=()):
- from multiprocessing.pool import ThreadPool
+ from ..pool import ThreadPool
return ThreadPool(processes, initializer, initargs)
JoinableQueue = Queue
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
deleted file mode 100644
index 39cfd8d6eee..00000000000
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
- if not Popen.thread_is_spawning():
- raise RuntimeError(
- '%s objects should only be shared between processes'
- ' through inheritance' % type(self).__name__
- )
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
- _extra_reducers = {}
- def __init__(self, *args):
- Pickler.__init__(self, *args)
- self.dispatch_table = dispatch_table.copy()
- self.dispatch_table.update(self._extra_reducers)
- @classmethod
- def register(cls, type, reduce):
- cls._extra_reducers[type] = reduce
-
- @staticmethod
- def dumps(obj):
- buf = io.BytesIO()
- ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
- return buf.getbuffer()
-
- loads = pickle.loads
-
-
-def _reduce_method(m):
- if m.__self__ is None:
- return getattr, (m.__class__, m.__func__.__name__)
- else:
- return getattr, (m.__self__, m.__func__.__name__)
-class _C:
- def f(self):
- pass
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-
-def _reduce_method_descriptor(m):
- return getattr, (m.__objclass__, m.__name__)
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
- from functools import partial
-except ImportError:
- pass
-else:
- def _reduce_partial(p):
- return _rebuild_partial, (p.func, p.args, p.keywords or {})
- def _rebuild_partial(func, args, keywords):
- return partial(func, *args, **keywords)
- ForkingPickler.register(partial, _reduce_partial)
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
- duplicate = os.dup
- close = os.close
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
-
- def __init__(self, process_obj):
- sys.stdout.flush()
- sys.stderr.flush()
- self.returncode = None
-
- r, w = os.pipe()
- self.sentinel = r
-
- self.pid = os.fork()
- if self.pid == 0:
- os.close(r)
- if 'random' in sys.modules:
- import random
- random.seed()
- code = process_obj._bootstrap()
- os._exit(code)
-
- # `w` will be closed when the child exits, at which point `r`
- # will become ready for reading (using e.g. select()).
- os.close(w)
- util.Finalize(self, os.close, (r,))
-
- def poll(self, flag=os.WNOHANG):
- if self.returncode is None:
- while True:
- try:
- pid, sts = os.waitpid(self.pid, flag)
- except OSError as e:
- if e.errno == errno.EINTR:
- continue
- # Child process not yet created. See #1731717
- # e.errno == errno.ECHILD == 10
- return None
- else:
- break
- if pid == self.pid:
- if os.WIFSIGNALED(sts):
- self.returncode = -os.WTERMSIG(sts)
- else:
- assert os.WIFEXITED(sts)
- self.returncode = os.WEXITSTATUS(sts)
- return self.returncode
-
- def wait(self, timeout=None):
- if self.returncode is None:
- if timeout is not None:
- from .connection import wait
- if not wait([self.sentinel], timeout):
- return None
- # This shouldn't block if wait() returned successfully.
- return self.poll(os.WNOHANG if timeout == 0.0 else 0)
- return self.returncode
-
- def terminate(self):
- if self.returncode is None:
- try:
- os.kill(self.pid, signal.SIGTERM)
- except OSError:
- if self.wait(timeout=0.1) is None:
- raise
-
- @staticmethod
- def thread_is_spawning():
- return False
-
-#
-# Windows
-#
-
-else:
- import _thread
- import msvcrt
- import _winapi
-
- from pickle import load, HIGHEST_PROTOCOL
-
- def dump(obj, file, protocol=None):
- ForkingPickler(file, protocol).dump(obj)
-
- #
- #
- #
-
- TERMINATE = 0x10000
- WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
- WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
- close = _winapi.CloseHandle
-
- #
- # _python_exe is the assumed path to the python executable.
- # People embedding Python want to modify it.
- #
-
- if WINSERVICE:
- _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
- else:
- _python_exe = sys.executable
-
- def set_executable(exe):
- global _python_exe
- _python_exe = exe
-
- #
- #
- #
-
- def duplicate(handle, target_process=None, inheritable=False):
- if target_process is None:
- target_process = _winapi.GetCurrentProcess()
- return _winapi.DuplicateHandle(
- _winapi.GetCurrentProcess(), handle, target_process,
- 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
- )
-
- #
- # We define a Popen class similar to the one from subprocess, but
- # whose constructor takes a process object as its argument.
- #
-
- class Popen(object):
- '''
- Start a subprocess to run the code of a process object
- '''
- _tls = _thread._local()
-
- def __init__(self, process_obj):
- cmd = ' '.join('"%s"' % x for x in get_command_line())
- prep_data = get_preparation_data(process_obj._name)
-
- # create pipe for communication with child
- rfd, wfd = os.pipe()
-
- # get handle for read end of the pipe and make it inheritable
- rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
- os.close(rfd)
-
- with open(wfd, 'wb', closefd=True) as to_child:
- # start process
- try:
- hp, ht, pid, tid = _winapi.CreateProcess(
- _python_exe, cmd + (' %s' % rhandle),
- None, None, 1, 0, None, None, None
- )
- _winapi.CloseHandle(ht)
- finally:
- close(rhandle)
-
- # set attributes of self
- self.pid = pid
- self.returncode = None
- self._handle = hp
- self.sentinel = int(hp)
- util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
-
- # send information to child
- Popen._tls.process_handle = int(hp)
- try:
- dump(prep_data, to_child, HIGHEST_PROTOCOL)
- dump(process_obj, to_child, HIGHEST_PROTOCOL)
- finally:
- del Popen._tls.process_handle
-
- @staticmethod
- def thread_is_spawning():
- return getattr(Popen._tls, 'process_handle', None) is not None
-
- @staticmethod
- def duplicate_for_child(handle):
- return duplicate(handle, Popen._tls.process_handle)
-
- def wait(self, timeout=None):
- if self.returncode is None:
- if timeout is None:
- msecs = _winapi.INFINITE
- else:
- msecs = max(0, int(timeout * 1000 + 0.5))
-
- res = _winapi.WaitForSingleObject(int(self._handle), msecs)
- if res == _winapi.WAIT_OBJECT_0:
- code = _winapi.GetExitCodeProcess(self._handle)
- if code == TERMINATE:
- code = -signal.SIGTERM
- self.returncode = code
-
- return self.returncode
-
- def poll(self):
- return self.wait(timeout=0)
-
- def terminate(self):
- if self.returncode is None:
- try:
- _winapi.TerminateProcess(int(self._handle), TERMINATE)
- except OSError:
- if self.wait(timeout=1.0) is None:
- raise
-
- #
- #
- #
-
- def is_forking(argv):
- '''
- Return whether commandline indicates we are forking
- '''
- if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
- assert len(argv) == 3
- return True
- else:
- return False
-
-
- def freeze_support():
- '''
- Run code for process object if this in not the main process
- '''
- if is_forking(sys.argv):
- main()
- sys.exit()
-
-
- def get_command_line():
- '''
- Returns prefix of command line used for spawning a child process
- '''
- if getattr(process.current_process(), '_inheriting', False):
- raise RuntimeError('''
- Attempt to start a new process before the current process
- has finished its bootstrapping phase.
-
- This probably means that you are on Windows and you have
- forgotten to use the proper idiom in the main module:
-
- if __name__ == '__main__':
- freeze_support()
- ...
-
- The "freeze_support()" line can be omitted if the program
- is not going to be frozen to produce a Windows executable.''')
-
- if getattr(sys, 'frozen', False):
- return [sys.executable, '--multiprocessing-fork']
- else:
- prog = 'from multiprocessing.forking import main; main()'
- opts = util._args_from_interpreter_flags()
- return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-
-
- def main():
- '''
- Run code specifed by data received over pipe
- '''
- assert is_forking(sys.argv)
-
- handle = int(sys.argv[-1])
- fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
- from_parent = os.fdopen(fd, 'rb')
-
- process.current_process()._inheriting = True
- preparation_data = load(from_parent)
- prepare(preparation_data)
- self = load(from_parent)
- process.current_process()._inheriting = False
-
- from_parent.close()
-
- exitcode = self._bootstrap()
- sys.exit(exitcode)
-
-
- def get_preparation_data(name):
- '''
- Return info about parent needed by child to unpickle process object
- '''
- from .util import _logger, _log_to_stderr
-
- d = dict(
- name=name,
- sys_path=sys.path,
- sys_argv=sys.argv,
- log_to_stderr=_log_to_stderr,
- orig_dir=process.ORIGINAL_DIR,
- authkey=process.current_process().authkey,
- )
-
- if _logger is not None:
- d['log_level'] = _logger.getEffectiveLevel()
-
- if not WINEXE and not WINSERVICE:
- main_path = getattr(sys.modules['__main__'], '__file__', None)
- if not main_path and sys.argv[0] not in ('', '-c'):
- main_path = sys.argv[0]
- if main_path is not None:
- if not os.path.isabs(main_path) and \
- process.ORIGINAL_DIR is not None:
- main_path = os.path.join(process.ORIGINAL_DIR, main_path)
- d['main_path'] = os.path.normpath(main_path)
-
- return d
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
- '''
- Try to get current process ready to unpickle process object
- '''
- old_main_modules.append(sys.modules['__main__'])
-
- if 'name' in data:
- process.current_process().name = data['name']
-
- if 'authkey' in data:
- process.current_process()._authkey = data['authkey']
-
- if 'log_to_stderr' in data and data['log_to_stderr']:
- util.log_to_stderr()
-
- if 'log_level' in data:
- util.get_logger().setLevel(data['log_level'])
-
- if 'sys_path' in data:
- sys.path = data['sys_path']
-
- if 'sys_argv' in data:
- sys.argv = data['sys_argv']
-
- if 'dir' in data:
- os.chdir(data['dir'])
-
- if 'orig_dir' in data:
- process.ORIGINAL_DIR = data['orig_dir']
-
- if 'main_path' in data:
- # XXX (ncoghlan): The following code makes several bogus
- # assumptions regarding the relationship between __file__
- # and a module's real name. See PEP 302 and issue #10845
- main_path = data['main_path']
- main_name = os.path.splitext(os.path.basename(main_path))[0]
- if main_name == '__init__':
- main_name = os.path.basename(os.path.dirname(main_path))
-
- if main_name == '__main__':
- main_module = sys.modules['__main__']
- main_module.__file__ = main_path
- elif main_name != 'ipython':
- # Main modules not actually called __main__.py may
- # contain additional code that should still be executed
- import importlib
- import types
-
- if main_path is None:
- dirs = None
- elif os.path.basename(main_path).startswith('__init__.py'):
- dirs = [os.path.dirname(os.path.dirname(main_path))]
- else:
- dirs = [os.path.dirname(main_path)]
-
- assert main_name not in sys.modules, main_name
- sys.modules.pop('__mp_main__', None)
- # We should not try to load __main__
- # since that would execute 'if __name__ == "__main__"'
- # clauses, potentially causing a psuedo fork bomb.
- loader = importlib.find_loader(main_name, path=dirs)
- main_module = types.ModuleType(main_name)
- try:
- loader.init_module_attrs(main_module)
- except AttributeError: # init_module_attrs is optional
- pass
- main_module.__name__ = '__mp_main__'
- code = loader.get_code(main_name)
- exec(code, main_module.__dict__)
-
- sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
new file mode 100644
index 00000000000..628808e53f9
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+ 'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q') # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+ '''Set list of module names to try to load in forkserver process.'''
+ global _preload_modules
+ _preload_modules = modules_names
+
+
+def get_inherited_fds():
+ '''Return list of fds inherited from parent process.
+
+ This returns None if the current process was not started by fork server.
+ '''
+ return _inherited_fds
+
+
+def connect_to_new_process(fds):
+ '''Request forkserver to create a child process.
+
+ Returns a pair of fds (status_r, data_w). The calling process can read
+ the child process's pid and (eventually) its returncode from status_r.
+ The calling process should write to data_w the pickled preparation and
+ process data.
+ '''
+ if len(fds) + 3 >= MAXFDS_TO_SEND:
+ raise ValueError('too many fds')
+ address, alive_w = process.current_process()._config['forkserver_info']
+ with socket.socket(socket.AF_UNIX) as client:
+ client.connect(address)
+ parent_r, child_w = util.pipe()
+ child_r, parent_w = util.pipe()
+ allfds = [child_r, child_w, alive_w]
+ allfds += fds
+ try:
+ reduction.sendfds(client, allfds)
+ return parent_r, parent_w
+ except:
+ os.close(parent_r)
+ os.close(parent_w)
+ raise
+ finally:
+ os.close(child_r)
+ os.close(child_w)
+
+
+def ensure_running():
+ '''Make sure that a fork server is running.
+
+ This can be called from any process. Note that usually a child
+ process will just reuse the forkserver started by its parent, so
+ ensure_running() will do nothing.
+ '''
+ with _lock:
+ config = process.current_process()._config
+ if config.get('forkserver_info') is not None:
+ return
+
+ assert all(type(mod) is str for mod in _preload_modules)
+ semaphore_tracker_fd = config['semaphore_tracker_fd']
+ cmd = ('from multiprocessing.forkserver import main; ' +
+ 'main(%d, %d, %r, **%r)')
+
+ if _preload_modules:
+ desired_keys = {'main_path', 'sys_path'}
+ data = spawn.get_preparation_data('ignore')
+ data = dict((x,y) for (x,y) in data.items() if x in desired_keys)
+ else:
+ data = {}
+
+ with socket.socket(socket.AF_UNIX) as listener:
+ address = connection.arbitrary_address('AF_UNIX')
+ listener.bind(address)
+ os.chmod(address, 0o600)
+ listener.listen(100)
+
+ # all client processes own the write end of the "alive" pipe;
+ # when they all terminate the read end becomes ready.
+ alive_r, alive_w = os.pipe()
+ config['forkserver_info'] = (address, alive_w)
+ fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+ cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
+ pid = util.spawnv_passfds(exe, args, fds_to_pass)
+
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+ '''Run forkserver.'''
+ if preload:
+ if '__main__' in preload and main_path is not None:
+ process.current_process()._inheriting = True
+ try:
+ spawn.import_main_path(main_path)
+ finally:
+ del process.current_process()._inheriting
+ for modname in preload:
+ try:
+ __import__(modname)
+ except ImportError:
+ pass
+
+ # close sys.stdin
+ if sys.stdin is not None:
+ try:
+ sys.stdin.close()
+ sys.stdin = open(os.devnull)
+ except (OSError, ValueError):
+ pass
+
+ # ignoring SIGCHLD means no need to reap zombie processes
+ handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+ with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+ readers = [listener, alive_r]
+
+ while True:
+ try:
+ rfds, wfds, xfds = select.select(readers, [], [])
+
+ if alive_r in rfds:
+ # EOF because no more client processes left
+ assert os.read(alive_r, 1) == b''
+ raise SystemExit
+
+ assert listener in rfds
+ with listener.accept()[0] as s:
+ code = 1
+ if os.fork() == 0:
+ try:
+ _serve_one(s, listener, alive_r, handler)
+ except Exception:
+ sys.excepthook(*sys.exc_info())
+ sys.stderr.flush()
+ finally:
+ os._exit(code)
+
+ except InterruptedError:
+ pass
+ except OSError as e:
+ if e.errno != errno.ECONNABORTED:
+ raise
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+ global _inherited_fds
+
+ # close unnecessary stuff and reset SIGCHLD handler
+ listener.close()
+ os.close(alive_r)
+ signal.signal(signal.SIGCHLD, handler)
+
+ # receive fds from parent process
+ fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+ s.close()
+ assert len(fds) <= MAXFDS_TO_SEND
+ child_r, child_w, alive_w, *_inherited_fds = fds
+
+ # send pid to client processes
+ write_unsigned(child_w, os.getpid())
+
+ # reseed random number generator
+ if 'random' in sys.modules:
+ import random
+ random.seed()
+
+ # run process object received over pipe
+ code = spawn._main(child_r)
+
+ # write the exit code to the pipe
+ write_unsigned(child_w, code)
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+ data = b''
+ length = UNSIGNED_STRUCT.size
+ while len(data) < length:
+ while True:
+ try:
+ s = os.read(fd, length - len(data))
+ except InterruptedError:
+ pass
+ else:
+ break
+ if not s:
+ raise EOFError('unexpected EOF')
+ data += s
+ return UNSIGNED_STRUCT.unpack(data)[0]
+
+def write_unsigned(fd, n):
+ msg = UNSIGNED_STRUCT.pack(n)
+ while msg:
+ while True:
+ try:
+ nbytes = os.write(fd, msg)
+ except InterruptedError:
+ pass
+ else:
+ break
+ if nbytes == 0:
+ raise RuntimeError('should not get here')
+ msg = msg[nbytes:]
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
index e63fdb8755f..b95f90f5d1f 100644
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
#
import bisect
+import itertools
import mmap
import os
import sys
+import tempfile
import threading
-import itertools
-
import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+
+from . import popen
+from . import reduction
+from . import util
__all__ = ['BufferWrapper']
@@ -30,17 +32,25 @@ if sys.platform == 'win32':
class Arena(object):
- _counter = itertools.count()
+ _rand = tempfile._RandomNameSequence()
def __init__(self, size):
self.size = size
- self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
- self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
- assert _winapi.GetLastError() == 0, 'tagname already in use'
+ for i in range(100):
+ name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+ buf = mmap.mmap(-1, size, tagname=name)
+ if _winapi.GetLastError() == 0:
+ break
+ # We have reopened a preexisting mmap.
+ buf.close()
+ else:
+ raise FileExistsError('Cannot find name for new mmap')
+ self.name = name
+ self.buffer = buf
self._state = (self.size, self.name)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return self._state
def __setstate__(self, state):
@@ -52,10 +62,28 @@ else:
class Arena(object):
- def __init__(self, size):
- self.buffer = mmap.mmap(-1, size)
+ def __init__(self, size, fd=-1):
self.size = size
- self.name = None
+ self.fd = fd
+ if fd == -1:
+ self.fd, name = tempfile.mkstemp(
+ prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
+ os.unlink(name)
+ util.Finalize(self, os.close, (self.fd,))
+ with open(self.fd, 'wb', closefd=False) as f:
+ f.write(b'\0'*size)
+ self.buffer = mmap.mmap(self.fd, self.size)
+
+ def reduce_arena(a):
+ if a.fd == -1:
+ raise ValueError('Arena is unpicklable because '
+ 'forking was enabled when it was created')
+ return rebuild_arena, (a.size, reduction.DupFd(a.fd))
+
+ def rebuild_arena(size, dupfd):
+ return Arena(size, dupfd.detach())
+
+ reduction.register(Arena, reduce_arena)
#
# Class allowing allocation of chunks of memory from arenas
@@ -90,7 +118,7 @@ class Heap(object):
if i == len(self._lengths):
length = self._roundup(max(self._size, size), mmap.PAGESIZE)
self._size *= 2
- info('allocating a new mmap of length %d', length)
+ util.info('allocating a new mmap of length %d', length)
arena = Arena(length)
self._arenas.append(arena)
return (arena, 0, length)
@@ -216,7 +244,7 @@ class BufferWrapper(object):
assert 0 <= size < sys.maxsize
block = BufferWrapper._heap.malloc(size)
self._state = (block, size)
- Finalize(self, BufferWrapper._heap.free, args=(block,))
+ util.Finalize(self, BufferWrapper._heap.free, args=(block,))
def create_memoryview(self):
(arena, start, stop), size = self._state
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
index 36cd650e2e9..f580e9ec1e0 100644
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@ import threading
import array
import queue
-from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
from time import time as _time
+from traceback import format_exc
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util
#
# Register some things for pickling
@@ -31,16 +35,14 @@ from time import time as _time
def reduce_array(a):
return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)
view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
if view_types[0] is not list: # only needed in Py3.0
def rebuild_as_list(obj):
return list, (list(obj),)
for view_type in view_types:
- ForkingPickler.register(view_type, rebuild_as_list)
- import copyreg
- copyreg.pickle(view_type, rebuild_as_list)
+ reduction.register(view_type, rebuild_as_list)
#
# Type for identifying shared objects
@@ -130,7 +132,7 @@ class Server(object):
def __init__(self, registry, address, authkey, serializer):
assert isinstance(authkey, bytes)
self.registry = registry
- self.authkey = AuthenticationString(authkey)
+ self.authkey = process.AuthenticationString(authkey)
Listener, Client = listener_client[serializer]
# do authentication later
@@ -146,7 +148,7 @@ class Server(object):
Run the server forever
'''
self.stop_event = threading.Event()
- current_process()._manager_server = self
+ process.current_process()._manager_server = self
try:
accepter = threading.Thread(target=self.accepter)
accepter.daemon = True
@@ -438,9 +440,9 @@ class BaseManager(object):
def __init__(self, address=None, authkey=None, serializer='pickle'):
if authkey is None:
- authkey = current_process().authkey
+ authkey = process.current_process().authkey
self._address = address # XXX not final address if eg ('', 0)
- self._authkey = AuthenticationString(authkey)
+ self._authkey = process.AuthenticationString(authkey)
self._state = State()
self._state.value = State.INITIAL
self._serializer = serializer
@@ -476,7 +478,7 @@ class BaseManager(object):
reader, writer = connection.Pipe(duplex=False)
# spawn process which runs a server
- self._process = Process(
+ self._process = process.Process(
target=type(self)._run_server,
args=(self._registry, self._address, self._authkey,
self._serializer, writer, initializer, initargs),
@@ -691,11 +693,11 @@ class BaseProxy(object):
self._Client = listener_client[serializer][1]
if authkey is not None:
- self._authkey = AuthenticationString(authkey)
+ self._authkey = process.AuthenticationString(authkey)
elif self._manager is not None:
self._authkey = self._manager._authkey
else:
- self._authkey = current_process().authkey
+ self._authkey = process.current_process().authkey
if incref:
self._incref()
@@ -704,7 +706,7 @@ class BaseProxy(object):
def _connect(self):
util.debug('making connection to manager')
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
conn = self._Client(self._token.address, authkey=self._authkey)
@@ -798,7 +800,7 @@ class BaseProxy(object):
def __reduce__(self):
kwds = {}
- if Popen.thread_is_spawning():
+ if popen.get_spawning_popen() is not None:
kwds['authkey'] = self._authkey
if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@ def RebuildProxy(func, token, serializer, kwds):
If possible the shared object is returned, or otherwise a proxy for it.
'''
- server = getattr(current_process(), '_manager_server', None)
+ server = getattr(process.current_process(), '_manager_server', None)
if server and server.address == token.address:
return server.id_to_obj[token.id][0]
else:
incref = (
kwds.pop('incref', True) and
- not getattr(current_process(), '_inheriting', False)
+ not getattr(process.current_process(), '_inheriting', False)
)
return func(token, serializer, incref=incref, **kwds)
@@ -889,7 +891,7 @@ def AutoProxy(token, serializer, manager=None, authkey=None,
if authkey is None and manager is not None:
authkey = manager._authkey
if authkey is None:
- authkey = current_process().authkey
+ authkey = process.current_process().authkey
ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
@@ -1109,7 +1111,7 @@ SyncManager.register('BoundedSemaphore', threading.BoundedSemaphore,
AcquirerProxy)
SyncManager.register('Condition', threading.Condition, ConditionProxy)
SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
SyncManager.register('list', list, ListProxy)
SyncManager.register('dict', dict, DictProxy)
SyncManager.register('Value', Value, ValueProxy)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8082ad6f345..1cecd093848 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']
#
# Imports
@@ -21,8 +21,10 @@ import os
import time
import traceback
-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided. Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue
#
# Constants representing the state of a pool
@@ -104,11 +106,11 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
try:
task = get()
except (EOFError, OSError):
- debug('worker got EOFError or OSError -- exiting')
+ util.debug('worker got EOFError or OSError -- exiting')
break
if task is None:
- debug('worker got sentinel -- exiting')
+ util.debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
@@ -121,11 +123,11 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None):
put((job, i, result))
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
- debug("Possible encoding error while sending result: %s" % (
+ util.debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))
completed += 1
- debug('worker exiting after %d tasks' % completed)
+ util.debug('worker exiting after %d tasks' % completed)
#
# Class representing a process pool
@@ -184,7 +186,7 @@ class Pool(object):
self._result_handler._state = RUN
self._result_handler.start()
- self._terminate = Finalize(
+ self._terminate = util.Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._worker_handler, self._task_handler,
@@ -201,7 +203,7 @@ class Pool(object):
worker = self._pool[i]
if worker.exitcode is not None:
# worker exited
- debug('cleaning up worker %d' % i)
+ util.debug('cleaning up worker %d' % i)
worker.join()
cleaned = True
del self._pool[i]
@@ -221,7 +223,7 @@ class Pool(object):
w.name = w.name.replace('Process', 'PoolWorker')
w.daemon = True
w.start()
- debug('added worker')
+ util.debug('added worker')
def _maintain_pool(self):
"""Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@ class Pool(object):
self._repopulate_pool()
def _setup_queues(self):
- from .queues import SimpleQueue
self._inqueue = SimpleQueue()
self._outqueue = SimpleQueue()
self._quick_put = self._inqueue._writer.send
@@ -358,7 +359,7 @@ class Pool(object):
time.sleep(0.1)
# send sentinel to stop workers
pool._taskqueue.put(None)
- debug('worker handler exiting')
+ util.debug('worker handler exiting')
@staticmethod
def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@ class Pool(object):
i = -1
for i, task in enumerate(taskseq):
if thread._state:
- debug('task handler found thread._state != RUN')
+ util.debug('task handler found thread._state != RUN')
break
try:
put(task)
except OSError:
- debug('could not put task on queue')
+ util.debug('could not put task on queue')
break
else:
if set_length:
- debug('doing set_length()')
+ util.debug('doing set_length()')
set_length(i+1)
continue
break
else:
- debug('task handler got sentinel')
+ util.debug('task handler got sentinel')
try:
# tell result handler to finish when cache is empty
- debug('task handler sending sentinel to result handler')
+ util.debug('task handler sending sentinel to result handler')
outqueue.put(None)
# tell workers there is no more work
- debug('task handler sending sentinel to workers')
+ util.debug('task handler sending sentinel to workers')
for p in pool:
put(None)
except OSError:
- debug('task handler got OSError when sending sentinels')
+ util.debug('task handler got OSError when sending sentinels')
- debug('task handler exiting')
+ util.debug('task handler exiting')
@staticmethod
def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@ class Pool(object):
try:
task = get()
except (OSError, EOFError):
- debug('result handler got EOFError/OSError -- exiting')
+ util.debug('result handler got EOFError/OSError -- exiting')
return
if thread._state:
assert thread._state == TERMINATE
- debug('result handler found thread._state=TERMINATE')
+ util.debug('result handler found thread._state=TERMINATE')
break
if task is None:
- debug('result handler got sentinel')
+ util.debug('result handler got sentinel')
break
job, i, obj = task
@@ -429,11 +430,11 @@ class Pool(object):
try:
task = get()
except (OSError, EOFError):
- debug('result handler got EOFError/OSError -- exiting')
+ util.debug('result handler got EOFError/OSError -- exiting')
return
if task is None:
- debug('result handler ignoring extra sentinel')
+ util.debug('result handler ignoring extra sentinel')
continue
job, i, obj = task
try:
@@ -442,7 +443,7 @@ class Pool(object):
pass
if hasattr(outqueue, '_reader'):
- debug('ensuring that outqueue is not full')
+ util.debug('ensuring that outqueue is not full')
# If we don't make room available in outqueue then
# attempts to add the sentinel (None) to outqueue may
# block. There is guaranteed to be no more than 2 sentinels.
@@ -454,7 +455,7 @@ class Pool(object):
except (OSError, EOFError):
pass
- debug('result handler exiting: len(cache)=%s, thread._state=%s',
+ util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
len(cache), thread._state)
@staticmethod
@@ -472,19 +473,19 @@ class Pool(object):
)
def close(self):
- debug('closing pool')
+ util.debug('closing pool')
if self._state == RUN:
self._state = CLOSE
self._worker_handler._state = CLOSE
def terminate(self):
- debug('terminating pool')
+ util.debug('terminating pool')
self._state = TERMINATE
self._worker_handler._state = TERMINATE
self._terminate()
def join(self):
- debug('joining pool')
+ util.debug('joining pool')
assert self._state in (CLOSE, TERMINATE)
self._worker_handler.join()
self._task_handler.join()
@@ -495,7 +496,7 @@ class Pool(object):
@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# task_handler may be blocked trying to put items on inqueue
- debug('removing tasks from inqueue until task handler finished')
+ util.debug('removing tasks from inqueue until task handler finished')
inqueue._rlock.acquire()
while task_handler.is_alive() and inqueue._reader.poll():
inqueue._reader.recv()
@@ -505,12 +506,12 @@ class Pool(object):
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
- debug('finalizing pool')
+ util.debug('finalizing pool')
worker_handler._state = TERMINATE
task_handler._state = TERMINATE
- debug('helping task handler/workers to finish')
+ util.debug('helping task handler/workers to finish')
cls._help_stuff_finish(inqueue, task_handler, len(pool))
assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@ class Pool(object):
# We must wait for the worker handler to exit before terminating
# workers because we don't want workers to be restarted behind our back.
- debug('joining worker handler')
+ util.debug('joining worker handler')
if threading.current_thread() is not worker_handler:
worker_handler.join()
# Terminate workers which haven't already finished.
if pool and hasattr(pool[0], 'terminate'):
- debug('terminating workers')
+ util.debug('terminating workers')
for p in pool:
if p.exitcode is None:
p.terminate()
- debug('joining task handler')
+ util.debug('joining task handler')
if threading.current_thread() is not task_handler:
task_handler.join()
- debug('joining result handler')
+ util.debug('joining result handler')
if threading.current_thread() is not result_handler:
result_handler.join()
if pool and hasattr(pool[0], 'terminate'):
- debug('joining pool workers')
+ util.debug('joining pool workers')
for p in pool:
if p.is_alive():
# worker has not yet exited
- debug('cleaning up worker %d' % p.pid)
+ util.debug('cleaning up worker %d' % p.pid)
p.join()
def __enter__(self):
@@ -730,7 +731,10 @@ class IMapUnorderedIterator(IMapIterator):
class ThreadPool(Pool):
- from .dummy import Process
+ @staticmethod
+ def Process(*args, **kwds):
+ from .dummy import Process
+ return Process(*args, **kwds)
def __init__(self, processes=None, initializer=None, initargs=()):
Pool.__init__(self, processes, initializer, initargs)
diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py
new file mode 100644
index 00000000000..b0c80d5d87c
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+ 'assert_spawning']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+ return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+ _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+ if get_spawning_popen() is None:
+ raise RuntimeError(
+ '%s objects should only be shared between processes'
+ ' through inheritance' % type(obj).__name__
+ )
+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+ if _Popen is None:
+ set_start_method()
+ return _Popen(process_obj)
+
+def get_start_method():
+ if _Popen is None:
+ set_start_method()
+ return _Popen.method
+
+def set_start_method(meth=None, *, start_helpers=True):
+ global _Popen
+ try:
+ modname = _method_to_module[meth]
+ __import__(modname)
+ except (KeyError, ImportError):
+ raise ValueError('could not use start method %r' % meth)
+ module = sys.modules[modname]
+ if start_helpers:
+ module.Popen.ensure_helpers_running()
+ _Popen = module.Popen
+
+
+if sys.platform == 'win32':
+
+ _method_to_module = {
+ None: 'multiprocessing.popen_spawn_win32',
+ 'spawn': 'multiprocessing.popen_spawn_win32',
+ }
+
+ def get_all_start_methods():
+ return ['spawn']
+
+else:
+ _method_to_module = {
+ None: 'multiprocessing.popen_fork',
+ 'fork': 'multiprocessing.popen_fork',
+ 'spawn': 'multiprocessing.popen_spawn_posix',
+ 'forkserver': 'multiprocessing.popen_forkserver',
+ }
+
+ def get_all_start_methods():
+ from . import reduction
+ if reduction.HAVE_SEND_HANDLE:
+ return ['fork', 'spawn', 'forkserver']
+ else:
+ return ['fork', 'spawn']
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
new file mode 100644
index 00000000000..fd25ddc56d2
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+ method = 'fork'
+
+ def __init__(self, process_obj):
+ sys.stdout.flush()
+ sys.stderr.flush()
+ self.returncode = None
+ self._launch(process_obj)
+
+ def duplicate_for_child(self, fd):
+ return fd
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ while True:
+ try:
+ pid, sts = os.waitpid(self.pid, flag)
+ except OSError as e:
+ if e.errno == errno.EINTR:
+ continue
+ # Child process not yet created. See #1731717
+ # e.errno == errno.ECHILD == 10
+ return None
+ else:
+ break
+ if pid == self.pid:
+ if os.WIFSIGNALED(sts):
+ self.returncode = -os.WTERMSIG(sts)
+ else:
+ assert os.WIFEXITED(sts)
+ self.returncode = os.WEXITSTATUS(sts)
+ return self.returncode
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is not None:
+ from .connection import wait
+ if not wait([self.sentinel], timeout):
+ return None
+ # This shouldn't block if wait() returned successfully.
+ return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+ return self.returncode
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ os.kill(self.pid, signal.SIGTERM)
+ except ProcessLookupError:
+ pass
+ except OSError:
+ if self.wait(timeout=0.1) is None:
+ raise
+
+ def _launch(self, process_obj):
+ code = 1
+ parent_r, child_w = util.pipe()
+ self.pid = os.fork()
+ if self.pid == 0:
+ try:
+ os.close(parent_r)
+ if 'random' in sys.modules:
+ import random
+ random.seed()
+ code = process_obj._bootstrap()
+ finally:
+ os._exit(code)
+ else:
+ os.close(child_w)
+ util.Finalize(self, os.close, (parent_r,))
+ self.sentinel = parent_r
+
+ @staticmethod
+ def ensure_helpers_running():
+ pass
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
new file mode 100644
index 00000000000..f1c4b57a269
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+ raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, ind):
+ self.ind = ind
+ def detach(self):
+ return forkserver.get_inherited_fds()[self.ind]
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+ method = 'forkserver'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return len(self._fds) - 1
+
+ def _launch(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ buf = io.BytesIO()
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, buf)
+ reduction.dump(process_obj, buf)
+ finally:
+ popen.set_spawning_popen(None)
+
+ self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+ util.Finalize(self, os.close, (self.sentinel,))
+ with open(w, 'wb', closefd=True) as f:
+ f.write(buf.getbuffer())
+ self.pid = forkserver.read_unsigned(self.sentinel)
+
+ def poll(self, flag=os.WNOHANG):
+ if self.returncode is None:
+ from .connection import wait
+ timeout = 0 if flag == os.WNOHANG else None
+ if not wait([self.sentinel], timeout):
+ return None
+ try:
+ self.returncode = forkserver.read_unsigned(self.sentinel)
+ except (OSError, EOFError):
+ # The process ended abnormally perhaps because of a signal
+ self.returncode = 255
+ return self.returncode
+
+ @staticmethod
+ def ensure_helpers_running():
+ from . import semaphore_tracker
+ semaphore_tracker.ensure_running()
+ forkserver.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
new file mode 100644
index 00000000000..de262aaa02f
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+ def __init__(self, fd):
+ self.fd = fd
+ def detach(self):
+ return self.fd
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+ method = 'spawn'
+ DupFd = _DupFd
+
+ def __init__(self, process_obj):
+ self._fds = []
+ super().__init__(process_obj)
+
+ def duplicate_for_child(self, fd):
+ self._fds.append(fd)
+ return fd
+
+ def _launch(self, process_obj):
+ tracker_fd = current_process()._config['semaphore_tracker_fd']
+ self._fds.append(tracker_fd)
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ fp = io.BytesIO()
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, fp)
+ reduction.dump(process_obj, fp)
+ finally:
+ popen.set_spawning_popen(None)
+
+ parent_r = child_w = child_r = parent_w = None
+ try:
+ parent_r, child_w = util.pipe()
+ child_r, parent_w = util.pipe()
+ cmd = spawn.get_command_line() + [str(child_r)]
+ self._fds.extend([child_r, child_w])
+ self.pid = util.spawnv_passfds(spawn.get_executable(),
+ cmd, self._fds)
+ self.sentinel = parent_r
+ with open(parent_w, 'wb', closefd=False) as f:
+ f.write(fp.getbuffer())
+ finally:
+ if parent_r is not None:
+ util.Finalize(self, os.close, (parent_r,))
+ for fd in (child_r, child_w, parent_w):
+ if fd is not None:
+ os.close(fd)
+
+ @staticmethod
+ def ensure_helpers_running():
+ from . import semaphore_tracker
+ semaphore_tracker.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
new file mode 100644
index 00000000000..7e0c4b3b501
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+ '''
+ Start a subprocess to run the code of a process object
+ '''
+ method = 'spawn'
+
+ def __init__(self, process_obj):
+ prep_data = spawn.get_preparation_data(process_obj._name)
+ cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
+
+ # read end of pipe will be "stolen" by the child process
+ # -- see spawn_main() in spawn.py.
+ rhandle, whandle = _winapi.CreatePipe(None, 0)
+ wfd = msvcrt.open_osfhandle(whandle, 0)
+ cmd += ' {} {}'.format(os.getpid(), rhandle)
+
+ with open(wfd, 'wb', closefd=True) as to_child:
+ # start process
+ try:
+ hp, ht, pid, tid = _winapi.CreateProcess(
+ spawn.get_executable(), cmd,
+ None, None, False, 0, None, None, None)
+ _winapi.CloseHandle(ht)
+ except:
+ _winapi.CloseHandle(rhandle)
+ raise
+
+ # set attributes of self
+ self.pid = pid
+ self.returncode = None
+ self._handle = hp
+ self.sentinel = int(hp)
+ util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+ # send information to child
+ popen.set_spawning_popen(self)
+ try:
+ reduction.dump(prep_data, to_child)
+ reduction.dump(process_obj, to_child)
+ finally:
+ popen.set_spawning_popen(None)
+
+ def duplicate_for_child(self, handle):
+ assert self is popen.get_spawning_popen()
+ return reduction.duplicate(handle, self.sentinel)
+
+ def wait(self, timeout=None):
+ if self.returncode is None:
+ if timeout is None:
+ msecs = _winapi.INFINITE
+ else:
+ msecs = max(0, int(timeout * 1000 + 0.5))
+
+ res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+ if res == _winapi.WAIT_OBJECT_0:
+ code = _winapi.GetExitCodeProcess(self._handle)
+ if code == TERMINATE:
+ code = -signal.SIGTERM
+ self.returncode = code
+
+ return self.returncode
+
+ def poll(self):
+ return self.wait(timeout=0)
+
+ def terminate(self):
+ if self.returncode is None:
+ try:
+ _winapi.TerminateProcess(int(self._handle), TERMINATE)
+ except OSError:
+ if self.wait(timeout=1.0) is None:
+ raise
+
+ @staticmethod
+ def ensure_helpers_running():
+ pass
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
index 893507bcd5a..c1cb36f4c70 100644
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@ def active_children():
Return list of process objects corresponding to live child processes
'''
_cleanup()
- return list(_current_process._children)
+ return list(_children)
#
#
@@ -51,9 +51,9 @@ def active_children():
def _cleanup():
# check for processes which have finished
- for p in list(_current_process._children):
+ for p in list(_children):
if p._popen.poll() is not None:
- _current_process._children.discard(p)
+ _children.discard(p)
#
# The `Process` class
@@ -63,21 +63,16 @@ class Process(object):
'''
Process objects represent activity that is run in a separate process
- The class is analagous to `threading.Thread`
+ The class is analogous to `threading.Thread`
'''
_Popen = None
def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
*, daemon=None):
assert group is None, 'group argument must be None for now'
- count = next(_current_process._counter)
+ count = next(_process_counter)
self._identity = _current_process._identity + (count,)
- self._authkey = _current_process._authkey
- if daemon is not None:
- self._daemonic = daemon
- else:
- self._daemonic = _current_process._daemonic
- self._tempdir = _current_process._tempdir
+ self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
self._popen = None
self._target = target
@@ -85,6 +80,8 @@ class Process(object):
self._kwargs = dict(kwargs)
self._name = name or type(self).__name__ + '-' + \
':'.join(str(i) for i in self._identity)
+ if daemon is not None:
+ self.daemon = daemon
_dangling.add(self)
def run(self):
@@ -101,16 +98,16 @@ class Process(object):
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
- assert not _current_process._daemonic, \
+ assert not _current_process._config.get('daemon'), \
'daemonic processes are not allowed to have children'
_cleanup()
if self._Popen is not None:
Popen = self._Popen
else:
- from .forking import Popen
+ from .popen import Popen
self._popen = Popen(self)
self._sentinel = self._popen.sentinel
- _current_process._children.add(self)
+ _children.add(self)
def terminate(self):
'''
@@ -126,7 +123,7 @@ class Process(object):
assert self._popen is not None, 'can only join a started process'
res = self._popen.wait(timeout)
if res is not None:
- _current_process._children.discard(self)
+ _children.discard(self)
def is_alive(self):
'''
@@ -154,7 +151,7 @@ class Process(object):
'''
Return whether process is a daemon
'''
- return self._daemonic
+ return self._config.get('daemon', False)
@daemon.setter
def daemon(self, daemonic):
@@ -162,18 +159,18 @@ class Process(object):
Set whether process is a daemon
'''
assert self._popen is None, 'process has already started'
- self._daemonic = daemonic
+ self._config['daemon'] = daemonic
@property
def authkey(self):
- return self._authkey
+ return self._config['authkey']
@authkey.setter
def authkey(self, authkey):
'''
Set authorization key of process
'''
- self._authkey = AuthenticationString(authkey)
+ self._config['authkey'] = AuthenticationString(authkey)
@property
def exitcode(self):
@@ -227,17 +224,17 @@ class Process(object):
status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
- status, self._daemonic and ' daemon' or '')
+ status, self.daemon and ' daemon' or '')
##
def _bootstrap(self):
from . import util
- global _current_process
+ global _current_process, _process_counter, _children
try:
- self._children = set()
- self._counter = itertools.count(1)
+ _process_counter = itertools.count(1)
+ _children = set()
if sys.stdin is not None:
try:
sys.stdin.close()
@@ -285,8 +282,8 @@ class Process(object):
class AuthenticationString(bytes):
def __reduce__(self):
- from .forking import Popen
- if not Popen.thread_is_spawning():
+ from .popen import get_spawning_popen
+ if get_spawning_popen() is None:
raise TypeError(
'Pickling an AuthenticationString object is '
'disallowed for security reasons'
@@ -301,16 +298,19 @@ class _MainProcess(Process):
def __init__(self):
self._identity = ()
- self._daemonic = False
self._name = 'MainProcess'
self._parent_pid = None
self._popen = None
- self._counter = itertools.count(1)
- self._children = set()
- self._authkey = AuthenticationString(os.urandom(32))
- self._tempdir = None
+ self._config = {'authkey': AuthenticationString(os.urandom(32)),
+ 'semprefix': 'mp'}
+ # Note that some versions of FreeBSD only allow named
+ # semaphores to have names of up to 14 characters. Therfore
+ # we choose a short prefix.
+
_current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
del _MainProcess
#
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
index ec188ee4e14..10e40a582d0 100644
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@ import weakref
import errno
from queue import Empty, Full
+
import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler
#
# Queue type using a pipe, buffer and thread
@@ -34,14 +38,14 @@ class Queue(object):
if maxsize <= 0:
maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
self._maxsize = maxsize
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = synchronize.Lock()
self._opid = os.getpid()
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = Lock()
- self._sem = BoundedSemaphore(maxsize)
+ self._wlock = synchronize.Lock()
+ self._sem = synchronize.BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
@@ -51,7 +55,7 @@ class Queue(object):
register_after_fork(self, Queue._after_fork)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
@@ -208,8 +212,6 @@ class Queue(object):
@staticmethod
def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe')
- from .util import is_exiting
-
nacquire = notempty.acquire
nrelease = notempty.release
nwait = notempty.wait
@@ -279,8 +281,8 @@ class JoinableQueue(Queue):
def __init__(self, maxsize=0):
Queue.__init__(self, maxsize)
- self._unfinished_tasks = Semaphore(0)
- self._cond = Condition()
+ self._unfinished_tasks = synchronize.Semaphore(0)
+ self._cond = synchronize.Condition()
def __getstate__(self):
return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@ class JoinableQueue(Queue):
class SimpleQueue(object):
def __init__(self):
- self._reader, self._writer = Pipe(duplex=False)
- self._rlock = Lock()
+ self._reader, self._writer = connection.Pipe(duplex=False)
+ self._rlock = synchronize.Lock()
self._poll = self._reader.poll
if sys.platform == 'win32':
self._wlock = None
else:
- self._wlock = Lock()
+ self._wlock = synchronize.Lock()
def empty(self):
return not self._poll()
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._reader, self._writer, self._rlock, self._wlock)
def __setstate__(self, state):
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
index 656fa8ff6b0..5bbbcf409a6 100644
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
#
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
#
# multiprocessing/reduction.py
#
@@ -8,27 +7,57 @@
# Licensed to PSF under a Contributor Agreement.
#
-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
-
+import copyreg
+import functools
+import io
import os
-import sys
+import pickle
import socket
-import threading
-import struct
-import signal
+import sys
-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+from . import popen
+from . import util
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
+
+
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+ (hasattr(socket, 'CMSG_LEN') and
+ hasattr(socket, 'SCM_RIGHTS') and
+ hasattr(socket.socket, 'sendmsg')))
#
+# Pickler subclass
#
-#
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
- hasattr(socket, 'SCM_RIGHTS'))):
- raise ImportError('pickling of connections not supported')
+class ForkingPickler(pickle.Pickler):
+ '''Pickler subclass used by multiprocessing.'''
+ _extra_reducers = {}
+ _copyreg_dispatch_table = copyreg.dispatch_table
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.dispatch_table = self._copyreg_dispatch_table.copy()
+ self.dispatch_table.update(self._extra_reducers)
+
+ @classmethod
+ def register(cls, type, reduce):
+ '''Register a reduce function for a type.'''
+ cls._extra_reducers[type] = reduce
+
+ @classmethod
+ def dumps(cls, obj, protocol=None):
+ buf = io.BytesIO()
+ cls(buf, protocol).dump(obj)
+ return buf.getbuffer()
+
+ loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+ '''Replacement for pickle.dump() using ForkingPickler.'''
+ ForkingPickler(file, protocol).dump(obj)
#
# Platform specific definitions
@@ -36,20 +65,44 @@ if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
if sys.platform == 'win32':
# Windows
- __all__ += ['reduce_pipe_connection']
+ __all__ += ['DupHandle', 'duplicate', 'steal_handle']
import _winapi
+ def duplicate(handle, target_process=None, inheritable=False):
+ '''Duplicate a handle. (target_process is a handle not a pid!)'''
+ if target_process is None:
+ target_process = _winapi.GetCurrentProcess()
+ return _winapi.DuplicateHandle(
+ _winapi.GetCurrentProcess(), handle, target_process,
+ 0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+ def steal_handle(source_pid, handle):
+ '''Steal a handle from process identified by source_pid.'''
+ source_process_handle = _winapi.OpenProcess(
+ _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+ try:
+ return _winapi.DuplicateHandle(
+ source_process_handle, handle,
+ _winapi.GetCurrentProcess(), 0, False,
+ _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+ finally:
+ _winapi.CloseHandle(source_process_handle)
+
def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
conn.send(dh)
def recv_handle(conn):
+ '''Receive a handle over a local connection.'''
return conn.recv().detach()
class DupHandle(object):
+ '''Picklable wrapper for a handle.'''
def __init__(self, handle, access, pid=None):
- # duplicate handle for process with given pid
if pid is None:
+ # We just duplicate the handle in the current process and
+ # let the receiving process steal the handle.
pid = os.getpid()
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
try:
@@ -62,9 +115,12 @@ if sys.platform == 'win32':
self._pid = pid
def detach(self):
+ '''Get the handle. This should only be called once.'''
# retrieve handle from process which currently owns it
if self._pid == os.getpid():
+ # The handle has already been duplicated for this process.
return self._handle
+ # We must steal the handle from the process whose pid is self._pid.
proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
self._pid)
try:
@@ -74,207 +130,112 @@ if sys.platform == 'win32':
finally:
_winapi.CloseHandle(proc)
- class DupSocket(object):
- def __init__(self, sock):
- new_sock = sock.dup()
- def send(conn, pid):
- share = new_sock.share(pid)
- conn.send_bytes(share)
- self._id = resource_sharer.register(send, new_sock.close)
-
- def detach(self):
- conn = resource_sharer.get_connection(self._id)
- try:
- share = conn.recv_bytes()
- return socket.fromshare(share)
- finally:
- conn.close()
-
- def reduce_socket(s):
- return rebuild_socket, (DupSocket(s),)
-
- def rebuild_socket(ds):
- return ds.detach()
-
- def reduce_connection(conn):
- handle = conn.fileno()
- with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
- ds = DupSocket(s)
- return rebuild_connection, (ds, conn.readable, conn.writable)
-
- def rebuild_connection(ds, readable, writable):
- from .connection import Connection
- sock = ds.detach()
- return Connection(sock.detach(), readable, writable)
-
- def reduce_pipe_connection(conn):
- access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
- (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
- dh = DupHandle(conn.fileno(), access)
- return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-
- def rebuild_pipe_connection(dh, readable, writable):
- from .connection import PipeConnection
- handle = dh.detach()
- return PipeConnection(handle, readable, writable)
-
else:
# Unix
+ __all__ += ['DupFd', 'sendfds', 'recvfds']
+ import array
# On MacOSX we should acknowledge receipt of fds -- see Issue14669
ACKNOWLEDGE = sys.platform == 'darwin'
+ def sendfds(sock, fds):
+ '''Send an array of fds over an AF_UNIX socket.'''
+ fds = array.array('i', fds)
+ msg = bytes([len(fds) % 256])
+ sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+ if ACKNOWLEDGE and sock.recv(1) != b'A':
+ raise RuntimeError('did not receive acknowledgement of fd')
+
+ def recvfds(sock, size):
+ '''Receive an array of fds over an AF_UNIX socket.'''
+ a = array.array('i')
+ bytes_size = a.itemsize * size
+ msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+ if not msg and not ancdata:
+ raise EOFError
+ try:
+ if ACKNOWLEDGE:
+ sock.send(b'A')
+ if len(ancdata) != 1:
+ raise RuntimeError('received %d items of ancdata' %
+ len(ancdata))
+ cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+ if (cmsg_level == socket.SOL_SOCKET and
+ cmsg_type == socket.SCM_RIGHTS):
+ if len(cmsg_data) % a.itemsize != 0:
+ raise ValueError
+ a.frombytes(cmsg_data)
+ assert len(a) % 256 == msg[0]
+ return list(a)
+ except (ValueError, IndexError):
+ pass
+ raise RuntimeError('Invalid data received')
+
def send_handle(conn, handle, destination_pid):
+ '''Send a handle over a local connection.'''
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
- s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
- struct.pack("@i", handle))])
- if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
- raise RuntimeError('did not receive acknowledgement of fd')
+ sendfds(s, [handle])
def recv_handle(conn):
- size = struct.calcsize("@i")
+ '''Receive a handle over a local connection.'''
with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
- msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
- try:
- if ACKNOWLEDGE:
- conn.send_bytes(b'ACK')
- cmsg_level, cmsg_type, cmsg_data = ancdata[0]
- if (cmsg_level == socket.SOL_SOCKET and
- cmsg_type == socket.SCM_RIGHTS):
- return struct.unpack("@i", cmsg_data[:size])[0]
- except (ValueError, IndexError, struct.error):
- pass
- raise RuntimeError('Invalid data received')
-
- class DupFd(object):
- def __init__(self, fd):
- new_fd = os.dup(fd)
- def send(conn, pid):
- send_handle(conn, new_fd, pid)
- def close():
- os.close(new_fd)
- self._id = resource_sharer.register(send, close)
+ return recvfds(s, 1)[0]
+
+ def DupFd(fd):
+ '''Return a wrapper for an fd.'''
+ popen_obj = popen.get_spawning_popen()
+ if popen_obj is not None:
+ return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+ elif HAVE_SEND_HANDLE:
+ from . import resource_sharer
+ return resource_sharer.DupFd(fd)
+ else:
+ raise ValueError('SCM_RIGHTS appears not to be available')
- def detach(self):
- conn = resource_sharer.get_connection(self._id)
- try:
- return recv_handle(conn)
- finally:
- conn.close()
+#
+# Try making some callable types picklable
+#
- def reduce_socket(s):
- df = DupFd(s.fileno())
- return rebuild_socket, (df, s.family, s.type, s.proto)
+def _reduce_method(m):
+ if m.__self__ is None:
+ return getattr, (m.__class__, m.__func__.__name__)
+ else:
+ return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+ def f(self):
+ pass
+register(type(_C().f), _reduce_method)
- def rebuild_socket(df, family, type, proto):
- fd = df.detach()
- s = socket.fromfd(fd, family, type, proto)
- os.close(fd)
- return s
- def reduce_connection(conn):
- df = DupFd(conn.fileno())
- return rebuild_connection, (df, conn.readable, conn.writable)
+def _reduce_method_descriptor(m):
+ return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
- def rebuild_connection(df, readable, writable):
- from .connection import Connection
- fd = df.detach()
- return Connection(fd, readable, writable)
+
+def _reduce_partial(p):
+ return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+ return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
#
-# Server which shares registered resources with clients
+# Make sockets picklable
#
-class ResourceSharer(object):
- def __init__(self):
- self._key = 0
- self._cache = {}
- self._old_locks = []
- self._lock = threading.Lock()
- self._listener = None
- self._address = None
- self._thread = None
- register_after_fork(self, ResourceSharer._afterfork)
-
- def register(self, send, close):
- with self._lock:
- if self._address is None:
- self._start()
- self._key += 1
- self._cache[self._key] = (send, close)
- return (self._address, self._key)
-
- @staticmethod
- def get_connection(ident):
- from .connection import Client
- address, key = ident
- c = Client(address, authkey=current_process().authkey)
- c.send((key, os.getpid()))
- return c
-
- def stop(self, timeout=None):
- from .connection import Client
- with self._lock:
- if self._address is not None:
- c = Client(self._address, authkey=current_process().authkey)
- c.send(None)
- c.close()
- self._thread.join(timeout)
- if self._thread.is_alive():
- sub_warn('ResourceSharer thread did not stop when asked')
- self._listener.close()
- self._thread = None
- self._address = None
- self._listener = None
- for key, (send, close) in self._cache.items():
- close()
- self._cache.clear()
-
- def _afterfork(self):
- for key, (send, close) in self._cache.items():
- close()
- self._cache.clear()
- # If self._lock was locked at the time of the fork, it may be broken
- # -- see issue 6721. Replace it without letting it be gc'ed.
- self._old_locks.append(self._lock)
- self._lock = threading.Lock()
- if self._listener is not None:
- self._listener.close()
- self._listener = None
- self._address = None
- self._thread = None
-
- def _start(self):
- from .connection import Listener
- assert self._listener is None
- debug('starting listener and thread for sending handles')
- self._listener = Listener(authkey=current_process().authkey)
- self._address = self._listener.address
- t = threading.Thread(target=self._serve)
- t.daemon = True
- t.start()
- self._thread = t
-
- def _serve(self):
- if hasattr(signal, 'pthread_sigmask'):
- signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
- while 1:
- try:
- conn = self._listener.accept()
- msg = conn.recv()
- if msg is None:
- break
- key, destination_pid = msg
- send, close = self._cache.pop(key)
- send(conn, destination_pid)
- close()
- conn.close()
- except:
- if not is_exiting():
- import traceback
- sub_warning(
- 'thread for sharing handles raised exception :\n' +
- '-'*79 + '\n' + traceback.format_exc() + '-'*79
- )
-
-resource_sharer = ResourceSharer()
+if sys.platform == 'win32':
+ def _reduce_socket(s):
+ from .resource_sharer import DupSocket
+ return _rebuild_socket, (DupSocket(s),)
+ def _rebuild_socket(ds):
+ return ds.detach()
+ register(socket.socket, _reduce_socket)
+
+else:
+ def _reduce_socket(s):
+ df = DupFd(s.fileno())
+ return _rebuild_socket, (df, s.family, s.type, s.proto)
+ def _rebuild_socket(df, family, type, proto):
+ fd = df.detach()
+ return socket.socket(family, type, proto, fileno=fd)
+ register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
new file mode 100644
index 00000000000..5e46fc65b4e
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return. The unpickling process will connect
+# to the resource sharer, sends the identifier and its pid, and then receives
+# the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+ __all__ += ['DupSocket']
+
+ class DupSocket(object):
+ '''Picklable wrapper for a socket.'''
+ def __init__(self, sock):
+ new_sock = sock.dup()
+ def send(conn, pid):
+ share = new_sock.share(pid)
+ conn.send_bytes(share)
+ self._id = _resource_sharer.register(send, new_sock.close)
+
+ def detach(self):
+ '''Get the socket. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ share = conn.recv_bytes()
+ return socket.fromshare(share)
+
+else:
+ __all__ += ['DupFd']
+
+ class DupFd(object):
+ '''Wrapper for fd which can be used at any time.'''
+ def __init__(self, fd):
+ new_fd = os.dup(fd)
+ def send(conn, pid):
+ reduction.send_handle(conn, new_fd, pid)
+ def close():
+ os.close(new_fd)
+ self._id = _resource_sharer.register(send, close)
+
+ def detach(self):
+ '''Get the fd. This should only be called once.'''
+ with _resource_sharer.get_connection(self._id) as conn:
+ return reduction.recv_handle(conn)
+
+
+class _ResourceSharer(object):
+ '''Manager for resouces using background thread.'''
+ def __init__(self):
+ self._key = 0
+ self._cache = {}
+ self._old_locks = []
+ self._lock = threading.Lock()
+ self._listener = None
+ self._address = None
+ self._thread = None
+ util.register_after_fork(self, _ResourceSharer._afterfork)
+
+ def register(self, send, close):
+ '''Register resource, returning an identifier.'''
+ with self._lock:
+ if self._address is None:
+ self._start()
+ self._key += 1
+ self._cache[self._key] = (send, close)
+ return (self._address, self._key)
+
+ @staticmethod
+ def get_connection(ident):
+ '''Return connection from which to receive identified resource.'''
+ from .connection import Client
+ address, key = ident
+ c = Client(address, authkey=process.current_process().authkey)
+ c.send((key, os.getpid()))
+ return c
+
+ def stop(self, timeout=None):
+ '''Stop the background thread and clear registered resources.'''
+ from .connection import Client
+ with self._lock:
+ if self._address is not None:
+ c = Client(self._address,
+ authkey=process.current_process().authkey)
+ c.send(None)
+ c.close()
+ self._thread.join(timeout)
+ if self._thread.is_alive():
+ util.sub_warning('_ResourceSharer thread did '
+ 'not stop when asked')
+ self._listener.close()
+ self._thread = None
+ self._address = None
+ self._listener = None
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+
+ def _afterfork(self):
+ for key, (send, close) in self._cache.items():
+ close()
+ self._cache.clear()
+ # If self._lock was locked at the time of the fork, it may be broken
+ # -- see issue 6721. Replace it without letting it be gc'ed.
+ self._old_locks.append(self._lock)
+ self._lock = threading.Lock()
+ if self._listener is not None:
+ self._listener.close()
+ self._listener = None
+ self._address = None
+ self._thread = None
+
+ def _start(self):
+ from .connection import Listener
+ assert self._listener is None
+ util.debug('starting listener and thread for sending handles')
+ self._listener = Listener(authkey=process.current_process().authkey)
+ self._address = self._listener.address
+ t = threading.Thread(target=self._serve)
+ t.daemon = True
+ t.start()
+ self._thread = t
+
+ def _serve(self):
+ if hasattr(signal, 'pthread_sigmask'):
+ signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+ while 1:
+ try:
+ with self._listener.accept() as conn:
+ msg = conn.recv()
+ if msg is None:
+ break
+ key, destination_pid = msg
+ send, close = self._cache.pop(key)
+ try:
+ send(conn, destination_pid)
+ finally:
+ close()
+ except:
+ if not util.is_exiting():
+ sys.excepthook(*sys.exc_info())
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
index 00000000000..4a2d63658d3
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe. Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot. Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+ '''Make sure that semaphore tracker process is running.
+
+ This can be run from any process. Usually a child process will use
+ the semaphore created by its parent.'''
+ with _lock:
+ config = current_process()._config
+ if config.get('semaphore_tracker_fd') is not None:
+ return
+ fds_to_pass = []
+ try:
+ fds_to_pass.append(sys.stderr.fileno())
+ except Exception:
+ pass
+ cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
+ r, semaphore_tracker_fd = util.pipe()
+ try:
+ fds_to_pass.append(r)
+ # process will out live us, so no need to wait on pid
+ exe = spawn.get_executable()
+ args = [exe] + util._args_from_interpreter_flags()
+ args += ['-c', cmd % r]
+ util.spawnv_passfds(exe, args, fds_to_pass)
+ except:
+ os.close(semaphore_tracker_fd)
+ raise
+ else:
+ config['semaphore_tracker_fd'] = semaphore_tracker_fd
+ finally:
+ os.close(r)
+
+
+def register(name):
+ '''Register name of semaphore with semaphore tracker.'''
+ _send('REGISTER', name)
+
+
+def unregister(name):
+ '''Unregister name of semaphore with semaphore tracker.'''
+ _send('UNREGISTER', name)
+
+
+def _send(cmd, name):
+ msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+ if len(name) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('name too long')
+ fd = current_process()._config['semaphore_tracker_fd']
+ nbytes = os.write(fd, msg)
+ assert nbytes == len(msg)
+
+
+def main(fd):
+ '''Run semaphore tracker.'''
+ # protect the process from ^C and "killall python" etc
+ signal.signal(signal.SIGINT, signal.SIG_IGN)
+ signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+ for f in (sys.stdin, sys.stdout):
+ try:
+ f.close()
+ except Exception:
+ pass
+
+ cache = set()
+ try:
+ # keep track of registered/unregistered semaphores
+ with open(fd, 'rb') as f:
+ for line in f:
+ try:
+ cmd, name = line.strip().split(b':')
+ if cmd == b'REGISTER':
+ cache.add(name)
+ elif cmd == b'UNREGISTER':
+ cache.remove(name)
+ else:
+ raise RuntimeError('unrecognized command %r' % cmd)
+ except Exception:
+ try:
+ sys.excepthook(*sys.exc_info())
+ except:
+ pass
+ finally:
+ # all processes have terminated; cleanup any remaining semaphores
+ if cache:
+ try:
+ warnings.warn('semaphore_tracker: There appear to be %d '
+ 'leaked semaphores to clean up at shutdown' %
+ len(cache))
+ except Exception:
+ pass
+ for name in cache:
+ # For some reason the process which created and registered this
+ # semaphore has failed to unregister it. Presumably it has died.
+ # We therefore unlink it.
+ try:
+ name = name.decode('ascii')
+ try:
+ _multiprocessing.sem_unlink(name)
+ except Exception as e:
+ warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+ finally:
+ pass
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
index a358ed4f120..a97dadf53f0 100644
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
import ctypes
import weakref
-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning
__all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
new file mode 100644
index 00000000000..83561dbd3fa
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+ 'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+ WINEXE = False
+ WINSERVICE = False
+else:
+ WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+ WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+if WINSERVICE:
+ _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
+else:
+ _python_exe = sys.executable
+
+def set_executable(exe):
+ global _python_exe
+ _python_exe = exe
+
+def get_executable():
+ return _python_exe
+
+#
+#
+#
+
+def is_forking(argv):
+ '''
+ Return whether commandline indicates we are forking
+ '''
+ if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
+ return True
+ else:
+ return False
+
+
+def freeze_support():
+ '''
+ Run code for process object if this in not the main process
+ '''
+ if is_forking(sys.argv):
+ main()
+ sys.exit()
+
+
+def get_command_line():
+ '''
+ Returns prefix of command line used for spawning a child process
+ '''
+ if getattr(sys, 'frozen', False):
+ return [sys.executable, '--multiprocessing-fork']
+ else:
+ prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+ opts = util._args_from_interpreter_flags()
+ return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
+
+
+def spawn_main():
+ '''
+ Run code specifed by data received over pipe
+ '''
+ assert is_forking(sys.argv)
+ handle = int(sys.argv[-1])
+ if sys.platform == 'win32':
+ import msvcrt
+ from .reduction import steal_handle
+ pid = int(sys.argv[-2])
+ new_handle = steal_handle(pid, handle)
+ fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
+ else:
+ fd = handle
+ exitcode = _main(fd)
+ sys.exit(exitcode)
+
+
+def _main(fd):
+ with os.fdopen(fd, 'rb', closefd=True) as from_parent:
+ process.current_process()._inheriting = True
+ try:
+ preparation_data = pickle.load(from_parent)
+ prepare(preparation_data)
+ self = pickle.load(from_parent)
+ finally:
+ del process.current_process()._inheriting
+ return self._bootstrap()
+
+
+def _check_not_importing_main():
+ if getattr(process.current_process(), '_inheriting', False):
+ raise RuntimeError('''
+ An attempt has been made to start a new process before the
+ current process has finished its bootstrapping phase.
+
+ This probably means that you are not using fork to start your
+ child processes and you have forgotten to use the proper idiom
+ in the main module:
+
+ if __name__ == '__main__':
+ freeze_support()
+ ...
+
+ The "freeze_support()" line can be omitted if the program
+ is not going to be frozen to produce an executable.''')
+
+
+def get_preparation_data(name):
+ '''
+ Return info about parent needed by child to unpickle process object
+ '''
+ _check_not_importing_main()
+ d = dict(
+ log_to_stderr=util._log_to_stderr,
+ authkey=process.current_process().authkey,
+ )
+
+ if util._logger is not None:
+ d['log_level'] = util._logger.getEffectiveLevel()
+
+ sys_path=sys.path.copy()
+ try:
+ i = sys_path.index('')
+ except ValueError:
+ pass
+ else:
+ sys_path[i] = process.ORIGINAL_DIR
+
+ d.update(
+ name=name,
+ sys_path=sys_path,
+ sys_argv=sys.argv,
+ orig_dir=process.ORIGINAL_DIR,
+ dir=os.getcwd(),
+ start_method=popen.get_start_method(),
+ )
+
+ if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
+ main_path = getattr(sys.modules['__main__'], '__file__', None)
+ if not main_path and sys.argv[0] not in ('', '-c'):
+ main_path = sys.argv[0]
+ if main_path is not None:
+ if (not os.path.isabs(main_path) and
+ process.ORIGINAL_DIR is not None):
+ main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+ d['main_path'] = os.path.normpath(main_path)
+
+ return d
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+ '''
+ Try to get current process ready to unpickle process object
+ '''
+ if 'name' in data:
+ process.current_process().name = data['name']
+
+ if 'authkey' in data:
+ process.current_process().authkey = data['authkey']
+
+ if 'log_to_stderr' in data and data['log_to_stderr']:
+ util.log_to_stderr()
+
+ if 'log_level' in data:
+ util.get_logger().setLevel(data['log_level'])
+
+ if 'sys_path' in data:
+ sys.path = data['sys_path']
+
+ if 'sys_argv' in data:
+ sys.argv = data['sys_argv']
+
+ if 'dir' in data:
+ os.chdir(data['dir'])
+
+ if 'orig_dir' in data:
+ process.ORIGINAL_DIR = data['orig_dir']
+
+ if 'start_method' in data:
+ popen.set_start_method(data['start_method'], start_helpers=False)
+
+ if 'main_path' in data:
+ import_main_path(data['main_path'])
+
+
+def import_main_path(main_path):
+ '''
+ Set sys.modules['__main__'] to module at main_path
+ '''
+ # XXX (ncoghlan): The following code makes several bogus
+ # assumptions regarding the relationship between __file__
+ # and a module's real name. See PEP 302 and issue #10845
+ if getattr(sys.modules['__main__'], '__file__', None) == main_path:
+ return
+
+ main_name = os.path.splitext(os.path.basename(main_path))[0]
+ if main_name == '__init__':
+ main_name = os.path.basename(os.path.dirname(main_path))
+
+ if main_name == '__main__':
+ main_module = sys.modules['__main__']
+ main_module.__file__ = main_path
+ elif main_name != 'ipython':
+ # Main modules not actually called __main__.py may
+ # contain additional code that should still be executed
+ import importlib
+ import types
+
+ if main_path is None:
+ dirs = None
+ elif os.path.basename(main_path).startswith('__init__.py'):
+ dirs = [os.path.dirname(os.path.dirname(main_path))]
+ else:
+ dirs = [os.path.dirname(main_path)]
+
+ assert main_name not in sys.modules, main_name
+ sys.modules.pop('__mp_main__', None)
+ # We should not try to load __main__
+ # since that would execute 'if __name__ == "__main__"'
+ # clauses, potentially causing a psuedo fork bomb.
+ loader = importlib.find_loader(main_name, path=dirs)
+ main_module = types.ModuleType(main_name)
+ try:
+ loader.init_module_attrs(main_module)
+ except AttributeError: # init_module_attrs is optional
+ pass
+ main_module.__name__ = '__mp_main__'
+ code = loader.get_code(main_name)
+ exec(code, main_module.__dict__)
+
+ old_main_modules.append(sys.modules['__main__'])
+ sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
index 0faca78412c..09736efbb00 100644
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@ __all__ = [
'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
]
+import os
import threading
import sys
-
+import itertools
+import tempfile
import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
+
from time import time as _time
+from . import popen
+from . import process
+from . import util
+
# Try to import the mp.synchronize module cleanly, if it fails
# raise ImportError for platforms lacking a working sem_open implementation.
# See issue 3770
try:
- from _multiprocessing import SemLock
+ from _multiprocessing import SemLock, sem_unlink
except (ImportError):
raise ImportError("This platform lacks a functioning sem_open" +
" implementation, therefore, the required" +
@@ -44,15 +48,45 @@ SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
class SemLock(object):
+ _rand = tempfile._RandomNameSequence()
+
def __init__(self, kind, value, maxvalue):
- sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
- debug('created semlock with handle %s' % sl.handle)
+ unlink_immediately = (sys.platform == 'win32' or
+ popen.get_start_method() == 'fork')
+ for i in range(100):
+ try:
+ sl = self._semlock = _multiprocessing.SemLock(
+ kind, value, maxvalue, self._make_name(),
+ unlink_immediately)
+ except FileExistsError:
+ pass
+ else:
+ break
+ else:
+ raise FileExistsError('cannot find name for semaphore')
+
+ util.debug('created semlock with handle %s' % sl.handle)
self._make_methods()
if sys.platform != 'win32':
def _after_fork(obj):
obj._semlock._after_fork()
- register_after_fork(self, _after_fork)
+ util.register_after_fork(self, _after_fork)
+
+ if self._semlock.name is not None:
+ # We only get here if we are on Unix with forking
+ # disabled. When the object is garbage collected or the
+ # process shuts down we unlink the semaphore name
+ from .semaphore_tracker import register
+ register(self._semlock.name)
+ util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
+ exitpriority=0)
+
+ @staticmethod
+ def _cleanup(name):
+ from .semaphore_tracker import unregister
+ sem_unlink(name)
+ unregister(name)
def _make_methods(self):
self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@ class SemLock(object):
return self._semlock.__exit__(*args)
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
sl = self._semlock
- return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
+ if sys.platform == 'win32':
+ h = popen.get_spawning_popen().duplicate_for_child(sl.handle)
+ else:
+ h = sl.handle
+ return (h, sl.kind, sl.maxvalue, sl.name)
def __setstate__(self, state):
self._semlock = _multiprocessing.SemLock._rebuild(*state)
- debug('recreated blocker with handle %r' % state[0])
+ util.debug('recreated blocker with handle %r' % state[0])
self._make_methods()
+ @staticmethod
+ def _make_name():
+ return '/%s-%s' % (process.current_process()._config['semprefix'],
+ next(SemLock._rand))
+
#
# Semaphore
#
@@ -122,7 +165,7 @@ class Lock(SemLock):
def __repr__(self):
try:
if self._semlock._is_mine():
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
elif self._semlock._get_value() == 1:
@@ -147,7 +190,7 @@ class RLock(SemLock):
def __repr__(self):
try:
if self._semlock._is_mine():
- name = current_process().name
+ name = process.current_process().name
if threading.current_thread().name != 'MainThread':
name += '|' + threading.current_thread().name
count = self._semlock._count()
@@ -175,7 +218,7 @@ class Condition(object):
self._make_methods()
def __getstate__(self):
- assert_spawning(self)
+ popen.assert_spawning(self)
return (self._lock, self._sleeping_count,
self._woken_count, self._wait_semaphore)
@@ -342,7 +385,7 @@ class Barrier(threading.Barrier):
def __init__(self, parties, action=None, timeout=None):
import struct
- from multiprocessing.heap import BufferWrapper
+ from .heap import BufferWrapper
wrapper = BufferWrapper(struct.calcsize('i') * 2)
cond = Condition()
self.__setstate__((parties, action, timeout, cond, wrapper))
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
index f5862b49a40..d9e4799d0a6 100644
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@ import threading # we want threading to install it's
# cleanup function before multiprocessing does
from subprocess import _args_from_interpreter_flags
-from multiprocessing.process import current_process, active_children
+from . import process
__all__ = [
'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
'log_to_stderr', 'get_temp_dir', 'register_after_fork',
'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
- 'SUBDEBUG', 'SUBWARNING',
+ 'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
]
#
@@ -71,8 +71,6 @@ def get_logger():
_logger = logging.getLogger(LOGGER_NAME)
_logger.propagate = 0
- logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
- logging.addLevelName(SUBWARNING, 'SUBWARNING')
# XXX multiprocessing should cleanup before logging
if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@ def log_to_stderr(level=None):
def get_temp_dir():
# get name of a temp directory which will be automatically cleaned up
- if current_process()._tempdir is None:
+ tempdir = process.current_process()._config.get('tempdir')
+ if tempdir is None:
import shutil, tempfile
tempdir = tempfile.mkdtemp(prefix='pymp-')
info('created temp directory %s', tempdir)
Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
- current_process()._tempdir = tempdir
- return current_process()._tempdir
+ process.current_process()._config['tempdir'] = tempdir
+ return tempdir
#
# Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@ def is_exiting():
_exiting = False
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
- active_children=active_children,
- current_process=current_process):
+ active_children=process.active_children,
+ current_process=process.current_process):
# We hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module's globals are destroyed.
@@ -303,7 +302,7 @@ def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
# #9207.
for p in active_children():
- if p._daemonic:
+ if p.daemon:
info('calling terminate() for daemon %s', p.name)
p._popen.terminate()
@@ -335,3 +334,54 @@ class ForkAwareLocal(threading.local):
register_after_fork(self, lambda obj : obj.__dict__.clear())
def __reduce__(self):
return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+ MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+ MAXFD = 256
+
+def close_all_fds_except(fds):
+ fds = list(fds) + [-1, MAXFD]
+ fds.sort()
+ assert fds[-1] == MAXFD, 'fd too large'
+ for i in range(len(fds) - 1):
+ os.closerange(fds[i]+1, fds[i+1])
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+ import _posixsubprocess, fcntl
+ passfds = sorted(passfds)
+ tmp = []
+ # temporarily unset CLOEXEC on passed fds
+ for fd in passfds:
+ flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+ if flag & fcntl.FD_CLOEXEC:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)
+ tmp.append((fd, flag))
+ errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+ try:
+ return _posixsubprocess.fork_exec(
+ args, [os.fsencode(path)], True, passfds, None, None,
+ -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+ False, False, None)
+ finally:
+ os.close(errpipe_read)
+ os.close(errpipe_write)
+ # reset CLOEXEC where necessary
+ for fd, flag in tmp:
+ fcntl.fcntl(fd, fcntl.F_SETFD, flag)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+ import _posixsubprocess
+ return _posixsubprocess.cloexec_pipe()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 2a6381dac37..f777edc6bea 100644
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@ from multiprocessing import util
try:
from multiprocessing import reduction
- HAS_REDUCTION = True
+ HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
except ImportError:
HAS_REDUCTION = False
@@ -99,6 +99,9 @@ try:
except:
MAXFD = 256
+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
#
# Some tests require ctypes
#
@@ -330,7 +333,6 @@ class _TestProcess(BaseTestCase):
@classmethod
def _test_recursion(cls, wconn, id):
- from multiprocessing import forking
wconn.send(id)
if len(id) < 2:
for i in range(2):
@@ -378,7 +380,7 @@ class _TestProcess(BaseTestCase):
self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
event.set()
p.join()
- self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+ self.assertTrue(wait_for_handle(sentinel, timeout=1))
#
#
@@ -2493,7 +2495,7 @@ class _TestPicklingConnections(BaseTestCase):
@classmethod
def tearDownClass(cls):
- from multiprocessing.reduction import resource_sharer
+ from multiprocessing import resource_sharer
resource_sharer.stop(timeout=5)
@classmethod
@@ -2807,30 +2809,40 @@ class _TestFinalize(BaseTestCase):
# Test that from ... import * works for each module
#
-class _TestImportStar(BaseTestCase):
+class _TestImportStar(unittest.TestCase):
- ALLOWED_TYPES = ('processes',)
+ def get_module_names(self):
+ import glob
+ folder = os.path.dirname(multiprocessing.__file__)
+ pattern = os.path.join(folder, '*.py')
+ files = glob.glob(pattern)
+ modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
+ modules = ['multiprocessing.' + m for m in modules]
+ modules.remove('multiprocessing.__init__')
+ modules.append('multiprocessing')
+ return modules
def test_import(self):
- modules = [
- 'multiprocessing', 'multiprocessing.connection',
- 'multiprocessing.heap', 'multiprocessing.managers',
- 'multiprocessing.pool', 'multiprocessing.process',
- 'multiprocessing.synchronize', 'multiprocessing.util'
- ]
-
- if HAS_REDUCTION:
- modules.append('multiprocessing.reduction')
+ modules = self.get_module_names()
+ if sys.platform == 'win32':
+ modules.remove('multiprocessing.popen_fork')
+ modules.remove('multiprocessing.popen_forkserver')
+ modules.remove('multiprocessing.popen_spawn_posix')
+ else:
+ modules.remove('multiprocessing.popen_spawn_win32')
+ if not HAS_REDUCTION:
+ modules.remove('multiprocessing.popen_forkserver')
- if c_int is not None:
+ if c_int is None:
# This module requires _ctypes
- modules.append('multiprocessing.sharedctypes')
+ modules.remove('multiprocessing.sharedctypes')
for name in modules:
__import__(name)
mod = sys.modules[name]
+ self.assertTrue(hasattr(mod, '__all__'), name)
- for attr in getattr(mod, '__all__', ()):
+ for attr in mod.__all__:
self.assertTrue(
hasattr(mod, attr),
'%r does not have attribute %r' % (mod, attr)
@@ -2953,131 +2965,6 @@ class TestInvalidHandle(unittest.TestCase):
self.assertRaises((ValueError, OSError),
multiprocessing.connection.Connection, -1)
-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
- result = {}
- glob = globals()
- Type = type.capitalize()
- ALL_TYPES = {'processes', 'threads', 'manager'}
-
- for name in list(glob.keys()):
- if name.startswith('_Test'):
- base = glob[name]
- assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
- if type in base.ALLOWED_TYPES:
- newname = 'With' + Type + name[1:]
- class Temp(base, Mixin, unittest.TestCase):
- pass
- result[newname] = Temp
- Temp.__name__ = Temp.__qualname__ = newname
- Temp.__module__ = Mixin.__module__
- return result
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
- TYPE = 'processes'
- Process = multiprocessing.Process
- connection = multiprocessing.connection
- current_process = staticmethod(multiprocessing.current_process)
- active_children = staticmethod(multiprocessing.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.Pipe)
- Queue = staticmethod(multiprocessing.Queue)
- JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
- Lock = staticmethod(multiprocessing.Lock)
- RLock = staticmethod(multiprocessing.RLock)
- Semaphore = staticmethod(multiprocessing.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.Condition)
- Event = staticmethod(multiprocessing.Event)
- Barrier = staticmethod(multiprocessing.Barrier)
- Value = staticmethod(multiprocessing.Value)
- Array = staticmethod(multiprocessing.Array)
- RawValue = staticmethod(multiprocessing.RawValue)
- RawArray = staticmethod(multiprocessing.RawArray)
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
- TYPE = 'manager'
- Process = multiprocessing.Process
- Queue = property(operator.attrgetter('manager.Queue'))
- JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
- Lock = property(operator.attrgetter('manager.Lock'))
- RLock = property(operator.attrgetter('manager.RLock'))
- Semaphore = property(operator.attrgetter('manager.Semaphore'))
- BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
- Condition = property(operator.attrgetter('manager.Condition'))
- Event = property(operator.attrgetter('manager.Event'))
- Barrier = property(operator.attrgetter('manager.Barrier'))
- Value = property(operator.attrgetter('manager.Value'))
- Array = property(operator.attrgetter('manager.Array'))
- list = property(operator.attrgetter('manager.list'))
- dict = property(operator.attrgetter('manager.dict'))
- Namespace = property(operator.attrgetter('manager.Namespace'))
-
- @classmethod
- def Pool(cls, *args, **kwds):
- return cls.manager.Pool(*args, **kwds)
-
- @classmethod
- def setUpClass(cls):
- cls.manager = multiprocessing.Manager()
-
- @classmethod
- def tearDownClass(cls):
- # only the manager process should be returned by active_children()
- # but this can take a bit on slow machines, so wait a few seconds
- # if there are other children too (see #17395)
- t = 0.01
- while len(multiprocessing.active_children()) > 1 and t < 5:
- time.sleep(t)
- t *= 2
- gc.collect() # do garbage collection
- if cls.manager._number_of_objects() != 0:
- # This is not really an error since some tests do not
- # ensure that all processes which hold a reference to a
- # managed object have been joined.
- print('Shared objects which still exist at manager shutdown:')
- print(cls.manager._debug_info())
- cls.manager.shutdown()
- cls.manager.join()
- cls.manager = None
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
- TYPE = 'threads'
- Process = multiprocessing.dummy.Process
- connection = multiprocessing.dummy.connection
- current_process = staticmethod(multiprocessing.dummy.current_process)
- active_children = staticmethod(multiprocessing.dummy.active_children)
- Pool = staticmethod(multiprocessing.Pool)
- Pipe = staticmethod(multiprocessing.dummy.Pipe)
- Queue = staticmethod(multiprocessing.dummy.Queue)
- JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
- Lock = staticmethod(multiprocessing.dummy.Lock)
- RLock = staticmethod(multiprocessing.dummy.RLock)
- Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
- BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
- Condition = staticmethod(multiprocessing.dummy.Condition)
- Event = staticmethod(multiprocessing.dummy.Event)
- Barrier = staticmethod(multiprocessing.dummy.Barrier)
- Value = staticmethod(multiprocessing.dummy.Value)
- Array = staticmethod(multiprocessing.dummy.Array)
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)
class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@ class TestFlags(unittest.TestCase):
def test_flags(self):
import json, subprocess
# start child process using unusual flags
- prog = ('from test.test_multiprocessing import TestFlags; ' +
+ prog = ('from test._test_multiprocessing import TestFlags; ' +
'TestFlags.run_in_child()')
data = subprocess.check_output(
[sys.executable, '-E', '-S', '-O', '-c', prog])
@@ -3474,13 +3361,14 @@ class TestTimeouts(unittest.TestCase):
class TestNoForkBomb(unittest.TestCase):
def test_noforkbomb(self):
+ sm = multiprocessing.get_start_method()
name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
- if WIN32:
- rc, out, err = test.script_helper.assert_python_failure(name)
+ if sm != 'fork':
+ rc, out, err = test.script_helper.assert_python_failure(name, sm)
self.assertEqual('', out.decode('ascii'))
self.assertIn('RuntimeError', err.decode('ascii'))
else:
- rc, out, err = test.script_helper.assert_python_ok(name)
+ rc, out, err = test.script_helper.assert_python_ok(name, sm)
self.assertEqual('123', out.decode('ascii').rstrip())
self.assertEqual('', err.decode('ascii'))
@@ -3514,6 +3402,72 @@ class TestForkAwareThreadLock(unittest.TestCase):
self.assertLessEqual(new_size, old_size)
#
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+
+ def get_high_socket_fd(self):
+ if WIN32:
+ # The child process will not have any socket handles, so
+ # calling socket.fromfd() should produce WSAENOTSOCK even
+ # if there is a handle of the same number.
+ return socket.socket().detach()
+ else:
+ # We want to produce a socket with an fd high enough that a
+ # freshly created child process will not have any fds as high.
+ fd = socket.socket().detach()
+ to_close = []
+ while fd < 50:
+ to_close.append(fd)
+ fd = os.dup(fd)
+ for x in to_close:
+ os.close(x)
+ return fd
+
+ def close(self, fd):
+ if WIN32:
+ socket.socket(fileno=fd).close()
+ else:
+ os.close(fd)
+
+ @classmethod
+ def _test_closefds(cls, conn, fd):
+ try:
+ s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+ except Exception as e:
+ conn.send(e)
+ else:
+ s.close()
+ conn.send(None)
+
+ def test_closefd(self):
+ if not HAS_REDUCTION:
+ raise unittest.SkipTest('requires fd pickling')
+
+ reader, writer = multiprocessing.Pipe()
+ fd = self.get_high_socket_fd()
+ try:
+ p = multiprocessing.Process(target=self._test_closefds,
+ args=(writer, fd))
+ p.start()
+ writer.close()
+ e = reader.recv()
+ p.join(timeout=5)
+ finally:
+ self.close(fd)
+ writer.close()
+ reader.close()
+
+ if multiprocessing.get_start_method() == 'fork':
+ self.assertIs(e, None)
+ else:
+ WSAENOTSOCK = 10038
+ self.assertIsInstance(e, OSError)
+ self.assertTrue(e.errno == errno.EBADF or
+ e.winerror == WSAENOTSOCK, e)
+
+#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
@@ -3557,10 +3511,10 @@ class TestIgnoreEINTR(unittest.TestCase):
def handler(signum, frame):
pass
signal.signal(signal.SIGUSR1, handler)
- l = multiprocessing.connection.Listener()
- conn.send(l.address)
- a = l.accept()
- a.send('welcome')
+ with multiprocessing.connection.Listener() as l:
+ conn.send(l.address)
+ a = l.accept()
+ a.send('welcome')
@unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@ class TestIgnoreEINTR(unittest.TestCase):
finally:
conn.close()
+class TestStartMethod(unittest.TestCase):
+ def test_set_get(self):
+ multiprocessing.set_forkserver_preload(PRELOAD)
+ count = 0
+ old_method = multiprocessing.get_start_method()
+ try:
+ for method in ('fork', 'spawn', 'forkserver'):
+ try:
+ multiprocessing.set_start_method(method)
+ except ValueError:
+ continue
+ self.assertEqual(multiprocessing.get_start_method(), method)
+ count += 1
+ finally:
+ multiprocessing.set_start_method(old_method)
+ self.assertGreaterEqual(count, 1)
+
+ def test_get_all(self):
+ methods = multiprocessing.get_all_start_methods()
+ if sys.platform == 'win32':
+ self.assertEqual(methods, ['spawn'])
+ else:
+ self.assertTrue(methods == ['fork', 'spawn'] or
+ methods == ['fork', 'spawn', 'forkserver'])
+
+#
+# Check that killing process does not leak named semaphores
+#
+
+@unittest.skipIf(sys.platform == "win32",
+ "test semantics don't make sense on Windows")
+class TestSemaphoreTracker(unittest.TestCase):
+ def test_semaphore_tracker(self):
+ import subprocess
+ cmd = '''if 1:
+ import multiprocessing as mp, time, os
+ mp.set_start_method("spawn")
+ lock1 = mp.Lock()
+ lock2 = mp.Lock()
+ os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
+ os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+ time.sleep(10)
+ '''
+ print("\nTestSemaphoreTracker will output warnings a bit like:\n"
+ " ... There appear to be 2 leaked semaphores"
+ " to clean up at shutdown\n"
+ " ... '/mp-03jgqz': [Errno 2] No such file or directory",
+ file=sys.stderr)
+ r, w = os.pipe()
+ p = subprocess.Popen([sys.executable,
+ #'-W', 'ignore:semaphore_tracker',
+ '-c', cmd % (w, w)],
+ pass_fds=[w])
+ os.close(w)
+ with open(r, 'rb', closefd=True) as f:
+ name1 = f.readline().rstrip().decode('ascii')
+ name2 = f.readline().rstrip().decode('ascii')
+ _multiprocessing.sem_unlink(name1)
+ p.terminate()
+ p.wait()
+ time.sleep(1.0)
+ with self.assertRaises(OSError) as ctx:
+ _multiprocessing.sem_unlink(name2)
+ # docs say it should be ENOENT, but OSX seems to give EINVAL
+ self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+
#
+# Mixins
#
+
+class ProcessesMixin(object):
+ TYPE = 'processes'
+ Process = multiprocessing.Process
+ connection = multiprocessing.connection
+ current_process = staticmethod(multiprocessing.current_process)
+ active_children = staticmethod(multiprocessing.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.Pipe)
+ Queue = staticmethod(multiprocessing.Queue)
+ JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+ Lock = staticmethod(multiprocessing.Lock)
+ RLock = staticmethod(multiprocessing.RLock)
+ Semaphore = staticmethod(multiprocessing.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.Condition)
+ Event = staticmethod(multiprocessing.Event)
+ Barrier = staticmethod(multiprocessing.Barrier)
+ Value = staticmethod(multiprocessing.Value)
+ Array = staticmethod(multiprocessing.Array)
+ RawValue = staticmethod(multiprocessing.RawValue)
+ RawArray = staticmethod(multiprocessing.RawArray)
+
+
+class ManagerMixin(object):
+ TYPE = 'manager'
+ Process = multiprocessing.Process
+ Queue = property(operator.attrgetter('manager.Queue'))
+ JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+ Lock = property(operator.attrgetter('manager.Lock'))
+ RLock = property(operator.attrgetter('manager.RLock'))
+ Semaphore = property(operator.attrgetter('manager.Semaphore'))
+ BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+ Condition = property(operator.attrgetter('manager.Condition'))
+ Event = property(operator.attrgetter('manager.Event'))
+ Barrier = property(operator.attrgetter('manager.Barrier'))
+ Value = property(operator.attrgetter('manager.Value'))
+ Array = property(operator.attrgetter('manager.Array'))
+ list = property(operator.attrgetter('manager.list'))
+ dict = property(operator.attrgetter('manager.dict'))
+ Namespace = property(operator.attrgetter('manager.Namespace'))
+
+ @classmethod
+ def Pool(cls, *args, **kwds):
+ return cls.manager.Pool(*args, **kwds)
+
+ @classmethod
+ def setUpClass(cls):
+ cls.manager = multiprocessing.Manager()
+
+ @classmethod
+ def tearDownClass(cls):
+ # only the manager process should be returned by active_children()
+ # but this can take a bit on slow machines, so wait a few seconds
+ # if there are other children too (see #17395)
+ t = 0.01
+ while len(multiprocessing.active_children()) > 1 and t < 5:
+ time.sleep(t)
+ t *= 2
+ gc.collect() # do garbage collection
+ if cls.manager._number_of_objects() != 0:
+ # This is not really an error since some tests do not
+ # ensure that all processes which hold a reference to a
+ # managed object have been joined.
+ print('Shared objects which still exist at manager shutdown:')
+ print(cls.manager._debug_info())
+ cls.manager.shutdown()
+ cls.manager.join()
+ cls.manager = None
+
+
+class ThreadsMixin(object):
+ TYPE = 'threads'
+ Process = multiprocessing.dummy.Process
+ connection = multiprocessing.dummy.connection
+ current_process = staticmethod(multiprocessing.dummy.current_process)
+ active_children = staticmethod(multiprocessing.dummy.active_children)
+ Pool = staticmethod(multiprocessing.Pool)
+ Pipe = staticmethod(multiprocessing.dummy.Pipe)
+ Queue = staticmethod(multiprocessing.dummy.Queue)
+ JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+ Lock = staticmethod(multiprocessing.dummy.Lock)
+ RLock = staticmethod(multiprocessing.dummy.RLock)
+ Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+ BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+ Condition = staticmethod(multiprocessing.dummy.Condition)
+ Event = staticmethod(multiprocessing.dummy.Event)
+ Barrier = staticmethod(multiprocessing.dummy.Barrier)
+ Value = staticmethod(multiprocessing.dummy.Value)
+ Array = staticmethod(multiprocessing.dummy.Array)
+
+#
+# Functions used to create test cases from the base ones in this module
#
-def setUpModule():
- if sys.platform.startswith("linux"):
- try:
- lock = multiprocessing.RLock()
- except OSError:
- raise unittest.SkipTest("OSError raises on RLock creation, "
- "see issue 3111!")
- check_enough_semaphores()
- util.get_temp_dir() # creates temp directory for use by all processes
- multiprocessing.get_logger().setLevel(LOG_LEVEL)
+def install_tests_in_module_dict(remote_globs, start_method):
+ __module__ = remote_globs['__name__']
+ local_globs = globals()
+ ALL_TYPES = {'processes', 'threads', 'manager'}
+ for name, base in local_globs.items():
+ if not isinstance(base, type):
+ continue
+ if issubclass(base, BaseTestCase):
+ if base is BaseTestCase:
+ continue
+ assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
+ for type_ in base.ALLOWED_TYPES:
+ newname = 'With' + type_.capitalize() + name[1:]
+ Mixin = local_globs[type_.capitalize() + 'Mixin']
+ class Temp(base, Mixin, unittest.TestCase):
+ pass
+ Temp.__name__ = Temp.__qualname__ = newname
+ Temp.__module__ = __module__
+ remote_globs[newname] = Temp
+ elif issubclass(base, unittest.TestCase):
+ class Temp(base, object):
+ pass
+ Temp.__name__ = Temp.__qualname__ = name
+ Temp.__module__ = __module__
+ remote_globs[name] = Temp
-def tearDownModule():
- # pause a bit so we don't get warning about dangling threads/processes
- time.sleep(0.5)
+ def setUpModule():
+ multiprocessing.set_forkserver_preload(PRELOAD)
+ remote_globs['old_start_method'] = multiprocessing.get_start_method()
+ try:
+ multiprocessing.set_start_method(start_method)
+ except ValueError:
+ raise unittest.SkipTest(start_method +
+ ' start method not supported')
+ print('Using start method %r' % multiprocessing.get_start_method())
+ if sys.platform.startswith("linux"):
+ try:
+ lock = multiprocessing.RLock()
+ except OSError:
+ raise unittest.SkipTest("OSError raises on RLock creation, "
+ "see issue 3111!")
+ check_enough_semaphores()
+ util.get_temp_dir() # creates temp directory
+ multiprocessing.get_logger().setLevel(LOG_LEVEL)
+
+ def tearDownModule():
+ multiprocessing.set_start_method(remote_globs['old_start_method'])
+ # pause a bit so we don't get warning about dangling threads/processes
+ time.sleep(0.5)
-if __name__ == '__main__':
- unittest.main()
+ remote_globs['setUpModule'] = setUpModule
+ remote_globs['tearDownModule'] = tearDownModule
diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py
index 908afe3045f..017e010ba0e 100644
--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@ def foo():
# correctly on Windows. However, we should get a RuntimeError rather
# than the Windows equivalent of a fork bomb.
+if len(sys.argv) > 1:
+ multiprocessing.set_start_method(sys.argv[1])
+else:
+ multiprocessing.set_start_method('spawn')
+
p = multiprocessing.Process(target=foo)
p.start()
p.join()
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
index c8bbcb27bcb..b9945d7bc2b 100755
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@ try:
except ImportError:
threading = None
try:
- import multiprocessing.process
+ import _multiprocessing, multiprocessing.process
except ImportError:
multiprocessing = None
diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py
new file mode 100644
index 00000000000..2bf4e756449
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_multiprocessing_forkserver.py b/Lib/test/test_multiprocessing_forkserver.py
new file mode 100644
index 00000000000..193a04a5fcb
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Lib/test/test_multiprocessing_spawn.py b/Lib/test/test_multiprocessing_spawn.py
new file mode 100644
index 00000000000..334ae9e8f7a
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+ unittest.main()
diff --git a/Makefile.pre.in b/Makefile.pre.in
index 881d945427a..ca404cd7630 100644
--- a/Makefile.pre.in
+++ b/Makefile.pre.in
@@ -938,7 +938,9 @@ buildbottest: all platform
QUICKTESTOPTS= $(TESTOPTS) -x test_subprocess test_io test_lib2to3 \
test_multibytecodec test_urllib2_localnet test_itertools \
- test_multiprocessing test_mailbox test_socket test_poll \
+ test_multiprocessing_fork test_multiprocessing_spawn \
+ test_multiprocessing_forkserver \
+ test_mailbox test_socket test_poll \
test_select test_zipfile test_concurrent_futures
quicktest: all platform
$(TESTRUNNER) $(QUICKTESTOPTS)
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
index a4c4b2d38c0..30cb5eb4511 100644
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -126,6 +126,7 @@ static PyMethodDef module_methods[] = {
{"recv", multiprocessing_recv, METH_VARARGS, ""},
{"send", multiprocessing_send, METH_VARARGS, ""},
#endif
+ {"sem_unlink", _PyMp_sem_unlink, METH_VARARGS, ""},
{NULL}
};
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
index 68a59844b9a..9aeea8d618f 100644
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -98,5 +98,6 @@ PyObject *_PyMp_SetError(PyObject *Type, int num);
*/
extern PyTypeObject _PyMp_SemLockType;
+extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args);
#endif /* MULTIPROCESSING_H */
diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c
index dcf3b42bc10..de85a90d472 100644
--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -18,6 +18,7 @@ typedef struct {
int count;
int maxvalue;
int kind;
+ char *name;
} SemLockObject;
#define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
@@ -397,7 +398,8 @@ semlock_release(SemLockObject *self, PyObject *args)
*/
static PyObject *
-newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
+newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
+ char *name)
{
SemLockObject *self;
@@ -409,21 +411,22 @@ newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
self->count = 0;
self->last_tid = 0;
self->maxvalue = maxvalue;
+ self->name = name;
return (PyObject*)self;
}
static PyObject *
semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
- char buffer[256];
SEM_HANDLE handle = SEM_FAILED;
- int kind, maxvalue, value;
+ int kind, maxvalue, value, unlink;
PyObject *result;
- static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
- static int counter = 0;
+ char *name, *name_copy = NULL;
+ static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink",
+ NULL};
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
- &kind, &value, &maxvalue))
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist,
+ &kind, &value, &maxvalue, &name, &unlink))
return NULL;
if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
@@ -431,18 +434,23 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
return NULL;
}
- PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
+ if (!unlink) {
+ name_copy = PyMem_Malloc(strlen(name) + 1);
+ if (name_copy == NULL)
+ goto failure;
+ strcpy(name_copy, name);
+ }
SEM_CLEAR_ERROR();
- handle = SEM_CREATE(buffer, value, maxvalue);
+ handle = SEM_CREATE(name, value, maxvalue);
/* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
goto failure;
- if (SEM_UNLINK(buffer) < 0)
+ if (unlink && SEM_UNLINK(name) < 0)
goto failure;
- result = newsemlockobject(type, handle, kind, maxvalue);
+ result = newsemlockobject(type, handle, kind, maxvalue, name_copy);
if (!result)
goto failure;
@@ -451,6 +459,7 @@ semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
failure:
if (handle != SEM_FAILED)
SEM_CLOSE(handle);
+ PyMem_Free(name_copy);
_PyMp_SetError(NULL, MP_STANDARD_ERROR);
return NULL;
}
@@ -460,12 +469,30 @@ semlock_rebuild(PyTypeObject *type, PyObject *args)
{
SEM_HANDLE handle;
int kind, maxvalue;
+ char *name, *name_copy = NULL;
- if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
- &handle, &kind, &maxvalue))
+ if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz",
+ &handle, &kind, &maxvalue, &name))
return NULL;
- return newsemlockobject(type, handle, kind, maxvalue);
+ if (name != NULL) {
+ name_copy = PyMem_Malloc(strlen(name) + 1);
+ if (name_copy == NULL)
+ return PyErr_NoMemory();
+ strcpy(name_copy, name);
+ }
+
+#ifndef MS_WINDOWS
+ if (name != NULL) {
+ handle = sem_open(name, 0);
+ if (handle == SEM_FAILED) {
+ PyMem_Free(name_copy);
+ return PyErr_SetFromErrno(PyExc_OSError);
+ }
+ }
+#endif
+
+ return newsemlockobject(type, handle, kind, maxvalue, name_copy);
}
static void
@@ -473,6 +500,7 @@ semlock_dealloc(SemLockObject* self)
{
if (self->handle != SEM_FAILED)
SEM_CLOSE(self->handle);
+ PyMem_Free(self->name);
PyObject_Del(self);
}
@@ -574,6 +602,8 @@ static PyMemberDef semlock_members[] = {
""},
{"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
""},
+ {"name", T_STRING, offsetof(SemLockObject, name), READONLY,
+ ""},
{NULL}
};
@@ -621,3 +651,23 @@ PyTypeObject _PyMp_SemLockType = {
/* tp_alloc */ 0,
/* tp_new */ semlock_new,
};
+
+/*
+ * Function to unlink semaphore names
+ */
+
+PyObject *
+_PyMp_sem_unlink(PyObject *ignore, PyObject *args)
+{
+ char *name;
+
+ if (!PyArg_ParseTuple(args, "s", &name))
+ return NULL;
+
+ if (SEM_UNLINK(name) < 0) {
+ _PyMp_SetError(NULL, MP_STANDARD_ERROR);
+ return NULL;
+ }
+
+ Py_RETURN_NONE;
+}