Modern computers are equipped with processors that allow fast parallel computation at several levels: vector or array operations, which execute the same operation simultaneously on a batch of data, and parallel computing, which distributes chunks of data across several CPU cores and processes them in parallel. When working with large amounts of data, it is important to know how to exploit these features because they can reduce computation time drastically. Taking advantage of them usually requires some extra effort during implementation. With packages like NumPy and Python’s multiprocessing module, the additional work is manageable and usually pays off when compared to the enormous waiting time you may face when doing large-scale calculations inefficiently.
Dump the loops: Vectorization with NumPy
Many calculations require repeatedly performing the same operation on all items in one or several sequences, e.g. multiplying two vectors a = [1, 2, 3, 4, 5] and b = [6, 7, 8, 9, 10]. This is usually implemented with a loop (e.g. a for or while loop) where each item is treated one by one, e.g. 1 * 6, then 2 * 7, etc. Modern computers have special registers for such operations that allow operating on several items at once. This means that a part of the data, say 4 items at a time, is loaded and multiplied simultaneously. For the mentioned example where both vectors have a size of 5, this means that instead of 5 operations, only 2 are necessary (one with the first 4 elements and one with the last “left over” element). With 12 items to be multiplied on each side we would need 3 operations instead of 12, with 40 we would need 10, and so on.
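Purely to illustrate how the operation count shrinks (this pure-Python sketch does not make anything faster by itself, since the real SIMD work happens inside the CPU), the data can be thought of as being processed in chunks of 4:
a = [1, 2, 3, 4, 5]
b = [6, 7, 8, 9, 10]

width = 4  # assumed "register width": number of items handled per operation
result = []
for start in range(0, len(a), width):
    # one conceptual operation processes up to `width` pairs at once
    chunk_a = a[start:start + width]
    chunk_b = b[start:start + width]
    result.extend(x * y for x, y in zip(chunk_a, chunk_b))

print(result)  # [6, 14, 24, 36, 50], done in 2 chunks instead of 5 single steps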
In Python we can multiply two sequences with a list comprehension:
>>> a = [1, 2, 3, 4, 5]
>>> b = [6, 7, 8, 9, 10]
>>> [x * y for x, y in zip(a, b)]
[6, 14, 24, 36, 50]
This is fine for smaller data. However, it is not as efficient as vectorizing the multiplication with NumPy. When we put the data into NumPy arrays, we can write the multiplication as follows:
>>> import numpy as np
>>> a = np.array([1, 2, 3, 4, 5])
>>> b = np.array([6, 7, 8, 9, 10])
>>> a * b
array([ 6, 14, 24, 36, 50])
First of all, that’s much more compact than writing a list comprehension. Furthermore, it’s also much faster due to vectorization, as we can see when we multiply two arrays with 1,000,000 integers each. Let’s start with the unoptimized, pure Python implementation using random integers:
>>> import random
>>> a = [random.randint(1, 100) for _ in range(1000000)]
>>> b = [random.randint(1, 100) for _ in range(1000000)]
>>> %timeit res = [x * y for x, y in zip(a, b)]
63.7 ms ± 554 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
I’m using the %timeit “magic command” in an IPython session to measure the execution time, which gives me about 63 ms on my machine. Now to the vectorized version implemented with NumPy:
>>> import numpy as np
>>> a = np.random.randint(1, 100, 1000000)
>>> b = np.random.randint(1, 100, 1000000)
>>> %timeit a * b
1.88 ms ± 5.21 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
The execution time goes down to about 1.9 ms, which means the calculations are more than 30x faster! At the same time, the extra effort for implementation was low, and I would say that using the * operator for multiplying two NumPy arrays is more natural and concise than using a list comprehension or a loop.
A more practical example for vectorization
Let’s create a more practical example for vectorization to see how much can be achieved in an everyday task. One such task might be calculating the great circle distance (GCD) of two points on earth, which can be done with the haversine formula. The formula involves trigonometric operations, multiplications, a square root, etc., so it might be beneficial to use vectorization. Let’s have a look at a non-vectorized implementation in pure Python first:
import math

def haversine(row):
    a_lat, a_lng, b_lat, b_lng = row
    R = 6371  # earth radius in km

    a_lat = math.radians(a_lat)
    a_lng = math.radians(a_lng)
    b_lat = math.radians(b_lat)
    b_lng = math.radians(b_lng)

    d_lat = b_lat - a_lat
    d_lng = b_lng - a_lng

    a = math.pow(math.sin(d_lat / 2), 2) \
        + math.cos(a_lat) * math.cos(b_lat) \
        * math.pow(math.sin(d_lng / 2), 2)
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    return R * c
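As a quick check, we can call it with the Berlin and London coordinates that will appear in the example data below; rounding keeps the output short:
>>> round(haversine((52.516667, 13.388889, 51.507222, -0.1275)), 2)
930.45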
The function takes a row of data, which is a tuple of 4 elements: the latitude and longitude of two points, a and b, in degrees. It converts them to radians, then applies the calculations of the formula and returns the GCD in km. The function accepts the data as a row because this way it is easier to apply it to our data, which will be a matrix where the four columns represent the origin latitude and longitude and the destination latitude and longitude, as in this example:
import numpy as np
coords = np.array([
    # orig_lat,  orig_lng,  dest_lat,  dest_lng
    [52.516667, 13.388889, 51.507222, -0.1275],    # Berlin-London
    [52.516667, 13.388889, 55.75,     37.616667],  # Berlin-Moscow
    [55.75,     37.616667, 51.507222, -0.1275],    # Moscow-London
])
We can now apply the function to each row of the data, i.e. each row with its origin and destination coordinates will be passed to haversine. Again, this could be done with a list comprehension, but we can also use NumPy’s apply_along_axis, which is a little shorter to write. apply_along_axis takes three arguments: the function to apply, the axis along which this function is applied (for a 2D matrix, 0 means column-wise and 1 means row-wise), and finally the data itself:
>>> np.apply_along_axis(haversine, 1, coords)
array([ 930.45355655, 1609.90936067, 2500.54316693])
This correctly gives us about 930 km for the distance Berlin–London (the first row of the data), 1609 km for Berlin–Moscow (2nd row) and 2500 km for Moscow–London (3rd row).
What we just did was take each row of the input data, i.e. four values per row, and use these values to calculate the GCD. So each row is treated individually, just as in the initial example where two sequences were multiplied. In order to do the calculations more efficiently, we can vectorize them. Instead of thinking in row-wise calculations, we should think in column-wise calculations, where each column is a vector whose values can be used simultaneously. So instead of converting a single origin’s latitude to radians with a_lat = math.radians(a_lat), we can take all origins’ latitudes, i.e. the whole column, and turn it into radians with a vectorized NumPy operation: a_lat = np.radians(a_lat). Note that a_lat is then not a scalar (single) value anymore but contains all origins’ latitudes.
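For example, with the coords matrix defined above, all three origin latitudes can be converted in a single vectorized call (the printed values are rounded by NumPy’s display and the exact formatting may differ between NumPy versions):
>>> np.radians(coords[:, 0])   # all origin latitudes at once
array([0.91658875, 0.91658875, 0.97302106])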
When we want to vectorize a function, we have to think about the layout of the data first and also about the calculations that are made. Not all calculations and algorithms can be vectorized efficiently. In our case, however, we are lucky since the input data can be split into columns as input vectors and all operations translate nicely into vectorized operations that are implemented in NumPy:
def vec_haversine(a_lat, a_lng, b_lat, b_lng):
    R = 6371  # earth radius in km

    a_lat = np.radians(a_lat)
    a_lng = np.radians(a_lng)
    b_lat = np.radians(b_lat)
    b_lng = np.radians(b_lng)

    d_lat = b_lat - a_lat
    d_lng = b_lng - a_lng

    d_lat_sq = np.sin(d_lat / 2) ** 2
    d_lng_sq = np.sin(d_lng / 2) ** 2

    a = d_lat_sq + np.cos(a_lat) * np.cos(b_lat) * d_lng_sq
    c = 2 * np.arctan2(np.sqrt(a), np.sqrt(1 - a))

    return R * c  # distance between a and b in km
This vectorized version performs the same calculations as the previous version, but instead of a row with four values that represent a single origin and destination, it takes vectors (NumPy arrays) of origin latitudes, origin longitudes, destination latitudes and destination longitudes. Most of the math functions have the same name in NumPy, so we can easily switch from the non-vectorized functions in Python’s math module to NumPy’s versions.
We now pass our function the columns of the data and it gives us the same result as before:
>>> vec_haversine(coords[:, 0], coords[:, 1], coords[:, 2], coords[:, 3])
array([ 930.45355655, 1609.90936067, 2500.54316693])
The expression coords[:, 0] says: take all rows (: on the first axis) and the first column (0 on the second axis).
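For our example data this is the column of origin latitudes (display formatting may vary slightly between NumPy versions):
>>> coords[:, 0]
array([52.516667, 52.516667, 55.75])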
Now to the test: We can generate 10,000 random coordinate pairs and measure the execution time of the “classic” and the vectorized implementation (see the links at the bottom of the article for the full scripts). This gives 53.8 ms and 2.9 ms, respectively, which means a speed-up factor of about 18.
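A minimal sketch of such a comparison could look like this (the way the random coordinates are generated here is my own choice and may differ from the linked scripts):
n = 10000
rand_coords = np.column_stack([
    np.random.uniform(-90, 90, n),    # orig_lat
    np.random.uniform(-180, 180, n),  # orig_lng
    np.random.uniform(-90, 90, n),    # dest_lat
    np.random.uniform(-180, 180, n),  # dest_lng
])

# "classic" row-wise implementation
%timeit np.apply_along_axis(haversine, 1, rand_coords)

# vectorized implementation working on whole columns
%timeit vec_haversine(rand_coords[:, 0], rand_coords[:, 1],
                      rand_coords[:, 2], rand_coords[:, 3])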
Similar principles can be applied to pandas DataFrames:
>>> import pandas as pd
>>> labels = [
'Berlin-London',
'Berlin-Moscow',
'Moscow-London',
]
>>> df_coords = pd.DataFrame(coords, index=labels,
columns=['origin_lat', 'origin_lng',
'destination_lat', 'destination_lng'])
>>> df_coords
origin_lat origin_lng destination_lat destination_lng
Berlin-London 52.516667 13.388889 51.507222 -0.127500
Berlin-Moscow 52.516667 13.388889 55.750000 37.616667
Moscow-London 55.750000 37.616667 51.507222 -0.127500
Now we can access the columns by name to pass them to the vectorized haversine function:
>>> vec_haversine(df_coords.origin_lat, df_coords.origin_lng,
df_coords.destination_lat, df_coords.destination_lng)
Berlin-London 930.453557
Berlin-Moscow 1609.909361
Moscow-London 2500.543167
dtype: float64
Compared to a non-vectorized implementation (using DataFrame.apply), we get a speed-up factor of more than 30 (174 ms vs. 4.8 ms).
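Such a comparison could be set up roughly as follows, measured on a larger dataframe of random coordinate pairs (here called df_rand, an assumed name with the same columns as df_coords; the linked scripts may differ in the details):
# row-wise: DataFrame.apply passes each row to the pure-Python haversine
%timeit df_rand.apply(haversine, axis=1)

# column-wise: the vectorized version receives whole columns at once
%timeit vec_haversine(df_rand.origin_lat, df_rand.origin_lng,
                      df_rand.destination_lat, df_rand.destination_lng)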
All in all, the speed up that can be achieved with vectorization is immense. It might not be noticeable with small data and simple calculations. But when your data grows and calculations get more complex, you might end up waiting hours for results. Dividing this time by a factor of 30 is significant. At the same time, I think the extra effort in implementation work is quite low. The example showed how to vectorize the haversine formula, but you can apply the same principles to many other formulas and algorithms.
Divide and accelerate: Parallel computing with multiprocessing
By using vectorization, we exploit one important feature of modern processors (CPUs). However, we only use one CPU core, whereas nowadays desktop machines are usually equipped with at least 4 cores. We can exploit this with parallel processing, which I already briefly explained in connection with text analysis. The idea is that the data is split into equally sized chunks and then those chunks are distributed to the CPU cores so that each of them works on its own chunk and returns its calculation results. Those partial results are then combined to form the final result.
If you want to implement parallel processing on a single machine and distributing the workload is not too complex, a good way to start is Python’s multiprocessing module and its Pool class. We will use its apply_async method to distribute the work across several “worker processes”. All of these processes run the same function (i.e. the same calculations), but with different parts (“chunks”) of the data.
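To show the basic pattern in isolation before applying it to our data, here is a minimal, self-contained example of Pool.apply_async (the square function and the numbers are just placeholders):
import multiprocessing as mp

def square(x):
    return x * x

if __name__ == '__main__':
    with mp.Pool(processes=4) as pool:
        # apply_async returns immediately with an AsyncResult object per call
        async_results = [pool.apply_async(square, args=(n,)) for n in range(8)]
        # get() blocks until the corresponding worker process has finished
        print([r.get() for r in async_results])  # [0, 1, 4, 9, 16, 25, 36, 49]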
At first we will define a function that is called by each worker process. A chunk of data is passed to this function and we calculate the GCD for this chunk. Finally, we set the index of the result data to be the same as the index in the input chunk. This is necessary in order to combine the partial results from the individual processes later.
def process_chunk(proc_chunk):
    chunk_res = vec_haversine(proc_chunk.origin_lat,
                              proc_chunk.origin_lng,
                              proc_chunk.destination_lat,
                              proc_chunk.destination_lng)
    chunk_res.index = proc_chunk.index

    return chunk_res
Now we can take care of distributing the work. In parallel processing, this is often one of the most difficult tasks, because you want to make sure that each worker process takes about the same time to finish its calculations. You can only create the final result once you have all the partial results from the worker processes. Hence, if one process takes much longer to compute than the others, you’ll have to wait for it to finish while the other processes sit idle.
In our case, we can just distribute the data evenly across the worker processes, because each row of data will take about the same time to compute. If we had, for example, text strings of different lengths in the data rows, this distribution scheme would not be efficient.
We’re still working with a dataframe of coordinates, called df_coords, as in the previous examples. At first we determine the number of worker processes that we want to start. We’ll use all CPU cores of our machine:
import multiprocessing as mp
n_proc = mp.cpu_count()
Next we determine the size of each chunk by integer division:
chunksize = len(df_coords) // n_proc
Of course, this will often leave a remainder, e.g. when we have 13 rows of data and 4 processes, then chunksize will be 3 but we’ll have 1 row left over. We handle this as follows: a list proc_chunks will contain a data chunk for each worker process. For each process we define a start row number chunkstart and an end row number chunkend. We’ll use these row numbers for slicing the dataframe. The last process always gets everything that’s left. Please note that the row numbers start at 0 and end at “num. rows – 1”. So given the former example, we’d end up with the following distribution of data chunks for the individual worker processes:
| process    | rows in chunk  |
|------------|----------------|
| process #1 | 0, 1, 2        |
| process #2 | 3, 4, 5        |
| process #3 | 6, 7, 8        |
| process #4 | 9, 10, 11, 12  |
The distribution of the remainder is not optimal but we’ll leave it like this for the sake of simplicity. We can implement this as follows:
proc_chunks = []

for i_proc in range(n_proc):
    chunkstart = i_proc * chunksize
    # make sure to include the division remainder for the last process
    chunkend = (i_proc + 1) * chunksize if i_proc < n_proc - 1 else None

    proc_chunks.append(df_coords.iloc[slice(chunkstart, chunkend)])
It’s always good to add an assertion to make sure all data ended up in the chunks (i.e. the sum of the chunk lengths matches the length of the input data):
assert sum(map(len, proc_chunks)) == len(df_coords)
We can now start a pool of parallel worker processes, pass each process its chunk of data, and let it run the process_chunk function with it. It’s important to use apply_async for this, because it distributes the data and starts the processes without blocking (“non-blocking”), i.e. without waiting for individual processes to finish. The results are saved in a list proc_results that will contain the partial result of each worker process. Fetching those results is then a “blocking” task, i.e. we wait for the processes to finish their calculations by calling get() on each result object:
with mp.Pool(processes=n_proc) as pool:
    # starts the sub-processes without blocking
    # pass the chunk to each worker process
    proc_results = [pool.apply_async(process_chunk,
                                     args=(chunk,))
                    for chunk in proc_chunks]

    # blocks until all results are fetched
    result_chunks = [r.get() for r in proc_results]
When all worker processes have finished their calculations, their partial results are available in result_chunks. We can use concat to combine those partial results into a single dataframe and join it with the input data, so that the final result contains the coordinates and their respective distances:
results = pd.concat(result_chunks)
results = pd.concat((df_coords, results), axis=1)
# make sure we got a result for each coordinate pair:
assert len(results) == len(df_coords)
Running this while watching the operating system’s activity with a tool like htop will show that all processor cores are busy during execution instead of just one. So this should speed up our calculations even more, right? Unfortunately, not necessarily! For example, with 1,000,000 random coordinate pairs my 4-core machine took about 50% longer to calculate the results in parallel than with single-process execution. What happened? The vectorized code already runs quite fast, even for large datasets. Splitting the data into chunks, starting the worker processes, distributing the data and then collecting and combining the results again introduces a lot of extra work (“overhead”). This extra work does not pay off in this scenario, because the actual processing time for each chunk is quite low compared to the time spent on the parallelization overhead.
Still, there are many scenarios where parallelization does pay off despite the overhead. This is usually the case when the processing time for the data is high compared to the parallelization overhead. If you have, for example, algorithms or formulas that cannot be vectorized, then parallelization will usually pay off. Let’s say, for example, that we wanted to generate a hash for each observation in our data (i.e. each row, not each value). With a dataframe, you could do so with df.apply(lambda row: hash(tuple(row)), axis=1).* Running this in parallel gives a speed-up factor of ~3 on my 4-core machine (again, the theoretical speed-up of 4 is not reached because of the overhead).
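The parallel version can reuse the same chunking and Pool pattern as above, only with a different per-chunk function. Roughly like this (a sketch; the linked parallelized.py may differ in its details):
def hash_chunk(proc_chunk):
    # hash each row of this worker's chunk; the chunk keeps its original index
    return proc_chunk.apply(lambda row: hash(tuple(row)), axis=1)

with mp.Pool(processes=n_proc) as pool:
    proc_results = [pool.apply_async(hash_chunk, args=(chunk,))
                    for chunk in proc_chunks]
    hash_chunks = [r.get() for r in proc_results]

row_hashes = pd.concat(hash_chunks)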
So parallelization can also be very helpful when it comes to reducing calculation time. This is especially true when you have access to cluster machines, which often have dozens of CPU cores. On the other hand, you should consider carefully whether parallelization will really help, because it usually takes more effort to implement and can cause more headaches due to asynchronous execution. Furthermore, it might be faster but also use more memory, so this can be a trade-off decision.
There are lots of Python packages for parallel and distributed computing, and you should consider using them when Python’s default multiprocessing module does not fit your needs:
- joblib provides an easier-to-use wrapper interface to multiprocessing and shared memory
- dask is a complex framework for parallel and distributed computing
I uploaded the full scripts as gists:
- unoptimized.py contains non-vectorized haversine and hashing
- vectorized.py contains vectorized haversine calculation
- parallelized.py contains parallel execution of vectorized haversine calculation and parallel hashing
* Of course this is a made-up example, since you could also vectorize the hashing function. But it is only a short placeholder for algorithms that cannot (easily) be vectorized, and it should prove the point that parallelization can also reduce execution time drastically.