Python API

Simple examples

from myqueue.task import task
task('./script.py', tmax='2d', cores=24).submit()

The helper function myqueue.task.task() makes it easy to create myqueue.task.Task objects. The myqueue.submit() function can be used if you want to submit several tasks at the same time:

from myqueue import submit
from myqueue.task import task
tasks = [task('mymodule@func', args=[arg], tmax='2d', cores=24)
         for arg in [42, 117, 999]]
submit(*tasks)

Advanced example

from myqueue.commands import PythonModule
from myqueue.resources import Resources
from myqueue.task import Task

task = Task(PythonModule('module', []),
            Resources(cores=8, tmax=3600))
task.submit()

API

myqueue.submit(*tasks, verbosity=1, dry_run=False)[source]

Submit tasks.

Parameters
  • tasks (List of Task objects) – Tasks to submit.

  • verbosity (int) – Must be 0, 1 or 2.

  • dry_run (bool) – Don’t actually submit the task.

Return type

None

Tasks

myqueue.task.task(cmd, args=[], *, resources='', workflow=False, name='', deps='', cores=0, nodename='', processes=0, tmax='', folder='', restart=0, diskspace=0.0, creates=[])[source]

Create a Task object.

task = task('abc.py')
Parameters
  • cmd (str) – Command to be run.

  • args (list of str) – Command-line arguments or function arguments.

  • resources (str) –

    Resources:

    'cores[:nodename][:processes]:tmax'
    

    Examples: ‘48:1d’, ‘32:1h’, ‘8:xeon8:1:30m’. Can not be used togeter with any of “cores”, “nodename”, “processes” and “tmax”.

  • name (str) – Name to use for task. Default is <cmd>[+<arg1>[_<arg2>[_<arg3>]…]].

  • deps (str, list of str, Task object or list of Task objects) – Dependencies. Examples: “task1,task2”, “[‘task1’, ‘task2’]”.

  • cores (int) – Number of cores (default is 1).

  • nodename (str) – Name of node.

  • processes (int) – Number of processes to start (default is one for each core).

  • tmax (str) – Maximum time for task. Examples: “40s”, “30m”, “20h” and “2d”.

  • workflow (bool) – Task is part of a workflow.

  • folder (str) – Folder where task should run (default is current folder).

  • restart (int) – How many times to restart task.

  • diskspace (float) – Diskspace used. See Maximum disk space.

  • creates (list of str) – Name of files created by task (can be both full filenames or patterns matching filenames).

Returns

Object representing the task.

Return type

Task

class myqueue.task.Task(cmd, resources, deps, restart, workflow, diskspace, folder, creates, notifications='', state=State.undefined, id='0', error='', memory_usage=0, tqueued=0.0, trunning=0.0, tstop=0.0)[source]

Task object.

Parameters
  • cmd (myqueue.commands.Command) – Command to be run.

  • resources (Resources) – Combination of number of cores, nodename, number of processes and maximum time.

  • deps (list of Path objects) – Dependencies.

  • restart (int) – How many times to restart task.

  • workflow (bool) – Task is part of a workflow.

  • diskspace (float) – Disk-space used. See Maximum disk space.

  • folder (Path) – Folder where task should run.

  • creates (list of str) – Name of files created by task.

  • notifications (str) –

  • state (State) –

  • id (str) –

  • error (str) –

  • memory_usage (int) –

  • tqueued (float) –

  • trunning (float) –

  • tstop (float) –

Return type

None

submit(verbosity=1, dry_run=False)[source]

Submit task.

Parameters
  • verbosity (int) – Must be 0, 1 or 2.

  • dry_run (bool) – Don’t actually submit the task.

Return type

None

Commands

myqueue.commands.create_command(cmd, args=[], type=None, name='')[source]

Create command object.

Parameters
Return type

Command

class myqueue.commands.ShellCommand(cmd, args)[source]
Parameters
class myqueue.commands.ShellScript(cmd, args)[source]
Parameters
class myqueue.commands.PythonScript(script, args)[source]
Parameters
class myqueue.commands.PythonModule(mod, args)[source]
Parameters
class myqueue.commands.PythonFunction(cmd, args)[source]
Parameters

States

class myqueue.queue.State(value)[source]

Task-state enum.

The following 9 states are defined:

>>> for state in State:
...     state
<State.undefined: 'u'>
<State.queued: 'q'>
<State.hold: 'h'>
<State.running: 'r'>
<State.done: 'd'>
<State.FAILED: 'F'>
<State.TIMEOUT: 'T'>
<State.MEMORY: 'M'>
<State.CANCELED: 'C'>
>>> State.queued == State.queued
True
>>> State.queued == 'queued'
True
>>> State.queued == 'queue'
Traceback (most recent call last):
  ...
TypeError: Unknown state: queue
>>> State.queued == 117
False
>>> State.done in {'queued', 'running'}
False
is_bad()[source]

Return true for FAILED, TIMEOUT, MEMORY and CANCELED.

>>> State.running.is_bad()
False
Return type

bool

static str2states(s)[source]

Convert single character state string to set of State objects.

>>> names = [state.name for state in State.str2states('rdA')]
>>> sorted(names)
['CANCELED', 'FAILED', 'MEMORY', 'TIMEOUT', 'done', 'running']
>>> State.str2states('x')
Traceback (most recent call last):
  ...
ValueError: Unknown state: x.  Must be one of q, ..., a or A.
Parameters

s (str) –

Return type

set[‘State’]

Resources

class myqueue.resources.Resources(cores=0, nodename='', processes=0, tmax=0)[source]

Resource description.

Parameters
  • cores (int) –

  • nodename (str) –

  • processes (int) –

  • tmax (int) –

bigger(state, nodelist, maxtmax=172800)[source]

Create new Resource object with larger tmax or more cores.

>>> nodes = [('node1', {'cores': 8})]
>>> r = Resources(tmax=100, cores=8)
>>> r.bigger(State.TIMEOUT, nodes)
Resources(cores=8, tmax=200)
>>> r.bigger(State.MEMORY, nodes)
Resources(cores=16, tmax=100)
Parameters
  • state (State) –

  • nodelist (list[Node]) –

  • maxtmax (int) –

Return type

Resources

static from_string(s)[source]

Create Resource object from string.

>>> Resources.from_string('16:1:xeon8:2h')
Resources(cores=16, processes=1, tmax=7200, nodename='xeon8')
>>> Resources.from_string('16:1m')
Resources(cores=16, tmax=60)
Parameters

s (str) –

Return type

myqueue.resources.Resources

select(nodelist)[source]

Select appropriate node.

>>> nodes = [('node1', {'cores': 16}),
...          ('node2', {'cores': 8}),
...          ('fatnode2', {'cores': 8})]
>>> Resources(cores=24).select(nodes)
(3, 'node2', {'cores': 8})
>>> Resources(cores=32).select(nodes)
(2, 'node1', {'cores': 16})
>>> Resources(cores=32, nodename='fatnode2').select(nodes)
(4, 'fatnode2', {'cores': 8})
>>> Resources(cores=1).select(nodes)
(1, 'node2', {'cores': 8})
>>> Resources(cores=32, nodename='node3').select(nodes)
Traceback (most recent call last):
    ...
ValueError: No such node: node3
Parameters

nodelist (list[Node]) –

Return type

tuple[int, str, dict[str, Any]]

todict()[source]

Convert to dict.

Return type

dict[str, Any]

Queue

class myqueue.queue.Queue(config, verbosity=1, need_lock=True, dry_run=False)[source]

Object for interacting with the scheduler.

Parameters
  • config (Configuration) –

  • verbosity (int) –

  • need_lock (bool) –

  • dry_run (bool) –

find_depending(tasks)[source]

Generate list of tasks including dependencies.

Parameters

tasks (list[Task]) –

Return type

list[Task]

kick()[source]

Kick the system.

  • Send email notifications

  • restart timed-out tasks

  • restart out-of-memory tasks

  • release/hold tasks to stay under maximum_diskspace

Return type

dict[str, int]

list_(selection, columns, sort=None, reverse=False, short=False, use_log_file=False)[source]

Pretty-print list of tasks.

Parameters
  • selection (Selection) –

  • columns (str) –

  • sort (str | None) –

  • reverse (bool) –

  • short (bool) –

  • use_log_file (bool) –

Return type

list[Task]

modify(selection, newstate, email)[source]

Modify task(s).

Parameters
  • selection (Selection) –

  • newstate (State) –

  • email (set[State]) –

Return type

None

remove(selection)[source]

Remove or cancel tasks.

Parameters

selection (myqueue.selection.Selection) –

Return type

None

resubmit(selection, resources)[source]

Resubmit failed or timed-out tasks.

Parameters
  • selection (Selection) –

  • resources (Resources | None) –

Return type

None

run(tasks)[source]

Run tasks locally.

Parameters

tasks (list[Task]) –

Return type

None

property scheduler: myqueue.scheduler.Scheduler

Scheduler object.

submit(tasks, force=False, max_tasks=1000000000, read=True)[source]

Submit tasks to queue.

Parameters
Return type

None

sync()[source]

Syncronize queue with the real world.

Return type

None

Schedulers

class myqueue.slurm.SLURM(config)[source]
Parameters

config (Configuration) –

class myqueue.lsf.LSF(config)[source]
Parameters

config (Configuration) –

class myqueue.pbs.PBS(config)[source]
Parameters

config (Configuration) –