Workflows¶
The workflow subcommand combined with a Workflow script can be used to run sequences of tasks in several folders. The script describes the tasks, their requirements and dependencies.
Simple example¶
We want to factor some integers into primes. For each integer there are two tasks: factor the integer and check whether it turned out to be a prime. The first script (factor.py) does the factorization:
from __future__ import annotations
import json
from pathlib import Path


def factor(x: int) -> list[int]:
    for f in range(2, x // 2 + 1):
        if x % f == 0:
            return [f] + factor(x // f)
    return [x]


if __name__ == '__main__':
    x = int(Path.cwd().name)  # name of current folder
    factors = factor(x)
    Path('factors.json').write_text(json.dumps({'factors': factors}))
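For example (compare with the factors.json files shown further down):
>>> factor(1001)
[7, 11, 13]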
The second script (check.py) reads factors.json and creates an empty PRIME file if the number had only one factor:
import json
from pathlib import Path

dct = json.loads(Path('factors.json').read_text())
factors = dct['factors']
if len(factors) == 1:
    Path('PRIME').write_text('')  # create empty file
Our Workflow script will create the two tasks using the myqueue.workflow.run() function and the myqueue.workflow.resources() decorator:
from myqueue.workflow import run, resources


@resources(tmax='2s', cores=1)
def workflow():
    with run(module='prime.factor'):
        run(module='prime.check')
The with statement makes the prime.check task depend on the prime.factor task (shown as d1 in the listings below). We put the three Python files in a prime/ folder:
$ ls -l prime/
total 12
-rw-rw-r-- 1 jensj jensj 190 Oct 28 10:46 check.py
-rw-rw-r-- 1 jensj jensj 398 Oct 28 10:46 factor.py
-rw-rw-r-- 1 jensj jensj 164 Oct 28 10:46 workflow.py
Make sure Python can find the files by adding this line to your ~/.bash_profile file:
export PYTHONPATH=~/path/to/prime/:$PYTHONPATH
Create some folders:
$ mkdir numbers
$ cd numbers
$ mkdir 99 1001 8069 36791 98769 100007
and start the workflow in one of the folders:
$ mq workflow ../prime/workflow.py 1001/ --dry-run
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1/1
new : 2
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2/2
1 ./1001/ prime.factor 1:2s
1 ./1001/ prime.check d1 1:2s
2 tasks to submit
$ mq workflow ../prime/workflow.py 1001/
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 1/1
new : 2
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2/2
1 ./1001/ prime.factor 1:2s
2 ./1001/ prime.check d1 1:2s
2 tasks submitted
$ sleep 2
and check the status of the tasks:
$ mq ls
id folder name info res. age state time
── ─────── ──────────── ──── ──── ──── ───── ────
1 ./1001/ prime.factor 1:2s 0:02 done 0:00
2 ./1001/ prime.check d1 1:2s 0:02 done 0:00
── ─────── ──────────── ──── ──── ──── ───── ────
done: 2, total: 2
Now run the workflow in all the subfolders:
$ mq workflow ../prime/workflow.py */
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 6/6
new : 10
done : 2
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 10/10
3 ./100007/ prime.factor 1:2s
4 ./100007/ prime.check d1 1:2s
5 ./36791/ prime.factor 1:2s
6 ./36791/ prime.check d1 1:2s
7 ./8069/ prime.factor 1:2s
8 ./8069/ prime.check d1 1:2s
9 ./98769/ prime.factor 1:2s
10 ./98769/ prime.check d1 1:2s
11 ./99/ prime.factor 1:2s
12 ./99/ prime.check d1 1:2s
10 tasks submitted
$ sleep 2 # wait for tasks to finish
$ mq ls
id folder name info res. age state time
── ───────── ──────────── ──── ──── ──── ───── ────
1 ./1001/ prime.factor 1:2s 0:04 done 0:00
2 ./1001/ prime.check d1 1:2s 0:04 done 0:00
3 ./100007/ prime.factor 1:2s 0:02 done 0:00
4 ./100007/ prime.check d1 1:2s 0:02 done 0:00
5 ./36791/ prime.factor 1:2s 0:02 done 0:00
6 ./36791/ prime.check d1 1:2s 0:02 done 0:00
7 ./8069/ prime.factor 1:2s 0:02 done 0:00
8 ./8069/ prime.check d1 1:2s 0:02 done 0:00
9 ./98769/ prime.factor 1:2s 0:02 done 0:00
10 ./98769/ prime.check d1 1:2s 0:02 done 0:00
11 ./99/ prime.factor 1:2s 0:02 done 0:00
12 ./99/ prime.check d1 1:2s 0:02 done 0:00
── ───────── ──────────── ──── ──── ──── ───── ────
done: 12, total: 12
Note that prime.factor.<id>.out/.err and prime.check.<id>.out/.err files have been created to capture each task's standard output and error (empty here), alongside the factors.json result file:
$ ls -l 1001/
total 4
-rw-rw-r-- 1 jensj jensj 24 Oct 28 10:46 factors.json
-rw-rw-r-- 1 jensj jensj 0 Oct 28 10:46 prime.check.2.err
-rw-rw-r-- 1 jensj jensj 0 Oct 28 10:46 prime.check.2.out
-rw-rw-r-- 1 jensj jensj 0 Oct 28 10:46 prime.factor.1.err
-rw-rw-r-- 1 jensj jensj 0 Oct 28 10:46 prime.factor.1.out
Now, add another number:
$ mkdir 42
$ mq workflow ../prime/workflow.py */
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 7/7
done : 12
new : 2
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 2/2
13 ./42/ prime.factor 1:2s
14 ./42/ prime.check d1 1:2s
2 tasks submitted
$ sleep 2
It turns out that two of the numbers were primes:
$ grep factors */factors.json
100007/factors.json:{"factors": [97, 1031]}
1001/factors.json:{"factors": [7, 11, 13]}
36791/factors.json:{"factors": [36791]}
42/factors.json:{"factors": [2, 3, 7]}
8069/factors.json:{"factors": [8069]}
98769/factors.json:{"factors": [3, 11, 41, 73]}
99/factors.json:{"factors": [3, 3, 11]}
$ ls */PRIME
36791/PRIME
8069/PRIME
Handling many tasks¶
If you have a workflow script with many tasks combined with many folders, mq workflow ... */ may try to submit more tasks than the scheduler allows. In that case, you will have to submit the tasks in batches using the --max-tasks option. Say you have 300 tasks from 150 folders:
$ mq workflow ../prime/workflow.py */ --max-tasks=200
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 150/150
new : 200
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 200/200
...
$ # wait ten days ...
$ mq workflow ../prime/workflow.py */ --max-tasks=200
Scanning folders: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 150/150
new : 100
done : 200
Submitting tasks: ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100/100
...
Workflow script¶
A workflow script must contain a workflow() function. The workflow() function should call the myqueue.workflow.run() function for each task in the workflow. Here is an example (flow.py):
from myqueue.workflow import run
from somewhere import postprocess


def workflow():
    r1 = run(script='task1.py')
    r2 = run(script='task2.py', cores=8, tmax='2h')
    run(function=postprocess, deps=[r1, r2])
where task1.py and task2.py are Python scripts and postprocess is a Python function. Calling the workflow() function directly will run the task1.py script, then the task2.py script and finally the postprocess function. If instead the workflow() function is passed to the mq workflow flow.py command, the run() function will not actually run the tasks, but will instead collect them with their dependencies and submit them.
Here is an alternative way to specify the dependencies of the postprocess step (see more below):
def workflow():
    r1 = run(script='task1.py')
    r2 = run(script='task2.py', cores=8, tmax='2h')
    with r1, r2:
        run(function=postprocess)
- myqueue.workflow.run(*, function=None, script=None, module=None, shell=None, name='', args=[], kwargs={}, deps=[], creates=[], tmax=None, cores=None, nodename=None, processes=None, restart=None, weight=None, folder='.')¶
Run or submit a task.
The type of task is specified by one and only one of the arguments function, script, module or shell.
- Parameters:
function (Callable) – A Python function.
script (Union[Path, str]) – Shell-script or Python-script.
module (str) – Name of a Python module.
shell (str) – Shell command. Must be found in $PATH.
args (list of objects) – Command-line arguments or function arguments.
name (str) – Name to use for task. Default is name of function, script, module or shell command.
deps (list of run handle objects) – Dependencies.
creates (list of filenames) – Files created by task.
cores (int) – Number of cores (default is 1).
nodename (str) – Name of node.
processes (int) – Number of processes to start (default is one for each core).
tmax (str) – Maximum time for task. Examples: “40s”, “30m”, “20h” and “2d”.
folder (str) – Folder where task should run (default is current folder).
restart (int) – How many times to restart task.
weight (float) – Weight of task. See Task weight.
- Returns:
Object representing the run.
- Return type:
RunHandle
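For illustration, here is a sketch of how the four task types could be specified in a workflow (the script name and resource values are made up):
from myqueue.workflow import run


def workflow():
    run(function=print, args=['hello'])  # a Python function
    run(module='prime.factor')           # a Python module
    run(script='task1.py', cores=8)      # a Python or shell script
    run(shell='touch', args=['DONE'])    # a shell command found in $PATH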
- class myqueue.workflow.RunHandle(task, runner)[source]¶
Result of calling run(). Can be used as a context manager.
- Parameters:
task (Task) –
runner (Runner) –
- property result: Result¶
Result from Python-function tasks.
Resources¶
Resources for a task are set using the keywords: cores, tmax, processes, nodename and restart.
Here are three equivalent ways to set the cores resource:
def workflow():
    run(..., cores=24)  # as an argument to run()


def workflow():
    with resources(cores=24):  # via a context manager
        run(...)


@resources(cores=24)  # with a decorator
def workflow():
    run(...)
- myqueue.workflow.resources(*, tmax=None, cores=None, nodename=None, processes=None, restart=None, weight=None)¶
Resource decorator and context manager.
- Parameters:
cores (int) – Number of cores (default is 1).
nodename (str) – Name of node.
processes (int) – Number of processes to start (default is one for each core).
tmax (str) – Maximum time for task. Examples: “40s”, “30m”, “20h” and “2d”.
restart (int) – How many times to restart task.
weight (float) – Weight of task. See Task weight.
- Return type:
ResourceHandler
Functions¶
A task that calls a Python function will cache its result by writing the return value as JSON to a file. MyQueue does this using the json_cached_function() function:
- myqueue.caching.json_cached_function(function, name, args, kwargs)[source]¶
Add file-caching to function.
The decorated function will write its result in JSON format to a file called <name>.result.
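The idea behind this caching can be sketched as follows (a simplified illustration, not MyQueue's actual implementation; the custom JSON encoder and error handling are omitted):
import json
from pathlib import Path


def cached(function, name):
    """Sketch: run function once and cache its return value as JSON."""
    def wrapper(*args, **kwargs):
        path = Path(f'{name}.result')
        if path.is_file():
            return json.loads(path.read_text())  # reuse cached result
        result = function(*args, **kwargs)
        path.write_text(json.dumps(result))
        return result
    return wrapper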
Helper wrapper for working with functions:
- myqueue.workflow.wrap(function, **run_kwargs)¶
Wrap a function as a task.
Takes the same keyword arguments as run (except module, script and shell).
These two are equivalent:
result = run(function=func, args=args, kwargs=kwargs, ...).result
result = wrap(func, ...)(*args, **kwargs)
Return values that can be written to a JSON file include everything that the Python standard library json module supports and, in addition, the following types: complex, datetime.datetime, pathlib.Path and numpy.ndarray.
- class myqueue.caching.Encoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]¶
Encode complex, datetime, Path and ndarray objects.
>>> import numpy as np
>>> Encoder().encode(1+2j)
'{"__complex__": [1.0, 2.0]}'
>>> Encoder().encode(datetime(1969, 11, 11, 0, 0))
'{"__datetime__": "1969-11-11T00:00:00"}'
>>> Encoder().encode(Path('abc/123.xyz'))
'{"__path__": "abc/123.xyz"}'
>>> Encoder().encode(np.array([1., 2.]))
'{"__ndarray__": [1.0, 2.0]}'
- myqueue.caching.object_hook(dct)[source]¶
Decode complex, datetime, Path and ndarray representations.
>>> object_hook({'__complex__': [1.0, 2.0]})
(1+2j)
>>> object_hook({'__datetime__': '1969-11-11T00:00:00'})
datetime.datetime(1969, 11, 11, 0, 0)
>>> object_hook({'__path__': 'abc/123.xyz'})
PosixPath('abc/123.xyz')
>>> object_hook({'__ndarray__': [1.0, 2.0]})
array([1., 2.])
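Assuming Encoder is a json.JSONEncoder subclass (its signature suggests so), the pair plugs directly into the standard json API; a minimal round trip might look like this:
import json

import numpy as np

from myqueue.caching import Encoder, object_hook

data = {'z': 1 + 2j, 'a': np.array([1.0, 2.0])}
text = json.dumps(data, cls=Encoder)               # encode with the custom encoder
data2 = json.loads(text, object_hook=object_hook)  # decode back to Python objects
assert data2['z'] == 1 + 2j
assert list(data2['a']) == [1.0, 2.0]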
Dependencies¶
Suppose we have two tasks and we want <task-2> to start after <task-1>. We can specify the dependency explicitly like this:
def workflow():
    run1 = run(<task-1>)
    run(<task-2>, deps=[run1])
or like this using a context manager:
def workflow():
    with run(<task-1>):
        run(<task-2>)
If our tasks are functions then MyQueue can figure out the dependencies without specifying them explicitly or using with statements. Say we have the following two functions:
def f1():
    return 2 + 2


def f2(x):
    print(x)
and we want to call f2 with the result of f1. Given this workflow script:
def workflow():
    run1 = run(function=f1)
    run(function=f2, args=[run1.result])
MyQueue will know that the f2 task depends on the f1 task. Here is a shorter version using the wrap() function:
def workflow():
    x = wrap(f1)()
    wrap(f2)(x)
Workflows with if-statements¶
Some workflows may take different directions depending on the result of the first part of the workflow. Continuing with our f1 and f2 functions, we may only want to call f2 if the result of f1 is less than five:
def workflow():
    run1 = run(function=f1)
    if run1.result < 5:
        run(function=f2, args=[run1.result])
MyQueue will know that run1.result < 5 can't be decided before the first task has been run, and it will therefore only submit one task. Running mq workflow ... a second time, after the first task has finished, will submit the second task. Here is an equivalent script using functions:
def workflow():
    x = wrap(f1)()
    if x < 5:
        wrap(f2)(x)
The RunHandle object also has a done attribute that can be used to break up the workflow:
def workflow():
    run1 = run(<task-1>)
    if run1.done:
        something = read_result_of_task1_from_file()
        if ... something ...:
            run(<task-2>)
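As a concrete sketch using the f1 and f2 functions from above (reading the cached f1.result file is an assumption: it relies on the default task name f1 and a plain JSON payload):
import json
from pathlib import Path

from myqueue.workflow import run


def workflow():
    run1 = run(function=f1)
    if run1.done:
        # assumed location/format of f1's cached return value
        x = json.loads(Path('f1.result').read_text())
        if x < 5:
            run(function=f2, args=[x])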
Old workflow script¶
Warning
Please use a new-style Workflow script!
Old-style workflow scripts contain a function:
- create_tasks() → List[myqueue.task.Task]¶
It should return a list of myqueue.task.Task objects created with the myqueue.task.task() helper function. Here is an example:
from myqueue.task import task


def create_tasks():
    t1 = task('<task-1>', resources=...)
    t2 = task('<task-2>', resources=...)
    t3 = task('<task-3>', resources=...,
              deps=['<task-1>', '<task-2>'])
    return [t1, t2, t3]
where <task-n> is the name of a task. See Examples below.
Examples¶
Two equivalent ways to set the resources:
task('prime.factor', resources='8:1h')
task('prime.factor', cores=8, tmax='1h')
Given these two tasks:
t1 = task('mod@f1')
t2 = task('mod@f2')
here are three equivalent ways to set dependencies:
t3 = task('mod@f3', deps=[t1, t2])
t3 = task('mod@f3', deps=['mod@f1', 'mod@f2'])
t3 = task('mod@f3', deps='mod@f1,mod@f2')
Arguments in three equivalent ways:
task('math@sin+3.14')
task('math@sin', args=[3.14])
task('math@sin', args=['3.14'])
More than one argument:
task('math@gcd+42_117')
task('math@gcd', args=[42, 117])
same as:
>>> import math
>>> math.gcd(42, 117)
3