Version: 0.1

Parallel Actor Pool

In some cases, the parallel pipeline is not enough for efficient processing, e.g. when one of the groups is significantly more demanding than the others. The parallel actor pool was created to solve this kind of problem.

Idea

The basic idea of a parallel actor pool is replicating groups (actors). This enables parallel processing of the same group for different jobs. It doesn't violate the parallel pipeline constraints - each graph is still processed sequentially within a single request (job). However, as mentioned earlier, the parallel actor pool replicates groups and the graphs within them, which in turn increases the capacity of the pipeline at that particular fragment.

[Diagram: pipeline with module A in group g1, module B in group g2 (replicas = 3), and module C in group g3]

Module processing timeline

[Figure: module processing timeline for Jobs 1, 2 and 3 (modules A, B, C)]

The idea is similar to ray.util.ActorPool.
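For readers familiar with Ray, a minimal sketch of the analogous ActorPool pattern is shown below. The Worker actor and its process method are purely illustrative and are not part of MAGDA.

import ray
from ray.util import ActorPool

ray.init()

@ray.remote
class Worker:
    def process(self, job):
        # stand-in for an expensive group, e.g. a long-running model
        return f'done: {job}'

# Three replicas of the same actor serve different jobs concurrently,
# analogous to a group declared with replicas=3.
pool = ActorPool([Worker.remote() for _ in range(3)])
results = list(pool.map(lambda actor, job: actor.process.remote(job), range(3)))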

Definition

An actor pool is declared by providing the replicas parameter (alongside other options like num_gpus) when defining a group:

group = ParallelPipeline.Group('g1', replicas=3, **rest_options)

On the logical layer, the pipeline will be the same regardless of the number of replicas: each request will be processed exactly once by the g1 group. On the physical layer, however, g1 will be spawned 3 times, allowing the same logical group to process 3 different requests at the same time.

Limitations

  1. Replication is done on the physical layer - each physical copy requires its own resources! If group g1 requires 1 GPU, then g1 replicated 4 times requires 4 GPUs (see the sketch after this list).
  2. Aggregation modules cannot be placed within a replicated group. This restriction does not apply to regular modules placed after the aggregation module.
  3. An actor pool is useful only for solving pipeline bottlenecks. In other situations, the overhead of managing replicas can even slow down processing!
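As a rough illustration of the first limitation, the minimal sketch below declares a group that reserves one GPU per replica, using the replicas and num_gpus options mentioned in the Definition section.

from magda.pipeline.parallel import ParallelPipeline

# Each of the 4 physical replicas reserves its own GPU,
# so this single logical group needs 4 GPUs in total.
g1 = ParallelPipeline.Group('g1', replicas=4, num_gpus=1)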

Example

The example is based on two module definitions:

example_modules.py
import time

from magda.module import Module
from magda.decorators import register, accept, finalize


class NullInterface(Module.Interface):
    pass


@register('example-module')
@accept(NullInterface)
@finalize()
class ExampleModule(Module.Runtime):
    def run(self, *args, **kwargs):
        time.sleep(1)  # mimics some processing for 1 second
        return NullInterface()


@register('long-running-module')
@accept(NullInterface)
@finalize()
class LongRunningModule(Module.Runtime):
    def run(self, *args, **kwargs):
        time.sleep(5)  # mimics some processing for 5 seconds
        return NullInterface()

Using these modules, a simple pipeline is created:

[Diagram: the example pipeline - module A in group g1, module B in group g2 (replicas = 3), and module C in group g3]
main.py
from magda.pipeline.parallel import init, ParallelPipeline

from example_modules import ExampleModule, LongRunningModule

# Initialize ray
init()

# Groups
g2 = ParallelPipeline.Group('g2', replicas=3)

# Modules
a = ExampleModule('A', group='g1')
b = LongRunningModule('B', group='g2')
c = ExampleModule('C', group='g3')

# Build pipeline
pipeline = (
    ParallelPipeline()
    .add_group(g2)
    .add_module(a)
    .add_module(b.depends_on(a))
    .add_module(c.depends_on(b))
    .build(context)
)
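To actually exercise the replicated group, several jobs have to be submitted concurrently. Below is a minimal sketch of doing so with asyncio; it assumes the built pipeline exposes an awaitable run method and uses a placeholder request payload - both are assumptions for illustration, not confirmed API of the example above.

import asyncio

async def submit_jobs():
    # Submit three jobs at once; each flows through A -> B -> C,
    # and the three replicas of g2 handle the B stages in parallel.
    await asyncio.gather(*(
        pipeline.run({'job': i}) for i in range(3)
    ))

asyncio.run(submit_jobs())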

Module processing timeline

[Figure: module processing timeline for Jobs 1-3 in the example pipeline (modules A, B, C)]

The advantage of the actor pool is visible in a comparison with the other pipeline types:

Sequential             Parallel               Parallel Actor Pool
21 [s]                 17 [s]                 9 [s]
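These numbers follow from the module durations in the example (1 s for A and C, 5 s for B). Run sequentially, each of the 3 jobs takes 1 + 5 + 1 = 7 s, giving 21 s in total. In the plain parallel pipeline the single g2 group is the bottleneck, so the three 5-second B stages run back to back (preceded by the first job's A and followed by the last job's C), giving roughly 1 + 3 × 5 + 1 = 17 s. With 3 replicas of g2 the B stages overlap, so the last job can finish about 9 s after the first one starts.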

Overload

Remember

A parallel actor pool can simultaneously process at most as many different jobs as there are replicas.

In the example above, requesting a 4th job (while having only 3 replicas of g2) will significantly slow down the pipeline. The g2 group can start processing the last job only after finishing one of the currently running jobs, as shown in the timeline below.
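Reusing the sketch from the Example section, the overload scenario corresponds to submitting one job more than there are replicas; the awaitable run call remains an assumption, as before.

# Inside the same coroutine as before: with replicas=3,
# the fourth job's B stage waits for a free g2 replica.
await asyncio.gather(*(
    pipeline.run({'job': i}) for i in range(4)
))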

Module processing timeline

[Figure: module processing timeline for four jobs (modules A, B, C); the fourth job's B stage starts only after a g2 replica is freed]