非 flask 程序实现 Flask Shell

2018/4/29 posted in  Python 黑魔法

Python 的并发编程

这篇文章将讲解 Python 并发编程的基本操作。并发和并行是对孪生兄弟,概念经常混淆。并发是指能够多任务处理,并行则是是能够同时多任务处理。Erlang 之父 Joe Armstrong 有一张非常有趣的图说明这两个概念:

我个人更喜欢的一种说法是:并发是宏观并行而微观串行。

GIL

虽然 Python 自带了很好的类库支持多线程/进程编程,但众所周知,因为 GIL 的存在,Python 很难做好真正的并行。

GIL 指全局解释器锁,对于 GIL 的介绍:

全局解释器锁(英语:Global Interpreter Lock,缩写GIL),是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。

  • 维基百科

其实与其说 GIL 是 Python 解释器的限制,不如说是 CPython 的限制,因为 Python 为了保障性能,底层大多使用 C 实现的,而 CPython 的内存管理并不是线程安全的,为了保障整体的线程安全,解释器便禁止多线程的并行执行。

因为 Python 社区认为操作系统的线程调度已经非常成熟了,没有必要自己再实现一遍,因此 Python 的线程切换基本是依赖操作系统,在实际的使用中,对于单核 CPU,GIL 并没有太大的影响,但对于多核 CPU 却引入了线程颠簸(thrashing)问题。

线程颠簸是指作为单一资源的 GIL 锁,在被多核心竞争强占时资源额外消耗的现象。

比如下图,线程1 在释放 GIL 锁后,操作系统唤醒了 线程2,并将 线程2 分配给 核心2 执行,但是如果此时 线程2 却没有成功获得 GIL 锁,只能再次被挂起。此时切换线程、切换上下文的资源都将白白浪费。

因此,Python 多线程程序在多核 CPU 机器下的性能不一定比单核高。那么如果是计算密集型的程序,一般还是考虑用 C 重写关键部分,或者使用多进程避开 GIL。

多线程

在 Python 中使用多线程,有 threadthreading 可供原则,thread 提供了低级别的、原始的线程以及一个简单的锁,因为 thread 过于简陋,线程管理容易出现人为失误,因此官方更建议使用 threading,而 threading 也不过是对 thread 的封装和补充。(Python3 中 thread 被改名为 _thread)。

在 Python 中创建线程非常简单:

import time
import threading


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(1)
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(threading.Thread(
            target=do_task,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

直接创建线程简单优雅,如果逻辑复杂,也可以通过继承 Thread 基类完成多线程:

import time
import threading


class MyTask(threading.Thread):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(1)
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

多进程

在 Python 中,可以使用 multiprocessing 库来实现多进程编程,和多线程一样,有两种方法可以使用多进程编程。

直接创建进程:

import time
import random
import multiprocessing


def do_something(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(multiprocessing.Process(
            target=do_something,
            args=("task_{}".format(i),)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

继承进程父类:

import time
import random
import multiprocessing


class MyTask(multiprocessing.Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

multiprocessing 除了常用的多进程编程外,我认为它最大的意义在于提供了一套规范,在该库下有一个 dummy 模块,即 multiprocessing.dummy,里面对 threading 进行封装,提供了和 multiprocessing 相同 API 的线程实现,换句话说,class::multiprocessing.Process 提供的是进程任务类,而 class::multiprocessing.dummy.Process,也正是有 multiprocessing.dummy 的存在,可以快速的讲一个多进程程序改为多线程:

import time
import random
from multiprocessing.dummy import Process


class MyTask(Process):
    def __init__(self, task_name):
        super(MyTask, self).__init__()
        self.task_name = task_name

    def run(self):
        print("Get task: {}".format(self.task_name))
        time.sleep(random.randint(1, 5))
        print("Finish task: {}".format(self.task_name))


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        # 创建 task
        tasks.append(MyTask("task_{}".format(i)))
    for t in tasks:
        # 开始执行 task
        t.start()

    for t in tasks:
        # 等待 task 执行完毕
        # 完毕前会阻塞住主线程
        t.join()
    print("Finish.")

无论是多线程还是多进程编程,这也是我一般会选择 multiprocessing 的原因。

除了直接创建进程,还可以用进程池(或者 multiprocessing.dummy 里的进程池):

import time
import random
from multiprocessing import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

线程池:

import time
import random
from multiprocessing.dummy import Pool


def do_task(task_name):
    print("Get task: {}".format(task_name))
    time.sleep(random.randint(1, 5))
    print("Finish task: {}".format(task_name))


if __name__ == "__main__":
    pool = Pool(5)
    for i in range(0, 10):
        #     创建 task
        pool.apply_async(do_task, ("task_{}".format(i),))
    pool.close()
    pool.join()
    print("Finish.")

这里示例有个问题,pool 在 join 前需要 close 掉,否则就会抛出异常,不过 Python 之禅的作者 Tim Peters 给出解释:

As to Pool.close(), you should call that when - and only when - you're never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It's also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there's often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you'd otherwise never see.

同步原语

在多进程编程中,因为进程间的资源隔离,不需要考虑内存的线程安全问题,而在多线程编程中便需要同步原语来保存线程安全,因为 Python 是一门简单的语言,很多操作都是封装的操作系统 API,因此支持的同步原语蛮全,但这里只写两种常见的同步原语:锁和信号量。

通过使用锁可以用来保护一段内存空间,而信号量可以被多个线程共享。

threading 中可以看到 Lock 锁和 RLock 重用锁两种锁,区别如名。这两种锁都只能被一个线程拥有,第一种锁只能被获得一次,而重用锁可以被多次获得,但也需要同样次数的释放才能真正的释放。

当多个线程对同一块内存空间同时进行修改的时候,经常遇到奇怪的问题:

import time
import random
from threading import Thread, Lock

count = 0


def do_task():
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

如上就是典型的非线程安全导致 count 没有达到预期的效果。而通过锁便可以控制某一段代码,或者说某段内存空间的访问:

import time
import random
from threading import Thread, Lock

count = 0
lock = Lock()


def do_task():
    lock.acquire()
    global count
    time.sleep(random.randint(1, 10) * 0.1)
    tmp = count
    tmp += 1
    time.sleep(random.randint(1, 10) * 0.1)
    count = tmp
    print(count)
    lock.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish. Count = {}".format(count))

当然,上述例子非常暴力,直接强行把并发改为串行。

对于信号量常见于有限资源强占的场景,可以定义固定大小的信号量供多个线程获取或者释放,从而控制线程的任务执行,比如下面的例子,控制最多有 5 个任务在执行:

import time
import random
from threading import Thread, BoundedSemaphore

sep = BoundedSemaphore(5)


def do_task(task_name):
    sep.acquire()
    print("do Task: {}".format(task_name))
    time.sleep(random.randint(1, 10))
    sep.release()


if __name__ == "__main__":
    tasks = []
    for i in range(0, 10):
        tasks.append(Thread(target=do_task, args=("task_{}".format(i),)))
    for t in tasks:
        t.start()
    for t in tasks:
        t.join()
    print("Finish.")

Queue 和 Pipe

因为多进程的内存隔离,不会存在内存竞争的问题。但同时,多个进程间的数据共享成为了新的问题,而进程间通信常见:队列,管道,信号。

这里只讲解队列和管道。

队列常见于双进程模型,一般用作生产者-消费者模式,由生产者进程向队列中发布任务,并由消费者从队列首部拿出任务进行执行:

import time
from multiprocessing import Process, Queue


class Task1(Process):
    def __init__(self, queue):
        super(Task1, self).__init__()
        self.queue = queue

    def run(self):
        item = self.queue.get()
        print("get item: [{}]".format(item))


class Task2(Process):
    def __init__(self, queue):
        super(Task2, self).__init__()
        self.queue = queue

    def run(self):
        print("put item: [Hello]")
        time.sleep(1)
        self.queue.put("Hello")


if __name__ == "__main__":
    queue = Queue()
    t1 = Task1(queue)
    t2 = Task2(queue)
    t1.start()
    t2.start()
    t1.join()
    print("Finish.")

理论上每个进程都可以向队列里的读或者写,可以认为队列是半双工路线。但是往往只有特定的读进程(比如消费者)和写进程(比如生产者),尽管这些进程只是开发者自己定义的。

而 Pipe 更像一个全工路线:

import time
from multiprocessing import Process, Pipe


class Task1(Process):
    def __init__(self, pipe):
        super(Task1, self).__init__()
        self.pipe = pipe

    def run(self):
        item = self.pipe.recv()
        print("Task1: recv item: [{}]".format(item))
        print("Task1: send item: [Hi]")
        self.pipe.send("Hi")


class Task2(Process):
    def __init__(self, pipe):
        super(Task2, self).__init__()
        self.pipe = pipe

    def run(self):
        print("Task2: send item: [Hello]")
        time.sleep(1)
        self.pipe.send("Hello")
        time.sleep(1)
        item = self.pipe.recv()
        print("Task2: recv item: [{}]".format(item))


if __name__ == "__main__":
    pipe = Pipe()
    t1 = Task1(pipe[0])
    t2 = Task2(pipe[1])
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("Finish.")

除了上面介绍的 threadingmultiprocessing 两个库外,还有一个好用的令人发指的库 concurrent.futures。和前面两个库不同,这个库是更高等级的抽象,隐藏了很多底层的东西,但也因此非常好用。用官方的例子:

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

该库中自带了进程池和线程池,可以通过上下文管理器来管理,而且对于异步任务执行完后,结果的获得也非常简单。再拿一个官方的多进程计算的例子作为结束:

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
2018/1/18 posted in  Python 黑魔法

Python unittest mock 小记

自从入坑 TDD 至今,并没有太多的 mock 需求,然而在随着深入到微服务后端组件的开发,依赖的组件越来越多也越来越诡异,便开始真正的正视起 Mock 来。

mock 是 Python 中一个神奇的模块,能够在测试中动态的替换部分逻辑,可以用来解决在跑测试的时候对其他组件依赖的问题。

在 Python 3.3 之后,mock 被加入到 unittest 模块里,不需要单独而在,而在之前,需要单独安装:

pip install mock

对于 mock 模块的学习也非常简单,只需要了解 Mock 类的使用和 Patch 的使用就非常受用了,前者是 mock 的精妙所在,可以创造一个可以替换任意函数、类、对象、方法的 object,并让这个 object 伪装成理想的样子。而 Patch 即是将这个 Mock object 替换的它应该存在的地方。

Mock

要理解 Mock,必须要先理解 Mock 类:

class unittest.mock.Mock(spec=None, side_effect=None, return_value=DEFAULT, wraps=None, name=None, spec_set=None, unsafe=False, **kwargs)

文档描述很详尽,但是常用的参数 side_effect, 和 return_value 值得注意。

side_effect 接受一个函数或异常,当 mock 实例被调用是,便会返回这个函数的返回内容,或者抛出对应的异常:

In [3]: m = Mock(side_effect=lambda: "hello")

In [4]: m()
Out[4]: 'hello'

In [5]: m = Mock(side_effect=Exception("Boooooom"))

In [6]: m()
---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-6-7e5925b669a0> in <module>()
----> 1 m()

....

Exception: Boooooom

当对于返回值比较固定的时候,return_value 可能会更加的实用:

In [7]: m = Mock(return_value="hello")

In [8]: m()
Out[8]: 'hello'

除了在 init 的时候对这些值进行注入,还可以通过赋值的方式:

In [10]: m.func.return_value = "world"

In [11]: m.func()
Out[11]: 'world'

目前只是对 Mock 的简单用法,因为 Mock 的实现很神奇,可以直接调用任何参数、方法,哪怕这些参数、方法并不存在,Mock 实例会直接返回一个新的 Mock 实例,像一棵树一样,非常有趣:

In [13]: m.wat
Out[13]: <Mock name='mock.wat' id='4406809544'>

In [14]: m.wat()
Out[14]: <Mock name='mock.wat()' id='4406812456'>

正是有了这样的特性,我们可以直接进行更深入的修改:

In [17]: m.wat.func.return_value = "watman"

In [18]: m.wat.func()
Out[18]: 'watman'

自此,如何 Mock 类已经非常清晰了,一个 Mock 实例可以作为 callable 的对象存在,而它本身,却又可以包含"无限"层级的新的的 Mock 实例,因此他可以非常简单的伪造出复杂的结构,比如 Response 等。

然而,在 mock 模块中,提供的并不只有一个 Mock 类,以上只是最基本的,还有更自动和常用的 unittest.mock.MagicMock,以及可以替换属性的 unittest.mock.PropertyMock。(除此之外还有两种 NonCallableMock)

MagicMock

MagicMock 是 Mock 的子类,其区别正如其名,即是实现了很多 magic method,可以举一个简单的例子,对于一个实例,如果没有实现 __iter__ 方法时,被转换成 list 会抛出异常:

In [19]: m = Mock()

In [20]: list(m)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-20-150120f9fb24> in <module>()
----> 1 list(m)

TypeError: 'Mock' object is not iterable

如果想正常的转化成列表,那么需要实现这个方法:

In [24]: m.__iter__ = Mock(return_value=iter([]))

In [25]: list(m)
Out[25]: []

MagicMock 类则会自作聪明的 mock 掉所有的 magic method,所以不需要自己去实现,而 __iter__ 默认是空数组:

In [28]: m = MagicMock()

In [29]: list(m)
Out[29]: []

我之所以认为 MagicMock 是自作聪明是因为如果我们需要这些魔法方法,默认的肯定不足以我们使用,我们无法避免自己重写。但因为已经有默认的存在,以至于如果我们手动实现的 Mock 实例存在问题时,往往会被它的默认方法掩盖掉一部分真相。

PropertyMock

PropertyMock 是我又爱又恨的 Mock 类,爱其有用,恨其难写。

在 Mock 类中默认 mock 的是可调用对象,因此,如果是熟悉或者 property 属性的 mock 并不理想:

In [30]: m = MagicMock()

In [31]: m.property_attr
Out[31]: <MagicMock name='mock.property_attr' id='4407739728'>

因此才需要 PropertyMock 的存在来拯救世界:

In [34]: p = PropertyMock(return_value=3)

In [35]: type(m).property_attr = p

In [36]: m.property_attr
Out[36]: 3

这种写法我真是日了狗了,不做详解,荆轲刺秦王。

补充

最后,作为 Mock 类的补充。Mock 除了提供了伪造数据的功能,还保留了大量的自带断言,可以来 ”事后“ 审计被 mock 的方法调用的情况,在单元测试中,简直神器。

Patch

有强大的 Mock 并不够用,只有真正的把伪造的可调用对象替换进去才算完美,而 Patch 便是负责这个工作。

在讲 mock 模块中实现的各种神奇 Patch 之前,依然先提下最基础的 unittest.mock.patch 类:

unittest.mock.patch(target, new=DEFAULT, spec=None, create=False, spec_set=None, autospec=None, new_callable=None, **kwargs)

一目了然,其中 target 是必选项,即使要替换什么,在基础的 Patch 类中,要求为字符串,比如:package.module.ClassName,而 new 即使要替换成的 Mock 类实例,默认是一个 MagicMock 实例,正如前面所说,这个实例实现了大量的默认魔法方法,只能能够让被调用时不抛异常而已,如果需要有数据替换等更精细的操作,需要自己定义 Mock 实例。

Patch 的参数基本上都是作为实例化 Mock 实例时使用的,如果自己手动构造 Mock 实例的话,只需要专注于 Patch 本身的功能就好。可以看一个简单的例子:

In [37]: class Class:
    ...:     def method(self):
    ...:         pass
    ...:

In [38]: with patch('__main__.Class') as MockClass:
    ...:     instance = MockClass.return_value
    ...:     instance.method.return_value = 'foo'
    ...:     assert Class() is instance
    ...:     assert Class().method() == 'foo'

值得一提的是,Patch 可以作为装饰器或者上下文管理器,前者对于默认的 Mock 实例即可满足需求的场景非常实用,而且 Patch 的额外参数也显得非常好用,后者对于构造负责的 Mock 实例非常好用。

Patch 拓展

Patch 类还有三个非常有用的拓展,分别是替换字典用的 patch.dict,多次替换用的 patch.multiple,和替换实例用的 patch.object

前两个非常简单,patch.dict 可以替换字典,或者字典类似实例,参考文档上的例子,一目了然:

>>> foo = {}
>>> with patch.dict(foo, {'newkey': 'newvalue'}):
...     assert foo == {'newkey': 'newvalue'}
...
>>> assert foo == {}

对于 patch.multiple,个人感觉有点鸡肋,目前也没真正的用过,作用正如文档所说:

Perform multiple patches in a single call.

文档的例子:

>>> thing = object()
>>> other = object()
>>> @patch.multiple('__main__', thing=DEFAULT, other=DEFAULT)
... def test_function(thing, other):
...     assert isinstance(thing, MagicMock)
...     assert isinstance(other, MagicMock)
...
>>> test_function()

patch.object 是需要单独拿出来强调的,毕竟和前两者相比,替换一个实例的场景是非常场景的,而且会有非常诡异的替换方式。

其实可以看到,Patch 的参数大同小异,我们只需要关注差异就可以了:

patch.object(target, attribute, new=DEFAULT, spec=None, create=False, spec_set=None, autospec=None, new_callable=None, **kwargs)

首先依然有 target,并且多出了 attribute 参数。对于 Patch 来说,要求 target 是字符串,而这里则要求是要替换的类,反而 attribute 被要求为字符串。除了这点,方法没有其他要注意的了,直接举一个文档的例子:

>>> @patch.object(SomeClass, 'class_method')
... def test(mock_method):
...     SomeClass.class_method(3)
...     mock_method.assert_called_with(3)
...
>>> test()

对于 Python unittest mock 的基础用法算是写完了,可以看到 Mock 和 Patch 都是非常灵性的工具,虽然把实现的奇技淫巧都跟封装了起来,但是其灵活性依然让人惊讶。至于怎么设计 Mock 以应对更复杂的实际场景,应该是另外一种艺术了。

2017/12/19 posted in  Python 黑魔法

Linux下常用命令

看见老五博客里有这么一篇博客,赶快转来备忘!!。@tTop5

说明:所有命令是在Centos 6.4 64位的虚拟机系统进行测试的。本文介绍的命令都会在此Centos下运行验证(也有部分命令会在suse/ubuntu系统里测试的,会做特明说明),但运行结果就不再列出了。 硬件篇 CPU相关

lscpu #查看的是cpu的统计信息. cat /proc/cpuinfo #查看CPU信息详细信息,如每个CPU的型号,主频等

内存相关

free -m #概要查看内存情况 这里的单位是MB cat /proc/meminfo #查看内存详细信息

磁盘相关

lsblk #查看硬盘和分区分布,显示很直观 df -h #查看各分区使用情况 cat /proc/partitions #查看硬盘和分区 mount | column -t #查看挂接的分区状态

网卡相关

lspci | grep -i 'eth' #查看网卡硬件信息ifconfig -a #查看系统的所有网络接口 ethtool eth0 #如果要查看某个网络接口的详细信息,例如eth0的详细参数和指标

软件篇 内核相关

uname -a #查看版本当前操作系统内核信息 cat /proc/version #查看当前操作系统版本信息 cat /etc/issue #查看版本当前操作系统发行版信息 cat /etc/redhat-release #同上 cat /etc/SuSE-release #suse系统下才可使用 lsb_release -a #用来查看linux兼容性的发行版信息 lsmod #列出加载的内核模块

网络

ifconfig #查看所有网络接口的属性iptables -L #查看防火墙设置 service iptables status #查看防火墙状态 service iptables stop #关闭防火墙 route -n #查看路由表 netstat -lntp #查看所有监听端口 netstat -antp #查看所有已经建立的连接 netstat -s #查看网络统计信息进程 netstat -at #列出所有tcp端口 netstat -au #列出所有udp端口 netstat -lt #只列出所有监听tcp端口

系统管理

top #查看系统所有进程的详细信息,比如CPU、内存等,信息很多! df -lh #查看硬盘大小及使用率 mount #挂接远程目录、NFS、本地共享目录到linux下 hostname #查看/修改计算机名 w #查看活动用户 id #查看指定用户信息 last #查看用户登录日志 cut -d: -f1 /etc/passwd #查看系统所有用户 cut -d: -f1 /etc/group #查看系统所有组 crontab -l #查看当前用户的计划任务服务 chkconfig –list #列出所有系统服务 chkconfig –list | grep on #列出所有启动的系统服务程序 rpm -qa #查看所有安装的软件包 uptime #查看系统运行时间、用户数、负载 /sbin/chkconfig --list #查看系统自动启动列表 /sbin/chkconfig –add mysql #把MySQL添加到系统的启动服务组里面

文件相关

ls -lht #列出一个文件夹下所有文件及大小、访问权限 **du -sh ** #查看指定目录的大小 **du -lh ** #查看指定目录及各文件的大小 ln -s #建立软链接

进程相关

pstree -p pid #查看一个进程下的所有线程 pstree -a #显示所有进程的所有详细信息,遇到相同的进程名可以压缩显示。 ps -ef #查看所有进程 kill -9 pid #杀死进程 kill all test #杀死进程 kill -9 pgrep test #杀死进程 ./test.sh & #使程序在后台运行 nohup ./test.sh & #使程序在后台运行

压缩解压缩

zip -r dir.zip dir file #将目录dir、文件file等压缩到zip包,zip -re dir.zip dir file #创建zip包,且加密 unzip dir.zip #解压 tar -zcvf dir.tar.gz dir file #将目录dir、文件file等压缩到tar包 tar -xf dir.tar.gz #解压

screen命令 screen命令组最大的好处就是当你的shell退出或关闭后,你运行的服务不会关系,也就是说,我们可以在screen里开启一组服务,且不受终端断开的影响。

screen -S test #创建一个名字为test的screen screen -r test #打开名字为test的screen screen -r pid #打开进程号为pid的screen screen -ls #列出所有的screen ctrl + a,d #当在一个screen时,退出screen ctrl + a,n #当在一个screen时,切换到下一个窗口 ctrl + a,c #当在一个screen时,创建一个新的窗口

scp命令

scp local_file remote_username@remote_ip:remote_dir #拷贝本地文件到远程机器上 scp -r local_dir remote_username@remote_ip:remote_dir #拷贝本地整个目录到远程机器上

软件包安装管理命令 假设你想要安装的软件包叫做app,注意,这里的命令通常需要sudo或者root权限。

//centos系统、redhat系统 rpm -qa | grep app #查找本机是否安装了app; rpm -ivh app.rpm #假设你有app的rpm包,这样直接安装 sudo yum install app #否则就在线安装 yum update app #更新app rpm -e app #删除已安装的app包 //suse、opensuse系统 zypper search app #查找本机是否安装了app; zypper install app #安装 zypper update app #更新 zypper remove app #删除 zypper lr #列出所有已定义的安装源。 zypper ar #添加新安装源。 zypper rr #删除指定的安装源 zypper mr #修改指定的安装源 //ubuntu系统 apt-get install app #安装 apt-get update app #更新 apt-get remove app #删除 apt-cache search app #搜索软件包 dpkg -i app.deb #假设你有app的deb包,这样直接安装

2017/7/29 posted in  Linux

Ansible Config 和 Playbook

Config

Ansible 的配置一般不需要更改,如果需要定制,自定义配置也很简单,在 Ansible 中,寻找配置按照如下顺序:

  • ANSIBLE_CONFIG (一个环境变量)
  • ansible.cfg (位于当前目录中)
  • .ansible.cfg (位于家目录中)
  • /etc/ansible/ansible.cfg

因此只需要按照文档自定义配置即可:http://ansible-tran.readthedocs.io/en/latest/docs/intro_configuration.html

Playbook

为了方便保存执行的操作,Ansible 使用了 Playbook 剧本。剧本使用 yml 格式,来避免成为了一种新语言或者脚本。

Playbook 是有 play 组成的,每个 play 包含了 host,user,tasks。

比如一个 playbook:

---
- hosts: webservers
  vars:
    http_port: 80
    max_clients: 200
  remote_user: root
  tasks:
  - name: ensure apache is at the latest version
    yum: pkg=httpd state=latest
  - name: write the apache config file
    template: src=/srv/httpd.j2 dest=/etc/httpd.conf
    notify:
    - restart apache
  - name: ensure apache is running
    service: name=httpd state=started
  handlers:
    - name: restart apache
      service: name=httpd state=restarted

host 便是指定的 hosts 文件中的主机,可以通过 remote_user 指定在远程使用的用户,也可以用 sudo 为远程操作添加 root 权限。

Task

作为远程部署工具,task 是整个 playbook 的重点。每个 task 都会在指定的所有远程主机执行,如果有执行失败的主机,将会被跳过。

每个 task 目标在于执行一个幂等(moudle)的操作,因此即使是多次执行也会很安全。一个 task 类似于下面的格式:

tasks:
  - name: make sure apache is running
    service: name=httpd state=running

一个 task 包含了名称,model,以及参数。更多的 task 写法见: http://ansible-tran.readthedocs.io/en/latest/docs/playbooks_intro.html

2017/7/26 posted in  坏笔记不如好记性