Seldon Python 组件

要创建在 Seldon 下运行的组件,您应该创建一个类来实现您正在创建的组件类型所需的签名。

模型

要封装您的机器学习模型,请创建一个具有以下签名的 predict 方法的类:

def predict(self, X: np.ndarray, names: Iterable[str], meta: Dict = None) -> Union[np.ndarray, List, str, bytes]:

您的 predict 方法将接收一个 numpy 数组 X 其中包含一组可迭代的列名(如果它们存在于输入特征中)和可选的元数据字典。它应该将预测结果返回为:

  • Numpy 数组

  • 值列表

  • 字符串或字节

一个简单的例子如下所示:

class MyModel(object):
    """
    Model template. 你可以在运行时从 __init__ 从本地加载模型参数
    """

    def __init__(self):
        """
        Add any initialization parameters. These will be passed at runtime from the graph definition parameters defined in your seldondeployment kubernetes resource manifest.
        添加任意初始化参数。在运行时将从kubernetes Sedlondeployment 资源定义的图中传入定义好的参数。
        """
        print("Initializing")

    def predict(self, X, features_names=None):
        """
        Return a prediction.

        Parameters
        ----------
        X : array-like
        feature_names : array of feature names (optional)
        """
        print("Predict called - will run identity function")
        return X

返回类名

您还可以提供一种方法来返回用于预测的列名称,class_names 其签名方法如下所示:

def class_names(self) -> Iterable[str]:

示例

你可以参考各种笔记本示例

转换器

Seldon Core 允许您创建组件以在输入请求方向(输入转换器)或输出响应方向(输出转换器)上转换特征。对于这些组件,使用以下签名创建方法:

def transform_input(self, X: np.ndarray, names: Iterable[str], meta: Dict = None) -> Union[np.ndarray, List, str, bytes]:

def transform_output(self, X: np.ndarray, names: Iterable[str], meta: Dict = None) -> Union[np.ndarray, List, str, bytes]:

合并器

Seldon Core 允许您创建将来自多个模型的响应组合成单个响应的组件。要为此创建一个类,请添加一个带有以下签名的方法:

def aggregate(self, features_list: List[Union[np.ndarray, str, bytes]], feature_names_list: List) -> Union[np.ndarray, List, str, bytes]:

下面显示了一个对一组响应求平均值的简单示例:

import numpy as np
logger = logging.getLogger(__name__)

class ImageNetCombiner(object):

    def aggregate(self, Xs, features_names):
        return (np.reshape(Xs[0],(1,-1)) + np.reshape(Xs[1], (1,-1)))/2.0

路由器

路由器提供将请求定向到一组子组件之一的功能。为此,您应该创建一个带有签名的方法,如下所示,该方法返回请求需要路由到的子组件的 idid 是连接子路由的索引。

def route(self, features: Union[np.ndarray, str, bytes], feature_names: Iterable[str]) -> int:

要查看这方面的示例,您可以遵循作为 Seldon Core 部分的各种示例路由

添加自定义指标

要返回与调用关联的指标,请创建一个带有签名的方法,如下所示:

def metrics(self) -> List[Dict]:

此方法应返回自定义指标文档中所述的指标字典。

下面是一个说明性示例:

class ModelWithMetrics(object):

    def __init__(self):
        print("Initialising")

    def predict(self,X,features_names):
        print("Predict called")
        return X

    def metrics(self):
        return [
            {"type": "COUNTER", "key": "mycounter", "value": 1}, # a counter which will increase by the given value
            {"type": "GAUGE", "key": "mygauge", "value": 100},   # a gauge which will be set to given value
            {"type": "TIMER", "key": "mytimer", "value": 20.2},  # a timer which will add sum and count metrics - assumed millisecs
        ]

注意:在 Seldon Core 1.1 之前,自定义指标始终返回给客户端。从 SC 1.1 开始,您可以将 INCLUDE_METRICS_IN_CLIENT_RESPONSE 环境变量设置为 truefalse 来控制此行为。尽管这个环境变量有价值,自定义指标将始终暴露给 Prometheus。

在 Seldon Core 1.1.0 之前,未实现在 info 等级的预估调用记录自定义指标日志信息。从 Seldon Core 1.1.0 开始,可在 debug 级别记录。要抑制此警告,请实现一个返回空列表的指标函数:

def metrics(self):
    return []

返回标签

如果希望向返回的元数据添加任意标签,可以提供一个 tags 签名方法,如下所示:

def tags(self) -> Dict:

个简单的例子如下所示:

class ModelWithTags(object):

    def predict(self,X,features_names):
        return X

    def tags(self,X):
        return {"system":"production"}

运行时指标和标签

从 SC 1.3 开始 metricstags 也可以定义 predicttransform_inputtransform_outputsend_feedbackrouteaggregate 输出。

这是线程安全的。

from seldon_core.user_model import SeldonResponse


class Model:
    def predict(self, features, names=[], meta={}):
        runtime_metrics = {"type": "COUNTER", "key": "instance_counter", "value": len(X)},
        runtime_tags = {"runtime": "tag", "shared": "right one"}
        return SeldonResponse(data=X, metrics=runtime_metrics, tags=runtime_tags)

    def metrics(self):
        return [{"type": "COUNTER", "key": "requests_counter", "value": 1}]

    def tags(self):
        return {"static": "tag", "shared": "not right one"}

主义 tagsmetrics 有限通过 SeldonResponse 定义。 在上面的示例中,返回的标签将是:

{"runtime":"tag", "shared":"right one", "static":"tag"}

REST 健康点

如果您想添加 REST 健康点,您可以实现 health_status 签名方法,如下所示:

def health_status(self) -> Union[np.ndarray, List, str, bytes]:

您可以使用它来验证您的服务是否可以在构建 docker 镜像后正常响应 HTTP 调用,也可以用作 kubernetes 活跃度和就可读探测器来验证您的模型是否健康。

一个简单的例子如下所示:

class ModelWithHealthEndpoint(object):
    def predict(self, X, features_names):
        return X

    def health_status(self):
        response = self.predict([1, 2], ["f1", "f2"])
        assert len(response) == 2, "health check returning bad predictions" # or some other simple validation
        return response

当您用于 seldon-core-microservice 启动 HTTP 服务器时, 您可以通过检查 /health/status 端点来验证模型是否已启动并正在运行:

$ curl localhost:5000/health/status
{"data":{"names":[],"tensor":{"shape":[2],"values":[1,2]}},"meta":{}}

此外,你可以使用 /health/ping 端点 来实现一个检查 HTTP 服务已启动的轻量级调用:

$ curl localhost:5000/health/ping
pong%

您还可以通过将 HTTP 运行状况端点添加到 SeldonDeployment YAML 中来覆盖默认的活跃度和可读探测。 您可以修改探针的参数以满足您的可靠性需求,而不会对容器施加太大压力。在 kubernetes 文档中阅读有关这些探测器的更多信息。 一个例子如下所示:

apiVersion: machinelearning.seldon.io/v1alpha2
kind: SeldonDeployment
spec:
  name: my-app
  predictors:
  - componentSpecs:
    - spec:
        containers:
        - image: my-app-image:version
          name: classifier
          livenessProbe:
            failureThreshold: 3
            initialDelaySeconds: 60
            periodSeconds: 5
            successThreshold: 1
            httpGet:
              path: /health/status
              port: http
              scheme: HTTP
            timeoutSeconds: 1
          readinessProbe:
            failureThreshold: 3
            initialDelaySeconds: 20
            periodSeconds: 5
            successThreshold: 1
            httpGet:
              path: /health/status
              port: http
              scheme: HTTP
            timeoutSeconds: 1

However, note if executor.fullHealthChecks is set to true then the Seldon orchestrator will call your health status method to check the model is ready.

低级别方法

如果想要更多的控制,可以提供一个低级方法将原始 proto 缓冲区有效负载作为输入。这些签名如下所示在 seldon_core>=0.2.6.1 发布:

def predict_raw(self, msg: Union[Dict, prediction_pb2.SeldonMessage]) -> prediction_pb2.SeldonMessage:

def send_feedback_raw(self, feedback: prediction_pb2.Feedback) -> prediction_pb2.SeldonMessage:

def transform_input_raw(self, msg: Union[Dict, prediction_pb2.SeldonMessage]) -> prediction_pb2.SeldonMessage:

def transform_output_raw(self, msg: Union[Dict, prediction_pb2.SeldonMessage]) -> prediction_pb2.SeldonMessage:

def route_raw(self, msg: prediction_pb2.SeldonMessage) -> prediction_pb2.SeldonMessage:

def aggregate_raw(self, msgs: prediction_pb2.SeldonMessageList) -> prediction_pb2.SeldonMessage:

def health_status_raw(self) -> prediction_pb2.SeldonMessage:

用户定义的异常

如果要处理自定义异常,请定义如下所示 model_error_handler 方法:

model_error_handler = flask.Blueprint('error_handlers', __name__)

一个例子如下:

"""
Model Template
"""
class MyModel(Object):

    """
    The field is used to register custom exceptions
    """
    model_error_handler = flask.Blueprint('error_handlers', __name__)

    """
    Register the handler for an exception
    """
    @model_error_handler.app_errorhandler(UserCustomException)
    def handleCustomError(error):
        response = jsonify(error.to_dict())
        response.status_code = error.status_code
        return response

    def __init__(self, metrics_ok=True, ret_nparray=False, ret_meta=False):
        pass

    def predict(self, X, features_names, **kwargs):
        raise UserCustomException('Test-Error-Msg',1402,402)
        return X
"""
User Defined Exception
"""
class UserCustomException(Exception):

    status_code = 404

    def __init__(self, message, application_error_code,http_status_code):
        Exception.__init__(self)
        self.message = message
        if http_status_code is not None:
            self.status_code = http_status_code
        self.application_error_code = application_error_code

    def to_dict(self):
        rv = {"status": {"status": self.status_code, "message": self.message,
                         "app_code": self.application_error_code}}
        return rv

多值 numpy 数组

默认的,当使用 data ndarray 参数,转换为 ndarray(默认情况下)会将所有内部类型转换为相同类型。对于模型需要转换为不同类型值作为的输入数组时,可通过 predict_raw 覆盖函数来实现此目的,该函数可以访问原始请求,并按如下方式创建 numpy 数组:

import numpy as np

class Model:
    def predict_raw(self, request):
        data = request.get("data", {}).get("ndarray")
        if data:
            mult_types_array = np.array(data, dtype=object)

        # Handle other data types as required + your logic

Gunicorn 和负载

如果封装的 python 类在 Gunicorn 下提供服务, 那么作为每个 gunicorn worker 初始化的一部分,load 方法 将在您的类上被调用(如果它有的话)。 您应该使用此方法来加载和初始化您的模型。 这对于需要在每个工作进程中创建会话的 Tensorflow 模型 很重要。 该 Tensorflow MNIST 例子做到这一点。

import tensorflow as tf
import numpy as np
import os

class DeepMnist(object):
    def __init__(self):
        self.loaded = False
        self.class_names = ["class:{}".format(str(i)) for i in range(10)]

    def load(self):
        print("Loading model",os.getpid())
        self.sess = tf.Session()
        saver = tf.train.import_meta_graph("model/deep_mnist_model.meta")
        saver.restore(self.sess,tf.train.latest_checkpoint("./model/"))
        graph = tf.get_default_graph()
        self.x = graph.get_tensor_by_name("x:0")
        self.y = graph.get_tensor_by_name("y:0")
        self.loaded = True
        print("Loaded model")

    def predict(self,X,feature_names):
        if not self.loaded:
            self.load()
        predictions = self.sess.run(self.y,feed_dict={self.x:X})
        return predictions.astype(np.float64)

整数

Python 中的 json 包将没有小数部分的数字解析为整数。 因此, 只包含没有小数部分的数字的张量将被解析为整数张量。

为了说明上述情况,我们可以考虑以下示例:

{
  "data": {
    "ndarray": [0, 1, 2, 3]
  }
}

默认情况下,该 json 包会将 data.ndarray 字段中的数组解析为 Python Integer 值数组。 由于没有浮点值,因此 numpy 将创建一个带有 dtype = np.int32 的张量。

如果我们想强制不同的行为,我们可以使用底层 predict_raw() 方法来控制输入有效负载的反序列化。 作为一个例子,使用上面的例子, 我们可以通过使用 dtype = np.float64 实现 predict_raw() 方法来强制结果张量 :

import numpy as np

class Model:
    def predict_raw(self, request):
        data = request.get("data", {}).get("ndarray")
        if data:
            float_array = np.array(data, dtype=np.float64)

        # Make predictions using float_array

孵化功能

REST 元数据端点

python 封装器将自动公开一个 /metadata 端点以返回有关加载模型的元数据。 开发人员在他们的类中实现一个 metadata 方法来提供包含模型元数据的返回的 dict

有关更多详细信息,请参阅元数据文档

示例格式:

class Model:
    ...

    def init_metadata(self):

        meta = {
            "name": "model-name",
            "versions": ["model-version"],
            "platform": "platform-name",
            "inputs": [{"name": "input", "datatype": "BYTES", "shape": [1]}],
            "outputs": [{"name": "output", "datatype": "BYTES", "shape": [1]}],
        }

        return meta

验证

输出开发者定义的 metadata 方法将按照 kfserving dataplane 提案 协议进行验证,查看 Github issue 详情:

$metadata_model_response =
{
  "name" : $string,
  "versions" : [ $string, ... ], // optional
  "platform" : $string,
  "inputs" : [ $metadata_tensor, ... ],
  "outputs" : [ $metadata_tensor, ... ]
}

以及

$metadata_tensor =
{
  "name" : $string,
  "datatype" : $string,
  "shape" : [ $number, ... ]
}

如果验证失败,服务器将在请求 metadata 时回复 500 响应 MICROSERVICE_BAD_METADATA

例子

下一步

创建组件之后,需要创建一个可被 Seldon Core 管理的 Docker 镜像。参考 s2iDocker 文档来完成。