Python协程

协程概念

协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈,直接操作栈则基本没有内核切换的开销,可以不加锁的访问全局变量,所以上下文的切换非常快。与线程相比,协程更轻量。一个Python线程大概占用8M内存,而一个协程只占用1KB不到内存。协程更适用于IO密集型的应用。

greenlet模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from greenlet import greenlet
import time


def eat():
print("start eating")
g2.switch()
print("finished eating")


def sleep():
print("start sleeping")
print("finished sleeping")
g1.switch()


g1 = greenlet(eat)
g2 = greenlet(sleep)
g1.switch()

在上面的代码中,创建了两个协程对象,分别是g1g2,分别对应的是eat()函数sleep()函数。通过使用greenletswitch方法可以切换协程。这里我们先调用g1switch方法,此时eat()函数被执行,并打印出start eating。接下来是g2switch方法被调用,此时执行sleep()函数,依次打印start sleepingfinished sleeping,之后又调用了g1switch方法,回到eat()函数中,打印出finished eating。程序执行结束。这个时候我们就完成了两个协程之间的切换。代码的输出为:

1
2
3
4
start eating
start sleeping
finished sleeping
finished eating

gevent模块

gevent是第三方库,通过greenlet实现协程。然而当一个greenlet遇到IO操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。

创建协程任务
1
2
3
4
5
6
7
8
9
10
import gevent
import time

def eat():
print("start eating")
time.sleep(1)
print("finished eating")

g1 = gevent.spawn(eat) # 创造一个协程任务
gevent.sleep(2)

gevent模块中,通过gevent.spawn就可以创建一个协程任务。如果需要交替之行的话,需要使用gevent.sleep交出控制权。这个时候,就会转去执行eat函数。

阻塞等待协程完成

使用join方法,可以阻塞等待一个协程的结束。

1
2
3
4
5
6
7
8
9
import gevent

def eat():
print("start eating")
time.sleep(3)
print("finished eating")

g1 = gevent.spawn(eat)
g1.join()
阻塞等待多个协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent

def eat():
print("start eating")
gevent.sleep(3)
print("finished eating")

def sleep():
print("start sleeping")
gevent.sleep(1)
print("finished sleeping")

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
g1.join()
g2.join()

这里需要注意在多个协程中,不能使用time.sleep,这样将会导致协程是同步执行的。输出如下

1
2
3
4
start eating
finished eating
start sleeping
finished sleeping

因此需要使用gevent.sleep,这样才能引起协程之间的切换。输出如下

1
2
3
4
start eating
start sleeping
finished sleeping
finished eating

除此之外,对于多个协程的阻塞,我们也可以不用对于每一个协程采用join方法, 在gevent中提供了joinall方法,在该方法中,只需要传一个协程的列表即可,因此对于上面代码最后两行,我们可以使用下面代码替换

1
gevent.joinall([g1,g2])
gevent中的monkey模块

monkey模块可以使我们在不修改原来使用的python标准库函数的程序的情况下,将程序转换成可以使用gevent框架的异步程序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import gevent
from gevent import monkey
import time

monkey.patch_all()

def eat():
print("start eating")
time.sleep(3)
print("finished eating")

def sleep():
print("start sleeping")
time.sleep(1)
print("finished sleeping")

g1 = gevent.spawn(eat)
g2 = gevent.spawn(sleep)
gevent.joinall([g1, g2])

我们只需要从gevent中导入monkey。并加上monkey.patch_all()。这样对于代码中的耗时操作,就会转换为gevent中实现的模块。

同时起多个协程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import gevent
from gevent import monkey
import time

monkey.patch_all()

def eat():
print("start eating")
time.sleep(1)
print("finished eating")

g_l = []
for i in range(10):
g = gevent.spawn(eat)
g_l.append(g)
gevent.joinall(g_l)

这里我们使用for循环起10个协程,并将起的每一个协程存入列表中进行阻塞等待。得到的输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
start eating
start eating
start eating
start eating
start eating
start eating
start eating
start eating
start eating
start eating
finished eating
finished eating
finished eating
finished eating
finished eating
finished eating
finished eating
finished eating
finished eating
finished eating
获取协程返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import gevent
from gevent import monkey
import time

monkey.patch_all()

def eat():
print("start eating")
time.sleep(3)
print("finished eating")
return 123

def sleep():
print("start sleeping")
time.sleep(1)
print("finished sleeping")
return 456

g1 = gevent.spawn(eat) # 创造一个协程任务
g2 = gevent.spawn(sleep)

gevent.joinall([g1, g2])
print(g1.value)
print(g2.value)

通过g.value可以拿到每一个协程的返回值。输出如下

1
2
3
4
5
6
start eating
start sleeping
finished sleeping
finished eating
123
456
gevent实现socket并发

server端实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import gevent
from gevent import monkey
monkey.patch_all()
import socket

def chat(conn):
while True:
msg = conn.recv(1024).decode('utf-8')
conn.send(msg.upper().encode('utf-8'))

sk = socket.socket()
sk.bind(('127.0.0.1', 9000))
sk.listen()

while True:
conn, addr = sk.accept()
gevent.spawn(chat, conn)

client端实现如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import socket
import time

def client(i):
sk = socket.socket()
sk.connect(('127.0.0.1', 9000))
while True:
sk.send('elssm'.encode('utf-8'))
print(i*'*',sk.recv(1024).decode('utf-8'))

from threading import Thread

for i in range(500):
Thread(target=client,args=(i,)).start()

这里客户端起500个线程去和服务端进行连接,在服务端接收之后,使用gevent模块,将连接请求通过协程进行处理。

asyncio模块

asyncio 是用来编写 并发 代码的库,使用 async/await 语法。需要注意的是asyncio模块是在python3.x中引入的,在python2.x中并不支持

起一个协程
1
2
3
4
5
6
7
8
9
import asyncio

async def func():
print("start ...")
await asyncio.sleep(1)
print("end ...")

loop = asyncio.get_event_loop()
loop.run_until_complete(func())

首先使用get_event_loop方法创建一个事件循环,之后使用run_until_complete方法将协程注册到事件循环,在asyncio模块中使用async关键字定义一个协程。对于一些耗时的操作,例如网络请求,文件读取等。我们使用asyncio.sleep函数来模拟IO操作。协程的目的也是让这些IO操作异步化。在 sleep的时候,使用await让出控制权。即当遇到阻塞调用的函数的时候,使用await方法将协程的控制权让出,以便loop调用其他的协程。输出如下

1
2
start ...
end ...
启动多个协程
1
2
3
4
5
6
7
8
9
10
import asyncio

async def func():
print("start ...")
await asyncio.sleep(1)
print("end ...")

loop = asyncio.get_event_loop()
wait_obj = asyncio.wait([func(), func(), func()])
loop.run_until_complete(wait_obj)

这里我们还是使用get_event_loop方法先创建一个事件循环,接着我们使用asyncio.wait函数将多个协程保存在列表中,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中。输出如下

1
2
3
4
5
6
start ...
start ...
start ...
end ...
end ...
end ...
获取多个协程返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import asyncio

async def func():
print("start ...")
await asyncio.sleep(1)
print("end ...")
return 123

loop = asyncio.get_event_loop()
t1 = loop.create_task(func())
t2 = loop.create_task(func())
tasks = [t1,t2]
wait_obj = asyncio.wait([t1,t2])
loop.run_until_complete(wait_obj)
for i in tasks:
print(i.result())

在asyncio模块中,可以使用loop自带的create_task。通过result()方法获取到返回值。输出如下

1
2
3
4
5
6
start ...
start ...
end ...
end ...
123
123
按顺序获取协程返回值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import asyncio

async def func(i):
print("start ...")
await asyncio.sleep(5-i)
print("end ...")
return i,123

async def main():
task_l = []
for i in range(5):
task = asyncio.ensure_future(func(i))
task_l.append(task)
for ret in asyncio.as_completed(task_l):
res = await ret
print(res)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

这里我们起5个协程,使用ensure_future安排将要执行的协程任务,并将5个协程保存在列表中,使用as_completed方法可以随时获取任务的返回值,不需要等待循环事件结束后一并获取所有任务的返回值,随后使用await等待任务完成。因为我们是按顺序来获取返回值,因此睡眠时间最短的应该最先返回,对于5次循环,当i=4的时候只需要睡眠1秒,因此我们应该先得到的返回值是(4,123),最终得到的输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
start ...
start ...
start ...
start ...
start ...
end ...
(4, 123)
end ...
(3, 123)
end ...
(2, 123)
end ...
(1, 123)
end ...
(0, 123)