# Source code for myqueue.queue

"""Queue class for interacting with the queue.

File format versions:

5) Changed from mod:func to mod@func.
6) Relative paths.
8) Type of Task.id changed from int to str.
9) Added "user".

"""
from __future__ import annotations

import json
import os
import time
from collections import defaultdict
from types import TracebackType
from typing import Sequence

from myqueue.config import Configuration
from myqueue.email import configure_email, send_notification
from myqueue.pretty import pprint
from myqueue.resources import Resources
from myqueue.run import run_tasks
from myqueue.scheduler import Scheduler, get_scheduler
from myqueue.selection import Selection
from myqueue.states import State
from myqueue.submitting import submit_tasks
from myqueue.task import Task
from myqueue.utils import Lock, plural


class Queue(Lock):
    """Object for interacting with the scheduler.

    The task list is kept in ``<config.home>/.myqueue/queue.json`` and is
    guarded by the lock file ``queue.json.lock`` in the same folder.  Use
    the object as a context manager: on exit, tasks recorded in
    ``self.changed`` cause the queue file to be rewritten (unless
    *dry_run* is set) and the lock is released.
    """

    def __init__(self,
                 config: Configuration,
                 verbosity: int = 1,
                 need_lock: bool = True,
                 dry_run: bool = False):
        """Set up paths and the lock; nothing is read from disk here.

        Parameters
        ==========
        config: Configuration
            Provides ``home`` (parent of the ``.myqueue`` folder) and the
            settings used to create the scheduler.
        verbosity: int
            Controls how much is printed.
        need_lock: bool
            If false, a ``PermissionError`` while acquiring the lock is
            ignored in ``__enter__``.
        dry_run: bool
            If true, the lock is not acquired and nothing is written.
        """
        self.verbosity = verbosity
        self.need_lock = need_lock
        self.dry_run = dry_run

        self.config = config
        self.folder = config.home / '.myqueue'
        self.fname = self.folder / 'queue.json'

        Lock.__init__(self, self.fname.with_name('queue.json.lock'),
                      timeout=10.0)

        # Created lazily by the ``scheduler`` property.
        self._scheduler: Scheduler | None = None
        # In-memory task list (filled by ``_read``).
        self.tasks: list[Task] = []
        # Tasks touched in this session; a non-empty set triggers
        # ``_write`` on exit.
        self.changed: set[Task] = set()

    @property
    def scheduler(self) -> Scheduler:
        """Scheduler object (created on first access)."""
        if self._scheduler is None:
            self._scheduler = get_scheduler(self.config)
        return self._scheduler

    def __enter__(self) -> Queue:
        """Acquire the lock (skipped for dry runs) and return self."""
        if self.dry_run:
            return self
        if self.need_lock:
            self.acquire()
        else:
            # Best effort: carry on without the lock if we are not
            # allowed to create it.
            try:
                self.acquire()
            except PermissionError:
                pass
        return self

    def __exit__(self,
                 type: type[BaseException] | None,
                 value: BaseException | None,
                 tb: TracebackType | None) -> None:
        """Write pending changes (not for dry runs) and release the lock.

        Changes are written even when the ``with`` body raised.
        """
        try:
            if self.changed and not self.dry_run:
                self._write()
        finally:
            # Always give the lock back, also when writing fails.
            self.release()

    def ls(self,
           selection: Selection,
           columns: str,
           sort: str | None = None,
           reverse: bool = False,
           short: bool = False,
           use_log_file: bool = False) -> list[Task]:
        """Pretty-print list of tasks.

        Reads the queue (or ``log.csv`` if *use_log_file* is true),
        applies *selection*, optionally sorts on ``task.order(sort)``
        and returns the tasks that were printed.
        """
        self._read(use_log_file)
        tasks = selection.select(self.tasks)
        if isinstance(sort, str):
            tasks.sort(key=lambda task: task.order(sort),  # type: ignore
                       reverse=reverse)
        pprint(tasks, self.verbosity, columns, short)
        return tasks

    def submit(self,
               tasks: Sequence[Task],
               force: bool = False,
               max_tasks: int = 1_000_000_000,
               read: bool = True) -> None:
        """Submit tasks to queue.

        Parameters
        ==========
        tasks: Sequence[Task]
            Tasks to hand over to ``submit_tasks``.
        force: bool
            Ignore and remove name.FAILED files.
        max_tasks: int
            Passed on to ``submit_tasks``.
        read: bool
            Read the queue file first.  Callers that have already read
            it pass ``False``.

        If ``submit_tasks`` returns an exception, it is re-raised after
        the submitted tasks have been recorded and printed.
        """
        if read:
            self._read()

        current = {task.dname: task for task in self.tasks}

        submitted, skipped, ex = submit_tasks(
            self.scheduler, tasks, current,
            force, max_tasks,
            self.verbosity, self.dry_run)

        # A resubmitted workflow task replaces its old entry.
        for task in submitted:
            if task.workflow:
                oldtask = current.get(task.dname)
                if oldtask:
                    self.tasks.remove(oldtask)

        # Test hook: raise KeyboardInterrupt before anything is recorded.
        if 'MYQUEUE_TESTING' in os.environ:
            if any(task.cmd.args == ['SIMULATE-CTRL-C']
                   for task in submitted):
                raise KeyboardInterrupt

        self.tasks += submitted
        self.changed.update(submitted)

        if ex:
            print()
            print('Skipped', plural(len(skipped), 'task'))

        pprint(submitted, 0, 'ifnaIr',
               maxlines=10 if self.verbosity < 2 else 99999999999999)
        if submitted:
            if self.dry_run:
                print(plural(len(submitted), 'task'), 'to submit')
            else:
                print(plural(len(submitted), 'task'), 'submitted')

        if ex:
            raise ex

    def run(self, tasks: list[Task]) -> None:
        """Run tasks locally.

        Queue entries with the same ``dname`` as one of *tasks* are
        removed first.  For a dry run the commands are only printed.
        """
        self._read()
        dnames = {task.dname for task in tasks}
        self._remove([task for task in self.tasks if task.dname in dnames])
        if self.dry_run:
            for task in tasks:
                print(f'{task.folder}: {task.cmd}')
        else:
            run_tasks(tasks)

    def remove(self, selection: Selection) -> None:
        """Remove or cancel tasks.

        The selected tasks and every task that depends on them
        (see ``find_depending``) are removed.
        """
        self._read()
        tasks = selection.select(self.tasks)
        tasks = self.find_depending(tasks)
        self._remove(tasks)

    def _remove(self, tasks: list[Task]) -> None:
        """Remove *tasks* from the list; cancel those still in the queue.

        For a dry run only a summary is printed (``tstop`` is still set
        on the in-memory task objects).
        """
        t = time.time()
        for task in tasks:
            if task.tstop is None:
                task.tstop = t  # XXX is this for dry_run only?

        if self.dry_run:
            if tasks:
                pprint(tasks, 0)
                print(plural(len(tasks), 'task'), 'to be removed')
        else:
            if self.verbosity > 0:
                if tasks:
                    pprint(tasks, 0)
                    print(plural(len(tasks), 'task'), 'removed')
            for task in tasks:
                if task.state in ['running', 'hold', 'queued']:
                    self.scheduler.cancel(task)
                self.tasks.remove(task)
                # XXX why cancel?
                task.cancel_dependents(self.tasks, time.time())
                self.changed.add(task)

    def sync(self) -> None:
        """Synchronize queue with the real world.

        For tasks whose id is not among ``scheduler.get_ids()``:

        * those in state running, hold or queued are marked CANCELED
        * those whose folder no longer exists are removed from the list

        For a dry run only the counts are printed.
        """
        self._read()
        in_the_queue = {'running', 'hold', 'queued'}
        ids = self.scheduler.get_ids()
        cancel = []
        remove = []
        for task in self.tasks:
            if task.id not in ids:
                if task.state in in_the_queue:
                    cancel.append(task)
                if not task.folder.is_dir():
                    remove.append(task)

        if cancel:
            if self.dry_run:
                print(plural(len(cancel), 'job'), 'to be canceled')
            else:
                for task in cancel:
                    task.state = State.CANCELED
                    self.changed.add(task)
                print(plural(len(cancel), 'job'), 'canceled')

        if remove:
            if self.dry_run:
                print(plural(len(remove), 'job'), 'to be removed')
            else:
                for task in remove:
                    self.tasks.remove(task)
                    self.changed.add(task)
                print(plural(len(remove), 'job'), 'removed')

    def find_depending(self, tasks: list[Task]) -> list[Task]:
        """Generate list of tasks including dependencies.

        Returns *tasks* plus every task in ``self.tasks`` that depends
        on one of them, directly or indirectly, without duplicates and
        sorted by ``task.id``.
        """
        by_dname = {task.dname: task for task in self.tasks}

        # Map each task to the tasks that list it in their ``deps``.
        dependents: dict[Task, list[Task]] = defaultdict(list)
        for task in self.tasks:
            for dname in task.deps:
                parent = by_dname.get(dname)
                if parent:
                    dependents[parent].append(task)

        # Iterative walk with a visited set: every task is expanded only
        # once, so shared dependents are not re-traversed and a cycle in
        # the dependency graph cannot cause unbounded recursion.
        found: set[Task] = set()
        stack = list(tasks)
        while stack:
            task = stack.pop()
            if task in found:
                continue
            found.add(task)
            stack.extend(dependents.get(task, ()))

        return sorted(found, key=lambda task: task.id)

    def modify(self,
               selection: Selection,
               newstate: State,
               email: set[State]) -> None:
        """Modify task(s).

        Parameters
        ==========
        selection: Selection
            Picks the tasks to modify.
        newstate: State
            New state, or ``State.undefined`` to leave states alone.
            Allowed transitions: hold -> queued, queued -> hold and
            FAILED -> MEMORY or TIMEOUT.
        email: set[State]
            States to send notifications for, or ``{State.undefined}``
            to leave notifications alone.

        Raises ``ValueError`` for any other state transition.  For a dry
        run the actions are printed and the tasks are left untouched.
        """
        self._read()
        tasks = selection.select(self.tasks)

        if email != {State.undefined}:
            configure_email(self.config)
            for task in tasks:
                if self.dry_run:
                    print(task, email)
                else:
                    task.notifications = ''.join(state.value
                                                 for state in email)
                    self.changed.add(task)

        if newstate != State.undefined:
            for task in tasks:
                # Remember the old state so that the message below shows
                # the real transition.
                oldstate = task.state
                if oldstate == 'hold' and newstate == 'queued':
                    if self.dry_run:
                        print('Release:', task)
                    else:
                        self.scheduler.release_hold(task)
                elif oldstate == 'queued' and newstate == 'hold':
                    if self.dry_run:
                        print('Hold:', task)
                    else:
                        self.scheduler.hold(task)
                elif oldstate == 'FAILED' and newstate in ['MEMORY',
                                                           'TIMEOUT']:
                    if self.dry_run:
                        print('FAILED ->', newstate, task)
                else:
                    raise ValueError(f'Can\'t do {oldstate} -> {newstate}!')
                print(f'{oldstate} -> {newstate}: {task}')
                # A dry run must not change the task.
                if not self.dry_run:
                    task.state = newstate
                    self.changed.add(task)

    def resubmit(self,
                 selection: Selection,
                 resources: Resources | None) -> None:
        """Resubmit failed or timed-out tasks.

        Selected tasks that are not queued, on hold or running are taken
        out of the list and their state file is removed.  A fresh task
        (with *resources* if given, otherwise the old resources) is
        created for every selected task and handed to ``submit``.
        """
        self._read()
        tasks = []
        for task in selection.select(self.tasks):
            if task.state not in {'queued', 'hold', 'running'}:
                self.tasks.remove(task)
                # NOTE(review): also done for dry runs - confirm that
                # this is intended.
                task.remove_state_file()
                self.changed.add(task)
            task = Task(task.cmd,
                        deps=task.deps,
                        resources=resources or task.resources,
                        folder=task.folder,
                        restart=task.restart,
                        workflow=task.workflow,
                        creates=task.creates,
                        diskspace=0)
            tasks.append(task)
        self.submit(tasks, read=False)

    def _read(self, use_log_file: bool = False) -> None:
        """Fill ``self.tasks`` from disk.

        With *use_log_file*, the tasks come from ``log.csv`` (if it
        exists) and nothing else is done.  Otherwise ``queue.json`` is
        read and, if we hold the lock, change files are processed and
        ``check`` is run.
        """
        if use_log_file:
            logfile = self.folder / 'log.csv'
            if logfile.is_file():
                import csv
                with logfile.open() as fd:
                    reader = csv.reader(fd)
                    # Skip header (an empty file simply gives no tasks).
                    next(reader, None)
                    self.tasks = [Task.fromcsv(row) for row in reader]
            return

        if self.fname.is_file():
            data = json.loads(self.fname.read_text())
            root = self.folder.parent
            # NOTE(review): tasks are appended to the existing list, so
            # a second call on the same object would add every task
            # again - confirm that callers only read once.
            for dct in data['tasks']:
                task = Task.fromdict(dct, root)
                self.tasks.append(task)

        if self.locked:
            self.read_change_files()
            self.check()

    def read_change_files(self) -> None:
        """Apply and delete the ``*-*-*`` change files in the folder.

        The second and third dash-separated parts of a file name are the
        task id and a state code ('0': running, '1': done, '2': FAILED,
        '3': TIMEOUT).  Updates are applied in order of the files'
        ``st_ctime``.
        """
        files = []
        for path in self.folder.glob('*-*-*'):
            _, taskid, code = path.name.split('-')
            files.append((path.stat().st_ctime, taskid, code))
            path.unlink()

        states = {'0': State.running,
                  '1': State.done,
                  '2': State.FAILED,
                  '3': State.TIMEOUT}
        for t, taskid, code in sorted(files):
            self.update(taskid, states[code], t)

    def update(self,
               id: str,
               state: State,
               t: float = 0.0) -> None:
        """Set the state of the task with the given id.

        Parameters
        ==========
        id: str
            Task id.  A message is printed and nothing else happens if
            there is no such task.
        state: State
            One of done, running, FAILED, TIMEOUT or MEMORY.  Anything
            else raises ``ValueError``.
        t: float
            Time of the change.  A false value (the default 0.0) means
            now.
        """
        for task in self.tasks:
            if task.id == id:
                break
        else:
            print(f'No such task: {id}, {state}')
            return

        t = t or time.time()

        task.state = state

        if state == 'done':
            # Other tasks no longer have to wait for this one.
            for tsk in self.tasks:
                if task.dname in tsk.deps:
                    tsk.deps.remove(task.dname)
            task.write_state_file()
            task.tstop = t
        elif state == 'running':
            task.trunning = t
        elif state in ['FAILED', 'TIMEOUT', 'MEMORY']:
            task.cancel_dependents(self.tasks, t)
            task.tstop = t
            task.write_state_file()
        else:
            raise ValueError(f'Bad state: {state}')

        if state != 'running':
            mem = self.scheduler.maxrss(id)
            task.memory_usage = mem

        self.changed.add(task)

    def check(self) -> None:
        """Bring task states up to date.

        * Running tasks that have passed ``resources.tmax`` are marked
          TIMEOUT if the scheduler says they have timed out or if the
          overrun exceeds 1800.
        * Queued tasks with a dependency in a bad state are marked
          CANCELED.
        * FAILED tasks without an error text get their error read; if
          ``read_error`` returns true, the state becomes MEMORY.
        """
        t = time.time()

        for task in self.tasks:
            if task.state == 'running':
                delta = t - task.trunning - task.resources.tmax
                if delta > 0:
                    if self.scheduler.has_timed_out(task) or delta > 1800:
                        task.state = State.TIMEOUT
                        task.tstop = t
                        task.cancel_dependents(self.tasks, t)
                        self.changed.add(task)

        bad = {task.dname for task in self.tasks if task.state.is_bad()}
        for task in self.tasks:
            if task.state == 'queued':
                for dep in task.deps:
                    if dep in bad:
                        task.state = State.CANCELED
                        task.tstop = t
                        self.changed.add(task)
                        break

        for task in self.tasks:
            if task.state == 'FAILED':
                if not task.error:
                    oom = task.read_error(self.scheduler)
                    if oom:
                        task.state = State.MEMORY
                        task.write_state_file()
                    self.changed.add(task)

    def kick(self) -> dict[str, int]:
        """Kick the system.

        * Send email notifications
        * restart timed-out tasks
        * restart out-of-memory tasks
        * release/hold tasks to stay under *maximum_diskspace*

        Only tasks belonging to ``config.user`` are considered.  Returns
        a dict with the number of notifications, restarts, held and
        released tasks.
        """
        self._read()

        mytasks = [task for task in self.tasks
                   if task.user == self.config.user]

        result = {}

        ndct = self.config.notifications
        if ndct:
            notifications = send_notification(mytasks, **ndct)
            self.changed.update(task for task, statename in notifications)
            result['notifications'] = len(notifications)

        tasks = []
        for task in mytasks:
            if task.state in ['TIMEOUT', 'MEMORY'] and task.restart:
                nodes = self.config.nodes or [('', {'cores': 1})]
                if not self.dry_run:
                    task.resources = task.resources.bigger(task.state, nodes)
                    task.restart -= 1
                tasks.append(task)

        if tasks:
            tasks = self.find_depending(tasks)
            if self.dry_run:
                pprint(tasks)
            else:
                if self.verbosity > 0:
                    print('Restarting', plural(len(tasks), 'task'))
                # Reset the tasks so that they can be submitted again.
                for task in tasks:
                    self.tasks.remove(task)
                    task.error = ''
                    task.id = '0'
                    task.state = State.undefined
                self.submit(tasks, read=False)
            result['restarts'] = len(tasks)

        result.update(self.hold_or_release(mytasks))

        return result

    def hold_or_release(self, tasks: list[Task]) -> dict[str, int]:
        """Hold or release tasks depending on ``maximum_diskspace``.

        The ``diskspace`` of tasks that are queued, running, FAILED,
        TIMEOUT or MEMORY is added up.  If the sum is above the maximum,
        queued tasks with ``diskspace > 0`` are put on hold until the
        sum is below it.  If the sum is below the maximum, tasks on hold
        are released (last task first) until the sum is above it.

        Returns the non-zero counts under the keys 'held' and
        'released'.  For a dry run the counts are computed, but the
        scheduler is not called and the tasks are left untouched.
        """
        maxmem = self.config.maximum_diskspace
        mem = 0
        for task in tasks:
            if task.state in {'queued', 'running',
                              'FAILED', 'TIMEOUT', 'MEMORY'}:
                mem += task.diskspace

        held = 0
        released = 0

        if mem > maxmem:
            for task in tasks:
                if task.state == 'queued' and task.diskspace > 0:
                    if not self.dry_run:
                        self.scheduler.hold(task)
                        task.state = State.hold
                        self.changed.add(task)
                    held += 1
                    mem -= task.diskspace
                    if mem < maxmem:
                        break
        elif mem < maxmem:
            for task in tasks[::-1]:
                if task.state == 'hold' and task.diskspace > 0:
                    if not self.dry_run:
                        self.scheduler.release_hold(task)
                        task.state = State.queued
                        self.changed.add(task)
                    released += 1
                    mem += task.diskspace
                    if mem > maxmem:
                        break

        return {name: n
                for name, n in [('held', held), ('released', released)]
                if n > 0}

    def _write(self) -> None:
        """Write all tasks to ``queue.json`` and log the changed ones.

        One row per task in ``self.changed`` is appended to ``log.csv``;
        a header is written first if the log file does not exist yet.
        """
        root = self.folder.parent
        dicts = [task.todict(root) for task in self.tasks]
        text = json.dumps(
            {'version': 9,
             'warning': 'Do NOT edit this file!',
             'unless': 'you know what you are doing.',
             'tasks': dicts},
            indent=2)
        self.fname.write_text(text)

        # Write to log:
        logfile = root / '.myqueue/log.csv'
        write_header = not logfile.is_file()
        with logfile.open('a') as fd:
            for task in self.changed:
                task.tocsv(fd, write_header)
                write_header = False