Source code for myqueue.pbs

from __future__ import annotations
import os
import subprocess
from pathlib import Path

from .task import Task
from .scheduler import Scheduler


[docs]class PBS(Scheduler): output_file_pattern = r'[a-zA-Z]+.*\.[eo]+[1-9]+[0-9]*' def submit(self, task: Task, dry_run: bool = False, verbose: bool = False) -> None: nodelist = self.config.nodes nodes, nodename, nodedct = task.resources.select(nodelist) processes = task.resources.processes if processes < nodedct['cores']: ppn = processes else: assert processes % nodes == 0 ppn = processes // nodes hours, rest = divmod(task.resources.tmax, 3600) minutes, seconds = divmod(rest, 60) qsub = ['qsub', '-N', task.cmd.short_name, '-l', f'walltime={hours}:{minutes:02}:{seconds:02}', '-l', f'nodes={nodes}:ppn={ppn}', '-d', str(task.folder)] qsub += self.config.extra_args + nodedct.get('extra_args', []) if task.dtasks: ids = ':'.join(tsk.id for tsk in task.dtasks) qsub.extend(['-W', f'depend=afterok:{ids}']) cmd = str(task.cmd) if task.resources.processes > 1: mpiexec = 'mpiexec -x OMP_NUM_THREADS=1 -x MPLBACKEND=Agg ' if 'mpiargs' in nodedct: mpiexec += nodedct['mpiargs'] + ' ' cmd = mpiexec + cmd.replace('python3', self.config.parallel_python) else: cmd = 'MPLBACKEND=Agg ' + cmd home = self.config.home script = '#!/bin/bash -l\n' script += task.get_venv_activation_line() script += ( '#!/bin/bash -l\n' 'id=${{PBS_JOBID%.*}}\n' 'mq={home}/.myqueue/pbs-$id\n' '(touch $mq-0 && cd {dir} && {cmd} && touch $mq-1) || ' 'touch $mq-2\n' .format(home=home, dir=task.folder, cmd=cmd)) if dry_run: print(qsub, script) return p = subprocess.run(qsub, input=script.encode(), capture_output=True) id = p.stdout.split(b'.')[0].decode() task.id = id def error_file(self, task: Task) -> Path: return task.folder / f'{task.cmd.short_name}.e{task.id}' def cancel(self, task: Task) -> None: subprocess.run(['qdel', task.id]) def get_ids(self) -> set[str]: user = os.environ.get('USER', 'test') cmd = ['qstat', '-u', user] p = subprocess.run(cmd, stdout=subprocess.PIPE) queued = {line.split()[0].split(b'.')[0].decode() for line in p.stdout.splitlines() if line[:1].isdigit()} return queued