原生 Kafka 流处理(>=1.2.0, alpha)¶
Seldon 从 1.2 版开始提供本地 kafka 集成。当您在 SeldonDeployment 中指定时 serverType: kafka。
当指定 serverType: kafka 还需要在 svcOrchSpec 设置 KAFKA_BROKER、KAFKA_INPUT_TOPIC、KAFKA_OUTPUT_TOPIC 环境变量。下面显示了 Tensorflow CIFAR10 模型的示例:
apiVersion: machinelearning.seldon.io/v1
kind: SeldonDeployment
metadata:
name: tfserving-cifar10
spec:
protocol: tensorflow
transport: rest
serverType: kafka
predictors:
- componentSpecs:
- spec:
containers:
- args:
- --port=8500
- --rest_api_port=8501
- --model_name=resnet32
- --model_base_path=gs://seldon-models/tfserving/cifar10/resnet32
- --enable_batching
image: tensorflow/serving
name: resnet32
ports:
- containerPort: 8501
name: http
svcOrchSpec:
env:
- name: KAFKA_BROKER
value: 10.12.10.16:9094
- name: KAFKA_INPUT_TOPIC
value: cifar10-rest-input
- name: KAFKA_OUTPUT_TOPIC
value: cifar10-rest-output
graph:
name: resnet32
type: MODEL
endpoint:
service_port: 8501
name: model
replicas: 1
以上使用 tensorflow 协议创建了一个 REST tensorflow 部署,并连接到输入和输出主题。
细节¶
对于 SeldonDeployment:
从任何 Seldon 推理图开始
设置
spec.serverType为kafka添加
spec.predictor[].svcOrchSpec.env为 KAFKA_BROKER, KAFKA_INPUT_TOPIC, KAFKA_OUTPUT_TOPIC。
对于输入 kafka 主题:
为指定协议和传输的输入预测创建请求流。
对于 REST: 给定预测请求协议中的 JSON 表示。
对于 gRPC:给定协议的请求的 protobuffer 二进制序列化。 You should also add a metadata field called
proto-namewith the package name of the protobuffer so it can be decoded, for exampletensorflow.serving.PredictRequest. We can only support proto buffers for native grpc protocols supported by Seldon.
TLS 设置¶
要允许消费者和生产者与 Kafka 建立 TLS 连接,请在服务编排环节使用以下环境变量:
设置 KAFKA_SECURITY_PROTOCOL 为 “ssl”
如果您有密钥和证书的值,请使用:
KAFKA_SSL_CA_CERT
KAFKA_SSL_CLIENT_CERT
KAFKA_SSL_CLIENT_KEY
如果您有证书的文件位置,请使用:
KAFKA_SSL_CA_CERT_FILE
KAFKA_SSL_CLIENT_CERT_FILE
KAFKA_SSL_CLIENT_KEY_FILE
如果您的密钥受密码保护,则添加
KAFKA_SSL_CLIENT_KEY_PASS (可选)
下面显示了一个从 screts 获取值的示例规范,来自 Kafka KEDA 演示.
svcOrchSpec:
env:
- name: KAFKA_BROKER
value: <nodepot>:9093
- name: KAFKA_INPUT_TOPIC
value: cifar10-rest-input
- name: KAFKA_OUTPUT_TOPIC
value: cifar10-rest-output
- name: KAFKA_SECURITY_PROTOCOL
value: ssl
- name: KAFKA_SSL_CA_CERT
valueFrom:
secretKeyRef:
name: seldon-cluster-ca-cert
key: ca.crt
- name: KAFKA_SSL_CLIENT_CERT
valueFrom:
secretKeyRef:
name: seldon-user
key: user.crt
- name: KAFKA_SSL_CLIENT_KEY
valueFrom:
secretKeyRef:
name: seldon-user
key: user.key
- name: KAFKA_SSL_CLIENT_KEY_PASS
valueFrom:
secretKeyRef:
name: seldon-user
key: user.password
KEDA 缩放¶
KEDA 可用于缩放 Kafka SeldonDeployments 通过查看消费者滞后来扩展。
kedaSpec:
pollingInterval: 15
minReplicaCount: 1
maxReplicaCount: 2
triggers:
- type: kafka
metadata:
bootstrapServers: <nodeport>:9093
consumerGroup: model.tfserving-cifar10.kafka
lagThreshold: "50"
topic: cifar10-rest-input
offsetResetPolicy: latest
#authMode: sasl_ssl (for latest KEDA - not released yet)
authenticationRef:
name: seldon-kafka-auth
在上面我们有:
定义启动服务通过
bootstrapServer连接 KEDA通过
consumerGroup定义消费组设置
lagThreshold进行扩展通过
topic监控特定主题通过 AuthenticanTrigger
authenticationRef设置 TLS 认证
我们使用的认证触发器从密钥中提取 TLS 明细,例如:
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
name: seldon-kafka-auth
namespace: kafka
spec:
secretTargetRef:
- parameter: tls
name: keda-enable-tls
key: tls
- parameter: ca
name: seldon-cluster-ca-cert
key: ca.crt
- parameter: cert
name: seldon-user
key: user.crt
- parameter: key
name: seldon-user
key: user.key
可工作示例在此处。