{ "cells": [ { "attachments": {}, "cell_type": "markdown", "id": "5fb89b3d", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "(xgboost-example-ref)=\n", "\n", "# Training a model with distributed XGBoost\n", "In this example we will train a model in Ray Train using distributed XGBoost." ] }, { "attachments": {}, "cell_type": "markdown", "id": "53d57c1f", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Let's start with installing our dependencies:" ] }, { "cell_type": "code", "execution_count": 21, "id": "41f20cc1", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip available: \u001b[0m\u001b[31;49m22.3.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.1.2\u001b[0m\n", "\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n" ] } ], "source": [ "!pip install -qU \"ray[data,train]\" xgboost_ray" ] }, { "attachments": {}, "cell_type": "markdown", "id": "d2fe8d4a", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Then we need some imports:" ] }, { "cell_type": "code", "execution_count": 22, "id": "7232303d", "metadata": {}, "outputs": [], "source": [ "from typing import Tuple\n", "\n", "import ray\n", "from ray.data import Dataset, Preprocessor\n", "from ray.data.preprocessors import StandardScaler\n", "from ray.train.xgboost import XGBoostTrainer\n", "from ray.train import Result, ScalingConfig\n", "import xgboost" ] }, { "attachments": {}, "cell_type": "markdown", "id": "1c75b5ca", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "Next we define a function to load our train, validation, and test datasets." ] }, { "cell_type": "code", "execution_count": 23, "id": "37c4f38f", "metadata": {}, "outputs": [], "source": [ "def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:\n", " dataset = ray.data.read_csv(\"s3://anonymous@air-example-data/breast_cancer.csv\")\n", " train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)\n", " test_dataset = valid_dataset.drop_columns([\"target\"])\n", " return train_dataset, valid_dataset, test_dataset" ] }, { "attachments": {}, "cell_type": "markdown", "id": "9b2850dd", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "The following function will create a XGBoost trainer, train it, and return the result." ] }, { "cell_type": "code", "execution_count": 24, "id": "dae8998d", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:\n", " train_dataset, valid_dataset, _ = prepare_data()\n", "\n", " # Scale some random columns\n", " columns_to_scale = [\"mean radius\", \"mean texture\"]\n", " preprocessor = StandardScaler(columns=columns_to_scale)\n", " train_dataset = preprocessor.fit_transform(train_dataset)\n", " valid_dataset = preprocessor.transform(valid_dataset)\n", "\n", " # XGBoost specific params\n", " params = {\n", " \"tree_method\": \"approx\",\n", " \"objective\": \"binary:logistic\",\n", " \"eval_metric\": [\"logloss\", \"error\"],\n", " }\n", "\n", " trainer = XGBoostTrainer(\n", " scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),\n", " label_column=\"target\",\n", " params=params,\n", " datasets={\"train\": train_dataset, \"valid\": valid_dataset},\n", " num_boost_round=100,\n", " metadata = {\"preprocessor_pkl\": preprocessor.serialize()}\n", " )\n", " result = trainer.fit()\n", " print(result.metrics)\n", "\n", " return result" ] }, { "attachments": {}, "cell_type": "markdown", "id": "ce05af87", "metadata": {}, "source": [ "Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this." ] }, { "cell_type": "code", "execution_count": 25, "id": "5b8076d3", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import pandas as pd\n", "from ray.train import Checkpoint\n", "from ray.data import ActorPoolStrategy\n", "\n", "\n", "class Predict:\n", "\n", " def __init__(self, checkpoint: Checkpoint):\n", " self.model = XGBoostTrainer.get_model(checkpoint)\n", " self.preprocessor = Preprocessor.deserialize(checkpoint.get_metadata()[\"preprocessor_pkl\"])\n", "\n", " def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:\n", " preprocessed_batch = self.preprocessor.transform_batch(batch)\n", " dmatrix = xgboost.DMatrix(preprocessed_batch)\n", " return {\"predictions\": self.model.predict(dmatrix)}\n", "\n", "\n", "def predict_xgboost(result: Result):\n", " _, _, test_dataset = prepare_data()\n", "\n", " scores = test_dataset.map_batches(\n", " Predict, \n", " fn_constructor_args=[result.checkpoint], \n", " compute=ActorPoolStrategy(), \n", " batch_format=\"pandas\"\n", " )\n", " \n", " predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format=\"pandas\")\n", " print(f\"PREDICTED LABELS\")\n", " predicted_labels.show()" ] }, { "attachments": {}, "cell_type": "markdown", "id": "7e172f66", "metadata": {}, "source": [ "Now we can run the training:" ] }, { "cell_type": "code", "execution_count": 26, "id": "0f96d62b", "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [ { "data": { "text/html": [ "
\n", "
\n", "
\n", "

Tune Status

\n", " \n", "\n", "\n", "\n", "\n", "\n", "
Current time:2023-07-06 18:33:25
Running for: 00:00:06.19
Memory: 14.9/64.0 GiB
\n", "
\n", "
\n", "
\n", "

System Info

\n", " Using FIFO scheduling algorithm.
Logical resource usage: 2.0/10 CPUs, 0/0 GPUs\n", "
\n", " \n", "
\n", "
\n", "
\n", "

Trial Status

\n", " \n", "\n", "\n", "\n", "\n", "\n", "\n", "
Trial name status loc iter total time (s) train-logloss train-error valid-logloss
XGBoostTrainer_40fed_00000TERMINATED127.0.0.1:40725 101 4.90132 0.00587595 0 0.06215
\n", "
\n", "
\n", "\n" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "name": "stderr", "output_type": "stream", "text": [ "\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.\n", "\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n", "\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]\n", "\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", "\n", "\u001b[A\n", "\u001b[A\n", "\n", "\u001b[A\u001b[A\n", "\n", "(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:00 TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]\n", "\n", "\u001b[A\n", "\n", "(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01 TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]\n", "\n", "\u001b[A\n", "\n", "(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01 ActorPoolMapOperator[MapBatches()->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches()]\n", "2023-07-06 18:33:28,112\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n", "2023-07-06 18:33:28,114\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n", "2023-07-06 18:33:28,150\tINFO actor_pool_map_operator.py:117 -- MapBatches()->MapBatches(Predict): Waiting for 1 pool actors to start...\n" ] }, { "name": "stdout", "output_type": "stream", "text": [ "PREDICTED LABELS\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " " ] }, { "name": "stdout", "output_type": "stream", "text": [ "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n", "{'predictions': 0}\n", "{'predictions': 1}\n", "{'predictions': 1}\n", "{'predictions': 0}\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "\r" ] } ], "source": [ "predict_xgboost(result)" ] } ], "metadata": { "jupytext": { "cell_metadata_filter": "-all", "main_language": "python", "notebook_metadata_filter": "-all" }, "kernelspec": { "display_name": "Python 3.8.10 ('venv': venv)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.2" }, "orphan": true, "vscode": { "interpreter": { "hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f" } } }, "nbformat": 4, "nbformat_minor": 5 }