Luigi: Running one task multiple times

Rodrigo Lazarini Gil
Feb 12, 2018
Luigi’s basic structure

I started working with Luigi only a few weeks ago. Once I understood the basics, I tried it in a data warehouse project to tie the whole dependency graph together.

It was really easy to make this work until a colleague of mine asked me: “How do I execute one task multiple times, with different parameters? I want to load all the historical data processed last year.”

After some struggle, I managed to do that in two ways. One is faster, the other may be more flexible in other situations.

Using a WrapperTask

A wrapper task is a task that doesn’t do any work itself. It’s complete when all of its requirements are met.

So I created a class based on WrapperTask. It receives two parameters (start, stop) that determine the range of tasks to create. I also pass the task name as a parameter and instantiate it to build the requirements.

Luigi run with command line arguments

This way, Luigi treats ‘ExampleAllYear’ as a dummy task, looks up its dependencies and runs them one by one.

Although this is very easy to implement, every task still checks individually whether it needs to run. In my case, that check is a count on a Postgres table, so a full year of daily tasks means one query per day before anything starts. That might be a problem in some cases.

Using the RangeDaily wrapper

RangeDaily is a wrapper that Luigi already ships. It has many parameters and validations built in.

I just called it from the command line, passing my task and the date range as arguments.

I didn’t have to implement any wrapper task, so to me this is the better solution if you don’t need to customize the way your tasks are called.

Using RangeDaily also requires a small change to your task. I had to override the ‘bulk_complete’ method.

This is called once, before RangeDaily starts calling the tasks. It receives one parameter holding all the dates in the range passed on the command line, and it must return only the dates that don’t have to run, that is, the ones that are already complete.

In my case, I used the list to find the distinct dates already inserted in my database table.

Because this method is executed once, the tasks start much faster.

Besides bulk_complete, pay attention to the other RangeDaily parameters.

I had problems with ‘days_back’, ‘days_forward’ and ‘task_limit’. They work just fine, but it took me a while to find out that they were the reason my pipeline reported there was nothing else to run.

This is my first article here. I hope this becomes a routine. :)

Rodrigo Lazarini Gil

Working through the years with SQL, data modeling, data platform and engineering. Currently focused on data platform and spark jobs with python.