
Recently I was trying to quickly parallelize a task in Python and found it not as easy as I expected. So I decided to write a decorator that can easily split a task up and thread it using the concurrent.futures module, added in Python 3.2.

Let’s say you have a task that looks like this:

def execute_task(val):
    # do something threadable
    return val * 10

def runner_fcn(values):
    results = []
    for v in values:
        results.append(execute_task(v))
    return results

runner_fcn(list(range(100)))

This is a pretty typical pattern: a function loops through a bunch of inputs and executes some task that is embarrassingly parallel. Because the format is so predictable, I was able to write a decorator you can attach to the function that splits up the values and threads the tasks using concurrent.futures.

Here’s the decorator code, which is also posted in a gist on GitHub.

import concurrent.futures

def threaded(fcn):
    def wrapper(*args, **kwargs):
        results = {}
        # We can use a with statement to ensure threads are cleaned up promptly
        with concurrent.futures.ThreadPoolExecutor() as executor:

            # Submit one call per value, mapping each future back to the
            # index of the value it was given
            futures = {executor.submit(fcn, [i]): idx
                       for idx, i in enumerate(args[0])}

            tasks = len(futures)
            tenth = round(tasks / 10)
            print('Formed pool of {} tasks'.format(tasks))

            for idx, future in enumerate(concurrent.futures.as_completed(futures)):
                i = futures[future]
                try:
                    # store the result, keyed by the original input index
                    data = future.result()
                    # unwrap single-element lists returned by the runner
                    if len(data) == 1:
                        data = data[0]
                    results[i] = data
                except Exception as exc:
                    print('{} generated an exception: {}'.format(
                        args[0][i], exc))

                # report progress at every 10% milestone
                if tenth != 0 and idx != 0 and idx % tenth == 0:
                    print('{}% Done'.format((idx // tenth) * 10))

        # rebuild a flat list in original input order
        final = []
        for k, v in sorted(results.items()):
            final.append(v)

        return final
    return wrapper

To walk through this: the decorator intercepts the values list and calls the wrapped runner_fcn once per value, each time with a single-element list. First we create a ThreadPoolExecutor, which will manage our future objects. Then we go through the values list and submit a job to the executor for each one, keeping a mapping from each future back to the index of its input. After everything is submitted, we simply loop over the finished results using concurrent.futures.as_completed, storing each result in a dictionary keyed by that index, and at the end we flatten the dictionary back into a list to return. The extra print statements are just there to show progress as the code works through the tasks: it starts by printing how many tasks there are, then reports progress every 10% of the way.
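For a 100-task run like the examples here, the progress output would look roughly like this (a hand-written illustration, not a captured run; individual tasks can finish in any order, but the milestone lines appear in sequence):

Formed pool of 100 tasks
10% Done
20% Done
...
90% Done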

The appeal of a decorator is that once you import it, enabling it is as easy as adding one line! Here is the modified example code.

from threading_decorator import threaded

def execute_task(val):
    # do something threadable
    return val * 10

@threaded
def runner_fcn(values):
    results = []
    for v in values:
        results.append(execute_task(v))
    return results

runner_fcn(list(range(100)))
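Since each task just multiplies its input by 10, the threaded runner should return exactly what the plain loop does. A quick sanity check (assuming threading_decorator is importable as above):

assert runner_fcn(list(range(100))) == [v * 10 for v in range(100)]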

Next Steps

This example was mostly thrown together for one use case and is only slightly robust. If I were to use it more frequently, I would want to make some improvements¹, which all fall under the category of making the process more robust. For instance, if the argument were a dictionary instead of a list, or if there were multiple arguments, I would need to change how they are passed through. Another example: right now every value in the list becomes its own call to the runner. I could instead split the values list into just four parts, or into one part per CPU, and call the runner with those larger sections for less overhead; a sketch of that idea follows below. Finally, the output list is rebuilt by sorting the results dictionary on the enumeration index, which happens to preserve the original input order, but making that ordering guarantee explicit would be cleaner than relying on the sort. None of these next steps would be that hard to implement, and with them in place this could make a nice small package to add to PyPI. Maybe a good idea for the future.
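Here is a rough sketch of that chunked variant, keeping the ThreadPoolExecutor approach from above (threaded_chunked and the ceiling-division chunk sizing are my own hypothetical choices, not code from the gist):

import concurrent.futures
import os

def threaded_chunked(fcn):
    # hypothetical variant: one task per chunk instead of one per value
    def wrapper(values):
        # split the input into roughly one chunk per CPU
        n_chunks = os.cpu_count() or 4
        size = max(1, -(-len(values) // n_chunks))  # ceiling division
        chunks = [values[i:i + size] for i in range(0, len(values), size)]

        results = {}
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # map each future back to its chunk's position
            futures = {executor.submit(fcn, chunk): idx
                       for idx, chunk in enumerate(chunks)}
            for future in concurrent.futures.as_completed(futures):
                results[futures[future]] = future.result()

        # reassemble chunk results in their original order
        final = []
        for _, chunk_result in sorted(results.items()):
            final.extend(chunk_result)
        return final
    return wrapper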

  1. Maybe while one of these threading tasks is running