入门向:Istio 与服务网格

近期,服务网格(Service Mesh)越加流行红火,各类社区讨论也层出不穷。面对如此火热的技术,我们不免有些疑问:服务网格究竟是什么,服务网格解决了什么?本文尝试简单讲解服务网格的架构设计,并介绍其流行解决方案 Istio。

从分布式系统聊起

现代的应用已经很少采用单体架构了,当分布式架构成为主流,系统组件间的网络调用变成了自然而然的问题。

当然服务数目比较少的时候,服务可以通过配置文件来记录其他服务的网络位置,进行调用时只需要读取配置即可,但当系统越来越庞大、自动化程度越来越高,配置文件这种方式便会成为一种负担。因此会引入 服务中心 来统一管理所有的服务,类似一个系统级的 DNS,来帮助某个服务来找到所依赖的服务。

上图便是一种常见的服务中心流程,Spring 全家桶中的 Eureka 便是采取这种模式。这种模式的特征比较明显:1. 服务会自注册,2. 服务主动去 Service Name System 中查询其他服务的地址。换句话说,服务是知道有服务中心存在的,并且有部分逻辑会侵入代码。

当然,还有不侵入代码的架构方式,就是把服务的注册、发现下沉到基础设施,在宿主机上运行代理进程,服务通过代理对其他组件发起访问。

如上图,服务本身可能并不知道服务中心或者代理的存在,但是整个系统依然拥有了服务注册、服务发现的能力。

服务网格的网格

说起最能体现服务网格 “样子” 的图片,肯定是这一张:

绿色的部分就是我们自己定义的服务,而蓝色的部分,便是 Sidecar。其工作原理,类似于上面提到的第二种服务发现模式,不过是高配版,因为并非在宿主机部署 Proxy,而是每个服务都拥有自己的 Proxy(Sidecar)。

但也只有 Proxy 是不够的,还需要一个 Service Name System,服务网格仅有 Sidecar 也是不够的,还需要一个控制平面:

只不过控制平面并不只是作为注册中心,还有很多强大的功能,下面,我便以具体的服务网格解决方案:Istio 来介绍。

Istio

Istio 服务网格逻辑上分为 数据平面控制平面

  • 数据平面 由一组以 sidecar 方式部署的智能代理组成。这些代理可以调节和控制微服务及 Mixer 之间所有的网络通信。
  • 控制平面 负责管理和配置代理来路由流量。此外控制平面配置 Mixer 以实施策略和收集遥测数据。

官方推荐使用 Envoy 作为 Sidecar。Envoy 是一个 C++ 写的高性能代理,根据官方描述,Envoy 具有动态服务发现、负载均衡、多协议支持等优点,如果是通过 Kubernetes 部署,只需要将 Envoy 和业务服务放在同一个 Pod,经过简单配置,便可接入网格。

Istio 的控制平面采用方便拓展的设计结构,主要由 Pilot、Mixer、Citadel 组件组成,并可以根据自己的需求插拔或者拓展。

Pilot 起到了前文提到的 Service Name System 的作用,担当服务发现、智能路由、流量管控的大任。

Mixer 主要的作用检查和遥测,比如前置条件检查(如认证、白名单等),配额检查(如判断服务的访问频率是否超标等),监控(如链路追踪、日志等)。

也正是因为 Mixer 在整个网络中起到了无微不至的大管家角色,在被人诟病性能问题时首当其冲。不过好在 Mixer 是一个比较独立的组件,如果系统已经有自己比较完善的监控、认证方案,也可以不启用 Mixer。

Istio 中,所有服务间的通讯全部是经过 Sidecar 的,而 Citadel 便是负责两个服务间通讯的安全问题,其提供了终端用户认证、流量加密的能力。

Istio 的组件比较简单,但也就是其简单的架构,帮我们完成和掩盖了大量复杂的事情。当然,Istio 并不是唯一的选择,老牌的 Linkerd,华为、阿里根据自己的需求改进并开源的 SM 解决方案,都是很不错的选择。

如何看待服务网格

如何看待当下火热的服务网格呢?在此之前,我们先看它为我们解决了什么?

  1. 流量管理
  2. 安全
  3. 可观测性

那除此之外,对使用者来说,又带来了什么隐患?

  1. 架构变复杂,运维难度升级
  2. 需要去了解,有一定的学习成本
  3. 每次请求都加了两跳,排错困难、性能隐患

因此,如果引入服务网格不是为了解决当前面临的问题,就没有引入的必要,还是那句老话:“如无必要勿增实体”,何况这个实体还是个黑盒子。不过,如果引入服务网格是为了解决当前的问题,那需要想清楚自己是否承受的起上面提到的隐患,毕竟没有东西会是银弹。

参考

2018/12/11 posted in  Service Mesh

浅谈 WSGI

WSGI 是 Python Web 开发中经常提到的名词,在维基百科中,定义如下:

Web服务器网关接口(Python Web Server Gateway Interface,缩写为WSGI)是为Python语言定义的Web服务器和Web应用程序或框架之间的一种简单而通用的接口。自从WSGI被开发出来以后,许多其它语言中也出现了类似接口。

正如定义,WSGI 不是服务器,不是 API,不是 Python 模块,而是一种规定服务器和客户端交互的 接口规范

WSGI 目标是在 Web 服务器和 Web 框架层之间提供一个通用的 API 标准,减少之间的互操作性并形成统一的调用方式。根据这个定义,满足 WSGI 的 Web 服务器会将两个固定参数传入 WSGI APP:环境变量字典和一个初始化 Response 的可调用对象。而 WSGI APP 会处理请求并返回一个可迭代对象。

WSGI APP

根据定义,我们可以实现一个非常简单的满足 WSGI 的 App:

def demo_wsgi_app(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello World!"

可以看到,该 App 通过 start_response 初始化请求,并通过 yield 将 body 返回。除了 yield,也可以直接返回一个可迭代对象。

在标准库 wsgiref 中已经包含了一个简单的 WSGI APP,可以在 wsgiref.simple_server 中找到,可以看到,这也是在做相同的事情:

def demo_app(environ,start_response):
    from io import StringIO
    stdout = StringIO()
    print("Hello world!", file=stdout)
    print(file=stdout)
    h = sorted(environ.items())
    for k,v in h:
        print(k,'=',repr(v), file=stdout)
    start_response("200 OK", [('Content-Type','text/plain; charset=utf-8')])
    return [stdout.getvalue().encode("utf-8")]

将这个 App 运行起来如下:

在 Django 中,可以在默认 app 下的 wsgi.py 中找到 get_wsgi_application,Django 通过这个方法创建并返回了一个 WSGIHandle,其本质,依然是一个 WSGI APP,可以看其 __call__ 方法:

class WSGIHandler(base.BaseHandler):
    request_class = WSGIRequest

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.load_middleware()

    def __call__(self, environ, start_response):
        set_script_prefix(get_script_name(environ))
        signals.request_started.send(sender=self.__class__, environ=environ)
        request = self.request_class(environ)
        response = self.get_response(request)

        response._handler_class = self.__class__

        status = '%d %s' % (response.status_code, response.reason_phrase)
        response_headers = list(response.items())
        for c in response.cookies.values():
            response_headers.append(('Set-Cookie', c.output(header='')))
        start_response(status, response_headers)
        if getattr(response, 'file_to_stream', None) is not None and environ.get('wsgi.file_wrapper'):
            response = environ['wsgi.file_wrapper'](response.file_to_stream)
        return response

WSGI 服务器

从 WSGI APP 的写法上就基本能推测出 WSGI 服务器做了什么,因此可以尝试实现一个简陋的 WSGI 服务器:

def run_wsgi_app(app, environ):
    from io import StringIO
    body = StringIO()

    def start_response(status, headers):
        body.write('Status: {}\r\n'.format(status))
        for header in headers:
            body.write("{}: {}".format(*header))
        return body.write

    iterable = app(environ, start_response)
    try:
        if not body.getvalue():
            raise RuntimeError("No exec start_response")
        body.write("\r\n{}\r\n".format('\r\n'.join(line for line in iterable)))
    finally:
        if hasattr(iterable, "close") and callable(iterable.close):
            iterable.close()
    # 这里瞎扯
    return body.getvalue()

对于真正(可用)的 WSGI 服务器,常用的比如 Gunicorn,在不同的 Worker(gunicorn.worker 模块中)中,都实现了一个叫 handle_request 的类方法,这个方法便是调用 WSGI APP,并完成 Response 拼装的,虽然不同的 Worker 的实现略有差异,但比较共通的代码:

respiter = self.wsgi(environ, resp.start_response)
try:
    if isinstance(respiter, environ['wsgi.file_wrapper']):
        resp.write_file(respiter)
    else:
        for item in respiter:
            resp.write(item)
    resp.close()
    request_time = datetime.now() - request_start
    self.log.access(resp, req, environ, request_time)
finally:
    if hasattr(respiter, "close"):
        respiter.close()

这段代码便是调用 WSGI APP,并通过循环把 Body 写入到 resp 中。

中间件

因为 WSGI 的定义方式,可以写多个 WSGI APP 进行嵌套并处理不同的逻辑,比如:

def first_wsgi_app(environ, start_response):
    import logging
    logging.info("new request")
    rsp = second_wsgi_app(environ, start_response)
    logging.info("finish request")
    return rsp


def second_wsgi_app(environ, start_response):
    if environ.get("HTTP_ROLE") == "ADMIN":
        return third_wsgi_app(environ, start_response)
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello User!"


def third_wsgi_app(environ, start_response):
    status = '200 OK'
    headers = [('Content-type', 'text/plain')]
    start_response(status, headers)
    yield "Hello Admin!"

这时候我们把第一个 WSGI APP first_wsgi_app 传给 Server。在执行时,first_wsgi_app 可以完成日志的记录,second_wsgi_app 可以完成鉴权,third_wsgi_app 来真正的处理请求。

这种 App 的洋葱结构,被正伦亲切的称为俄罗斯套娃。

2018/9/21 posted in  Python 黑魔法

Linux 系统调用

内核提供了用户进程和内核进行交互的一组接口,这些接口在应用程序和内核之间扮演了使者的角色,应用程序发出各种请求,而内核负责满足这些请求,而这些接口,即是 系统调用

作用

系统调用主要起到保护系统稳定可靠运行的作用,避免应用程序肆意妄为。

除此,系统调用还起到了为用户空间进程与硬件设备之间添加一个中间层的作用,该层的主要作用三个:

  1. 为用户空间提供一种硬件的抽象接口,使用户进程不用区分硬件类型
  2. 使得内核可以基于权限、用户累心和其他一些规则对需要进行的访问进行判断
  3. 为用户空间和系统的其余部分提供公共接口

工作原理

一般情况下,应用程序通过在用户空间实现的应用编程接口(API)而不是直接通过系统调用来编程。在 Unix 中,最流行的应用编程接口是基于 POSIX 标准的,所以 Liunx的系统调用像绝大多数 Unix 一样,作为 C 库的一部分提供:

关于 Unix 的接口设计中有一句格言:“提供机制而不是策略”。换句话说,Unix 的系统调用抽象出了用于完成某种确定的目的的函数,至于这些函数怎么用完全不需要被内核关心。

要访问系统调用(在 Linux 被称为 syscall),通常通过 C 库中定义的函数调用来进行。通常系统调用可以定义零个到多个参数,也会返回一个 long 来标记执行情况或者遇到的错误,而具体的错误信息会写在 error 全局变量中,通过 perror 库函数便可以打印出可读的错误信息。

在 Linux 中每个系统调用都被赋予一个系统调用号(存储在 sys_call_table 中),这样通过独一无二的编号就可以关联系统调用使用户进程不需要提及系统调用的名称。

Linux 系统的系统调用比其他操作系统执行要快的多,首先是因为 Linux 很短的上下文切换时间,其次就是 Linux 的系统调用本身就被设计的非常简洁。

用户空间进程通过系统调用来 “执行” 内核代码,其通知内核的机制是通过软中断实现的: 通过引发一个异常来促使系统切换到内核态去执行异常处理程序 (用户空间引发异常陷入内核)。

在使用系统调用时,除了调用号以外,一般还需要一些外部的参数输入,所以在陷入内核的时候,需要把这些参数从用户空间传入到内核,而最简单的方法就是像传递系统调用号一样,把这些参数也存放在寄存器中。

2018/7/8 posted in  Kernel

Linux 进程调度

在参与面试的时候,Linux 的进程应该算是我的必问题目,因为很多语言并没有去实现一套进程管理和调度,而是直接借用了操作系统的能力,尤其 Python 社区认为这方面没必要重复造轮子,而使用了系统调用的简单封装。因此,如果对 Linux 进程不甚了解,很难真的了解自己写的并发代码到底会发生什么。

调度程序是内核中确保进程能有效工作的子系统,负责决定将哪个进程投入运行,何时运行以及运行多少时间。调度程序的合理调度,是将系统资源最大限度发挥的保证。

从 1991 年 Linux 的初版直到 2.4 版本的内核,Linux 的调度程序都相当简陋,设计非常原始。但在 2.5 的内核中,调度程序采用了一种叫做 O(1) 的调度程序(因为其调度算法的时间复杂度为 O(1))。但是 O(1) 调度程序对时间敏感、需要用户交互的程序先天不足,表现欠佳(也许因为这个原因早期 Linux 桌面版推进缓慢),在 2.6 内核的开发时便特意对交互类程序优化,并引入了新的调度算法,其中最著名的即是 翻转楼梯最后期限调度算法(Rotating Staircase Deadline scheduler,RSDL)。直到 2.6.23 内核中,该算法彻底地替换掉了 O(1) 算法,并被称为 完全公平调度算法,即 CFS。本文将重点介绍 O(1) 算法和 CFS 算法,并简单讲下 CFS 是如何实现的。

多任务与调度基本概念

Linux 能同时并发的交互执行多个进程的多任务操作系统。在多核处理器机器上,多任务操作系统能使多个进程在不同处理器上真正的并行执行,而在单核处理器机器上,只是产生多个进程同时执行的幻觉。无论是单核还是多核,操作系统都能使得多个进程处于阻塞或者睡眠状态,只将适合执行的进程交给处理器执行。

多任务系统一般分为 非抢占式多任务抢占式多任务。而 Linux 属于后者,也就是 由调度程序来决定什么时候停止一个进程的运行,以便其他进程能够得到执行机会 。其中,这个强制挂起的动作,即是抢占(preemption),而进程在被抢占之前能够运行的时间是预先设置好的,而且有一个专门的名字,叫进程的时间片(timeslice)。

时间片是实际分给每个 可运行进程 的处理器时间段,很多操作系统都采用了动态时间片计算方式,也就是说分给进程的时间片具体是多少绝对时间是根据机器的负载动态计算的,而不是指定完就不再变化的。不过,Linux 的调度程序本身并没有通过分配时间片来达到公平的调度。

调度程序的精髓在于调度算法,算法策略决定调度程序在何时让什么进程执行,那么我们先提几个实际的问题来理解调度算法要做的事情。

IO 密集型和 CPU 密集型

进程可以被分为 IO 密集型和 CPU 密集型。前者的大部分时间用来提交 IO 请求或者等待 IO 请求,这样的进程经常处于运行的状态,但是通常每次都会运行短短的一会儿,因为它在等待更多的 IO 请求时最后总会阻塞。相反,CPU 密集型把时间大多用在执行代码上,除非被抢占,否则会一直执行下去,因为它们没有太大的 IO 请求或者没有太多可能被阻塞。而操作系统为了响应速度的考虑,调度器并不会让它们经常运行。

因为系统需要考虑到响应速度,调度器只需要保证不让 CPU 密集型长时间运行就可以,但有些程序不完全符合这两种分类,比如 Word 之类的办公软件,经常等待键盘输入(等待 IO),而在任意时刻,又可能粘住处理器疯狂的处理拼写检查和宏计算。

怎么让调度策略在两个矛盾的目标中间寻找平衡:响应迅速和最大系统利用率。不同的系统有不同的解决策略,但通常是一套复杂的算法,但大部分算法都无法保证低优先级的进程能被公平对待,而 Linux 的 CFS 基本解决这个问题(进程数量不会巨大的情况下)。

进程优先级问题

调度算法中最基本的一类就是基于优先级的调度,这是一种根据进程的价值和其他对处理器需求分级的想法。而一般分为两种,一种是优先级高的先运行,低的后运行,相同优先级的进程按轮转的方式进行调度。另一种是优先级高的进程使用的时间片比较长,调度程序总选择时间片未用尽而且优先级最高的进程运行。

Linux 采用了两种不同的优先级范围,第一种是用 nice 值,它的范围是 -20 ~ +19,默认为 0,而且 nice 值越高优先级越低。相对低优先级(高 nice 值)的进程,高优先级(低 nice 值)的进程可以获得更多的处理器时间。但对这个时间的分配,各个系统采用不同的方法,比如 macOS 采用了时间片的绝对值,而 Linux 中使用时间片的比例。

第二种范围是实时优先级,默认值为 0 ~ 99,与 nice 值相反,实时优先级数值越高,优先级越高,任何实时进程的优先级都高于普通进程,也就是说,这两个优先级范围是独立不相交的。而实时进程是 Linux 为有时间有严格要求进程的优待。

时间片

时间片是一个数值,它表明进程在被抢占前所能持续运行的时间。调度策略都会规定一个默认的时间片,但对于默认时间片的确定并不简单,如果时间片太长,往往就会延迟进程切换,使得系统对交互的响应表现欠佳,如果时间片太短,会明显增大进程切换带来的处理器耗时。

所以,任何时长的时间片都会导致系统表现欠佳,很多操作系统都非常重视这点,因此时间片默认一个比较适中的大小,比如 10ms。在 Linux 中 CFS 并没有使用绝对长度,而是将 CPU 的使用 时间比 分配给了进程,这样一来,Linux 中的进程获得的处理器时间和机器负载密切相关。

Linux 中进程获得的比例还会受到 nice 值影响,从而体现了优先级的作用。在多重条件影响下,内核就可以以时间片为依据来决定进程的运行:如果新的进程消耗的使用比当前执行的进程小,这立马剥夺当前进程的执行权,并把新进程投入运行。

需要用户交互的进程

前面提到,用户交互的进程更注重实时性,因此处理起来非常特别,我们举一个实际的例子来说明 Linux 是怎么处理需要用户交互的进程的。

如果有一个非常消耗 CPU 的进程,比如编译一个巨大的 C 程序,以及一个非常注重交互的记事本等类似的文字处理程序。对于编译进程来说,我们并不在意什么时候运行,早半秒还是晚半秒对我们来说没有差别,但是如果能尽快完成就再好不过了,而对于记事本进程,我们更希望在敲击键盘后立马看到反馈(立马抢占编译进程)。

对于实现上述需求依靠的是系统对记事本分配更高的优先级和更多的时间片(可能涉及到系统自动识别记事本程序需要更高的优先级),但在 Linux 中却采用了非常简单的方式,甚至不需要分配不同的优先级。如果两个进程 nice 值相同,那么这两个进程会分别各获得 50% 的 CPU 时间(Linux 下的时间片是个比例)。因为记事本不停的等待用户的输入,无疑大部分时间将是编译进程在执行,因此,记事本肯定用不到 CPU 的 50% 而编译程序肯定超过 50%。当用户输入的时候,会发现刚被唤醒的记事本使用比小于正在执行的编译进程,便立马抢占,从而快速响应。在记事本完成工作继续等待用户输入时,会被挂起将执行权再次分配给编译进程。

调度算法

上文通过一些简单的概念来描述了调度器做了什么,下面就开始正式讨论调度算法。

调度器类

在讲调度算法之前,需要先说明调度器类。Linux 调度器是以模块的方式提供的,这是为了允许不同类型的进程可以有针对性的选择调度算法。而提供不同调度算法的模块即是调度器类。像 CFS 就是一个针对普通进程的调度器类(定义在 kernel/sched_fair.c 中),在 Linux 中称为 SCHED_NORMAL(在 POSIX 中称为 SCHED_OTHER)。

每个调度器都有一个优先级,内核会先选择优先级最高的调度器,然后由该调度器调度进程并执行。

O(1) 调度算法

在讲 CFS 之前,先介绍下传统的 Unix 调度算法:O(1) 调度算法。现代进程调度器有两个通用的概念: 进程优先级和时间片 ,时间片是指进程运行多少时间,进程创建之后就被赋予一个时间片,优先级更高的进程运行的更频繁,而且往往拥有更多的时间片,这就是 O(1) 调度算法的实质。

很明显,除了让优先级更高的进程可以尽可能抢占之外,O(1) 调度算法还根据优先级来给时间片加权。但是,前面提到,传统的调度算法使用的 绝对的时间长度,这也引起了部分问题,比如有两个不同优先级的进程,一个 nice 值为 0,另一个为 1,那么他们经过加权的时间片长度分别是 100ms 和 95ms,他们的时间片非常接近,但是如果将 nice 值改为 18 和 19,这时他们的时间片变为了 10ms 和 5 ms,这时前者是后者两倍的运行时间,因此,尽管 nice 值只相差 1 但最后的结果却是差别巨大。

因此 CFS 完全摒弃时间片的绝对分配,而是分配处理器的使用比重。

CFS 调度算法

CFS 的出发点基于一个简单的理念: 进程调度的效果应该如同系统具备一个理想中的完美多任务处理器 。在这种系统中,每个进程将获得 1/n 的处理器时间(如果有 n 个进程)。比如我们有两个可运行进程,先运行其中一个 5ms,然后再运行另外一个进程 5ms,如果进程切换够快,那么在 10ms 内仿佛可以同时运行两个进程而且各自使用了处理器一半的能力。

当然这并不现实,首先一个处理器无法真正的同时运行多个任务,而且进程间切换是有损耗的,也无法做到无限快的切换,CFS 采用了折中的做法:让每个进程运行一段时间、循环轮转、选择运行最少的进程作为下一个运行进程,而不再采用分配给每个进程时间片的做法。 CFS 在所有可以运行的进程总数基础上计算出一个进程应该运行多久,而不是依靠 nice 值来计算时间片(nice 值只影响比重而不是绝对值)。

每个进程都按其权重在全部可运行进程中所占比例的 “时间片” 来运行,为了准确的计算时间片,CFS 为完美多任务中的无限小调度周期的近似值设定了一个目标,称为:目标延迟。越小的调度周期带来越好的交互性,同时也越接近完美的多任务(但同时需要更多的切换开销)。举例我们将目标延迟定位 20ms,那么如果有两个同优先级的进程,那么每个进程在被抢占前只能运行 10ms,而如果有 5 个这样的任务,那每个任务只能允许 4ms。

但是,上面这个例子中,如果进程数目非常多,比如超过 20 个,那么每个进程获得运行时间还不到 1ms,甚至可能小于进程切换所消耗的时间。当然,Linux 为了避免这种事情发生,设定了一个底线,被称为最小粒度(一般默认 1ms)。因此,只能说 CFS 在进程数目不巨大的情况下比较公平(一般系统中也就运行几百个进程,这种规模下 CFS 还是非常公平的)。而对于不同优先级的进程中,CFS 也是表现良好,比如目标延迟依然为 20ms,这两个进程一个 nice 为 0,另一个为 5,那么后者的权重将是前者的 1/3,即两个进程分别获得了 15ms(20 * 2/3) 和 5ms(20 * 1/3) 的处理器时间。而如果两个进程的 nice 值分别为 10 和 15,因为权重关系没有改变,因此两个进程依然分别获得 15ms 和 5ms 的处理器时间。所以,nice 值 不在影响调度决策,只有相对值才会影响处理器时间的分配比例

在 CFS 下,任何进程所获得的处理器时间是由它自己和其他所有可运行进程 nice 值的相对差值决定的。nice 值由算数加权变为了几何加权,正是将时间片的绝对值变为了使用比,使得在多进程环境下有了更低的调度延迟。

CFS 的实现

在讨论了种种概念之后,终于到了重点,即 CFS 算法的实现。相关的代码在 kernel/sched_fair.c 中,我们的关注点主要下面四个地方:

  • 运行时间的记录
  • 选择投入运行的进程
  • 调度器的选择
  • 睡眠与唤醒

运行时间的记录

上文讲到,CFS 的核心在于 CPU 的使用比,那么对于进程的运行时间的记录非常重要。多数 Unix 系统,分配一个绝对时间的时间片给进程,当每次系统时钟节拍发生时,时间片都会被减少一个节拍周期。每当一个进程的时间片被减少到 0,他就会被尚未减少到 0 的进程抢占。

但 CFS 并没有绝对的时间片,但它依然需要对每个进程的运行时间记账,以确保每个进程只在公平分配给它的处理器运行时间内运行。而记账的信息会保存其结构体指针 se 在进程的 task_struct 中(参照前一篇文章《你需要了解的 Linux 进程管理》)。

在结构体中,有一个重要的成员变量 vruntime,即是记录了该进程的总运行时间(花在运行上的时间和),而且这个时间经过了加权(优先级、机器负载等因素)。虚拟时间是以 ns 为单位的,因此 vruntime 和系统定时器节拍不再相关。

内核通过定时调用 uodate_curr() 函数(定义在 kernel/sched_fair.c)来更新进程的 vruntime,该函数计算了当前进程的执行时间,并将调用 __uodate_curr() 获得根据机器负载(可运行的进程总数)对运行时间加权后的值,然后将该值与原有的 vruntime 相加获得新的 vruntime

选择投入运行的进程

如果存在上文中描述的 “完美多任务处理器”,那么每个可运行的进程的 vruntime 值应该一致。然而这样的处理器是不存在的,那么 CFS 就会尽力去向这个方向靠拢,即在每次选择进程时,会挑选 vruntime 最小的进程。

CFS 使用红黑树来组织可运行队列,红黑树简称 rbtree,是一种自平衡的二叉搜索树,树上的每一个节点都对应了一个键值,可以通过键值来快速检索节点上的数据。后面我会单独写文章介绍数据结构(数据库索引一般也是基于红黑树)。

CFS 会维护一个包含了所有可运行进程的红黑树(相关代码在 kernel/sched_fair.c 中),节点的键值便是可运行进程的虚拟运行时间,CFS 调度器需要选取下一个运行的进程时,只需要 __pick_next_entity() 方法找到树中虚拟运行时间最小的进程(即搜索树的最左侧的叶子节点,其实为了快速访问这个节点,内核专门拿出了一个全局变量),并将这个进程投入运行。

如果有阻塞的进程满足了等待的条件或者有通过 fork() 创建的新进程,便会通过 enqueue_entity() 方法插入到该红黑树中,为了保证能尽快响应,新插入的节点会和最左叶子节点比较,如果新节点是最左叶子节点,调度器会立马把新进程投入运行。

相反,如果有进程堵塞(或者其他)变为不可运行的状态,调度器会通过 dequeue_entity() 方法将该进程从红黑树中移除,如果移除的是最左节点,这会调用 rb_next() 方法找到新的最左节点。

调度器的选择

正如前面讲到的,内核支持多种调度器,而 CFS 只不过是其中一种。进程调度的主要入口点是定义在 kernel/sched.c 下的函数 schedule(),它完成的事情就是选择一个进程,并将其投入运行,而它的逻辑非常简单:

  1. 调用 pick_next_task() 获得一个 task
  2. 将这个 task 投入运行

而真正和调度器相关的逻辑在 pick_next_task() 中,pick_next_task() 做了下面的事情:

  1. 按照调度类的优先级遍历
  2. 只有该调度类下有可运行进程,立马返回

pick_next_task() 为 CFS 做了一个简单的优化,如果所有的可运行任务都在 CFS 调度器下,就不再遍历其他调度类,而是不停的从 CFS 调度器里拿任务。

睡眠与唤醒

休眠(阻塞)的进程处于一个特殊不可执行的状态,阻塞的原因可能很多,比如等待一个信号,或者等待用户键盘的输入等,无论哪种,内核的操作是相同的:进程把自己标记为休眠状态,从可执行红黑树中移除并放入等待队列,然后调用 schedule() 选择和执行一个其他进程。

上一篇文章讲过,休眠有两种进程状态:TASK_INTERRUPTIBLETASK_UNINTERRUPTIBLE,无论哪种状态,休眠的进程都在同一个等待队列上。

等待队列 是由等待某些事件发生的进程组成的简单链表,当与等待队列相关的事件发生时,队列上的进程会被唤醒,为了避免产生竞争条件,休眠和唤醒的实现不能有批量。但如果简单的实现,有可能导致在判定条件为真后,进程却开始了休眠,那么就会使进程无限期地休眠下去,因此进程按以下处理加入等待队列:

  1. 调用宏 DEFINE_WAIT() 创建一个等待队列的项
  2. 调用 add_wait_queue() 把自己加入到队列中
  3. 调用 prepare_to_wait() 将进程的状态变为 TASK_INTERRUPTIBLETASK_UNINTERRUPTIBLE
  4. 如果状态被设置的是 TASK_INTERRUPTIBLE 则信号唤醒(伪唤醒),以检查并处理信号
  5. 唤醒之后检查等待条件是否为真,是则跳出循环,否则再次调用 schedule() 并一直重复
  6. 跳出循环(条件满足)后,进程将自己设置为 TASK_RUNNING 并调用 finish_wait() 方法把自己移除等待队列

唤醒操作通过 wake_up() 完成,它会唤醒指定的等待队列上的所有进程,wake_up() 的主要逻辑在调用的 try_to_wake_up() 中:

  1. 将进程设置为 TASK_RUNNING
  2. 调用 enqueue_task() 将此进程放入红黑树中
  3. 如果唤醒的进程比当前执行的进程优先级高则立马抢占
  4. 设置 need_resched 标记(标记是否触发重新调度)

不过如上面提到的,因为有伪唤醒,所以进程被唤醒不一定都是因为等待的条件达成。

抢占进程上下文切换

上下文切换是指从一个可执行进程切换到另一个可执行进程。在文章的最后,讲讲进程的上下文切换和抢占的问题。

内核处理的上下文切换的函数 context_switch() 定义在 kernel/sched.c,他基本完成了两件事情:

  1. 调用 switch_mm() 把虚拟内存从上一个进程映射切换到新进程中
  2. 调用 switch_to() 从上一个进程处理器状态切换到新进程的处理器状态(包含栈信息、寄存器信息等)

内核即将返回用户空间的时候,如果 need_resched 被标记,则会导致 schedule() 被调用,此时就会发生用户抢占。因为从内核返回到用户空间的进程知道自己是安全的,它既可以继续执行,也可以选择一个新进程去执行,所以无论是系统调用后还是中断后,进程都可以检查 need_resched 被标记,来判断是否需要重新调用 schedule()。总之,一般用户抢占发生在:

  • 从系统调用返回用户空间
  • 从中断处理程序返回用户空间

因为 Linux 完整的支持内核抢占,所以只要调度是安全的(没有持有锁),内核就可以在任何时间抢占正在执行的任务。Linux 的锁的判断是通过计数实现的,在 thread_info 中引入了 preempt_count 来记录持有的锁,当该值为 0 的时候,则该进程是安全的,而恰好该新进程设置了 need_resched 标记,那么当前进程可以被抢占。

如果内核中的进程被阻塞了,或显式调用了 schedule(),则内核会发生显式的抢占。显式的抢占从来都是受支持的,因为如果一个函数显式的调用 schedule(),说明它自己是清楚可以被安全的抢占。

内核的抢占一般发生在:

  • 中断处理程序正在执行,且返回内核空间之前
  • 内核代码再一次具有可抢占性的时候
  • 如果内核中的任务显式调用 schedule() 的时候
  • 内核中的任务阻塞时
2018/6/3 posted in  Kernel

使用 Flask-RESTPlus 构建生产级应用

本文来自对某项目的实践总结,敏感信息已被隐藏或被 Resource 一词代替。

前几天有人辗转找到公众号,留言询问之前一篇介绍 Flask-RESTPlus 文章的源代码(Flask Api 文档管理与 Swagger 上手),Flask-RESTPlus 虽然看起来非常方便,但在实际编写代码时总有种和当前项目结构冲突的感觉,因此整理之前的一篇改造某项目的总结,分享并探讨最佳实践。

在生成 Swagger 文档上,Flask-RESTPlus 是比较常用的 flask 拓展,但引入该插件需要对项目结构些许调整,如果是从 0 到 1 的新项目,倒也无伤大雅,但是对于已经存在的旧项目,改造还是有一定的工作量的,本文通过总结具体的项目改造,对 Flask-RESTPlus 进一步的讲解,以此总结。

蓝图与 API

在大型 Flask 项目中,为了防止各个模块的依赖混乱,一般通过模块划分,并在 app 工厂方法中统一对各个模块的蓝图进行注册,Flask-RESTPlus 作为 flask 拓展可以通过与 flask app 绑定从而托管注册在 Flask-RESTPlus 的视图,比如官方文档的例子:

app = Flask(__name__)
api = Api(app)

但是这样会架空 flask 自带的蓝图,如果是新项目的话可以考虑使用 Flask-RESTPlus 的 Namespace 替代,但是如果是老项目迁移,成本还是蛮高的,因此可以将 蓝图与 Flask-RESTPlus Api 绑定,这样既保留了原有的模块划分,还可以利用 Namespace 进行更细致的逻辑划分。

比如对于当然项目来说,其中有多个 blueprint,来分割相对独立的模块,我们拿 Resource 模块举例,通过 flask 的蓝图对大模块进行划分之后,再通过 Namespace 对细节再次划分:

desc = """
resource type 1, type 2, type 3, type 4, type 5 api
"""

resource_blueprint = Blueprint("Resource", __name__)
api = Api(resource_blueprint, version='1.0', title='Resource info',
          description=desc)
api.add_namespace(resource_type_1_api)
api.add_namespace(resource_type_2_api)
api.add_namespace(resource_type_3_api)
api.add_namespace(resource_type_4_api)
api.add_namespace(resource_type_5_api)

Resource 支持五种不同的类型,虽然这几种类型的 api 同属在一个蓝图里,但是其本身相对独立,因此可以使用 Namespace 做更细致的区分,然后将这五个 namespace 注册到 api 里。因此 blueprints 目录结构如下:

.
├── __init__.py
├── action
│   ├── __init__.py
│   ├── apis.py
│   └── dto.py
├── health.py
├── json_schema.py
└── resource
    ├── __init__.py
    ├── type_1.py
    ├── type_2.py
    ├── type_3.py
    ├── type_4.py
    ├── type_5.py
    └── dto.py
    

参数检查与权限验证

解决了注册问题,还有部分公共设施需要修改,比如参数检查和 api 权限认证。在之前是这样处理的:

@resource_blueprint.route("/", methods=['POST'])
@internal_token_validator
@request_json_validator(SEND_TYPE_1_SCHEMA)
@tracing_span("post_type_1:type_1_api")
def op_type_1():
    pass

token 验证的逻辑写在 internal_token_validator 装饰器中,虽然 Flask-RESTPlus api 类支持注册装饰器,但是因为并不是所有的 api 都需要 token 认证,因此并不能直接注册在其中,但是有认证的 api 比例非常多,依然选择装饰器,那么装饰数量将要突破 6 个而且到处写一样的逻辑非常丑,因此我继承了 Flask-RESTPlus 视图类 Resource,并复写了 dispatch 函数,如果有方法需要 token 认证则动态将 internal_token_validator 装饰器放在 method_decorators 中,而后者会在 Flask-RESTPlus 处理视图方法时调用。

虽然 Flask-RESTPlus 提供了提供了参数验证的功能,但是对我们来讲并不够用(并不强大),而 DCS 中的参数验证一直使用的是 json-schema,在上面的例子中 request_json_validator 装饰器便是处理相关逻辑,该装饰器会将一个 json-schema 规则传入,然后在处理该 api 函数前将 request 中的 json body 验证,如果验证失败便会封装一个友好的 400 Response。

为了方便使用 json-schema 验证,我也将相关逻辑封装了继承的视图基类里,相关代码:

class BaseView(Resource):
    json_schemas = {}
    internal_token_required = ['get', 'post', 'delete', 'put', 'patch']

    def dispatch_request(self, *args, **kwargs):
        from message.common.errors import APIError
        try:
            method = request.method.lower()
            self.validate(self.json_schemas.get(method))
            if method in self.internal_token_required:
                self.method_decorators.append(internal_token_validator)
            return super(BaseView, self).dispatch_request(*args, **kwargs)
        except APIError as e:
            rsp = Response(
                response=json.dumps(e.render()), status=e.status_code,
                mimetype='application/json')
            return rsp

    @staticmethod
    def validate(schema):
        from message.common.errors import APIError
        if not schema:
            return
        data = request.json
        try:
            validate(data, schema)
        except ValidationError as e:
            raise APIError("ARGS_ERROR", e.message, 400)

DTO

最后谈一下导包的问题,在前一篇文章也提到 Flask-RESTPlus 容易产生相互引用, 而工程和 demo 不同,不能通过什么魔法技巧来避免这个问题 ,而应该通过更加细致的模块划分来避免,最后看到文章《How to structure a Flask-RESTPlus web service for production builds》(文后附链接)中介绍了 DTO 才让我找到了更 “结构化” 的解决办法。

DTO 即 data transfer object,这样设计的思路是和蓝图类似,传统 flask 应用中,在 app 工厂方法注册蓝图,而蓝图内的包相对独立,而 Flask-RESTPlus 引入了 namespace,按上文,我们把它作为蓝图更细以级的存在,因此,可以参考蓝图,将 namespace 的定义和依赖封装在一个类中,这样既避免了循环引用,还可以让整个项目的结构更清晰。

比如 Type1 DTO:

class Type1Dto:
    api = Namespace("type1", path="/type1", decorators=[internal_token_validator])
    action_model = api.model('ActionModel', action_desc)
    create_model = api.model("CreateType1Model", {
        "type1_title": fields.List(fields.String(description="Type1 title"), required=True),
        "type1_info": fields.Nested(api.model("Type1InfoModel", {
            "content": fields.String(description="Type1 content"),
        }), required=True),
    })
    template_model = api.model("TemplateType1Model", {
        "type1_title": fields.List(fields.String(description="Type1 title", required=True)),
        "content": fields.Nested(api.model("TemplateContent", {}), required=True),
    })
    model = api.model("Type1Model", {
        "total": fields.Integer(readOnly=True, description="action total"),
        "results": fields.List(fields.Nested(action_model)),
    })

其中包含了 namespace 的定义,request 的格式对象(Flask-RESTPlus 基于它生成 Request 文档),和 response 的返回对象(Flask-RESTPlus 基于它渲染 json 并生成 Response 文档)。

在使用时,将 dto 导入到视图层,而相关 model 也会在这派上用场:

from .dto import Type1Dto

api = Type1Dto.api


@api.route("/")
class Type1Api(Resource):
    json_schemas = {"post": SEND_TYPE_1_SCHEMA}

    @tracing_span("post_type_1:type_1_api")
    @api.expect(Type1Dto.create_model)
    @api.param("Token", description="internal token.", _in="header", required=True)
    def post(self):
        actions = deal_with_type_1(api.payload['type_1_title'], api.payload['content'])
        result = {
            "total": len(actions),
            "results": [a.render() for a in actions]
        }
        return result

最后将视图层的 api 导入到蓝图定义的地方完成注册,这样整个项目既做到了合理的结构分类,也完成和解决了导包问题。


参考资料:《How to structure a Flask-RESTPlus web service for production builds》: https://medium.freecodecamp.org/structuring-a-flask-restplus-web-service-for-production-builds-c2ec676de563

2018/5/26 posted in  Python 黑魔法