模式:使用 asyncio 并发运行 actor 方法
Contents
模式:使用 asyncio 并发运行 actor 方法#
默认情况下,Ray actor 在单个线程中运行,Actor 方法调用按顺序执行。
这意味着长时间运行的方法调用会阻止所有后续方法调用。
在此模式中,我们使用 await 从长时间运行的方法调用中让出控制权,以便其他方法调用可以同时运行。
通常,当方法执行 IO 操作时,会让出控制权,但您也可以使用 await asyncio.sleep(0) 显式让出控制权。
Note
您还可以使用 threaded actors 来实现并发。
用例#
您有一个采用长轮询方法的 Actor,该方法会不断从远程存储中获取任务并执行它们。 您还想查询长轮询方法运行时执行的任务数。
使用默认 actor 时,代码将如下所示:
import ray
@ray.remote
class TaskStore:
def get_next_task(self):
return "task"
@ray.remote
class TaskExecutor:
def __init__(self, task_store):
self.task_store = task_store
self.num_executed_tasks = 0
def run(self):
while True:
task = ray.get(task_store.get_next_task.remote())
self._execute_task(task)
def _execute_task(self, task):
# Executing the task
self.num_executed_tasks = self.num_executed_tasks + 1
def get_num_executed_tasks(self):
return self.num_executed_tasks
task_store = TaskStore.remote()
task_executor = TaskExecutor.remote(task_store)
task_executor.run.remote()
try:
# This will timeout since task_executor.run occupies the entire actor thread
# and get_num_executed_tasks cannot run.
ray.get(task_executor.get_num_executed_tasks.remote(), timeout=5)
except ray.exceptions.GetTimeoutError:
print("get_num_executed_tasks didn't finish in 5 seconds")
这是有问题的,因为 TaskExecutor.run 方法会永远运行,并且永远不会交出控制权来运行其他方法。
以通过使用 async actors 并用 await 交出控制权来解决这个问题:
@ray.remote
class AsyncTaskExecutor:
def __init__(self, task_store):
self.task_store = task_store
self.num_executed_tasks = 0
async def run(self):
while True:
# Here we use await instead of ray.get() to
# wait for the next task and it will yield
# the control while waiting.
task = await task_store.get_next_task.remote()
self._execute_task(task)
def _execute_task(self, task):
# Executing the task
self.num_executed_tasks = self.num_executed_tasks + 1
def get_num_executed_tasks(self):
return self.num_executed_tasks
async_task_executor = AsyncTaskExecutor.remote(task_store)
async_task_executor.run.remote()
# We are able to run get_num_executed_tasks while run method is running.
num_executed_tasks = ray.get(async_task_executor.get_num_executed_tasks.remote())
print(f"num of executed tasks so far: {num_executed_tasks}")
这里,我们不是使用阻塞的 ray.get() <ray.get>`来获取 ObjectRef 的值,我们使用 ``await`() 来在等待对象被获取时产生控制权。