Source code for myqueue.resources

"""Resource class to handle resource requirements: time, cores, processes."""
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Any, Tuple, Dict

if TYPE_CHECKING:
    from myqueue.commands import Command

from myqueue.states import State

Node = Tuple[str, Dict[str, Any]]

RESOURCES_URL = (
    'https://myqueue.readthedocs.io/en/latest/documentation.html#resources')


def seconds_to_short_time_string(n: float) -> str:
    """Convert seconds to time string.

    >>> seconds_to_short_time_string(42)
    '42s'
    >>> seconds_to_short_time_string(7200)
    '2h'
    """
    n = int(n)
    for s, t in [('d', 24 * 3600),
                 ('h', 3600),
                 ('m', 60),
                 ('s', 1)]:
        if n % t == 0:
            break

    return f'{n // t}{s}'


def T(t: str) -> int:
    """Convert string to seconds."""
    return {'s': 1,
            'm': 60,
            'h': 3600,
            'd': 24 * 3600}[t[-1]] * int(t[:-1])


[docs] class Resources: """Resource description.""" def __init__(self, cores: int = 0, nodename: str = '', processes: int = 0, tmax: int = 0, weight: float = -1.0): """Resource object. cores: int Number of cores. nodename: str Name of node. processes: int Number of processes to start. tmax: str Maximum time for task. Examples: "40s", "30m", "20h" and "2d". weight: float Weight of task. See :ref:`task_weight`. """ self.cores = cores or 1 self.nodename = nodename self.tmax = tmax or 600 # seconds self.weight = weight if processes == 0: self.processes = self.cores else: self.processes = processes def set_default_weight(self, weight: float) -> None: if self.weight == -1.0: self.weight = weight
[docs] @staticmethod def from_string(s: str) -> Resources: """Create Resource object from string. >>> r = Resources.from_string('16:1:xeon8:2h') >>> r Resources(cores=16, processes=1, tmax=7200, nodename='xeon8') >>> print(r) 16:1:xeon8:2h >>> Resources.from_string('16:1m') Resources(cores=16, tmax=60) >>> r = Resources.from_string('16:1m:25') >>> r Resources(cores=16, tmax=60, weight=25.0) >>> print(r) 16:1m:25 """ nodename = '' processes = 0 weight = -1.0 try: p1, *parts = s.split(':') cores = int(p1) if parts[-1][-1] not in 'smhd': weight = float(parts.pop()) tmax = T(parts.pop()) for p in parts: if p.isdigit(): processes = int(p) else: nodename = p except (ValueError, KeyError, IndexError) as ex: raise ValueError( f'Bad resource string: {s!r}. See {RESOURCES_URL}') from ex return Resources(cores, nodename, processes, tmax, weight)
@staticmethod def from_args_and_command(cores: int = 0, nodename: str = '', processes: int = 0, tmax: str = '', weight: float = -1.0, resources: str = '', command: Command = None, path: Path = None) -> Resources: all_defaults = (cores == 0 and nodename == '' and processes == 0 and tmax == '' and weight == -1.0) if all_defaults: if resources: return Resources.from_string(resources) assert command is not None and path is not None res = command.read_resources(path) if res is not None: return res else: if resources != '': url = 'https://myqueue.readthedocs.io/en/latest' raise ValueError( f'resources={resources!r} can\'t be combined with ' '"cores", "nodename", "processes", "tmax" or "weight". ' f'See {url}/documentation.html#resources') return Resources(cores, nodename, processes, T(tmax or '10m'), weight) def __str__(self) -> str: s = str(self.cores) if self.processes != self.cores: s += ':' + str(self.processes) if self.nodename: s += ':' + self.nodename s += ':' + seconds_to_short_time_string(self.tmax) if self.weight > 0.0: s += f':{int(self.weight)}' return s def __repr__(self) -> str: args = ', '.join(f'{key}={value!r}' for key, value in self.todict().items()) return f'Resources({args})'
[docs] def todict(self) -> dict[str, Any]: """Convert to dict.""" dct: dict[str, float | int | str] = {'cores': self.cores} if self.processes != self.cores: dct['processes'] = self.processes if self.tmax != 600: dct['tmax'] = self.tmax if self.nodename: dct['nodename'] = self.nodename if self.weight != -1.0: dct['weight'] = self.weight return dct
[docs] def bigger(self, state: State, nodelist: list[Node], maxtmax: int = 2 * 24 * 3600) -> Resources: """Create new Resource object with larger tmax or more cores. >>> nodes = [('node1', {'cores': 8})] >>> r = Resources(tmax=100, cores=8) >>> r.bigger(State.TIMEOUT, nodes) Resources(cores=8, tmax=200) >>> r.bigger(State.MEMORY, nodes) Resources(cores=16, tmax=100) """ new = Resources(**self.todict()) if state == 'TIMEOUT': new.tmax = int(min(self.tmax * 2, maxtmax)) elif state == 'MEMORY': coreslist = sorted({dct['cores'] for name, dct in nodelist}) nnodes = 1 while True: for c in coreslist: cores = nnodes * c if cores > self.cores: break else: nnodes += 1 continue break if self.processes == self.cores: new.processes = cores new.cores = cores else: raise ValueError return new
[docs] def select(self, nodelist: list[Node]) -> tuple[int, str, dict[str, Any]]: """Select appropriate node. >>> nodes = [('node1', {'cores': 16}), ... ('node2', {'cores': 8}), ... ('fatnode2', {'cores': 8})] >>> Resources(cores=24).select(nodes) (3, 'node2', {'cores': 8}) >>> Resources(cores=32).select(nodes) (2, 'node1', {'cores': 16}) >>> Resources(cores=32, nodename='fatnode2').select(nodes) (4, 'fatnode2', {'cores': 8}) >>> Resources(cores=1).select(nodes) (1, 'node2', {'cores': 8}) >>> Resources(cores=32, nodename='node3').select(nodes) Traceback (most recent call last): ... ValueError: No such node: node3 """ if self.nodename: for name, dct in nodelist: if name == self.nodename: break else: # no break raise ValueError(f'No such node: {self.nodename}') else: for name, dct in nodelist: if dct.get('special'): continue if self.cores % dct['cores'] == 0: break else: # no break node = min(nodelist, key=lambda node: node[1]['cores']) name, dct = node nodes, rest = divmod(self.cores, dct['cores']) if rest: nodes += 1 return nodes, name, dct