By Gabriel Genest
[TOC]
Multiprocessing is what we call splitting our work across the different cores of our computer. Cores are the basic units of a processor: independent units that can perform arithmetic and logical operations. For example, our operating systems natively use multiprocessing to do work at the same time, like running many applications simultaneously.
Multiprocessing is similar to multithreading. In fact, when using multiprocessing, the programming language or the OS will use all available cores (or as many as we specified) along with their associated hardware threads. For example, a computer with 16 cores will most likely expose up to 32 threads. Like I said earlier, cores are independent, so they can truly run at the same time. On the other hand, a core can have a certain number of threads (2 per core in the previous example) and those are not independent: they don't really run simultaneously, but rather pseudo-simultaneously. If two threads have to do a certain job, the operating system lets the first one run for a short period of time (typically microseconds), then switches to the second one, then back to the first, and the cycle continues until the job is done. It then looks like both run at the same time, while that is not the case. In brief, multithreading is not really faster than normal execution, but it allows different tasks to run in a non-sequential manner (i.e. not one after the other), while multiprocessing allows truly simultaneous execution, hence much faster than normal sequential code.
Multiprocessing is not the same as parallelizing code on a GPU. Multiprocessing is CPU parallelization: we split the jobs across the cores of the CPU, not the cores of the GPU. Working with the CPU is often (if not all of the time) easier than working with the GPU, because most common programming languages natively target the CPU; to work on the GPU we often need to tweak our code and use third-party libraries, since each company building GPUs has its own architecture and methods for working with its graphics cards. The same could be said for the CPU (Intel, AMD and Apple have different CPU architectures), but programming languages are used to that and do what must be done to run on the current CPU. If you work in assembly, however, you'll need to know the specifications of the CPU, and even with C or C++ you'll need the right compiler for your OS and CPU architecture. I believe that in the near future (maybe 5 to 10 years at most), GPU computation will become as easy as CPU computation. It is already far easier than before with NVIDIA's CUDA and other libraries like OpenCL, and some packages already provide an easy interface to them. For example, CuPy is the GPU version of NumPy, while Numba can be used to parallelize code (either CPU or GPU) without too many issues (understanding Numba is another thing). In brief, GPU parallelization is often MUCH faster than its CPU counterpart, mainly because a GPU has many more cores than a CPU, but GPU computation is better only for certain tasks, due to how graphics cards are physically built. In other words, the CPU is often the better choice, because the two don't work the same way at the physical and logical levels.
Most CPUs are made of cores. Each core has¹:
- Data paths (like pipelines and ALUs)
- Control unit which controls data paths
- Memory components (caches and registers)
- Clock to make sure everything runs when it should
- Pad transceiver
- Logic gates to have logical and arithmetic operations
Each component is really important. For example, since most CPUs are synchronous devices, they require a clock to make sure that everything runs at the right time. This is mainly because accessing memory (or receiving/sending instructions) can be complicated and requires an actual physical current, which is not instantaneous. We must wait some time to make sure the data is correct and the addresses or instructions are valid. All components must therefore be synchronized to take that into account, and they all must wait for the slowest one, which can be a drawback.
In terms of memory, caches and registers are really small and exist so that the processor does not need to access the main memory every time it needs something. For example, if we repeatedly use the same variable in a loop, it will likely stay in a cache or a register, which is much faster than fetching it from main memory on each iteration.
¹ Taken from https://www.build-your-own-computer.net/cpu.html
There exist multiple classes of multiprocessing. We won't go into details; after all, this is not a computer science course. In terms of memory, there are two classes: shared memory and distributed memory. It is important to know the difference between the two, because ignoring it can lead to pitfalls. Shared memory is when all cores (or processors) share a large common memory unit; each processor may still hold its own cached version of the data, which can lead to update problems when processors write back their local copies. Distributed memory is when each processor has its own memory unit (not talking about caches here) into which it copies the original data. In modern computers, most CPUs are a single chip with multiple processors (multi-core processors) and are hence shared memory multiprocessors.
Multiprocessing code is complicated. It is difficult to correctly partition the code, to coordinate the execution and to communicate between cores. There are also cache coherence problems. Shared memory is not problematic when no caches are involved. The problem arises when a cache gets data from the main shared memory unit, then its core does some operations and changes the data. The other caches don't have the updated data, since they too got the data from the shared memory unit and did not receive any notification about the change done in the first cache. Some more information about ways to prevent incoherent memory can be found here.
The concept of a process is fundamental in multiprocessing (after all, you can't spell multiprocessing without process). A process is an instance of a program that is being run by one or many threads, and processes can execute concurrently (in parallel). An example would be opening the same program many times (for instance Google Chrome) and watching the instances run independently, in separate processes. In Python, this corresponds to the Process class, which brings a friendly interface to the abstract concept of a process (see section [The Process class](#The class)).
A thread is the smallest sequence of instructions that can be managed independently by the scheduler (part of the OS). A thread can be seen as a component of a process: multiple threads can be part of a single process and share its memory, while processes do not share memory by default. When multiple threads work for a single process, they are said to work concurrently. It can be dangerous to use multiple threads on the same data, because not everything is thread-safe. For example, two threads may attempt to change the data at the same time, which can cause bugs; such problems are not easily reproduced and can be really hard to find. This brings us to some forms of protection: semaphores and locks.
A semaphore is an object that controls access to shared resources used by multiple processes. It can help prevent the bugs that occur when two or more processes want to update the data at the same time. On the other hand, using semaphores does not guarantee that there won't be any race conditions (situations where the outcome depends on the timing of two or more processes accessing the same data). While semaphores keep track of which process controls what, there can still be issues. For example, if a process is done playing with the data but does not inform the semaphore, the data is still seen as occupied. Another problem arises when processes access the data without going through the semaphore at all: we can still go around the protection, but it is best not to do it.
A lock is a synchronization primitive that limits access to a resource that is currently in use. Locks work well when every thread cooperates to acquire and release them before accessing data. They have some drawbacks: for example, if a process or thread dies before releasing its lock, the resource can become inaccessible, since the lock ends up permanently held.
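To make this concrete, here is a minimal sketch of a lock protecting a shared counter (the worker function and the counts are made up for illustration): four processes each increment a shared `Value` 100 times, and the lock makes each read-modify-write atomic, so no increment is lost.

```python
from multiprocessing import Process, Value, Lock

def add_100(counter, lock):
    # The lock makes each read-modify-write on the shared Value atomic,
    # so increments coming from different processes cannot be lost.
    for _ in range(100):
        with lock:
            counter.value += 1

def run():
    lock = Lock()
    counter = Value("i", 0)  # shared C int, initial value 0
    workers = [Process(target=add_100, args=(counter, lock)) for _ in range(4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    return counter.value

if __name__ == "__main__":
    print(run())  # 4 processes x 100 increments = 400
```

A `Semaphore(1)` would play the same role here; a semaphore initialized with a larger value instead limits how many processes may hold it at once.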
The multiprocessing module is a built-in module already present in Python. It has a simple and efficient API and is quite easy to understand as long as what we need is basic. Most of the time, we will only play with pools of workers and (maybe) shared memory. It is used to spawn processes that are spread across a certain number of CPU cores. It works with Unix and Windows.
Important: we should always use the guard `if __name__ == "__main__":` before creating and starting processes. Otherwise, it can lead to undesired issues. Read this for more info.
It is the basic class to use when we want to create and spawn processes. The syntax is:
```python
multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, daemon=None)
# group: should always be None (here for compatibility)
# target: callable to invoke when the Process is started
# name: name of the process. Only for identification, no semantics
# args: arguments to pass to the target
# kwargs: keyword arguments to pass to the target
# daemon: if True, the process stops when the main program finishes.
#         In other words, the Python program only waits for non-daemonic
#         processes to finish. If we want the process to keep running
#         until explicitly stopped, its value should be False.
#         By default, it inherits the value from the creating process.
```

The class has some key methods that should be known to the reader:

- `start()`: starts the process's activity. It may be left uncalled, but can only be called once.
- `join(timeout=None)`: joins the current process. Can be read as "wait until this process is done". If not called, the following lines execute immediately. `timeout` is the maximum time to wait; if `None`, it waits indefinitely.
- `is_alive()`: returns whether the process is still alive. A process is alive from the moment `start()` returns until the process is terminated.
- `terminate()`: manual way to terminate the process. Should be avoided when using pipes or queues, since they can become corrupted.
- `close()`: closes the current instance and releases all its resources. Raises a `ValueError` if the process is still running.
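A quick sketch of `join(timeout=...)`, `is_alive()` and `close()` in action (the sleeping task is made up for illustration): joining with a short timeout returns while the process is still alive, a second `join` waits for completion, and `close` then releases its resources.

```python
from multiprocessing import Process
import time

def slow_task():
    time.sleep(1.0)

def run():
    p = Process(target=slow_task, name="slow")
    p.start()
    p.join(timeout=0.1)           # stop waiting after 0.1 s
    still_running = p.is_alive()  # True: the task needs about 1 s
    p.join()                      # now wait until the process is done
    finished = not p.is_alive()
    p.close()                     # would raise ValueError if still running
    return still_running, finished

if __name__ == "__main__":
    print(run())  # (True, False)
```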
Here is a small example on how to spawn processes:
```python
from multiprocessing import Process

def show_numbers(numbers):
    for number in numbers:
        print(f"{number}")

if __name__ == "__main__":
    numbers_1 = [2, 4, 6, 8, 10]
    numbers_2 = [1, 3, 5, 7, 9]
    process_1 = Process(target=show_numbers, args=(numbers_1,))
    process_2 = Process(target=show_numbers, args=(numbers_2,))
    process_1.start()
    process_1.join()
    process_2.start()
    process_2.join()
```

The output is:

```
2
4
6
8
10
1
3
5
7
9
```

But if we join them at the same time...
```python
from multiprocessing import Process

def show_numbers(numbers):
    for number in numbers:
        print(f"{number}")

if __name__ == "__main__":
    numbers_1 = [2, 4, 6, 8, 10]
    numbers_2 = [1, 3, 5, 7, 9]
    process_1 = Process(target=show_numbers, args=(numbers_1,))
    process_2 = Process(target=show_numbers, args=(numbers_2,))
    process_1.start()
    process_2.start()
    process_1.join()
    process_2.join()
```

We can sometimes see things like:

```
2
4
6
18
10
3
5
7
9
```

where both processes printed at the same time (it is not 18, it is 1 followed by 8), which means they were working at the same time, unlike the first example where process_2 only started once the first one had finished.
It can easily and quickly become tedious to create single processes one after the other. This is why it is great to have the Pool class to help us. It creates a pool of processes that will carry out the tasks provided to the class. What is nice about Pool is that we don't have to handle single processes: they are mostly handled by the class. We can use the with statement to start processes, and the class has several useful methods to dispatch workers with functions and arguments. The syntax we often use is:
```python
with multiprocessing.Pool(n_workers) as pool:
    ...
```

where we used the `with` statement, which helps us a lot because we won't have to deal with joining or closing all the processes: it is handled automatically. The most important methods are:
- `apply(func, args, kwds)`: gives the function `func` to a single worker of the pool. If `args` and `kwds` (both optional) are also given, `func` receives them when executing. Using `apply` blocks until the result is ready, but can give more versatility to the jobs we want to do.
- `apply_async(func, args, kwds, ...)`: similar to `apply`, but asynchronous. While `apply` blocks until the result is done, `apply_async` returns immediately with an `AsyncResult` object; calling its `get()` method blocks until the result is ready. This version is preferred when dealing with multiprocessing.
- `map(func, iterable, chunksize)`: the parallel equivalent of the built-in `map` function. It applies `func` over `iterable`, but the iterable is split into chunks and each worker applies the function to its own chunk of the data. We can specify the `chunksize` we would like, but it may not be the final one. It is also possible to partition the iterable ourselves before using `map`, which gives better control over how many chunks there are and how many elements each contains. The result is a list whose order matches the order of the input iterable. Finally, `func` must take only one argument.
- `map_async(func, iterable, chunksize, ...)`: the asynchronous version of `map`. The arguments are the same and it works essentially the same, except that it returns immediately with an `AsyncResult`.
- `imap(func, iterable, chunksize)`: a "lazier" version of `map`. Compared to `map`, `imap` does not convert the `iterable` argument into a list, and it returns an iterator that yields results as they become available, instead of a list.
- `imap_unordered(func, iterable, chunksize)`: an unordered version of `imap`, which means that the order of the results is arbitrary: as soon as a worker finishes, it yields its result.
- `starmap(func, iterable, chunksize)`: the "multi-argument" version of `map`. This means that `iterable` is expected to be an iterable of iterables. Each inner iterable is unpacked into `func`, so `func` can now take more than one argument. For example, `[(0, 1), (2, 3)]` gives `[func(0, 1), func(2, 3)]`.
- `starmap_async(func, iterable, chunksize)`: the asynchronous version of `starmap` and the multi-argument version of `map_async`. There is nothing else to say.
My favourite is map, because we don't really need to care about applying the function on each process and getting the values ordered in a list: the method does it automatically. If we have multiple arguments, we either must proceed carefully (i.e. join them into an iterable that is passed to the function, so they look like a single argument) or use starmap, which can take multiple arguments. The latter is preferable, because we don't need to encapsulate our arguments into another iterable, which adds a useless layer.
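As a quick sketch of the difference (the `power` function is made up for illustration), `starmap` unpacks each inner tuple into the function, so no wrapper iterable is needed:

```python
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

def run():
    pairs = [(2, 3), (3, 2), (10, 0)]
    with Pool(2) as pool:
        # Each tuple is unpacked: power(2, 3), power(3, 2), power(10, 0)
        return pool.starmap(power, pairs)

if __name__ == "__main__":
    print(run())  # [8, 9, 1]
```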
Array and RawArray are two ways to create what are considered shared `ctypes` arrays, in other words C-type arrays. The former creates an array that can be locked for synchronization, while the latter is considered raw because it cannot be locked. This is the only difference between the two. In both cases, we must specify the type and the size.
The same can be said about Value and RawValue. Instead of containing an array, they contain a single value, and we must also specify its type. Value can be locked, while RawValue cannot. We will see some examples of how to use them.
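As a small sketch of both objects (the worker function is made up for illustration): a child process squares the elements of a shared `Array` and stores their sum in a shared `Value`; the parent sees the updated data because both live in shared memory.

```python
from multiprocessing import Process, Value, Array

def square_and_total(total, numbers):
    # Mutate the shared array in place...
    for i in range(len(numbers)):
        numbers[i] = numbers[i] ** 2
    # ...then store the sum in the shared value.
    with total.get_lock():  # a Value is created with a lock by default
        total.value = sum(numbers)

def run():
    numbers = Array("i", [1, 2, 3, 4])  # shared C int array
    total = Value("i", 0)               # shared C int
    p = Process(target=square_and_total, args=(total, numbers))
    p.start()
    p.join()
    return list(numbers), total.value

if __name__ == "__main__":
    print(run())  # ([1, 4, 9, 16], 30)
```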
New to Python 3.8 is the shared_memory submodule of multiprocessing. It provides memory for easy and direct access from each spawned process. We will see some examples of how to use it; it looks like using Array and the others, but it is simpler and prettier.
The following examples will show how to use Pool and its methods when summing the elements of an array². The code for the sum is:
```python
def iterativeSum(iterable):
    res = 0
    for i in iterable:
        res += i
    return res
```

First, we will need to split the (huge) array to sum into sub-arrays (a kind of divide-and-conquer approach) with the following method:
```python
def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]
```

*Note that this split method is not unique. We could come up with other definitions or simply use numpy.array_split.*
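To make the behaviour concrete, splitting ten numbers into three chunks gives round-robin sub-tuples (the first chunk gets the extra element when the sizes don't divide evenly):

```python
def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]

print(split(range(10), 3))
# [(0, 3, 6, 9), (1, 4, 7), (2, 5, 8)]
```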
I prefer to split everything myself when I do multiprocessing, because it makes me feel like I control the inputs better. For example, if I want to do a certain thing on multiple images, I will put all the images in a list, then split the list, and each process will iterate through its sub-list to do the work on its part of the inputs, instead of giving the whole list of images and letting map do the splitting!
```python
from multiprocessing import cpu_count, Pool
# ...

def apply(iterable, nWorkers):
    # First, split the data into sub-arrays (or sub-lists).
    # Each process will receive one sub-array and sum over it.
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        res = list(pool.apply(iterativeSum, (i,)) for i in splitted)
    # Every method should give the same sum, we will compare
    return sum(res)

def apply_async(iterable, nWorkers):
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        res = list(pool.apply_async(iterativeSum, (i,)) for i in splitted)
        # We can add some code here!
        # It should be done *at the same time* as the other processes
        # (depending on the order of the jobs in the CPU
        # and how they are dispatched)
        res = list(r.get() for r in res)
    return sum(res)
```

Here we see they are both very similar. In fact, the code for `Pool.apply` uses `Pool.apply_async`! The difference is that `Pool.apply` blocks the main process until every subprocess is done, while the asynchronous version only blocks when we call `get` on its results.
```python
# ...

def map_(iterable, nWorkers):
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        # No need to iterate on the sub-lists.
        # map handles it for us.
        res = pool.map(iterativeSum, splitted)
    return sum(res)

def map_async(iterable, nWorkers):
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        res = pool.map_async(iterativeSum, splitted)
        # We can add code here, async doesn't block
        res = res.get()
    return sum(res)
```

Once again, both methods are really similar, but the asynchronous version doesn't block until we `get` the results. I personally recommend using `map` in your code, because it is simpler than working with `apply` and the differences in runtime are not that big of a deal (`map` may even be faster than `apply`, though maybe not than `apply_async`).
```python
# ...

def imap(iterable, nWorkers):
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        # Watch out! pool.imap returns an iterator, not a list.
        # This can lead to problems in some codes, which is why
        # we create a list from it.
        res = list(pool.imap(iterativeSum, splitted))
    return sum(res)

def imap_unordered(iterable, nWorkers):
    splitted = split(iterable, nWorkers)
    with Pool(nWorkers) as pool:
        res = list(pool.imap_unordered(iterativeSum, splitted))
    return sum(res)
```

We see here that we created lists from the results, because `imap` and `imap_unordered` return iterators. If you can deal with iterators, that's fine, but in our case we preferred working with a list in each case.
We will see here how we can use the different methods of memory sharing in some small but interesting cases.
Let's say we have a list of matrices that all have the same shape (it can be seen as a tensor). We want to compute the determinant of each slice (i.e. of each matrix in the list). One way to do it is sequential, but for fun let's use some multiprocessing and see how we can share the tensor between all the processes used. First, let's generate a tensor of random values. Then we will need to hand the tensor over to a shared array. In fact, since we don't modify the array directly, we will use RawArray, because we don't need a lock. This is pretty tedious to do, because the number of steps to follow is quite long in my opinion.
```python
import numpy as np
from multiprocessing import Pool, cpu_count, RawArray

__variableDict__ = dict()  # Global dict to keep the shared RawArray

def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]

def initWorker(tensor, tensorShape):
    # Function used to initialize each worker with the same
    # shared array. Putting it in the global dict.
    __variableDict__["tensor"] = tensor
    __variableDict__["tensorShape"] = tensorShape

def dets(slices):
    tensor = __variableDict__["tensor"]
    print(id(tensor))  # The objects should be the same
    # First, get the values from the RawArray.
    # We can see it as a buffer, an object keeping data.
    tensorFromArray = np.frombuffer(tensor).reshape(__variableDict__["tensorShape"])
    determinants = []
    for s in slices:
        determinants.append(np.linalg.det(tensorFromArray[:, :, s]))
    return determinants

class Determinants:

    def __init__(self, tensor):
        self._tensor = tensor
        self._tensorShape = self._tensor.shape
        self._slices = range(self._tensorShape[-1])
        self._nElements = self._tensor.size

    def computeDeterminants(self, nCores):
        tensorArray = RawArray("d", self._nElements)  # Create a RawArray
        # Recreate the tensor from the RawArray.
        # Any change here will change the RawArray too.
        tensorFromArray = np.frombuffer(tensorArray).reshape(self._tensorShape)
        # Copy the values from the tensor to the *buffer*, i.e. the RawArray
        np.copyto(tensorFromArray, self._tensor)
        splittedSlices = split(self._slices, nCores)
        with Pool(nCores, initializer=initWorker, initargs=(tensorArray, self._tensorShape)) as pool:
            tempDeterminants = pool.map(dets, splittedSlices)
        determinants = []
        for subList in tempDeterminants:
            determinants.extend(subList)
        return determinants

class DeterminantsNotShared:

    def __init__(self, tensor):
        self._tensor = tensor
        self._tensorShape = self._tensor.shape
        self._slices = range(self._tensorShape[-1])
        self._nElements = self._tensor.size

    def dets(self, slices):
        determinants = []
        print(id(self._tensor))  # The objects should not be the same
        for s in slices:
            determinants.append(np.linalg.det(self._tensor[:, :, s]))
        return determinants

    def computeDeterminants(self, nCores):
        splittedSlices = split(self._slices, nCores)
        with Pool(nCores) as pool:
            tempDeterminants = pool.map(self.dets, splittedSlices)
        determinants = []
        for subList in tempDeterminants:
            determinants.extend(subList)
        return determinants

if __name__ == '__main__':
    nCores = cpu_count()
    tensor = np.random.randint(-5, 5, (5, 5, 12)).astype(float)
    d = Determinants(tensor)
    dets = d.computeDeterminants(nCores)
    print(sum(dets))
    d_2 = DeterminantsNotShared(tensor)
    dets = d_2.computeDeterminants(nCores)
    print(sum(dets))
    # Sequential, comparing the sum of determinants
    sums = 0
    for i in range(tensor.shape[-1]):
        sums += np.linalg.det(tensor[:, :, i])
    print(sums)
```
The output for the id looks like (the values will most likely be different each time):

```
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
2217829570752
```
for the shared RawArray. This makes sense: since it is shared, it is exactly the same object, and changing its value in one process would change it in the others as well. For the non-shared version, we have something like (the values will most likely be different each time):

```
2455081188368
2455080799008
2455080796608
2455081223072
2455080799008
2455081188368
2455080796608
2455081223072
2455080799008
2455081188368
2455080796608
2455081223072
```
We can see that only a few distinct values appear, each repeated several times. This is because copies are made when the jobs are dispatched: each job receives its own copy of the object in a worker, and a worker can reuse the same memory address for a later copy once a previous one has been freed, so some id values show up more than once. The important point is that, unlike the shared version, the processes are not working on the same object.
Working with RawArray is not the best in my opinion. Instead, let's work with the submodule shared_memory.
We will still have to deal with buffers, but we won't have to deal with global variables like before. It works with an object called SharedMemory that will keep our data and we will be able to reference it with a name.
```python
import numpy as np
from multiprocessing import Pool, cpu_count
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager

def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]

class Determinants:

    def __init__(self, tensor):
        self._tensor = tensor
        self._tensorShape = self._tensor.shape
        # Will hold the name of the SharedMemory block
        self._tensorSharedMemoryName = None
        self._slices = range(self._tensorShape[-1])

    def dets(self, slices):
        # Attach to the existing SharedMemory block by its name
        tensor = SharedMemory(self._tensorSharedMemoryName)
        print(id(tensor))  # The objects should be the same
        # We recreate the array from the buffer with a specific shape
        tensorFromSharedMemory = np.ndarray(self._tensorShape, float, buffer=tensor.buf)
        determinants = []
        for s in slices:
            determinants.append(np.linalg.det(tensorFromSharedMemory[:, :, s]))
        # We close our access to the SharedMemory block
        tensor.close()
        return determinants

    def computeDeterminants(self, nCores):
        # The manager deals with closing and unlinking since we use `with`,
        # so we create the block through it
        with SharedMemoryManager() as manager:
            sharedMemoryTensor = manager.SharedMemory(size=self._tensor.nbytes)
            # Access the name
            self._tensorSharedMemoryName = sharedMemoryTensor.name
            tensor = np.ndarray(self._tensorShape, float, buffer=sharedMemoryTensor.buf)
            # Like before, copy the values inside the SharedMemory block
            np.copyto(tensor, self._tensor)
            splittedSlices = split(self._slices, nCores)
            with Pool(nCores) as pool:
                tempDeterminants = pool.map(self.dets, splittedSlices)
            determinants = []
            for subList in tempDeterminants:
                determinants.extend(subList)
        return determinants

if __name__ == '__main__':
    nCores = cpu_count()
    tensor = np.random.randint(-5, 5, (5, 5, 12)).astype(float)
    d = Determinants(tensor)
    dets = d.computeDeterminants(nCores)
    print(sum(dets))
    sums = 0
    for i in range(tensor.shape[-1]):
        sums += np.linalg.det(tensor[:, :, i])
    print(sums)
```

Let's now consider another shared array, the one used as storage for the determinants:
```python
import numpy as np
from multiprocessing import Pool, cpu_count
from multiprocessing.shared_memory import SharedMemory
from multiprocessing.managers import SharedMemoryManager

def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]

class Determinants:

    def __init__(self, tensor):
        self._tensor = tensor
        self._tensorShape = self._tensor.shape
        self._tensorSharedMemoryName = None
        self._determinantsName = None
        self._slices = range(self._tensorShape[-1])

    def dets(self, slices):
        tensor = SharedMemory(self._tensorSharedMemoryName)
        determinants = SharedMemory(self._determinantsName)
        print(tensor, determinants)
        tensorFromSharedMemory = np.ndarray(self._tensorShape, float, buffer=tensor.buf)
        determinantsFromSharedMemory = np.ndarray((len(self._slices),), float, buffer=determinants.buf)
        for s in slices:
            determinantsFromSharedMemory[s] = np.linalg.det(tensorFromSharedMemory[:, :, s])
        tensor.close()
        determinants.close()

    def computeDeterminants(self, nCores):
        determinantsTemps = np.zeros(len(self._slices), float)
        with SharedMemoryManager() as manager:
            sharedMemoryTensor = manager.SharedMemory(size=self._tensor.nbytes)
            self._tensorSharedMemoryName = sharedMemoryTensor.name
            tensor = np.ndarray(self._tensorShape, float, buffer=sharedMemoryTensor.buf)
            np.copyto(tensor, self._tensor)
            sharedMemoryDeterminants = manager.SharedMemory(size=determinantsTemps.nbytes)
            self._determinantsName = sharedMemoryDeterminants.name
            determinants = np.ndarray(determinantsTemps.shape, float, buffer=sharedMemoryDeterminants.buf)
            np.copyto(determinants, determinantsTemps)
            splittedSlices = split(self._slices, nCores)
            with Pool(nCores) as pool:
                pool.map(self.dets, splittedSlices)
            return determinants.copy()

if __name__ == '__main__':
    nCores = cpu_count()
    tensor = np.random.randint(-5, 5, (5, 5, 12)).astype(float)
    d = Determinants(tensor)
    dets = d.computeDeterminants(nCores)
    print(sum(dets))
    sums = 0
    for i in range(tensor.shape[-1]):
        sums += np.linalg.det(tensor[:, :, i])
    print(sums)
```

And the output of the tensor and determinants objects is:
```
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
SharedMemory('wnsm_1a240051', size=4096) SharedMemory('wnsm_6b449165', size=4096)
```
We can see that the tensor objects are the same, as well as the determinants objects.
Numba is a Python module that helps write faster code and parallelize it. It can compile code in *nopython* mode, which makes it much faster than native Python code that has to go through the interpreter. It can also parallelize loops and other computations, and it supports GPU computations³. It can be seen as a replacement for the multiprocessing module when what we have to do is fairly simple and doesn't really require us to manage memory ourselves.
@jit is probably the most used decorator of Numba. It compiles functions just in time (hence jit), with or without using the Python interpreter. Since it is not the goal of this HOWTO, we won't go into many details about what that means, but we will use it since it allows us to automatically parallelize our code. We will also use the prange object, which is similar to the built-in range object, but differs in that it can run loop iterations in parallel automatically. Let's take our determinant example and see how we can use Numba:
```python
import numpy as np
import numba as nb

@nb.jit(parallel=True, nopython=False)
def determinants(tensor, determinants):
    shape = tensor.shape
    for i in nb.prange(shape[-1]):
        determinants[i] = np.linalg.det(tensor[:, :, i])
    return determinants

if __name__ == '__main__':
    tensor = np.random.randint(-5, 5, (100, 100, 10_000)).astype(float)
    dets = np.zeros((tensor.shape[-1],), float)
    dets_2 = dets.copy()
    determinants(tensor, dets)
    for i in range(tensor.shape[-1]):
        dets_2[i] = np.linalg.det(tensor[:, :, i])
```

We see here that we use the `prange` object, which should automatically parallelize our code. There are some drawbacks to using Numba. First of all, I had to pass the result array as a parameter instead of declaring it in the function directly, because Numba uses some of its own definitions of NumPy methods, which can lead to some issues. It can also be problematic with object-oriented programming, which is why I left it out of this example, although this doesn't mean the two are never compatible. Finally, it can be problematic to use other third-party modules. It works best with NumPy and certain basic operations; it can still work with SciPy (for example), but there will be warnings, and the code may not compile in `nopython=True` mode. Something to be careful with!
It is also important to talk about race conditions. @jit does not lock access to shared resources, so be careful when modifying an array or a value: make sure it is not also changed by another iteration of the loop. In our previous example, each element of the array is accessed and changed by exactly one iteration. For more information, see https://numba.pydata.org/numba-doc/dev/user/parallel.html.
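The "each index is written exactly once" pattern can be illustrated with the standard library's threading module (this is a plain-Python sketch of the idea, not Numba itself): because every worker writes only to its own slot, no lock is needed.

```python
import threading

n = 8
results = [None] * n

def work(i):
    # Each worker writes only to its own slot, so no two workers ever
    # touch the same element: there is nothing to race on.
    results[i] = i * i

threads = [threading.Thread(target=work, args=(i,)) for i in range(n)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert results == [i * i for i in range(n)]
```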
```python
import numpy as np
from scipy.signal import correlate2d
from multiprocessing import Pool, cpu_count
import time


def autocorrelation(a):
    return correlate2d(a, a)


def autocorrelationList(listMatrices):
    correlations = []
    for matrix in listMatrices:
        correlations.append(autocorrelation(matrix))
    return correlations


def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]


if __name__ == "__main__":
    nbWorkers = cpu_count()
    listMatrices = [np.random.randint(0, 100, (100, 100)) for _ in range(30)]
    start = time.time()
    autocorrelationNotMp = autocorrelationList(listMatrices)
    endNotMp = time.time()
    with Pool(nbWorkers) as pool:
        splittedList = split(listMatrices, nbWorkers)
        autocorrelationMp = pool.map(autocorrelationList, splittedList)
    endMP = time.time()
    print(f"No multiprocessing: {endNotMp - start}s")
    print(f"Multiprocessing: {endMP - endNotMp}s")
```
And the results are:
```
No multiprocessing: 15.631449937820435s
Multiprocessing: 5.566772222518921s
```
We see that with multiprocessing the runtime is significantly lower than without it. This example is simple yet quite interesting, because autocorrelation can be really useful with images or signals in general. The following figure shows the runtime difference between multiprocessing (12 threads) and no multiprocessing for different numbers of matrices; correlate2d was used, just like in the previous code block.
correlate2d for multiple (100x100) matrices.
We see that the use of multiprocessing is really useful in some cases. The next case will show that it is not always useful.
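As an aside, the split helper used in these examples distributes items round-robin via strided slicing, so the chunks are close to equal in size even when the count doesn't divide evenly. A quick check of its behavior:

```python
def split(container, count):
    # Round-robin split: item j ends up in chunk j % count.
    container = tuple(container)
    return [container[i::count] for i in range(count)]

chunks = split(range(7), 3)
print(chunks)  # [(0, 3, 6), (1, 4), (2, 5)]
```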
Instead of doing a 2D correlation, let's do a 2D fast Fourier transform.
```python
import numpy as np
from multiprocessing import Pool, cpu_count
import time


def fft2(a):
    return np.fft.fft2(a)


def fftList(listMatrices):
    ffts = []
    for matrix in listMatrices:
        ffts.append(fft2(matrix))
    return ffts


def split(container, count):
    container = tuple(container)
    return [container[i::count] for i in range(count)]


if __name__ == "__main__":
    nbWorkers = cpu_count()
    listMatrices = [np.random.randint(0, 100, (100, 100)) for _ in range(30)]
    start = time.time()
    fftsNoMp = fftList(listMatrices)
    endNotMp = time.time()
    with Pool(nbWorkers) as pool:
        splittedList = split(listMatrices, nbWorkers)
        fftsMp = pool.map(fftList, splittedList)
    endMP = time.time()
    print(f"No multiprocessing: {endNotMp - start}s")
    print(f"Multiprocessing: {endMP - endNotMp}s")
```
And the output is:
```
No multiprocessing: 0.006981611251831055s
Multiprocessing: 1.9027998447418213s
```
We see that using multiprocessing results in a larger runtime. Maybe the number of matrices is simply too small? The following figure shows the runtime difference between multiprocessing (12 threads) and no multiprocessing for different numbers of matrices; np.fft.fft2 was used, just like in the previous code block.
fft for multiple (100x100) matrices.
We see that even for a large number of matrices (up to 500), multiprocessing is way slower than the sequential code. Part of the difference comes from the fact that the data has to be copied to each process, which increases the time required to do the job. It is also important to mention that the fast Fourier transform is indeed really fast! There is no need to use multiprocessing here.
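To get a feel for that copying cost, here is a rough sketch using only the standard library (plain nested lists stand in for the matrices): sending data to worker processes costs at least the time to serialize it, which can dwarf a cheap computation.

```python
import pickle
import time

# 30 "matrices" of 100x100 values, as plain nested lists.
matrices = [[[(i + j + k) % 100 for j in range(100)] for i in range(100)]
            for k in range(30)]

start = time.perf_counter()
payload = pickle.dumps(matrices)  # what multiprocessing must do per job
pickle_time = time.perf_counter() - start

start = time.perf_counter()
totals = [sum(map(sum, m)) for m in matrices]  # a cheap computation
compute_time = time.perf_counter() - start

print(f"pickling: {pickle_time:.4f}s, cheap computation: {compute_time:.4f}s")
assert pickle.loads(payload) == matrices
```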
Like I said earlier, automatic parallelization with @jit works best with basic array manipulations and some NumPy methods. For this benchmark, we will use our determinant example:
```python
import numba
import numpy as np
import time
import matplotlib.pyplot as plt

plt.rcParams.update({'font.size': 22})


@numba.njit(parallel=True)
def detsNjit(tensor, determinants):
    nMatrices = tensor.shape[-1]
    for i in numba.prange(nMatrices):
        determinants[i] = np.linalg.det(tensor[:, :, i])


@numba.jit(parallel=True)
def detsJit(tensor, determinants):
    nMatrices = tensor.shape[-1]
    for i in numba.prange(nMatrices):
        determinants[i] = np.linalg.det(tensor[:, :, i])


@numba.jit(parallel=True, forceobj=True)
def detsJit2(tensor, determinants):
    nMatrices = tensor.shape[-1]
    for i in numba.prange(nMatrices):
        determinants[i] = np.linalg.det(tensor[:, :, i])


def detsSequential(tensor, determinants):
    nMatrices = tensor.shape[-1]
    for i in range(nMatrices):
        determinants[i] = np.linalg.det(tensor[:, :, i])


if __name__ == '__main__':
    # For compiling purposes only! Once the functions are compiled,
    # they are faster.
    comp = np.ones((10, 10, 1), float)
    determinant = np.zeros(1, float)
    detsNjit(comp, determinant)
    detsJit(comp, determinant)
    detsJit2(comp, determinant)
    nMatrices = range(10, 100_000, 250)
    njitTimes = []
    jitTimes = []
    jit2Times = []
    sequentialTimes = []
    # Do multiple trials, one per number of matrices n.
    for i, n in enumerate(nMatrices):
        tensor = np.random.randint(-5, 5, (10, 10, n)).astype(float)
        determinants = np.zeros(tensor.shape[-1], float)
        s = time.time()
        detsNjit(tensor, determinants)
        e_1 = time.time()
        detsJit(tensor, determinants)
        e_2 = time.time()
        detsJit2(tensor, determinants)
        e_3 = time.time()
        detsSequential(tensor, determinants)
        e_4 = time.time()
        njitTimes.append(e_1 - s)
        jitTimes.append(e_2 - e_1)
        jit2Times.append(e_3 - e_2)
        sequentialTimes.append(e_4 - e_3)
        print(f"{i + 1}/{len(nMatrices)} done")
    plt.plot(nMatrices, njitTimes, label="Jit, no Python, parallel")
    plt.plot(nMatrices, jitTimes, label="Jit, Python, parallel")
    plt.plot(nMatrices, jit2Times, label="Jit, object mode, parallel")
    plt.plot(nMatrices, sequentialTimes, label="Sequential")
    plt.legend()
    plt.xlabel(r"Number of matrices (shape $10\times 10$) [-]")
    plt.ylabel("Time [s]")
    plt.show()
```
The results speak for themselves:
@jit variants vs sequential execution
We can see that object mode with forceobj=True is as slow as, and even slower than, sequential execution. This is expected, as noted in the documentation here. We can also see that nopython=True and nopython=False behave very similarly: both are parallel, but one is compiled without going through the Python interpreter. In this case, there is no significant advantage to nopython=True, but in other cases there can be a huge time advantage.
In conclusion:
- It is sometimes easier to parallelize with `numba`, but it mostly works only under particular circumstances. When using NumPy methods and working with arrays, I would recommend it since it does not require a lot of work from the programmer.
- I recommend using `multiprocessing` because we have better control and much more freedom in what we can do.
- I recommend the following architecture:
  - If possible, don't bother working with shared memory when you are beginning with multiprocessing.
  - Split your iterable of arguments in $n$ parts, where $n$ is the number of processes you plan on using. This allows for better control of what goes in which process, and each process will have its (already) defined inputs.
  - Use the `with` statement with a pool of workers instead of instantiating processes one by one. You may forget to join each process, and it can be heavier to deal with.
  - Use the `map` method, because it starts all processes for us, unlike `apply`, where we have to start them one by one in a loop (see previous examples). You can forget about `imap` and the others (except `starmap`: use it when there are multiple arguments per job!), unless you know what you are doing or require them.
- Experiment with `multiprocessing`! Get to know it, hang out together and see where the relationship is going!
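The recommendations above can be sketched in a few lines: a `with`-managed pool, a pre-built iterable of jobs, and `starmap` for functions taking several arguments. The function names here (`scaled_power`, `run`) are just illustrative.

```python
from multiprocessing import Pool, cpu_count


def scaled_power(base, exponent, scale):
    # A job that takes several arguments, hence starmap below.
    return scale * base ** exponent


def run(jobs):
    # `with` closes and joins the pool for us; `starmap` unpacks each
    # argument tuple into scaled_power(base, exponent, scale).
    with Pool(min(cpu_count(), len(jobs))) as pool:
        return pool.starmap(scaled_power, jobs)


if __name__ == '__main__':
    jobs = [(2, 3, 1), (3, 2, 2), (5, 1, 10)]
    print(run(jobs))  # [8, 18, 50]
```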
Don't hesitate to ask me questions (or, if I am gone, ask other people) if you need help. If this document has any mistakes, please notify me or try to fix them! I may have made this document, but it is for everyone.
Thank you and happy parallelizing!
Footnotes
1. Taken from https://en.wikipedia.org/wiki/Processor_design ↩
2. The sequential sum would most likely be faster since it is only a sum, and we could also use `numpy.sum` or even `sum`; both should be faster than the sequential sum. The goal in these examples is not to have faster code but to get a feeling for how to use the different methods. ↩
3. Works for the CUDA GPU architecture (most recent NVIDIA cards) and (experimentally) with the ROC GPU architecture (most recent AMD cards). ↩