数据加载
Contents
数据加载#
Ray Data 从各种来源加载数据。本指南向您展示如何:
读取文件#
Ray Data 从本地磁盘或云存储中读取多种文件格式的文件。 要查看支持的文件格式的完整列表,请参阅 Input/Output 参考。
要读取 Parquet 文件,请调用 read_parquet()。
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
读取原始图像,请调用 read_images()。Ray Data 将图像表示为 NumPy ndarray。
import ray
ds = ray.data.read_images("local:///tmp/batoidea/JPEGImages/")
print(ds.schema())
Column Type
------ ----
image numpy.ndarray(shape=(32, 32, 3), dtype=uint8)
要读取文本行,请调用 read_text()。
import ray
ds = ray.data.read_text("local:///tmp/this.txt")
print(ds.schema())
Column Type
------ ----
text string
要读取 CSV 文件,请调用 read_csv()。
import ray
ds = ray.data.read_csv("local:///tmp/iris.csv")
print(ds.schema())
Column Type
------ ----
sepal length (cm) double
sepal width (cm) double
petal length (cm) double
petal width (cm) double
target int64
要读取原始二进制文件,请调用 read_binary_files()。
import ray
ds = ray.data.read_binary_files("local:///tmp/file.dat")
print(ds.schema())
Column Type
------ ----
bytes binary
要读取 TFRecords 文件,请调用 read_tfrecords()。
import ray
ds = ray.data.read_tfrecords("local:///tmp/iris.tfrecords")
print(ds.schema())
Column Type
------ ----
sepal length (cm) double
sepal width (cm) double
petal length (cm) double
petal width (cm) double
target int64
从本地磁盘读取文件#
要从本地磁盘读取文件,请调用如 read_parquet() 函数,并使用
local:// 协议指定路径。路径可以指向文件或目录。
要读取 Parquet 以外的格式,请参阅 Input/Output 参考。
Tip
如果您的文件可以在每个节点上访问,请排除 local:// 以在集群中并行读取任务。
import ray
ds = ray.data.read_parquet("local:///tmp/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
从云存储读取文件#
要读取云存储中的文件,请向云服务提供商验证所有节点。然后,调用类似方法
read_parquet() 并指定具有适当架构的 URI。
URI 可以指向存储桶、文件夹或对象。
要读取 Parquet 以外的格式,请参阅 Input/Output 参考。
要从 Amazon S3 读取文件,请使用 s3:// 协议。
import ray
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
要从 Google Cloud Storage 读取文件,请安装 Google Cloud Storage 的文件系统接口
pip install gcsfs
然后,创建一个 GCSFileSystem 并使用 gcs:// 指定 URI。
import ray
ds = ray.data.read_parquet("s3://anonymous@ray-example-data/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
要从 Azure Blob 存储读取文件,请将 文件系统接口安装到 Azure-Datalake Gen1 和 Gen2 存储
pip install adlfs
然后,创建一个 AzureBlobFileSystem 并使用 az:// 协议的 URI。
import adlfs
import ray
ds = ray.data.read_parquet(
"az://ray-example-data/iris.parquet",
adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
)
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
从 NFS 读取文件#
要从 NFS 文件系统读取文件,请调用类似函数 read_parquet()
并指定已挂载文件系统上的文件。路径可以指向文件或目录。
要读取 Parquet 以外的格式,请参阅 Input/Output 参考。
import ray
ds = ray.data.read_parquet("/mnt/cluster_storage/iris.parquet")
print(ds.schema())
Column Type
------ ----
sepal.length double
sepal.width double
petal.length double
petal.width double
variety string
处理压缩文件#
要读取压缩文件,请再 compression 中指定 arrow_open_stream_args 。
您可以使用 Arrow 支持的任何编解码器。
import ray
ds = ray.data.read_csv(
"s3://anonymous@ray-example-data/iris.csv.gz",
arrow_open_stream_args={"compression": "gzip"},
)
从其他库加载数据#
从单节点数据库加载数据#
Ray Data 与 pandas、NumPy 和 Arrow 等库进行互操作。
要从Python 对象创建 Dataset ,调用
from_items() 并传入 Dict. Ray Data 将每个据 Dict 数据视为一行。
import ray
ds = ray.data.from_items([
{"food": "spam", "price": 9.34},
{"food": "ham", "price": 5.37},
{"food": "eggs", "price": 0.94}
])
print(ds)
MaterializedDataset(
num_blocks=3,
num_rows=3,
schema={food: string, price: double}
)
您还可以从常规 Python 对象列表中创建一个 Dataset 。
import ray
ds = ray.data.from_items([1, 2, 3, 4, 5])
print(ds)
MaterializedDataset(num_blocks=5, num_rows=5, schema={item: int64})
To create a Dataset from a NumPy array, call
from_numpy(). Ray Data treats the outer axis as the row
dimension.
import numpy as np
import ray
array = np.ones((3, 2, 2))
ds = ray.data.from_numpy(array)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={data: numpy.ndarray(shape=(2, 2), dtype=double)}
)
To create a Dataset from a pandas DataFrame, call
from_pandas().
import pandas as pd
import ray
df = pd.DataFrame({
"food": ["spam", "ham", "eggs"],
"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_pandas(df)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={food: object, price: float64}
)
To create a Dataset from an Arrow table, call
from_arrow().
import pyarrow as pa
table = pa.table({
"food": ["spam", "ham", "eggs"],
"price": [9.34, 5.37, 0.94]
})
ds = ray.data.from_arrow(table)
print(ds)
MaterializedDataset(
num_blocks=1,
num_rows=3,
schema={food: string, price: double}
)
从分布式 DataFrame 库加载数据#
Ray Data 与 Dask、 Spark、 Modin 和 Mars 等分布式数据处理框架进行互操作 。
要从
Dask DataFrame 创建 Dataset,调用
from_dask()。
该函数构造一个由 Dask DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset。
import dask.dataframe as dd
import pandas as pd
import ray
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
ddf = dd.from_pandas(df, npartitions=4)
# Create a Dataset from a Dask DataFrame.
ds = ray.data.from_dask(ddf)
ds.show(3)
{'string': 'spam', 'number': 0}
{'string': 'ham', 'number': 1}
{'string': 'eggs', 'number': 2}
从 Spark DataFrame 创建 Dataset,调用
from_spark().
该函数构造一个由 Spark DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset。
import ray
import raydp
spark = raydp.init_spark(app_name="Spark -> Datasets Example",
num_executors=2,
executor_cores=2,
executor_memory="500MB")
df = spark.createDataFrame([(i, str(i)) for i in range(10000)], ["col1", "col2"])
ds = ray.data.from_spark(df)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
要从 Modin DataFrame 创建 Dataset,调用
from_modin()。
该函数构造一个由 Modin DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset。
import modin.pandas as md
import pandas as pd
import ray
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df)
# Create a Dataset from a Modin DataFrame.
ds = ray.data.from_modin(mdf)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
从 Mars DataFrame 创建 Dataset ,调用
from_mars().
该函数构造一个由 Mars DataFrame 的分布式 Pandas DataFrame 分区支持的 Dataset。
import mars
import mars.dataframe as md
import pandas as pd
import ray
cluster = mars.new_cluster_in_ray(worker_num=2, worker_cpu=1)
df = pd.DataFrame({"col1": list(range(10000)), "col2": list(map(str, range(10000)))})
mdf = md.DataFrame(df, num_partitions=8)
# Create a tabular Dataset from a Mars DataFrame.
ds = ray.data.from_mars(mdf)
ds.show(3)
{'col1': 0, 'col2': '0'}
{'col1': 1, 'col2': '1'}
{'col1': 2, 'col2': '2'}
从 ML 库加载数据#
Ray Data 与 HuggingFace 和 TensorFlow 数据集互操作。
要将 🤗 数据集转换为 Ray 数据集,请调用
from_huggingface()。
该函数构造一个由 🤗 数据集的分布式 Pandas DataFrame 分区支持的 Dataset。
Warning
from_huggingface 不支持并行读取。对于内存中 🤗 数据集来说这不是问题,但对于大型内存映射 🤗 数据集可能会失败。此外, 🤗 IterableDataset 对象不支持。
import ray.data
from datasets import load_dataset
hf_ds = load_dataset("wikitext", "wikitext-2-raw-v1")
ray_ds = ray.data.from_huggingface(hf_ds["train"])
ray_ds.take(2)
[{'text': ''}, {'text': ' = Valkyria Chronicles III = \n'}]
要转换 TensorFlow dataset 为 Ray Dataset,调用 from_tf()。
Warning
from_tf 不支持并行读取。仅将此函数用于 MNIST 或 CIFAR 等小型数据集。
import ray
import tensorflow_datasets as tfds
tf_ds, _ = tfds.load("cifar10", split=["train", "test"])
ds = ray.data.from_tf(tf_ds)
print(ds)
MaterializedDataset(
num_blocks=...,
num_rows=50000,
schema={
id: binary,
image: numpy.ndarray(shape=(32, 32, 3), dtype=uint8),
label: int64
}
)
读取数据库#
Ray Data 从 MySQL、PostgreSQL 和 MongoDB 等数据库读取。
读取 SQL 数据库#
调用 Python DB API2 标准 连接器的 read_sql() 从数据库中读取数据 。
要从 MySQL 读取数据,请安装 MySQL Connector/Python。它是第一方 MySQL 数据库连接器。
pip install mysql-connector-python
然后,定义连接逻辑并查询数据库。
import mysql.connector
import ray
def create_connection():
return mysql.connector.connect(
user="admin",
password=...,
host="example-mysql-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
connection_timeout=30,
database="example",
)
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from PostgreSQL, install Psycopg 2. It’s the most popular PostgreSQL database connector.
pip install psycopg2-binary
Then, define your connection logic and query the database.
import psycopg2
import ray
def create_connection():
return psycopg2.connect(
user="postgres",
password=...,
host="example-postgres-database.c2c2k1yfll7o.us-west-2.rds.amazonaws.com",
dbname="example",
)
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from Snowflake, install the Snowflake Connector for Python.
pip install snowflake-connector-python
Then, define your connection logic and query the database.
import snowflake.connector
import ray
def create_connection():
return snowflake.connector.connect(
user=...,
password=...
account="ZZKXUVH-IPB52023",
database="example",
)
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from Databricks, install the Databricks SQL Connector for Python.
pip install databricks-sql-connector
Then, define your connection logic and read from the Databricks SQL warehouse.
from databricks import sql
import ray
def create_connection():
return sql.connect(
server_hostname="dbc-1016e3a4-d292.cloud.databricks.com",
http_path="/sql/1.0/warehouses/a918da1fc0b7fed0",
access_token=...,
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
To read from BigQuery, install the Python Client for Google BigQuery. This package includes a DB API2-compliant database connector.
pip install google-cloud-bigquery
Then, define your connection logic and query the dataset.
from google.cloud import bigquery
from google.cloud.bigquery import dbapi
import ray
def create_connection():
client = bigquery.Client(...)
return dbapi.Connection(client)
# Get all movies
dataset = ray.data.read_sql("SELECT * FROM movie", create_connection)
# Get movies after the year 1980
dataset = ray.data.read_sql(
"SELECT title, score FROM movie WHERE year >= 1980", create_connection
)
# Get the number of movies per year
dataset = ray.data.read_sql(
"SELECT year, COUNT(*) FROM movie GROUP BY year", create_connection
)
读取 MongoDB#
要从 MongoDB 读取数据,请调用 read_mongo() 并指定源 URI、数据库和集合。
您还需要指定针对集合运行的管道。
import ray
# Read a local MongoDB.
ds = ray.data.read_mongo(
uri="mongodb://localhost:27017",
database="my_db",
collection="my_collection",
pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)
# Reading a remote MongoDB is the same.
ds = ray.data.read_mongo(
uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
database="my_db",
collection="my_collection",
pipeline=[{"$match": {"col": {"$gte": 0, "$lt": 10}}}, {"$sort": "sort_col"}],
)
# Write back to MongoDB.
ds.write_mongo(
MongoDatasource(),
uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
database="my_db",
collection="my_collection",
)
创建合成数据#
综合数据集可用于测试和基准测试。
要从一系列整数创建合成 Dataset ,调用
range()。 Ray Data 将整数范围存储在单列中。
import ray
ds = ray.data.range(10000)
print(ds.schema())
Column Type
------ ----
id int64
To create a synthetic Dataset containing arrays, call
range_tensor(). Ray Data packs an integer range into ndarrays of
the provided shape.
import ray
ds = ray.data.range_tensor(10, shape=(64, 64))
print(ds.schema())
Column Type
------ ----
data numpy.ndarray(shape=(64, 64), dtype=int64)
加载其他数据源#
如果 Ray Data 无法加载您的数据,请使用
Datasource。然后,构建自定义数据源的实例并将其传递给
给 read_datasource()。
# Read from a custom datasource.
ds = ray.data.read_datasource(YourCustomDatasource(), **read_args)
# Write to a custom datasource.
ds.write_datasource(YourCustomDatasource(), **write_args)
有关示例,请参阅 实现自定义数据源。
性能考虑#
parallelism数据集决定了基础数据被分割成并行读取的块数。Ray Data 在内部决定同时运行多少个读取任务,以充分利用集群,范围从
1...parallelism个读取任务。
换句话说,并行度越高,Dataset 中的数据块越小,因此并行执行的机会就越多。
可以通过 parallelism 数覆盖此默认并行性; 有关如何调整读取并行性的更多信息,请参阅
性能指南 。