Python API¶
Simple examples¶
from myqueue.task import task
task('./script.py', tmax='2d', cores=24).submit()
The helper function myqueue.task.task()
makes it easy to create
myqueue.task.Task
objects. The myqueue.submit()
function
can be used if you want to submit several tasks at the same time:
from myqueue import submit
from myqueue.task import task
tasks = [task('mymodule@func', args=[arg], tmax='2d', cores=24)
for arg in [42, 117, 999]]
submit(*tasks)
Advanced example¶
from myqueue.commands import PythonModule
from myqueue.resources import Resources
from myqueue.task import Task
task = Task(PythonModule('module', []),
Resources(cores=8, tmax=3600))
task.submit()
API¶
Tasks¶
- myqueue.task.task(cmd, args=[], *, resources='', workflow=False, name='', deps='', cores=0, nodename='', processes=0, tmax='', weight=-1.0, folder='', restart=0, creates=[])¶
Create a Task object.
task = task('abc.py')
- Parameters:
cmd (str) – Command to be run.
args (list of str) – Command-line arguments or function arguments.
resources (str) –
Resources:
'cores[:nodename][:processes]:tmax'
Examples: ‘48:1d’, ‘32:1h’, ‘8:xeon8:1:30m’. Can not be used togeter with any of “cores”, “nodename”, “processes” and “tmax”.
name (str) – Name to use for task. Default is <cmd>[+<arg1>[_<arg2>[_<arg3>]…]].
deps (str, list of str, Task object or list of Task objects) – Dependencies. Examples: “task1,task2”, “[‘task1’, ‘task2’]”.
cores (int) – Number of cores (default is 1).
nodename (str) – Name of node.
processes (int) – Number of processes to start (default is one for each core).
tmax (str) – Maximum time for task. Examples: “40s”, “30m”, “20h” and “2d”.
workflow (bool) – Task is part of a workflow.
folder (str) – Folder where task should run (default is current folder).
restart (int) – How many times to restart task.
weight (float) – Weight of task. See Task weight.
creates (list of str) – Name of files created by task (can be both full filenames or patterns matching filenames).
- Returns:
Object representing the task.
- Return type:
- class myqueue.task.Task(cmd, *, resources, deps, restart, workflow, folder, creates, notifications='', state=State.undefined, id=0, error='', tqueued=0.0, trunning=0.0, tstop=0.0, user='')[source]¶
Task object.
- Parameters:
cmd (
myqueue.commands.Command
) – Command to be run.resources (Resources) – Combination of number of cores, nodename, number of processes and maximum time.
deps (list of Path objects) – Dependencies.
restart (int) – How many times to restart task.
workflow (bool) – Task is part of a workflow.
folder (Path) – Folder where task should run.
notifications (str) –
state (State) –
id (int) –
error (str) –
tqueued (float) –
trunning (float) –
tstop (float) –
user (str) –
Commands¶
States¶
- class myqueue.queue.State(value)[source]¶
Task-state enum.
The following 9 states are defined:
>>> for state in State: ... state <State.undefined: 'u'> <State.queued: 'q'> <State.hold: 'h'> <State.running: 'r'> <State.done: 'd'> <State.FAILED: 'F'> <State.TIMEOUT: 'T'> <State.MEMORY: 'M'> <State.CANCELED: 'C'>
>>> State.queued == State.queued True >>> State.queued == 'queued' True >>> State.queued == 'queue' Traceback (most recent call last): ... TypeError: Unknown state: queue >>> State.queued == 117 False >>> State.done in {'queued', 'running'} False
- is_active()[source]¶
Return true for queued, hold, running.
>>> State.hold.is_active() True
- Return type:
- is_bad()[source]¶
Return true for FAILED, TIMEOUT, MEMORY and CANCELED.
>>> State.running.is_bad() False
- Return type:
- static str2states(s)[source]¶
Convert single character state string to set of State objects.
>>> names = [state.name for state in State.str2states('rdA')] >>> sorted(names) ['CANCELED', 'FAILED', 'MEMORY', 'TIMEOUT', 'done', 'running'] >>> State.str2states('x') Traceback (most recent call last): ... ValueError: Unknown state: x. Must be one of q, ..., a or A.
Resources¶
- class myqueue.resources.Resources(cores=0, nodename='', processes=0, tmax=0, weight=-1.0)[source]¶
Resource description.
- bigger(state, nodelist, maxtmax=172800)[source]¶
Create new Resource object with larger tmax or more cores.
>>> nodes = [('node1', {'cores': 8})] >>> r = Resources(tmax=100, cores=8) >>> r.bigger(State.TIMEOUT, nodes) Resources(cores=8, tmax=200) >>> r.bigger(State.MEMORY, nodes) Resources(cores=16, tmax=100)
- static from_string(s)[source]¶
Create Resource object from string.
>>> r = Resources.from_string('16:1:xeon8:2h') >>> r Resources(cores=16, processes=1, tmax=7200, nodename='xeon8') >>> print(r) 16:1:xeon8:2h >>> Resources.from_string('16:1m') Resources(cores=16, tmax=60) >>> r = Resources.from_string('16:1m:25') >>> r Resources(cores=16, tmax=60, weight=25.0) >>> print(r) 16:1m:25
- select(nodelist)[source]¶
Select appropriate node.
>>> nodes = [('node1', {'cores': 16}), ... ('node2', {'cores': 8}), ... ('fatnode2', {'cores': 8})] >>> Resources(cores=24).select(nodes) (3, 'node2', {'cores': 8}) >>> Resources(cores=32).select(nodes) (2, 'node1', {'cores': 16}) >>> Resources(cores=32, nodename='fatnode2').select(nodes) (4, 'fatnode2', {'cores': 8}) >>> Resources(cores=1).select(nodes) (1, 'node2', {'cores': 8}) >>> Resources(cores=32, nodename='node3').select(nodes) Traceback (most recent call last): ... ValueError: No such node: node3
Queue¶
- class myqueue.queue.Queue(config=None, *, need_lock=True, dry_run=False)[source]¶
Object for interacting with your .myqueue/queue.sqlite3 file
- property connection: Connection¶
Get or create a connection object.
- property scheduler: Scheduler¶
Scheduler object.