Introduction¶
This is a hand-on tutorial where concepts will be introduced in steps.
At the end of each step there will be a summary slide with an exercise which gives you time to try out the new concept.
Resources¶
Slides¶
https://ewoksfordevs.readthedocs.io
Ewoks Documentation¶
Tutorial Resources¶
git clone https://gitlab.esrf.fr/workflow/ewokstutorials/ewoksfordevs
- jupyter notebook that generated these slides
resultsdirectory with the results of all exercises- links to the slides in PDF format in the README
When to use Ewoks¶
- Online data processing / beamline automation (Bliss, Daiquiri or MXcube)
- Make data processing reproducible, repeatable and traceable
- Ewoks adapts to location (beamline computer, Slurm, user at home, ...)
- Ewoks takes care of integration (beamline acquisition, upload results to the data portal, ...)
- Libraries/scripts/notebooks that are ewoksified can be used by all Ewoks users
Part 2: Ewoks Tasks¶
- Create workflow tasks
- Use new tasks in the GUI
- Use third-party task libraries
- Install workflow dependencies
Part 3: Ewoks Integration¶
- Remote execution
- Slurm execution
- Online data processing in Bliss
Part 4: Final remarks¶
- Ewoks features not covered
- Deploy software on Slurm
Part 1: Ewoks Workflows¶
- Ewoks workflow glossary
- Create workflows
- Execute workflows
- Visualize workflows (GUI)
Ewoks workflow glossary¶
Create Ewoks Workflows¶
An Ewoks workflow can be created
- in Python using basic types
- in a web browser (Ewoksweb)
Other ways of creating a workflow
- in a desktop GUI (Orange)
- in Python using the
TaskGraphclass - JSON or YAML file
Create a workflow in Python¶
Define workflow nodes¶
node1 = {
"id": "node1",
"task_type": "class",
"task_identifier": "ewokscore.tests.examples.tasks.sumtask.SumTask",
}
node2 = {
"id": "node2",
"task_type": "class",
"task_identifier": "ewokscore.tests.examples.tasks.sumtask.SumTask",
}
- id: unique within the context of a workflow
- task_type:
"class" - task_identifier: full python qualifier name
node3 = {
"id": "node3",
"task_type": "class",
"task_identifier": "ewokscore.tests.examples.tasks.sumtask.SumTask",
}
node4 = {
"id": "node4",
"task_type": "class",
"task_identifier": "ewokscore.tests.examples.tasks.sumtask.SumTask",
}
Define workflow links¶
link1 = {"source": "node1", "target": "node3"}
link2 = {"source": "node2", "target": "node3"}
link3 = {"source": "node3", "target": "node4"}
Data mapping between the nodes
link1["data_mapping"] = [
{"source_output": "result", "target_input": "a"}
]
link2["data_mapping"] = [
{"source_output": "result", "target_input": "b"}
]
link3["data_mapping"] = [
{"source_output": "result", "target_input": "a"}
]
So a link is defined by a source, target and data_mapping
Define workflow inputs¶
node1["default_inputs"] = [
{"name": "a", "value": 2},
{"name": "b", "value": 3},
]
node2["default_inputs"] = [
{"name": "a", "value": 4},
{"name": "b", "value": 5},
]
So a node is defined by an id, task_type, task_identifier and (optionally) default_inputs
Define workflow¶
workflow = {
"graph": {"id": "workflow1"},
"nodes": [node1, node2, node3, node4],
"links": [link1, link2, link3],
}
You have created your first Ewoks workflow!
A workflow is defined by nodes, links and (optionally) graph.
More details: https://ewokscore.readthedocs.io/en/stable/reference/specs.html
Execute Ewoks workflows¶
pip install ewoks
Execute workflow from python¶
from ewoks import execute_graph
execute_graph(workflow)
{'result': 14}
Execute workflow from the command line¶
Save with the convert_graph function
from ewoks import convert_graph
convert_graph(workflow, "results/workflow1.json")
'results/workflow1.json'
You can also use json.dump but convert_graph adds validation.
Execute with the ewoks command (the output parameters will be discussed later)
!ewoks execute results/workflow1.json --outputs=end --merge-outputs
###################################
# Execute workflow 'results/workflow1.json'
###################################
RESULTS:
{'result': 14}
FINISHED
✏️ Exercise: make a workflow and execute it¶
Create a workflow with ewokscore.tests.examples.tasks.sumtask.SumTask tasks
w = {"graph": {"id": "workflow1"}, "nodes": [...], "links": [...]}
Execute
from ewoks import execute_graph
execute_graph(w)
Save
from ewoks import convert_graph
convert_graph(w, "results/workflow1.json")
Execute from the command line
ewoks execute results/workflow1.json --outputs=end --merge-outputs
Documentation: https://ewokscore.readthedocs.io/en/stable/reference/specs.html
Execution engine¶
In Python
execute_graph(workflow, engine="...")
On the command line
ewoks execute --engine=...
Currently the engines supported by Ewoks are: "core" (default), "dask", "ppf" (for pypushflow) and "orange".
"orange" will be used during the GUI part so we will focus on "dask" and "ppf" for now.
When using an execution engine, it needs to be installed first
pip install "ewoks[dask,ppf,orange]"
| engine | Loops | Conditional Links | Parallel execution | Interaction (GUI) | An Ewoks task is enough |
|---|---|---|---|---|---|
"core" |
$\color{red}{\text{✗}}$ | $\color{red}{\text{✗}}$ | $\color{red}{\text{✗}}$ | $\color{red}{\text{✗}}$ | $\color{green}{\text{✓}}$ |
"dask" |
$\color{red}{\text{✗}}$ | $\color{red}{\text{✗}}$ | $\color{green}{\text{✓}}$ | $\color{red}{\text{✗}}$ | $\color{green}{\text{✓}}$ |
"ppf" |
$\color{green}{\text{✓}}$ | $\color{green}{\text{✓}}$ | $\color{green}{\text{✓}}$ | $\color{red}{\text{✗}}$ | $\color{green}{\text{✓}}$ |
"orange" |
$\color{red}{\text{✗}}$ | $\color{red}{\text{✗}}$ | $\color{orange}{\text{(✓)}}$ | $\color{green}{\text{✓}}$ | $\color{red}{\text{✗}}$ |
Example on how to use and configure the execution engines:
https://ewoks.readthedocs.io/en/stable/howtoguides/running_workflows.html
Overview on the overhead you can expect for each engine:
https://ewoks.readthedocs.io/en/stable/howtoguides/benchmark.html
An example of parallel execution
Time without parallel execution (the workflow parameters will be discussed later)
%%time
execute_graph(
"results/workflow1.json",
inputs=[{"name": "delay", "value": 1, "all": True}],
)
CPU times: user 23 ms, sys: 12.4 ms, total: 35.4 ms Wall time: 4.06 s
{'result': 14}
An example of parallel execution
Time with parallel execution (1 sec faster because node1 and node2 run in parallel)
%%time
execute_graph(
"results/workflow1.json",
inputs=[{"name": "delay", "value": 1, "all": True}],
engine="ppf",
)
CPU times: user 40.1 ms, sys: 37.3 ms, total: 77.5 ms Wall time: 3.12 s
{'result': 14}
✏️ Exercise: execute a workflow with different engines¶
From Python
from ewoks import execute_graph
execute_graph(
"results/workflow1.json",
engine="ppf",
inputs=[{"name": "delay", "value": 1, "all": True}],
pool_type='thread' # Only required for Windows
)
From the command line
ewoks execute results/workflow1.json --engine=ppf -p delay=1 \
--inputs=all [-o pool_type=thread] # For Windows
See execution engine and tutorial how to execute a workflow.
ℹ️ Windows users can time commands in a Powershell with Measure-Command {ewoks execute ...} instead of time.
Workflow execution inputs and outputs¶
Workflow inputs¶
When providing inputs to a workflow upon execution, each argument can be passed to either
- all start nodes * (default)
- all nodes
- all nodes with a specific
task_identifier - one specific node identified by the node
id(orlabel)
Nodes that receive an input they don't expect will silently ignore it.
* A start node is a node that does NOT require any upstream processing.
Provide the input a=1 to node1 and b=1 to node2
execute_graph(
workflow,
inputs=[
{"name": "a", "value": 1, "id": "node1"},
{"name": "b", "value": 1, "id": "node2"},
],
)
{'result': 9}
Provide the input a=1 to node1 and b=1 to node2
!ewoks execute results/workflow1.json --outputs=end --merge-outputs \
-p node1:a=1 -p node2:b=1 # noqa E999
################################### # Execute workflow 'results/workflow1.json' ###################################
RESULTS:
{'result': 9}
FINISHED
List all the available workflow parameters before execution
!ewoks show results/workflow1.json -p node2:b=999
Workflow: results/workflow1.json Id: workflow1 Description: workflow1 ╒════════╤════════════════╤═══════════════════╤═══════╕ │ Name │ Value │ Task identifier │ Id │ ╞════════╪════════════════╪═══════════════════╪═══════╡ │ a │ 2 │ SumTask │ node1 │ ├────────┼────────────────┼───────────────────┼───────┤ │ b │ 3 │ SumTask │ node1 │ ├────────┼────────────────┼───────────────────┼───────┤ │ delay │ <MISSING_DATA> │ SumTask │ node1 │ ├────────┼────────────────┼───────────────────┼───────┤ │ a │ 4 │ SumTask │ node2 │ ├────────┼────────────────┼───────────────────┼───────┤ │ b │ 999 │ SumTask │ node2 │ ├────────┼────────────────┼───────────────────┼───────┤ │ delay │ <MISSING_DATA> │ SumTask │ node2 │ ├────────┼────────────────┼───────────────────┼───────┤ │ delay │ <MISSING_DATA> │ SumTask │ node3 │ ├────────┼────────────────┼───────────────────┼───────┤ │ b │ <MISSING_DATA> │ SumTask │ node4 │ ├────────┼────────────────┼───────────────────┼───────┤ │ delay │ <MISSING_DATA> │ SumTask │ node4 │ ╘════════╧════════════════╧═══════════════════╧═══════╛
Note that node3 can receive three types of inputs:
- data mapping (defined in links'
data_mappingin workflow1.json) - execution parameters (defined at the start of the execution)
- default inputs (defined in nodes'
default_inputsin workflow1.json)
Priority:
Data mapping > Execution parameters > Default inputs
Ex: The default value of a in node 3 is overwritten by the value a given when executing the workflow, which is overwritten by the result of the node 1.
Workflow outputs¶
The output of the workflow is the combination of the outputs of either
- all end nodes (default in python)
- all nodes
- all nodes with a specific
task_identifier - one or more specific nodes identified by their node
id(orlabel) - none of the nodes, i.e. no output at all (default for the CLI)
You can select only specific output variables of a node or all of them (default).
Node outputs can be merged (default in python) or kept separately (default for the CLI).
For more information and to know with parameter to use see workflow inputs and outputs doc
In Python, the output of a workflow is the merged output of all end nodes (in this case there is only one end node with one output variable called result)
execute_graph(workflow)
{'result': 9}
On the command line, nothing is returned by default. So to obtain the same as in Python
!ewoks execute results/workflow1.json --outputs=end --merge-outputs
################################### # Execute workflow 'results/workflow1.json' ###################################
RESULTS:
{'result': 14}
FINISHED
Return the output of all end nodes separately
execute_graph(workflow, merge_outputs=False)
{'node4': {'result': 9}}
!ewoks execute results/workflow1.json --outputs=end
###################################
# Execute workflow 'results/workflow1.json'
###################################
RESULTS:
{'node4': {'result': 14}}
FINISHED
Output can be fully customized: for example result from node2 and node4
execute_graph(
workflow,
outputs=[
{"name": "result", "id": "node2"},
{"name": "result", "id": "node4"},
],
merge_outputs=False,
)
{'node2': {'result': 5}, 'node4': {'result': 9}}
Return the outputs from all nodes
execute_graph(workflow, outputs=[{"all": True}], merge_outputs=False)
{'node1': {'result': 4},
'node2': {'result': 5},
'node3': {'result': 9},
'node4': {'result': 9}}
Return the outputs from all nodes
!ewoks execute results/workflow1.json --outputs=all
###################################
# Execute workflow 'results/workflow1.json'
###################################
RESULTS:
{'node1': {'result': 5},
'node2': {'result': 9},
'node3': {'result': 14},
'node4': {'result': 14}}
FINISHED
✏️ Exercise: execute a workflow with inputs/outputs¶
In Python
execute_graph(
workflow,
inputs=[
{"name": "a", "value": 2, "id": "node1"},
{"name": "a", "value": 3, "id": "node2"},
],
outputs=[
{"name": "result", "id": "node1"},
{"name": "result", "id": "node4"},
],
merge_outputs=False,
)
On the command line
ewoks execute ... -p node1:a=2 -p node2:a=3 --outputs=all
Documentation: https://ewokscore.readthedocs.io/en/stable/howtoguides/execute_io.html
Workflow GUI¶
Install ewoksweb: the Ewoks workflow web editor. It is a full-stack app made of a web front-end and of a REST server (ewoksserver).
pip install ewoksweb
Start the app (frontend+server), create a workflow and save it
ewoksweb --port 5174 # The default port is 8000
RESOURCE DIRECTORY:
/path/to/resource/directory
EWOKS JOB SCHEDULING
Local workflow execution
EWOKS EXECUTION:
{...}
Uvicorn running on http://127.0.0.1:5174 (Press CTRL+C to quit)
RESOURCE DIRECTORY is where all workflows will be saved. EWOKS EXECUTION specifies where execution events are saved.
✏️ Exercise: use the web GUI¶
- create a workflow
- save a workflow
- download/upload a workflow
- execute a workflow
pip install ewoksweb
ewoksweb --port 5174
Part 2: Ewoks Tasks¶
- Create workflow tasks
- Use new tasks in the GUI
- Use third-party task libraries
- Install workflow dependencies
We will create two tasks in order to define a workflow like this
Create workflow tasks¶
%%writefile results/tasks.py
from ewokscore import Task
class Add(
Task,
input_names=["a"],
optional_input_names=["b"],
output_names=["sum"],
):
def run(self):
if self.missing_inputs.b:
self.outputs.sum = self.inputs.a
else:
self.outputs.sum = self.inputs.a + self.inputs.b
Overwriting results/tasks.py
Task attributes used in the run method:
inputs: object with inputs as fieldsoutputs: object with outputs as fieldsmissing_inputs: object with missing inputs as fields
%%writefile -a results/tasks.py
import numpy
class Linspace1(
Task,
optional_input_names=["start", "stop", "num"],
output_names=["numbers"],
):
def run(self):
start = self.get_input_value("start", 0)
stop = self.get_input_value("stop", 1)
num = self.get_input_value("num", 1)
self.outputs.numbers = numpy.linspace(start, stop, num)
Appending to results/tasks.py
Task attributes used in the run method:
get_input_value: get a input by its name. Optionally specify a default valueoutputs: object with outputs as fields
%%writefile -a results/tasks.py
class Linspace2(
Task,
optional_input_names=["start", "stop", "num"],
output_names=["numbers"],
):
"""Same as Linspace1, alternative implementation"""
def run(self):
inputs = self.get_input_values()
inputs.setdefault("start", 0)
inputs.setdefault("stop", 1)
inputs.setdefault("num", 1)
self.outputs.numbers = numpy.linspace(**inputs)
Appending to results/tasks.py
Task attributes used in the run method:
get_input_values: returns a dictionary with input names as keysoutputs: object with outputs as fields
node1 = {
"id": "node1",
"task_type": "class",
"task_identifier": "results.tasks.Add",
}
node2 = {
"id": "node2",
"task_type": "class",
"task_identifier": "results.tasks.Add",
}
node3 = {
"id": "node3",
"task_type": "class",
"task_identifier": "results.tasks.Add",
}
node4 = {
"id": "node4",
"task_type": "class",
"task_identifier": "results.tasks.Linspace2",
}
Resolving task identifiers¶
task_identifier works like a Python import path for task_type="class" or task_type="method".
node1 = {
"id": "node1",
"task_type": "class",
"task_identifier": "results.tasks.Add",
}
- Ewoks will attempt to import the
Addclass from theresults.tasksmodule. - If
resultsis not installed via pip, Ewoks will look for the file./results/tasks.pyin the current working directory.
link1 = {"source": "node1", "target": "node3"}
link2 = {"source": "node2", "target": "node3"}
link3 = {"source": "node3", "target": "node4"}
link1["data_mapping"] = [
{"source_output": "sum", "target_input": "a"}
]
link2["data_mapping"] = [
{"source_output": "sum", "target_input": "b"}
]
link3["data_mapping"] = [
{"source_output": "sum", "target_input": "num"}
]
workflow = {
"graph": {"id": "workflow2"},
"nodes": [node1, node2, node3, node4],
"links": [link1, link2, link3],
}
Save with the convert_graph method
convert_graph(
workflow,
"results/workflow2.json",
inputs=[{"name": "a", "value": 1}, {"name": "b", "value": 1}],
)
'results/workflow2.json'
Note that both convert_graph and execute_graph take the inputs parameter.
The inputs provided to convert_graph override the default_inputs before saving the workflow.
The inputs provided to execute_graph override the default_inputs after loading the workflow.
Execute in Python
execute_graph("results/workflow2.json")
{'numbers': array([0. , 0.33333333, 0.66666667, 1. ])}
Execute from the command line
!ewoks execute results/workflow2.json --outputs=all --merge-outputs
###################################
# Execute workflow 'results/workflow2.json'
###################################
RESULTS:
{'numbers': array([0. , 0.33333333, 0.66666667, 1. ]), 'sum': 4}
FINISHED
✏️ Exercise: create ewoks tasks and use them in a workflow¶
from ewokscore import Task
class Add(
Task,
input_names=["a"],
optional_input_names=["b"],
output_names=["sum"],
):
def run(self):
if self.missing_inputs.b:
self.outputs.sum = self.inputs.a
else:
self.outputs.sum = self.inputs.a + self.inputs.b
Hello world: https://ewoks.readthedocs.io/en/stable/tutorials/hello_world.html
Documentation: https://ewokscore.readthedocs.io/en/stable/reference/specs.html#task-implementation
Use new tasks in the GUI¶
Create a workflow with these task classes in the Ewoksweb application (discover tasks from the python module results.tasks)
ewoksweb --port 5174
To open a workflow in the Ewoks desktop application
pip install "ewoks[orange]" pyqt5
ewoks execute results/workflow2.json --engine=orange
You need to create an Orange3 add-on project with widgets for all tasks in this workflow in order to create the workflow from scratch in the Ewoks desktop application and save it in OWS format.
✏️ Exercise: use new tasks in a GUI¶
Web GUI (perform task discovery first)
ewoksweb --port 5174
Desktop GUI (only load, not create)
ewoks execute results/workflow2.json --engine=orange
⚠️ On Windows, running pip install ewoks[orange] may raise an error because the installation paths are too long.
If so, you can enable long paths at Edit group policy > Computer Configuration > Administrative Templates > System > Filesystem > Enable Win32 long paths in your Windows system.
Use third-party task libraries¶
Find task libraries¶
To find existing task libraries: https://ewoks.esrf.fr
For example a library with tasks for image registration
pip install ewoksndreg
Use task libraries¶
To use the new tasks in the web GUI, they need to be discovered by the server (in this case from module name ewoksndreg.tasks). Tasks from installed projects (as opposed to local files) are discovered automatically when starting the server:
ewoksweb --port 5174
The desktop GUI also re-discovers all tasks when starting the application
ewoks-canvas
✏️ Exercise: create an image alignment workflow in Orange and load it in the web app or vice versa¶
Install the task library and create an OWS file
pip install ewoksndreg
ewoks-canvas
Convert to JSON format
ewoks convert ...
Discover ewoksndreg.tasks (from the interface or restart the application) and load the JSON file
ewoksweb --port 5174
Install workflow dependencies¶
Workflows are data provenance documents that record:
- The tasks to be executed.
- The parameters used for each task.
- The libraries required to run the workflow (list created by ewoks convert).
To install the dependencies of a workflow (uses pip under the hood):
pip install ewoks
ewoks install results/workflows1.json
⚠️ For the workflow to be fully self-contained, the tasks (e.g. Python functions or Ewoks task classes) must be included in the dependency packages.
Part 3: Ewoks Integration¶
- Remote execution
- Slurm execution
- Online data processing in Bliss
Client environment (use either sql or redis)
pip install "ewoksjob[sql,redis]"
Worker environment (use either sql or redis)
pip install "ewoksjob[sql,redis,worker]"
Monitoring environment (does not work with sql message broker)
pip install "ewoksjob[redis,monitor]"
Documentation: https://ewoksjob.readthedocs.io
Celery configuration¶
Both client and worker need this information to communicate with the message broker and the result backend
%%writefile results/celeryconfig_sql.py
broker_url = "sqla+sqlite:///celery.db"
result_backend = "db+sqlite:///celery_results.db"
result_serializer = "pickle"
accept_content = [
"application/json",
"application/x-python-serialize",
]
result_expires = 600
task_remote_tracebacks = True
broker_connection_retry_on_startup = True
enable_utc = False
Overwriting results/celeryconfig_sql.py
If you want monitoring to work, install Redis
apt install redis-server
conda install redis-server
and start a server in a separate terminal
redis-server
The first fields of the Celery configuration become
broker_url = "redis://localhost:6379/3"
result_backend = "redis://localhost:6379/4"
Alternatively use RabbitMQ (Sql or Redis for the results)
apt install rabbitmq-server
conda install rabbitmq-server
and start a server in a separate terminal
rabbitmq-server
The first fields of the Celery configuration become
broker_url = "amqp://localhost:5672//"
result_backend = "db+sqlite:///celery_results.db"
Start and test a worker¶
Start a worker in the worker environment
ewoksjob --config=results.celeryconfig_sql worker
You should get
-------------- celery@lindenolf v5.3.0b1 (dawn-chorus)
--- ***** -----
-- ******* ---- Linux-5.4.0-144-generic-x86_64 ...
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: ewoks:0x7ff87d66a700
- ** ---------- .> transport: sqla+sqlite:///celery.db
- ** ---------- .> results: sqlite:///celery_results.db
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
Warning: Windows users should use ewoksjob ... worker --pool=threads (prefork is not supported)
With Redis¶
Start the worker in the worker environment
ewoksjob --config=results.celeryconfig_redis worker
Start the monitor in the monitoring environment
export FLOWER_UNAUTHENTICATED_API="true"
ewoksjob --config=results.celeryconfig_redis monitor
[I 230320 11:44:13 command:162] Visit me at http://localhost:5555
[I 230320 13:17:17 command:170] Broker: redis://localhost:6379/3
With RabbitMQ¶
Start the worker in the worker environment
ewoksjob --config=results.celeryconfig_rabbitmq worker
Start the monitor in the monitoring environment
export FLOWER_UNAUTHENTICATED_API="true"
ewoksjob --config=results.celeryconfig_rabbitmq monitor \
--broker_api=http://localhost:15672/api/
[I 250831 15:53:51 command:168] Visit me at http://0.0.0.0:5555
[I 250831 15:53:51 command:176] Broker: amqp://guest:**@localhost:5672//
Submit a test workflow in the client environment.¶
Define the environment variable
export EWOKS_CONFIG_URI=results.celeryconfig_sql # Linux / macOS (bash/zsh)
set EWOKS_CONFIG_URI=results.celeryconfig_sql # Windows (Command Prompt)
$env:EWOKS_CONFIG_URI="results.celeryconfig_sql" # Windows (PowerShell)
Replacing sql by redis or rabbitmq if you use those.
The value "results.celeryconfig_sql" is resolved like task_identifier in the workflow node definition, in this case the file ./results/celeryconfig_sql.py.
Then in Python,
from ewoksjob.client import submit_test
result = submit_test().result()
✏️ Exercise: setup and test remote execution¶
Documentation: https://ewoksjob.readthedocs.io/
Environment variable for client and worker
export EWOKS_CONFIG_URI=results.celeryconfig_sql # Linux / macOS (bash/zsh)
set EWOKS_CONFIG_URI=results.celeryconfig_sql # Windows (Command Prompt)
$env:EWOKS_CONFIG_URI="results.celeryconfig_sql" # Windows (PowerShell)
Worker
ewoksjob worker # Linux and macOS
ewoksjob worker --pool=threads # Windows
Client
from ewoksjob.client import submit_test
submit_test().result()
Submit a workflow from Python¶
This is a local execution (blocking call)
from ewoks import execute_graph
result = execute_graph(workflow, outputs=[{"all": True}], merge_outputs=False)
This is the equivalent for a remote execution (non-blocking call)
from ewoksjob.client import submit
future = submit(args=(workflow,), kwargs={'outputs': [{'all': True}], 'merge_outputs': False})
result = future.result(timeout=None)
args and kwargs for execute_graph must be passed as args=tuple(...) and kwargs=dict(...) to submit.
Submit a workflow from the command line¶
Submit and wait for the results
ewoksjob submit results/workflow2.json --outputs=end --wait inf
⚠️ Loads the workflow locally by default. Use --load-remote to let the worker read the JSON file.
Submit without waiting
ewoksjob submit results/workflow2.json --outputs=end
Workflow 'results/workflow2.json' submitted (ID: d3d20d06-20da-4861-8ddf-6cf70c0bab0b)
Retrieve the result from the job ID in Python
from ewoksjob.client import get_future
future = get_future('d3d20d06-20da-4861-8ddf-6cf70c0bab0b')
print(future.result())
{'node4': {'numbers': array([0. , 0.33333333, 0.66666667, 1. ])}}
✏️ Exercise: submit a workflow remotely¶
Documentation: https://ewoksjob.readthedocs.io/
export EWOKS_CONFIG_URI=results.celeryconfig_sql # Linux / macOS (bash/zsh)
set EWOKS_CONFIG_URI=results.celeryconfig_sql # Windows (Command Prompt)
$env:EWOKS_CONFIG_URI="results.celeryconfig_sql" # Windows (PowerShell)
From Python
from ewoksjob.client import submit
future = submit(args=args, kwargs=kwargs)
result = future.result(timeout=None)
From the command line
ewoksjob submit results/workflow2.json --wait inf
Slurm execution¶
Supported through ewoksjob with pyslurmutils as a way to start Slurm jobs and receive results.
Requires an extra dependency in the worker environment
pip install "ewoksjob[sql,redis,worker,slurm]"
Create a Slurm access token¶
ssh myname@cluster-access
scontrol token lifespan=86400
SLURM_JWT=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2Nzk1NjU5ODUsImlhdCI6MTY3OTU2NTk4NCwic3VuIjoiZGVub2xmIn0.s9uGkTPg9xNO21LG5a10fdqvfQfnfVZ4ad58bVYncrg
Start a Slurm worker¶
You can start a Slurm worker on any computer on the intranet
ewoksjob --config=results.celeryconfig_sql worker --pool=slurm \
--slurm-url=http://... \
--slurm-user=myname \
--slurm-token=eyJhbGciOiJIUzI1NiIs...
ewoksjob uses the pyslurmutils library for the communication with Slurm (REST API).
Environment variables can be used instead of command line arguments
export EWOKS_CONFIG_URI=results.celeryconfig_sql
export SLURM_URL=https://...
export SLURM_USER=myname
export SLURM_TOKEN=eyJhbGciOiJIUzI1NiIs...
ewoksjob worker --pool=slurm
On Slurm or Visa (https://visa.esrf.fr/) you can activate an ewoks client or worker environment with
module load ewoks
This environment also provides a shell function to request a Slurm token
slurm-token-env
Slurm job parameters can be provided as worker arguments with -sp
ewoksjob worker --pool=slurm \
--slurm-pre-script="module load ewoks" \
-sp time_limit="04:00:00" \
-sp current_working_directory=/home/esrf/${USER}/ewoksfordevs
pyslurmutils documentation:
https://pyslurmutils.readthedocs.io/en/stable/tutorials/parameters.html
Slurm REST documentation:
https://slurm.schedmd.com/rest_api.html#v0.0.43_job_desc_msg
Submit a workflow to Slurm¶
From Python
from ewoksjob.client import submit
# Optionally override the worker's slurm parameters set with `-sp`
wd = "/home/esrf/myname/ewoksfordevs"
kwargs["slurm_arguments"] = {
"parameters": {
"time_limit": "04:00:00",
"current_working_directory": wd,
},
"pre_script": "module load ewoks",
}
# If the workflow is defined on the client side
# send it as a python dictionary
future = submit(args=("results/workflow1.json",), kwargs=kwargs)
result = future.result(timeout=None)
The path results/workflow1.json must exist in the current_working_directory on Slurm.
From the command line
ewoksjob submit results/workflow1.json --wait inf
The path results/workflow1.json must exist on the client side and is sent to Slurm.
All ewoks tasks of workflow1 must be available in the loaded Python environment on Slurm loaded by the --slurm-pre-script command.
The pyslurmutils library comes with a CLI. It is possible for example to monitor SLURM jobs:
pyslurmutils status -m 1 --url=... --user=... --token=...
The environment variables SLURM_URL, SLURM_USER and SLURM_TOKEN can also be used here instead of the command line arguments.
Documentation: https://pyslurmutils.readthedocs.io
✏️ Exercise: submit a workflow to Slurm¶
Documentation: https://ewoksjob.readthedocs.io/en/stable/howtoguides/slurm.html
Environment variable for client and worker
export EWOKS_CONFIG_URI=results.celeryconfig_sql
Environment variable for the worker
export SLURM_URL=https://...
export SLURM_USER=myname
export SLURM_TOKEN=eyJhbGciOiJIUzI1NiIs...
Start the worker (needs EWOKS_CONFIG_URI and slurm variables)
ewoksjob worker --pool=slurm \
--slurm-pre-script="module load ewoks" \
--slurm-log-directory=/home/esrf/${USER}/slurm/ \
-sp current_working_directory=/home/esrf/${USER}/ewoksfordevs \
-sp time_limit="00:10:00"
Worker parameters:
--slurm-pre-script: python environment with ewoks and task dependencies--slurm-log-directory: in case you want slurm logs during execution (cleaned up afterwards)-sp current_working_directory: to find workflow files and local tasks-sp time_limit: job wall time
Submit workflow from the client (needs EWOKS_CONFIG_URI)
ewoksjob submit results/workflow1.json --wait inf
Optionally, monitor slurm jobs (needs slurm variables)
pyslurmutils status -m 1
Celery configuration¶
We could directly specify the URI to the Celery configuration
export EWOKS_CONFIG_URI=beacon://<host>:<port>/ewoks/config.yml
But, it is enough the specify the URL of the Beacon host
export BEACON_HOST=<host>:<port>
⚠️ Warning: make sure EWOKS_CONFIG_URI is not defined because it has priority over BEACON_HOST.
Workflow for online and offline processing¶
We will create a workflow that can run both online (during the scan) and offline (after the experiment).
Data processing: azimuthal integration of X-ray Powder Diffraction data (XRPD).
The tutorial resources provide a simplified integration task
results.xrpd.PyFaiIntegrate
and a workflow
results/workflow3.json
which takes the URL to a Bliss scan and saves the integrated diffraction patterns in the official PROCESSED_DATA directory of the proposal.
A proper implementation of diffraction tasks can be found in ewoksxrpd.
Defining an ewoks task that works during and after the scan requires taking care of some aspects
- diffraction patterns need to be fetched during the acquisition (not simply reading from a file)
- when saving results in HDF5, it can happen that multiple workflows try to write to the same file at the same time
- when saving results in HDF5 chunking, compression and write optimization is desirable
- where do I save results?
Here we highlight the different solutions used in results.xrpd.PyFaiIntegrate
Read Bliss data during or after the scan with blissdata.¶
from blissdata.h5api import dynamic_hdf5
with dynamic_hdf5.File("/path/to/file.h5") as f:
...
The goal of the blissdata.h5api is to read the data as if you read it from a static HDF5 file with h5py.
https://bliss.gitlab-pages.esrf.fr/bliss/master/blissdata/h5py_like_api.html
Deal with concurrent writing from parallel workflows¶
silx provides a utility that yields an HDF5 group when it becomes available. This effectively serializes concurrent writing from different workflows.
from silx.io import h5py_utils
with h5py_utils.open_item("/path/to/result.h5", "/", mode="a") as f:
...
Optimize saving in HDF5¶
ewoksdata provides a class which takes care of HDF5 write optimization (chunking size, compression, write aligned with the chunks).
from ewoksdata.data.hdf5.dataset_writer import DatasetWriter
with DatasetWriter(group, "intensity") as writer:
for data in ...:
writer.add_point(data)
Build the saving path in a robust way¶
There is an official location to save results from online data processing:
/data/visitor/ch6562/id31/20230309/PROCESSED_DATA
Never compose this path directly as it might change in the future. Derive it from a dataset filename with esrf-pathlib:
from esrf_pathlib import ESRFPath
raw_filename = ESRFPath(dataset_filename)
processed_filename = raw_filename.processed_dataset_file
✏️ Exercise: process a live XRPD scan¶
Start a Bliss demo environment and shell on https://visa.esrf.fr/ (make sure the machine has at least 16GB of memory)
Create a worker environment on VISA
python3 -m venv worker_env
source worker_env/bin/activate
pip install "ewoksjob[blissworker]" pyfai blissdata \
esrf-pathlib ewoksdata silx
Start a worker on VISA (make sure it can import results.xrpd.PyFaiIntegrate)
export BEACON_HOST="localhost:10001"
ewoksjob worker -Q xrpd -n xrpd@$(hostname)
The -Q (for queues) option will allow clients to submit workflow to this specific worker. The -n option is a unique identifier in case there are several workers running on the same host.
Create an ewoks client environment on VISA
python3 -m venv client_env
source client_env/bin/activate
pip install "ewoksjob[beacon,redis]"
Start the processing before, during or after starting the scan
export BEACON_HOST="localhost:10001"
ewoksjob submit results/workflow3.json \
-c queue=xrpd --wait inf --outputs=all \
-p filename=/tmp/.../sample_0001.h5 \
-p 'scan="1.1"'
Start the scan
DEMO [1] loopscan(1000, 0.1, difflab6)
Part 4: Final remarks¶
Ewoks features not covered¶
Conditional links¶
For example: self-triggering workflow with a global error handler
Not supported by the desktop GUI, web GUI only.
https://ewokscore.readthedocs.io/en/stable/reference/specs.html
Sub-Workflows as nodes in other workflows¶
Workflow as a node in another workflow
Links to/from workflow nodes have an additional sub_target/sub_source argument.
Not supported by the desktop GUI, web GUI only.
https://ewokscore.readthedocs.io/en/stable/reference/specs.html
Desktop GUI¶
To provide support for the Ewoks desktop GUI (ewoks-canvas) you need to create an Orange add-on (which is a Python project) in which you define a Qt widget for every Ewoks task class.
Documentation: https://ewoksorange.readthedocs.io
When persistence is enabled, the red tasks are not executed again because their inputs did not change.
Persistence mechanism not solidified yet (current HDF5 or JSON files only).
In production at one beamline: XRPD at ID31 for STREAMLINE.
Ewoks events¶
Ewoks events are implemented with Python's logging facility.
ewoks execute results/workflow2.json -l info
Event handlers can be created and registered.
Event content: https://ewoksutils.readthedocs.io
For example: node end events contain persistence URI's (if any).
Example: https://ewoks.readthedocs.io/en/stable/howtoguides/ewoks_events.html
Celery worker events¶
Celery events on a higher level than Ewoks events.
Documentation: https://docs.celeryq.dev/en/stable/userguide/monitoring.html
Data portal integration¶
from ewoks import execute_graph
result = execute_graph(..., upload_parameters=...)
Uses IcatClient from pyicat-plus.
Documentation: https://pyicat-plus.readthedocs.io
Data provenance¶
Save the workflow with the results.
from ewoks import execute_graph
execute_graph(..., convert_destination="/path/to/file.json")
A more systematic way is currently missing. Perhaps it could be done in conjunction with task persistence or ewoks events?
Ewoks macros in Bliss¶
Bliss support user macros: user defined functions executed in the Bliss terminal (typically automates Bliss scans)
https://bliss.gitlab-pages.esrf.fr/bliss/master/bliss_user_sequences.html#load-script
Blissoda supports Ewoks macros: user defined functions executed by an Ewoks worker but called in Bliss like any other user macro
https://blissoda.readthedocs.io/en/stable/tutorials/ewoks_macros.html
Deploy software on Slurm¶
Automated with gitlab: https://gitlab.esrf.fr/apptainer
Make sure the environment has Ewoks installed with all Ewoks task libraries you need.
Documentation: https://apptainer.gitlab-pages.esrf.fr/admin/templates/
Questions ?