使用 PyTorch
Contents
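使用 PyTorch
Ray Data 与 PyTorch 生态系统集成。
本指南介绍如何:
- 迭代 Torch 张量进行训练
- 编写处理 Torch 张量的转换
- 使用 Torch 模型进行批量推理
- 保存包含 Torch 张量的数据集
- 从 PyTorch 数据集和 DataLoader 迁移到 Ray Data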
迭代 Torch 张量进行训练
要以 Torch 格式迭代批量数据,请调用 Dataset.iter_torch_batches()。每个批次表示为 Dict[str, torch.Tensor],数据集中的每一列对应一个张量。
这对于使用数据集中的批次训练 Torch 模型非常有用。有关配置的详细信息(例如通过提供 collate_fn 进行自定义转换),请参阅 API 参考。
import ray
import torch

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

# Every yielded item is a Dict[str, torch.Tensor] holding one tensor per column.
torch_batches = ds.iter_torch_batches(batch_size=2)
for torch_batch in torch_batches:
    print(torch_batch)
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
...
{'image': tensor([[[[...]]]], dtype=torch.uint8)}
与 Ray Train 集成
Ray Data 与 Ray Train 集成,可轻松摄取数据以进行数据并行训练,支持 PyTorch、PyTorch Lightning 和 Hugging Face 训练。
import torch
from torch import nn

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, prepare_model


def train_func(config):
    """Training loop executed on every worker started by ``TorchTrainer``.

    ``config`` is accepted but not used by this example.
    """
    model = nn.Sequential(nn.Linear(30, 1), nn.Sigmoid())
    # Prepare the model for distributed data-parallel training so that
    # gradients are synchronized across the workers; without this each worker
    # would train an independent copy of the model.
    model = prepare_model(model)
    loss_fn = torch.nn.BCELoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.001)

    # Datasets can be accessed in your train_func via ``get_dataset_shard``.
    train_data_shard = train.get_dataset_shard("train")

    for epoch_idx in range(2):
        for batch in train_data_shard.iter_torch_batches(batch_size=128, dtypes=torch.float32):
            # Stack every non-target column into a (batch, num_features) tensor.
            features = torch.stack(
                [batch[col_name] for col_name in batch.keys() if col_name != "target"],
                dim=1,
            )
            predictions = model(features)
            train_loss = loss_fn(predictions, batch["target"].unsqueeze(1))
            # Reset gradients from the previous step; backward() accumulates
            # into .grad, so skipping this would apply a running sum of all
            # past gradients on every update.
            optimizer.zero_grad()
            train_loss.backward()
            optimizer.step()


train_dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
trainer = TorchTrainer(
    train_func,
    datasets={"train": train_dataset},
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
有关更多详细信息,请参阅 Ray Train 用户指南。
使用 Torch 张量进行转换
在 map 或 map_batches 转换中,可以返回 Torch 张量。
Caution
在底层,Ray Data 自动将 Torch 张量转换为 NumPy 数组。后续转换接受 NumPy 数组作为输入,而不是 Torch 张量。
from typing import Dict
import numpy as np
import torch
import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")


def convert_to_torch(row: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Return a new row whose ``tensor`` column wraps the row's ``image`` array."""
    image_tensor = torch.as_tensor(row["image"])
    return {"tensor": image_tensor}


# The tensor gets converted into a Numpy array under the hood
transformed_ds = ds.map(convert_to_torch)
print(transformed_ds.schema())


# Subsequent transformations take in Numpy array as input.
def check_numpy(row: Dict[str, np.ndarray]):
    """Assert that the ``tensor`` column arrives as a NumPy array; pass the row through."""
    value = row["tensor"]
    assert isinstance(value, np.ndarray)
    return row


transformed_ds.map(check_numpy).take_all()
Column Type
------ ----
tensor numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
from typing import Dict
import numpy as np
import torch
import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")


def convert_to_torch(batch: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Return a new batch whose ``tensor`` column wraps the batch's ``image`` array."""
    images_tensor = torch.as_tensor(batch["image"])
    return {"tensor": images_tensor}


# The tensor gets converted into a Numpy array under the hood
transformed_ds = ds.map_batches(convert_to_torch, batch_size=2)
print(transformed_ds.schema())


# Subsequent transformations take in Numpy array as input.
def check_numpy(batch: Dict[str, np.ndarray]):
    """Assert that the ``tensor`` column arrives as a NumPy array; pass the batch through."""
    column = batch["tensor"]
    assert isinstance(column, np.ndarray)
    return batch


transformed_ds.map_batches(check_numpy, batch_size=2).take_all()
Column Type
------ ----
tensor numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
有关转换数据的更多信息,请参阅 转换数据。
内置 PyTorch 转换
你可以在 Ray Data 转换中使用内置的 PyTorch 转换,例如来自 torchvision、torchtext 和 torchaudio 的转换。
from typing import Dict
import numpy as np
import torch
from torchvision import transforms
import ray

# Create the Dataset.
ds = ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")

# Define the torchvision transform: convert to a tensor, then crop the center 10x10.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.CenterCrop(10),
])


# Define the map function
def transform_image(row: Dict[str, np.ndarray]) -> Dict[str, torch.Tensor]:
    """Add a ``transformed_image`` column computed by ``transform`` from ``image``."""
    source_image = row["image"]
    row["transformed_image"] = transform(source_image)
    return row


# Apply the transform over the dataset.
transformed_ds = ds.map(transform_image)
print(transformed_ds.schema())
Column Type
------ ----
image numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
transformed_image numpy.ndarray(shape=(3, 10, 10), dtype=float)
from typing import Dict, List
import numpy as np
from torchtext import transforms
import ray

# Create the Dataset.
ds = ray.data.read_text("s3://anonymous@ray-example-data/simple.txt")

# Define the torchtext transform.
VOCAB_FILE = "https://huggingface.co/bert-base-uncased/resolve/main/vocab.txt"
transform = transforms.BERTTokenizer(
    vocab_path=VOCAB_FILE,
    do_lower_case=True,
    return_tokens=True,
)


# Define the map_batches function.
def tokenize_text(batch: Dict[str, np.ndarray]) -> Dict[str, List[str]]:
    """Tokenize the batch's ``text`` column into a new ``tokenized_text`` column."""
    texts = list(batch["text"])
    batch["tokenized_text"] = transform(texts)
    return batch


# Apply the transform over the dataset.
transformed_ds = ds.map_batches(tokenize_text, batch_size=2)
print(transformed_ds.schema())
Column Type
------ ----
text <class 'object'>
tokenized_text <class 'object'>
使用 PyTorch 进行批量推理
借助 Ray Data,您可以将预训练的 Torch 模型映射到数据上,进行可扩展的离线批量推理。
from typing import Dict
import numpy as np
import torch
import torch.nn as nn

import ray

# Step 1: Create a Ray Dataset from in-memory Numpy arrays.
# You can also create a Ray Dataset from many other sources and file
# formats.
ds = ray.data.from_numpy(np.ones((1, 100)))


# Step 2: Define a Predictor class for inference.
# Use a class to initialize the model just once in `__init__`
# and re-use it for inference across multiple batches.
class TorchPredictor:
    def __init__(self):
        # Load a dummy neural network.
        # Set `self.model` to your pre-trained PyTorch model.
        layers = [
            nn.Linear(in_features=100, out_features=1),
            nn.Sigmoid(),
        ]
        self.model = nn.Sequential(*layers)
        # Switch to evaluation mode before serving predictions.
        self.model.eval()

    # Logic for inference on 1 batch of data.
    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            # Get the predictions from the input batch.
            scores = self.model(inputs).numpy()
        return {"output": scores}


# Use 2 parallel actors for inference. Each actor predicts on a
# different partition of data.
scale = ray.data.ActorPoolStrategy(size=2)
# Step 3: Map the Predictor over the Dataset to get predictions.
predictions = ds.map_batches(TorchPredictor, compute=scale)
# Step 4: Show one prediction output.
predictions.show(limit=1)
{'output': array([0.5590901], dtype=float32)}
更多信息,请参阅 批量推理用户指南。
保存包含 Torch 张量的数据集
包含 Torch 张量的数据集可以保存到文件中,例如 Parquet 或 NumPy 格式。
更多信息,请参阅 保存数据。
Caution
位于 GPU 设备上的 Torch 张量无法被序列化并写入磁盘。保存数据之前,请先将张量移动到 CPU(tensor.to("cpu"))。
import torch
import ray

# An initialized one-element float tensor. (The legacy ``torch.Tensor(1)``
# constructor allocates an *uninitialized* tensor of size 1, so the value
# written to disk would be arbitrary memory contents.)
tensor = torch.ones(1)
ds = ray.data.from_items([{"tensor": tensor}])
ds.write_parquet("local:///tmp/tensor")
import torch
import ray

# An initialized one-element float tensor. (The legacy ``torch.Tensor(1)``
# constructor allocates an *uninitialized* tensor of size 1, so the value
# written to disk would be arbitrary memory contents.)
tensor = torch.ones(1)
ds = ray.data.from_items([{"tensor": tensor}])
ds.write_numpy("local:///tmp/tensor", column="tensor")
从 PyTorch 数据集和 DataLoader 迁移
如果您正在使用 PyTorch 数据集和 DataLoader,可以迁移到 Ray Data 以处理分布式数据集。
PyTorch 数据集被 Dataset 抽象替代,PyTorch DataLoader 被 Dataset.iter_torch_batches() 替代。
内置 PyTorch 数据集
如果您使用的是内置的 PyTorch 数据集(例如来自 torchvision 的数据集),可以使用 from_torch() API 将其转换为 Ray Dataset。
Caution
from_torch() 要求 PyTorch 数据集能够完全放入内存。仅将其用于小型内置数据集,以进行原型设计或测试。
import torchvision
import ray

# Load MNIST via torchvision (downloading into /tmp/ if needed) and wrap it
# as a Ray Dataset.
mnist = torchvision.datasets.MNIST(root="/tmp/", download=True)
ds = ray.data.from_torch(mnist)

# The data for each item of the Torch dataset is under the "item" key.
dataset_schema = ds.schema()
print(dataset_schema)
Column Type
------ ----
item <class 'object'>
自定义 PyTorch 数据集
如果您有自定义的 PyTorch 数据集,可以将 __getitem__ 中的逻辑替换为 Ray Data 的读取和转换操作,从而迁移到 Ray Data。
任何从云存储和磁盘读取数据的逻辑都可以替换为 Ray Data 的某个 read_* API,任何转换逻辑都可以作为 Dataset 上的 map 调用来应用。
以下示例展示了一个自定义 PyTorch 数据集,以及使用 Ray Data 实现的等效版本。
Note
与 PyTorch 的 map-style(映射式)数据集不同,Ray Dataset 不支持索引访问。
import tempfile

import boto3
from botocore import UNSIGNED
from botocore.config import Config
from torchvision import transforms
from torch.utils.data import Dataset
from PIL import Image


class ImageDataset(Dataset):
    """Map-style PyTorch dataset over JPEG images stored under an S3 prefix.

    The integer label of each image is parsed from its file name: the text
    between the last ``/`` and the last ``.`` of the object key.
    """

    def __init__(self, bucket_name: str, dir_path: str):
        # Unsigned (anonymous) S3 access.
        self.s3 = boto3.resource("s3", config=Config(signature_version=UNSIGNED))
        self.bucket = self.s3.Bucket(bucket_name)
        self.files = [obj.key for obj in self.bucket.objects.filter(Prefix=dir_path)]
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Resize((128, 128)),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
        ])

    def __len__(self):
        return len(self.files)

    def __getitem__(self, idx):
        img_name = self.files[idx]

        # Infer the label from the file name.
        last_slash_idx = img_name.rfind("/")
        dot_idx = img_name.rfind(".")
        label = int(img_name[last_slash_idx + 1:dot_idx])

        # Download the S3 file locally into an anonymous temporary file that
        # is deleted automatically when the ``with`` block exits, so repeated
        # __getitem__ calls do not leave files behind on disk.
        obj = self.bucket.Object(img_name)
        with tempfile.TemporaryFile() as tmp:
            obj.download_fileobj(tmp)
            tmp.seek(0)
            # PIL reads pixel data lazily, so preprocess while the backing
            # file is still open; the context manager then releases the image.
            with Image.open(tmp) as pil_image:
                image = self.transform(pil_image)

        return image, label


dataset = ImageDataset(bucket_name="ray-example-data", dir_path="batoidea/JPEGImages/")
from torchvision import transforms
import ray

ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea/JPEGImages", include_paths=True)


# Extract the label from the file path.
def extract_label(row: dict):
    """Parse the integer label from ``row["path"]`` (text between the last ``/`` and last ``.``)."""
    filepath = row["path"]
    last_slash_idx = filepath.rfind("/")
    dot_idx = filepath.rfind(".")
    label = int(filepath[last_slash_idx + 1:dot_idx])
    row["label"] = label
    return row


# ``transforms`` must be imported from torchvision above; a bare
# ``import torchvision`` does not bind the name ``transforms``.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((128, 128)),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])


# Preprocess the images.
def transform_image(row: dict):
    """Add a ``transformed_image`` column computed by ``transform`` from ``image``."""
    row["transformed_image"] = transform(row["image"])
    return row


# Map the transformations over the dataset.
ds = ds.map(extract_label).map(transform_image)
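PyTorch 数据加载器
调用 Dataset.iter_torch_batches() 即可按批次迭代数据集,以此替代 DataLoader。
下表描述了 PyTorch DataLoader 的参数如何映射到 Ray Data。请注意,二者的行为不一定完全相同。有关确切的语义和用法,请参见 API 参考。
| PyTorch DataLoader arguments | Ray Data API |
|---|---|
| batch_size | batch_size arg to ds.iter_torch_batches() |
| shuffle | local_shuffle_buffer_size arg to ds.iter_torch_batches() |
| collate_fn | collate_fn arg to ds.iter_torch_batches() |
| sampler | Not supported. Can be manually implemented after iterating through the dataset with ds.iter_torch_batches(). |
| batch_sampler | Not supported. Can be manually implemented after iterating through the dataset with ds.iter_torch_batches(). |
| drop_last | drop_last arg to ds.iter_torch_batches() |
| num_workers | Use prefetch_batches arg to ds.iter_torch_batches() to indicate how many batches to prefetch. The number of prefetching threads is automatically configured according to prefetch_batches. |
| prefetch_factor | Use prefetch_batches arg to ds.iter_torch_batches() to indicate how many batches to prefetch. The number of prefetching threads is automatically configured according to prefetch_batches. |
| pin_memory | Pass in device to ds.iter_torch_batches() to get tensors that have already been moved to the correct device. |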