数据保存#

Ray Data 允许您将数据保存在文件或其他 Python 对象中。

本指南向您展示如何:

将数据写入文件#

Ray Data写入本地磁盘和云存储。

将数据写入本地磁盘#

要将您的内容 Dataset 保存到本地磁盘,请调用 Dataset.write_parquet 方法并使用 local:// 指定本地目录。

Warning

如果您的集群包含多个节点并且您不使用 local://,Ray Data 会将不同分区的数据写入不同的节点。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("local:///tmp/iris/")

要将数据写入 Parquet 以外的格式,请阅读 Input/Output 参考

将数据写入云存储#

要将您的数据 Dataset 保存到云存储,请向您的云服务提供商验证所有节点。然后,调用类似方法 Dataset.write_parquet 并使用适当的方案指定 URI。 URI 可以指向存储桶或文件夹。

要将数据保存到 Amazon S3,请指定带有 s3:// 方案的 URI。

import ray

ds = ray.data.read_csv("local:///tmp/iris.csv")

ds.write_parquet("s3://my-bucket/my-folder")

To save data to Google Cloud Storage, install the Filesystem interface to Google Cloud Storage

pip install gcsfs

Then, create a GCSFileSystem and specify a URI with the gcs:// scheme.

import ray

ds = ray.data.read_csv("local:///tmp/iris.csv")

filesystem = gcsfs.GCSFileSystem(project="my-google-project")
ds.write_parquet("gcs://my-bucket/my-folder", filesystem=filesystem)

To save data to Azure Blob Storage, install the Filesystem interface to Azure-Datalake Gen1 and Gen2 Storage

pip install adlfs

Then, create a AzureBlobFileSystem and specify a URI with the az:// scheme.

import ray

ds = ray.data.read_csv("local:///tmp/iris.csv")

filesystem = adlfs.AzureBlobFileSystem(account_name="azureopendatastorage")
ds.write_parquet("az://my-bucket/my-folder", filesystem=filesystem)

要将数据写入 Parquet 以外的格式,请阅读 Input/Output 参考

将数据写入 NFS#

要将您的 Dataset 保存到 NFS 文件系统,请调用类似方法 Dataset.write_parquet 并指定挂载目录。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

ds.write_parquet("/mnt/cluster_storage/iris")

要将数据写入 Parquet 以外的格式,请阅读 Input/Output 参考

更改输出文件的数量#

当您调用写入方法时,Ray Data 会将您的数据写入每个 block 的一个文件中。 要更改块数,请调用 repartition()

import os
import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.repartition(2).write_csv("/tmp/two_files/")

print(os.listdir("/tmp/two_files/"))
['26b07dba90824a03bb67f90a1360e104_000003.csv', '26b07dba90824a03bb67f90a1360e104_000002.csv']

将数据集转换为其他 Python 库#

将数据集转换为 pandas#

要将 Dataset 转换为 pandas DataFrame,请调用 Dataset.to_pandas() 。 您的数据必须适合头节点上的内存。

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_pandas()
print(df)
     sepal length (cm)  sepal width (cm)  ...  petal width (cm)  target
0                  5.1               3.5  ...               0.2       0
1                  4.9               3.0  ...               0.2       0
2                  4.7               3.2  ...               0.2       0
3                  4.6               3.1  ...               0.2       0
4                  5.0               3.6  ...               0.2       0
..                 ...               ...  ...               ...     ...
145                6.7               3.0  ...               2.3       2
146                6.3               2.5  ...               1.9       2
147                6.5               3.0  ...               2.0       2
148                6.2               3.4  ...               2.3       2
149                5.9               3.0  ...               1.8       2
<BLANKLINE>
[150 rows x 5 columns]

将数据集转换为分布式 DataFrame#

Ray Data 与 DaskSparkModinMars 等分布式数据处理框架进行互操作 。

要将 Dataset 转换成 Dask DataFrame ,调用 Dataset.to_dask()

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_dask()

要将 Dataset 转换成 Spark DataFrame, 调用 Dataset.to_spark().

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

df = ds.to_spark()

要将 Dataset 转换成 Modin DataFrame,调用 Dataset.to_modin().

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_modin()
要将 Dataset 转换成 Mars DataFrame,调用

Dataset.to_mars().

import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")

mdf = ds.to_mars()