数据转换#

转换允许您处理和修改数据集。您可以组合转换来表达计算链。

Note

默认情况下,转换是惰性的。在您通过 迭代 Dataset保存 Dataset、或 检查 Dataset 属性 来触发数据消耗之前,它们不会被执行。

本指南向您展示如何:

转换行#

要转换行,请调用 map()flat_map()

使用 map 转换行#

如果您的转换为每个输入行返回一行,请调用 map()

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)

Tip

如果您的转换是矢量化的,则需要 map_batches() 获取更好的性能。 要了解更多信息,请参阅 批量转换

用 flat map 转换行#

如果您的转换为每个输入行返回多行,请调用 flat_map().

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

批量转换#

如果您的转换像大多数 NumPy 或 pandas 操作一样进行矢量化,则批量转换的性能比转换行的性能更高。

在任务和 actor 之间进行选择#

Ray Data 使用任务或 actor 进行批量转换。 Actor 只执行一次设置。相反, 任务需要每批进行设置。因此,如果您的转换涉及昂贵的设置(例如下载模型权重), 请使用 actor。否则,请使用任务。

要了解有关任务和 actor 的更多信息,请阅读 Ray 核心关键概念

使用任务批量转换#

要使用任务批量转换,调用 map_batches()。 Ray Data 默认使用的是任务。

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness)
)

使用 actor 批量转换#

要使用 actor 批量转换,请完成以下步骤:

  1. 实现一个类。 __init__ 进行设置,在 __call__ 转换数据。

  2. 创建 ActorPoolStrategy 并配置并发 worker 的数量。 每个 worker 都会转换一个数据分区。

  3. 调用 map_batches() 并传递 ActorPoolStrategycompute

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(TorchPredictor, compute=ray.data.ActorPoolStrategy(size=2))
)
from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        compute=ray.data.ActorPoolStrategy(size=2),
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

配置批处理格式#

Ray Data 将批次表示为 NumPy ndarray 或 pandas DataFrame 的字典。默认情况下,Ray Data 将批次表示为 NumPy ndarray 的字典。

要配置批处理类型,请在 batch_format 中指定 map_batches()。您可以从函数中返回任一格式。

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness, batch_format="numpy")
)
import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
    return batch.dropna()

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .map_batches(drop_nas, batch_format="pandas")
)

配置批量大小#

增加 batch_size 可以提高 NumPy 函数和模型推理等矢量化转换的性能。 但是,如果您的批处理大小太大,您的程序可能会耗尽内存。 如果遇到内存不足错误,请减少 batch_size

Note

默认批量大小取决于您的资源类型。如果您使用的是 CPU, 则默认批处理大小为 4096。 如果您使用的是 GPU,则必须指定显式批处理大小。

Groupby 和转换组#

要转换组,请调用 groupby() 进行分组 。然后,调用 map_groups() 来转换组。

from typing import Dict
import numpy as np
import ray

items = [
    {"image": np.zeros((32, 32, 3)), "label": label}
    for _ in range(10) for label in range(100)
]

def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
    return group

ds = (
    ray.data.from_items(items)
    .groupby("label")
    .map_groups(normalize_images)
)
import pandas as pd
import ray

def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
    target = group.drop("target")
    group = (group - group.min()) / group.std()
    group["target"] = target
    return group

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .groupby("target")
    .map_groups(normalize_features)
)

打乱行#

要随机打乱所有行,请调用 random_shuffle()

import ray

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .random_shuffle()
)

Tip

random_shuffle() 很慢。为了获得更好的性能,请尝试 Iterating over batches with shuffling

重新分区数据#

Dataset 对一系列分布式数据 blocks 进行操作。如果您想实现更细粒度的并行化, 请通过设置更高的 parallelism 读取时间来增加块数。

要更改现有数据集的块数,请调用 Dataset.repartition()

import ray

ds = ray.data.range(10000, parallelism=1000)

# Repartition the data into 100 blocks. Since shuffle=False, Ray Data will minimize
# data movement during this operation by merging adjacent blocks.
ds = ds.repartition(100, shuffle=False).materialize()

# Repartition the data into 200 blocks, and force a full data shuffle.
# This operation will be more expensive
ds = ds.repartition(200, shuffle=True).materialize()