Pattern: Using nested tasks to achieve nested parallelism

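In this pattern, a remote task can dynamically call other remote tasks (including itself) to achieve nested parallelism. This is useful when the subtasks themselves can be parallelized.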

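Keep in mind, however, that nested tasks have their own costs: extra worker processes, scheduling overhead, bookkeeping overhead, and so on. To get a speedup from nested parallelism, make sure each nested task does a significant amount of work. For more details, see Anti-pattern: Over-parallelizing with too fine-grained tasks harms speedup.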
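To see why granularity matters, the sketch below counts how many tasks a recursive split creates for a given cutoff. It assumes an idealized model in which every split halves the input exactly. Real quick-sort pivots split unevenly, and the pivot element is ignored here. `count_tasks` is a hypothetical helper for illustration and is not part of Ray.

```python
def count_tasks(n, threshold):
    """Count the tasks in an idealized recursion tree.

    Hypothetical model: every split halves the input exactly, and each
    call (internal or leaf) counts as one task.
    """
    if n <= threshold:
        return 1  # leaf: the list is sorted locally inside a single task
    half = n // 2
    return 1 + count_tasks(half, threshold) + count_tasks(n - half, threshold)


for threshold in (200_000, 1_000):
    tasks = count_tasks(8_000_000, threshold)
    print(f"threshold={threshold:,}: {tasks:,} tasks")
```

Under this model, an 8,000,000-element list produces 127 tasks with a cutoff of 200,000. Lowering the cutoff to 1,000 produces 16,383 tasks, each doing far less work, so per-task overhead makes up a much larger share of the total.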

Example use case

You want to quick-sort a large list of numbers. With nested tasks, the list can be sorted in a distributed, parallel fashion.

Figure: Tree of tasks

Code example

import ray
import time
from numpy import random


def partition(collection):
    # Use the last element as the pivot. Iterating over a slice leaves
    # the caller's list unmodified, so `unsorted` can be reused below.
    pivot = collection[-1]
    greater, lesser = [], []
    for element in collection[:-1]:
        if element > pivot:
            greater.append(element)
        else:
            lesser.append(element)
    return lesser, pivot, greater


def quick_sort(collection):
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort(lesser)
        greater = quick_sort(greater)
        return lesser + [pivot] + greater


@ray.remote
def quick_sort_distributed(collection):
    # Tiny tasks are an antipattern.
    # Thus, in our example we have a "magic number" to
    # toggle when distributed recursion should be used vs
    # when the sorting should be done in place. The rule
    # of thumb is that the duration of an individual task
    # should be at least 1 second.
    if len(collection) <= 200000:  # magic number
        return sorted(collection)
    else:
        lesser, pivot, greater = partition(collection)
        lesser = quick_sort_distributed.remote(lesser)
        greater = quick_sort_distributed.remote(greater)
        return ray.get(lesser) + [pivot] + ray.get(greater)


for size in [200000, 4000000, 8000000]:
    print(f"Array size: {size}")
    unsorted = random.randint(1000000, size=(size)).tolist()
    s = time.time()
    quick_sort(unsorted)
    print(f"Sequential execution: {(time.time() - s):.3f}")
    s = time.time()
    ray.get(quick_sort_distributed.remote(unsorted))
    print(f"Distributed execution: {(time.time() - s):.3f}")
    print("--" * 10)

# Outputs:

# Array size: 200000
# Sequential execution: 0.040
# Distributed execution: 0.152
# --------------------
# Array size: 4000000
# Sequential execution: 6.161
# Distributed execution: 5.779
# --------------------
# Array size: 8000000
# Sequential execution: 15.459
# Distributed execution: 11.282
# --------------------
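The cutoff only controls task granularity, so the sorted output should not depend on it. The sketch below is a local sanity check that needs no Ray. It uses a plain sequential version of the same hybrid algorithm, and `hybrid_quick_sort` and its `threshold` parameter are hypothetical names for illustration. It confirms that the result matches `sorted()` for several cutoffs and that the input list is left unmodified.

```python
import random


def partition(collection):
    # Last element is the pivot; new lists are built, so the input is not modified.
    pivot = collection[-1]
    rest = collection[:-1]
    lesser = [x for x in rest if x <= pivot]
    greater = [x for x in rest if x > pivot]
    return lesser, pivot, greater


def hybrid_quick_sort(collection, threshold):
    # At or below the cutoff, sort directly instead of recursing
    # (the role the "magic number" plays in the example above).
    if len(collection) <= threshold:
        return sorted(collection)
    lesser, pivot, greater = partition(collection)
    return (
        hybrid_quick_sort(lesser, threshold)
        + [pivot]
        + hybrid_quick_sort(greater, threshold)
    )


rng = random.Random(0)
data = [rng.randrange(1_000_000) for _ in range(20_000)]
snapshot = list(data)
results = {t: hybrid_quick_sort(data, t) for t in (20_000, 1_000, 10)}
print(all(r == sorted(snapshot) for r in results.values()))
```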

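We call ray.get() only after both quick_sort_distributed calls have been submitted, so the two recursive calls run in parallel. This maximizes parallelism in the workload. For more details, see Anti-pattern: Calling ray.get in a loop harms parallelism.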
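The effect of where the blocking call goes can be reproduced locally. The sketch below uses an analogy with Python's standard `concurrent.futures`, not Ray: `Future.result()` stands in for `ray.get()`, and the sleeping `work` function is a hypothetical stand-in for a child task. Submitting both children before waiting lets them overlap. Waiting on the first child before submitting the second runs them one after the other.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def work(seconds):
    # Hypothetical stand-in for a child task: it just sleeps.
    time.sleep(seconds)
    return seconds


def get_after_both(pool, seconds):
    # Submit both children first, then block: they run concurrently.
    left = pool.submit(work, seconds)
    right = pool.submit(work, seconds)
    return left.result() + right.result()


def get_between(pool, seconds):
    # Block on the first child before submitting the second: they run serially.
    left = pool.submit(work, seconds).result()
    right = pool.submit(work, seconds).result()
    return left + right


def timed(fn, *args):
    start = time.perf_counter()
    fn(*args)
    return time.perf_counter() - start


with ThreadPoolExecutor(max_workers=2) as pool:
    concurrent_time = timed(get_after_both, pool, 0.3)
    serial_time = timed(get_between, pool, 0.3)

print(f"get after both: {concurrent_time:.2f}s, get between: {serial_time:.2f}s")
```

With two 0.3 s children, the first version takes roughly 0.3 s and the second roughly 0.6 s.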

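Note from the execution times above that the non-distributed version is faster for small inputs. At 200,000 elements, the distributed call takes 0.152 s versus 0.040 s sequentially. At that size the list is already at the threshold, so it is sorted inside a single task, and the task overhead outweighs the work. As the task execution time increases (that is, as the lists to sort get larger), the distributed version becomes faster: 11.282 s versus 15.459 s at 8,000,000 elements.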
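To make the comparison concrete, the short calculation below derives the speedup (sequential time divided by distributed time) from the sample timings in the output above. It reuses only those reported numbers. The values come from a single run and will vary across machines and cluster configurations.

```python
# Timings in seconds (sequential, distributed), copied from the sample output above.
timings = {
    200_000: (0.040, 0.152),
    4_000_000: (6.161, 5.779),
    8_000_000: (15.459, 11.282),
}

# Speedup > 1 means the distributed version was faster.
speedups = {size: round(seq / dist, 2) for size, (seq, dist) in timings.items()}

for size, speedup in speedups.items():
    print(f"{size:>9,} elements: {speedup}x")
```

The speedup rises from about 0.26x at 200,000 elements to about 1.07x at 4,000,000 and about 1.37x at 8,000,000, which matches the trend described above.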