
Python. Multiprocessing — Process-based “threading” interface

created Jul 2nd 2016, 08:39 by



multiprocessing is a package that supports spawning processes using an API similar to the threading module. The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows.
The multiprocessing module also introduces APIs which do not have analogs in the threading module. A prime example of this is the Pool object which offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following example demonstrates the common practice of defining such functions in a module so that child processes can successfully import that module. This basic example of data parallelism using Pool,
from multiprocessing import Pool
def f(x):
    return x*x
if __name__ == '__main__':
    p = Pool(5)
    print(p.map(f, [1, 2, 3]))
will print to standard output
[1, 4, 9]
The Process class
In multiprocessing, processes are spawned by creating a Process object and then calling its start() method. Process follows the API of threading.Thread. A trivial example of a multiprocess program is
from multiprocessing import Process
def f(name):
    print('hello %s' % name)
if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()   # run f('bob') in a child process
    p.join()    # wait for the child to finish
To show the individual process IDs involved, here is an expanded example:
from multiprocessing import Process
import os
def info(title):
    print(title)
    print('module name: %s' % __name__)
    if hasattr(os, 'getppid'):  # not available on Windows before Python 3.2
        print('parent process: %s' % os.getppid())
    print('process id: %s' % os.getpid())
def f(name):
    info('function f')
    print('hello %s' % name)
if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()   # run f('bob') in a child process
    p.join()    # wait for the child to finish
For an explanation of why (on Windows) the if __name__ == '__main__' part is necessary, see Programming guidelines.
Exchanging objects between processes
multiprocessing supports two types of communication channel between processes: queues and pipes.
The Queue class is a near clone of Queue.Queue. For example:
from multiprocessing import Process, Queue
def f(q):
    q.put([42, None, 'hello'])
if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()
Queues are thread and process safe.
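Because a queue is process safe, several child processes can put results on the same queue without any extra locking. The following is a minimal sketch of that pattern; the names worker and collect_squares are illustrative and are not part of the multiprocessing API.

```python
from multiprocessing import Process, Queue


def worker(q, n):
    # Each child puts one (input, result) pair on the shared queue.
    q.put((n, n * n))


def collect_squares(numbers):
    q = Queue()
    procs = [Process(target=worker, args=(q, n)) for n in numbers]
    for p in procs:
        p.start()
    # Read one item per child before joining, so no child is left
    # waiting to flush its data to the queue.
    results = [q.get() for _ in procs]
    for p in procs:
        p.join()
    # Arrival order is not deterministic, so sort before returning.
    return sorted(results)


if __name__ == '__main__':
    print(collect_squares([1, 2, 3]))
```

The results are sorted because the order in which the children reach the queue depends on scheduling.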
The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:
from multiprocessing import Process, Pipe
def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()
if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()
The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.
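Since the pipe is duplex by default, each end can both send and receive. A small sketch of a request and reply over a single pipe follows; echo_upper and round_trip are hypothetical helper names chosen for this illustration.

```python
from multiprocessing import Process, Pipe


def echo_upper(conn):
    # The child reads a message from its end and replies on the same end.
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()


def round_trip(text):
    parent_conn, child_conn = Pipe()   # duplex (two-way) by default
    p = Process(target=echo_upper, args=(child_conn,))
    p.start()
    parent_conn.send(text)       # parent -> child
    reply = parent_conn.recv()   # child -> parent
    p.join()
    return reply


if __name__ == '__main__':
    print(round_trip('hello'))
```

Each process uses only its own end of the pipe here, which is the safe pattern described above.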
Synchronization between processes
multiprocessing contains equivalents of all the synchronization primitives from threading. For instance one can use a lock to ensure that only one process prints to standard output at a time:
from multiprocessing import Process, Lock
def f(l, i):
    l.acquire()   # only one process at a time gets past this point
    try:
        print('hello world %s' % i)
    finally:
        l.release()
if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()
Without the lock, output from the different processes is liable to get all mixed up.
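Event is another of the threading primitives with a multiprocessing equivalent. As a rough sketch, a child can block on an event until the parent sets it; waiter and release_worker are illustrative names only.

```python
from multiprocessing import Process, Event, Queue


def waiter(ready, q):
    ready.wait()          # block until the parent sets the event
    q.put('released')


def release_worker():
    ready = Event()
    q = Queue()
    p = Process(target=waiter, args=(ready, q))
    p.start()
    was_set_before = ready.is_set()   # nothing has set the event yet
    ready.set()                       # let the child continue
    message = q.get()
    p.join()
    return was_set_before, message


if __name__ == '__main__':
    print(release_worker())
```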
Sharing state between processes
When doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.
However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.
Shared memory
Data can be stored in a shared memory map using Value or Array. For example, the following code
from multiprocessing import Process, Value, Array
def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]
if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()    # wait so the child's changes are complete before reading
    print(num.value)
    print(arr[:])
will print
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
The 'd' and 'i' arguments used when creating num and arr are typecodes of the kind used by the array module: 'd' indicates a double precision float and 'i' indicates a signed integer. These shared objects will be process and thread-safe.
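The built-in lock protects individual reads and writes, but an update such as counter.value += 1 is a separate read followed by a write, so it is not atomic on its own. A minimal sketch of guarding such an update with the lock returned by get_lock() follows; add_many and count_in_parallel are hypothetical names used only for this example.

```python
from multiprocessing import Process, Value


def add_many(counter, times):
    for _ in range(times):
        # Hold the Value's own lock across both the read and the write.
        with counter.get_lock():
            counter.value += 1


def count_in_parallel(workers, times):
    counter = Value('i', 0)
    procs = [Process(target=add_many, args=(counter, times))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == '__main__':
    print(count_in_parallel(4, 1000))
```

With the lock held around each increment, no update is lost and the final count equals workers * times.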
For more flexibility in using shared memory one can use the multiprocessing.sharedctypes module which supports the creation of arbitrary ctypes objects allocated from shared memory.
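As a sketch of what this can look like, the following shares an array of ctypes structures with a child process. The Point structure and the scale and scaled_points helpers are made up for this illustration.

```python
from ctypes import Structure, c_double
from multiprocessing import Process
from multiprocessing.sharedctypes import Array


class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]


def scale(points, factor):
    # Runs in the child: modifies the structures in shared memory.
    for pt in points:
        pt.x *= factor
        pt.y *= factor


def scaled_points(coords, factor):
    # Array() allocates the Point objects from shared memory.
    points = Array(Point, coords)
    p = Process(target=scale, args=(points, factor))
    p.start()
    p.join()
    # The parent sees the values written by the child.
    return [(pt.x, pt.y) for pt in points]


if __name__ == '__main__':
    print(scaled_points([(1.0, 2.0), (3.0, 4.0)], 2.0))
```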
Server process
A manager object returned by Manager() controls a server process which holds Python objects and allows other processes to manipulate them using proxies.
A manager returned by Manager() will support types list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Queue, Value and Array. For example,
from multiprocessing import Process, Manager
def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()
if __name__ == '__main__':
    manager = Manager()
    d = manager.dict()
    l = manager.list(range(10))
    p = Process(target=f, args=(d, l))
    p.start()
    p.join()    # wait so the child's changes are complete before reading
    print(d)
    print(l)
will print
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Server process managers are more flexible than using shared memory objects because they can be made to support arbitrary object types. Also, a single manager can be shared by processes on different computers over a network. They are, however, slower than using shared memory.
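One way to support a type of your own is to subclass BaseManager from multiprocessing.managers and register the type with it. The sketch below assumes a small Counter class invented for this example; CounterManager and count_twice are likewise hypothetical names.

```python
from multiprocessing.managers import BaseManager


class Counter(object):
    """Lives in the manager's server process."""

    def __init__(self):
        self._count = 0

    def increment(self):
        self._count += 1
        return self._count


class CounterManager(BaseManager):
    pass


# Make manager.Counter() create a Counter in the server process
# and return a proxy for it.
CounterManager.register('Counter', Counter)


def count_twice():
    manager = CounterManager()
    manager.start()               # start the server process
    try:
        counter = manager.Counter()
        counter.increment()       # method calls go through the proxy
        return counter.increment()
    finally:
        manager.shutdown()


if __name__ == '__main__':
    print(count_twice())
```

Every call on the proxy is forwarded to the server process, which is where the extra cost compared with shared memory comes from.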
Using a pool of workers
The Pool class represents a pool of worker processes. It has methods which allow tasks to be offloaded to the worker processes in a few different ways.
For example:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
    return x*x
if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    # print "[0, 1, 4,..., 81]"
    print(pool.map(f, range(10)))
    # print same numbers in arbitrary order
    for i in pool.imap_unordered(f, range(10)):
        print(i)
    # evaluate "f(20)" asynchronously
    res = pool.apply_async(f, (20,))      # runs in *only* one process
    print(res.get(timeout=1))             # prints "400"
    # evaluate "os.getpid()" asynchronously
    res = pool.apply_async(os.getpid, ()) # runs in *only* one process
    print(res.get(timeout=1))             # prints the PID of that process
    # launching multiple evaluations asynchronously *may* use more processes
    multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
    print([res.get(timeout=1) for res in multiple_results])
    # make a single worker sleep for 10 secs
    res = pool.apply_async(time.sleep, (10,))
    try:
        print(res.get(timeout=1))
    except TimeoutError:
        print("We lacked patience and got a multiprocessing.TimeoutError")
Note that the methods of a pool should only ever be used by the process which created it.
Note: Functionality within this package requires that the __main__ module be importable by the children. This is covered in Programming guidelines; however, it is worth pointing out here. This means that some examples, such as the Pool examples, will not work in the interactive interpreter. For example:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
>>> p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(If you try this it will actually output three full tracebacks interleaved in a semi-random fashion, and then you may have to stop the master process somehow.)
The multiprocessing package mostly replicates the API of the threading module.
Process and exceptions
class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={})
Process objects represent activity that is run in a separate process. The Process class has equivalents of all the methods of threading.Thread.
The constructor should always be called with keyword arguments. group should always be None; it exists solely for compatibility with threading.Thread. target is the callable object to be invoked by the run() method. It defaults to None, meaning nothing is called. name is the process name. By default, a unique name is constructed of the form 'Process-N1:N2:...:Nk' where N1,N2,...,Nk is a sequence of integers whose length is determined by the generation of the process. args is the argument tuple for the target invocation. kwargs is a dictionary of keyword arguments for the target invocation. By default, no arguments are passed to target.
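The constructor arguments described above can be sketched as follows. The greet function, the process name 'greeter' and the run_named helper are assumptions made for this illustration.

```python
from multiprocessing import Process, Queue


def greet(q, name, greeting='hello'):
    q.put('%s %s' % (greeting, name))


def run_named():
    q = Queue()
    # All constructor arguments are passed by keyword.
    p = Process(target=greet, name='greeter',
                args=(q, 'bob'), kwargs={'greeting': 'hi'})
    p.start()
    message = q.get()
    p.join()
    # exitcode is 0 once the child has exited normally.
    return p.name, message, p.exitcode


if __name__ == '__main__':
    print(run_named())
```

Here args supplies the positional arguments of greet and kwargs overrides its greeting default.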
