Python 魔术方法

魔术方法是 Python 类中那些定义名称类似 __XXX__ 的函数,使用 Python 的魔术方法的优势在于可以使用 Python 提供的内置方法,比如 len()str(),或者重载运算符,让自定义对象类型可以表现得像内置类型一样。

Python 语言参考手册中一共列出了 83 个特殊方法的名字,其中 47 个用于实现算术运算、位运算和比较操作,概览如下。

跟运算符无关的特殊方法:

跟运算符相关的特殊方法:

有一份文档详细的介绍了所有的魔术方法的使用方式,本文只整理了常用魔术方法以及常见的坑。 文档见: http://pycoders-weekly-chinese.readthedocs.io/en/latest/issue6/a-guide-to-pythons-magic-methods.html

构造与析构

Python 对象通过 __init__ 进行初始化,但是在执行 __init__ 之前,首先被执行的是 __new__ 函数。在初始化一个对象时,会先调用 __new__ 方法,该方法是一个类方法,执行完相关逻辑之后调用 __init__ 并将初始化参数传递给后者。

__del__ 函数是 Python 对象的析构函数,无论是垃圾回收还是手动删除,都是执行这个方法:

class Model(object):
    def __init__(self):
        print("Run: __init__")

    def __del__(self):
        print("Run: __del__")


if __name__ == "__main__":
    m = Model()
    del m

结果:

Run: __init__
Run: __del__

打印

在 Python 魔术方法中,有两个和打印有关,__repr____str__

如果在 Python 直接打印一个类,将会获得一个内存地址:

<__main__.Model object at 0x10408fda0>

如果在解释器中想获得一个更友好的显示,__repr__ 是一个非常好的选择。另外,在不把 object 当做字符串处理的场合,也是会使用该方法。

而在把 object 当做字符串处理或者转换为字符串的场合,比如 print(object)"{}".format(object)str(object) 等,__str__ 将会被使用。

class Model(object):
    def __init__(self):
        self.id = uuid.uuid4()
        self.content = "fluent python demo"

    def __str__(self):
        return self.content

    def __repr__(self):
        return "<Model: {}>".format(self.id)

一般在 __str__ 中展现更人性化的信息,可能直接返回给用户;而 __repr__ 更多为了在调试中使用方便(解释器中等)。

另外,如果 __str__ 没有定义,那么将会直接使用 __repr__ 方法,因此如果不需要太多差别,只定义后者即可。

数组与字典

如果需要定义一个类似列表的自定义类,为了支持通过下标访问,可以覆写 __getitem__

class Model(object):
    def __init__(self):
        self.content = [r for r in range(0, 3)]

    def __getitem__(self, item):
        return self.content[item]

该类从此支持了下标访问,并且还支持切片,排序等 Python 内置方法:

if __name__ == "__main__":
    m = Model()
    print(m[1:], sorted(m, reverse=True))

结果:

[1, 2] [2, 1, 0]

对于赋值操作,只需要覆写 __setitem__ 方法:

def __setitem__(self, key, value):
    self.content[key] = value

同理,对于将自定义对象进行类似字典的操作,可以这样实现:

class Model(object):
    def __init__(self):
        self.content = {
            "key": "value"
        }

    def __getitem__(self, item):
        return self.content[item]

    def __setitem__(self, key, value):
        self.content[key] = value

类成员变量

类成员方法 __getattr____getattribute____setattr__ 是三个方便且危险的方法,可以用来定义类成员属性,前两个是用来获取,最后一个是用来赋值。

前两个的区别是 __getattribute__ 会被首先调用,只有 __getattribute__ 找不到的时候,才会调用 __getattr__

因为本身的类属性操作也是基于这几个方法,因此在覆写时应该避免循环调用最后栈溢出。

总结

简单讲了几种常见的魔术方法,魔术方法可以使得自定义类型支持 Python 原生、内置方法,因而使得代码风格更加简洁和一致。但是,也容易写出难以调试的魔法代码,简洁(或者说 geek)和可维护性可能本身就是两个极端,在使用魔术方法时还需要考虑后继维护者是否能理解自己的行为。

2018/5/16 posted in  流畅的 Python

Flask Api 文档管理与 Swagger 上手

Flask 是一个以自由度高、灵活性强著称的 Python Web 框架。但高灵活性也意味着无尽的代码维护成本、高自由度意味着代码质量更依赖程序员自身而没有一致的标准和规范。因此团队内开发时 Flask 项目更需要建立代码和文档规范以保证不会出现太大的偏差。

本文从 Api 的角度探究 Flask 项目的 Api 规范以及获得 Api 文档的最佳姿势。众数周知,文档的编写和整理工作将花费巨大精力甚至不亚于代码的编写,因此在时间紧任务重的情况下,文档是首先被忽略的工作。不过,就算项目在初期存在文档,但在后面的迭代中,文档落后严重,其产生的误导比没有文档更加可怕。

因此,个人认为 文档随代码走,代码改动时文档也应该跟进变动,但本着 人是不可靠的 原则,文档理想上是应该由代码生成,而不是靠人工维护。如果代码有任何改动,文档也能自动更新,这将是一件非常优雅的事情。虽然对很多文档来说这并不现实,但对于 Api 文档来说,实现成本并不高。

Flask-RESTPlus

对于 REST Api 来说,Flask-RESTPlus 是一个优秀的 Api 文档生成工具,这个包将会替换 Flask 路由层的编写方式,通过自己的语法来规定 Api 细节,并生成 Api 文档。

安装

安装 Flask-RESTPlus

pip install flask-restplus

或者:

easy_install flask-restplus

最小 Demo

使用 Flask-RESTPlus 时需要按照这个库规定的方式编写 Api 层,包括 request 的参数解析,以及 response 的返回格式。一个 hello world 级的示范:

from flask import Flask
from flask_restplus import Resource, Api

app = Flask(__name__)
api = Api(app, prefix="/v1", title="Users", description="Users CURD api.")

@api.route('/users')
class UserApi(Resource):
    def get(self):
        return {'user': '1'}

if __name__ == '__main__':
    app.run()

运行之后效果如下:

实践

这里我会实现一个完整的小项目来实践和介绍 Flask-RESTPlus 这个库。我们实现一个简单的 图书订单系统 ,实现用户、图书和订单的 CURD。

Model

用户 model,包含 id 和 username:

class User(object):
    user_id = None
    username = None

    def __init__(self, username: str):
        self.user_id = str(uuid.uuid4())
        self.username = username

图书 model,包含 id,名称和价格:

class Book(object):
    book_id = None
    book_name = None
    price = None

    def __init__(self, book_name: str, book_price: float):
        self.book_id = str(uuid.uuid4())
        self.book_name = book_name
        self.price = book_price

订单 model,包含 id,购买者 id,图书 id 和创建时间:

class Order(object):
    order_id = None
    user_id = None
    book_id = None
    created_at = None

    def __init__(self, user_id, book_id):
        self.order_id = str(uuid.uuid4())
        self.user_id = user_id
        self.book_id = book_id
        self.created_at = int(time.time())

蓝图

在 Flask 中构建大型 Web 项目,可以通过蓝图为路由分组,并在蓝图中添加通用的规则(url 前缀、静态文件路径、模板路径等)。这个项目我们只用一个 api 蓝图,在实际中可能会使用 openapi 蓝图,internal api 蓝图来区分大的分类。

Flask-RESTPlusclass::Api 将直接挂在在蓝图下面,这么我们即利用了 Flask 的蓝图进行对功能模块分类,也可以利用 Api 的版本对 Api 版本进行管理,对于小的模块分类,我们可以利用 Api 的 namespace,着这里我们可以分为 user namespacebook namespaceorder namespace:

Api 蓝图:

from flask import Blueprint
from flask_restplus import Api

api_blueprint = Blueprint("open_api", __name__, url_prefix="/api")
api = Api(api_blueprint, version="1.0",
          prefix="/v1", title="OpenApi", description="The Open Api Service")

然后,就可以创建出不同的 namespace,来编写自己的 api 代码了。而只需要在 app 工厂中注册该 blueprint,便可将自己的编写的 api 挂载到 flask app 中。

def create_app():
    app = Flask("Flask-Web-Demo")

    # register api namespace
    register_api()

    # register blueprint
    from apis import api_blueprint
    app.register_blueprint(api_blueprint)

    return app

要注意的是,因为 Api 中很多工具方法依赖 api 对象,因此在注册 namespace 的时候要避免循环引用,而且,这注册蓝图的时候,需要先将 namespace 注册,否则会 404。这个库的很多方法太依赖 api 对象,感觉设计并不合理,很容易就循环引用,并不是非常优雅。

注册 namespace:

def register_api():
    from apis.user_api import ns as user_api
    from apis.book_api import ns as book_api
    from apis.order_api import ns as order_api
    from apis import api
    api.add_namespace(user_api)
    api.add_namespace(book_api)
    api.add_namespace(order_api)

下面就是 Api 的编写了。

编写 Api

列表和创建

我们先完成用户的列表和创建 Api,代码如下:

from flask_restplus import Resource, fields, Namespace

from model import User
from apis import api

ns = Namespace("users", description="Users CURD api.")

user_model = ns.model('UserModel', {
    'user_id': fields.String(readOnly=True, description='The user unique identifier'),
    'username': fields.String(required=True, description='The user nickname'),
})
user_list_model = ns.model('UserListModel', {
    'users': fields.List(fields.Nested(user_model)),
    'total': fields.Integer,
})


@ns.route("")
class UserListApi(Resource):
    # 初始化数据
    users = [User("HanMeiMei"), User("LiLei")]

    @ns.doc('get_user_list')
    @ns.marshal_with(user_list_model)
    def get(self):
        return {
            "users": self.users,
            "total": len(self.users),
        }

    @ns.doc('create_user')
    @ns.expect(user_model)
    @ns.marshal_with(user_model, code=201)
    def post(self):
        user = User(api.payload['username'])
        return user

解释下上面的代码,首先需要创建一个 user model 来让 Flask-RESTPlus 知道我们如何渲染和解析 json:

user_model = ns.model('UserModel', {
    'user_id': fields.String(readOnly=True, description='The user unique identifier'),
    'username': fields.String(required=True, description='The user nickname'),
})

这里面定义了字段以及字段的描述,这些字段并不参与参数检查,而只是渲染到 api 文档上,来标记 api 将返回什么结果,以及应该怎么调用 api。

然后介绍下目前用到的装饰器:

  1. @ns.doc 来标记这个 api 的作用
  2. @ns.marshal_with 来标记如何渲染返回的 json
  3. @ns.expect 来标记我们预期什么样子的 request

运行程序我们可以看到以下结果:

我们也可以通过 try it 来调用 api:

查询和更新

因为路由是绑定到一个类上的,因此限定了这个类能处理的 url,对于 '/users/user_id' 类似的路径,需要单独的类来处理:

@ns.route("/<string:user_id>")
@ns.response(404, 'User not found')
@ns.param('user_id', 'The user identifier')
class UserInfoApi(Resource):
    users = [User("HanMeiMei"), User("LiLei")]
    print([u.user_id for u in users])

    @ns.doc("get_user_by_id")
    @ns.marshal_with(user_model)
    def get(self, user_id):
        for u in self.users:
            if u.user_id == user_id:
                return u
        ns.abort(404, "User {} doesn't exist".format(user_id))

    @ns.doc("update_user_info")
    @ns.expect(user_model)
    @ns.marshal_with(user_model)
    def put(self, user_id):
        user = None
        for u in self.users:
            if u.user_id == user_id:
                user = u
        if not user:
            ns.abort(404, "User {} doesn't exist".format(user_id))
        user.username = api.payload['username']
        return user

在这里面可以看到更改了 url 和新引入了两个装饰器:

  1. @ns.response 用来标记可能出现的 Response Status Code 并渲染在文档中
  2. @ns.param 用来标记 URL 参数

运行程序之后我们可以尝试根据 id 获得一个用户:

注意namespace 的 name 会被拼接到 url 中,比如上面 url 中的 “users” 即是 namespace name。

带嵌套的 Api

用户 Api 和图书 Api 基本一样而且简单,但是对于订单 Api 中,需要包含用户信息和图书信息,在实现上略微不同。

from flask_restplus import Resource, fields, Namespace

from model import Order, Book, User
from apis.user_api import user_model
from apis.book_api import book_model

ns = Namespace("order", description="Order CURD api.")

order_model = ns.model('OrderModel', {
    "order_id": fields.String(readOnly=True, description='The order unique identifier'),
    "user": fields.Nested(user_model, description='The order creator info'),
    "book": fields.Nested(book_model, description='The book info.'),
    "created_at": fields.Integer(readOnly=True, description='create time: unix timestamp.'),
})
order_list = ns.model('OrderListModel', {
    "orders": fields.List(fields.Nested(order_model)),
    "total": fields.Integer(description='len of orders')
})

book = Book("Book1", 10.5)
user = User("LiLei")
order = Order(user.user_id, book.book_id)


@ns.route("")
class UserListApi(Resource):

    @ns.doc('get_order_list')
    @ns.marshal_with(order_list)
    def get(self):
        return {
            "orders": [{
                "order_id": order.order_id,
                "created_at": order.created_at,
                "user": {
                    "user_id": user.user_id,
                    "username": user.username,
                },
                "book": {
                    "book_id": book.book_id,
                    "book_name": book.book_name,
                    "price": book.price,
                }
            }],
            "total": 1}

    @ns.doc('create_order')
    @ns.expect(order_model)
    @ns.marshal_with(order_model, code=201)
    def post(self):
        return {
            "order_id": order.order_id,
            "created_at": order.created_at,
            "user": {
                "user_id": user.user_id,
                "username": user.username,
            },
            "book": {
                "book_id": book.book_id,
                "book_name": book.book_name,
                "price": book.price,
            }
        }

这里使用了更灵活的格式组合,包括 fields.Nested 可以引入其他 model,因为 model 可以相互引用,因此还是有必要把这些 model 放在一起,来避免循环引用。不过由此也可以看出,Response 解析还是比较自由的。

备注:这里 return 的是一个字典,但是理想状态下应该是一个类(user 字段和 book 字段),只是因为没有数据库操作,简化处理。

到这里,这个小项目就是写完了,最后运行效果图如下:

改造

可以通过这个简单的 Demo 了解 Flask-RESTPlus 的使用,但是目前只是从零到一的写一个完成的项目,因此看起来非常容易上手,但是如果是旧项目改造,我们需要做什么?

通过上述代码,我们可以看到要做的主要是两件事:

  1. Api 层的改造
  2. 设计 Api Model

Api 层改造涉及到两点,因为 url 是由 blueprint、api obj、namespace 三个东西共同组成的,因此需要设计怎么分配,可能还有重写部分 api 的实现。但是理想的 api-service-model 架构的程序, api 应该是比较薄的一层,要接入并不困难,只是琐碎。

Api Model 一般是原有项目没有的,需要引入,其中包括的参数检查的 model(Flask-RESTPlus 提供了 Request Parsing,本文并没讨论,可以参考文档: Request Parsing )和解析 Response 的 model,这些需要梳理所有 api 和字段,工作量不小,如果数据库模型设计合理的话也许能减轻部分工作量。

Swagger

Swagger 是一款非常流行的 Api 文档管理、交互工具,适用于在团队中的 Api 管理,以及服务组件对接。其好用与重要程度不必赘言,下面基于上文的 demo,完成一个 Swagger 文档以及基于文档生成用于对接的 client。

获得 Swagger 文档

Flask-RESTPlus 是已经集成了 Swagger UI 的,在运行时所获得界面即是通过 Swagger UI 渲染的。而我们目前需要的是获取 Swagger 文档 json 或 yaml 文件。

在控制台可以看到,在访问程序时:

是的,这就是 Swagger 文档:

代码生成

使用 Swagger 生成文档需要

在 macOS 下载:

brew install swagger-codegen

然后可以通过 help 名称查看帮助:

Hypo-MBP:~ hypo$ swagger-codegen help
usage: swagger-codegen-cli <command> [<args>]

The most commonly used swagger-codegen-cli commands are:
    config-help   Config help for chosen lang
    generate      Generate code with chosen lang
    help          Display help information
    langs         Shows available langs
    meta          MetaGenerator. Generator for creating a new template set and configuration for Codegen.  The output will be based on the language you specify, and includes default templates to include.
    validate      Validate specification
    version       Show version information

See 'swagger-codegen-cli help <command>' for more information on a specific
command.

生成 Python client:

swagger-codegen generate -i http://127.0.0.1:5000/api/swagger.json -l python

执行完成后,便可以在当前路径的 swagger_client 下找到 api client 了。

总结

本文介绍了 Flask-RESTPlus 的使用,因为其本身就支持 Swagger 语法并内置了 Swagger UI,所以 Swagger 对接简单异常。因此,主要工作量放在了编写 api 层上,包括 model,以及 api 中起到解释说明作用的装饰器。虽然在代码上需要编写不少不必要的代码(介绍说明用的描述等),但是这些额外代码辅助生成了与代码一致的文档,在组件对接和维护上,实则降低了成本。

2018/5/5 posted in  Python 黑魔法

GTD 入门笔记

虽然时间线拉的比较长,《Getting things done》 总算是看完了。

看完 GTD 之后对 GTD 理论有了更进一步的理解,通过通读,更加清晰了 GTD 四步法:收集清理组织回顾,也明确的四步法与执行的关系,并顺便根据自己的理解将 GTD 这一套理论简化为 4 个原则,以方便执行和安利。

四步法与执行

曾经在知乎看到有人反对在不了解时间管理的情况下直接实践 GTD,而更是推荐番茄工作法之类简单的理论。在看完 GTD 这本书之后对这个观点有些认可,GTD 的执行需要形成一大堆习惯,而作者更是在书中坦言,将 GTD 完全接受并融入自己生活中,可能需要大约两年的摸索。 因此,在正式实践 GTD 之前请确定自己有改变生活的决心与坚持的勇气。

收集

GTD 的启动是从收集开始的,一切事情皆在被收集的清单内(没看错,是一切事物)。书中认为第一次收集一般花费 3-6 个小时,需要将一切事物,桌子上的、抽屉里的、角落里的,所有的东西都放在收集篮,而是只是单纯的放进收集篮子,并不做任何处理,处理这个工作放在后面。

而收集这个动作自 GTD 实践开始,就不会停止,而且实时进行,任何的灵光一闪,任何的想到的事情,都需要收集起来。

我恰好是在工作最混乱的时候启用 GTD 的(17 年 9 月左右),当时并没有完全理解 GTD,而是比着葫芦画瓢,将所有想做的事情扔到收件箱里,并每次做事情的时候从收件箱找找能做的事情。

这是一种非常糟糕的实践,但是却有用,因为我知道我所有要做的事情,开会确定的效果,吃饭时想到的新实现,洗澡时想到的潜在 bug,全在收件箱里,我不怕遗漏,我信任这个系统,因此不会像无头苍蝇不知出路与莫名烦躁。

收集的意义就在于把所有突然想到的事情记录下来,让这件事情在变成事故前完成,这样就可以大量减少救火的情况,而且一个良好的收集系统会让人产生安全感,我可以依赖这个系统,不怕自己遗忘,就像一个外脑。

清理

在收集的时候避免任何进一步思考和执行,而是把思考的阶段放在清理阶段。

清理阶段非常简单,就是把收集阶段的东西进行清理与梳理,当处理一件事情时,可以采取以下的选择:

  1. 该事情没啥意义:扔到垃圾箱
  2. 该事情两分钟内就能做完:立马去做
  3. 该事情两分钟内做不完:推迟、委托他人

要明白为什么这样选择首先要明白两点:

  1. 清理是什么
  2. 两分钟原则

清理是什么?收件箱其实就是一个未处理清单,一堆从没有思考过是什么,怎么做,目标是什么的三无任务清单,清理就是将这堆任务想清楚,要不要做,要做成什么样子,要怎么做,并把收件箱清空。

因此,对于没有意义的事情,应该立即丢弃,而把有用的保留下来。不能让 GTD 系统中存在任何垃圾,毕竟垃圾越多意味着越低效。

两分钟原则是清理阶段的执行方式,如果一件事两分钟就可以解决,那就不要让这件事流入后面的处理阶段,因为一件事在 GTD 中经过的阶段越长,这件事的执行效率越低,一般一件事从收集后,经过清理,组织,回顾需要 3 分钟,而这件事本身不过两分钟就可以完成,那么把这件事在清理阶段就完成的,不要再拖延了。

我曾经疑惑为什么超过两分钟的事情就要 delay 呢?因为 清理 本身就是一件事情,如果在清理的时候就执行非常耗时的任务,就会中断清理,从而进入了不同从收件箱拿任务执行的恶性循环,从而让收件箱里的垃圾越来越多,最后放弃 GTD。因此我们要确保清理这件事情本身能成功完成,而耗时任务以后再说,哪怕是等清理完之后再说。

清理是对事情初步思考的环节,将收件箱里的垃圾丢掉,简单的事情完成,复杂的事情简单思考,那么剩下的事情,就是那些需要好好思考的任务了,而这些任务可以流入下一个环节。

组织

无论是小到写篇关于 GTD 的博客,还是大到计划买一套房,无论是今天下午就要着手做的事情,还是未来两年内完成的事情,只要这件事又完成的意义,就会流入组织环节。

虽然书中使用了 “组织” 一词,不过我认为 “梳理” 更适合。因为组织是在做两件事情:

  1. 梳理清单,分门别类
  2. 梳理任务,想清楚怎么执行

如果这件事近期不需要执行,而且你也不知道什么时候会执行,比如要看到书单,要学一门外语,那么可以把这件事放到 “也许、将来” 清单。如果这些事情需要在家完成,那么可以将这件事归到 “在家” 清单。将事情分门别类,可以方便下一步的执行,比如在家的时候,可以看一下 “在家” 这个清单,而书荒的时候,可以去翻翻看 “也许、将来” 里的书单。

清单的分类一般按照地点或客观条件,比如 “公司”、“在家”、“杂货店”、“需要电脑”,也可能会依赖某个人:“老板”,“女朋友”。这些都可以成为清单的梳理规则,考虑到清单是执行和回顾的依据,可以怎么有利于执行和回顾怎么分类。

除了梳理清单,还有把 “任务” 变为 “项目”,任务指的是我们在收件箱收集的事项,比如:“在卧室中安装空气净化器”。而这种任务完全没有可执行力,因此我们需要 “梳理任务”,想清楚下一步怎么做,比如:“调研有哪些空气净化器性价比高”。

回顾

如果一件事被收集到了 GTD 系统,而这个体系并没有经常更新,从而出现了遗忘的事项,那么你就不会再信任这个系统,从而又一次放弃 GTD。

因此,当一件件事被收集清理组织之后,会产生一个又一个的清单,如果清单没有被更新,这就和我们把要做的随手一记一样,并没有实质改观。因此我们需要定期回顾清单,比如曾经记下:“学习日语”,那么在回顾的时候你会又想起这件事情,说不定这次你就会考虑下怎么去做。

而有的项目因为没有更新,可能已经没有了执行的必要,或者有新的步骤需要添加,这时候在回顾环节可以更新这些项目。

回顾是更新整个系统的重要一环,也是防止整个系统不会落后于现实的保险。而作者建议在最后一个工作日拿出下班前的一到两个小时来回顾,以防止遗忘当周没有完成的事项。

执行

这里的执行分两种,一个是 GTD 整个流程的执行和 GTD 中托管的任务的执行。

GTD 四步走包括:收集,清理,组织,回顾。其中收集是无时无刻执行的,只要想起来就可以向收件箱中添加一项,因为有手机等移动设备,其实只要养成习惯,基本就没有耗时的感觉。而回顾是定期执行的,比如周五下班前,那么我们只需要设定一个闹钟或者提醒事项,提前预留出时间,或者稍微加一下班,也能让这个环节执行完。而 GTD 比较麻烦就是清理和组织。

清理和组织环节是比较消耗时间和脑力的,我们要把事情想清楚并归类,甚至还要想清楚下一步怎么执行,作者说他一般是在出差路上完成的,对于长期出差的商务人士,当然是个不错的选择。而对于打工仔,我一般是在饭间完成的,我更建议能经常拿出时间来清理和组织,尤其是组织,因为组织环节会梳理好下一步清单,而再次审视要执行的下一步清单,往往会产生改进,并获得更优的下一步行动列表。不过要注意的是,这两个环节需要脑力,在不清醒的时候并不适合这样的行动。

在执行任务上,人和计算机最大的不同点是人非常不擅长进程(正在执行的事情)切换,如果有新的事情插进来,往往会花很多事情来回忆(已经完成了啥)和思考(现在要做啥),就和每个进程都有一个上下文一样,任何任务也同样需要整个上下文,而 GTD 的下一步列表其实就是充当上下文的记录的作用。

因此,对于事情的事情就是在下一步清单中找到可以做的事情,并完成它,在执行的时候不应该产生 “我该怎么做”,“我已经做了啥” 这样的疑问,如果有的话可能还需要反省 GTD 的实践。

任务的时间点

在 GTD 中好像没有提到 “每日任务” 这样带有时间点的任务,因为在 GTD 中不建议,或者说反对设定每日任务,因为生活充满变数,人们经常完不成每日任务,而在多次把 “今日完成” 移动到 “明日完成” 之后,人们就会对这个日常管理系统失望并不再信任。

GTD 中认为一件事情分为两种:

  1. ASAP(As Soon As Possible,越快越好)的任务
  2. 有时间点的任务

第一类任务比如:“写一篇博客”,第二类比如:“明天 9 点预约了牙医”。

而 GTD 的下一步清单维护的其实只是第一种,而第二种才应该放在日程表中,这样就可以在指定日期提醒自己这件事在今天必须要做。而且日程表中的任务是比下一步清单中的任务要高的。

GTD 原则

GTD 是一套复杂的实践,我在尝试把它简化(方便执行和安利给别人),总结为:收集习惯下一步清单关注结果定期回顾四个原则。

收集习惯

收集事无巨细,应该养成一种习惯,任何灵光一闪,或者将要提醒自己的事情,都要被收集。

下一步清单

收集的事情要被清理和整理,我们要明确每件事的下一步是什么,并总结出下一步清单,而剩下的就是无脑执行下一步清单里的内容。

关注结果

所有的事情都是可量化的,我们要想清楚这件事我们的目标是什么,想要达成的结果是什么,只有下一步可量化,我们才有执行和推进事情的依据。

定期回顾

定期回顾是保证 GTD 系统是最新有效的保险,定期的回顾非常重要,更新过去的想法,添加新的想法。

总结

作者在最后提到,很多人认为如此繁琐的步骤是浪费时间,只是让行动的效率变得更低下。那是因为还没完全依赖这个系统,毕竟如果考虑到全盘依赖大脑时,每次切换事情都要思考:“我之前做了啥”,“我现在应该做啥”,“我要做到什么程度” 所消耗的时间其实是更多的。

GTD 的一些思想在软件行业好像已经有了广泛应用,这一套收集、清理、组织、回顾,如果我们把 “task” 这个词换为 “user story”,把清单换成泳道是不是仿佛很熟悉。

无论是个人,还是团队,时间管理其实并不能获得更多时间,相反会消耗时间在管理本身上,但这道流程的存在,能防止事情失控,一切尽在掌控之中。


附图一张,在星巴克写了一下午,写完才发现被冷风吹的手脚冰凉。

2018/4/30 posted in  坏笔记不如好记性

非 flask 程序实现 Flask Shell

2018/4/29 posted in  Python 黑魔法

Python 的并发编程

这篇文章将讲解 Python 并发编程的基本操作。并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处理,并行则是是能够同时多任务处理。Erlang 之父 Joe Armstrong 有一张非常有趣的图说明这两个概念:

我个人更喜欢的一种说法是:并发是宏观并行而微观串行。

GIL

虽然 Python 自带了很好的类库支持多线程/进程编程,但众所周知,因为 GIL 的存在,Python 很难做好真正的并行。

GIL 指全局解释器锁,对于 GIL 的介绍:

全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。
- 维基百科

其实与其说 GIL 是 Python 解释器的限制,不如说是 CPython 的限制,因为 Python 为了保障性能,底层大多使用 C 实现的,而 CPython 的内存管理并不是线程安全的,为了保障整体的线程安全,解释器便禁止多线程的并行执行。

因为 Python 社区认为操作系统的线程调度已经非常成熟了,没有必要自己再实现一遍,因此 Python 的线程切换基本是依赖操作系统,在实际的使用中,对于单核 CPU,GIL 并没有太大的影响,但对于多核 CPU 却引入了线程颠簸(thrashing)问题。

线程颠簸是指作为单一资源的 GIL 锁,在被多核心竞争强占时资源额外消耗的现象。

比如下图,线程1 在释放 GIL 锁后,操作系统唤醒了 线程2,并将 线程2 分配给 核心2 执行,但是如果此时 线程2 却没有成功获得 GIL 锁,只能再次被挂起。此时切换线程、切换上下文的资源都将白白浪费。

因此,Python 多线程程序在多核 CPU 机器下的性能不一定比单核高。那么如果是计算密集型的程序,一般还是考虑用 C 重写关键部分,或者使用多进程避开 GIL。

多线程

在 Python 中使用多线程,有 threadthreading 可供原则,thread 提供了低级别的、原始的线程以及一个简单的锁,因为 thread 过于简陋,线程管理容易出现人为失误,因此官方更建议使用 threading,而 threading 也不过是对 thread 的封装和补充。(Python3 中 thread 被改名为 _thread)。

在 Python 中创建线程非常简单:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

直接创建线程简单优雅,如果逻辑复杂,也可以通过继承 Thread 基类完成多线程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

多进程

在 Python 中,可以使用 multiprocessing 库来实现多进程编程,和多线程一样,有两种方法可以使用多进程编程。

直接创建进程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

继承进程父类:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

multiprocessing 除了常用的多进程编程外,我认为它最大的意义在于提供了一套规范,在该库下有一个 dummy 模块,即 multiprocessing.dummy,里面对 threading 进行封装,提供了和 multiprocessing 相同 API 的线程实现,换句话说,class::multiprocessing.Process 提供的是进程任务类,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的讲一个多进程程序改为多线程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

无论是多线程还是多进程编程,这也是我一般会选择 multiprocessing 的原因。

除了直接创建进程,还可以用进程池(或者 multiprocessing.dummy 里的进程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

线程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

这里示例有个问题,pool 在 join 前需要 close 掉,否则就会抛出异常,不过 Python 之禅的作者 Tim Peters 给出解释:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原语

在多进程编程中,因为进程间的资源隔离,不需要考虑内存的线程安全问题,而在多线程编程中便需要同步原语来保存线程安全,因为 Python 是一门简单的语言,很多操作都是封装的操作系统 API,因此支持的同步原语蛮全,但这里只写两种常见的同步原语:锁和信号量。

通过使用锁可以用来保护一段内存空间,而信号量可以被多个线程共享。

threading 中可以看到 Lock 锁和 RLock 重用锁两种锁,区别如名。这两种锁都只能被一个线程拥有,第一种锁只能被获得一次,而重用锁可以被多次获得,但也需要同样次数的释放才能真正的释放。

当多个线程对同一块内存空间同时进行修改的时候,经常遇到奇怪的问题:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非线程安全导致 count 没有达到预期的效果。而通过锁便可以控制某一段代码,或者说某段内存空间的访问:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

当然,上述例子非常暴力,直接强行把并发改为串行。

对于信号量常见于有限资源强占的场景,可以定义固定大小的信号量供多个线程获取或者释放,从而控制线程的任务执行,比如下面的例子,控制最多有 5 个任务在执行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

因为多进程的内存隔离,不会存在内存竞争的问题。但同时,多个进程间的数据共享成为了新的问题,而进程间通信常见:队列,管道,信号。

这里只讲解队列和管道。

队列常见于双进程模型,一般用作生产者-消费者模式,由生产者进程向队列中发布任务,并由消费者从队列首部拿出任务进行执行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理论上每个进程都可以向队列里的读或者写,可以认为队列是半双工路线。但是往往只有特定的读进程(比如消费者)和写进程(比如生产者),尽管这些进程只是开发者自己定义的。

而 Pipe 更像一个全工路线:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介绍的 threadingmultiprocessing 两个库外,还有一个好用的令人发指的库 concurrent.futures。和前面两个库不同,这个库是更高等级的抽象,隐藏了很多底层的东西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

该库中自带了进程池和线程池,可以通过上下文管理器来管理,而且对于异步任务执行完后,结果的获得也非常简单。再拿一个官方的多进程计算的例子作为结束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
2018/1/18 posted in  Python 黑魔法