Quick Start ⏱️
Installation

pip

    pip install magda

From the repository (next version)

    pip install https://github.com/NeuroSYS-pl/magda/archive/refs/heads/main.zip

Usage

Once MAGDA is installed, a simple pipeline can be created with just a few lines of code.

The example pipeline is composed of just two modules. The first sums all numbers in a given list and outputs a single number; the second raises that number to a given power.
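Stripped of the framework, the computation performed by those two modules is just a sum followed by a power. A minimal plain-Python sketch, using hypothetical function names that are not part of MAGDA:

```python
# Plain-Python sketch of the two steps; no MAGDA involved.
# Function names are hypothetical and chosen for illustration only.

def add_numbers(numbers):
    """First step: reduce a list of numbers to a single sum."""
    return sum(numbers)


def raise_to_power(number, power):
    """Second step: raise the summed value to the given power."""
    return number ** power


total = add_numbers([1, 2, 3])           # 1 + 2 + 3 = 6
result = raise_to_power(total, power=2)  # 6 ** 2 = 36
print(result)  # 36
```

Wrapping the same two steps in MAGDA modules lets dependencies and parameters be declared rather than hard-coded.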
The same pipeline can be built in several ways.

1. The simplest SequentialPipeline

Every pipeline consists of five steps:
- Class definition - defining each Module (and Interface)
- Module initialization - getting every Module instance, defining its dependencies and parameters
- Pipeline creation - defining a pipeline and adding Modules to it
- Pipeline build
- Pipeline run

    import asyncio
    from magda.module import Module
    from magda.decorators import accept, finalize, expose
    from magda.pipeline import SequentialPipeline

    # Class definition
    @finalize
    class AddingNumbersModule(Module.Runtime):
        def run(self, data, request):
            return sum(request)

    @accept(AddingNumbersModule)
    @expose()
    @finalize
    class RaisingToPowerModule(Module.Runtime):
        def run(self, data, **kwargs):
            number = data.get(AddingNumbersModule)
            return number ** self.parameters['power']

    # Module initialization: instances, dependencies and parameters
    sum_module = AddingNumbersModule('module_sum')
    power_module = RaisingToPowerModule('module_power')
    power_module.depends_on(sum_module)
    power_module.set_parameters({'power': 2})

    # Pipeline creation
    builder = SequentialPipeline()
    builder.add_module(sum_module)
    builder.add_module(power_module)

    # Pipeline build and run
    runtime = asyncio.run(builder.build())
    result = asyncio.run(runtime.run(request=[1, 2, 3]))
    print(result['module_power'])
    # output: 36

2. SequentialPipeline with Interfaces

MAGDA Interfaces are just classes encapsulating the data passed between modules. Using them is nevertheless recommended, as they tidy up the code and provide more clarity and flexibility.

The above code can be rewritten as follows:

    import asyncio
    from dataclasses import dataclass
    from magda.module import Module
    from magda.decorators import accept, produce, finalize, expose
    from magda.pipeline import SequentialPipeline

    # Interface definitions: typed containers for data passed between modules
    @dataclass
    class Number(Module.Interface):
        value: int

    @dataclass
    class Power(Module.Interface):
        number: int
        power: int = 1

    # Module definitions, now declaring what they accept and produce
    @produce(Number)
    @finalize
    class AddingNumbersModule(Module.Runtime):
        def run(self, data, request):
            return Number(sum(request))

    @accept(Number)
    @produce(Power)
    @expose()
    @finalize
    class RaisingToPowerModule(Module.Runtime):
        def run(self, data, **kwargs):
            number = data.get(Number).value
            power = self.parameters['power']
            return Power(number ** power, power=power)

    # Module initialization
    sum_module = AddingNumbersModule('module_sum')
    power_module = RaisingToPowerModule('module_power')
    power_module.depends_on(sum_module)
    power_module.set_parameters({'power': 2})

    # Pipeline creation
    builder = SequentialPipeline()
    builder.add_module(sum_module)
    builder.add_module(power_module)

    # Pipeline build and run
    runtime = asyncio.run(builder.build())
    result = asyncio.run(runtime.run(request=[1, 2, 3]))
    print(result['module_power'])
    # output: Power(number=36, power=2)

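The printed result has the form Power(number=36, power=2) because @dataclass generates a __repr__ that lists each field. The sketch below reproduces this with plain dataclasses. It deliberately leaves out the Module.Interface base class so that it runs without MAGDA installed; that omission is an assumption made purely for illustration.

```python
from dataclasses import dataclass


# Plain dataclasses standing in for the interfaces above.
# Assumption: the Module.Interface base class is left out here so the
# snippet runs without MAGDA; only the dataclass behaviour is shown.
@dataclass
class Number:
    value: int


@dataclass
class Power:
    number: int
    power: int = 1  # default used when no power is given


total = Number(sum([1, 2, 3]))
squared = Power(total.value ** 2, power=2)

print(total)     # Number(value=6)
print(squared)   # Power(number=36, power=2)
print(Power(5))  # Power(number=5, power=1)
```

Named fields such as value and power make it explicit what each module hands to the next, which a bare int does not.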
3. SequentialPipeline built from a config file

It's also recommended to use configs: YAML files that make it easy to define a pipeline. The Pipeline creation and Pipeline build steps are then replaced by registering Modules in the ModuleFactory and reading the pipeline from a configuration file.

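The idea behind registering modules is a lookup from a type string used in the config to a Python class. The sketch below is a toy registry written for illustration only: ToyFactory, its register and create methods, and the stand-in AddingNumbers class are hypothetical and say nothing about how ModuleFactory is actually implemented.

```python
# Toy name-to-class registry; NOT MAGDA's ModuleFactory.
# ToyFactory, register and create are hypothetical names.

class ToyFactory:
    _registry = {}

    @classmethod
    def register(cls, type_name, module_cls):
        """Associate a config 'type' string with a class."""
        cls._registry[type_name] = module_cls

    @classmethod
    def create(cls, type_name, name):
        """Instantiate the class registered under type_name."""
        try:
            module_cls = cls._registry[type_name]
        except KeyError:
            raise ValueError(f"Unknown module type: {type_name!r}") from None
        return module_cls(name)


class AddingNumbers:
    """Stand-in for a module class; it only stores its name."""

    def __init__(self, name):
        self.name = name


ToyFactory.register('adding-numbers-module', AddingNumbers)
module = ToyFactory.create('adding-numbers-module', 'module_sum')
print(type(module).__name__, module.name)  # AddingNumbers module_sum
```

With such a mapping in place, a config file only needs to mention type strings, and the code that reads it never has to import the module classes directly.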
The same pipeline as before can be obtained with the following config:

    modules:
      - name: module_sum
        type: adding-numbers-module
      - name: module_power
        type: raising-to-power-module
        depends_on:
          - module_sum
        parameters:
          power: 2

    import asyncio
    from dataclasses import dataclass
    from magda.module import Module
    from magda.decorators import accept, produce, finalize, expose
    from magda.pipeline import SequentialPipeline
    from magda.module.factory import ModuleFactory
    from magda.config_reader import ConfigReader

    # Interface definitions
    @dataclass
    class Number(Module.Interface):
        value: int

    @dataclass
    class Power(Module.Interface):
        number: int
        power: int = 1

    # Module definitions
    @produce(Number)
    @finalize
    class AddingNumbersModule(Module.Runtime):
        def run(self, data, request):
            return Number(sum(request))

    @accept(Number)
    @produce(Power)
    @expose()
    @finalize
    class RaisingToPowerModule(Module.Runtime):
        def run(self, data, **kwargs):
            number = data.get(Number).value
            power = self.parameters['power']
            return Power(number ** power, power=power)

    # Register each class under the type name used in the config
    ModuleFactory.register('adding-numbers-module', AddingNumbersModule)
    ModuleFactory.register('raising-to-power-module', RaisingToPowerModule)

    # Read the config and build the pipeline from it
    with open('my_config_file.yaml') as file:
        config = file.read()
    runtime = asyncio.run(ConfigReader.read(config, ModuleFactory))

    # Pipeline run
    result = asyncio.run(runtime.run(request=[1, 2, 3]))
    print(result['module_power'])
    # output: Power(number=36, power=2)
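
The depends_on entries in the config describe a dependency graph: module_power can only run after module_sum. The sketch below derives a valid run order from such a config using the standard library's graphlib (Python 3.9+). The config is written by hand as a Python dict mirroring the YAML, and the ordering logic is an illustrative assumption, not a description of MAGDA's internals.

```python
from graphlib import TopologicalSorter

# Python-dict mirror of the YAML config above (written by hand here
# to avoid a YAML dependency).
config = {
    'modules': [
        {'name': 'module_sum', 'type': 'adding-numbers-module'},
        {
            'name': 'module_power',
            'type': 'raising-to-power-module',
            'depends_on': ['module_sum'],
            'parameters': {'power': 2},
        },
    ]
}

# Map each module name to the set of modules it depends on.
graph = {
    module['name']: set(module.get('depends_on', []))
    for module in config['modules']
}

# static_order() yields every node after all of its dependencies.
run_order = list(TopologicalSorter(graph).static_order())
print(run_order)  # ['module_sum', 'module_power']
```

A cycle in depends_on would make TopologicalSorter raise graphlib.CycleError, which is one way such a configuration mistake could be detected early.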