模式:使用流水线增加吞吐量
Contents
模式:使用流水线增加吞吐量#
如果您有多个工作项,且每个工作项都需要多个步骤才能完成, 则可以使用 pipelining 技术来提高集群利用率并增加系统的吞吐量。
Note
流水线是提高性能的重要技术,被 Ray 库广泛使用。 请参阅 Ray Data 作为示例。
用例#
应用程序的组件需要同时执行计算密集型工作并与其他进程通信。 理想情况下,您希望将计算和通信重叠以使 CPU 饱和并提高整体吞吐量。
代码#
import ray
@ray.remote
class WorkQueue:
def __init__(self):
self.queue = list(range(10))
def get_work_item(self):
if self.queue:
return self.queue.pop(0)
else:
return None
@ray.remote
class WorkerWithoutPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_queue.get_work_item.remote())
if work_item is None:
break
# Do work.
self.process(work_item)
@ray.remote
class WorkerWithPipelining:
def __init__(self, work_queue):
self.work_queue = work_queue
def process(self, work_item):
print(work_item)
def run(self):
self.work_item_ref = self.work_queue.get_work_item.remote()
while True:
# Get work from the remote queue.
work_item = ray.get(self.work_item_ref)
if work_item is None:
break
self.work_item_ref = self.work_queue.get_work_item.remote()
# Do work while we are fetching the next work item.
self.process(work_item)
work_queue = WorkQueue.remote()
worker_without_pipelining = WorkerWithoutPipelining.remote(work_queue)
ray.get(worker_without_pipelining.run.remote())
work_queue = WorkQueue.remote()
worker_with_pipelining = WorkerWithPipelining.remote(work_queue)
ray.get(worker_with_pipelining.run.remote())
在上面的例子中,一个工作 Actor 从队列中取出工作,然后对其进行一些计算。
如果没有流水线,我们在请求工作项后立即使用 ray.get(),这会在 RPC 调用期间阻塞,导致 CPU 空闲。
使用流水线,我们会在处理当前工作项之前先请求下一个工作项,因此我们可以在 RPC 进行时使用 CPU,从而增加 CPU 利用率。