from functools import partial
import os
import pandas as pd
import pyarrow.fs
from typing import Optional, Union
from ray.air.result import Result
from ray.train._internal.storage import _use_storage_context
from ray.cloudpickle import cloudpickle
from ray.exceptions import RayTaskError
from ray.tune.analysis import ExperimentAnalysis
from ray.tune.error import TuneError
from ray.tune.experiment import Trial
from ray.tune.trainable.util import TrainableUtil
from ray.util import PublicAPI
@PublicAPI(stability="beta")
class ResultGrid:
"""A set of ``Result`` objects for interacting with Ray Tune results.
You can use it to inspect the trials and obtain the best result.
The constructor is a private API. This object can only be created as a result of
``Tuner.fit()``.
Example:
.. testcode::
import random
from ray import train, tune
def random_error_trainable(config):
if random.random() < 0.5:
return {"loss": 0.0}
else:
raise ValueError("This is an error")
tuner = tune.Tuner(
random_error_trainable,
run_config=train.RunConfig(name="example-experiment"),
tune_config=tune.TuneConfig(num_samples=10),
)
try:
result_grid = tuner.fit()
except ValueError:
pass
for i in range(len(result_grid)):
result = result_grid[i]
if not result.error:
print(f"Trial finishes successfully with metrics"
f"{result.metrics}.")
else:
print(f"Trial failed with error {result.error}.")
.. testoutput::
:hide:
...
You can also use ``result_grid`` for more advanced analysis.
>>> # Get the best result based on a particular metric.
>>> best_result = result_grid.get_best_result( # doctest: +SKIP
... metric="loss", mode="min")
>>> # Get the best checkpoint corresponding to the best result.
>>> best_checkpoint = best_result.checkpoint # doctest: +SKIP
>>> # Get a dataframe for the last reported results of all of the trials
>>> df = result_grid.get_dataframe() # doctest: +SKIP
>>> # Get a dataframe for the minimum loss seen for each trial
>>> df = result_grid.get_dataframe(metric="loss", mode="min") # doctest: +SKIP
    Note that trials of all statuses are included in the final result grid.
    If a trial has not terminated, its latest result and checkpoint as
    seen by Tune are provided.
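    Since the grid supports indexing and ``len()``, you can also iterate over it
    directly, for example to separate successful trials from failed ones:
    >>> # Collect only the results of trials that finished without an error.
    >>> ok_results = [r for r in result_grid if not r.error]  # doctest: +SKIP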
See :doc:`/tune/examples/tune_analyze_results` for more usage examples.
"""
def __init__(
self,
experiment_analysis: ExperimentAnalysis,
):
self._experiment_analysis = experiment_analysis
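        # Materialize one ``Result`` per trial up front so that indexing,
        # iteration, and ``len()`` on the grid are cheap afterwards.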
self._results = [
self._trial_to_result(trial) for trial in self._experiment_analysis.trials
]
@property
def experiment_path(self) -> str:
"""Path pointing to the experiment directory on persistent storage.
This can point to a remote storage location (e.g. S3) or to a local
location (path on the head node)."""
return self._experiment_analysis.experiment_path
@property
def filesystem(self) -> pyarrow.fs.FileSystem:
"""Return the filesystem that can be used to access the experiment path.
Returns:
pyarrow.fs.FileSystem implementation.
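        Example:
            >>> # A minimal sketch: check which filesystem backs the results,
            >>> # e.g. a local or an S3 filesystem, via pyarrow's ``type_name``.
            >>> result_grid.filesystem.type_name  # doctest: +SKIP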
"""
return self._experiment_analysis._fs
    def get_best_result(
self,
metric: Optional[str] = None,
mode: Optional[str] = None,
scope: str = "last",
filter_nan_and_inf: bool = True,
) -> Result:
"""Get the best result from all the trials run.
Args:
metric: Key for trial info to order on. Defaults to
the metric specified in your Tuner's ``TuneConfig``.
mode: One of [min, max]. Defaults to the mode specified
in your Tuner's ``TuneConfig``.
scope: One of [all, last, avg, last-5-avg, last-10-avg].
If `scope=last`, only look at each trial's final step for
`metric`, and compare across trials based on `mode=[min,max]`.
If `scope=avg`, consider the simple average over all steps
for `metric` and compare across trials based on
`mode=[min,max]`. If `scope=last-5-avg` or `scope=last-10-avg`,
consider the simple average over the last 5 or 10 steps for
`metric` and compare across trials based on `mode=[min,max]`.
If `scope=all`, find each trial's min/max score for `metric`
based on `mode`, and compare trials based on `mode=[min,max]`.
filter_nan_and_inf: If True (default), NaN or infinite
values are disregarded and these trials are never selected as
the best trial.
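        Example:
            >>> # A minimal sketch: pick the trial whose "loss", averaged over
            >>> # its last five reported results, is smallest. The "loss"
            >>> # metric is assumed to be reported by your trainable.
            >>> best_result = result_grid.get_best_result(  # doctest: +SKIP
            ...     metric="loss", mode="min", scope="last-5-avg")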
"""
if len(self._experiment_analysis.trials) == 1:
return self._trial_to_result(self._experiment_analysis.trials[0])
if not metric and not self._experiment_analysis.default_metric:
raise ValueError(
"No metric is provided. Either pass in a `metric` arg to "
"`get_best_result` or specify a metric in the "
"`TuneConfig` of your `Tuner`."
)
if not mode and not self._experiment_analysis.default_mode:
raise ValueError(
"No mode is provided. Either pass in a `mode` arg to "
"`get_best_result` or specify a mode in the "
"`TuneConfig` of your `Tuner`."
)
best_trial = self._experiment_analysis.get_best_trial(
metric=metric,
mode=mode,
scope=scope,
filter_nan_and_inf=filter_nan_and_inf,
)
if not best_trial:
error_msg = (
"No best trial found for the given metric: "
f"{metric or self._experiment_analysis.default_metric}. "
"This means that no trial has reported this metric"
)
error_msg += (
", or all values reported for this metric are NaN. To not ignore NaN "
"values, you can set the `filter_nan_and_inf` arg to False."
if filter_nan_and_inf
else "."
)
raise RuntimeError(error_msg)
return self._trial_to_result(best_trial)
    def get_dataframe(
self,
filter_metric: Optional[str] = None,
filter_mode: Optional[str] = None,
) -> pd.DataFrame:
"""Return dataframe of all trials with their configs and reported results.
        By default, this returns the last reported results for each trial.
If ``filter_metric`` and ``filter_mode`` are set, the results from each
trial are filtered for this metric and mode. For example, if
``filter_metric="some_metric"`` and ``filter_mode="max"``, for each trial,
every received result is checked, and the one where ``some_metric`` is
maximal is returned.
Example:
.. testcode::
from ray import train
from ray.train import RunConfig
from ray.tune import Tuner
def training_loop_per_worker(config):
train.report({"accuracy": 0.8})
result_grid = Tuner(
trainable=training_loop_per_worker,
run_config=RunConfig(name="my_tune_run")
).fit()
# Get last reported results per trial
df = result_grid.get_dataframe()
# Get best ever reported accuracy per trial
df = result_grid.get_dataframe(
filter_metric="accuracy", filter_mode="max"
)
.. testoutput::
:hide:
...
Args:
filter_metric: Metric to filter best result for.
filter_mode: If ``filter_metric`` is given, one of ``["min", "max"]``
to specify if we should find the minimum or maximum result.
Returns:
            Pandas DataFrame with each trial as a row and its results as columns.
"""
return self._experiment_analysis.dataframe(
metric=filter_metric, mode=filter_mode
)
def __len__(self) -> int:
return len(self._results)
def __getitem__(self, i: int) -> Result:
"""Returns the i'th result in the grid."""
return self._results[i]
@property
def errors(self):
"""Returns the exceptions of errored trials."""
return [result.error for result in self if result.error]
@property
def num_errors(self):
"""Returns the number of errored trials."""
return len(
[t for t in self._experiment_analysis.trials if t.status == Trial.ERROR]
)
@property
def num_terminated(self):
"""Returns the number of terminated (but not errored) trials."""
return len(
[
t
for t in self._experiment_analysis.trials
if t.status == Trial.TERMINATED
]
)
@staticmethod
def _populate_exception(trial: Trial) -> Optional[Union[TuneError, RayTaskError]]:
if trial.status == Trial.TERMINATED:
return None
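        # Prefer the pickled error file, which preserves the original exception
        # object; fall back to wrapping the plain-text error file in a TuneError.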
if trial.pickled_error_file and os.path.exists(trial.pickled_error_file):
with open(trial.pickled_error_file, "rb") as f:
e = cloudpickle.load(f)
return e
elif trial.error_file and os.path.exists(trial.error_file):
with open(trial.error_file, "r") as f:
return TuneError(f.read())
return None
def _trial_to_result(self, trial: Trial) -> Result:
if _use_storage_context():
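            # New storage context: checkpoints are tracked by the train
            # checkpoint manager attached to the trial's run metadata.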
from ray.train._internal.checkpoint_manager import (
_CheckpointManager as _NewCheckpointManager,
)
cpm = trial.run_metadata.checkpoint_manager
assert isinstance(cpm, _NewCheckpointManager)
checkpoint = None
if cpm.latest_checkpoint_result:
checkpoint = cpm.latest_checkpoint_result.checkpoint
best_checkpoint_results = cpm.best_checkpoint_results
best_checkpoints = [
(checkpoint_result.checkpoint, checkpoint_result.metrics)
for checkpoint_result in best_checkpoint_results
]
else:
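            # Legacy path: rebuild AIR checkpoints from the trial's paths,
            # mapping local paths to remote storage when cloud checkpointing
            # is enabled for the trial.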
local_to_remote_path_fn = (
partial(
TrainableUtil.get_remote_storage_path,
local_path_prefix=trial.local_path,
remote_path_prefix=trial.remote_path,
)
if trial.uses_cloud_checkpointing
else None
)
checkpoint = trial.checkpoint.to_air_checkpoint(
local_to_remote_path_fn,
)
best_checkpoints = [
(
checkpoint.to_air_checkpoint(local_to_remote_path_fn),
checkpoint.metrics,
)
for checkpoint in trial.get_trial_checkpoints()
]
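        # Per-trial metric dataframes are keyed by trial ID in the new storage
        # context and by the trial's local path in the legacy code path.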
if _use_storage_context():
metrics_df = self._experiment_analysis.trial_dataframes.get(trial.trial_id)
else:
metrics_df = self._experiment_analysis.trial_dataframes.get(
trial.local_path
)
result = Result(
checkpoint=checkpoint,
metrics=trial.last_result.copy(),
error=self._populate_exception(trial),
_local_path=trial.local_path,
_remote_path=trial.remote_path,
_storage_filesystem=(
self._experiment_analysis._fs
if isinstance(self._experiment_analysis, ExperimentAnalysis)
else None
),
metrics_dataframe=metrics_df,
best_checkpoints=best_checkpoints,
)
return result
def __repr__(self) -> str:
all_results_repr = [result._repr(indent=2) for result in self]
all_results_repr = ",\n".join(all_results_repr)
return f"ResultGrid<[\n{all_results_repr}\n]>"