Python API

Simple examples

from myqueue.task import task
task('./script.py', tmax='2d', cores=24).submit()

The helper function myqueue.task.task() makes it easy to create myqueue.task.Task objects. The myqueue.submit() function can be used if you want to submit several tasks at the same time:

from myqueue import submit
from myqueue.task import task
tasks = [task('mymodule@func', args=[arg], tmax='2d', cores=24)
         for arg in [42, 117, 999]]
submit(*tasks)

Advanced example

from myqueue.commands import PythonModule
from myqueue.resources import Resources
from myqueue.task import Task

task = Task(PythonModule('module', []),
            Resources(cores=8, tmax=3600))
task.submit()

API

myqueue.submit(*tasks, verbosity=1, dry_run=False)[source]

Submit tasks.

Parameters:
  • tasks (List of Task objects) – Tasks to submit.

  • verbosity (int) – Must be 0, 1 or 2.

  • dry_run (bool) – Don’t actually submit the task.

Return type:

None

Tasks

myqueue.task.task(cmd, args=[], *, resources='', workflow=False, name='', deps='', cores=0, nodename='', processes=0, tmax='', weight=-1.0, folder='', restart=0, creates=[])

Create a Task object.

task = task('abc.py')
Parameters:
  • cmd (str) – Command to be run.

  • args (list of str) – Command-line arguments or function arguments.

  • resources (str) –

    Resources:

    'cores[:nodename][:processes]:tmax'
    

    Examples: ‘48:1d’, ‘32:1h’, ‘8:xeon8:1:30m’. Can not be used togeter with any of “cores”, “nodename”, “processes” and “tmax”.

  • name (str) – Name to use for task. Default is <cmd>[+<arg1>[_<arg2>[_<arg3>]…]].

  • deps (str, list of str, Task object or list of Task objects) – Dependencies. Examples: “task1,task2”, “[‘task1’, ‘task2’]”.

  • cores (int) – Number of cores (default is 1).

  • nodename (str) – Name of node.

  • processes (int) – Number of processes to start (default is one for each core).

  • tmax (str) – Maximum time for task. Examples: “40s”, “30m”, “20h” and “2d”.

  • workflow (bool) – Task is part of a workflow.

  • folder (str) – Folder where task should run (default is current folder).

  • restart (int) – How many times to restart task.

  • weight (float) – Weight of task. See Task weight.

  • creates (list of str) – Name of files created by task (can be both full filenames or patterns matching filenames).

Returns:

Object representing the task.

Return type:

Task

class myqueue.task.Task(cmd, *, resources, deps, restart, workflow, folder, creates, notifications='', state=State.undefined, id=0, error='', tqueued=0.0, trunning=0.0, tstop=0.0, user='')[source]

Task object.

Parameters:
  • cmd (myqueue.commands.Command) – Command to be run.

  • resources (Resources) – Combination of number of cores, nodename, number of processes and maximum time.

  • deps (list of Path objects) – Dependencies.

  • restart (int) – How many times to restart task.

  • workflow (bool) – Task is part of a workflow.

  • folder (Path) – Folder where task should run.

  • creates (list of str) – Name of files created by task.

  • notifications (str) –

  • state (State) –

  • id (int) –

  • error (str) –

  • tqueued (float) –

  • trunning (float) –

  • tstop (float) –

  • user (str) –

submit(verbosity=1, dry_run=False)[source]

Submit task.

Parameters:
  • verbosity (int) – Must be 0, 1 or 2.

  • dry_run (bool) – Don’t actually submit the task.

Return type:

None

Commands

myqueue.commands.create_command(cmd, args=[], type=None, name='')[source]

Create command object.

Parameters:
Return type:

Command

class myqueue.commands.ShellCommand(cmd, args)[source]
Parameters:
class myqueue.commands.ShellScript(cmd, args)[source]
Parameters:
class myqueue.commands.PythonScript(script, args)[source]
Parameters:
class myqueue.commands.PythonModule(mod, args)[source]
Parameters:
class myqueue.commands.PythonFunction(cmd, args)[source]
Parameters:

States

class myqueue.queue.State(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Task-state enum.

The following 9 states are defined:

>>> for state in State:
...     state
<State.undefined: 'u'>
<State.queued: 'q'>
<State.hold: 'h'>
<State.running: 'r'>
<State.done: 'd'>
<State.FAILED: 'F'>
<State.TIMEOUT: 'T'>
<State.MEMORY: 'M'>
<State.CANCELED: 'C'>
>>> State.queued == State.queued
True
>>> State.queued == 'queued'
True
>>> State.queued == 'queue'
Traceback (most recent call last):
  ...
TypeError: Unknown state: queue
>>> State.queued == 117
False
>>> State.done in {'queued', 'running'}
False
is_active()[source]

Return true for queued, hold, running.

>>> State.hold.is_active()
True
Return type:

bool

is_bad()[source]

Return true for FAILED, TIMEOUT, MEMORY and CANCELED.

>>> State.running.is_bad()
False
Return type:

bool

static str2states(s)[source]

Convert single character state string to set of State objects.

>>> names = [state.name for state in State.str2states('rdA')]
>>> sorted(names)
['CANCELED', 'FAILED', 'MEMORY', 'TIMEOUT', 'done', 'running']
>>> State.str2states('x')
Traceback (most recent call last):
  ...
ValueError: Unknown state: x.  Must be one of q, ..., a or A.
Parameters:

s (str) –

Return type:

set[State]

Resources

class myqueue.resources.Resources(cores=0, nodename='', processes=0, tmax=0, weight=-1.0)[source]

Resource description.

Parameters:
  • cores (int) –

  • nodename (str) –

  • processes (int) –

  • tmax (int) –

  • weight (float) –

bigger(state, nodelist, maxtmax=172800)[source]

Create new Resource object with larger tmax or more cores.

>>> nodes = [('node1', {'cores': 8})]
>>> r = Resources(tmax=100, cores=8)
>>> r.bigger(State.TIMEOUT, nodes)
Resources(cores=8, tmax=200)
>>> r.bigger(State.MEMORY, nodes)
Resources(cores=16, tmax=100)
Parameters:
Return type:

Resources

static from_string(s)[source]

Create Resource object from string.

>>> r = Resources.from_string('16:1:xeon8:2h')
>>> r
Resources(cores=16, processes=1, tmax=7200, nodename='xeon8')
>>> print(r)
16:1:xeon8:2h
>>> Resources.from_string('16:1m')
Resources(cores=16, tmax=60)
>>> r = Resources.from_string('16:1m:25')
>>> r
Resources(cores=16, tmax=60, weight=25.0)
>>> print(r)
16:1m:25
Parameters:

s (str) –

Return type:

Resources

select(nodelist)[source]

Select appropriate node.

>>> nodes = [('node1', {'cores': 16}),
...          ('node2', {'cores': 8}),
...          ('fatnode2', {'cores': 8})]
>>> Resources(cores=24).select(nodes)
(3, 'node2', {'cores': 8})
>>> Resources(cores=32).select(nodes)
(2, 'node1', {'cores': 16})
>>> Resources(cores=32, nodename='fatnode2').select(nodes)
(4, 'fatnode2', {'cores': 8})
>>> Resources(cores=1).select(nodes)
(1, 'node2', {'cores': 8})
>>> Resources(cores=32, nodename='node3').select(nodes)
Traceback (most recent call last):
    ...
ValueError: No such node: node3
Parameters:

nodelist (list[Tuple[str, Dict[str, Any]]]) –

Return type:

tuple[int, str, dict[str, Any]]

todict()[source]

Convert to dict.

Return type:

dict[str, Any]

Queue

class myqueue.queue.Queue(config=None, *, need_lock=True, dry_run=False)[source]

Object for interacting with your .myqueue/queue.sqlite3 file

Parameters:
  • config (Configuration) –

  • need_lock (bool) –

  • dry_run (bool) –

add(*tasks)[source]

Add tasks to database.

Parameters:

tasks (Task) –

Return type:

None

cancel_dependents(ids)[source]

Set state of dependents to CANCELED.

Parameters:

ids (Iterable[int]) –

Return type:

None

check_for_oom()[source]

Fin out-of-memory tasks.

Return type:

None

check_for_timeout()[source]

Find “running” tasks that are actually timed out.

Return type:

None

property connection: Connection

Get or create a connection object.

find_dependents(ids, known=None)[source]

Yield dependents.

Parameters:
Return type:

Iterator[int]

process_change_files()[source]

Process state-change files from running tasks.

Return type:

None

remove(ids)[source]

Remove tasks.

Parameters:

ids (Iterable[int]) –

Return type:

None

property scheduler: Scheduler

Scheduler object.

select(selection=None)[source]

Create tasks from selection object.

Parameters:

selection (Selection) –

Return type:

list[Task]

sql(statement, args=None)[source]

Raw SQL execution.

Parameters:
  • statement (LiteralString) –

  • args (list[str | int]) –

Return type:

Iterator[tuple]

tasks(where, args=None)[source]

Create tasks from SQL WHERE statement.

Parameters:
  • where (LiteralString) –

  • args (list[str | int]) –

Return type:

list[Task]

update_one_task(id, newstate, ctime, path)[source]

Update single task.

Parameters:
Return type:

None

Schedulers

class myqueue.schedulers.slurm.SLURM(config)[source]
Parameters:

config (Configuration) –

cancel(id)[source]

Cancel a task.

Parameters:

id (int) –

Return type:

None

get_ids()[source]

Get ids for all tasks the scheduler knows about.

Return type:

set[int]

submit(task, dry_run=False, verbose=False)[source]

Submit a task.

Parameters:
Return type:

int

class myqueue.schedulers.lsf.LSF(config)[source]
Parameters:

config (Configuration) –

cancel(id)[source]

Cancel a task.

Parameters:

id (int) –

Return type:

None

get_ids()[source]

Get ids for all tasks the scheduler knows about.

Return type:

set[int]

submit(task, dry_run=False, verbose=False)[source]

Submit a task.

Parameters:
Return type:

int

class myqueue.schedulers.pbs.PBS(config)[source]
Parameters:

config (Configuration) –

cancel(id)[source]

Cancel a task.

Parameters:

id (int) –

Return type:

None

get_ids()[source]

Get ids for all tasks the scheduler knows about.

Return type:

set[int]

submit(task, dry_run=False, verbose=False)[source]

Submit a task.

Parameters:
Return type:

int