Parallel Pipeline

The second type of pipeline (besides the sequential pipeline) is the parallel pipeline. Its main goal is optimization: selected parts of the flow can be set up to be processed in parallel.

Big picture#

The parallel pipeline is a composition of groups. Similar to the modules in the sequential pipeline, the groups are linked together, creating a directed acyclic graph. Moreover, a sequential pipeline can be transformed into a parallel one by grouping its modules together. These blocks can then be processed at the same time.

Parallel processing is done via the ray library, which is responsible for cluster management, resource provisioning and other low-level operations. MAGDA focuses on combining the groups in the proper execution order and exposing a user-friendly interface suited for modular processing.

What is a Group?#

A group is a block of actions (a group of modules) that must be processed together by the same process. Under the hood, a group is just a graph from the sequential pipeline, which invokes its modules one-by-one. Module dependencies within a group are resolved in the same manner as in the sequential pipeline.

The true advantage of a group is visible from a bigger perspective. Modules within a group are processed sequentially, while groups can be processed in parallel (depending on the flow described below) by different processes and even different nodes in a cluster. A group can be identified with an Actor, which claims hardware resources (CPU/GPU) and invokes its modules within the same node and thread.
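The sequential processing inside a group can be sketched in a few lines. This is an illustrative toy model, not MAGDA's actual implementation: modules (already in dependency order) are invoked one-by-one, each receiving the results of the modules it depends on.

```python
# Illustrative sketch only -- not MAGDA's actual implementation.
def run_group(modules, deps, run_fn):
    """Invoke `modules` (given in dependency order) one-by-one,
    feeding each the results of the modules it depends on."""
    results = {}
    for name in modules:
        inputs = [results[d] for d in deps.get(name, [])]
        results[name] = run_fn(name, inputs)
    return results

out = run_group(
    ['A', 'B'],                 # B depends on A, so A runs first
    {'B': ['A']},
    lambda name, inputs: name + ''.join(inputs),
)
print(out)  # {'A': 'A', 'B': 'BA'}
```

A whole group behaves like one such sequential unit; parallelism only appears between groups.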

Parallel flow#

Groups and modules share the same logic behind their starting conditions: dependencies. The parallel pipeline is a manager, which invokes every group (sequentially calling all modules within the group) as soon as all of its dependencies are resolved. Each group determines its own dependencies (its starting condition): the list of all modules that the group's modules depend on but that are not within the same group. In simple words: all links into the group's modules that start outside that group. A group starts processing as soon as the results from all such preceding modules are available.

[Diagram: modules A and B in group g1, module C in group g2, module D in group g3; D depends on A and C]

As seen above, groups g1 and g2 don't have any dependencies. This means their starting conditions are already fulfilled and they can begin processing immediately. Group g3 will begin processing as soon as the results from module A (from g1) and module C (from g2) are available (module B in the g1 group may still be processing at that point).
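The flow above can be sketched with plain asyncio futures. This is a toy model of the scheduling idea, not MAGDA's internals: g1 and g2 start immediately, while g3 waits only for the results of A and C, regardless of B.

```python
import asyncio

async def g1(a_done):
    out = {'A': 'A'}              # module A finishes first...
    a_done.set_result(out['A'])   # ...and its result unblocks g3
    await asyncio.sleep(0.01)     # module B may still be processing
    out['B'] = 'B'
    return out

async def g2(c_done):
    c_done.set_result('C')        # module C finishes
    return {'C': 'C'}

async def g3(a_done, c_done):
    # Starting condition: the results of A and C, regardless of B
    a, c = await asyncio.gather(a_done, c_done)
    return {'D': a + c + '->D'}

async def main():
    loop = asyncio.get_running_loop()
    a_done, c_done = loop.create_future(), loop.create_future()
    _, _, r3 = await asyncio.gather(g1(a_done), g2(c_done), g3(a_done, c_done))
    return r3['D']

print(asyncio.run(main()))  # AC->D
```

Note that g3 produces its result while g1 is still "processing" module B, mirroring the starting-condition rule described above.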

Technical details#

Pipeline#

The parallel pipeline can be built and used very similarly to the sequential pipeline.

from magda.pipeline.parallel import ParallelPipeline

# Building the parallel pipeline
builder = (
    ParallelPipeline()
        .add_module(...)
        .add_module(...)
        .add_module(...)
)
pipeline = builder.build(context)

# Usage - pipeline.run is a coroutine so "await" is required
await pipeline.run(job)

The main difference is underneath. The parallel pipeline is based on Actors from the ray library. Therefore, the low-level operations are managed by the ray cluster and require initialization at the very beginning of the application:

magda.pipeline.parallel.init(*args, **kwargs)

This function wraps ray.init and passes all arguments to ray.

The main usage difference is that ParallelPipeline.run is an asyncio coroutine. This means that invoking the function is subject to asynchronous programming with asyncio, which opens up a huge variety of possibilities in job management, like delegating multiple jobs at once:

tasks = [
    asyncio.create_task(pipeline.run(job))
    for job in jobs_to_process
]
results = await asyncio.gather(*tasks)

This example submits all jobs to the pipeline at once. They will be processed in parallel, following the order and restrictions (group replicas and resources) set by the parallel pipeline. After all jobs are finished, the output is returned in results as a list. The order of results matches the order of jobs in tasks, i.e. results[0] corresponds to jobs_to_process[0], the second result to the second job, etc.

Pro Tip

Check the AsyncIO documentation for more operators and design patterns.
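For instance, asyncio.as_completed lets you consume results as each job finishes instead of waiting for all of them. The pipeline is stubbed here purely for illustration; with MAGDA you would call pipeline.run the same way:

```python
import asyncio

class StubPipeline:
    """Stand-in for a built pipeline, used only to illustrate the pattern."""
    async def run(self, job):
        await asyncio.sleep(0.01)   # mimics real processing
        return f'{job}:done'

async def main():
    pipeline = StubPipeline()
    tasks = [asyncio.create_task(pipeline.run(job)) for job in ('j1', 'j2', 'j3')]
    finished = []
    for next_done in asyncio.as_completed(tasks):
        finished.append(await next_done)   # earliest-finished job first
    return sorted(finished)

print(asyncio.run(main()))  # ['j1:done', 'j2:done', 'j3:done']
```

Unlike asyncio.gather, the iteration order here follows completion time, not submission order.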

Group#

magda.pipeline.parallel.ParallelPipeline.Group(
    name: str,
    *,
    replicas: int = 1,
    **rest_ray_options,
)

Parameters:

  • name (str) - name of a group used in a config file or in code
  • replicas (int) (Optional) - number of copies of the group (and its modules) required by the ParallelPool

Most common ray options:

  • num_cpus (int) (Optional) - number of CPU cores assigned to a group/actor
  • num_gpus (float) (Optional) - the quantity of GPUs assigned to a group/actor
  • rest (kwargs) - see the ray.remote documentation

Each module in a parallel pipeline must be assigned to one of the groups. A group can be created in two ways:

  • manually
    (definition added to the config/code)
    By passing additional arguments, like resource requirements for ray or the number of copies for the ParallelPool.

  • automatically
    (definition skipped in the config/code but the group is referenced by at least one module)
    All parameters are set to their default values.

These methods can be mixed within a single pipeline, e.g. one group is created manually and the rest are skipped (created automatically).

# Simple group
group1 = ParallelPipeline.Group('g1')

# Group with resource declaration
group2 = ParallelPipeline.Group('g2', num_gpus=1)

pipeline = (
    ParallelPipeline()
        .add_group(group1)
        .add_group(group2)
        # Other groups (to which modules are assigned)
        #   will be created automatically as "simple groups" (like "g1")
        ...
)

The same can be achieved with a config file:

modules: ...
groups:
  - name: g1
  - name: g2
    options:
      num_gpus: 1.0
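Other Group options, such as replicas from the signature above, could presumably be declared the same way. A hedged sketch, assuming (this is not confirmed by the text above) that replicas is accepted under options alongside the ray settings:

```yaml
modules: ...
groups:
  - name: g1
    options:
      replicas: 2
      num_cpus: 2
```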

Examples#

Each example uses the same module definition:

example_module.py
import time

from magda.decorators import accept, finalize, register
from magda.module import Module

@register('example-module')
@accept(self=True)
@finalize()
class ExampleModule(Module.Runtime):
    def run(self, *args, **kwargs):
        time.sleep(1)  # Mimics some processing for 1 second
        return None

Simple parallel pipeline#

[Diagram: modules A and B in group g1 (B depends on A), module C in group g2, modules D and E in group g3; D depends on B and C, E depends on D]

Module processing timeline#

[Timeline diagram: modules of Jobs 1-3 interlaced across groups g1, g2 and g3 over seconds 0-8]

The sequential pipeline processes these 3 jobs in 15 seconds (3 jobs × 5 modules × 1 second each), while the parallel one interlaces job execution and finishes after approximately 8 seconds.

main.py
from magda.pipeline.parallel import init, ParallelPipeline

# Initialize ray
init()

# Build pipeline
builder = ParallelPipeline()

# Group g1
builder.add_module(ExampleModule('A', group='g1'))
builder.add_module(
    ExampleModule('B', group='g1')
    .depends_on(builder.get_module('A'))
)

# Group g2
builder.add_module(ExampleModule('C', group='g2'))

# Group g3
builder.add_module(
    ExampleModule('D', group='g3')
    .depends_on(builder.get_module('B'))
    .depends_on(builder.get_module('C'))
)
builder.add_module(
    ExampleModule('E', group='g3')
    .depends_on(builder.get_module('D'))
)

# Build pipeline
pipeline = builder.build(context)

# Run 3 parallel jobs
result = await asyncio.gather(
    asyncio.create_task(pipeline.run('Job1')),
    asyncio.create_task(pipeline.run('Job2')),
    asyncio.create_task(pipeline.run('Job3')),
)

Resource claim#

[Diagram: module A in group g1 (GPU), module B in group g2 (CPU, GPU), module C in group g3; C depends on A and B]

Module processing timeline#

[Timeline diagram: modules of Jobs 1-3 interlaced across groups g1, g2 and g3 over seconds 0-4]

The sequential pipeline processes these 3 jobs in 9 seconds (3 jobs × 3 modules × 1 second each), while the parallel one interlaces job execution and finishes after approximately 4 seconds.

main.py
from magda.pipeline.parallel import init, ParallelPipeline

# Initialize ray
init()

# Build pipeline
builder = ParallelPipeline()

# Group g1
builder.add_group(ParallelPipeline.Group('g1', num_gpus=1.0))
builder.add_module(ExampleModule('A', group='g1'))

# Group g2
builder.add_group(ParallelPipeline.Group('g2', num_cpus=4, num_gpus=0.5))
builder.add_module(ExampleModule('B', group='g2'))

# Group g3
builder.add_group(ParallelPipeline.Group('g3'))  # Optional line
builder.add_module(
    ExampleModule('C', group='g3')
    .depends_on(builder.get_module('A'))
    .depends_on(builder.get_module('B'))
)

# Build pipeline
pipeline = builder.build(context)

# Run 3 parallel jobs
result = await asyncio.gather(
    asyncio.create_task(pipeline.run('Job1')),
    asyncio.create_task(pipeline.run('Job2')),
    asyncio.create_task(pipeline.run('Job3')),
)