# Parallel Actor Pool
In some cases, the parallel pipeline alone is not enough for efficient processing, e.g. when one of the groups is significantly more demanding than the others. The parallel actor pool was created to solve this kind of problem.
## Idea

The basic idea of a parallel actor pool is replicating groups (actors). This enables parallel processing of the same group for different jobs. It does not violate the parallel pipeline constraints: each graph is still processed sequentially within a single request (job). However, as mentioned earlier, the parallel actor pool replicates groups (and the graphs within them), which increases the capacity of the pipeline at that particular fragment.
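The replication idea can be sketched with a plain Python thread pool (an illustrative stand-in, not the actual MAGDA/Ray implementation): each worker plays the role of one replica, every job is handled end-to-end by exactly one replica, yet different jobs run concurrently.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

REPLICAS = 3  # number of copies of the demanding group

def process(job_id):
    # A single worker (replica) handles the whole job, so per-job
    # processing stays sequential -- only *different* jobs overlap.
    time.sleep(0.1)  # mimic the group's work
    return f'job-{job_id} handled by {threading.current_thread().name}'

# Three jobs are processed concurrently instead of one after another.
with ThreadPoolExecutor(max_workers=REPLICAS) as pool:
    results = list(pool.map(process, range(3)))

for line in results:
    print(line)
```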
*(Figure: module processing timeline)*

The idea is similar to `ray.ActorPool`.
## Definition

An actor pool can be declared by providing the `replicas` parameter (alongside other options like `num_gpus`) of a group:

**Code-first**

```python
group = ParallelPipeline.Group('g1', replicas=3, **rest_options)
```

**Config-first**

```yaml
groups:
  - name: g1
    options:
      replicas: 3
      ...
```
On the logical layer, the pipeline stays the same regardless of the number of `replicas`: each request is still processed exactly once by the `g1` group. On the physical layer, however, `g1` is spawned 3 times, allowing the same logical group to process 3 different requests simultaneously.
## Limitations

- Replication happens on the physical layer, so each physical copy requires its own resources! If group `g1` requires 1 GPU, then `g1` replicated 4 times requires 4 GPUs.
- Aggregation modules cannot be placed within a replicated group. This does not apply to regular modules placed after the aggregation one.
- An actor pool is useful only for solving pipeline bottlenecks. In other situations, the overhead of managing replicas can even slow processing down!
## Example

The example is based on two module definitions:

```python
import time

from magda.module import Module
from magda.decorators import register, accept, finalize


class NullInterface(Module.Interface):
    pass


@register('example-module')
@accept(NullInterface)
@finalize()
class ExampleModule(Module.Runtime):
    def run(self, *args, **kwargs):
        time.sleep(1)  # mimics some processing for 1 second
        return NullInterface()


@register('long-running-module')
@accept(NullInterface)
@finalize()
class LongRunningModule(Module.Runtime):
    def run(self, *args, **kwargs):
        time.sleep(5)  # mimics some processing for 5 seconds
        return NullInterface()
```
Using these modules, a simple pipeline is created:

**Code-first**

```python
from magda.pipeline.parallel import init, ParallelPipeline

# Initialize ray
init()

# Groups
g2 = ParallelPipeline.Group('g2', replicas=3)

# Modules
a = ExampleModule('A', group='g1')
b = LongRunningModule('B', group='g2')
c = ExampleModule('C', group='g3')

# Build pipeline
pipeline = (
    ParallelPipeline()
    .add_group(g2)
    .add_module(a)
    .add_module(b.depends_on(a))
    .add_module(c.depends_on(b))
    .build(context)
)
```
**Config-first**

```yaml
modules:
  - name: A
    type: example-module
    group: g1
  - name: B
    type: long-running-module
    group: g2
    depends_on:
      - A
  - name: C
    type: example-module
    group: g3
    depends_on:
      - B

groups:
  - name: g2
    options:
      replicas: 3
```

```python
from magda import ConfigReader
from magda.module import ModuleFactory

pipeline = ConfigReader.read('./config.yml', ModuleFactory, context=context)
```
## Module processing timeline

*(Figures: module processing timelines for the actor pool, parallel, and sequential pipelines)*
The advantage of the actor pool is visible in comparison with the other pipelines:

| Sequential | Parallel | Parallel Actor Pool |
|---|---|---|
| 21 s | 17 s | 9 s |
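The timings above can be reproduced with a small scheduling model (illustrative arithmetic only, not MAGDA code), assuming 3 jobs submitted at once and stage durations A = 1 s, B = 5 s, C = 1 s:

```python
JOBS = 3
A, B, C = 1, 5, 1  # stage durations in seconds

# Sequential pipeline: one job at a time, end to end.
sequential = JOBS * (A + B + C)

def makespan(durations, replicas):
    """Greedy timeline: stage i has replicas[i] workers; a job enters a
    stage as soon as both the job and the earliest-free worker are ready."""
    free = [[0.0] * r for r in replicas]  # free time of each worker
    total = 0.0
    for _ in range(JOBS):
        t = 0.0  # all jobs are submitted at time 0
        for stage, duration in enumerate(durations):
            workers = free[stage]
            i = min(range(len(workers)), key=workers.__getitem__)
            t = max(t, workers[i]) + duration
            workers[i] = t
        total = max(total, t)
    return total

parallel   = makespan([A, B, C], [1, 1, 1])  # plain parallel pipeline
actor_pool = makespan([A, B, C], [1, 3, 1])  # 3 replicas of group g2

print(sequential, parallel, actor_pool)  # 21 17.0 9.0
```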
## Overload

> **Remember**
>
> A parallel actor pool can simultaneously process only as many different jobs as there are replicas.

In the example above, requesting a 4th job (while having only 3 replicas of `g2`) will significantly slow down the pipeline. The `g2` group can start processing the last job only after finishing one of the currently running jobs.
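Under the same illustrative timing model (stage durations A = 1 s, B = 5 s, C = 1 s, taken from the example above), the 4th job's finish time can be computed by hand:

```python
# All B replicas are busy with jobs 1-3 from t = 1..3 until t = 6..8,
# so the 4th job has to wait for the first replica to free up.
a_done = 4                        # single A replica: 1 s per job, 4th done at t = 4
b_free = 6                        # earliest B replica is free at t = 6 (job 1 done)
b_done = max(a_done, b_free) + 5  # 4th job leaves B at t = 11
c_done = b_done + 1               # and leaves C at t = 12

print(c_done)  # 12 -> the 4th job finishes at t = 12 s, vs. 9 s total for three jobs
```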