模式:使用资源限制并发运行任务的数量#

在这种模式中,我们使用 资源 来限制并发运行的任务数量。

默认情况下,每个 Ray 任务需要 1 个 CPU,每个 Ray Actor 需要 0 个 CPU,因此调度程序将任务并发限制为可用 CPU,将 Actor 并发限制为无限。 使用 1 个以上 CPU(例如通过多线程)的任务可能会因并发任务的干扰而变慢,但除此之外可以安全运行。

但是,如果任务或 actor 使用的内存超过其应有的份额,则可能会使节点过载并导致 OOM 等问题。 如果是这种情况,我们可以通过增加它们请求的资源量来减少每个节点上同时运行的任务或 actor 的数量。 这是有效的,因为 Ray 确保给定节点上所有同时运行的任务和 actor 的资源需求总和不超过该节点的总资源。

Note

对于 actor 任务,正在运行的 actor 的数量限制了我们可以同时运行的 actor 任务的数量。

用例#

您有一个数据处理工作负载,它使用 Ray remote functions 独立处理每个输入文件。 由于每个任务都需要将输入数据加载到堆内存中并进行处理,因此运行过多的任务可能会导致 OOM。 在这种情况下,您可以使用 memory 资源来限制并发运行的任务数量(使用其他资源如 num_cpus 也可以实现相同的目标)。 请注意,与 num_cpus 类似, memory 资源请求是 物理逻辑 的,这意味着如果每个任务的物理内存使用量超过此数量,Ray 将不会强制执行。

代码#

无限制:

import ray

# Assume this Ray node has 16 CPUs and 16G memory.
ray.init()


@ray.remote
def process(file):
    # Actual work is reading the file and process the data.
    # Assume it needs to use 2G memory.
    pass


NUM_FILES = 1000
result_refs = []
for i in range(NUM_FILES):
    # By default, process task will use 1 CPU resource and no other resources.
    # This means 16 tasks can run concurrently
    # and will OOM since 32G memory is needed while the node only has 16G.
    result_refs.append(process.remote(f"{i}.csv"))
ray.get(result_refs)

有限制:

result_refs = []
for i in range(NUM_FILES):
    # Now each task will use 2G memory resource
    # and the number of concurrently running tasks is limited to 8.
    # In this case, setting num_cpus to 2 has the same effect.
    result_refs.append(
        process.options(memory=2 * 1024 * 1024 * 1024).remote(f"{i}.csv")
    )
ray.get(result_refs)