Source code for myqueue.schedulers.pbs

from __future__ import annotations
import os
import subprocess
from pathlib import Path

from myqueue.task import Task
from myqueue.schedulers import Scheduler, SchedulerError


[docs] class PBS(Scheduler): output_file_pattern = r'[a-zA-Z]+.*\.[eo]+[1-9]+[0-9]*'
[docs] def submit(self, task: Task, dry_run: bool = False, verbose: bool = False) -> int: nodelist = self.config.nodes nodes, nodename, nodedct = task.resources.select(nodelist) processes = task.resources.processes if processes < nodedct['cores']: ppn = processes else: assert processes % nodes == 0 ppn = processes // nodes hours, rest = divmod(task.resources.tmax, 3600) minutes, seconds = divmod(rest, 60) qsub = ['qsub', '-N', task.cmd.short_name, '-l', f'walltime={hours}:{minutes:02}:{seconds:02}', '-l', f'nodes={nodes}:ppn={ppn}', '-d', str(task.folder)] qsub += self.config.extra_args + nodedct.get('extra_args', []) if task.dtasks: ids = ':'.join(str(tsk.id) for tsk in task.dtasks) qsub.extend(['-W', f'depend=afterok:{ids}']) cmd = str(task.cmd) if task.resources.processes > 1: mpiexec = 'mpiexec -x OMP_NUM_THREADS=1 -x MPLBACKEND=Agg ' if 'mpiargs' in nodedct: mpiexec += nodedct['mpiargs'] + ' ' cmd = mpiexec + cmd.replace('python3', self.config.parallel_python) else: cmd = 'MPLBACKEND=Agg ' + cmd home = self.config.home script = '#!/bin/bash -l\n' script += self.get_venv_activation_line() script += ( '#!/bin/bash -l\n' 'export MYQUEUE_TASK_ID=${PBS_JOBID%.*}\n' f'mq={home}/.myqueue/pbs-$MYQUEUE_TASK_ID\n' f'(touch $mq-0 && cd {task.folder} && {cmd} && touch $mq-1) || ' 'touch $mq-2\n') if dry_run: print(qsub, script) return 1 p = subprocess.run(qsub, input=script.encode(), capture_output=True) if p.returncode: raise SchedulerError((p.stderr + p.stdout).decode()) id = int(p.stdout.split(b'.')[0].decode()) return id
def error_file(self, task: Task) -> Path: return task.folder / f'{task.cmd.short_name}.e{task.id}'
[docs] def cancel(self, id: int) -> None: subprocess.run(['qdel', str(id)])
[docs] def get_ids(self) -> set[int]: user = os.environ.get('USER', 'test') cmd = ['qstat', '-u', user] p = subprocess.run(cmd, stdout=subprocess.PIPE) queued = {int(line.split()[0].split(b'.')[0].decode()) for line in p.stdout.splitlines() if line[:1].isdigit()} return queued