from __future__ import annotations
import subprocess
from .task import Task
from .scheduler import Scheduler
from .utils import str2number
class LSF(Scheduler):
    """Scheduler backend that talks to LSF via its command-line tools.

    Jobs are submitted with ``bsub``, cancelled with ``bkill`` and listed
    with ``bjobs``; node information is read from ``nodestat``.
    """

    def submit(self,
               task: Task,
               dry_run: bool = False,
               verbose: bool = False) -> None:
        """Submit *task* with ``bsub`` and store the job id on ``task.id``.

        A ``bsub`` command line is assembled from the task's resources and
        the configured node list, and a small bash script (fed to ``bsub``
        on stdin) runs the task's command inside the task folder.  The
        script touches ``$mq-0`` when it starts, ``$mq-1`` if the command
        succeeds and ``$mq-2`` (then exits with status 1) otherwise.

        Parameters
        ----------
        task:
            The task to submit.
        dry_run:
            If true, print the ``bsub`` command and the script instead of
            submitting anything; ``task.id`` is left untouched.
        verbose:
            Accepted for interface compatibility; not used here.

        Raises
        ------
        RuntimeError
            If the output of ``bsub`` does not contain a job id.
        """
        # Only the node name and its description are needed below.
        _, nodename, nodedct = task.resources.select(self.config.nodes)

        name = task.cmd.short_name

        # tmax is treated as seconds: request at least one minute and
        # format the wall-time limit as HH:MM.
        hours, minutes = divmod(max(task.resources.tmax, 60) // 60, 60)

        bsub = ['bsub',
                '-J', name,
                '-W', f'{hours:02}:{minutes:02}',
                '-n', str(task.resources.cores),
                '-o', f'{task.folder}/{name}.%J.out',
                '-e', f'{task.folder}/{name}.%J.err',
                '-R', f'select[model == {nodename}]']

        # Memory reservation per core, in whole gigabytes
        # (presumably str2number() yields bytes -- TODO confirm).
        mem = nodedct['memory']
        gbytes = int(str2number(mem) / 1_000_000_000 / nodedct['cores'])
        bsub += ['-R', f'rusage[mem={gbytes}G]']

        # Global extra arguments first, then node-specific ones.
        bsub += self.config.extra_args + nodedct.get('extra_args', [])

        if task.dtasks:
            # Only start once every dependency has finished successfully.
            ids = ' && '.join(f'done({t.id})' for t in task.dtasks)
            bsub += ['-w', ids]

        bsub += ['-R', 'span[hosts=1]']

        cmd = str(task.cmd)
        if task.resources.processes > 1:
            cmd = 'mpiexec ' + cmd.replace('python3',
                                           self.config.parallel_python)

        home = self.config.home

        script = (
            '#!/bin/bash -l\n'
            'id=$LSB_JOBID\n'
            f'mq={home}/.myqueue/lsf-$id\n')

        script += task.get_venv_activation_line()

        script += (
            '(touch $mq-0 && \\\n'
            f' cd {str(task.folder)!r} && \\\n'
            f' {cmd} && \\\n'
            ' touch $mq-1) || \\\n'
            '(touch $mq-2; exit 1)\n')

        if dry_run:
            print(' \\\n '.join(bsub))
            print(script)
            return

        p = subprocess.run(bsub,
                           input=script.encode(),
                           capture_output=True)

        # The job id is the second whitespace-separated token of the
        # output, wrapped in one delimiter character on each side
        # (e.g. "Job <12345> is submitted ...").
        tokens = p.stdout.split()
        if len(tokens) < 2:
            # Surface what bsub complained about instead of failing with
            # an opaque IndexError.
            stderr = p.stderr.decode(errors='replace').strip()
            raise RuntimeError(
                'bsub did not report a job id '
                f'(exit code {p.returncode}): {stderr}')
        task.id = tokens[1][1:-1].decode()

    def has_timed_out(self, task: Task) -> bool:
        """Return True if the task's error file reports a run-limit kill.

        When the error file exists, ``task.tstop`` is set to the file's
        modification time as a side effect, and the file is searched for a
        line starting with ``TERM_RUNLIMIT:``.
        """
        path = self.error_file(task).expanduser()
        if not path.is_file():
            return False
        task.tstop = path.stat().st_mtime
        return any(line.startswith('TERM_RUNLIMIT:')
                   for line in path.read_text().splitlines())

    def cancel(self, task: Task) -> None:
        """Cancel *task* by running ``bkill`` on its job id."""
        subprocess.run(['bkill', task.id])

    def get_ids(self) -> set[str]:
        """Return the ids listed by ``bjobs``.

        Every output line that starts with a digit contributes its first
        whitespace-separated field; other lines are ignored.
        """
        p = subprocess.run(['bjobs'], stdout=subprocess.PIPE)
        return {line.split()[0].decode()
                for line in p.stdout.splitlines()
                if line[:1].isdigit()}

    def get_config(self, queue: str = '') -> tuple[list[tuple[str, int, str]],
                                                   list[str]]:
        """Build a node list from the output of ``nodestat -F <queue>``.

        Returns a tuple ``(nodes, extra_args)`` where *nodes* holds one
        ``(name, cores, memory)`` entry per node name (using the smallest
        memory string seen for that name, compared via ``str2number``) and
        *extra_args* is ``['-q', queue]`` if a queue was given, else ``[]``.
        """
        from collections import defaultdict

        p = subprocess.run(['nodestat', '-F', queue], stdout=subprocess.PIPE)

        cores: dict[str, int] = {}
        memory: dict[str, list[str]] = defaultdict(list)
        for line in p.stdout.decode().splitlines():
            # Columns: id, state, procs, load, name, mem, unit, ...
            _id, state, procs, _load, name, mem, unit, *_ = line.split()
            # Skip the header row and nodes that are down.
            if state in ('State', 'Down'):
                continue
            # The core count is the part of "procs" after the colon.
            cores[name] = int(procs.split(':')[1])
            memory[name].append(mem + unit)

        nodes = [
            (name, ncores, min(memory[name], key=str2number))
            for name, ncores in cores.items()]

        extra_args = ['-q', queue] if queue else []
        return nodes, extra_args