容错
Contents
容错#
Ray 是一个分布式系统,这意味着故障可能会发生。通常,故障可以分为两类:1)应用程序级故障,2)系统级故障。 前者可能是由于用户级代码中的错误或外部系统失败引起的。 后者可能是由节点故障、网络故障或 Ray 中的错误引起的。 在这里,我们描述了 Ray 提供的机制,允许应用程序从故障中恢复。
要处理应用程序级故障,Ray 提供了机制来捕获错误、重试失败的代码和处理行为不当的代码。 有关这些机制的更多信息,请参见 task 和 actor 容错性。
Ray 同样提供了一些机制来自动从内部系统级故障中恢复,例如 节点故障。 特别是,Ray 可以自动从 分布式对象存储 中的一些故障中恢复。
如何编写容错 Ray 应用程序#
有几条建议可以使 Ray 应用程序具有容错性:
首先,如果 Ray 提供的容错机制不适用于您, 您可以捕获由故障引起的 异常 并手动恢复。
@ray.remote
class Actor:
def read_only(self):
import sys
import random
rand = random.random()
if rand < 0.2:
return 2 / 0
elif rand < 0.3:
sys.exit(1)
return 2
actor = Actor.remote()
# Manually retry the actor task.
while True:
try:
print(ray.get(actor.read_only.remote()))
break
except ZeroDivisionError:
pass
except ray.exceptions.RayActorError:
# Manually restart the actor
actor = Actor.remote()
其次,避免让 ObjectRef 超出其 所有者 任务或 actor 的生命周期
(任务或 actor 通过调用 ray.put() 或 foo.remote() 创建初始 ObjectRef 的)。
只要仍有对对象的引用,对象的所有者 worker 就会在相应task 或 actor完成后继续运行。
如果对象的所有者 worker 失败,Ray 无法自动为尝试访问对象的用户 <fault-tolerance-ownership>`恢复对象。
从任务返回由 ``ray.put()` 创建的 ObjectRef 是创建这种超出生命周期的对象的一个例子:
import ray
# Non-fault tolerant version:
@ray.remote
def a():
x_ref = ray.put(1)
return x_ref
x_ref = ray.get(a.remote())
# Object x outlives its owner task A.
try:
# If owner of x (i.e. the worker process running task A) dies,
# the application can no longer get value of x.
print(ray.get(x_ref))
except ray.exceptions.OwnerDiedError:
pass
上例中,对象 x 超出了其所有者任务 a 的生命周期。
如果 worker 进程运行任务 a 失败,之后调用 ray.get 获取 x_ref 将导致 OwnerDiedError 异常。
容错版本是直接返回 x,这样它就由 driver 拥有,并且只在 driver 的生命周期内访问。
x 如果丢失,Ray 可以通过 lineage reconstruction 自动恢复。
参考 反模式:从任务返回 ray.put() ObjectRefs 会影响性能和容错性 了解更多细节。
# Fault tolerant version:
@ray.remote
def a():
# Here we return the value directly instead of calling ray.put() first.
return 1
# The owner of x is the driver
# so x is accessible and can be auto recovered
# during the entire lifetime of the driver.
x_ref = a.remote()
print(ray.get(x_ref))
第三,避免使用只能由特定节点满足的 自定义资源需求。 如果特定节点失败,正在运行的task 或 actor将无法重试。
@ray.remote
def b():
return 1
# If the node with ip 127.0.0.3 fails while task b is running,
# Ray cannot retry the task on other nodes.
b.options(resources={"node:127.0.0.3": 1}).remote()
如果你倾向于在特定节点上运行任务,你可以使用 NodeAffinitySchedulingStrategy。
它允许你将亲和性作为软约束来指定,因此即使目标节点失败,任务仍然可以在其他节点上重试。
# Prefer running on the particular node specified by node id
# but can also run on other nodes if the target node fails.
b.options(
scheduling_strategy=ray.util.scheduling_strategies.NodeAffinitySchedulingStrategy(
node_id=ray.get_runtime_context().get_node_id(), soft=True
)
).remote()