{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "(mmt-datasets)=\n", "\n", "# Batch Training with Ray Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Batch training** and tuning are common tasks in simple machine learning use-cases such as time series forecasting. They require fitting simple models on data batches corresponding to different locations, products, etc. Batch training can reduce the total time needed to process all the data, but only if those batches can run in parallel!\n", "\n", "This notebook showcases how to conduct batch training of regression algorithms from [XGBoost](https://docs.ray.io/en/latest/tune/examples/tune-xgboost.html) and [Scikit-learn](https://docs.ray.io/en/latest/ray-more-libs/joblib.html) with **[Ray Data](data)**. **XGBoost** is a popular open-source library used for regression and classification. **Scikit-learn** is a popular open-source library with a vast assortment of well-known ML algorithms.\n", "\n", "```{tip}\n", "The workload showcased in this notebook can be expressed using different Ray components, such as Ray Data, Ray Tune and Ray Core.\n", "For more information, including best practices, see {ref}`ref-use-cases-mmt`.\n", "```\n", "\n", "![Batch training diagram](../../data/examples/images/batch-training.svg)\n", "\n", "For the data, we will use the [NYC Taxi dataset](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). This popular tabular dataset contains historical taxi pickups by timestamp and location in NYC.\n", "\n", "For the training, we will train separate regression models to predict `trip_duration`, with a different model for each dropoff location in NYC. Specifically, we will conduct an experiment for each `dropoff_location_id` to find the best model, either XGBoost or Scikit-learn, per location." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "# Contents\n", "\n", "In this tutorial, you will learn about:\n", " 1. 
[Creating a Dataset](#create_ds)\n", " 2. [Filtering a Dataset on Read](#filter_ds)\n", " 3. [Inspecting a Dataset](#inspect_ds)\n", " 4. [Transforming a Dataset in parallel](#transform_ds)\n", " 5. [Batch training with Ray Data in parallel](#batch_train_ds)\n", " 6. [Load a saved model and perform batch prediction](#load_model)\n", "\n", "# Walkthrough\n", "\n", "Let us start by importing a few required libraries, including open-source [Ray](https://github.com/ray-project/ray) itself!" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of CPUs in this system: 8\n", "numpy: 1.23.3\n", "pyarrow: 6.0.1\n" ] } ], "source": [ "import os\n", "num_cpu = os.cpu_count()\n", "\n", "print(f\"Number of CPUs in this system: {num_cpu}\")\n", "from typing import Tuple, List, Union, Optional, Callable\n", "import time\n", "import pandas as pd\n", "import numpy as np\n", "\n", "print(f\"numpy: {np.__version__}\")\n", "import pyarrow\n", "import pyarrow.parquet as pq\n", "import pyarrow.dataset as pds\n", "\n", "print(f\"pyarrow: {pyarrow.__version__}\")\n", "from ray.data import Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import ray\n", "\n", "if ray.is_initialized():\n", " ray.shutdown()\n", "ray.init()" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{'CPU': 8.0, 'object_store_memory': 9093674188.0, 'memory': 18187348379.0, 'node:172.31.174.62': 1.0}\n" ] } ], "source": [ "print(ray.cluster_resources())" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# For benchmarking purposes, we can print the times of various operations.\n", "# In order to reduce clutter in the output, this is set to False by default.\n", "PRINT_TIMES = False\n", "\n", "\n", "def print_time(msg: str):\n", " if PRINT_TIMES:\n", 
" print(msg)\n", "\n", "\n", "# To speed things up, we only use a small subset of the full dataset: the last file in the list (a single month of data).\n", "# You can choose to use the full dataset for 2018-2019 by setting the SMOKE_TEST variable to False.\n", "SMOKE_TEST = True" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Creating a Dataset " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "```{tip}\n", "Ray Data uses PyArrow dataset and table for reading or writing large parquet files. Its native multithreaded C++ adapter is faster than pandas `read_parquet`, even using `engine='pyarrow'`. For more details see [Ray Data User Guide](https://docs.ray.io/en/latest/data/user-guide.html).\n", "```\n", "\n", "[Ray Data](data) is the standard way to load and exchange data in Ray libraries and applications. We will use the [Ray Data APIs](data-api) to read the data and quickly inspect it.\n", "\n", "First, we will define some global variables we will use throughout the notebook, such as the list of S3 links to the files making up the dataset and the possible location IDs." 
] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "NYC Taxi using 1 file(s)!\n", "s3_files: ['s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/2019/06/data.parquet/ab5b9d2b8cc94be19346e260b543ec35_000000.parquet']\n", "Locations: [141, 229, 173]\n" ] } ], "source": [ "# Define some global variables.\n", "TARGET = \"trip_duration\"\n", "s3_partitions = pds.dataset(\n", " \"s3://anonymous@air-example-data/ursa-labs-taxi-data/by_year/\",\n", " partitioning=[\"year\", \"month\"],\n", ")\n", "s3_files = [f\"s3://anonymous@{file}\" for file in s3_partitions.files]\n", "\n", "# Obtain all location IDs\n", "location_ids = (\n", " pq.read_table(s3_files[0], columns=[\"dropoff_location_id\"])[\"dropoff_location_id\"]\n", " .unique()\n", " .to_pylist()\n", ")\n", "\n", "# Use smoke testing or not.\n", "starting_idx = -1 if SMOKE_TEST else 0\n", "# drop location 199 to test error-handling before final git checkin\n", "sample_locations = [141, 229, 173] if SMOKE_TEST else location_ids\n", "\n", "# Display what data will be used.\n", "s3_files = s3_files[starting_idx:]\n", "print(f\"NYC Taxi using {len(s3_files)} file(s)!\")\n", "print(f\"s3_files: {s3_files}\")\n", "print(f\"Locations: {sample_locations}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The easiest way to create a ray dataset is to use `ray.data.read_parquet` to read parquet files in parallel onto the Ray cluster.\n", "\n", "Uncomment the cell below if you want to try it out. 
" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# # This cell is commented out because it can take a long time!\n", "# # In the next section \"Filtering a Dataset on Read\" we make it faster.\n", "\n", "# # Read everything in the files list into a ray dataset.\n", "# start = time.time()\n", "# ds = ray.data.read_parquet(s3_files)\n", "# data_loading_time = time.time() - start\n", "# print(f\"Data loading time: {data_loading_time:.2f} seconds\")\n", "# ds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Filtering a Dataset on Read \n", "\n", "Normally there is some last-mile data processing required before training. Let's just assume we know the data processing steps are:\n", "- Drop negative trip distances, 0 fares, 0 passengers.\n", "- Drop 2 unknown zones: `['264', '265']`.\n", "- Calculate trip duration and add it as a new column.\n", "- Drop trip durations smaller than 1 minute and greater than 24 hours.\n", "\n", "Instead of blindly reading all the data, it would be better if we only read the data we needed. This is a similar concept to SQL `SELECT only rows, columns you need` vs `SELECT *`.\n", "\n", "```{tip}\n", "Best practice is to filter as much as you can directly in the Dataset `read_parquet()`.\n", "```\n", "\n", "Note that Ray Data's Parquet reader supports projection (column selection) and row filter pushdown, where we can push the above column selection and the row-based filter to the Parquet read. If we specify column selection at Parquet read time, the unselected columns won't even be read from disk. This can save a lot of memory, especially with big datasets, and allow us to avoid OOM issues.\n", "\n", "The row-based filter is specified via [Arrow's dataset field expressions](https://arrow.apache.org/docs/6.0/python/generated/pyarrow.dataset.Expression.html#pyarrow.dataset.Expression). 
\n" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "def pushdown_read_data(files_list: list, sample_ids: list) -> Dataset:\n", " start = time.time()\n", "\n", " filter_expr = (\n", " (pds.field(\"passenger_count\") > 0)\n", " & (pds.field(\"trip_distance\") > 0)\n", " & (pds.field(\"fare_amount\") > 0)\n", " & (~pds.field(\"pickup_location_id\").isin([264, 265]))\n", " & (~pds.field(\"dropoff_location_id\").isin([264, 265]))\n", " & (pds.field(\"dropoff_location_id\").isin(sample_ids))\n", " )\n", "\n", " dataset = ray.data.read_parquet(\n", " files_list,\n", " columns=[\n", " \"pickup_at\",\n", " \"dropoff_at\",\n", " \"pickup_location_id\",\n", " \"dropoff_location_id\",\n", " \"passenger_count\",\n", " \"trip_distance\",\n", " \"fare_amount\",\n", " ],\n", " filter=filter_expr,\n", " )\n", "\n", " data_loading_time = time.time() - start\n", " print_time(f\"Data loading time: {data_loading_time:.2f} seconds\")\n", "\n", " return dataset" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "2022-12-08 17:04:09,202\tWARNING read_api.py:291 -- ⚠️ The number of blocks in this dataset (1) limits its parallelism to 1 concurrent tasks. This is much less than the number of available CPU slots in the cluster. 
Use `.repartition(n)` to increase the number of dataset blocks.\n" ] } ], "source": [ "# Test the pushdown_read_data function\n", "ds_raw = pushdown_read_data(s3_files, sample_locations)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Inspecting a Dataset \n", "\n", "Let's get some basic statistics about our newly created Dataset.\n", "\n", "As our Dataset is backed by Parquet, we can obtain the number of rows from the metadata without triggering a full data read.\n" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rows: 6941024\n" ] } ], "source": [ "print(f\"Number of rows: {ds_raw.count()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, we can obtain the Dataset size (in bytes) from the metadata.\n" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Size bytes (from parquet metadata): 925892280\n" ] } ], "source": [ "print(f\"Size bytes (from parquet metadata): {ds_raw.size_bytes()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "Let's fetch and inspect the schema of the underlying Parquet files." 
] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "Schema data types:\n", "pickup_at: timestamp[us]\n", "dropoff_at: timestamp[us]\n", "pickup_location_id: int32\n", "dropoff_location_id: int32\n", "passenger_count: int8\n", "trip_distance: float\n", "fare_amount: float\n" ] } ], "source": [ "print(\"\\nSchema data types:\")\n", "data_types = list(zip(ds_raw.schema().names, ds_raw.schema().types))\n", "for s in data_types:\n", " print(f\"{s[0]}: {s[1]}\")" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "### Transforming a Dataset in parallel using custom functions \n", "\n", "Ray Data allows you to specify custom data transform functions. These [user defined functions (UDFs)](transforming_data) can be called using `Dataset.map_batches(my_function)`. The transformation will be conducted in parallel for each data batch.\n", "\n", "```{tip}\n", "You may need to call `Dataset.repartition(n)` first to split the Dataset into more blocks internally. By default, each block corresponds to one file. The upper bound of parallelism is the number of blocks.\n", "```\n", "\n", "You can specify the data format you are using in the `batch_format` parameter. The dataset will be divided into batches and those batches converted into the specified format. Available data formats you can specify in the `batch_format` parameter include `\"pandas\", \"pyarrow\", \"numpy\"`. Tabular data will be passed into your UDF by default as a pandas DataFrame. Tensor data will be passed into your UDF as a numpy array.\n", "\n", "Here, we will use `batch_format=\"pandas\"` explicitly for clarity." 
] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "# A pandas DataFrame UDF for transforming the Dataset in parallel.\n", "def transform_df(input_df: pd.DataFrame) -> pd.DataFrame:\n", " df = input_df.copy()\n", "\n", " # calculate trip_duration in seconds; unlike .dt.seconds, total_seconds()\n", " # includes whole days, so the 24-hour filter below is meaningful\n", " df[\"trip_duration\"] = (df[\"dropoff_at\"] - df[\"pickup_at\"]).dt.total_seconds()\n", " # filter trip_durations > 1 minute and less than 24 hours\n", " df = df[df[\"trip_duration\"] > 60]\n", " df = df[df[\"trip_duration\"] < 24 * 60 * 60]\n", " # keep only necessary columns\n", " df.drop(\n", " [\"dropoff_at\", \"pickup_at\", \"pickup_location_id\", \"fare_amount\"],\n", " axis=1,\n", " inplace=True,\n", " )\n", " df[\"dropoff_location_id\"] = df[\"dropoff_location_id\"].fillna(-1)\n", " return df" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Number of rows before transformation: 6941024\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "Read: 100%|██████████| 1/1 [00:01<00:00, 1.97s/it]\n", "Repartition: 100%|██████████| 6/6 [00:02<00:00, 2.87it/s]\n", "Map_Batches: 100%|██████████| 6/6 [00:02<00:00, 2.90it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Number of rows after transformation: 285323\n", "CPU times: user 320 ms, sys: 114 ms, total: 434 ms\n", "Wall time: 6.19 s\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "%%time\n", "\n", "# Test the transform UDF.\n", "print(f\"Number of rows before transformation: {ds_raw.count()}\")\n", "\n", "# Repartition the dataset to allow for higher parallelism.\n", "# Best practice: repartition to all available cpu except a few, with a cap\n", "num_partitions = min(num_cpu - 2, 32)\n", "ds = ds_raw.repartition(num_partitions)\n", "\n", "# .map_batches applies a UDF to each partition of the data in parallel.\n", "ds = ds.map_batches(transform_df, batch_format=\"pandas\")\n", 
"\n", "# Verify row count.\n", "print(f\"Number of rows after transformation: {ds.count()}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Batch training with Ray Data " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have learned more about our data and written a pandas UDF to transform our data, we are ready to train a model on batches of this data in parallel.\n", "\n", "1. We will use the `dropoff_location_id` column in the dataset to group the dataset into data batches. \n", "2. Then we will fit a separate model for each batch to predict `trip_duration`." ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "sklearn: 1.1.2\n", "xgboost: 1.3.3\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "/home/ray/anaconda3/lib/python3.8/site-packages/xgboost/compat.py:31: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.\n", " from pandas import MultiIndex, Int64Index\n" ] } ], "source": [ "# import standard sklearn libraries\n", "import sklearn\n", "from sklearn.base import BaseEstimator\n", "from sklearn.model_selection import train_test_split\n", "from sklearn.linear_model import LinearRegression\n", "from sklearn.tree import DecisionTreeRegressor\n", "from sklearn.metrics import mean_absolute_error\n", "\n", "print(f\"sklearn: {sklearn.__version__}\")\n", "import xgboost as xgb\n", "\n", "print(f\"xgboost: {xgb.__version__}\")\n", "# set global random seed for sklearn models\n", "np.random.seed(415)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Define search space for training\n", "\n", "In this notebook, we will run parallel training jobs per data batch, drop-off location. The training jobs will be defined using a search space and simple grid search. 
Depending on your needs, more sophisticated search spaces and search algorithms are possible with [Ray Tune](https://docs.ray.io/en/master/tune/tutorials/tune-search-spaces.html#tune-search-space-tutorial).\n", "\n", "**Below, we define our search space, which consists of:**\n", "\n", "- Different algorithms, either:\n", " - Linear Regression or XGBoost Tree Regression.\n", " \n", "We want to train using every algorithm in the search space. This means every algorithm will be applied to every NYC Taxi drop-off location." ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "ALGORITHMS = [\n", " LinearRegression(fit_intercept=True),\n", " xgb.XGBRegressor(max_depth=4),\n", "]" ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Define training functions\n", "\n", "We want to fit each model in the search space to predict the trip duration for each drop-off location. For scoring, we will calculate mean absolute error on the validation set, and report that as model error per drop-off location.\n", "\n", "The `fit_and_score_sklearn` function contains the logic necessary to fit a scikit-learn-compatible model and evaluate it using mean absolute error." 
] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "def fit_and_score_sklearn(\n", " train_df: pd.DataFrame, test_df: pd.DataFrame, model: BaseEstimator\n", ") -> pd.DataFrame:\n", "\n", " # Assemble train/test pandas dfs\n", " train_X = train_df[[\"passenger_count\", \"trip_distance\"]]\n", " train_y = train_df[TARGET]\n", " test_X = test_df[[\"passenger_count\", \"trip_distance\"]]\n", " test_y = test_df[TARGET]\n", "\n", " # Start training.\n", " model = model.fit(train_X, train_y)\n", " pred_y = model.predict(test_X)\n", "\n", " # Evaluate.\n", " error = sklearn.metrics.mean_absolute_error(test_y, pred_y)\n", " if error is None:\n", " error = 10000.0\n", "\n", " # Assemble return as a pandas dataframe.\n", " return_df = pd.DataFrame({\"model\": [model], \"error\": [error]})\n", "\n", " # return model, error\n", " return return_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `train_and_evaluate` function contains the logic for train-test splitting and fitting of a model using the `fit_and_score_sklearn` function.\n", "\n", "As an input, this function takes in a pandas DataFrame. When we call `Dataset.map_batches` or `Dataset.groupby().map_groups()`, the Dataset will be batched into multiple pandas DataFrames and this function will run for each batch in parallel. We will return the model and its error. Those results will be collected back into a Dataset." 
] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "def train_and_evaluate(\n", " df: pd.DataFrame, models: List[BaseEstimator], location_id: int\n", ") -> Optional[pd.DataFrame]:\n", "\n", " # We need at least 4 rows to create a train / test split.\n", " if len(df) < 4:\n", " print_time(\n", " f\"Data batch for LocID {location_id} is empty or smaller than 4 rows\"\n", " )\n", " return None\n", "\n", " start = time.time()\n", "\n", " # Train / test split\n", " # Randomly split the data into 80/20 train/test.\n", " train_df, test_df = train_test_split(df, test_size=0.2, shuffle=True)\n", "\n", " # Launch a fit and score task for each model.\n", " # results is a list of pandas dataframes, one per model\n", " results = [fit_and_score_sklearn(train_df, test_df, model) for model in models]\n", "\n", " # Assemble location_id, name of model, and metrics in a pandas DataFrame\n", " results_df = pd.concat(results, axis=0, join=\"inner\", ignore_index=True)\n", " results_df.insert(0, column=\"location_id\", value=location_id)\n", "\n", " training_time = time.time() - start\n", " print_time(f\"Training time for LocID {location_id}: {training_time:.2f} seconds\")\n", "\n", " return results_df" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "Recall how we wrote the data transform UDF `transform_df`? It was called with the pattern:\n", "- `Dataset.map_batches(transform_df, batch_format=\"pandas\")`\n", "\n", "Similarly, we can write a custom groupby-aggregate function `agg_func` which will run for each [Dataset *group-by*](transforming_groupby) group in parallel. The usage pattern is:\n", "- `Dataset.groupby(column).map_groups(agg_func, batch_format=\"pandas\")`.\n", "\n", "In the cell below, we define our custom `agg_func`." 
] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [], "source": [ "# A Pandas DataFrame aggregation function for processing\n", "# grouped batches of Dataset data.\n", "def agg_func(df: pd.DataFrame) -> pd.DataFrame:\n", " # Use positional access: the group's index may not contain the label 0.\n", " location_id = df[\"dropoff_location_id\"].iloc[0]\n", "\n", " # Handle errors in data groups\n", " try:\n", " # Split the group's data, then fit and score every model on it\n", " results_df = train_and_evaluate(df, ALGORITHMS, location_id)\n", " assert results_df is not None\n", " except Exception:\n", " # assemble a null entry\n", " print(f\"Failed on LocID {location_id}!\")\n", " # pd.DataFrame has no `dtypes` argument, so cast the columns afterwards.\n", " results_df = pd.DataFrame(\n", " [[location_id, None, 10000.0]],\n", " columns=[\"location_id\", \"model\", \"error\"],\n", " ).astype({\"location_id\": \"int32\", \"error\": \"float64\"})\n", "\n", " return results_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run batch training using `map_groups`\n", "\n", "The main \"driver code\" reads each Parquet file (where each file corresponds to one month of NYC taxi data) into a Dataset `ds`. \n", "\n", "Then we use Dataset *group-by* to map each group into a batch of data and run `agg_func` on each grouping in parallel by calling `ds.groupby(\"dropoff_location_id\").map_groups(agg_func, batch_format=\"pandas\")`." 
] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Sort Sample: 100%|██████████| 6/6 [00:01<00:00, 4.17it/s]\n", "Shuffle Map: 100%|██████████| 6/6 [00:01<00:00, 3.67it/s]\n", "Shuffle Reduce: 100%|██████████| 6/6 [00:01<00:00, 3.61it/s]\n", "Map_Batches: 100%|██████████| 6/6 [01:43<00:00, 17.31s/it]" ] }, { "name": "stdout", "output_type": "stream", "text": [ "Total number of models: 6\n", "TOTAL TIME TAKEN: 108.69 seconds\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\n" ] } ], "source": [ "# Driver code to run this.\n", "\n", "start = time.time()\n", "\n", "# Read data into Dataset\n", "# ds = pushdown_read_data(s3_files, sample_locations)\\\n", "# .repartition(14)\\\n", "# .map_batches(transform_df, batch_format=\"pandas\")\n", "\n", "# Use Dataset groupby.map_groups() to process each group in parallel and return a Dataset.\n", "results = ds.groupby(\"dropoff_location_id\").map_groups(agg_func, batch_format=\"pandas\")\n", "\n", "total_time_taken = time.time() - start\n", "print(f\"Total number of models: {results.count()}\")\n", "print(f\"TOTAL TIME TAKEN: {total_time_taken:.2f} seconds\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we can inspect the models we have trained and their errors." ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Dataset(num_blocks=6, num_rows=6, schema={location_id: int32, model: object, error: float64})" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "results" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/html": [ "
" ], "text/plain": [ " location_id model error\n", "0 141 LinearRegression() 535.858862\n", "1 141 XGBRegressor(base_score=0.5, booster='gbtree',... 527.156189\n", "2 173 LinearRegression() 1279.122424\n", "3 173 XGBRegressor(base_score=0.5, booster='gbtree',... 1377.166627\n", "4 229 LinearRegression() 556.860355\n", "5 229 XGBRegressor(base_score=0.5, booster='gbtree',... 559.876944" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# sort values by location id\n", "results_df = results.to_pandas()\n", "results_df.sort_values(by=[\"location_id\"], ascending=True, inplace=True)\n", "results_df" ] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "location_id int32\n", "model object\n", "error float64\n", "dtype: object" ] }, "execution_count": 22, "metadata": {}, "output_type": "execute_result" } ], "source": [ "results_df.dtypes" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "model object\n", "error float64\n", "dtype: object\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " model error\n", "location_id \n", "141 XGBRegressor(base_score=0.5, booster='gbtree',... 527.156189\n", "229 LinearRegression() 556.860355\n", "173 LinearRegression() 1279.122424" ] }, "execution_count": 23, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Keep only 1 model per location_id with minimum error\n", "final_df = results_df.copy()\n", "final_df = final_df.loc[(final_df.error > 0), :]\n", "final_df = final_df.loc[final_df.groupby(\"location_id\")[\"error\"].idxmin()]\n", "final_df.sort_values(by=[\"error\"], inplace=True)\n", "final_df.set_index(\"location_id\", inplace=True, drop=True)\n", "print(final_df.dtypes)\n", "final_df" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "model \n", "LinearRegression() 0.666667\n", "XGBRegressor(base_score=0.5, booster='gbtree', colsample_bylevel=1,\\n colsample_bynode=1, colsample_bytree=1, gamma=0, gpu_id=-1,\\n importance_type='gain', interaction_constraints='',\\n learning_rate=0.300000012, max_delta_step=0, max_depth=4,\\n min_child_weight=1, missing=nan, monotone_constraints='()',\\n n_estimators=100, n_jobs=8, num_parallel_tree=1, random_state=0,\\n reg_alpha=0, reg_lambda=1, scale_pos_weight=1, subsample=1,\\n tree_method='exact', validate_parameters=1, verbosity=None) 0.333333\n", "dtype: float64" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "final_df[[\"model\"]].astype(\"str\").value_counts(normalize=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Re-load a model and perform batch prediction " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will restore a regression model and demonstrate it can be used for prediction." 
] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "141" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Choose a dropoff location\n", "sample_location_id = final_df.index[0]\n", "sample_location_id" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "algorithm type:: \n", "sample_model type:: \n" ] } ], "source": [ "# Get the algorithm used\n", "sample_algorithm = final_df.loc[[sample_location_id]].model.values[0]\n", "print(f\"algorithm type:: {type(sample_algorithm)}\")\n", "\n", "# Get the saved model directly from the pandas dataframe of results\n", "sample_model = final_df.model[sample_location_id]\n", "print(f\"sample_model type:: {type(sample_model)}\")" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [], "source": [ "# Create some test data\n", "df = ds.to_pandas(limit=ds.count())\n", "df = df.loc[(df.dropoff_location_id == sample_location_id), :]\n", "_, test_df = train_test_split(df, test_size=0.2, shuffle=True)\n", "test_X = test_df[[\"passenger_count\", \"trip_distance\"]]\n", "test_y = np.array(test_df[TARGET]) # actual values" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "/home/ray/anaconda3/lib/python3.8/site-packages/xgboost/data.py:192: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.\n", " from pandas import MultiIndex, Int64Index\n" ] }, { "data": { "text/html": [ "
" ], "text/plain": [ " pred_y trip_duration\n", "0 1175.119019 1174\n", "1 381.193146 299\n", "2 1099.755737 1206\n", "3 260.620178 566\n", "4 684.046021 630\n", "5 1038.442139 852\n", "6 1581.762817 1596\n", "7 533.471680 801\n", "8 1618.910889 1363\n", "9 695.661072 715" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# Perform batch prediction using restored model\n", "pred_y = sample_model.predict(test_X)\n", "\n", "# Zip together predictions and actuals to evaluate\n", "pd.DataFrame(zip(pred_y, test_y), columns=[\"pred_y\", \"trip_duration\"])[0:10]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "**Compare validation and test error.**\n", "\n", "During model training we reported error on \"validation\" data (random sample). Below, we will report error on a pretend \"test\" data set (a different random sample).\n", "\n", "Do a quick validation that both errors are reasonably close together." ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Test error: 930.7620476282492\n" ] } ], "source": [ "# Evaluate restored model on test data.\n", "error = sklearn.metrics.mean_absolute_error(test_y, pred_y)\n", "print(f\"Test error: {error}\")" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Validation error: 527.1561889430844\n" ] } ], "source": [ "# Compare test error with training validation error\n", "print(f\"Validation error: {final_df.error[sample_location_id]}\")\n", "\n", "# Validation and test errors should be reasonably close together." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.5" }, "vscode": { "interpreter": { "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f" } } }, "nbformat": 4, "nbformat_minor": 4 }