Bootstrap

Seldon 使用 (三): 模型服务如何运行

当SeldonDeployment服务部署完成后,将会在kubernetes集群节点上创建服务容器,而服务容器内将会启动seldon-core-microservices进程。该进程负责加载模型,并对外提供http服务。接下来,我们将介绍seldon-core-microservices的启动过程及它所提供的相关http服务。

1 seldon-core-microservices启动过程

通过如下命令进入模型服务容器

kubectl exec -it seldon-model-example-0-classifier-xxxx bash

查看相关进程,如下:

/usr/local/bin/seldon-core-microservice MyModel REST --service-type MODEL --persistence 0

这与Dockerfile文件定义的cmd一致,seldon-core-microservice脚本负责启动服务。

查看该脚本文件内容,如下:

#!/usr/local/bin/python
# EASY-INSTALL-ENTRY-SCRIPT: 'seldon-core','console_scripts','seldon-core-microservice'
__requires__ = 'seldon-core'
import re
import sys
from pkg_resources import load_entry_point

if __name__ == '__main__':
    sys.argv[0] = re.sub(r'(-script\.pyw?|\.exe)?$', '', sys.argv[0])
    sys.exit(
        load_entry_point('seldon-core', 'console_scripts', 'seldon-core-microservice')()
    )

即这是seldon-core这个python包安装的启动脚本。启动时,它会执行console_scripts中定义的seldon-core-microservice所对应的方法。

接下来我们需要分析seldon-core源码包,看看启动脚本的执行内容。

按图索骥,我们找到安装文件seldon-core/python/setup.py,其中相关内容如下:

from setuptools import find_packages, setup
setup(
    entry_points={
        "console_scripts": [
            "seldon-core-microservice = seldon_core.microservice:main",
            "seldon-batch-processor = seldon_core.batch_processor:run_cli",
        ]
    },
)

由此可见,seldon-core-microservice所对应的是seldon_core/microservice.py模块的main()方法。

查看seldon-core/python/seldon_core/microservice.py源码文件,其中main()方法主要执行如下内容:

def main():
    # 1定义命令行参数
    parser = argparse.ArgumentParser()
    parser.add_argument("interface_name", type=str, help="Name of the user interface.")
    parser.add_argument(
        "--service-type",
        type=str,
        choices=["MODEL", "ROUTER", "TRANSFORMER", "COMBINER", "OUTLIER_DETECTOR"],
        default="MODEL",
    )
    parser.add_argument("--persistence", nargs="?", default=0, const=1, type=int)
    parser.add_argument(
        "--parameters", type=str, default=os.environ.get(PARAMETERS_ENV_NAME, "[]")
    )
    # 1.1解析命令行参数
    args, remaining = parser.parse_known_args()
    
    # 2加载模型
    interface_file = importlib.import_module(args.interface_name)
    user_class = getattr(interface_file, args.interface_name)
    if args.persistence:
        user_object = persistence.restore(user_class, parameters)
        persistence.persist(user_object, parameters.get("push_frequency"))
    else:
        user_object = user_class(**parameters)
    
    # 3定义rest服务
    def rest_prediction_server():
        # rest服务参数:ip、端口、进程数,超时时间,pid文件等
        options = {}
        app = seldon_microservice.get_rest_microservice(user_object, seldon_metrics)
        UserModelApplication(
            app,
            user_object,
            jaeger_extra_tags,
            args.interface_name,
            options=options,
        ).run()
    server1_func = rest_prediction_server
    
    # 4使用multiprocessing启动rest进程
    start_servers(server1_func, server2_func, server3_func, metrics_server_func)

如代码所示,启动过程很清晰简洁的,主要包含如下4个步骤:

  • 定义及解析命令行参数

  • 加载模型

  • 定义Rest服务

  • 启动进程

注:

  • --service-type参数,当前代码中并未使用到。该参数中声明的几种类型,在后面定义的rest服务有相对应的api接口。因暂未使用到,本文不展开讨论。

  • --persistence参数,如果开启,则会启动一个后台进程,定期地将user_model使用pickle的方式序列化并保存在redis中。后续启动时可以从redis中获取pickle数据并加载。因非核心功能,本文不展开讨论。

2 如何加载模型

首先我们看未使用persistence的模型加载方式。主要代码如下:

interface_file = importlib.import_module(args.interface_name)
user_class = getattr(interface_file, args.interface_name)
user_object = user_class(**parameters) 

使用importlib.import_module()加载模块,模块名称是由命令行参数指定的interface_name(在本例中就是MyModel)。加载完成后,从该模块获取属性名称interface_name(即MyModel)的类(class)。最后使用该类(即MyModel)创建模型对象(user_object)。

这样就完成了模型加载及初始化。通常在模型定义类(class)的__init__()方法负责创建模型,并加载模型的权重参数(load)。加载权重参数的另一个可选方式,是将其定义在模型类的load()方法内。

在Rest服务定义时,创建的UserModelApplication(继承gunicorn的BaseApplication)在启动时会调用其load()方法,而该方法会调用模型类的load()方法。其内容如下:

class UserModelApplication(StandaloneApplication):
    def load(self):
        try:
            self.user_object.load()
        except (NotImplementedError, AttributeError):
            logger.debug("No load method in user model")
        return self.application

3 Rest服务如何处理predict请求

由上文可知,Rest服务是由get_rest_microservice()定义的,其主要内容如下:

def get_rest_microservice(user_model, seldon_metrics):
    app = Flask(__name__, static_url_path="")
    
    @app.route("/predict", methods=["GET", "POST"])
    @app.route("/api/v1.0/predictions", methods=["POST"])
    @app.route("/api/v0.1/predictions", methods=["POST"])
    def Predict():
        requestJson = get_request(skip_decoding=PAYLOAD_PASSTHROUGH)
        response = seldon_core.seldon_methods.predict(
            user_model, requestJson, seldon_metrics
        )

        json_response = jsonify(response, skip_encoding=PAYLOAD_PASSTHROUGH)
        return json_response
    # 其他api接口定义,如/health, /route, /aggregate等
    return app

这是Flask处理Rest请求的基本过程:

  • 从http请求body序列化json数据requestJson

  • 使用requestJson请求数据,调用user_model进行推理,得到response

  • 返回将response序列化成json数据并返回

那么seldon_methods.predict()如何调用user_model进行推理呢?继续查看代码,predict()方法会先后检查模型类(class)是否实现predict_rest(), predict_grpc(), predict_raw()方法。若都没有,则调用默认实现模型类(class)的predict()方法,这里仅介绍默认情况,如下:

def predict(
    user_model: Any,
    request: Union[prediction_pb2.SeldonMessage, List, Dict, bytes],
    seldon_metrics: SeldonMetrics,
) -> Union[prediction_pb2.SeldonMessage, List, Dict, bytes]:
    (features, meta, datadef, data_type) = extract_request_parts_json(request)
    class_names = datadef["names"] if datadef and "names" in datadef else []
    client_response = client_predict(
        user_model, features, class_names, meta=meta
    )
    return construct_response_json(
        user_model,
        False,
        request,
        client_response.data,
        meta,
        metrics,
        client_response.tags,
    )

实现过程也是三段式:

  • 从请求数据中抽取features和metat信息

  • 调用client_predict()得到client_response

  • 格式化client_response并返回

而client_predict()其实就是直接调用user_model.predict()

try:
    client_response = user_model.predict(features, feature_names, **kwargs)
except TypeError:
    client_response = user_model.predict(features, feature_names)
return SeldonResponse.create(client_response)

综述整个Rest服务的推理过程,就是做了三件事情:

  • 格式化请求数据,转换成模型推理需要的格式features

  • 调用user_model.predict(features)进行推理,得到推理结果client_response

  • 格式化client_response,并将其序列化后返回

总结一下,本文主要介绍了一个典型的python模型服务,使用seldon-core-microservices是如何启动,如何通过REST方式响应模型推理请求的。

后续我们将介绍seldon-core所提供的其他类型推理服务。