Ray Collective Communication Library#
The Ray collective communication library (ray.util.collective) offers a set of native collective primitives for communication between distributed CPUs or GPUs.
The Ray collective communication library
- makes collective communication between Ray actor and task processes 10x more efficient,
- operates on both distributed CPUs and GPUs,
- uses NCCL and GLOO as the optional high-performance communication backends,
- is suitable for distributed ML programs on Ray.
Collective Primitives Support Matrix#
See the support matrix below for the current support status of all collective calls with different backends.
| Backend | GLOO | | NCCL | |
|---|---|---|---|---|
| Device | CPU | GPU | CPU | GPU |
| send | ✔ | ✘ | ✘ | ✔ |
| recv | ✔ | ✘ | ✘ | ✔ |
| broadcast | ✔ | ✘ | ✘ | ✔ |
| allreduce | ✔ | ✘ | ✘ | ✔ |
| reduce | ✔ | ✘ | ✘ | ✔ |
| allgather | ✔ | ✘ | ✘ | ✔ |
| gather | ✘ | ✘ | ✘ | ✘ |
| scatter | ✘ | ✘ | ✘ | ✘ |
| reduce_scatter | ✔ | ✘ | ✘ | ✔ |
| all-to-all | ✘ | ✘ | ✘ | ✘ |
| barrier | ✔ | ✘ | ✘ | ✔ |
Supported Tensor Types#
- torch.Tensor
- numpy.ndarray
- cupy.ndarray
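To make the mapping between tensor types and backends concrete, here is a minimal sketch (not from the original page; the CPUWorker name and group setup are illustrative, and pygloo must be installed) that allreduces numpy.ndarray buffers over the GLOO backend:

import numpy as np
import ray
import ray.util.collective as collective


@ray.remote
class CPUWorker:
    def __init__(self):
        # numpy buffers stay on the CPU, so the GLOO backend is used
        self.buffer = np.ones((4,), dtype=np.float32)

    def setup(self, world_size, rank):
        collective.init_collective_group(world_size, rank, backend="gloo", group_name="default")
        return True

    def compute(self):
        collective.allreduce(self.buffer, "default")
        return self.buffer


workers = [CPUWorker.remote() for _ in range(2)]
ray.get([w.setup.remote(2, i) for i, w in enumerate(workers)])
print(ray.get([w.compute.remote() for w in workers]))  # each buffer becomes [2. 2. 2. 2.]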
Usage#
Installation and Importing#
The Ray collective communication library is bundled with the released Ray wheel. Besides Ray, users need to install pygloo or cupy in order to use collective communication with the GLOO and NCCL backends, respectively.
pip install pygloo
pip install cupy-cudaxxx # replace xxx with the right cuda version in your environment
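As an optional sanity check (my own sketch, not part of the original instructions), you can confirm the backend packages are importable before running any collective code:

# Hypothetical sanity check for the optional backend dependencies.
try:
    import pygloo  # GLOO backend bindings
    print("pygloo available: GLOO backend usable")
except ImportError:
    print("pygloo missing: GLOO backend unavailable")

try:
    import cupy
    # the NCCL backend needs at least one visible CUDA device
    print("cupy sees", cupy.cuda.runtime.getDeviceCount(), "GPU(s)")
except ImportError:
    print("cupy missing: NCCL backend unavailable")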
To use these APIs, import the collective package in your actor/task or driver code via:
import ray.util.collective as col
Initialization#
Collective functions operate on collective groups. A collective group contains a number of processes (in Ray, these are usually Ray-managed actors or tasks) that will together enter the collective function calls. Before making collective calls, users need to statically declare a set of actors/tasks as a collective group.
Below is an example code snippet that uses the two APIs init_collective_group() and declare_collective_group() to initialize collective groups among a few remote actors.
Refer to APIs for detailed descriptions of the two APIs.
import ray
import ray.util.collective as collective
import cupy as cp


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.send = cp.ones((4, ), dtype=cp.float32)
        self.recv = cp.zeros((4, ), dtype=cp.float32)

    def setup(self, world_size, rank):
        collective.init_collective_group(world_size, rank, "nccl", "default")
        return True

    def compute(self):
        collective.allreduce(self.send, "default")
        return self.send

    def destroy(self):
        collective.destroy_collective_group()


# imperative
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
    w = Worker.remote()
    workers.append(w)
    init_rets.append(w.setup.remote(num_workers, i))
_ = ray.get(init_rets)
results = ray.get([w.compute.remote() for w in workers])

# declarative
workers = []  # start a fresh set of workers for the declaratively created group
for i in range(num_workers):
    w = Worker.remote()
    workers.append(w)
_options = {
    "group_name": "default",  # compute() above performs allreduce on the "default" group
    "world_size": 2,
    "ranks": [0, 1],
    "backend": "nccl"
}
collective.declare_collective_group(workers, **_options)
results = ray.get([w.compute.remote() for w in workers])
Note that multiple collective groups can be constructed for the same set of actor/task processes, with group_name as their unique identifier.
This makes it possible to specify complex communication patterns between different (sub)sets of processes.
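As an illustrative sketch (the group names, sizes, and the assumption that `workers` holds four Worker actors like those above are my own), two differently named groups can be declared over overlapping sets of the same actors:

# Hypothetical sketch: two collective groups over overlapping sets of the same actors,
# told apart by group_name.
sub_options = {"group_name": "first_pair", "world_size": 2,
               "ranks": [0, 1], "backend": "nccl"}
all_options = {"group_name": "all_workers", "world_size": 4,
               "ranks": [0, 1, 2, 3], "backend": "nccl"}
collective.declare_collective_group(workers[:2], **sub_options)  # sub-group of the first two workers
collective.declare_collective_group(workers, **all_options)      # group spanning every worker
# Inside an actor, the group name selects which group a call runs on, e.g.:
# collective.allreduce(self.send, "first_pair")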
Collective Communication#
Check the support matrix for the current status of supported collective calls and backends.
Note that the current set of collective communication APIs is imperative and exhibits the following behaviors:
- All the collective APIs are synchronous blocking calls.
- Since each API only specifies a part of the collective communication, the API is expected to be called by every participating process of the (pre-declared) collective group. Once all processes have made the call and rendezvoused with each other, the collective communication happens and proceeds.
- The APIs are imperative and the communication happens out-of-band; they need to be used inside the collective process (actor/task) code.
An example of using ray.util.collective.allreduce is given below:
import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def compute(self):
        col.allreduce(self.buffer, "default")
        return self.buffer


# Create two actors A and B and create a collective group following the previous example...
A = Worker.remote()
B = Worker.remote()
# Invoke allreduce remotely
ray.get([A.compute.remote(), B.compute.remote()])
Point-to-point Communication#
ray.util.collective also supports P2P send/recv communication between processes.
The send/recv calls exhibit the same behavior as the collective functions: they are synchronous blocking calls. A pair of send and recv must be called together on paired processes in order to specify the entire communication, and they must successfully rendezvous with each other to proceed. See the code example below:
import ray
import cupy
import ray.util.collective as col


@ray.remote(num_gpus=1)
class Worker:
    def __init__(self):
        self.buffer = cupy.ones((10,), dtype=cupy.float32)

    def get_buffer(self):
        return self.buffer

    def do_send(self, target_rank=0):
        # this call is blocking
        col.send(self.buffer, target_rank)

    def do_recv(self, src_rank=0):
        # this call is blocking
        col.recv(self.buffer, src_rank)

    def do_allreduce(self):
        # this call is blocking as well
        col.allreduce(self.buffer)
        return self.buffer


# Create two actors
A = Worker.remote()
B = Worker.remote()

# Put A and B in a collective group
col.declare_collective_group([A, B], world_size=2, ranks=[0, 1], backend="nccl")

# let A send a message to B; a send/recv has to be specified once at each worker
ray.get([A.do_send.remote(target_rank=1), B.do_recv.remote(src_rank=0)])

# An anti-pattern: the following code will hang, because it doesn't instantiate the recv side call
ray.get([A.do_send.remote(target_rank=1)])
Single-GPU and Multi-GPU Collective Primitives#
In many cluster setups, a machine usually has more than one GPU; effectively leveraging the GPU-GPU bandwidth, such as NVLINK, can significantly improve communication performance.
ray.util.collective supports multi-GPU collective calls, in which case a process (actor/task) manages more than one GPU (e.g., via ray.remote(num_gpus=4)).
Using these multi-GPU collective functions is usually more performance-advantageous than using the single-GPU collective APIs and spawning a number of processes equal to the number of GPUs.
See the APIs for the signatures of the multi-GPU collective APIs.
Also note that all multi-GPU APIs come with the following restrictions:
- Only the NCCL backend is supported.
- Collective processes that make multi-GPU collective or P2P calls need to own the same number of GPU devices.
- The input to a multi-GPU collective function is normally a list of tensors, each located on a different GPU device owned by the caller process.
Example code utilizing the multi-GPU collective APIs is provided below:
import ray
import ray.util.collective as collective
import cupy as cp
from cupy.cuda import Device


@ray.remote(num_gpus=2)
class Worker:
    def __init__(self):
        with Device(0):
            self.send1 = cp.ones((4, ), dtype=cp.float32)
        with Device(1):
            self.send2 = cp.ones((4, ), dtype=cp.float32) * 2
        with Device(0):
            self.recv1 = cp.ones((4, ), dtype=cp.float32)
        with Device(1):
            self.recv2 = cp.ones((4, ), dtype=cp.float32) * 2

    def setup(self, world_size, rank):
        self.rank = rank
        collective.init_collective_group(world_size, rank, "nccl", "177")
        return True

    def allreduce_call(self):
        collective.allreduce_multigpu([self.send1, self.send2], "177")
        return [self.send1, self.send2]

    def p2p_call(self):
        # use the same group name ("177") that setup() initialized
        if self.rank == 0:
            collective.send_multigpu(self.send1 * 2, 1, 1, "177")
        else:
            collective.recv_multigpu(self.recv2, 0, 0, "177")
        return self.recv2


# Note that the world size is 2 but there are 4 GPUs.
num_workers = 2
workers = []
init_rets = []
for i in range(num_workers):
    w = Worker.remote()
    workers.append(w)
    init_rets.append(w.setup.remote(num_workers, i))
a = ray.get(init_rets)
results = ray.get([w.allreduce_call.remote() for w in workers])
results = ray.get([w.p2p_call.remote() for w in workers])
More Resources#
The following links provide helpful resources on how to efficiently leverage the ray.util.collective library.
- More running examples under ray.util.collective.examples.
- Scaling up a spaCy Named Entity Recognition (NER) pipeline using the Ray collective library.
- Implementing the AllReduce strategy for data-parallel distributed ML training.
API Reference#
APIs exposed under the namespace ray.util.collective.
- class ray.util.collective.collective.GroupManager[source]#
Use this class to manage the collective groups we created so far.
Each process will have an instance of
GroupManager. Each process could belong to multiple collective groups. The membership information and other metadata are stored in the global _group_mgr object.
- ray.util.collective.collective.is_group_initialized(group_name)[source]#
Check if the group is initialized in this process by the group name.
- ray.util.collective.collective.init_collective_group(world_size: int, rank: int, backend='nccl', group_name: str = 'default')[source]#
Initialize a collective group inside an actor process.
- Parameters
world_size – the total number of processes in the group.
rank – the rank of the current process.
backend – the CCL backend to use, NCCL or GLOO.
group_name – the name of the collective group.
- Returns
None
- ray.util.collective.collective.create_collective_group(actors, world_size: int, ranks: List[int], backend='nccl', group_name: str = 'default')[source]#
Declare a list of actors as a collective group.
Note: This function should be called in a driver process.
- Parameters
actors – a list of actors to be set in a collective group.
world_size – the total number of processes in the group.
ranks (List[int]) – the rank of each actor.
backend – the CCL backend to use, NCCL or GLOO.
group_name – the name of the collective group.
- Returns
None
- ray.util.collective.collective.destroy_collective_group(group_name: str = 'default') → None[source]#
Destroy a collective group given its group name.
- ray.util.collective.collective.get_rank(group_name: str = 'default') → int[source]#
Return the rank of this process in the given group.
- Parameters
group_name – the name of the group to query
- Returns
the rank of this process in the named group, -1 if the group does not exist or the process does not belong to the group.
- ray.util.collective.collective.get_collective_group_size(group_name: str = 'default') → int[source]#
Return the size of the collective group with the given name.
- Parameters
group_name – the name of the group to query
- Returns
The world size of the collective group, -1 if the group does not exist or the process does not belong to the group.
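The two query calls are often used together. Below is a minimal sketch (mine, not from the original page) that assumes it runs inside a worker actor that has already joined a group named "default", as in the earlier examples:

# Assumes this code runs inside a worker actor that already joined the "default" group.
import ray.util.collective as col

rank = col.get_rank("default")                    # this process's rank, or -1 if not a member
size = col.get_collective_group_size("default")   # the group's world size, or -1 if it doesn't exist
print(f"rank {rank} of {size}")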
- ray.util.collective.collective.allreduce(tensor, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Collective allreduce the tensor across the group.
- Parameters
tensor – the tensor to be all-reduced on this process.
group_name – the collective group name to perform allreduce.
op – The reduce operation.
- Returns
None
- ray.util.collective.collective.allreduce_multigpu(tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Collective allreduce a list of tensors across the group.
- Parameters
tensor_list (List[tensor]) – list of tensors to be allreduced, each on a GPU.
group_name – the collective group name to perform allreduce.
- Returns
None
- ray.util.collective.collective.barrier(group_name: str = 'default')[source]#
Barrier all processes in the collective group.
- Parameters
group_name – the name of the group to barrier.
- Returns
None
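A minimal usage sketch (illustrative only), assuming the call is made inside every worker actor of an initialized group named "default":

# Assumes this code runs inside each worker actor of the "default" group.
import ray.util.collective as col

col.barrier("default")  # blocks until every process in the group has reached this call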
- ray.util.collective.collective.reduce(tensor, dst_rank: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Reduce the tensor across the group to the destination rank.
- Parameters
tensor – the tensor to be reduced on this process.
dst_rank – the rank of the destination process.
group_name – the collective group name to perform reduce.
op – The reduce operation.
- Returns
None
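A small illustrative sketch, assuming it runs inside each worker actor of a two-process NCCL group named "default" with cupy tensors (as in the earlier examples):

# Assumes this code runs inside each worker actor of the "default" group.
import cupy
import ray.util.collective as col

buf = cupy.ones((4,), dtype=cupy.float32)
col.reduce(buf, dst_rank=0, group_name="default")
# After the call, rank 0's buf holds the element-wise sum across the group.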
- ray.util.collective.collective.reduce_multigpu(tensor_list: list, dst_rank: int = 0, dst_tensor: int = 0, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Reduce the tensor across the group to the destination rank and destination tensor.
- Parameters
tensor_list – the list of tensors to be reduced on this process; each tensor located on a GPU.
dst_rank – the rank of the destination process.
dst_tensor – the index of GPU at the destination.
group_name – the collective group name to perform reduce.
op – The reduce operation.
- Returns
None
- ray.util.collective.collective.broadcast(tensor, src_rank: int = 0, group_name: str = 'default')[source]#
Broadcast the tensor from a source process to all others.
- Parameters
tensor – the tensor to be broadcasted (src) or received (destination).
src_rank – the rank of the source process.
group_name – the collective group name to perform broadcast.
- Returns
None
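A small illustrative sketch under the same assumptions (in-actor code, two-process "default" group, cupy buffers):

# Assumes this code runs inside each worker actor of the "default" group.
import cupy
import ray.util.collective as col

buf = cupy.zeros((4,), dtype=cupy.float32)
if col.get_rank("default") == 0:
    buf += 5.0                                    # only the source rank fills the buffer
col.broadcast(buf, src_rank=0, group_name="default")
# Every rank's buf now equals [5., 5., 5., 5.]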
- ray.util.collective.collective.broadcast_multigpu(tensor_list, src_rank: int = 0, src_tensor: int = 0, group_name: str = 'default')[source]#
Broadcast the tensor from a source GPU to all other GPUs.
- Parameters
tensor_list – the tensors to broadcast (src) or receive (dst).
src_rank – the rank of the source process.
src_tensor – the index of the source GPU on the source process.
group_name – the collective group name to perform broadcast.
- Returns
None
- ray.util.collective.collective.allgather(tensor_list: list, tensor, group_name: str = 'default')[source]#
Allgather tensors from each process of the group into a list.
- Parameters
tensor_list – the results, stored as a list of tensors.
tensor – the tensor (to be gathered) in the current process
group_name – the name of the collective group.
- Returns
None
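Because the output list must be pre-allocated by the caller, a short sketch may help; it assumes in-actor code in a two-process NCCL group named "default" (the variable names are illustrative):

# Assumes this code runs inside each worker actor of the "default" group.
import cupy
import ray.util.collective as col

rank = col.get_rank("default")
world_size = col.get_collective_group_size("default")
mine = cupy.ones((4,), dtype=cupy.float32) * rank          # this process's contribution
# Pre-allocate one output tensor per rank; allgather fills them in rank order.
outs = [cupy.zeros((4,), dtype=cupy.float32) for _ in range(world_size)]
col.allgather(outs, mine, "default")
# outs[i] now holds rank i's tensor on every process.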
- ray.util.collective.collective.allgather_multigpu(output_tensor_lists: list, input_tensor_list: list, group_name: str = 'default')[source]#
Allgather tensors from each GPU of the group into lists.
- Parameters
output_tensor_lists (List[List[tensor]]) – the gathered results; the shape must be num_gpus * world_size * shape(tensor).
input_tensor_list (List[tensor]) – a list of tensors, with shape num_gpus * shape(tensor).
group_name – the name of the collective group.
- Returns
None
- ray.util.collective.collective.reducescatter(tensor, tensor_list: list, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Reducescatter a list of tensors across the group.
Reduce the list of the tensors across each process in the group, then scatter the reduced list of tensors – one tensor for each process.
- Parameters
tensor – the resulting tensor on this process.
tensor_list – The list of tensors to be reduced and scattered.
group_name – the name of the collective group.
op – The reduce operation.
- Returns
None
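A short sketch of the input/output shapes, under the same assumptions as above (in-actor code, two-process NCCL group named "default"):

# Assumes this code runs inside each worker actor of the "default" group.
import cupy
import ray.util.collective as col

world_size = col.get_collective_group_size("default")
# Each process contributes world_size input tensors; after the call, `result` on rank i
# holds the element-wise sum of every process's i-th input tensor.
inputs = [cupy.ones((4,), dtype=cupy.float32) for _ in range(world_size)]
result = cupy.zeros((4,), dtype=cupy.float32)
col.reducescatter(result, inputs, "default")  # result == [world_size, ..., world_size]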
- ray.util.collective.collective.reducescatter_multigpu(output_tensor_list, input_tensor_lists, group_name: str = 'default', op=ReduceOp.SUM)[source]#
Reducescatter a list of tensors across all GPUs.
- Parameters
output_tensor_list – the resulting list of tensors, with shape: num_gpus * shape(tensor).
input_tensor_lists – the original tensors, with shape: num_gpus * world_size * shape(tensor).
group_name – the name of the collective group.
op – The reduce operation.
- Returns
None.
- ray.util.collective.collective.send(tensor, dst_rank: int, group_name: str = 'default')[source]#
Send a tensor to a remote process synchronously.
- Parameters
tensor – the tensor to send.
dst_rank – the rank of the destination process.
group_name – the name of the collective group.
- Returns
None
- ray.util.collective.collective.send_multigpu(tensor, dst_rank: int, dst_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#
Send a tensor to a remote GPU synchronously.
The function assumes each process owns more than one GPU, and that the sender and receiver processes have an equal number of GPUs.
- Parameters
tensor – the tensor to send, located on a GPU.
dst_rank – the rank of the destination process.
dst_gpu_index – the destination gpu index.
group_name – the name of the collective group.
n_elements – if specified, send the next n elements from the starting address of tensor.
- Returns
None
- ray.util.collective.collective.recv(tensor, src_rank: int, group_name: str = 'default')[source]#
Receive a tensor from a remote process synchronously.
- Parameters
tensor – the received tensor.
src_rank – the rank of the source process.
group_name – the name of the collective group.
- Returns
None
- ray.util.collective.collective.recv_multigpu(tensor, src_rank: int, src_gpu_index: int, group_name: str = 'default', n_elements: int = 0)[source]#
Receive a tensor from a remote GPU synchronously.
The function assumes each process owns more than one GPU, and that the sender and receiver processes have an equal number of GPUs.
- Parameters
tensor – the received tensor, located on a GPU.
src_rank – the rank of the source process.
src_gpu_index (int) – the index of the source GPU on the source process.
group_name – the name of the collective group.
- Returns
None