Source code for myqueue.task

from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterator
from warnings import warn

from myqueue.commands import Command, create_command
from myqueue.errors import parse_stderr
from myqueue.resources import Resources
from myqueue.states import State

if TYPE_CHECKING:
    from myqueue.schedulers import Scheduler

UNSPECIFIED = 'hydelifytskibadut'


[docs] class Task: """Task object. Parameters ---------- cmd: :class:`myqueue.commands.Command` Command to be run. resources: Resources Combination of number of cores, nodename, number of processes and maximum time. deps: list of Path objects Dependencies. restart: int How many times to restart task. workflow: bool Task is part of a workflow. folder: Path Folder where task should run. creates: list of str Name of files created by task. """ def __init__(self, cmd: Command, *, resources: Resources, deps: list[Path], restart: int, workflow: bool, folder: Path, creates: list[str], notifications: str = '', state: State = State.undefined, id: int = 0, error: str = '', tqueued: float = 0.0, trunning: float = 0.0, tstop: float = 0.0, user: str = ''): self.cmd = cmd self.resources = resources self.deps = deps self.restart = restart self.workflow = workflow self.folder = folder self.notifications = notifications self.creates = creates assert isinstance(state, State), state self.state = state self.id = id self.error = error # Timing: self.tqueued = tqueued self.trunning = trunning self.tstop = tstop self.user = user or os.environ.get('USER', 'root') self.dname = folder / cmd.name self.dtasks: list[Task] = [] self._done: bool | None = None self.result = UNSPECIFIED @property def name(self) -> str: return f'{self.cmd.name}.{self.id}' def running_time(self, t: float = None) -> float: if self.state in ['CANCELED', 'queued', 'hold']: dt = 0.0 elif self.state == 'running': t = t or time.time() dt = t - self.trunning else: dt = self.tstop - self.trunning return dt def words(self) -> list[str]: t = time.time() age = t - self.tqueued dt = self.running_time(t) info = [] if self.restart: info.append(f'*{self.restart}') if self.deps: info.append(f'd{len(self.deps)}') if self.cmd.args: info.append(f'+{len(self.cmd.args)}') if self.notifications: info.append(self.notifications) return [str(self.id), str(self.folder) + '/', self.cmd.name.split('+', 1)[0], ' '.join(self.cmd.args), ','.join(info), str(self.resources), seconds_to_time_string(age), self.state.name, seconds_to_time_string(dt), self.error] def __str__(self) -> str: return ' '.join(self.words()) def __repr__(self) -> str: return f'Task({self.cmd.name})' def order_key(self, column: str) -> Any: """ifnAraste""" if column == 'i': return self.id if column == 'f': return self.folder if column == 'n': return self.name if column == 'A': return len(self.cmd.args) if column == 'r': return self.resources.cores * self.resources.tmax if column == 'a': return self.tqueued if column == 's': return self.state.name if column == 't': return self.running_time() if column == 'e': return self.error raise ValueError(f'Unknown column: {column}! ' 'Must be one of i, f, n, a, I, r, A, s, t or e') def todict(self, root: Path = None) -> dict[str, Any]: folder = self.folder deps = self.deps if root: folder = folder.relative_to(root) deps = [dep.relative_to(root) for dep in self.deps] return { 'id': self.id, 'folder': str(folder), 'cmd': self.cmd.todict(), 'state': self.state.name, 'resources': self.resources.todict(), 'restart': self.restart, 'workflow': self.workflow, 'deps': [str(dep) for dep in deps], 'notifications': self.notifications, 'creates': self.creates, 'tqueued': self.tqueued, 'trunning': self.trunning, 'tstop': self.tstop, 'error': self.error, 'user': self.user} def to_sql(self, root: Path) -> tuple[int, str, str, str, str, str, int, bool, str, float, str, str, float, float, float, str, str]: folder = str(self.folder.relative_to(root)) if folder == '.': folder = './' else: folder = f'./{folder}/' return (self.id, folder, self.state.name[0], # str(self.dname.relative_to(root)), self.dname.name, json.dumps(self.cmd.todict()), json.dumps(self.resources.todict()), self.restart, self.workflow, ','.join(str(dep.relative_to(root)) for dep in self.deps), self.resources.weight, self.notifications, ','.join(self.creates), self.tqueued, self.trunning, self.tstop, self.error, self.user) @staticmethod def from_sql_row(row: tuple, root: Path) -> Task: (id, folder, state, name, cmd, resources, restart, workflow, deps, weight, notifications, creates, tqueued, trunning, tstop, error, user) = row resources = Resources(**json.loads(resources)) assert resources.weight == weight return Task(id=id, folder=root / folder, state=State(state), cmd=create_command(**json.loads(cmd)), resources=resources, restart=restart, workflow=bool(workflow), deps=[] if not deps else [root / dep for dep in deps.split(',')], notifications=notifications, creates=[] if not creates else creates.split(','), tqueued=tqueued, trunning=trunning, tstop=tstop, error=error, user=user) @staticmethod def fromdict(dct: dict[str, Any], root: Path) -> Task: dct = dct.copy() # Backwards compatibility with version 2: if 'restart' not in dct: dct['restart'] = 0 else: dct['restart'] = int(dct['restart']) dct.pop('diskspace', None) # Backwards compatibility: if 'creates' not in dct: dct['creates'] = [] f = dct.pop('folder') if f.startswith('/'): # Backwards compatibility with version 5: folder = Path(f) deps = [Path(dep) for dep in dct.pop('deps')] else: folder = root / f deps = [root / dep for dep in dct.pop('deps')] id = int(dct.pop('id')) return Task(cmd=create_command(**dct.pop('cmd')), resources=Resources(**dct.pop('resources')), state=State[dct.pop('state')], folder=folder, deps=deps, notifications=dct.pop('notifications', ''), id=id, **dct) def infolder(self, folder: Path, recursive: bool) -> bool: """Check if task runs inside a folder tree.""" return folder == self.folder or (recursive and folder in self.folder.parents) def check_creates_files(self) -> bool: """Check if all files have been created.""" if self.creates: for pattern in self.creates: if not any(self.folder.glob(pattern)): return False return True return False def read_error_and_check_for_oom(self, scheduler: Scheduler) -> bool: """Check error message. Return True if out of memory. """ self.error = '-' # mark as already read path = scheduler.error_file(self) try: text = path.read_text() except (FileNotFoundError, UnicodeDecodeError): return False self.error, oom = parse_stderr(text) return oom def ideps(self, map: dict[Path, Task]) -> Iterator[Task]: """Yield task and its dependencies.""" yield self for dname in self.deps: yield from map[dname].ideps(map)
[docs] def submit(self, verbosity: int = 1, dry_run: bool = False) -> None: """Submit task. Parameters ---------- verbosity: int Must be 0, 1 or 2. dry_run: bool Don't actually submit the task. """ from myqueue.config import Configuration from myqueue.queue import Queue from myqueue.submitting import submit config = Configuration.read() with Queue(config, dry_run=dry_run) as queue: submit(queue, [self], verbosity=verbosity)
def run(self) -> None: self.result = self.cmd.run()
def create_task(cmd: str, args: list[str] = [], *, resources: str = '', workflow: bool = False, name: str = '', deps: str | list[str] | Task | list[Task] = '', cores: int = 0, nodename: str = '', processes: int = 0, tmax: str = '', weight: float = -1.0, folder: str = '', restart: int = 0, creates: list[str] = []) -> Task: """Create a Task object. :: task = task('abc.py') Parameters ---------- cmd: str Command to be run. args: list of str Command-line arguments or function arguments. resources: str Resources:: 'cores[:nodename][:processes]:tmax' Examples: '48:1d', '32:1h', '8:xeon8:1:30m'. Can not be used togeter with any of "cores", "nodename", "processes" and "tmax". name: str Name to use for task. Default is <cmd>[+<arg1>[_<arg2>[_<arg3>]...]]. deps: str, list of str, Task object or list of Task objects Dependencies. Examples: "task1,task2", "['task1', 'task2']". cores: int Number of cores (default is 1). nodename: str Name of node. processes: int Number of processes to start (default is one for each core). tmax: str Maximum time for task. Examples: "40s", "30m", "20h" and "2d". workflow: bool Task is part of a workflow. folder: str Folder where task should run (default is current folder). restart: int How many times to restart task. weight: float Weight of task. See :ref:`task_weight`. creates: list of str Name of files created by task (can be both full filenames or patterns matching filenames). Returns ------- Task Object representing the task. """ path = Path(folder).absolute() dpaths = [] if deps: if isinstance(deps, str): deps = deps.split(',') elif isinstance(deps, Task): deps = [deps] for dep in deps: if isinstance(dep, str): p = path / dep if '..' in p.parts: p = p.parent.resolve() / p.name dpaths.append(p) else: dpaths.append(dep.dname) if '@' in cmd: # Old way of specifying resources: c, r = cmd.rsplit('@', 1) if r[0].isdigit(): cmd = c resources = r warn(f'Please use resources={r!r} instead of deprecated ' f'...@{r} syntax!') command = create_command(cmd, args, name=name) res = Resources.from_args_and_command( cores, nodename, processes, tmax, weight, resources, command, path) return Task(command, resources=res, deps=dpaths, restart=restart, workflow=workflow, folder=path, creates=creates) task = create_task def seconds_to_time_string(n: float) -> str: """Convert number of seconds to string. >>> seconds_to_time_string(10) '0:10' >>> seconds_to_time_string(3601) '1:00:01' >>> seconds_to_time_string(24 * 3600) '1:00:00:00' """ n = int(n) d, n = divmod(n, 24 * 3600) h, n = divmod(n, 3600) m, s = divmod(n, 60) if d: return f'{d}:{h:02}:{m:02}:{s:02}' if h: return f'{h}:{m:02}:{s:02}' return f'{m}:{s:02}'