Shortcuts

hfai.client

任务类

Experiment

任务类

创建任务

create_experiment_v2

根据 v2 配置文件创建任务

bind_hf_except_hook

该函数用于将 Process 类绑定异常 hook,在子进程发生异常时通知 server 将其强行关闭,并启动自我检查,发现硬件故障重启该任务

get_experiment

通过 name、id 或 chain_id 获取训练任务,不能都为空,只能获取自己的任务

get_experiments

获取自己最近提交的任务

self_health_check

对当前机器做系统检查,检查通过会退出该任务,检查失败会重启该任务 :param pid: :return:

任务管理

set_watchdog_time

设置任务超时时间,规定时间内无 log 该任务会被认为已失败,默认为 1800 秒

set_whole_life_state

设置 whole_life_state

get_whole_life_state

获取当前 chain_id 的上一个 id 任务留下来的 whole_life_state

receive_suspend_command

获取该任务是否即将被打断

go_suspend

通知 server 该任务可以被打断

EXP_PRIORITY

set_priority

设置当前任务的优先级,注意如果你没有该优先级的权限可能会导致任务被立刻打断

class hfai.client.api.Experiment(implement_cls=None, **kwargs)[source]

任务类

包含如下属性:

  • id (int): 任务 id

  • nb_name (str): 任务名

  • user_name (str): 用户名

  • code_file (str): 训练任务代码的路径

  • workspace (str): 训练任务代码的 workspace

  • config_json (dict): 任务的配置信息,包括:priority (int),environment (dict[str, str]),whole_life_state (int)

  • group (str): 任务所在组

  • nodes (int): 任务占用节点数量

  • assigned_nodes (list[str]): 分配的节点

  • whole_life_state (int): 当前设置的 whole_life_state

  • star(bool): 是否是星标任务

  • first_id (int): 整个 chain_id 中最小的 id

  • backend (str): 任务所在环境

  • task_type (str): 任务类型

  • queue_status (str): 任务当前运行状态

  • priority (int): 任务当前的优先级

  • chain_id (str): 任务 chain_id

  • stop_code (int): 任务退出情况

  • worker_status (str): 任务结束时的状态

  • begin_at (str): 任务开始时间

  • end_at (str): 任务结束时间

  • created_at (str): 任务创建时间

  • id_list (list[int]): 整个 chain_id 的所有 id

  • begin_at_list (list[str]): 整个 chain_id 所有 id 的启动时间

  • end_at_list (list[str]): 整个 chain_id 所有 id 的结束时间

  • stop_code_list (list[int]): 整个 chain_id 所有 id 的退出情况

  • whole_life_state_list (list[int]): 整个 chain_id 所有 id 的最新 whole_life_state

  • _pods_ (list[Pod]): 该任务每个 pod 的各项参数

Examples:

from hfai.client import get_experiment
import asyncio
experiment: Experiment = asyncio.run(get_experiment(id=1))
log = asyncio.run(experiment.log_ng(rank=0))  # 获取 rank0 的日志
asyncio.run(experiment.stop())  # 结束该任务
async log_ng(rank, last_seen=None)

通过rank获取日志,last_seen用于断点续读

async suspend(restart_delay=0)

打断该任务,restart_delay 暂未实现

async stop()

结束该任务

async hfai.client.create_experiment_v2(config, **kwargs)

根据 v2 配置文件创建任务

配置文件示例:

version: 2
name: test_create_experiment
priority: 20 # 可选,内部用户 50 40 30 20, 外部用户 0, 不填为 -1
spec: # 任务定义,根据定义,将在集群上做下面的运行
  # cd /xxx/xxx; YOUR_ENV_KEY=YOUR_ENV_KEY python xxx.py --config config
  workspace: /xxx/xxx               # 必填
  entrypoint: xxx.py                # 必填, 若 entrypoint_binary 为 False 或者不填,那么支持 .py 或者 .sh, .sh 则使用 bash xxx.sh 运行;
                                    #      若 entrypoint_binary 为 True,那么认为 entrypoint 是可执行文件,直接使用 <entrypoint> 运行
  parameters: --config config       # 可选
  environments:                     # 可选
    YOUR_ENV_KEY: YOUR_ENV_VALUE
  entrypoint_executable: False      # 可选,不填则默认为 False,若为 True,那么认为 entrypoint 是可执行文件
resource:
  image: registry.high-flyer.cn/hfai/docker_ubuntu2004:20220630.2   # 可选,不指定,默认 default,通过 hfai 上传的 image,或者集群内建的 template
  group: jd_a100#heavy                                              # 可选, jd_a100, jd_a100#heavy, jd_a100#light, jd_a100#A, jd_a100#B
  node_count: 1                                                     # 必填
options: # 可选
  whole_life_state: 1   # hfai.get_whole_life_state() => 1
  mount_code: 2         # use 3fs prod mount
  py_venv: 202111 # 会在运行脚本前,source 一下 python 环境,根据输入不同选择 hf_env 或 hfai_env。
                  # 分为两类:1. 202111 => source haienv 202111; 2.1 hfai_env_name[hfai_env_owner] => source haienv hfai_env_name -u hfai_env_owner
                  #                                            2.2 hfai_env_name => source haienv hfai_env_name
                  # hf_env 可选: 202105, 202111, 202207, 其中202111会根据镜像选择py3.6或者py3.8
  override_node_resource: # 覆盖默认的resource选项
    cpu: 0
    memory: 0
Parameters

config (str, StringIO, munch.Munch) – 配置路径,yaml 的 string,或 Munch

Returns

生成的任务

Return type

Experiment

Examples:

from hfai.client import create_experiment
import asyncio
asyncio.run(create_experiment('config/path'))  # python3.8以下可能不支持asyncio.run的用法,需要用其它异步调用接口

await create_experiment('''
        version: 2
        name: test_create_experiment
        priority: 20
        ... yaml file
''')
hfai.client.bind_hf_except_hook(f)[source]

该函数用于将 Process 类绑定异常 hook,在子进程发生异常时通知 server 将其强行关闭,并启动自我检查,发现硬件故障重启该任务

Parameters

f (class) – 传进来的 Process 类

Examples

>>> from hfai.client import bind_hf_except_hook
>>> from torch.multiprocessing import Process
>>> bind_hf_except_hook(Process)
async hfai.client.get_experiment(name=None, id=None, chain_id=None, **kwargs)[source]

通过 name、id 或 chain_id 获取训练任务,不能都为空,只能获取自己的任务

Parameters
  • name (str) – 任务名

  • id (int) – 任务 id

  • chain_id (str) – 任务 chain_id

Returns

返回的任务

Return type

Experiment

Examples

>>> from hfai.client import get_experiment
>>> import asyncio
>>> asyncio.run(get_experiment(id=1))  # python3.8以下可能不支持asyncio.run的用法,需要用其它异步调用接口
async hfai.client.get_experiments(page, page_size, only_star=False, select_pods=True, nb_name_pattern=None, task_type_list=['training', 'virtual', 'background'], worker_status_list=[], queue_status_list=[], tag_list=[], **kwargs)[source]

获取自己最近提交的任务

Parameters
  • page (int) – 第几页

  • page_size (int) – 每一页的任务个数

  • only_star (bool) – 只考虑 star 的任务(默认为 False

  • select_pods (bool) – 是否查询 pod

  • nb_name_pattern (str) – 查询 nb_name 带有这个字符串的任务

  • task_type_list (list[str]) – 查询 task_type,默认拿 training 和 validation

  • worker_status_list (list[str]) – 查询 worker_status

  • queue_status_list (list[str]) – 查询 queue_status

  • tag_list – 查询 tag

Returns

符合条件的任务总数,返回的任务列表

Return type

int, list[Experiment]

Examples

>>> from hfai.client import get_experiments
>>> import asyncio
>>> asyncio.run(get_experiments(page=1, page_size=10))  # python3.8以下可能不支持asyncio.run的用法,需要用其它异步调用接口
hfai.client.self_health_check(pid=204)[source]

对当前机器做系统检查,检查通过会退出该任务,检查失败会重启该任务 :param pid: :return:

hfai.client.set_watchdog_time(seconds)[source]

设置任务超时时间,规定时间内无 log 该任务会被认为已失败,默认为 1800 秒

Parameters

seconds (int) – 超时时间,单位为秒

Examples

>>> from hfai.client import set_watchdog_time
>>> set_watchdog_time(1800)
hfai.client.set_whole_life_state(state, timeout=500, raise_exception=True)[source]

设置 whole_life_state

Parameters
  • state (int) – 想要设置的 whole_life_state

  • timeout (int) – 设置请求超时时间,默认为 500 秒

  • raise_exception (bool) – 调用runtime接口时发生异常是否需要抛出,默认为抛出

Examples

>>> from hfai.client import set_whole_life_state
>>> set_whole_life_state(100)
hfai.client.get_whole_life_state()[source]

获取当前 chain_id 的上一个 id 任务留下来的 whole_life_state

Returns

whole_life_state

Return type

int

Examples

>>> from hfai.client import get_whole_life_state
>>> get_whole_life_state()
hfai.client.receive_suspend_command(timeout=500, raise_exception=False)[source]

获取该任务是否即将被打断

Parameters
  • timeout (int) – 设置请求超时时间,默认为 500 秒

  • raise_exception (bool) – 调用runtime接口时发生异常是否需要抛出,默认为不抛出

Returns

表示是否即将被打断

Return type

bool

Examples

>>> from hfai.client import receive_suspend_command
>>> receive_suspend_command()
hfai.client.go_suspend(timeout=500, raise_exception=False)[source]

通知 server 该任务可以被打断

Parameters
  • timeout (int) – 设置请求超时时间,默认为 500 秒

  • raise_exception (bool) – 调用runtime接口时发生异常是否需要抛出,默认为不抛出

Examples

>>> from hfai.client import go_suspend
>>> go_suspend()
class hfai.client.EXP_PRIORITY[source]
hfai.client.set_priority(priority, timeout=500, raise_exception=False)[source]

设置当前任务的优先级,注意如果你没有该优先级的权限可能会导致任务被立刻打断

Parameters
  • priority (int) – 设置的任务优先级

  • timeout (int) – 设置请求超时时间,默认为 500 秒

  • raise_exception (bool) – 调用runtime接口时发生异常是否需要抛出,默认为不抛出

Returns

是否设置成功

Return type

bool

Examples

>>> from hfai.client import set_priority, EXP_PRIORITY
>>> set_priority(EXP_PRIORITY.LOW)