模式:使用 Actor 同步其他任务和 actor
Contents
模式:使用 Actor 同步其他任务和 actor#
当您有多个任务需要等待某种条件或需要在集群上的任务和 actor 之间进行同步时,您可以使用中央 actor 来协调它们。
用例#
您可以使用 asyncio.Event actor 来实现多个任务可以等待的分布式。
代码#
import asyncio
import ray
# We set num_cpus to zero because this actor will mostly just block on I/O.
@ray.remote(num_cpus=0)
class SignalActor:
def __init__(self):
self.ready_event = asyncio.Event()
def send(self, clear=False):
self.ready_event.set()
if clear:
self.ready_event.clear()
async def wait(self, should_wait=True):
if should_wait:
await self.ready_event.wait()
@ray.remote
def wait_and_go(signal):
ray.get(signal.wait.remote())
print("go!")
signal = SignalActor.remote()
tasks = [wait_and_go.remote(signal) for _ in range(4)]
print("ready...")
# Tasks will all be waiting for the signals.
print("set..")
ray.get(signal.send.remote())
# Tasks are unblocked.
ray.get(tasks)
# Output is:
# ready...
# set..
# (wait_and_go pid=77366) go!
# (wait_and_go pid=77372) go!
# (wait_and_go pid=77367) go!
# (wait_and_go pid=77358) go!