Ray Collective Communication Lib#

The Ray collective communication library (ray.util.collective) offers a set of native collective primitives for communication between distributed CPUs or GPUs.

The Ray collective communication library

  • enables 10x more efficient out-of-band collective communication between Ray actor and task processes,

  • operates on both distributed CPUs and GPUs,

  • uses NCCL and GLOO as the optional high-performance communication backends,

  • is suitable for distributed ML programs on Ray.

Collective Primitives Support Matrix#

See the support matrix below for the current support status of all collective calls with different backends.

Backend          gloo            nccl
Device           CPU     GPU     CPU     GPU
send             ✔       ✘       ✘       ✔
recv             ✔       ✘       ✘       ✔
broadcast        ✔       ✘       ✘       ✔
allreduce        ✔       ✘       ✘       ✔
reduce           ✔       ✘       ✘       ✔
allgather        ✔       ✘       ✘       ✔
gather           ✘       ✘       ✘       ✘
scatter          ✘       ✘       ✘       ✘
reduce_scatter   ✔       ✘       ✘       ✔
all-to-all       ✘       ✘       ✘       ✘
barrier          ✔       ✘       ✘       ✔

Supported Tensor Types#

  • torch.Tensor

  • numpy.ndarray

  • cupy.ndarray

Usage#

Installation and Importing#

The Ray collective library is bundled with the released Ray wheel. Besides Ray, users need to install pygloo and cupy in order to use collective communication with the GLOO and NCCL backend, respectively.

pip install pygloo
pip install cupy-cudaxxx # replace xxx with the right cuda version in your environment

To use these APIs, import the collective package in your actor/task or driver code via:

import ray.util.collective as col

Initialization#

Collective functions operate on collective groups. A collective group contains a number of processes (in Ray, they are usually Ray-managed actors or tasks) that together enter the collective function calls. Before making collective calls, users need to statically declare a set of actors/tasks as a collective group.

Below is an example code snippet that uses the two APIs init_collective_group() and declare_collective_group() to initialize collective groups among a few remote actors. Refer to APIs for detailed descriptions of the two APIs.

import ray
import ray.util.collective as collective

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
   def __init__(self):
       self.send = cp.ones((4, ), dtype=cp.float32)
       self.recv = cp.zeros((4, ), dtype=cp.float32)

   def setup(self, world_size, rank):
       collective.init_collective_group(world_size, rank, "nccl", "default")
       return True

   def compute(self):
       collective.allreduce(self.send, "default")
       return self.send

   def destroy(self):
       collective.destroy_group()

# imperative
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
   init_rets.append(w.setup.remote(num_workers, i))
_ = ray.get(init_rets)
results = ray.get([w.compute.remote() for w in workers])


# declarative
workers = []
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
_options = {
   "group_name": "177",
   "world_size": 2,
   "ranks": [0, 1],
   "backend": "nccl"
}
collective.declare_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])

Note that for the same set of actor/task processes, multiple collective groups can be constructed, with group_name as their unique identifier. This enables specifying complex communication patterns between different (sub)sets of processes, as sketched in the example below.
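For instance, the sketch below (the group names "group_a"/"group_b" and the tensor shape are illustrative, not part of the library) declares two differently named groups over the same pair of actors:

import ray
import ray.util.collective as collective

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buf = cp.ones((4, ), dtype=cp.float32)

    def allreduce(self, group_name):
        collective.allreduce(self.buf, group_name)
        return self.buf

workers = [Worker.remote() for _ in range(2)]
for name in ["group_a", "group_b"]:
    _options = {"group_name": name, "world_size": 2, "ranks": [0, 1], "backend": "nccl"}
    collective.declare_collective_group(workers, **_options)

# Either group name now addresses the same pair of actors.
results_a = ray.get([w.allreduce.remote("group_a") for w in workers])
results_b = ray.get([w.allreduce.remote("group_b") for w in workers])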

Collective Communication#

Check the support matrix for the current status of supported collective calls and backends.

Note that the current set of collective communication APIs are imperative and exhibit the following behaviors:

  • All the collective APIs are synchronous blocking calls.

  • Since each API only specifies a part of the collective communication, the API is expected to be called by every participating process of the (pre-declared) collective group. Once all processes have made the call and rendezvoused with each other, the collective communication happens and proceeds.

  • The APIs are imperative and the communication happens out-of-band: they need to be used inside the collective process (actor/task) code.

An example of using ray.util.collective.allreduce is shown below:

import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def compute(self):
        col.allreduce(self.buffer, "default")
        return self.buffer

# Create two actors A and B and create a collective group following the previous example...
A = Worker.remote()
B = Worker.remote()
# Invoke allreduce remotely
ray.get([A.compute.remote(), B.compute.remote()])

Point-to-point Communication#

ray.util.collective also supports P2P send/recv communication between processes.

The send/recv calls exhibit the same behavior as the collective functions: they are synchronous blocking calls; a pair of send and recv must be called together on paired processes in order to specify the entire communication, and the two must successfully rendezvous with each other to proceed. See the code example below:

import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def get_buffer(self):
        return self.buffer

    def do_send(self, target_rank=0):
        # this call is blocking; the tensor to send is passed explicitly
        col.send(self.buffer, target_rank)

    def do_recv(self, src_rank=0):
        # this call is blocking; the received data is written into self.buffer
        col.recv(self.buffer, src_rank)

    def do_allreduce(self):
        # this call is blocking as well
        col.allreduce(self.buffer)
        return self.buffer

# Create two actors
A = Worker.remote()
B = Worker.remote()

# Put A and B in a collective group
_options = {"group_name": "default", "world_size": 2, "ranks": [0, 1], "backend": "nccl"}
col.declare_collective_group([A, B], **_options)

# Let A send a message to B; a send/recv pair has to be specified once on each worker
ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])

# An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
ray.get([A.do_send.remote(target_rank=1)])

Single-GPU and Multi-GPU Collective Primitives#

In many cluster setups, a machine usually has more than one GPU; effectively leveraging the GPU-GPU bandwidth, such as NVLINK, can significantly improve communication performance.

ray.util.collective supports multi-GPU collective calls, in which case a process (actor/task) manages more than one GPU (e.g., via ray.remote(num_gpus=4)). Using these multi-GPU collective functions is normally more performant than using single-GPU collective APIs and spawning a number of processes equal to the number of GPUs. See the API reference for the signatures of the multi-GPU collective APIs.

Note also that all multi-GPU APIs have the following restrictions:

  • Only the NCCL backend is supported.

  • The collective processes that make multi-GPU collective or P2P calls need to own the same number of GPU devices.

  • The input to a multi-GPU collective function is normally a list of tensors, each located on a different GPU device owned by the caller process.

Example code utilizing the multi-GPU collective APIs is provided below:

import ray
import ray.util.collective as collective

import cupy as cp
from cupy.cuda import Device


@ray.remote(num_gpus=2)
class Worker:
   def __init__(self):
       with Device(0):
           self.send1 = cp.ones((4, ), dtype=cp.float32)
       with Device(1):
           self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
       with Device(0):
           self.recv1 = cp.ones((4, ), dtype=cp.float32)
       with Device(1):
           self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2

   def setup(self, world_size, rank):
       self.rank = rank
       collective.init_collective_group(world_size, rank, "nccl", "177")
       return True

   def allreduce_call(self):
       collective.allreduce_multigpu([self.send1, self.send2], "177")
       return [self.send1, self.send2]

   def p2p_call(self):
       if self.rank == 0:
          # send to rank 1, GPU index 1, within the "177" group created in setup()
          collective.send_multigpu(self.send1 * 2, 1, 1, "177")
       else:
          # receive from rank 0, GPU index 0, within the same group
          collective.recv_multigpu(self.recv2, 0, 0, "177")
       return self.recv2

# Note that the world size is 2 but there are 4 GPUs.
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
   w = Worker.remote()
   workers.append(w)
   init_rets.append(w.setup.remote(num_workers, i))
a = ray.get(init_rets)
results = ray.get([w.allreduce_call.remote() for w in workers])
results = ray.get([w.p2p_call.remote() for w in workers])

More Resources#

The following links provide helpful resources on how to efficiently leverage the ray.util.collective library.

API Reference#

APIs exposed under the namespace ray.util.collective.

class ray.util.collective.collective.GroupManager[source]#

Use this class to manage the collective groups we created so far.

Each process will have an instance of GroupManager. Each process could belong to multiple collective groups. The membership information and other metadata are stored in the global _group_mgr object.

create_collective_group(backend, world_size, rank, group_name)[source]#

The entry to create new collective groups in the manager.

Put the registration and the group information into the manager metadata as well.

get_group_by_name(group_name)[source]#

Get the collective group handle by its name.

destroy_collective_group(group_name)[source]#

Group destructor.

ray.util.collective.collective.is_group_initialized(group_name)[source]#

Check if the group is initialized in this process by the group name.
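A minimal sketch of how this check might be used inside an actor to make group setup idempotent (the group name "default" and the backend are illustrative assumptions):

import ray
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def setup(self, world_size, rank):
        # Only initialize the group if this process has not joined it yet.
        if not col.is_group_initialized("default"):
            col.init_collective_group(world_size, rank, backend="nccl", group_name="default")
        return col.is_group_initialized("default")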

ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='nccl', group_name: str = 'default')[source]#

Initialize a collective group inside an actor process.

Parameters
  • world_size – the total number of processes in the group.

  • rank – the rank of the current process.

  • backend – the CCL backend to use, NCCL or GLOO.

  • group_name – the name of the collective group.

Returns

None

ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='nccl', group_name: str = 'default')[source]#

Declare a list of actors as a collective group.

Note: This function should be called in a driver process.

Parameters
  • actors – a list of actors to be set in a collective group.

  • world_size – the total number of processes in the group.

  • ranks (List[int]) – the rank of each actor.

  • backend – the CCL backend to use, NCCL or GLOO.

  • group_name – the name of the collective group.

Returns

None
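A minimal driver-side sketch, assuming two GPU actors similar to the Worker class from the usage examples above (the actor definition here is illustrative):

import ray
import ray.util.collective as collective

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buf = cp.ones((4, ), dtype=cp.float32)

    def compute(self):
        collective.allreduce(self.buf, "default")
        return self.buf

# Declare the group from the driver instead of calling
# init_collective_group() inside each actor.
workers = [Worker.remote() for _ in range(2)]
collective.create_collective_group(workers, world_size=2, ranks=[0, 1], backend="nccl", group_name="default")
results = ray.get([w.compute.remote() for w in workers])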

ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') → None[source]#

Destroy a collective group given its group name.

ray.util.collective.collective.get_rank(group_name: str = 'default') → int[source]#

Return the rank of this process in the given group.

Parameters

group_name – the name of the group to query

Returns

the rank of this process in the named group, -1 if the group does not exist or the process does not belong to the group.

ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') → int[source]#

Return the size of the collective group with the given name.

Parameters

group_name – the name of the group to query

Returns

The world size of the collective group, -1 if the group does not exist or the process does not belong to the group.
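A small sketch of querying the rank and group size from inside an actor that has already joined a group (the group name "default" is an assumption):

import ray
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def who_am_i(self):
        # Both calls return -1 if this process does not belong to the group.
        return col.get_rank("default"), col.get_collective_group_size("default")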

ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Collective allreduce the tensor across the group.

Parameters
  • tensor – the tensor to be all-reduced on this process.

  • group_name – the collective group name to perform allreduce.

  • op – The reduce operation.

Returns

None
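A hedged sketch of an allreduce with a non-default reduce op; it assumes ReduceOp can be imported from ray.util.collective.types and that the group setup mirrors the earlier Worker examples:

import ray
import ray.util.collective as col
from ray.util.collective.types import ReduceOp  # assumed import path

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buf = cp.ones((4, ), dtype=cp.float32)

    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def max_reduce(self):
        # Element-wise MAX across the group instead of the default SUM.
        col.allreduce(self.buf, "default", op=ReduceOp.MAX)
        return self.buf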

ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Collective allreduce a list of tensors across the group.

Parameters
  • tensor_list (List[tensor]) – list of tensors to be allreduced, each on a GPU.

  • group_name – the collective group name to perform allreduce.

Returns

None

ray.util.collective.collective.barrier(group_name: str = 'default')[source]#

Barrier all processes in the collective group.

Parameters

group_name – the name of the group to barrier.

Returns

None
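A minimal sketch (the group name and method name are illustrative): each participating actor calls barrier and returns only after every group member has reached it:

import ray
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def sync(self):
        # Blocks until every process in the "default" group has called barrier.
        col.barrier("default")
        return True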

ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Reduce the tensor across the group to the destination rank.

Parameters
  • tensor – the tensor to be reduced on this process.

  • dst_rank – the rank of the destination process.

  • group_name – the collective group name to perform reduce.

  • op – The reduce operation.

Returns

None
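A minimal sketch of reducing onto rank 0, assuming the same two-actor setup pattern as the earlier examples:

import ray
import ray.util.collective as col

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buf = cp.ones((4, ), dtype=cp.float32)

    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def reduce_to_root(self):
        # After the call, rank 0 holds the summed result; other ranks keep their inputs.
        col.reduce(self.buf, dst_rank=0, group_name="default")
        return self.buf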

ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Reduce the tensor across the group to the destination rank and destination tensor.

Parameters
  • tensor_list – the list of tensors to be reduced on this process; each tensor located on a GPU.

  • dst_rank – the rank of the destination process.

  • dst_tensor – the index of GPU at the destination.

  • group_name – the collective group name to perform reduce.

  • op – The reduce operation.

Returns

None

ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')[source]#

Broadcast the tensor from a source process to all others.

Parameters
  • tensor – the tensor to be broadcasted (src) or received (destination).

  • src_rank – the rank of the source process.

  • group_name – the collective group name to perform broadcast.

Returns

None
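A minimal sketch of broadcasting from rank 0, assuming a two-actor group set up as in the earlier examples (the buffer contents are illustrative):

import ray
import ray.util.collective as col

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, rank):
        # Rank 0 holds the values to broadcast; other ranks hold placeholders.
        self.buf = cp.arange(4, dtype=cp.float32) if rank == 0 else cp.zeros((4, ), dtype=cp.float32)

    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def bcast(self):
        # The same buffer is the source on rank 0 and the destination elsewhere.
        col.broadcast(self.buf, src_rank=0, group_name="default")
        return self.buf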

ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')[source]#

Broadcast the tensor from a source GPU to all other GPUs.

Parameters
  • tensor_list – the tensors to broadcast (src) or receive (dst).

  • src_rank – the rank of the source process.

  • src_tensor – the index of the source GPU on the source process.

  • group_name – the collective group name to perform broadcast.

Returns

None

ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')[source]#

Allgather tensors from each process of the group into a list.

Parameters
  • tensor_list – the results, stored as a list of tensors.

  • tensor – the tensor (to be gathered) in the current process

  • group_name – the name of the collective group.

Returns

None
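A minimal sketch of allgather with a pre-allocated output list, one tensor slot per process in the group (group name and shapes are illustrative):

import ray
import ray.util.collective as col

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, world_size):
        self.buf = cp.ones((4, ), dtype=cp.float32)
        # One output slot per process in the group.
        self.out = [cp.zeros((4, ), dtype=cp.float32) for _ in range(world_size)]

    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def gather_all(self):
        # After the call, self.out[i] holds the tensor contributed by rank i.
        col.allgather(self.out, self.buf, "default")
        return self.out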

ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')[source]#

Allgather tensors from each GPU of the group into lists.

Parameters
  • output_tensor_lists (List[List[tensor]]) – gathered results, with shape num_gpus * world_size * shape(tensor).

  • input_tensor_list (List[tensor]) – a list of tensors, with shape num_gpus * shape(tensor).

  • group_name – the name of the collective group.

Returns

None

ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Reducescatter a list of tensors across the group.

Reduce the list of the tensors across each process in the group, then scatter the reduced list of tensors – one tensor for each process.

Parameters
  • tensor – the resulting tensor on this process.

  • tensor_list – The list of tensors to be reduced and scattered.

  • group_name – the name of the collective group.

  • op – The reduce operation.

Returns

None
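A minimal sketch of reducescatter, assuming the same two-actor group setup as the earlier examples; each rank contributes a list with one tensor per process and receives its reduced chunk:

import ray
import ray.util.collective as col

import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self, world_size):
        # One input tensor per process in the group; the reduced chunk for
        # this rank is written into self.recv.
        self.inputs = [cp.ones((4, ), dtype=cp.float32) for _ in range(world_size)]
        self.recv = cp.zeros((4, ), dtype=cp.float32)

    def setup(self, world_size, rank):
        col.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def reduce_scatter(self):
        # Rank i ends up with the element-wise sum of every process's inputs[i].
        col.reducescatter(self.recv, self.inputs, "default")
        return self.recv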

ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)[source]#

Reducescatter a list of tensors across all GPUs.

Parameters
  • output_tensor_list – the resulting list of tensors, with shape: num_gpus * shape(tensor).

  • input_tensor_lists – the original tensors, with shape: num_gpus * world_size * shape(tensor).

  • group_name – the name of the collective group.

  • op – The reduce operation.

Returns

None.

ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')[source]#

Send a tensor to a remote process synchronously.

Parameters
  • tensor – the tensor to send.

  • dst_rank – the rank of the destination process.

  • group_name – the name of the collective group.

Returns

None

ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#

Send a tensor to a remote GPU synchronously.

The function assumes each process owns more than one GPU, and that the sender and receiver processes have an equal number of GPUs.

Parameters
  • tensor – the tensor to send, located on a GPU.

  • dst_rank – the rank of the destination process.

  • dst_gpu_index – the destination gpu index.

  • group_name – the name of the collective group.

  • n_elements – if specified, send the next n elements from the starting address of tensor.

Returns

None

ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')[source]#

Receive a tensor from a remote process synchronously.

Parameters
  • tensor – the received tensor.

  • src_rank – the rank of the source process.

  • group_name – the name of the collective group.

Returns

None

ray.util.collective.collective.recv_multigpu(tensor, src_rank: int, src_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#

Receive a tensor from a remote GPU synchronously.

The function assumes each process owns more than one GPU, and that the sender and receiver processes have an equal number of GPUs.

Parameters
  • tensor – the received tensor, located on a GPU.

  • src_rank – the rank of the source process.

  • src_gpu_index (int) – the index of the source GPU on the source process.

  • group_name – the name of the collective group.

Returns

None

ray.util.collective.collective.synchronize(gpu_id: int)[source]#

Synchronize the current process to a given device.

Parameters

gpu_id – the GPU device id to synchronize.

Returns

None