Getting Started

Use Ray to scale applications on your laptop or in the cloud. Choose the right guide for your task.

Ray AI Libraries Quickstart

Use individual libraries for ML workloads. Choose the section below that matches your workload.

Ray Data: Scalable Datasets for ML

Scale offline inference and training ingest with Ray Data, a data processing library designed for ML.

To learn more, see Offline batch inference and Data preprocessing and ingest for ML training.

Note

To run this example, install Ray Data:

pip install -U "ray[data]"

from typing import Dict
import numpy as np
import ray

# Create datasets from on-disk files, Python objects, and cloud storage like S3.
ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

# Apply functions to transform data. Ray Data executes transformations in parallel.
def compute_area(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    length = batch["petal length (cm)"]
    width = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = length * width
    return batch

transformed_ds = ds.map_batches(compute_area)

# Iterate over batches of data.
for batch in transformed_ds.iter_batches(batch_size=4):
    print(batch)

# Save dataset contents to on-disk files or cloud storage.
transformed_ds.write_parquet("local:///tmp/iris/")
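
To make the batch format concrete, here is a minimal sketch in plain Python (no Ray required) of how rows might be grouped into column-oriented batches and transformed, mirroring compute_area and iter_batches(batch_size=4) above. The helper iter_column_batches and the sample rows are hypothetical illustrations; Ray Data itself passes NumPy arrays rather than lists and runs the transform in parallel.

```python
from typing import Dict, Iterator, List

# Hypothetical sample rows using the same column names as the iris example.
rows = [
    {"petal length (cm)": 1.4, "petal width (cm)": 0.2},
    {"petal length (cm)": 4.7, "petal width (cm)": 1.4},
    {"petal length (cm)": 6.0, "petal width (cm)": 2.5},
    {"petal length (cm)": 1.5, "petal width (cm)": 0.1},
    {"petal length (cm)": 5.1, "petal width (cm)": 1.9},
]


def iter_column_batches(
    data: List[Dict[str, float]], batch_size: int
) -> Iterator[Dict[str, List[float]]]:
    """Yield column-oriented batches of at most `batch_size` rows."""
    for start in range(0, len(data), batch_size):
        chunk = data[start:start + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}


def compute_area(batch: Dict[str, List[float]]) -> Dict[str, List[float]]:
    """Add a petal-area column computed element-wise from length and width."""
    lengths = batch["petal length (cm)"]
    widths = batch["petal width (cm)"]
    batch["petal area (cm^2)"] = [l * w for l, w in zip(lengths, widths)]
    return batch


batches = [compute_area(b) for b in iter_column_batches(rows, batch_size=4)]
for batch in batches:
    print(batch)
```

Five rows with a batch size of 4 produce two batches, the second holding the single leftover row.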

Learn more about Ray Data

Ray Train: Distributed Model Training

Ray Train abstracts away the complexity of setting up a distributed training system.

This example shows how you can use Ray Train with PyTorch.

Note

To run this example, install the Ray Train and PyTorch packages:

pip install -U "ray[train]" torch torchvision

Set up your dataset and model.

import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

def get_dataset():
    return datasets.FashionMNIST(
        root="/tmp/data",
        train=True,
        download=True,
        transform=ToTensor(),
    )

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(28 * 28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10),
        )

    def forward(self, inputs):
        inputs = self.flatten(inputs)
        logits = self.linear_relu_stack(inputs)
        return logits

Now define your single-worker PyTorch training function.

def train_func():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)

    model = NeuralNetwork()

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")

This training function can be executed with:

train_func()

Convert this to a distributed multi-worker training function.

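Use the ray.train.torch.prepare_model and ray.train.torch.prepare_data_loader utility functions to set up your model and data for distributed training. These automatically wrap the model with DistributedDataParallel, place it on the right device, and add a DistributedSampler to the DataLoaders.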

from ray import train

def train_func_distributed():
    num_epochs = 3
    batch_size = 64

    dataset = get_dataset()
    dataloader = DataLoader(dataset, batch_size=batch_size)
    dataloader = train.torch.prepare_data_loader(dataloader)

    model = NeuralNetwork()
    model = train.torch.prepare_model(model)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

    for epoch in range(num_epochs):
        for inputs, labels in dataloader:
            optimizer.zero_grad()
            pred = model(inputs)
            loss = criterion(pred, labels)
            loss.backward()
            optimizer.step()
        print(f"epoch: {epoch}, loss: {loss.item()}")
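
As a rough illustration of what the added DistributedSampler does, the sketch below splits dataset indices across workers in plain Python. The function shard_indices is a hypothetical helper, not part of Ray Train or PyTorch; it assumes no shuffling, at least as many samples as workers, and pads by wrapping around so that every worker receives the same number of samples.

```python
import math
from typing import List


def shard_indices(num_samples: int, num_workers: int, rank: int) -> List[int]:
    """Return the sample indices that worker `rank` would process.

    Assumes num_samples >= num_workers and no shuffling.
    """
    per_worker = math.ceil(num_samples / num_workers)
    total_size = per_worker * num_workers
    indices = list(range(num_samples))
    # Pad by repeating the first indices so the total divides evenly.
    indices += indices[: total_size - num_samples]
    # Each worker takes every `num_workers`-th index, starting at its rank.
    return indices[rank:total_size:num_workers]


shards = [shard_indices(num_samples=10, num_workers=4, rank=r) for r in range(4)]
for rank, shard in enumerate(shards):
    print(f"worker {rank}: {shard}")
```

Each worker sees a disjoint slice of the data apart from the few padded indices, which is why the per-worker DataLoader in the distributed function can keep the same batch_size as the single-worker version.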

Instantiate a TorchTrainer with 4 workers, and use it to run the new training function.

from ray.train.torch import TorchTrainer
from ray.train import ScalingConfig

# For GPU Training, set `use_gpu` to True.
use_gpu = False

trainer = TorchTrainer(
    train_func_distributed,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu)
)

results = trainer.fit()

This example shows how you can use Ray Train to set up multi-worker training with Keras.

Note

To run this example, install the Ray Train and TensorFlow packages:

pip install -U "ray[train]" tensorflow

Set up your dataset and model.

import numpy as np
import tensorflow as tf

def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset


def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model

Now define your single-worker TensorFlow training function.

def train_func():
    batch_size = 64
    single_worker_dataset = mnist_dataset(batch_size)
    single_worker_model = build_and_compile_cnn_model()
    single_worker_model.fit(single_worker_dataset, epochs=3, steps_per_epoch=70)

This training function can be executed with:

train_func()

Now convert this to a distributed multi-worker training function.

  1. Set the global batch size: each worker processes the same size batch as in the single-worker code.

  2. Choose your TensorFlow distributed training strategy. This example uses the MultiWorkerMirroredStrategy.

import json
import os

def train_func_distributed():
    per_worker_batch_size = 64
    # This environment variable will be set by Ray Train.
    tf_config = json.loads(os.environ['TF_CONFIG'])
    num_workers = len(tf_config['cluster']['worker'])

    strategy = tf.distribute.MultiWorkerMirroredStrategy()

    global_batch_size = per_worker_batch_size * num_workers
    multi_worker_dataset = mnist_dataset(global_batch_size)

    with strategy.scope():
        # Model building/compiling need to be within `strategy.scope()`.
        multi_worker_model = build_and_compile_cnn_model()

    multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
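
The batch-size arithmetic can be checked without TensorFlow. This sketch parses a hand-written TF_CONFIG value (the addresses are made up for illustration; Ray Train sets the real value on each worker) and derives the global batch size the same way the training function above does.

```python
import json

# Hypothetical TF_CONFIG for a 4-worker cluster; addresses are placeholders.
example_tf_config = json.dumps(
    {
        "cluster": {
            "worker": [
                "10.0.0.1:12345",
                "10.0.0.2:12345",
                "10.0.0.3:12345",
                "10.0.0.4:12345",
            ]
        },
        "task": {"type": "worker", "index": 0},
    }
)


def global_batch_size(tf_config_json: str, per_worker_batch_size: int) -> int:
    """Multiply the per-worker batch size by the number of workers."""
    tf_config = json.loads(tf_config_json)
    num_workers = len(tf_config["cluster"]["worker"])
    return per_worker_batch_size * num_workers


print(global_batch_size(example_tf_config, per_worker_batch_size=64))  # 256
```

With 4 workers and 64 samples per worker, the dataset is batched at 256 so that each worker still processes 64 samples per step.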

Instantiate a TensorflowTrainer with 4 workers, and use it to run the new training function.

from ray.train.tensorflow import TensorflowTrainer
from ray.train import ScalingConfig

# For GPU Training, set `use_gpu` to True.
use_gpu = False

trainer = TensorflowTrainer(train_func_distributed, scaling_config=ScalingConfig(num_workers=4, use_gpu=use_gpu))

trainer.fit()

Learn more about Ray Train

Ray Tune: Hyperparameter Tuning at Scale

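Tune is a library for hyperparameter tuning at any scale. With Tune, you can launch a multi-node distributed hyperparameter sweep in less than 10 lines of code. Tune supports any deep learning framework, including PyTorch, TensorFlow, and Keras.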

Note

To run this example, install Ray Tune:

pip install -U "ray[tune]"

This example runs a small grid search over a simple objective function.

from ray import tune


def objective(config):
    score = config["a"] ** 2 + config["b"]
    return {"score": score}


search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}

tuner = tune.Tuner(objective, param_space=search_space)

results = tuner.fit()
print(results.get_best_result(metric="score", mode="min").config)
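
To see which trials this search space produces, the following plain-Python sketch enumerates them by hand. It assumes the default of one sample per grid point, so each of the four grid_search values for a runs once with an independently sampled b; the expand_search_space helper is hypothetical and is not how Tune is implemented.

```python
import random
from typing import Dict, List


def objective(config: Dict[str, float]) -> Dict[str, float]:
    """Same objective as above: lower scores are better."""
    return {"score": config["a"] ** 2 + config["b"]}


def expand_search_space(seed: int = 0) -> List[Dict[str, float]]:
    """Build one config per grid value of `a`, sampling `b` at random."""
    rng = random.Random(seed)
    grid_a = [0.001, 0.01, 0.1, 1.0]
    choices_b = [1, 2, 3]
    return [{"a": a, "b": rng.choice(choices_b)} for a in grid_a]


configs = expand_search_space()
trials = [(config, objective(config)["score"]) for config in configs]
best_config, best_score = min(trials, key=lambda trial: trial[1])
print(f"{len(trials)} trials; best config: {best_config}, score: {best_score}")
```

Because b is sampled rather than gridded, the best configuration can differ from run to run unless the sampling is seeded.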

If TensorBoard is installed, automatically visualize all trial results:

tensorboard --logdir ~/ray_results

Learn more about Ray Tune

Ray Serve: Scalable Model Serving

Ray Serve is a scalable model-serving library built on Ray.

Note

To run this example, install Ray Serve and scikit-learn:

pip install -U "ray[serve]" scikit-learn

This example serves a scikit-learn gradient boosting classifier.

import requests
from starlette.requests import Request
from typing import Dict

from sklearn.datasets import load_iris
from sklearn.ensemble import GradientBoostingClassifier

from ray import serve


# Train model.
iris_dataset = load_iris()
model = GradientBoostingClassifier()
model.fit(iris_dataset["data"], iris_dataset["target"])


@serve.deployment(route_prefix="/iris")
class BoostingModel:
    def __init__(self, model):
        self.model = model
        self.label_list = iris_dataset["target_names"].tolist()

    async def __call__(self, request: Request) -> Dict:
        payload = (await request.json())["vector"]
        print(f"Received http request with data {payload}")

        prediction = self.model.predict([payload])[0]
        human_name = self.label_list[prediction]
        return {"result": human_name}


# Deploy model.
serve.run(BoostingModel.bind(model))

# Query it!
sample_request_input = {"vector": [1.2, 1.0, 1.1, 0.9]}
response = requests.get(
    "http://localhost:8000/iris", json=sample_request_input)
print(response.text)

You should see the result {"result": "versicolor"}.

Learn more about Ray Serve

Ray RLlib: Industry-Grade Reinforcement Learning

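RLlib is an industry-grade reinforcement learning (RL) library built on Ray. RLlib offers high scalability and unified APIs for a variety of industry and research applications.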

Note

To run this example, install rllib and either tensorflow or pytorch:

pip install -U "ray[rllib]" tensorflow  # or torch

import gymnasium as gym
from ray.rllib.algorithms.ppo import PPOConfig


# Define your problem using python and Farama-Foundation's gymnasium API:
class SimpleCorridor(gym.Env):
    """Corridor in which an agent must learn to move right to reach the exit.

    ---------------------
    | S | 1 | 2 | 3 | G |   S=start; G=goal; corridor_length=5
    ---------------------

    Possible actions to choose from are: 0=left; 1=right
    Observations are floats indicating the current field index, e.g. 0.0 for
    starting position, 1.0 for the field next to the starting position, etc..
    Rewards are -0.1 for all steps, except when reaching the goal (+1.0).
    """

    def __init__(self, config):
        self.end_pos = config["corridor_length"]
        self.cur_pos = 0
        self.action_space = gym.spaces.Discrete(2)  # left and right
        self.observation_space = gym.spaces.Box(0.0, self.end_pos, shape=(1,))

    def reset(self, *, seed=None, options=None):
        """Resets the episode.

        Returns:
           Initial observation of the new episode and an info dict.
        """
        self.cur_pos = 0
        # Return initial observation.
        return [self.cur_pos], {}

    def step(self, action):
        """Takes a single step in the episode given `action`.

        Returns:
            New observation, reward, terminated-flag, truncated-flag, info-dict (empty).
        """
        # Walk left.
        if action == 0 and self.cur_pos > 0:
            self.cur_pos -= 1
        # Walk right.
        elif action == 1:
            self.cur_pos += 1
        # Set `terminated` flag when end of corridor (goal) reached.
        terminated = self.cur_pos >= self.end_pos
        truncated = False
        # +1.0 when the goal is reached, otherwise -0.1.
        reward = 1.0 if terminated else -0.1
        return [self.cur_pos], reward, terminated, truncated, {}


# Create an RLlib Algorithm instance from a PPOConfig object.
config = (
    PPOConfig().environment(
        # Env class to use (here: our gym.Env sub-class from above).
        env=SimpleCorridor,
        # Config dict to be passed to our custom env's constructor.
        # Use a corridor whose goal is at position 28 (the agent starts at 0).
        env_config={"corridor_length": 28},
    )
    # Parallelize environment rollouts.
    .rollouts(num_rollout_workers=3)
)
# Construct the actual (PPO) algorithm object from the config.
algo = config.build()

# Train for n iterations and report results (mean episode rewards).
# Since we have to move at least 28 times in the env to reach the goal and
# each move gives us -0.1 reward (except the last move at the end: +1.0),
# we can expect to reach an optimal episode reward of -0.1*27 + 1.0 = -1.7
for i in range(5):
    results = algo.train()
    print(f"Iter: {i}; avg. reward={results['episode_reward_mean']}")

# Perform inference (action computations) based on given env observations.
# Note that we are using a slightly different env here (len 10 instead of 28),
# however, this should still work as the agent has (hopefully) learned
# to "just always walk right!"
env = SimpleCorridor({"corridor_length": 10})
# Get the initial observation (should be: [0.0] for the starting position).
obs, info = env.reset()
terminated = truncated = False
total_reward = 0.0
# Play one episode.
while not terminated and not truncated:
    # Compute a single action, given the current observation
    # from the environment.
    action = algo.compute_single_action(obs)
    # Apply the computed action in the environment.
    obs, reward, terminated, truncated, info = env.step(action)
    # Sum up rewards for reporting purposes.
    total_reward += reward
# Report results.
print(f"Played 1 episode; total-reward={total_reward}")
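
The best achievable return can be checked by hand. The sketch below replays the reward rule of SimpleCorridor in plain Python for a policy that always walks right; rollout_always_right is a hypothetical helper written for this illustration and does not use RLlib or gymnasium.

```python
def rollout_always_right(corridor_length: int) -> float:
    """Return the episode reward when the agent moves right at every step."""
    cur_pos = 0
    total_reward = 0.0
    terminated = False
    while not terminated:
        cur_pos += 1  # action 1: walk right
        terminated = cur_pos >= corridor_length
        # Same rule as SimpleCorridor.step: +1.0 at the goal, -0.1 otherwise.
        total_reward += 1.0 if terminated else -0.1
    return total_reward


for length in (10, 28):
    print(f"corridor_length={length}: return={rollout_always_right(length):.1f}")
```

With corridor_length=28 the agent needs 28 moves, so the return is -0.1 * 27 + 1.0 = -1.7; with corridor_length=10 it is -0.1 * 9 + 1.0 = 0.1.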

Learn more about Ray RLlib

Ray Core Quickstart

Turn functions and classes into Ray tasks and actors, in Python and Java, with simple primitives for building and running distributed applications.

Ray Core: Parallelizing Functions with Ray Tasks

Note

To run this example, install Ray Core:

pip install -U "ray"

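Import Ray and initialize it with ray.init(). Decorate a function with @ray.remote to declare that it should run remotely. Then call the function with .remote() instead of calling it directly. This remote call yields a future, a Ray object reference, that you can then fetch with ray.get.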

import ray
ray.init()

@ray.remote
def f(x):
    return x * x

futures = [f.remote(i) for i in range(4)]
print(ray.get(futures)) # [0, 1, 4, 9]
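
If the future pattern is unfamiliar, the standard library offers a close local analogy. This sketch uses concurrent.futures rather than Ray, so it runs in threads on one machine instead of in remote worker processes; submit plays the role of .remote() and result() the role of ray.get.

```python
from concurrent.futures import ThreadPoolExecutor


def f(x: int) -> int:
    return x * x


with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns immediately with a Future, like f.remote(i) in Ray.
    futures = [pool.submit(f, i) for i in range(4)]
    # result() blocks until the value is ready, like ray.get().
    results = [future.result() for future in futures]

print(results)  # [0, 1, 4, 9]
```

The analogy covers only the calling pattern; scheduling across a cluster and the shared object store are specific to Ray.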

Note

To run this example, add the ray-api and ray-runtime dependencies to your project.

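Use Ray.init to initialize the Ray runtime. Then use Ray.task(...).remote() to convert any Java static method into a Ray task. The task runs asynchronously in a remote worker process. The remote method returns an ObjectRef, and you can fetch the actual result with get.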

import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;

public class RayDemo {

    public static int square(int x) {
        return x * x;
    }

    public static void main(String[] args) {
        // Initialize Ray runtime.
        Ray.init();
        List<ObjectRef<Integer>> objectRefList = new ArrayList<>();
        // Invoke the `square` method 4 times remotely as Ray tasks.
        // The tasks will run in parallel in the background.
        for (int i = 0; i < 4; i++) {
            objectRefList.add(Ray.task(RayDemo::square, i).remote());
        }
        // Get the actual results of the tasks.
        System.out.println(Ray.get(objectRefList));  // [0, 1, 4, 9]
    }
}

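In the above code block we defined some Ray tasks. While these are great for stateless operations, sometimes you must maintain the state of your application. You can do that with Ray actors.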

Learn more about Ray Core

Ray Core: Parallelizing Classes with Ray Actors

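Ray provides actors to allow you to parallelize an instance of a class in Python or Java. When you instantiate a class that is a Ray actor, Ray starts a remote instance of that class in the cluster. This actor can then execute remote method calls and maintain its own internal state.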

Note

To run this example, install Ray Core:

pip install -U "ray"

import ray
ray.init() # Only call this once.

@ray.remote
class Counter(object):
    def __init__(self):
        self.n = 0

    def increment(self):
        self.n += 1

    def read(self):
        return self.n

counters = [Counter.remote() for i in range(4)]
[c.increment.remote() for c in counters]
futures = [c.read.remote() for c in counters]
print(ray.get(futures)) # [1, 1, 1, 1]
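
Each actor keeps its own copy of the state, which is why every counter reads 1 rather than 4. The same isolation can be seen with ordinary local objects; this sketch drops the @ray.remote decorator and is only meant to show the state semantics, not remote execution.

```python
class Counter:
    """Local stand-in for the actor class above (no Ray involved)."""

    def __init__(self) -> None:
        self.n = 0

    def increment(self) -> None:
        self.n += 1

    def read(self) -> int:
        return self.n


counters = [Counter() for _ in range(4)]
for counter in counters:
    counter.increment()
print([counter.read() for counter in counters])  # [1, 1, 1, 1]

# Incrementing one counter again leaves the others unchanged.
counters[0].increment()
print([counter.read() for counter in counters])  # [2, 1, 1, 1]
```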

Note

To run this example, add the ray-api and ray-runtime dependencies to your project.

import io.ray.api.ActorHandle;
import io.ray.api.ObjectRef;
import io.ray.api.Ray;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

public class RayDemo {

    public static class Counter {

        private int value = 0;

        public void increment() {
            this.value += 1;
        }

        public int read() {
            return this.value;
        }
    }

    public static void main(String[] args) {
        // Initialize Ray runtime.
        Ray.init();
        List<ActorHandle<Counter>> counters = new ArrayList<>();
        // Create 4 actors from the `Counter` class.
        // They will run in remote worker processes.
        for (int i = 0; i < 4; i++) {
            counters.add(Ray.actor(Counter::new).remote());
        }

        // Invoke the `increment` method on each actor.
        // This will send an actor task to each remote actor.
        for (ActorHandle<Counter> counter : counters) {
            counter.task(Counter::increment).remote();
        }
        // Invoke the `read` method on each actor, and print the results.
        List<ObjectRef<Integer>> objectRefList = counters.stream()
            .map(counter -> counter.task(Counter::read).remote())
            .collect(Collectors.toList());
        System.out.println(Ray.get(objectRefList));  // [1, 1, 1, 1]
    }
}

Learn more about Ray Core

Ray Cluster Quickstart

Deploy your applications on Ray clusters, often with minimal changes to your existing code.

Ray Clusters: Launching a Ray Cluster on AWS

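Ray programs can run on a single machine, or seamlessly scale to large clusters. Take this simple example that waits for individual nodes to join the cluster.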

example.py
import sys
import time
from collections import Counter

import ray


@ray.remote
def get_host_name(x):
    import platform
    import time

    time.sleep(0.01)
    return x + (platform.node(),)


def wait_for_nodes(expected):
    # Wait for all nodes to join the cluster.
    while True:
        num_nodes = len(ray.nodes())
        if num_nodes < expected:
            print(
                "{} nodes have joined so far, waiting for {} more.".format(
                    num_nodes, expected - num_nodes
                )
            )
            sys.stdout.flush()
            time.sleep(1)
        else:
            break


def main():
    wait_for_nodes(4)

    # Check that objects can be transferred from each node to each other node.
    for i in range(10):
        print("Iteration {}".format(i))
        results = [get_host_name.remote(get_host_name.remote(())) for _ in range(100)]
        print(Counter(ray.get(results)))
        sys.stdout.flush()

    print("Success!")
    sys.stdout.flush()
    time.sleep(20)


if __name__ == "__main__":
    ray.init(address="localhost:6379")
    main()
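
The Counter printed in each iteration tallies pairs of host names: the inner task appends the node it ran on, and the outer task appends its own node to that tuple. The sketch below simulates this locally with made-up host names (node-a through node-d are placeholders) and random placement, so the exact counts are illustrative only.

```python
import random
from collections import Counter
from typing import Tuple

# Hypothetical node names standing in for platform.node() on each machine.
HOSTS = ["node-a", "node-b", "node-c", "node-d"]
rng = random.Random(0)


def fake_get_host_name(x: Tuple[str, ...]) -> Tuple[str, ...]:
    """Append a randomly chosen host name, mimicking get_host_name."""
    return x + (rng.choice(HOSTS),)


# Nested call: the inner result is passed to the outer call, as in example.py.
results = [fake_get_host_name(fake_get_host_name(())) for _ in range(100)]
counts = Counter(results)
print(counts)
```

On a real cluster, seeing pairs that mix different host names indicates that objects produced on one node were consumed on another.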

You can also download this example from our GitHub repository. Store it locally in a file called example.py.

To run this script in the cloud, download this configuration file, or copy it from here:

cluster.yaml
# A unique identifier for the head node and workers of this cluster.
cluster_name: default

# The maximum number of worker nodes to launch in addition to the head
# node.
max_workers: 2

# The autoscaler will scale up the cluster faster with higher upscaling speed.
# E.g., if the task requires adding more nodes then autoscaler will gradually
# scale up the cluster in chunks of upscaling_speed*currently_running_nodes.
# This number should be > 0.
upscaling_speed: 1.0

# This executes all commands on all nodes in the docker container,
# and opens all the necessary ports to support the Ray cluster.
# Empty string means disabled.
docker:
    image: "rayproject/ray-ml:latest-gpu" # You can change this to latest-cpu if you don't need GPU support and want a faster startup
    # image: rayproject/ray:latest-cpu   # use this one if you don't need ML dependencies, it's faster to pull
    container_name: "ray_container"
    # If true, pulls latest version of image. Otherwise, `docker run` will only pull the image
    # if no cached version is present.
    pull_before_run: True
    run_options:   # Extra options to pass into "docker run"
        - --ulimit nofile=65536:65536

    # Example of running a GPU head with CPU workers
    # head_image: "rayproject/ray-ml:latest-gpu"
    # Allow Ray to automatically detect GPUs

    # worker_image: "rayproject/ray-ml:latest-cpu"
    # worker_run_options: []

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    # Availability zone(s), comma-separated, that nodes may be launched in.
    # Nodes will be launched in the first listed availability zone and will
    # be tried in the subsequent availability zones if launching fails.
    availability_zone: us-west-2a,us-west-2b
    # Whether to allow node reuse. If set to False, nodes will be terminated
    # instead of stopped.
    cache_stopped_nodes: True # If not present, the default is True.

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
# By default Ray creates a new private keypair, but you can also use your own.
# If you do so, make sure to also set "KeyName" in the head and worker node
# configurations below.
#    ssh_private_key: /path/to/your/key.pem

# Tell the autoscaler the allowed node types and the resources they provide.
# The key is the name of the node type, which is just for debugging purposes.
# The node config specifies the launch config and physical instance type.
available_node_types:
    ray.head.default:
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
            # Default AMI for us-west-2.
            # Check https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/aws/config.py
            # for default images for other zones.
            ImageId: ami-0387d929287ab193e
            # You can provision additional disk space with a conf as follows
            BlockDeviceMappings:
                - DeviceName: /dev/sda1
                  Ebs:
                      VolumeSize: 140
                      VolumeType: gp3
            # Additional options in the boto docs.
    ray.worker.default:
        # The minimum number of worker nodes of this type to launch.
        # This number should be >= 0.
        min_workers: 1
        # The maximum number of worker nodes of this type to launch.
        # This takes precedence over min_workers.
        max_workers: 2
        # The node type's CPU and GPU resources are auto-detected based on AWS instance type.
        # If desired, you can override the autodetected CPU and GPU resources advertised to the autoscaler.
        # You can also set custom resources.
        # For example, to mark a node type as having 1 CPU, 1 GPU, and 5 units of a resource called "custom", set
        # resources: {"CPU": 1, "GPU": 1, "custom": 5}
        resources: {}
        # Provider-specific config for this node type, e.g. instance type. By default
        # Ray will auto-configure unspecified fields such as SubnetId and KeyName.
        # For more documentation on available fields, see:
        # http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
        node_config:
            InstanceType: m5.large
            # Default AMI for us-west-2.
            # Check https://github.com/ray-project/ray/blob/master/python/ray/autoscaler/_private/aws/config.py
            # for default images for other zones.
            ImageId: ami-0387d929287ab193e
            # Run workers on spot by default. Comment this out to use on-demand.
            # NOTE: If relying on spot instances, it is best to specify multiple different instance
            # types to avoid interruption when one instance type is experiencing heightened demand.
            # Demand information can be found at https://aws.amazon.com/ec2/spot/instance-advisor/
            InstanceMarketOptions:
                MarketType: spot
                # Additional options can be found in the boto docs, e.g.
                #   SpotOptions:
                #       MaxPrice: MAX_HOURLY_PRICE
            # Additional options in the boto docs.

# Specify the node type of the head node (as configured above).
head_node_type: ray.head.default

# Files or directories to copy to the head and worker nodes. The format is a
# dictionary from REMOTE_PATH: LOCAL_PATH, e.g.
file_mounts: {
#    "/path1/on/remote/machine": "/path1/on/local/machine",
#    "/path2/on/remote/machine": "/path2/on/local/machine",
}

# Files or directories to copy from the head node to the worker nodes. The format is a
# list of paths. The same path on the head node will be copied to the worker node.
# This behavior is a subset of the file_mounts behavior. In the vast majority of cases
# you should just use file_mounts. Only use this if you know what you're doing!
cluster_synced_files: []

# Whether changes to directories in file_mounts or cluster_synced_files in the head node
# should sync to the worker node continuously
file_mounts_sync_continuously: False

# Patterns for files to exclude when running rsync up or rsync down
rsync_exclude:
    - "**/.git"
    - "**/.git/**"

# Pattern files to use for filtering out files when running rsync up or rsync down. The file is searched for
# in the source directory and recursively through all subdirectories. For example, if .gitignore is provided
# as a value, the behavior will match git's behavior for finding and using .gitignore files.
rsync_filter:
    - ".gitignore"

# List of commands that will be run before `setup_commands`. If docker is
# enabled, these commands will run outside the container and before docker
# is setup.
initialization_commands: []

# List of shell commands to run to set up nodes.
setup_commands: []
    # Note: if you're developing Ray, you probably want to create a Docker image that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    # To run the nightly version of ray (as opposed to the latest), either use a rayproject docker image
    # that has the "nightly" (e.g. "rayproject/ray-ml:nightly-gpu") or uncomment the following line:
    # - pip install -U "ray[default] @ https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-3.0.0.dev0-cp37-cp37m-manylinux2014_x86_64.whl"

# Custom commands that will be run on the head node after common setup.
head_setup_commands: []

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076

Assuming you have stored this configuration in a file called cluster.yaml, you can now launch an AWS cluster as follows:

ray submit cluster.yaml example.py --start

Learn more about launching Ray Clusters

Debugging and Monitoring Quickstart

Use built-in observability tools to monitor and debug Ray applications and clusters.

Ray Dashboard: Web GUI to monitor and debug Ray

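Ray Dashboard provides a visual interface that displays real-time system metrics, node-level resource monitoring, job profiling, and task visualizations. The dashboard is designed to help users understand the performance of their Ray applications and identify potential issues.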

https://raw.githubusercontent.com/ray-project/Images/master/docs/new-dashboard/Dashboard-overview.png

Note

To get started with the dashboard, install Ray with the default extras as follows:

pip install -U "ray[default]"

Access the dashboard through the default URL, http://localhost:8265.
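
A quick way to confirm the dashboard is up before opening a browser is to request the default URL from a script. This is a minimal sketch using only the standard library; dashboard_reachable is a hypothetical helper, and it assumes the dashboard answers a plain GET on its root path with HTTP 200.

```python
import http.client
import urllib.request

DASHBOARD_URL = "http://localhost:8265"  # default URL from the text above


def dashboard_reachable(url: str = DASHBOARD_URL, timeout: float = 2.0) -> bool:
    """Return True if an HTTP request to `url` succeeds with status 200."""
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except (OSError, http.client.HTTPException):
        # Connection refused, timeout, HTTP error, or a malformed reply.
        return False


print("Dashboard reachable:", dashboard_reachable())
```

If this prints False, check that a Ray cluster is running on the machine and that the default extras are installed.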

Learn more about Ray Dashboard

Ray State APIs: CLI to access cluster states

Ray state APIs allow users to conveniently access the current state (snapshot) of Ray through the CLI or the Python SDK.

Note

To get started with the state API, install Ray with the default extras as follows:

pip install -U "ray[default]"

Run the following code.

    import ray
    import time

    ray.init(num_cpus=4)

    @ray.remote
    def task_running_300_seconds():
        print("Start!")
        time.sleep(300)

    @ray.remote
    class Actor:
        def __init__(self):
            print("Actor created")

    # Create 2 tasks
    tasks = [task_running_300_seconds.remote() for _ in range(2)]

    # Create 2 actors
    actors = [Actor.remote() for _ in range(2)]

    ray.get(tasks)

Use ray summary tasks to see summary statistics of Ray tasks.

    ray summary tasks
    ======== Tasks Summary: 2022-07-22 08:54:38.332537 ========
    Stats:
    ------------------------------------
    total_actor_scheduled: 2
    total_actor_tasks: 0
    total_tasks: 2


    Table (group by func_name):
    ------------------------------------
        FUNC_OR_CLASS_NAME        STATE_COUNTS    TYPE
    0   task_running_300_seconds  RUNNING: 2      NORMAL_TASK
    1   Actor.__init__            FINISHED: 2     ACTOR_CREATION_TASK
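
The table groups task records by function name and counts how many are in each state. The following sketch reproduces that grouping over a hand-written list of records; the record layout is an assumption made for illustration and is not the State API's actual schema.

```python
from collections import Counter, defaultdict
from typing import Dict, List

# Hypothetical task records mirroring the summary output above.
records: List[Dict[str, str]] = [
    {"func_or_class_name": "task_running_300_seconds", "state": "RUNNING"},
    {"func_or_class_name": "task_running_300_seconds", "state": "RUNNING"},
    {"func_or_class_name": "Actor.__init__", "state": "FINISHED"},
    {"func_or_class_name": "Actor.__init__", "state": "FINISHED"},
]


def summarize(task_records: List[Dict[str, str]]) -> Dict[str, Counter]:
    """Count task states per function or class name."""
    summary: Dict[str, Counter] = defaultdict(Counter)
    for record in task_records:
        summary[record["func_or_class_name"]][record["state"]] += 1
    return dict(summary)


for name, state_counts in summarize(records).items():
    print(name, dict(state_counts))
```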

Learn more about Ray State APIs

Learn More

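Here are some talks, papers, and press coverage involving Ray and its libraries. Please raise an issue if any of the links below are broken, or if you'd like to add your own talk!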

Blog and Press

Talks (Videos)

Slides

Papers