模式:使用流水线增加吞吐量#

如果您有多个工作项,且每个工作项都需要多个步骤才能完成, 则可以使用 pipelining 技术来提高集群利用率并增加系统的吞吐量。

Note

流水线是提高性能的重要技术,被 Ray 库广泛使用。 请参阅 Ray Data 作为示例。

../../_images/pipelining.svg

用例#

应用程序的组件需要同时执行计算密集型工作并与其他进程通信。 理想情况下,您希望将计算和通信重叠以使 CPU 饱和并提高整体吞吐量。

代码#

import ray


@ray.remote
class WorkQueue:
    def __init__(self):
        self.queue = list(range(10))

    def get_work_item(self):
        if self.queue:
            return self.queue.pop(0)
        else:
            return None


@ray.remote
class WorkerWithoutPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_queue.get_work_item.remote())

            if work_item is None:
                break

            # Do work.
            self.process(work_item)


@ray.remote
class WorkerWithPipelining:
    def __init__(self, work_queue):
        self.work_queue = work_queue

    def process(self, work_item):
        print(work_item)

    def run(self):
        self.work_item_ref = self.work_queue.get_work_item.remote()

        while True:
            # Get work from the remote queue.
            work_item = ray.get(self.work_item_ref)

            if work_item is None:
                break

            self.work_item_ref = self.work_queue.get_work_item.remote()

            # Do work while we are fetching the next work item.
            self.process(work_item)


work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())

work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())

在上面的例子中,一个工作 Actor 从队列中取出工作,然后对其进行一些计算。 如果没有流水线,我们在请求工作项后立即使用 ray.get(),这会在 RPC 调用期间阻塞,导致 CPU 空闲。 使用流水线,我们会在处理当前工作项之前先请求下一个工作项,因此我们可以在 RPC 进行时使用 CPU,从而增加 CPU 利用率。