浅谈 WSGI

WSGI 是 Python Web 开发中经常提到的名词,在维基百科中,定义如下:

Web服务器网关接口(Python Web Server Gateway Interface,缩写为WSGI)是为Python语言定义的Web服务器和Web应用程序或框架之间的一种简单而通用的接口。自从WSGI被开发出来以后,许多其它语言中也出现了类似接口。

正如定义,WSGI 不是服务器,不是 API,不是 Python 模块,而是一种规定服务器和客户端交互的 接口规范

WSGI 目标是在 Web 服务器和 Web 框架层之间提供一个通用的 API 标准,减少之间的互操作性并形成统一的调用方式。根据这个定义,满足 WSGI 的 Web 服务器会将两个固定参数传入 WSGI APP:环境变量字典和一个初始化 Response 的可调用对象。而 WSGI APP 会处理请求并返回一个可迭代对象。

WSGI APP

根据定义,我们可以实现一个非常简单的满足 WSGI 的 App:

def demo_wsgi_app(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello World!"

可以看到,该 App 通过 start_response 初始化请求,并通过 yield 将 body 返回。除了 yield,也可以直接返回一个可迭代对象。

在标准库 wsgiref 中已经包含了一个简单的 WSGI APP,可以在 wsgiref.simple_server 中找到,可以看到,这也是在做相同的事情:

def demo_app(environ,start_response):
    from io import StringIO
    stdout = StringIO()
    print("Hello world!", file=stdout)
    print(file=stdout)
    h = sorted(environ.items())
    for k,v in h:
        print(k,'=',repr(v), file=stdout)
    start_response("200 OK", [('Content-Type','text/plain; charset=utf-8')])
    return [stdout.getvalue().encode("utf-8")]

将这个 App 运行起来如下:

在 Django 中,可以在默认 app 下的 wsgi.py 中找到 get_wsgi_application,Django 通过这个方法创建并返回了一个 WSGIHandle,其本质,依然是一个 WSGI APP,可以看其 __call__ 方法:

class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()

    def __call__(self, environ, start_response):
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)
        request = self.request_class(environ)
        response = self.get_response(request)

        response._handler_class = self.__class__

        status = '%d %s' % (response.status_code, response.reason_phrase)
        response_headers = list(response.items())
        for c in response.cookies.values():
            response_headers.append(('Set-Cookie', c.output(header='')))
        start_response(status, response_headers)
        if getattr(response, 'file_to_stream', None) is not None and environ.get('wsgi.file_wrapper'):
            response = environ['wsgi.file_wrapper'](response.file_to_stream)
        return response

WSGI 服务器

从 WSGI APP 的写法上就基本能推测出 WSGI 服务器做了什么,因此可以尝试实现一个简陋的 WSGI 服务器:

def run_wsgi_app(app, environ):
    from io import StringIO
    body = StringIO()

    def start_response(status, headers):
        body.write('Status: {}\r\n'.format(status))
        for header in headers:
            body.write("{}: {}".format(*header))
        return body.write

    iterable = app(environ, start_response)
    try:
        if not body.getvalue():
            raise RuntimeError("No exec start_response")
        body.write("\r\n{}\r\n".format('\r\n'.join(line for line in iterable)))
    finally:
        if hasattr(iterable, "close") and callable(iterable.close):
            iterable.close()
    # 这里瞎扯
    return body.getvalue()

对于真正(可用)的 WSGI 服务器,常用的比如 Gunicorn,在不同的 Worker(gunicorn.worker 模块中)中,都实现了一个叫 handle_request 的类方法,这个方法便是调用 WSGI APP,并完成 Response 拼装的,虽然不同的 Worker 的实现略有差异,但比较共通的代码:

respiter = self.wsgi(environ, resp.start_response)
try:
    if isinstance(respiter, environ['wsgi.file_wrapper']):
        resp.write_file(respiter)
    else:
        for item in respiter:
            resp.write(item)
    resp.close()
    request_time = datetime.now() - request_start
    self.log.access(resp, req, environ, request_time)
finally:
    if hasattr(respiter, "close"):
        respiter.close()

这段代码便是调用 WSGI APP,并通过循环把 Body 写入到 resp 中。

中间件

因为 WSGI 的定义方式,可以写多个 WSGI APP 进行嵌套并处理不同的逻辑,比如:

def first_wsgi_app(environ, start_response):
    import logging
    logging.info("new request")
    rsp = second_wsgi_app(environ, start_response)
    logging.info("finish request")
    return rsp


def second_wsgi_app(environ, start_response):
    if environ.get("HTTP_ROLE") == "ADMIN":
        return third_wsgi_app(environ, start_response)
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello User!"


def third_wsgi_app(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello Admin!"

这时候我们把第一个 WSGI APP first_wsgi_app 传给 Server。在执行时,first_wsgi_app 可以完成日志的记录,second_wsgi_app 可以完成鉴权,third_wsgi_app 来真正的处理请求。

这种 App 的洋葱结构,被正伦亲切的称为俄罗斯套娃。

2018/09/21 16:29 下午 posted in  Python 黑魔法

使用 Flask-RESTPlus 构建生产级应用

本文来自对某项目的实践总结,敏感信息已被隐藏或被 Resource 一词代替。

前几天有人辗转找到公众号,留言询问之前一篇介绍 Flask-RESTPlus 文章的源代码(Flask Api 文档管理与 Swagger 上手),Flask-RESTPlus 虽然看起来非常方便,但在实际编写代码时总有种和当前项目结构冲突的感觉,因此整理之前的一篇改造某项目的总结,分享并探讨最佳实践。

在生成 Swagger 文档上,Flask-RESTPlus 是比较常用的 flask 拓展,但引入该插件需要对项目结构些许调整,如果是从 0 到 1 的新项目,倒也无伤大雅,但是对于已经存在的旧项目,改造还是有一定的工作量的,本文通过总结具体的项目改造,对 Flask-RESTPlus 进一步的讲解,以此总结。

蓝图与 API

在大型 Flask 项目中,为了防止各个模块的依赖混乱,一般通过模块划分,并在 app 工厂方法中统一对各个模块的蓝图进行注册,Flask-RESTPlus 作为 flask 拓展可以通过与 flask app 绑定从而托管注册在 Flask-RESTPlus 的视图,比如官方文档的例子:

app = Flask(__name__)
api = Api(app)

但是这样会架空 flask 自带的蓝图,如果是新项目的话可以考虑使用 Flask-RESTPlus 的 Namespace 替代,但是如果是老项目迁移,成本还是蛮高的,因此可以将 蓝图与 Flask-RESTPlus Api 绑定,这样既保留了原有的模块划分,还可以利用 Namespace 进行更细致的逻辑划分。

比如对于当然项目来说,其中有多个 blueprint,来分割相对独立的模块,我们拿 Resource 模块举例,通过 flask 的蓝图对大模块进行划分之后,再通过 Namespace 对细节再次划分:

desc = """
resource type 1, type 2, type 3, type 4, type 5 api
"""

resource_blueprint = Blueprint("Resource", __name__)
api = Api(resource_blueprint, version='1.0', title='Resource info',
          description=desc)
api.add_namespace(resource_type_1_api)
api.add_namespace(resource_type_2_api)
api.add_namespace(resource_type_3_api)
api.add_namespace(resource_type_4_api)
api.add_namespace(resource_type_5_api)

Resource 支持五种不同的类型,虽然这几种类型的 api 同属在一个蓝图里,但是其本身相对独立,因此可以使用 Namespace 做更细致的区分,然后将这五个 namespace 注册到 api 里。因此 blueprints 目录结构如下:

.
├── __init__.py
├── action
│   ├── __init__.py
│   ├── apis.py
│   └── dto.py
├── health.py
├── json_schema.py
└── resource
    ├── __init__.py
    ├── type_1.py
    ├── type_2.py
    ├── type_3.py
    ├── type_4.py
    ├── type_5.py
    └── dto.py
    

参数检查与权限验证

解决了注册问题,还有部分公共设施需要修改,比如参数检查和 api 权限认证。在之前是这样处理的:

@resource_blueprint.route("/", methods=['POST'])
@internal_token_validator
@request_json_validator(SEND_TYPE_1_SCHEMA)
@tracing_span("post_type_1:type_1_api")
def op_type_1():
    pass

token 验证的逻辑写在 internal_token_validator 装饰器中,虽然 Flask-RESTPlus api 类支持注册装饰器,但是因为并不是所有的 api 都需要 token 认证,因此并不能直接注册在其中,但是有认证的 api 比例非常多,依然选择装饰器,那么装饰数量将要突破 6 个而且到处写一样的逻辑非常丑,因此我继承了 Flask-RESTPlus 视图类 Resource,并复写了 dispatch 函数,如果有方法需要 token 认证则动态将 internal_token_validator 装饰器放在 method_decorators 中,而后者会在 Flask-RESTPlus 处理视图方法时调用。

虽然 Flask-RESTPlus 提供了提供了参数验证的功能,但是对我们来讲并不够用(并不强大),而 DCS 中的参数验证一直使用的是 json-schema,在上面的例子中 request_json_validator 装饰器便是处理相关逻辑,该装饰器会将一个 json-schema 规则传入,然后在处理该 api 函数前将 request 中的 json body 验证,如果验证失败便会封装一个友好的 400 Response。

为了方便使用 json-schema 验证,我也将相关逻辑封装了继承的视图基类里,相关代码:

class BaseView(Resource):
    json_schemas = {}
    internal_token_required = ['get', 'post', 'delete', 'put', 'patch']

    def dispatch_request(self, *args, **kwargs):
        from message.common.errors import APIError
        try:
            method = request.method.lower()
            self.validate(self.json_schemas.get(method))
            if method in self.internal_token_required:
                self.method_decorators.append(internal_token_validator)
            return super(BaseView, self).dispatch_request(*args, **kwargs)
        except APIError as e:
            rsp = Response(
                response=json.dumps(e.render()), status=e.status_code,
                mimetype='application/json')
            return rsp

    @staticmethod
    def validate(schema):
        from message.common.errors import APIError
        if not schema:
            return
        data = request.json
        try:
            validate(data, schema)
        except ValidationError as e:
            raise APIError("ARGS_ERROR", e.message, 400)

DTO

最后谈一下导包的问题,在前一篇文章也提到 Flask-RESTPlus 容易产生相互引用, 而工程和 demo 不同,不能通过什么魔法技巧来避免这个问题 ,而应该通过更加细致的模块划分来避免,最后看到文章《How to structure a Flask-RESTPlus web service for production builds》(文后附链接)中介绍了 DTO 才让我找到了更 “结构化” 的解决办法。

DTO 即 data transfer object,这样设计的思路是和蓝图类似,传统 flask 应用中,在 app 工厂方法注册蓝图,而蓝图内的包相对独立,而 Flask-RESTPlus 引入了 namespace,按上文,我们把它作为蓝图更细以级的存在,因此,可以参考蓝图,将 namespace 的定义和依赖封装在一个类中,这样既避免了循环引用,还可以让整个项目的结构更清晰。

比如 Type1 DTO:

class Type1Dto:
    api = Namespace("type1", path="/type1", decorators=[internal_token_validator])
    action_model = api.model('ActionModel', action_desc)
    create_model = api.model("CreateType1Model", {
        "type1_title": fields.List(fields.String(description="Type1 title"), required=True),
        "type1_info": fields.Nested(api.model("Type1InfoModel", {
            "content": fields.String(description="Type1 content"),
        }), required=True),
    })
    template_model = api.model("TemplateType1Model", {
        "type1_title": fields.List(fields.String(description="Type1 title", required=True)),
        "content": fields.Nested(api.model("TemplateContent", {}), required=True),
    })
    model = api.model("Type1Model", {
        "total": fields.Integer(readOnly=True, description="action total"),
        "results": fields.List(fields.Nested(action_model)),
    })

其中包含了 namespace 的定义,request 的格式对象(Flask-RESTPlus 基于它生成 Request 文档),和 response 的返回对象(Flask-RESTPlus 基于它渲染 json 并生成 Response 文档)。

在使用时,将 dto 导入到视图层,而相关 model 也会在这派上用场:

from .dto import Type1Dto

api = Type1Dto.api


@api.route("/")
class Type1Api(Resource):
    json_schemas = {"post": SEND_TYPE_1_SCHEMA}

    @tracing_span("post_type_1:type_1_api")
    @api.expect(Type1Dto.create_model)
    @api.param("Token", description="internal token.", _in="header", required=True)
    def post(self):
        actions = deal_with_type_1(api.payload['type_1_title'], api.payload['content'])
        result = {
            "total": len(actions),
            "results": [a.render() for a in actions]
        }
        return result

最后将视图层的 api 导入到蓝图定义的地方完成注册,这样整个项目既做到了合理的结构分类,也完成和解决了导包问题。


参考资料:《How to structure a Flask-RESTPlus web service for production builds》: https://medium.freecodecamp.org/structuring-a-flask-restplus-web-service-for-production-builds-c2ec676de563

2018/05/26 16:52 下午 posted in  Python 黑魔法

Flask Api 文档管理与 Swagger 上手

Flask 是一个以自由度高、灵活性强著称的 Python Web 框架。但高灵活性也意味着无尽的代码维护成本、高自由度意味着代码质量更依赖程序员自身而没有一致的标准和规范。因此团队内开发时 Flask 项目更需要建立代码和文档规范以保证不会出现太大的偏差。

本文从 Api 的角度探究 Flask 项目的 Api 规范以及获得 Api 文档的最佳姿势。众数周知,文档的编写和整理工作将花费巨大精力甚至不亚于代码的编写,因此在时间紧任务重的情况下,文档是首先被忽略的工作。不过,就算项目在初期存在文档,但在后面的迭代中,文档落后严重,其产生的误导比没有文档更加可怕。

因此,个人认为 文档随代码走,代码改动时文档也应该跟进变动,但本着 人是不可靠的 原则,文档理想上是应该由代码生成,而不是靠人工维护。如果代码有任何改动,文档也能自动更新,这将是一件非常优雅的事情。虽然对很多文档来说这并不现实,但对于 Api 文档来说,实现成本并不高。

Flask-RESTPlus

对于 REST Api 来说,Flask-RESTPlus 是一个优秀的 Api 文档生成工具,这个包将会替换 Flask 路由层的编写方式,通过自己的语法来规定 Api 细节,并生成 Api 文档。

安装

安装 Flask-RESTPlus

pip install flask-restplus

或者:

easy_install flask-restplus

最小 Demo

详细 Demo 参考:使用 Flask-RESTPlus 构建生产级应用

使用 Flask-RESTPlus 时需要按照这个库规定的方式编写 Api 层,包括 request 的参数解析,以及 response 的返回格式。一个 hello world 级的示范:

from flask import Flask
from flask_restplus import Resource, Api

app = Flask(__name__)
api = Api(app, prefix="/v1", title="Users", description="Users CURD api.")

@api.route('/users')
class UserApi(Resource):
    def get(self):
        return {'user': '1'}

if __name__ == '__main__':
    app.run()

运行之后效果如下:

实践

这里我会实现一个完整的小项目来实践和介绍 Flask-RESTPlus 这个库。我们实现一个简单的 图书订单系统 ,实现用户、图书和订单的 CURD。

Model

用户 model,包含 id 和 username:

class User(object):
    user_id = None
    username = None

    def __init__(self, username: str):
        self.user_id = str(uuid.uuid4())
        self.username = username

图书 model,包含 id,名称和价格:

class Book(object):
    book_id = None
    book_name = None
    price = None

    def __init__(self, book_name: str, book_price: float):
        self.book_id = str(uuid.uuid4())
        self.book_name = book_name
        self.price = book_price

订单 model,包含 id,购买者 id,图书 id 和创建时间:

class Order(object):
    order_id = None
    user_id = None
    book_id = None
    created_at = None

    def __init__(self, user_id, book_id):
        self.order_id = str(uuid.uuid4())
        self.user_id = user_id
        self.book_id = book_id
        self.created_at = int(time.time())

蓝图

在 Flask 中构建大型 Web 项目,可以通过蓝图为路由分组,并在蓝图中添加通用的规则(url 前缀、静态文件路径、模板路径等)。这个项目我们只用一个 api 蓝图,在实际中可能会使用 openapi 蓝图,internal api 蓝图来区分大的分类。

Flask-RESTPlusclass::Api 将直接挂在在蓝图下面,这么我们即利用了 Flask 的蓝图进行对功能模块分类,也可以利用 Api 的版本对 Api 版本进行管理,对于小的模块分类,我们可以利用 Api 的 namespace,着这里我们可以分为 user namespacebook namespaceorder namespace:

Api 蓝图:

from flask import Blueprint
from flask_restplus import Api

api_blueprint = Blueprint("open_api", __name__, url_prefix="/api")
api = Api(api_blueprint, version="1.0",
          prefix="/v1", title="OpenApi", description="The Open Api Service")

然后,就可以创建出不同的 namespace,来编写自己的 api 代码了。而只需要在 app 工厂中注册该 blueprint,便可将自己的编写的 api 挂载到 flask app 中。

def create_app():
    app = Flask("Flask-Web-Demo")

    # register api namespace
    register_api()

    # register blueprint
    from apis import api_blueprint
    app.register_blueprint(api_blueprint)

    return app

要注意的是,因为 Api 中很多工具方法依赖 api 对象,因此在注册 namespace 的时候要避免循环引用,而且,这注册蓝图的时候,需要先将 namespace 注册,否则会 404。这个库的很多方法太依赖 api 对象,感觉设计并不合理,很容易就循环引用,并不是非常优雅。

注册 namespace:

def register_api():
    from apis.user_api import ns as user_api
    from apis.book_api import ns as book_api
    from apis.order_api import ns as order_api
    from apis import api
    api.add_namespace(user_api)
    api.add_namespace(book_api)
    api.add_namespace(order_api)

下面就是 Api 的编写了。

编写 Api

列表和创建

我们先完成用户的列表和创建 Api,代码如下:

from flask_restplus import Resource, fields, Namespace

from model import User
from apis import api

ns = Namespace("users", description="Users CURD api.")

user_model = ns.model('UserModel', {
    'user_id': fields.String(readOnly=True, description='The user unique identifier'),
    'username': fields.String(required=True, description='The user nickname'),
})
user_list_model = ns.model('UserListModel', {
    'users': fields.List(fields.Nested(user_model)),
    'total': fields.Integer,
})


@ns.route("")
class UserListApi(Resource):
    # 初始化数据
    users = [User("HanMeiMei"), User("LiLei")]

    @ns.doc('get_user_list')
    @ns.marshal_with(user_list_model)
    def get(self):
        return {
            "users": self.users,
            "total": len(self.users),
        }

    @ns.doc('create_user')
    @ns.expect(user_model)
    @ns.marshal_with(user_model, code=201)
    def post(self):
        user = User(api.payload['username'])
        return user

解释下上面的代码,首先需要创建一个 user model 来让 Flask-RESTPlus 知道我们如何渲染和解析 json:

user_model = ns.model('UserModel', {
    'user_id': fields.String(readOnly=True, description='The user unique identifier'),
    'username': fields.String(required=True, description='The user nickname'),
})

这里面定义了字段以及字段的描述,这些字段并不参与参数检查,而只是渲染到 api 文档上,来标记 api 将返回什么结果,以及应该怎么调用 api。

然后介绍下目前用到的装饰器:

  1. @ns.doc 来标记这个 api 的作用
  2. @ns.marshal_with 来标记如何渲染返回的 json
  3. @ns.expect 来标记我们预期什么样子的 request

运行程序我们可以看到以下结果:

我们也可以通过 try it 来调用 api:

查询和更新

因为路由是绑定到一个类上的,因此限定了这个类能处理的 url,对于 '/users/user_id' 类似的路径,需要单独的类来处理:

@ns.route("/<string:user_id>")
@ns.response(404, 'User not found')
@ns.param('user_id', 'The user identifier')
class UserInfoApi(Resource):
    users = [User("HanMeiMei"), User("LiLei")]
    print([u.user_id for u in users])

    @ns.doc("get_user_by_id")
    @ns.marshal_with(user_model)
    def get(self, user_id):
        for u in self.users:
            if u.user_id == user_id:
                return u
        ns.abort(404, "User {} doesn't exist".format(user_id))

    @ns.doc("update_user_info")
    @ns.expect(user_model)
    @ns.marshal_with(user_model)
    def put(self, user_id):
        user = None
        for u in self.users:
            if u.user_id == user_id:
                user = u
        if not user:
            ns.abort(404, "User {} doesn't exist".format(user_id))
        user.username = api.payload['username']
        return user

在这里面可以看到更改了 url 和新引入了两个装饰器:

  1. @ns.response 用来标记可能出现的 Response Status Code 并渲染在文档中
  2. @ns.param 用来标记 URL 参数

运行程序之后我们可以尝试根据 id 获得一个用户:

注意namespace 的 name 会被拼接到 url 中,比如上面 url 中的 “users” 即是 namespace name。

带嵌套的 Api

用户 Api 和图书 Api 基本一样而且简单,但是对于订单 Api 中,需要包含用户信息和图书信息,在实现上略微不同。

from flask_restplus import Resource, fields, Namespace

from model import Order, Book, User
from apis.user_api import user_model
from apis.book_api import book_model

ns = Namespace("order", description="Order CURD api.")

order_model = ns.model('OrderModel', {
    "order_id": fields.String(readOnly=True, description='The order unique identifier'),
    "user": fields.Nested(user_model, description='The order creator info'),
    "book": fields.Nested(book_model, description='The book info.'),
    "created_at": fields.Integer(readOnly=True, description='create time: unix timestamp.'),
})
order_list = ns.model('OrderListModel', {
    "orders": fields.List(fields.Nested(order_model)),
    "total": fields.Integer(description='len of orders')
})

book = Book("Book1", 10.5)
user = User("LiLei")
order = Order(user.user_id, book.book_id)


@ns.route("")
class UserListApi(Resource):

    @ns.doc('get_order_list')
    @ns.marshal_with(order_list)
    def get(self):
        return {
            "orders": [{
                "order_id": order.order_id,
                "created_at": order.created_at,
                "user": {
                    "user_id": user.user_id,
                    "username": user.username,
                },
                "book": {
                    "book_id": book.book_id,
                    "book_name": book.book_name,
                    "price": book.price,
                }
            }],
            "total": 1}

    @ns.doc('create_order')
    @ns.expect(order_model)
    @ns.marshal_with(order_model, code=201)
    def post(self):
        return {
            "order_id": order.order_id,
            "created_at": order.created_at,
            "user": {
                "user_id": user.user_id,
                "username": user.username,
            },
            "book": {
                "book_id": book.book_id,
                "book_name": book.book_name,
                "price": book.price,
            }
        }

这里使用了更灵活的格式组合,包括 fields.Nested 可以引入其他 model,因为 model 可以相互引用,因此还是有必要把这些 model 放在一起,来避免循环引用。不过由此也可以看出,Response 解析还是比较自由的。

备注:这里 return 的是一个字典,但是理想状态下应该是一个类(user 字段和 book 字段),只是因为没有数据库操作,简化处理。

到这里,这个小项目就是写完了,最后运行效果图如下:

改造

可以通过这个简单的 Demo 了解 Flask-RESTPlus 的使用,但是目前只是从零到一的写一个完成的项目,因此看起来非常容易上手,但是如果是旧项目改造,我们需要做什么?

通过上述代码,我们可以看到要做的主要是两件事:

  1. Api 层的改造
  2. 设计 Api Model

Api 层改造涉及到两点,因为 url 是由 blueprint、api obj、namespace 三个东西共同组成的,因此需要设计怎么分配,可能还有重写部分 api 的实现。但是理想的 api-service-model 架构的程序, api 应该是比较薄的一层,要接入并不困难,只是琐碎。

Api Model 一般是原有项目没有的,需要引入,其中包括的参数检查的 model(Flask-RESTPlus 提供了 Request Parsing,本文并没讨论,可以参考文档: Request Parsing )和解析 Response 的 model,这些需要梳理所有 api 和字段,工作量不小,如果数据库模型设计合理的话也许能减轻部分工作量。

Swagger

Swagger 是一款非常流行的 Api 文档管理、交互工具,适用于在团队中的 Api 管理,以及服务组件对接。其好用与重要程度不必赘言,下面基于上文的 demo,完成一个 Swagger 文档以及基于文档生成用于对接的 client。

获得 Swagger 文档

Flask-RESTPlus 是已经集成了 Swagger UI 的,在运行时所获得界面即是通过 Swagger UI 渲染的。而我们目前需要的是获取 Swagger 文档 json 或 yaml 文件。

在控制台可以看到,在访问程序时:

是的,这就是 Swagger 文档:

代码生成

使用 Swagger 生成文档需要

在 macOS 下载:

brew install swagger-codegen

然后可以通过 help 名称查看帮助:

Hypo-MBP:~ hypo$ swagger-codegen help
usage: swagger-codegen-cli <command> [<args>]

The most commonly used swagger-codegen-cli commands are:
    config-help   Config help for chosen lang
    generate      Generate code with chosen lang
    help          Display help information
    langs         Shows available langs
    meta          MetaGenerator. Generator for creating a new template set and configuration for Codegen.  The output will be based on the language you specify, and includes default templates to include.
    validate      Validate specification
    version       Show version information

See 'swagger-codegen-cli help <command>' for more information on a specific
command.

生成 Python client:

swagger-codegen generate -i http://127.0.0.1:5000/api/swagger.json -l python

执行完成后,便可以在当前路径的 swagger_client 下找到 api client 了。

总结

本文介绍了 Flask-RESTPlus 的使用,因为其本身就支持 Swagger 语法并内置了 Swagger UI,所以 Swagger 对接简单异常。因此,主要工作量放在了编写 api 层上,包括 model,以及 api 中起到解释说明作用的装饰器。虽然在代码上需要编写不少不必要的代码(介绍说明用的描述等),但是这些额外代码辅助生成了与代码一致的文档,在组件对接和维护上,实则降低了成本。

2018/05/05 16:52 下午 posted in  Python 黑魔法

非 flask 程序实现 Flask Shell

2018/04/29 16:07 下午 posted in  Python 黑魔法

Python 的并发编程

这篇文章将讲解 Python 并发编程的基本操作。并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处理,并行则是是能够同时多任务处理。Erlang 之父 Joe Armstrong 有一张非常有趣的图说明这两个概念:

我个人更喜欢的一种说法是:并发是宏观并行而微观串行。

GIL

虽然 Python 自带了很好的类库支持多线程/进程编程,但众所周知,因为 GIL 的存在,Python 很难做好真正的并行。

GIL 指全局解释器锁,对于 GIL 的介绍:

全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

  • 维基百科

其实与其说 GIL 是 Python 解释器的限制,不如说是 CPython 的限制,因为 Python 为了保障性能,底层大多使用 C 实现的,而 CPython 的内存管理并不是线程安全的,为了保障整体的线程安全,解释器便禁止多线程的并行执行。

因为 Python 社区认为操作系统的线程调度已经非常成熟了,没有必要自己再实现一遍,因此 Python 的线程切换基本是依赖操作系统,在实际的使用中,对于单核 CPU,GIL 并没有太大的影响,但对于多核 CPU 却引入了线程颠簸(thrashing)问题。

线程颠簸是指作为单一资源的 GIL 锁,在被多核心竞争强占时资源额外消耗的现象。

比如下图,线程1 在释放 GIL 锁后,操作系统唤醒了 线程2,并将 线程2 分配给 核心2 执行,但是如果此时 线程2 却没有成功获得 GIL 锁,只能再次被挂起。此时切换线程、切换上下文的资源都将白白浪费。

因此,Python 多线程程序在多核 CPU 机器下的性能不一定比单核高。那么如果是计算密集型的程序,一般还是考虑用 C 重写关键部分,或者使用多进程避开 GIL。

多线程

在 Python 中使用多线程,有 threadthreading 可供原则,thread 提供了低级别的、原始的线程以及一个简单的锁,因为 thread 过于简陋,线程管理容易出现人为失误,因此官方更建议使用 threading,而 threading 也不过是对 thread 的封装和补充。(Python3 中 thread 被改名为 _thread)。

在 Python 中创建线程非常简单:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

直接创建线程简单优雅,如果逻辑复杂,也可以通过继承 Thread 基类完成多线程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

多进程

在 Python 中,可以使用 multiprocessing 库来实现多进程编程,和多线程一样,有两种方法可以使用多进程编程。

直接创建进程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

继承进程父类:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

multiprocessing 除了常用的多进程编程外,我认为它最大的意义在于提供了一套规范,在该库下有一个 dummy 模块,即 multiprocessing.dummy,里面对 threading 进行封装,提供了和 multiprocessing 相同 API 的线程实现,换句话说,class::multiprocessing.Process 提供的是进程任务类,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的讲一个多进程程序改为多线程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

无论是多线程还是多进程编程,这也是我一般会选择 multiprocessing 的原因。

除了直接创建进程,还可以用进程池(或者 multiprocessing.dummy 里的进程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

线程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

这里示例有个问题,pool 在 join 前需要 close 掉,否则就会抛出异常,不过 Python 之禅的作者 Tim Peters 给出解释:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原语

在多进程编程中,因为进程间的资源隔离,不需要考虑内存的线程安全问题,而在多线程编程中便需要同步原语来保存线程安全,因为 Python 是一门简单的语言,很多操作都是封装的操作系统 API,因此支持的同步原语蛮全,但这里只写两种常见的同步原语:锁和信号量。

通过使用锁可以用来保护一段内存空间,而信号量可以被多个线程共享。

threading 中可以看到 Lock 锁和 RLock 重用锁两种锁,区别如名。这两种锁都只能被一个线程拥有,第一种锁只能被获得一次,而重用锁可以被多次获得,但也需要同样次数的释放才能真正的释放。

当多个线程对同一块内存空间同时进行修改的时候,经常遇到奇怪的问题:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非线程安全导致 count 没有达到预期的效果。而通过锁便可以控制某一段代码,或者说某段内存空间的访问:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

当然,上述例子非常暴力,直接强行把并发改为串行。

对于信号量常见于有限资源强占的场景,可以定义固定大小的信号量供多个线程获取或者释放,从而控制线程的任务执行,比如下面的例子,控制最多有 5 个任务在执行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

因为多进程的内存隔离,不会存在内存竞争的问题。但同时,多个进程间的数据共享成为了新的问题,而进程间通信常见:队列,管道,信号。

这里只讲解队列和管道。

队列常见于双进程模型,一般用作生产者-消费者模式,由生产者进程向队列中发布任务,并由消费者从队列首部拿出任务进行执行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理论上每个进程都可以向队列里的读或者写,可以认为队列是半双工路线。但是往往只有特定的读进程(比如消费者)和写进程(比如生产者),尽管这些进程只是开发者自己定义的。

而 Pipe 更像一个全工路线:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介绍的 threadingmultiprocessing 两个库外,还有一个好用的令人发指的库 concurrent.futures。和前面两个库不同,这个库是更高等级的抽象,隐藏了很多底层的东西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

该库中自带了进程池和线程池,可以通过上下文管理器来管理,而且对于异步任务执行完后,结果的获得也非常简单。再拿一个官方的多进程计算的例子作为结束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
2018/01/18 20:53 下午 posted in  Python 黑魔法