数据加载#

Ray Data 从各种来源加载数据。本指南向您展示如何:

读取文件#

Ray Data 从本地磁盘或云存储中读取多种文件格式的文件。 要查看支持的文件格式的完整列表,请参阅 Input/Output 参考

要读取 Parquet 文件,请调用 read_parquet()

import ray

ds = ray.data.read_parquet("local:///tmp/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

读取原始图像,请调用 read_images()。Ray Data 将图像表示为 NumPy ndarray。

import ray

ds = ray.data.read_images("local:///tmp/batoidea/JPEGImages/")

print(ds.schema())
Column  Type
------  ----
image   numpy.ndarray(shape=(32, 32, 3), dtype=uint8)

要读取文本行,请调用 read_text()

import ray

ds = ray.data.read_text("local:///tmp/this.txt")

print(ds.schema())
Column  Type
------  ----
text    string

要读取 CSV 文件,请调用 read_csv()

import ray

ds = ray.data.read_csv("local:///tmp/iris.csv")

print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64

要读取原始二进制文件,请调用 read_binary_files()

import ray

ds = ray.data.read_binary_files("local:///tmp/file.dat")

print(ds.schema())
Column  Type
------  ----
bytes   binary

要读取 TFRecords 文件,请调用 read_tfrecords()

import ray

ds = ray.data.read_tfrecords("local:///tmp/iris.tfrecords")

print(ds.schema())
Column             Type
------             ----
sepal length (cm)  double
sepal width (cm)   double
petal length (cm)  double
petal width (cm)   double
target             int64

从本地磁盘读取文件#

要从本地磁盘读取文件,请调用如 read_parquet() 函数,并使用 local:// 协议指定路径。路径可以指向文件或目录。

要读取 Parquet 以外的格式,请参阅 Input/Output 参考

Tip

如果您的文件可以在每个节点上访问,请排除 local:// 以在集群中并行读取任务。

import ray

ds = ray.data.read_parquet("local:///tmp/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

从云存储读取文件#

要读取云存储中的文件,请向云服务提供商验证所有节点。然后,调用类似方法 read_parquet() 并指定具有适当架构的 URI。 URI 可以指向存储桶、文件夹或对象。

要读取 Parquet 以外的格式,请参阅 Input/Output 参考

要从 Amazon S3 读取文件,请使用 s3:// 协议。

import ray

ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

要从 Google Cloud Storage 读取文件,请安装 Google Cloud Storage 的文件系统接口

pip install gcsfs

然后,创建一个 GCSFileSystem 并使用 gcs:// 指定 URI。

import ray

ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

要从 Azure Blob 存储读取文件,请将 文件系统接口安装到 Azure-Datalake Gen1 和 Gen2 存储

pip install adlfs

然后,创建一个 AzureBlobFileSystem 并使用 az:// 协议的 URI。

import adlfs
import ray

ds = ray.data.read_parquet(
    "az://ray-example-data/iris.parquet",
    adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
)

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

从 NFS 读取文件#

要从 NFS 文件系统读取文件,请调用类似函数 read_parquet() 并指定已挂载文件系统上的文件。路径可以指向文件或目录。

要读取 Parquet 以外的格式,请参阅 Input/Output 参考

import ray

ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")

print(ds.schema())
Column        Type
------        ----
sepal.length  double
sepal.width   double
petal.length  double
petal.width   double
variety       string

处理压缩文件#

要读取压缩文件,请再 compression 中指定 arrow_open_stream_args 。 您可以使用 Arrow 支持的任何编解码器

import ray

ds = ray.data.read_csv(
    "s3://anonymous@ray-example-data/iris.csv.gz",
    arrow_open_stream_args={"compression": "gzip"},
)

从其他库加载数据#

从单节点数据库加载数据#

Ray Data 与 pandas、NumPy 和 Arrow 等库进行互操作。

要从Python 对象创建 Dataset ,调用 from_items() 并传入 Dict. Ray Data 将每个据 Dict 数据视为一行。

import ray

ds = ray.data.from_items([
    {"food": "spam", "price": 9.34},
    {"food": "ham", "price": 5.37},
    {"food": "eggs", "price": 0.94}
])

print(ds)
MaterializedDataset(
   num_blocks=3,
   num_rows=3,
   schema={food: string, price: double}
)

您还可以从常规 Python 对象列表中创建一个 Dataset

import ray

ds = ray.data.from_items([1, 2, 3, 4, 5])

print(ds)
MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64})

To create a Dataset from a NumPy array, call from_numpy(). Ray Data treats the outer axis as the row dimension.

import numpy as np
import ray

array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={data: numpy.ndarray(shape=(2, 2), dtype=double)}
)

To create a Dataset from a pandas DataFrame, call from_pandas().

import pandas as pd
import ray

df = pd.DataFrame({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: object, price: float64}
)

To create a Dataset from an Arrow table, call from_arrow().

import pyarrow as pa

table = pa.table({
    "food": ["spam", "ham", "eggs"],
    "price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)

print(ds)
MaterializedDataset(
   num_blocks=1,
   num_rows=3,
   schema={food: string, price: double}
)

从分布式 DataFrame 库加载数据#

Ray Data 与 DaskSparkModinMars 等分布式数据处理框架进行互操作 。

要从 Dask DataFrame 创建 Dataset,调用 from_dask()。 该函数构造一个由 Dask DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset

import dask.dataframe as dd
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)

ds.show(3)
{'string': 'spam', 'number': 0}
{'string': 'ham', 'number': 1}
{'string': 'eggs', 'number': 2}

Spark DataFrame 创建 Dataset,调用 from_spark(). 该函数构造一个由 Spark DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset

import ray
import raydp

spark = raydp.init_spark(app_name="Spark -> Datasets Example",
                        num_executors=2,
                        executor_cores=2,
                        executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

要从 Modin DataFrame 创建 Dataset,调用 from_modin()。 该函数构造一个由 Modin DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset

import modin.pandas as md
import pandas as pd
import ray

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

从 Mars DataFrame 创建 Dataset ,调用 from_mars(). 该函数构造一个由 Mars DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset

import mars
import mars.dataframe as md
import pandas as pd
import ray

cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)

df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)

ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}

从 ML 库加载数据#

Ray Data 与 HuggingFace 和 TensorFlow 数据集互操作。

要将 🤗 数据集转换为 Ray 数据集,请调用 from_huggingface()。 该函数构造一个由 🤗 数据集的分布式 Pandas DataFrame 分区支持的 Dataset

Warning

from_huggingface 不支持并行读取。对于内存中 🤗 数据集来说这不是问题,但对于大型内存映射 🤗 数据集可能会失败。此外, 🤗 IterableDataset 对象不支持。

import ray.data
from datasets import load_dataset

hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)
[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]

要转换 TensorFlow dataset 为 Ray Dataset,调用 from_tf()

Warning

from_tf 不支持并行读取。仅将此函数用于 MNIST 或 CIFAR 等小型数据集。

import ray
import tensorflow_datasets as tfds

tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)

print(ds)
MaterializedDataset(
   num_blocks=...,
   num_rows=50000,
   schema={
      id: binary,
      image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8),
      label: int64
   }
)

读取数据库#

Ray Data 从 MySQL、PostgreSQL 和 MongoDB 等数据库读取。

读取 SQL 数据库#

调用 Python DB API2 标准 连接器的 read_sql() 从数据库中读取数据 。

要从 MySQL 读取数据,请安装 MySQL Connector/Python。它是第一方 MySQL 数据库连接器。

pip install mysql-connector-python

然后,定义连接逻辑并查询数据库。

import mysql.connector

import ray

def create_connection():
    return mysql.connector.connect(
        user="admin",
        password=...,
        host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        connection_timeout=30,
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from PostgreSQL, install Psycopg 2. It’s the most popular PostgreSQL database connector.

pip install psycopg2-binary

Then, define your connection logic and query the database.

import psycopg2

import ray

def create_connection():
    return psycopg2.connect(
        user="postgres",
        password=...,
        host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
        dbname="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Snowflake, install the Snowflake Connector for Python.

pip install snowflake-connector-python

Then, define your connection logic and query the database.

import snowflake.connector

import ray

def create_connection():
    return snowflake.connector.connect(
        user=...,
        password=...
        account="ZZKXUVH-IPB52023",
        database="example",
    )

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from Databricks, install the Databricks SQL Connector for Python.

pip install databricks-sql-connector

Then, define your connection logic and read from the Databricks SQL warehouse.

from databricks import sql

import ray

def create_connection():
    return sql.connect(
        server_hostname="dbc-1016e3a4-d292.cloud.databricks.com",
        http_path="/sql/1.0/warehouses/a918da1fc0b7fed0",
        access_token=...,


# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

To read from BigQuery, install the Python Client for Google BigQuery. This package includes a DB API2-compliant database connector.

pip install google-cloud-bigquery

Then, define your connection logic and query the dataset.

from google.cloud import bigquery
from google.cloud.bigquery import dbapi

import ray

def create_connection():
    client = bigquery.Client(...)
    return dbapi.Connection(client)

# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
    "SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
    "SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)

读取 MongoDB#

要从 MongoDB 读取数据,请调用 read_mongo() 并指定源 URI、数据库和集合。 您还需要指定针对集合运行的管道。

import ray

# Read a local MongoDB.
ds = ray.data.read_mongo(
    uri="mongodb://localhost:27017",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)

# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
    pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)

# Write back to MongoDB.
ds.write_mongo(
    MongoDatasource(),
    uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
    database="my_db",
    collection="my_collection",
)

创建合成数据#

综合数据集可用于测试和基准测试。

要从一系列整数创建合成 Dataset ,调用 range()。 Ray Data 将整数范围存储在单列中。

import ray

ds = ray.data.range(10000)

print(ds.schema())
Column  Type
------  ----
id      int64

To create a synthetic Dataset containing arrays, call range_tensor(). Ray Data packs an integer range into ndarrays of the provided shape.

import ray

ds = ray.data.range_tensor(10, shape=(64, 64))

print(ds.schema())
Column  Type
------  ----
data    numpy.ndarray(shape=(64, 64), dtype=int64)

加载其他数据源#

如果 Ray Data 无法加载您的数据,请使用 Datasource。然后,构建自定义数据源的实例并将其传递给 给 read_datasource()

# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)

# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)

有关示例,请参阅 实现自定义数据源

性能考虑#

parallelism 数据集决定了基础数据被分割成并行读取的块数。

Ray Data 在内部决定同时运行多少个读取任务,以充分利用集群,范围从 1...parallelism 个读取任务。

换句话说,并行度越高,Dataset 中的数据块越小,因此并行执行的机会就越多。

../_images/dataset-read.svg

可以通过 parallelism 数覆盖此默认并行性; 有关如何调整读取并行性的更多信息,请参阅 性能指南