使用 Ray DAG API 的惰性计算图#

使用 ray.remote,您可以在运行时远程执行计算。 对于 ray.remote 装饰的类或函数, 您还可以在函数体上使用 .bind 来构建静态计算图。

Note

Ray DAG 旨在成为面向开发人员的 API,其中推荐的用例是

  1. 在本地迭代并测试由更高级别的库编写的应用程序。

  2. 在 Ray DAG API 之上构建库。

.bind() 被调用在一个 ray.remote 装饰的类或函数上时,它将生成一个中间表示(IR)节点, 该节点作为 DAG 的骨干和构建块,静态地保持计算图在一起,其中每个 IR 节点在执行时按照拓扑顺序解析为值。

IR 节点也可以赋值给一个变量,并作为参数传递给其他节点。

带有函数的 Ray DAG#

IR 节点生成的 .bind()ray.remote 装饰的函数上,将在执行时作为 Ray 任务执行,该任务将被解析为任务输出。

此示例展示了如何构建一个函数链,其中每个节点都可以作为根节点执行, 同时迭代,或者作为其他函数的输入参数或关键字参数,以形成更复杂的 DAG。

任何 IR 节点都可以直接执行 dag_node.execute(),作为 DAG 的根节点, 其中所有其他从根节点不可达的节点将被忽略。

import ray

ray.init()

@ray.remote
def func(src, inc=1):
    return src + inc

a_ref = func.bind(1, inc=2)
assert ray.get(a_ref.execute()) == 3 # 1 + 2 = 3
b_ref = func.bind(a_ref, inc=3)
assert ray.get(b_ref.execute()) == 6 # (1 + 2) + 3 = 6
c_ref = func.bind(b_ref, inc=a_ref)
assert ray.get(c_ref.execute()) == 9 # ((1 + 2) + 3) + (1 + 2) = 9

具有类和类方法的 Ray DAG#

IR 节点生成的 .bind()ray.remote 装饰的类上,将在执行时作为 Ray Actor 执行。 Actor 将在每次执行节点时实例化,而 classmethod 调用可以形成特定于父 Actor 实例的函数调用链。

从函数、类或类方法生成的 DAG IR 节点可以组合在一起形成 DAG。

import ray

ray.init()

@ray.remote
class Actor:
    def __init__(self, init_value):
        self.i = init_value

    def inc(self, x):
        self.i += x

    def get(self):
        return self.i

a1 = Actor.bind(10)  # Instantiate Actor with init_value 10.
val = a1.get.bind()  # ClassMethod that returns value from get() from
                     # the actor created.
assert ray.get(val.execute()) == 10

@ray.remote
def combine(x, y):
    return x + y

a2 = Actor.bind(10) # Instantiate another Actor with init_value 10.
a1.inc.bind(2)  # Call inc() on the actor created with increment of 2.
a1.inc.bind(4)  # Call inc() on the actor created with increment of 4.
a2.inc.bind(6)  # Call inc() on the actor created with increment of 6.

# Combine outputs from a1.get() and a2.get()
dag = combine.bind(a1.get.bind(), a2.get.bind())

# a1 +  a2 + inc(2) + inc(4) + inc(6)
# 10 + (10 + ( 2   +    4    +   6)) = 32
assert ray.get(dag.execute()) == 32

带有自定义 InputNode 的 Ray DAG#

InputNode 是 DAG 的单例节点,表示运行时的用户输入值。 它应该在没有参数的上下文管理器中使用,并作为 dag_node.execute() 的参数调用。

import ray

ray.init()

from ray.dag.input_node import InputNode

@ray.remote
def a(user_input):
    return user_input * 2

@ray.remote
def b(user_input):
    return user_input + 1

@ray.remote
def c(x, y):
    return x + y

with InputNode() as dag_input:
    a_ref = a.bind(dag_input)
    b_ref = b.bind(dag_input)
    dag = c.bind(a_ref, b_ref)

#   a(2)  +   b(2)  = c
# (2 * 2) + (2 * 1)
assert ray.get(dag.execute(2)) == 7

#   a(3)  +   b(3)  = c
# (3 * 2) + (3 * 1)
assert ray.get(dag.execute(3)) == 10

更多资源#

您可以在以下资源中找到更多基于 Ray DAG API 并使用相同机制构建的其他 Ray 库的应用模式和示例。