from __future__ import annotations
import subprocess
from myqueue.task import Task
from myqueue.schedulers import Scheduler, SchedulerError
from myqueue.utils import str2number
[docs]
class LSF(Scheduler):
[docs]
def submit(self,
task: Task,
dry_run: bool = False,
verbose: bool = False) -> int:
nodelist = self.config.nodes
nodes, nodename, nodedct = task.resources.select(nodelist)
name = task.cmd.short_name
hours, minutes = divmod(max(task.resources.tmax, 60) // 60, 60)
bsub = ['bsub',
'-J', name,
'-W', f'{hours:02}:{minutes:02}',
'-n', str(task.resources.cores),
'-o', f'{task.folder}/{name}.%J.out',
'-e', f'{task.folder}/{name}.%J.err',
'-R', f'select[model == {nodename}]']
mem = nodedct['memory']
gbytes = int(str2number(mem) / 1_000_000_000 / nodedct['cores'])
bsub += ['-R', f'rusage[mem={gbytes}G]']
extra_args = self.config.extra_args + nodedct.get('extra_args', [])
if extra_args:
bsub += extra_args
if task.dtasks:
ids = ' && '.join(f'done({t.id})'
for t in task.dtasks)
bsub += ['-w', f"{ids}"]
# bsub += ['-R', f'span[hosts={nodes}]']
bsub += ['-R', 'span[hosts=1]']
cmd = str(task.cmd)
if task.resources.processes > 1:
cmd = 'mpiexec ' + cmd.replace('python3',
self.config.parallel_python)
home = self.config.home
script = (
'#!/bin/bash -l\n'
'export MYQUEUE_TASK_ID=$LSB_JOBID\n'
f'mq={home}/.myqueue/lsf-$MYQUEUE_TASK_ID\n')
script += self.get_venv_activation_line()
script += (
'(touch $mq-0 && \\\n'
f' cd {str(task.folder)!r} && \\\n'
f' {cmd} && \\\n'
' touch $mq-1) || \\\n'
'(touch $mq-2; exit 1)\n')
# print(' \\\n '.join(bsub))
# print(script)
if dry_run:
print(' \\\n '.join(bsub))
print(script)
return 1
p = subprocess.run(bsub,
input=script.encode(),
capture_output=True)
if p.returncode:
raise SchedulerError((p.stderr + p.stdout).decode())
id = int(p.stdout.split()[1][1:-1].decode())
return id
def has_timed_out(self, task: Task) -> bool:
path = self.error_file(task).expanduser()
if path.is_file():
task.tstop = path.stat().st_mtime
lines = path.read_text().splitlines()
for line in lines:
if line.startswith('TERM_RUNLIMIT:'):
return True
return False
[docs]
def cancel(self, id: int) -> None:
subprocess.run(['bkill', str(id)])
[docs]
def get_ids(self) -> set[int]:
p = subprocess.run(['bjobs'], stdout=subprocess.PIPE)
queued = {int(line.split()[0].decode())
for line in p.stdout.splitlines()
if line[:1].isdigit()}
return queued
def get_config(self, queue: str = '') -> tuple[list[tuple[str, int, str]],
list[str]]:
from collections import defaultdict
cmd = ['nodestat', '-F', queue]
p = subprocess.run(cmd, stdout=subprocess.PIPE)
cores: dict[str, int] = {}
memory: dict[str, list[str]] = defaultdict(list)
for line in p.stdout.decode().splitlines():
_, state, procs, _, name, mem, unit, *_ = line.split()
if state == 'State':
continue # skip header
if state == 'Down':
continue
cores[name] = int(procs.split(':')[1])
memory[name].append(mem + unit)
nodes = [
(name, n, min(memory[name], key=str2number))
for name, n in cores.items()]
if queue:
extra_args = ['-q', queue]
else:
extra_args = []
return nodes, extra_args