Usage
This module lets you push your data through one or more functions in a specified order, while keeping a persistent state of the data along the way. This allows the execution to be stopped and resumed at a later time.
Create callbacks
The functions in the workflow are called callbacks (or tasks). Each callback must accept at least two arguments, obj and eng, as in this example:
def halt_if_higher_than_20(obj, eng):
    """Check if current data is more than 20."""
    if obj.data > 20:
        eng.halt("Data higher than 20.")
- obj (WorkflowObject): the current object being worked on. obj adds extra functionality by wrapping your data and providing utilities to interface with it, e.g. in the Holding Pen (invenio-workflow-ui) interface.
- eng (WorkflowEngine): the current instance of the workflow engine. eng gives you access to the workflow execution itself, so you can manipulate it and retrieve all the objects being processed.
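Because a callback is a plain function of (obj, eng), you can try it out without running a whole workflow. The sketch below is only an illustration: FakeObject and FakeEngine are hypothetical stand-ins for WorkflowObject and WorkflowEngine that implement just what the callback touches. The stand-in halt merely records its message, whereas the real engine interrupts the workflow.

```python
class FakeObject:
    """Hypothetical stand-in for WorkflowObject: only carries .data."""

    def __init__(self, data):
        self.data = data


class FakeEngine:
    """Hypothetical stand-in for WorkflowEngine: records halt messages."""

    def __init__(self):
        self.halted_with = None

    def halt(self, msg):
        # The real engine interrupts the workflow; here we only remember why.
        self.halted_with = msg


def halt_if_higher_than_20(obj, eng):
    """Check if current data is more than 20."""
    if obj.data > 20:
        eng.halt("Data higher than 20.")


eng = FakeEngine()
halt_if_higher_than_20(FakeObject(30), eng)
print(eng.halted_with)  # Data higher than 20.

eng = FakeEngine()
halt_if_higher_than_20(FakeObject(10), eng)
print(eng.halted_with)  # None
```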
Pass additional arguments
To allow arguments to be passed to a task from the workflow definition, wrap your task in a closure:
def add_data(data_param):
    """Add data_param to the obj.data."""
    def _add_data(obj, eng):
        data = data_param
        obj.data += data
    return _add_data
It can then be used in the workflow definition as add_data(20), which returns the configured inner function.
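Each call to add_data creates a new inner function that remembers its own data_param, so the same task can appear several times in a workflow with different values. A quick check, using types.SimpleNamespace as a hypothetical stand-in for WorkflowObject and None for the unused engine argument:

```python
from types import SimpleNamespace


def add_data(data_param):
    """Add data_param to the obj.data."""
    def _add_data(obj, eng):
        obj.data += data_param
    return _add_data


add_five = add_data(5)      # inner function bound to data_param=5
add_twenty = add_data(20)   # separate inner function bound to data_param=20

obj = SimpleNamespace(data=10)  # stand-in for a WorkflowObject
add_five(obj, None)             # eng is not used by this task
add_twenty(obj, None)
print(obj.data)  # 35
```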
Create and register a workflow
With the callbacks ready, you can now design a workflow that uses them. In the same file as the callbacks, add a class definition:
class MyWorkflow(object):
    """Add 20 to data and halt if higher."""
    workflow = [add_data(20),
                halt_if_higher_than_20]
Save this file in your Invenio 3 overlay, for example as youroverlay/workflows.py.
The workflow attribute of MyWorkflow should be a list of functions (or even a list of lists of tasks), as required by the underlying workflows module this library is built upon.
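To picture how such a list is consumed, the sketch below uses a deliberately simplified runner: it walks the (possibly nested) list depth-first, calls each task with (obj, eng), and stops as soon as a task halts. SimpleEngine, Halted and run are hypothetical names defined here for illustration; this is not how the underlying workflows module implements its engine.

```python
from types import SimpleNamespace


class Halted(Exception):
    """Hypothetical signal used by this sketch to stop the run."""


class SimpleEngine:
    """Hypothetical, minimal engine: not the real WorkflowEngine."""

    def halt(self, msg):
        raise Halted(msg)

    def run(self, tasks, obj):
        """Call every task in order, descending into nested lists."""
        for task in tasks:
            if isinstance(task, list):
                self.run(task, obj)  # a nested list runs as a sub-sequence
            else:
                task(obj, self)


def add_data(data_param):
    def _add_data(obj, eng):
        obj.data += data_param
    return _add_data


def halt_if_higher_than_20(obj, eng):
    if obj.data > 20:
        eng.halt("Data higher than 20.")


workflow = [add_data(20), [halt_if_higher_than_20, add_data(100)]]

obj = SimpleNamespace(data=10)
try:
    SimpleEngine().run(workflow, obj)
except Halted as exc:
    print("halted:", exc, "data =", obj.data)  # halted: Data higher than 20. data = 30
```

With data=10 the run halts at 30, so the add_data(100) task in the nested list is never reached; with data=-50 no halt occurs and the data ends at 70.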
Next, register the workflow with the invenio-workflows API via entry_points in the setup.py of your overlay:
entry_points={
    ...,
    'invenio_workflows.workflows': [
        'MyWorkflow = youroverlay.workflows:MyWorkflow',
    ],
    ...
}
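Entry points only take effect once the package declaring them is installed, for example with pip install -e . in the overlay. To check what is registered under this group in your environment, you can use the standard library's importlib.metadata, as in this sketch; the helper name registered_workflows is hypothetical.

```python
from importlib.metadata import entry_points

GROUP = "invenio_workflows.workflows"


def registered_workflows(group=GROUP):
    """Map entry point names to their 'module:attr' targets, without importing them."""
    eps = entry_points()
    if hasattr(eps, "select"):  # Python 3.10+
        selected = eps.select(group=group)
    else:  # Python 3.8/3.9 return a dict of groups
        selected = eps.get(group, [])
    return {ep.name: ep.value for ep in selected}


# With the overlay installed, this should include 'MyWorkflow'.
print(registered_workflows())
```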
From now on, this workflow can be referred to as MyWorkflow.
Run a workflow
Finally, there are two main ways to run your workflow:
- run it immediately in the same process, or
- delay its execution asynchronously with Celery.
Which method to use depends on your use case, but heavy workflows are usually better run asynchronously, since they can then be queued and executed in a distributed manner.
Run workflows synchronously
from invenio_workflows import start
eng_uuid = start("MyWorkflow", data=10)
Once the workflow completes, start returns the UUID of the WorkflowEngine that ran it.
Your data (and much more) is contained in a WorkflowObject instance, which you can get from the engine instance as follows:
from invenio_workflows import WorkflowEngine
engine = WorkflowEngine.from_uuid(eng_uuid)
engine.objects
Finally, to get the data, look up the data attribute of the WorkflowObject:
engine.objects[0].data # returns the new data. E.g. 30
Pass multiple data objects
To run several objects through the same workflow, simply pass a list of data items:
from invenio_workflows import start
eng_uuid = start("MyWorkflow", data=[5, 10])
As usual, invenio_workflows.start returns the UUID of the engine that ran the workflow. You can use it to load the engine and retrieve the data you sent in:
from invenio_workflows import WorkflowEngine
engine = WorkflowEngine.from_uuid(eng_uuid)
len(engine.objects)  # e.g. 2, since two data items were given
To retrieve the data of the first object, use the data attribute, just as with a single object:
engine.objects[0].data # E.g. 25
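To collect the data of all objects at once, iterate over engine.objects. In this sketch all_data is a hypothetical helper, and a SimpleNamespace with illustrative values stands in for the engine so the example runs on its own; with a real engine you would pass the result of WorkflowEngine.from_uuid(eng_uuid).

```python
from types import SimpleNamespace


def all_data(engine):
    """Return the data of every object attached to the engine, in order."""
    return [obj.data for obj in engine.objects]


# Stand-in engine with illustrative values, shaped like engine.objects above.
fake_engine = SimpleNamespace(
    objects=[SimpleNamespace(data=25), SimpleNamespace(data=30)]
)
print(all_data(fake_engine))  # [25, 30]
```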
Run workflows asynchronously
So far, we have been running our workflows in the current process. However, for long-running workflows we might not want to wait for them to finish before continuing.
Since invenio-workflows is based on Celery, we can use the Celery methods available on the API functions. In the case of start, Celery provides a delay method that queues the execution of the function:
from invenio_workflows import start
async_result = start.delay("MyWorkflow", data=10)
The delayed call returns an AsyncResult instance, which you can use to check the status of the task. If you want to wait for the task to finish, call AsyncResult.get:
from invenio_workflows import WorkflowEngine
eng_uuid = async_result.get()  # blocks until the task has completed
engine = WorkflowEngine.from_uuid(eng_uuid)
engine.objects[0].data  # e.g. 30
Warning
To use this functionality, make sure you are running a Celery worker that will execute the workflow in a separate process. Otherwise, AsyncResult.get will never return.
Working with extra data
If you need to add extra data to the WorkflowObject that is not suitable for the obj.data attribute, you can make use of the obj.extra_data attribute.
The extra_data attribute is essentially a regular dictionary that you can fill. Note, however, that it may already contain some information by default: the WorkflowObject uses it to store data related to the workflow execution, as well as additional data added by tasks.
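Because extra_data may already contain entries, a task should add or update individual keys rather than assign a new dictionary to the attribute. A minimal sketch, in which the task factory record_source and the "source" key are hypothetical, and a SimpleNamespace with one made-up pre-existing entry stands in for the WorkflowObject:

```python
from types import SimpleNamespace


def record_source(source):
    """Hypothetical task factory: store where the data came from in extra_data."""
    def _record_source(obj, eng):
        # Update a single key; assigning a new dict to obj.extra_data
        # would discard whatever was already stored there.
        obj.extra_data["source"] = source
    return _record_source


# Stand-in object; '_existing' mimics default content the real object may hold.
obj = SimpleNamespace(data=10, extra_data={"_existing": "kept"})
record_source("harvester")(obj, None)
print(obj.extra_data)  # {'_existing': 'kept', 'source': 'harvester'}
```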