ray.data.read_mongo
- ray.data.read_mongo(uri: str, database: str, collection: str, *, pipeline: Optional[List[Dict]] = None, schema: Optional[pymongoarrow.api.Schema] = None, parallelism: int = -1, ray_remote_args: Dict[str, Any] = None, **mongo_args) → ray.data.dataset.Dataset
Create a Dataset from a MongoDB database.
The data to read is specified by the uri, database, and collection of the MongoDB deployment. The dataset is created from the results of executing pipeline against the collection. If pipeline is None, the entire collection is read.
Tip
For more details about these MongoDB concepts, see the following:
- URI: https://www.mongodb.com/docs/manual/reference/connection-string/
- Database and Collection: https://www.mongodb.com/docs/manual/core/databases-and-collections/
- Pipeline: https://www.mongodb.com/docs/manual/core/aggregation-pipeline/
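The uri argument is a standard MongoDB connection string. If the username or password contains characters such as @, :, or /, MongoDB requires them to be percent-encoded. The sketch below shows one way to build such a URI with the standard library's urllib.parse.quote_plus; build_mongo_uri is a hypothetical helper, not part of Ray or pymongoarrow, and the host, port, and credentials are placeholders.

```python
from urllib.parse import quote_plus


def build_mongo_uri(host: str, port: int, username: str, password: str,
                    auth_source: str = "admin") -> str:
    """Build a MongoDB connection string (hypothetical helper).

    Credentials are percent-encoded with quote_plus, because characters such
    as '@', ':' and '/' would otherwise be ambiguous inside the URI.
    """
    user = quote_plus(username)
    pwd = quote_plus(password)
    return f"mongodb://{user}:{pwd}@{host}:{port}/?authSource={auth_source}"


uri = build_mongo_uri("mongodb0.example.com", 27017, "username", "p@ss:word")
print(uri)
# mongodb://username:p%40ss%3Aword@mongodb0.example.com:27017/?authSource=admin
```

The resulting string can be passed as the uri argument unchanged.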
To read from MongoDB in parallel, the pipeline is executed on partitions of the collection, with one Ray read task handling each partition. Partitions are created in an attempt to distribute the documents evenly across the specified number of partitions. The number of partitions is determined by parallelism, which can be requested from this interface or chosen automatically if unspecified (see the parallelism argument below).
Examples
>>> import pyarrow as pa
>>> import ray
>>> from pymongoarrow.api import Schema
>>> ds = ray.data.read_mongo(
...     uri="mongodb://username:password@mongodb0.example.com:27017/?authSource=admin",
...     database="my_db",
...     collection="my_collection",
...     # $sort takes a document mapping field names to 1 (ascending) or -1 (descending).
...     pipeline=[{"$match": {"col2": {"$gte": 0, "$lt": 100}}}, {"$sort": {"sort_field": 1}}],
...     schema=Schema({"col1": pa.string(), "col2": pa.int64()}),
...     parallelism=10,
... )
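The partitioned read described above can be pictured with a small, self-contained sketch. It assumes the collection is split by contiguous ranges of the unique _id field and that each read task runs the user's pipeline behind an extra $match stage limiting _id to its range. This is an illustration of the idea, not Ray's actual implementation; split_into_ranges and partition_pipeline are hypothetical names, and a real implementation would more likely compute range boundaries on the server (for example with MongoDB's $bucketAuto stage) than fetch every _id.

```python
from typing import Any, Dict, List, Optional


def split_into_ranges(sorted_ids: List[Any], num_partitions: int) -> List[Dict[str, Optional[Any]]]:
    """Split sorted, unique _id values into contiguous ranges of near-equal size.

    Each range is {"min": inclusive lower bound, "max": exclusive upper bound},
    with "max" set to None (unbounded) for the last range.
    """
    n = len(sorted_ids)
    if n == 0:
        return []
    # Never create more partitions than there are documents.
    k = max(1, min(num_partitions, n))
    # Start index of each partition; partition sizes differ by at most one.
    starts = [i * n // k for i in range(k)]
    ranges = []
    for j, start in enumerate(starts):
        upper = sorted_ids[starts[j + 1]] if j + 1 < k else None
        ranges.append({"min": sorted_ids[start], "max": upper})
    return ranges


def partition_pipeline(rng: Dict[str, Optional[Any]], user_pipeline: List[Dict]) -> List[Dict]:
    """Prepend a $match stage restricting _id to one range, then the user's stages."""
    bounds: Dict[str, Any] = {"$gte": rng["min"]}
    if rng["max"] is not None:
        bounds["$lt"] = rng["max"]
    return [{"$match": {"_id": bounds}}] + list(user_pipeline)


user_pipeline = [
    {"$match": {"col2": {"$gte": 0, "$lt": 100}}},
    {"$sort": {"sort_field": 1}},
]
for rng in split_into_ranges(list(range(10)), 3):
    # Each of these pipelines would be handled by a separate read task.
    print(partition_pipeline(rng, user_pipeline)[0])
```

With ten documents and three partitions, this prints {'$match': {'_id': {'$gte': 0, '$lt': 3}}}, {'$match': {'_id': {'$gte': 3, '$lt': 6}}}, and {'$match': {'_id': {'$gte': 6}}}, so the partitions hold 3, 3, and 4 documents. Under this scheme, a $sort in the user pipeline orders documents within each partition only.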
- Parameters
uri – The URI of the source MongoDB deployment to read the dataset from. For the URI format, see the MongoDB docs.
database – The name of the database hosted in the MongoDB deployment. This database must exist; otherwise, a ValueError is raised.
collection – The name of the collection in the database. This collection must exist; otherwise, a ValueError is raised.
pipeline – A MongoDB aggregation pipeline, which is executed on the given collection; its results are used to create the Dataset. If None, the entire collection is read.
schema – The schema used to read the collection. If None, the schema is inferred from the results of the pipeline.
parallelism – The requested parallelism of the read. Defaults to -1, which automatically determines the optimal parallelism for your configuration. You should not need to manually set this value in most cases. For details on how the parallelism is automatically determined and guidance on how to tune it, see Tuning read parallelism.
ray_remote_args – kwargs passed to remote() in the read tasks.
mongo_args – kwargs passed to aggregate_arrow_all() in pymongoarrow when producing Arrow-formatted results.
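To make the pipeline parameter concrete, the sketch below emulates in plain Python what a pipeline like the one in the example selects: a $match keeping documents with 0 <= col2 < 100, followed by an ascending sort on sort_field, written {"$sort": {"sort_field": 1}}. It does not talk to MongoDB, the documents are made up for illustration, and it assumes every document has both fields (MongoDB's handling of missing fields and mixed types is not modeled).

```python
from typing import Any, Dict, List

# Made-up sample documents; real ones would come from the collection.
docs: List[Dict[str, Any]] = [
    {"col1": "a", "col2": 150, "sort_field": 1},
    {"col1": "b", "col2": 42, "sort_field": 3},
    {"col1": "c", "col2": -5, "sort_field": 2},
    {"col1": "d", "col2": 7, "sort_field": 0},
]


def emulate_example_pipeline(documents: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Stage 1: {"$match": {"col2": {"$gte": 0, "$lt": 100}}}
    matched = [d for d in documents if 0 <= d["col2"] < 100]
    # Stage 2: {"$sort": {"sort_field": 1}}, where 1 means ascending.
    return sorted(matched, key=lambda d: d["sort_field"])


print([d["col1"] for d in emulate_example_pipeline(docs)])  # ['d', 'b']
```

Documents "a" (col2 = 150) and "c" (col2 = -5) fall outside the range, and the remaining two are ordered by sort_field.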
- Returns
Dataset producing rows from the results of executing the pipeline on the specified MongoDB collection.
- Raises
ValueError – if database doesn’t exist.
ValueError – if collection doesn’t exist.
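The two ValueError conditions can be summarized as a simple check against a catalog of existing databases and collections. The sketch below assumes a hypothetical in-memory catalog mapping each database name to its collection names; with pymongo, such a catalog could be populated from MongoClient.list_database_names() and each database's list_collection_names(). Whether read_mongo performs its checks this way is an assumption, and the error messages are illustrative rather than Ray's actual text.

```python
from typing import Dict, Set


def validate_source(catalog: Dict[str, Set[str]], database: str, collection: str) -> None:
    """Raise ValueError if the database or the collection doesn't exist (hypothetical helper).

    `catalog` maps each database name to the set of its collection names.
    """
    if database not in catalog:
        raise ValueError(f"Database {database!r} doesn't exist.")
    if collection not in catalog[database]:
        raise ValueError(f"Collection {collection!r} doesn't exist in database {database!r}.")


catalog = {"my_db": {"my_collection"}}
validate_source(catalog, "my_db", "my_collection")  # Both exist: returns None.
try:
    validate_source(catalog, "my_db", "missing_collection")
except ValueError as err:
    print(err)  # Collection 'missing_collection' doesn't exist in database 'my_db'.
```

Checking the database first means a missing database is reported even if the collection name is also wrong.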
PublicAPI (alpha): This API is in alpha and may change before becoming stable.