ray.data.Dataset
class ray.data.Dataset(plan: ray.data._internal.plan.ExecutionPlan, epoch: int, lazy: bool = True, logical_plan: Optional[ray.data._internal.logical.interfaces.logical_plan.LogicalPlan] = None)
Bases: object

A Dataset is a distributed data collection for data loading and processing.
Datasets are distributed pipelines that produce ObjectRef[Block] outputs, where each block holds data in Arrow format, representing a shard of the overall data collection. The block also determines the unit of parallelism. For more details, see Ray Data Internals.

Datasets can be created in multiple ways: from synthetic data via range_*() APIs, from existing in-memory data via from_*() APIs (this creates a subclass of Dataset called MaterializedDataset), or from external storage systems such as local disk, S3, or HDFS via the read_*() APIs. The (potentially processed) Dataset can be saved back to external storage systems via the write_*() APIs.

Examples
import ray

# Create dataset from synthetic data.
ds = ray.data.range(1000)

# Create dataset from in-memory data.
ds = ray.data.from_items(
    [{"col1": i, "col2": i * 2} for i in range(1000)]
)

# Create dataset from external storage system.
ds = ray.data.read_parquet("s3://bucket/path")

# Save dataset back to external storage system.
ds.write_csv("s3://bucket/output")
Dataset has two kinds of operations: transformation, which takes in a Dataset and outputs a new Dataset (e.g. map_batches()), and consumption, which produces values (not a data stream) as output (e.g. iter_batches()).

Dataset transformations are lazy; execution is triggered by downstream consumption.
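For example, in the following minimal sketch (assuming a running local Ray cluster; the variable names are illustrative), the map_batches() call only declares the transformation, and no data is processed until take_batch() consumes the result:

import ray

ds = ray.data.range(1000)

# Transformation: lazily recorded in the plan; nothing executes yet.
doubled = ds.map_batches(lambda batch: {"id": batch["id"] * 2})

# Consumption: triggers execution and returns concrete values.
first_batch = doubled.take_batch(batch_size=5)
print(first_batch)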
Dataset supports parallel processing at scale: transformations such as map_batches(), aggregations such as min()/max()/mean(), grouping via groupby(), and shuffling operations such as sort(), random_shuffle(), and repartition().

Examples
>>> import ray
>>> ds = ray.data.range(1000)
>>> # Transform batches (Dict[str, np.ndarray]) with map_batches().
>>> ds.map_batches(lambda batch: {"id": batch["id"] * 2})
MapBatches(<lambda>)
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})
>>> # Compute the maximum.
>>> ds.max("id")
999
>>> # Shuffle this dataset randomly.
>>> ds.random_shuffle()
RandomShuffle
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})
>>> # Sort it back in order.
>>> ds.sort("id")
Sort
+- Dataset(num_blocks=..., num_rows=1000, schema={id: int64})
Both unexecuted and materialized Datasets can be passed between Ray tasks and actors without incurring a copy. Dataset supports conversion to/from several more featureful dataframe libraries (e.g., Spark, Dask, Modin, Mars) and is also compatible with distributed TensorFlow / PyTorch.
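For instance, here is a minimal interoperability sketch (assuming pandas is installed; converting to other libraries such as Dask or Spark follows the same to_*() pattern):

import ray

ds = ray.data.from_items([{"col1": i, "col2": i * 2} for i in range(100)])

# Convert to a single pandas DataFrame on the driver.
df = ds.to_pandas()
print(df.shape)

# Pass the Dataset to a Ray task; the underlying blocks are not copied.
@ray.remote
def count_rows(dataset: ray.data.Dataset) -> int:
    return dataset.count()

print(ray.get(count_rows.remote(ds)))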
PublicAPI: This API is stable across Ray releases.
Methods
__init__(plan, epoch[, lazy, logical_plan])
    Construct a Dataset (internal API).
add_column(col, fn, *[, compute])
    Add the given column to the dataset.
aggregate(*aggs)
    Aggregate values using one or more functions.
columns([fetch_if_missing])
    Returns the columns of this Dataset.
count()
    Count the number of records in the dataset.
deserialize_lineage(serialized_ds)
    Deserialize the provided lineage-serialized Dataset.
drop_columns(cols, *[, compute])
    Drop one or more columns from the dataset.
filter(fn, *[, compute])
    Filter out rows that don't satisfy the given predicate.
flat_map(fn, *[, compute, ...])
    Apply the given function to each row and then flatten results.
get_internal_block_refs()
    Get a list of references to the underlying blocks of this dataset.
groupby(key)
    Group rows of a Dataset according to a column.
has_serializable_lineage()
    Whether this dataset's lineage is able to be serialized for storage and later deserialized, possibly on a different cluster.
input_files()
    Return the list of input files for the dataset.
iter_batches(*[, prefetch_batches, ...])
    Return an iterator over batches of data.
iter_rows(*[, prefetch_blocks])
    Return an iterator over the rows in this dataset.
iter_tf_batches(*[, prefetch_batches, ...])
    Return an iterator over batches of data represented as TensorFlow tensors.
iter_torch_batches(*[, prefetch_batches, ...])
    Return an iterator over batches of data represented as Torch tensors.
iterator()
    Return a DataIterator over this dataset.
lazy()
    Enable lazy evaluation.
limit(limit)
    Truncate the dataset to the first limit rows.
map(fn, *[, compute, fn_constructor_args, ...])
    Apply the given function to each row of this dataset.
map_batches(fn, *[, batch_size, compute, ...])
    Apply the given function to batches of data.
materialize()
    Execute and materialize this dataset into object store memory.
max([on, ignore_nulls])
    Return the maximum of one or more columns.
mean([on, ignore_nulls])
    Compute the mean of one or more columns.
min([on, ignore_nulls])
    Return the minimum of one or more columns.
num_blocks()
    Return the number of blocks of this dataset.
random_sample(fraction, *[, seed])
    Returns a new Dataset containing a random fraction of the rows.
random_shuffle(*[, seed, num_blocks])
    Randomly shuffle the rows of this Dataset.
randomize_block_order(*[, seed])
    Randomly shuffle the order of the blocks of this Dataset.
repartition(num_blocks, *[, shuffle])
    Repartition the Dataset into the given number of blocks.
repeat([times])
    Convert this into a DatasetPipeline by looping over this dataset.
schema([fetch_if_missing])
    Return the schema of the dataset.
select_columns(cols, *[, compute])
    Select one or more columns from the dataset.
serialize_lineage()
    Serialize this dataset's lineage, not the actual data or the existing data futures, to bytes that can be stored and later deserialized, possibly on a different cluster.
show([limit])
    Print up to the given number of rows from the Dataset.
size_bytes()
    Return the in-memory size of the dataset.
sort([key, descending])
    Sort the dataset by the specified key column or key function.
split(n, *[, equal, locality_hints])
    Materialize and split the dataset into n disjoint pieces.
split_at_indices(indices)
    Materialize and split the dataset at the given indices (like np.split).
split_proportionately(proportions)
    Materialize and split the dataset using proportions.
stats()
    Returns a string containing execution timing information.
std([on, ddof, ignore_nulls])
    Compute the standard deviation of one or more columns.
streaming_split(n, *[, equal, locality_hints])
    Returns n DataIterators that can be used to read disjoint subsets of the dataset in parallel.
sum([on, ignore_nulls])
    Compute the sum of one or more columns.
take([limit])
    Return up to limit rows from the Dataset.
take_all([limit])
    Return all of the rows in this Dataset.
take_batch([batch_size, batch_format])
    Return up to batch_size rows from the Dataset in a batch.
to_arrow_refs()
    Convert this Dataset into a distributed set of PyArrow tables.
to_dask([meta, verify_meta])
    Convert this Dataset into a Dask DataFrame.
to_mars()
    Convert this Dataset into a Mars DataFrame.
to_modin()
    Convert this Dataset into a Modin DataFrame.
to_numpy_refs(*[, column])
    Converts this Dataset into a distributed set of NumPy ndarrays or dictionary of NumPy ndarrays.
to_pandas([limit])
    Convert this Dataset to a single pandas DataFrame.
to_pandas_refs()
    Converts this Dataset into a distributed set of Pandas dataframes.
to_random_access_dataset(key[, num_workers])
    Convert this dataset into a distributed RandomAccessDataset (EXPERIMENTAL).
to_spark(spark)
    Convert this Dataset into a Spark DataFrame.
to_tf(feature_columns, label_columns, *[, ...])
    Return a TensorFlow Dataset over this Dataset.
to_torch(*[, label_column, feature_columns, ...])
    Return a Torch IterableDataset over this Dataset.
train_test_split(test_size, *[, shuffle, seed])
    Materialize and split the dataset into train and test subsets.
union(*other)
    Materialize and concatenate Datasets across rows.
unique(column)
    List the unique elements in a given column.
window(*[, blocks_per_window, bytes_per_window])
    Convert this into a DatasetPipeline by windowing over data blocks.
write_csv(path, *[, filesystem, ...])
    Writes the Dataset to CSV files.
write_datasource(datasource, *[, ...])
    Writes the dataset to a custom Datasource.
write_images(path, column[, file_format, ...])
    Writes the Dataset to images.
write_json(path, *[, filesystem, ...])
    Writes the Dataset to JSON and JSONL files.
write_mongo(uri, database, collection[, ...])
    Writes the Dataset to a MongoDB database.
write_numpy(path, *, column[, filesystem, ...])
    Writes a column of the Dataset to .npy files.
write_parquet(path, *[, filesystem, ...])
    Writes the Dataset to parquet files under the provided path.
write_sql(sql, connection_factory[, ...])
    Write to a database that provides a Python DB API2-compliant connector.
write_tfrecords(path, *[, tf_schema, ...])
    Write the Dataset to TFRecord files.
write_webdataset(path, *[, filesystem, ...])
    Writes the dataset to WebDataset files.
zip(other)
    Materialize and zip the columns of this dataset with the columns of another.
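As an end-to-end illustration of several methods listed above (a minimal sketch; the /tmp/ray_output path and split fraction are arbitrary examples, not defaults):

import ray

ds = ray.data.range(1000)

# Transform, then compute a simple aggregate over the result.
doubled = ds.map_batches(lambda batch: {"id": batch["id"] * 2})
print(doubled.mean("id"))

# Materialize and split into train and test subsets.
train_ds, test_ds = ds.train_test_split(test_size=0.2)
print(train_ds.count(), test_ds.count())

# Write the transformed dataset out as Parquet files.
doubled.write_parquet("/tmp/ray_output")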
Attributes
context
    Return the DataContext used to create this Dataset.
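A brief usage sketch (the exact fields of the returned DataContext vary across Ray versions):

import ray

ds = ray.data.range(10)

# Inspect the DataContext this Dataset was created with.
ctx = ds.context
print(ctx)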