{
"cells": [
{
"attachments": {},
"cell_type": "markdown",
"id": "5fb89b3d",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"(xgboost-example-ref)=\n",
"\n",
"# Training a model with distributed XGBoost\n",
"In this example we will train a model in Ray Train using distributed XGBoost."
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "53d57c1f",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Let's start with installing our dependencies:"
]
},
{
"cell_type": "code",
"execution_count": 21,
"id": "41f20cc1",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m A new release of pip available: \u001b[0m\u001b[31;49m22.3.1\u001b[0m\u001b[39;49m -> \u001b[0m\u001b[32;49m23.1.2\u001b[0m\n",
"\u001b[1m[\u001b[0m\u001b[34;49mnotice\u001b[0m\u001b[1;39;49m]\u001b[0m\u001b[39;49m To update, run: \u001b[0m\u001b[32;49mpip install --upgrade pip\u001b[0m\n"
]
}
],
"source": [
"!pip install -qU \"ray[data,train]\" xgboost_ray"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "d2fe8d4a",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Then we need some imports:"
]
},
{
"cell_type": "code",
"execution_count": 22,
"id": "7232303d",
"metadata": {},
"outputs": [],
"source": [
"from typing import Tuple\n",
"\n",
"import ray\n",
"from ray.data import Dataset, Preprocessor\n",
"from ray.data.preprocessors import StandardScaler\n",
"from ray.train.xgboost import XGBoostTrainer\n",
"from ray.train import Result, ScalingConfig\n",
"import xgboost"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "1c75b5ca",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"Next we define a function to load our train, validation, and test datasets."
]
},
{
"cell_type": "code",
"execution_count": 23,
"id": "37c4f38f",
"metadata": {},
"outputs": [],
"source": [
"def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:\n",
" dataset = ray.data.read_csv(\"s3://anonymous@air-example-data/breast_cancer.csv\")\n",
" train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)\n",
" test_dataset = valid_dataset.drop_columns([\"target\"])\n",
" return train_dataset, valid_dataset, test_dataset"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "9b2850dd",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"The following function will create a XGBoost trainer, train it, and return the result."
]
},
{
"cell_type": "code",
"execution_count": 24,
"id": "dae8998d",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:\n",
" train_dataset, valid_dataset, _ = prepare_data()\n",
"\n",
" # Scale some random columns\n",
" columns_to_scale = [\"mean radius\", \"mean texture\"]\n",
" preprocessor = StandardScaler(columns=columns_to_scale)\n",
" train_dataset = preprocessor.fit_transform(train_dataset)\n",
" valid_dataset = preprocessor.transform(valid_dataset)\n",
"\n",
" # XGBoost specific params\n",
" params = {\n",
" \"tree_method\": \"approx\",\n",
" \"objective\": \"binary:logistic\",\n",
" \"eval_metric\": [\"logloss\", \"error\"],\n",
" }\n",
"\n",
" trainer = XGBoostTrainer(\n",
" scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),\n",
" label_column=\"target\",\n",
" params=params,\n",
" datasets={\"train\": train_dataset, \"valid\": valid_dataset},\n",
" num_boost_round=100,\n",
" metadata = {\"preprocessor_pkl\": preprocessor.serialize()}\n",
" )\n",
" result = trainer.fit()\n",
" print(result.metrics)\n",
"\n",
" return result"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "ce05af87",
"metadata": {},
"source": [
"Once we have the result, we can do batch inference on the obtained model. Let's define a utility function for this."
]
},
{
"cell_type": "code",
"execution_count": 25,
"id": "5b8076d3",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import pandas as pd\n",
"from ray.train import Checkpoint\n",
"from ray.data import ActorPoolStrategy\n",
"\n",
"\n",
"class Predict:\n",
"\n",
" def __init__(self, checkpoint: Checkpoint):\n",
" self.model = XGBoostTrainer.get_model(checkpoint)\n",
" self.preprocessor = Preprocessor.deserialize(checkpoint.get_metadata()[\"preprocessor_pkl\"])\n",
"\n",
" def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:\n",
" preprocessed_batch = self.preprocessor.transform_batch(batch)\n",
" dmatrix = xgboost.DMatrix(preprocessed_batch)\n",
" return {\"predictions\": self.model.predict(dmatrix)}\n",
"\n",
"\n",
"def predict_xgboost(result: Result):\n",
" _, _, test_dataset = prepare_data()\n",
"\n",
" scores = test_dataset.map_batches(\n",
" Predict, \n",
" fn_constructor_args=[result.checkpoint], \n",
" compute=ActorPoolStrategy(), \n",
" batch_format=\"pandas\"\n",
" )\n",
" \n",
" predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format=\"pandas\")\n",
" print(f\"PREDICTED LABELS\")\n",
" predicted_labels.show()"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7e172f66",
"metadata": {},
"source": [
"Now we can run the training:"
]
},
{
"cell_type": "code",
"execution_count": 26,
"id": "0f96d62b",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"data": {
"text/html": [
"
\n",
"
\n",
"
\n",
"
Tune Status
\n",
"
\n",
"\n",
"| Current time: | 2023-07-06 18:33:25 |
\n",
"| Running for: | 00:00:06.19 |
\n",
"| Memory: | 14.9/64.0 GiB |
\n",
"\n",
"
\n",
"
\n",
"
\n",
"
\n",
"
System Info
\n",
" Using FIFO scheduling algorithm.
Logical resource usage: 2.0/10 CPUs, 0/0 GPUs\n",
" \n",
" \n",
"
\n",
"
\n",
"
\n",
"
Trial Status
\n",
"
\n",
"\n",
"| Trial name | status | loc | iter | total time (s) | train-logloss | train-error | valid-logloss |
\n",
"\n",
"\n",
"| XGBoostTrainer_40fed_00000 | TERMINATED | 127.0.0.1:40725 | 101 | 4.90132 | 0.00587595 | 0 | 0.06215 |
\n",
"\n",
"
\n",
"
\n",
"
\n",
"\n"
],
"text/plain": [
""
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m The `preprocessor` arg to Trainer is deprecated. Apply preprocessor transformations ahead of time by calling `preprocessor.transform(ds)`. Support for the preprocessor arg will be dropped in a future release.\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: Use `take_batch()` instead of `take() / show()` to return records in pandas or numpy batch format.\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"\n",
"\u001b[A\n",
"\u001b[A\n",
"\n",
"\u001b[A\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:00, ?it/s]\n",
"\u001b[A\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(StandardScaler._transform_pandas)]\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A \n",
"\n",
"\u001b[A\u001b[A \n",
"\n",
"\n",
"\u001b[2m\u001b[36m(XGBoostTrainer pid=40725)\u001b[0m Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"\n",
"\u001b[A\n",
"\n",
"(pid=40725) Running: 0.0/10.0 CPU, 0.0/0.0 GPU, 0.0 MiB/512.0 MiB object_store_memory: 0%| | 0/14 [00:01, ?it/s]\n",
"\u001b[A\n",
"\n",
" \n",
"\u001b[A\n",
"\n",
"\u001b[A\u001b[A\n",
"\n",
"\u001b[2m\u001b[36m(_RemoteRayXGBoostActor pid=40741)\u001b[0m [18:33:23] task [xgboost.ray]:5022217360 got new rank 1 \n",
"2023-07-06 18:33:25,975\tINFO tune.py:1148 -- Total run time: 6.20 seconds (6.19 seconds for the tuning loop).\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'train-logloss': 0.00587594546605992, 'train-error': 0.0, 'valid-logloss': 0.06215000962556052, 'valid-error': 0.02941176470588235, 'time_this_iter_s': 0.0101318359375, 'should_checkpoint': True, 'done': True, 'training_iteration': 101, 'trial_id': '40fed_00000', 'date': '2023-07-06_18-33-25', 'timestamp': 1688693605, 'time_total_s': 4.901317834854126, 'pid': 40725, 'hostname': 'Balajis-MacBook-Pro-16', 'node_ip': '127.0.0.1', 'config': {}, 'time_since_restore': 4.901317834854126, 'iterations_since_restore': 101, 'experiment_tag': '0'}\n"
]
}
],
"source": [
"result = train_xgboost(num_workers=2, use_gpu=False)"
]
},
{
"attachments": {},
"cell_type": "markdown",
"id": "7055ad1b",
"metadata": {},
"source": [
"And perform inference on the obtained model:"
]
},
{
"cell_type": "code",
"execution_count": 27,
"id": "283b1dba",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"2023-07-06 18:33:27,259\tINFO read_api.py:374 -- To satisfy the requested parallelism of 20, each read task output will be split into 20 smaller blocks.\n",
"2023-07-06 18:33:28,112\tINFO streaming_executor.py:92 -- Executing DAG InputDataBuffer[Input] -> ActorPoolMapOperator[MapBatches()->MapBatches(Predict)] -> TaskPoolMapOperator[MapBatches()]\n",
"2023-07-06 18:33:28,112\tINFO streaming_executor.py:93 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)\n",
"2023-07-06 18:33:28,114\tINFO streaming_executor.py:95 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`\n",
"2023-07-06 18:33:28,150\tINFO actor_pool_map_operator.py:117 -- MapBatches()->MapBatches(Predict): Waiting for 1 pool actors to start...\n"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"PREDICTED LABELS\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" "
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n",
"{'predictions': 0}\n",
"{'predictions': 1}\n",
"{'predictions': 1}\n",
"{'predictions': 0}\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\r"
]
}
],
"source": [
"predict_xgboost(result)"
]
}
],
"metadata": {
"jupytext": {
"cell_metadata_filter": "-all",
"main_language": "python",
"notebook_metadata_filter": "-all"
},
"kernelspec": {
"display_name": "Python 3.8.10 ('venv': venv)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.2"
},
"orphan": true,
"vscode": {
"interpreter": {
"hash": "3c0d54d489a08ae47a06eae2fd00ff032d6cddb527c382959b7b2575f6a8167f"
}
}
},
"nbformat": 4,
"nbformat_minor": 5
}