Scale out your Pandas DataFrame operations using Dask

In Pandas, you can easily apply an operation to all the data in a DataFrame using the apply method. However, apply is quite slow, and it quickly becomes a bottleneck as your data grows. Is there a way to speed these operations up? And if so, how? Yes, there is! This blog post explains how you can use Dask to parallelize and scale out your DataFrame operations.

An example

As an example, consider the following: we generate 1,000,000 integers between 0 and 9999. We then load the integers into a Pandas DataFrame, a common first step in most data-related projects:

import pandas as pd
import random

if __name__ == '__main__':
    # The number of integers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

The task is then to filter out all the integers smaller than 1000 (so 0 … 999).
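For reference, plain Pandas expresses such a filter directly with a vectorized boolean mask. A minimal sketch, assuming the df built above (the variable name small_numbers is ours):

# Vectorized boolean mask: keep only the rows where 'number' is below 1000
small_numbers = df[df['number'] < 1000]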

How fast can we process all the numbers?

We can check whether the numbers are small (< 1000) using the following code:

import pandas as pd
import random


def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000


if __name__ == '__main__':
    # The number of integers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now apply the "is_small_number" operation to the data. Note that
    # df.apply passes each column (a Series) to the function, so the
    # comparison is evaluated element-wise
    small_number_booleans = df.apply(is_small_number)

Here, the final variable contains booleans that indicate, for each number, whether it is small enough (less than 1000). We can now use timeit to measure the speed of the apply operation. We run it 100 times using the following code:

import pandas as pd
from timeit import timeit
import random


def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000


if __name__ == '__main__':
    # The number of integers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now we can measure the speed of the apply method
    print('pandas apply method', timeit(lambda: df.apply(is_small_number), number=100))

On the machine used here, the 100 runs took roughly 1.4 seconds in total. Can we do better? Enter Dask. We convert the Pandas DataFrame into a Dask DataFrame split into four partitions, and use map_partitions to apply the function to each partition in parallel:

import pandas as pd
import dask.dataframe as dd
from timeit import timeit
import random


def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000


if __name__ == '__main__':
    # The number of integers to generate
    N = 1000000

    # Now generate 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(N)]

    # And finally, create the DataFrame
    df = pd.DataFrame(data, columns=['number'])

    # Now convert it to a Dask DataFrame, chunked into 4 partitions
    df = dd.from_pandas(df, npartitions=4)

    # Now we can measure the speed of the map_partitions method
    print('dask parallel map partitions method', timeit(lambda: df.map_partitions(is_small_number).compute(), number=100))
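One practical note: map_partitions infers the structure of its output (its so-called meta) by running the function on a tiny dummy partition, which can trigger a warning. If that happens, you can pass the expected structure explicitly, for example df.map_partitions(is_small_number, meta={'number': 'bool'}).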

Running this benchmark on the same machine, the 100 runs now completed in about 0.78 seconds.

That is a speed-up of 1.8x! A few words of caution here: the more partitions you use, the more overhead there is for setting up and scheduling the work, so be careful with the number of partitions. For small tasks like this one, a low number of partitions works better; for heavier tasks, a larger number of partitions may pay off. The small benchmark sketch below lets you measure this trade-off yourself.

Also note that the interface is similar to that of Apache Spark: map-reduce concepts are available in Dask as well, and Dask builds a computation graph in the background. That graph is only executed when the compute() method is called. This will be explained in a later post on Dask. For now, the takeaway is that you can speed up your Pandas DataFrame apply calls!
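Because the sweet spot depends on the workload and the machine, it is worth measuring. Here is a minimal sketch, reusing the setup from above, that times map_partitions for a handful of partition counts (the partition values chosen here are arbitrary, and the printed timings will vary per machine):

import pandas as pd
import dask.dataframe as dd
from timeit import timeit
import random


def is_small_number(x):
    # Returns True if x is less than 1000, False otherwise
    return x < 1000


if __name__ == '__main__':
    # Same setup as before: 1,000,000 integers between 0 and 9999
    data = [random.randint(0, 9999) for _ in range(1000000)]
    df = pd.DataFrame(data, columns=['number'])

    # Time map_partitions for several partition counts (10 runs each)
    for npartitions in (1, 2, 4, 8, 16):
        ddf = dd.from_pandas(df, npartitions=npartitions)
        seconds = timeit(lambda: ddf.map_partitions(is_small_number).compute(), number=10)
        print(npartitions, 'partitions:', round(seconds, 2), 'seconds')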

Conclusions

  • You now know how Dask can scale out operations on your Pandas DataFrames.
  • In this simple example, we achieved a speed-up of 1.8x. For heavier computations and larger datasets, the speed-up can be far greater.
  • Parallelization is key to faster computations on large amounts of data.