1.线程
-
python的thread模块是比较底层的模块 -
python的threading模块是对thread做了一些包装的,可以更加方便的被使用 -
当调用start()时,才会真正的创建线程,并且开始执行
1.1 直接创建线程
import time
import threading
def sing():
"""唱歌 5秒钟"""
for i in range(5):
print("----正在唱:菊花茶----")
time.sleep(1)
def dance():
"""跳舞 5秒钟"""
for i in range(5):
print("----正在跳舞----")
time.sleep(1)
def main():
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
if __name__ == "__main__":
main()
1.2 通过继承实现多线程
-
继承threading.Thread的类,一定要实现run()方法。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
class MyThread(threading.Thread):
# 初始化函数,类似于构造函数
def __init__(self, num):
#先初始化父亲
threading.Thread.__init__(self)
self.num = num
# 定义每个线程要运行的函数,如果是继承的方式,这个run的名字不能修改,每个继承自threading的类,都要事先这个run方法
def run(self):
print("线程 %s" %self.num)
time.sleep(3)
if __name__ == '__main__':
t1 = MyThread(1) #创建一个线程
t2 = MyThread(2) #创建另一个线程
t1.start()
t2.start()
1.3 主线程会等待所有的子线程结束后才结束
-
下面程序中,主线程是 __main__
#coding=utf-8
import threading
from time import sleep,ctime
def sing():
for i in range(3):
print("正在唱歌...%d"%i)
sleep(1)
def dance():
for i in range(3):
print("正在跳舞...%d"%i)
sleep(1)
if __name__ == '__main__':
print('---开始---:%s'%ctime())
t1 = threading.Thread(target=sing)
t2 = threading.Thread(target=dance)
t1.start()
t2.start()
#sleep(5) # 屏蔽此行代码,试试看,程序是否会立马结束?
print('---结束---:%s'%ctime())
1.4 总结
-
每个线程一定会有一个名字,尽管上面的例子中没有指定线程对象的name,但是python会自动为线程指定一个名字。 -
当线程的run()方法结束时该线程完成。 -
无法控制线程调度程序,但可以通过别的方式来影响线程调度的方式。
1.5 线程的几种状态
-
例如当执行到sleep语句时,线程将被阻塞(Blocked),到sleep结束后,线程进入就绪(Runnable)状态,等待调度。而线程调度将自行选择一个线程执行。上面的代码中只能保证每个线程都运行完整个run函数,但是线程的启动顺序、run函数中每次循环的执行顺序都不能确定。
2. 线程-join等待其他线程执行完毕
2.1 join方法
-
主线程A中,创建了子线程B,并且在主线程A中调用了B.join() -
那么,主线程A会在调用的地方等待,直到子线程B完成操作后,才可以接着往下执行, -
那么在调用这个线程时可以使用被调用线程的join方法。
2.2 原型:join([timeout])
-
A线程通过b.join()的话,此时就会去B线程执行,当B线程执行完毕后返回到A线程继续执行。 -
里面的参数时可选的,代表线程运行的最大时间,即如果超过这个时间,不管这个此线程有没有执行完毕都会被回收,然后主线程或函数都会接着执行的。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
from time import sleep
class MyThread(threading.Thread):
def __init__(self, id):
threading.Thread.__init__(self)
self.id = id
def run(self):
x = 0
time.sleep(10)
print self.id
if __name__ == "__main__":
t1 = MyThread(999)
t1.start()
t1.join() # 在主线程调用了t1.join(),说明在主线程运行的时候,先运行t1这个线程,然后当t1线程运行完毕后,在回来运行主线程
for i in range(5):
print i
3.线程-守护线程:setDaemon
3.1 守护线程的意义
-
程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如果子线程未完成,则主线程会等待子线程完成后再退出。 -
但是有时候我们需要的是,只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以用setDaemon方法了。 -
A线程中通过b.setDaemon(),就是不管b线程是否执行完毕,只要A线程结束,B线程也要一起退出。
3.2 例子
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
from time import sleep
class MyThread(threading.Thread):
def __init__(self, id):
threading.Thread.__init__(self)
def run(self):
time.sleep(5)
print "This is " + self.getName()
if __name__ == "__main__":
t1 = MyThread(999)
t1.setDaemon(True) # 如果注释掉这一行,当主线程执行完毕后,会等待子线程执行完毕,此时会输出两行
t1.start()
print "I am the father thread."
-
t1.setDaemon(True)的操作,将父线程设置为了守护线程。 -
主线程与子线程同时执行,但是子线程会sleep5秒,由于在主线程中把子线程t1设置成了守护线程,那么当主线程执行完毕后,不管子线程是否执行完毕,子线程随着父线程一起退出。
4. 线程-线程锁
4.1 互斥锁Mutex
-
lock.acquire(blocking)
与lock.release()
成对出现。 -
锁定方法acquire可以有一个blocking参数。
如果设定blocking为 True
,则当前线程会堵塞,直到获取到这个锁为止(如果没有指定,那么默认为True)如果设定blocking为 False
,则当前线程不会堵塞
-
换句话说:
True表示堵塞 即如果这个锁在上锁之前已经被上锁了,那么这个线程会在这里一直等待到解锁为止 False表示非堵塞,即不管本次调用能够成功上锁,都不会卡在这,而是继续执行下面的代码
-
互斥锁语法
# 创建锁
mutex = threading.Lock()
# 锁定
mutex.acquire()
# 释放
mutex.release()
4.2 上锁解锁过程
-
当一个线程调用锁的acquire()方法获得锁时,锁就进入“locked”状态。 -
每次只有一个线程可以获得锁。如果此时另一个线程试图获得这个锁,该线程就会变为“blocked”状态,称为“阻塞”,直到拥有锁的线程调用锁的release()方法释放锁之后,锁进入“unlocked”状态。 -
线程调度程序从处于同步阻塞状态的线程中选择一个来获得锁,并使得该线程进入运行(running)状态。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def addNum():
global num # 在每个线程中都获取这个全局变量
print('--get num:', num)
time.sleep(1)
lock.acquire() # 1. 修改数据前加锁
num -= 1 # 对此公共变量进行-1操作
lock.release() # 2. 修改后释放
num = 100 # 设定一个共享变量
thread_list = []
lock = threading.Lock() # 生成全局锁
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: # 等待所有线程执行完毕
t.join()
print('final num:', num)
4.3 递归锁(RLock)
-
锁中还有其他的锁,使用lock = threading.RLock() 得到一个锁对象。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
def run1():
print("grab the first part data")
#线程内加锁
lock.acquire()
global num
num += 1
lock.release()
return num
def run2():
print("grab the second part data")
#线程内加锁
lock.acquire()
global num2
num2 += 1
lock.release()
return num2
def run3():
# 线程内加锁
lock.acquire()
res = run1()
print('--------between run1 and run2-----')
res2 = run2()
lock.release()
print(res, res2)
if __name__ == '__main__':
num, num2 = 0, 0
lock = threading.RLock()
for i in range(10):
t = threading.Thread(target=run3)
t.start()
while threading.active_count() != 1:
print(threading.active_count())
else:
print('----all threads done---')
print(num, num2)
4.4 信号量(semaphore)
-
互斥锁,同时只允许一个线程更改数据; -
而信号量,可以允许一定数量的线程更改数据。 通过 semaphore = threading.BoundedSemaphore(N)
创建指定数量(N)的线程
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading, time
# 线程运行的方法
def run(n):
semaphore.acquire()
time.sleep(1)
print("run the thread: %sn" % n)
semaphore.release()
# 主线程
if __name__ == '__main__':
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允许5个线程同时运行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
while threading.active_count() != 1:
pass # print threading.active_count()
else:
print('----all threads done---')
print(num)
4.5 死锁
-
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁
#coding=utf-8
import threading
import time
class MyThread1(threading.Thread):
def run(self):
# 对mutexA上锁
mutexA.acquire()
# mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁
print(self.name+'----do1---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了
mutexB.acquire()
print(self.name+'----do1---down----')
mutexB.release()
# 对mutexA解锁
mutexA.release()
class MyThread2(threading.Thread):
def run(self):
# 对mutexB上锁
mutexB.acquire()
# mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁
print(self.name+'----do2---up----')
time.sleep(1)
# 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了
mutexA.acquire()
print(self.name+'----do2---down----')
mutexA.release()
# 对mutexB解锁
mutexB.release()
mutexA = threading.Lock()
mutexB = threading.Lock()
if __name__ == '__main__':
t1 = MyThread1()
t2 = MyThread2()
t1.start()
t2.start()
5. 定时器Timer
-
延迟指定时间,执行指定函数
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def hello():
print("hello, world")
t = threading.Timer(30.0, hello)
t.start() # 30秒后输出hello, world
6. 线程-线程间通信threading.Event()
6.1 Event
-
Python提供了Event对象用于线程间通信,它是由线程设置的信号标志,如果信号标志位为假,则线程等待直到信号被其他线程设置成真 -
Event对象实现了简单的线程通信机制,它提供了设置信号,清除信号,等待等用于实现线程间的通信。 -
通过 event = threading.Event()
得到Event的对象。
设置信号set()
-
使用Event的 set()
方法可以设置Event对象内部的信号标志为真。 -
Event对象提供了 isSet()
方法来判断其内部信号标志的状态,当使用event对象的set()方法后,isSet()方法返回真.
清除信号 clear()
-
使用Event对象的 clear()
方法可以清除Event对象内部的信号标志,即将其设为假; -
当使用Event的clear方法后, isSet()
方法返回假
等待wait()
-
Event对象 wait
的方法只有在内部信号为真的时候才会很快的执行并完成返回。 -
当Event对象的内部信号标志为假时,则wait方法一直等待到其为真时才返回。
6.2 例子
-
启动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading,time
import random
def light():
if not event.isSet():
event.set() #wait就不阻塞 #绿灯状态
count = 0
while True:
if count < 10:
print(' 33[42;1m--green light on--- 33[0m')
elif count <13:
print(' 33[43;1m--yellow light on--- 33[0m')
elif count <20:
if event.isSet():
event.clear()
print(' 33[41;1m--red light on--- 33[0m')
else:
count = 0
event.set() #打开绿灯
time.sleep(1)
count +=1
def car(n):
while 1:
time.sleep(random.randrange(10))
if event.isSet(): #绿灯
print("car [%s] is running.." % n)
else:
print("car [%s] is waiting for the red light.." %n)
if __name__ == '__main__':
event = threading.Event()
Light = threading.Thread(target=light)
Light.start()
for i in range(3):
t = threading.Thread(target=car,args=(i,))
t.start()
7.线程-ThreadLocal
-
一个ThreadLocal变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。 -
ThreadLocal解决了参数在一个线程中各个函数之间互相传递的问题。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()
def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('dongGe',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('老王',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
'''
Hello, dongGe (in Thread-A)
Hello, 老王 (in Thread-B)
'''
-
全局变量 local_school
就是一个ThreadLocal对象,每个Thread对它都可以读写student属性,但互不影响。 -
你可以把 local_school
看成全局变量,但每个属性如local_school.student
都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal内部会处理。 -
可以理解为全局变量 local_school
是一个dict
,不但可以用local_school.student
,还可以绑定其他变量,如local_school.teacher
等等。 -
ThreadLocal最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
8. 线程-多线程服务器
-
由于多线程中,父子线程共享运行空间 -
所以,当父线程创建了子线程以后,父线程把客户端连接的socket传递给子线程以后,父线程不能关闭这个socket, -
因为父子线程共用这个socket。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
from threading import Thread
from time import sleep
# 处理客户端的请求并执行事情
def dealWithClient(newSocket,destAddr):
while True:
recvData = newSocket.recv(1024)
if len(recvData)>0:
print('recv[%s]:%s'%(str(destAddr), recvData))
else:
print('[%s]客户端已经关闭'%str(destAddr))
break
newSocket.close()
def main():
serSocket = socket(AF_INET, SOCK_STREAM)
serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1)
localAddr = ('', 7788)
serSocket.bind(localAddr)
serSocket.listen(5)
try:
while True:
print('-----主进程,,等待新客户端的到来------')
newSocket,destAddr = serSocket.accept()
print('-----主进程,,接下来创建一个新的进程负责数据处理[%s]-----'%str(destAddr))
client = Thread(target=dealWithClient, args=(newSocket,destAddr))
client.start()
#因为线程中共享这个套接字,如果关闭了会导致这个套接字不可用,
#但是此时在线程中这个套接字可能还在收数据,因此不能关闭
#newSocket.close()
finally:
serSocket.close()
if __name__ == '__main__':
main()
9. 线程-单线程服务器-非阻塞
Server端
-
非阻塞服务器,就是服务器端的socket调用setblocking(Flase)来设置为非阻塞即可。 -
注意点:
设置成非阻塞以后,如果accept时,恰巧没有客户端connect,那么accept会产生一个异常,所以需要try来进行处理。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import time
# 用来存储所有的新链接的socket
g_socketList = []
def main():
serSocket = socket(AF_INET, SOCK_STREAM) # TCP
serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1) # 地址复用
localAddr = ('', 7788)
serSocket.bind(localAddr)
#可以适当修改listen中的值来看看不同的现象
serSocket.listen(1000)
#将套接字设置为非堵塞
#设置为非堵塞后,如果accept时,恰巧没有客户端connect,那么accept会
#产生一个异常,所以需要try来进行处理
serSocket.setblocking(False)
while True:
#用来测试
#time.sleep(0.5)
try:
newClientInfo = serSocket.accept()
except Exception as result:
pass
else:
print("一个新的客户端到来:%s"%str(newClientInfo))
newClientInfo[0].setblocking(False)
g_socketList.append(newClientInfo)
# 用来存储需要删除的客户端信息
g_needDelClientInfoList = []
for clientSocket,clientAddr in g_socketList:
try:
recvData = clientSocket.recv(1024)
if len(recvData)>0:
print('recv[%s]:%s'%(str(clientAddr), recvData))
else:
print('[%s]客户端已经关闭'%str(clientAddr))
clientSocket.close()
g_needDelClientInfoList.append((clientSocket,clientAddr))
except Exception as result:
pass
for needDelClientInfo in g_needDelClientInfoList:
g_socketList.remove(needDelClientInfo)
if __name__ == '__main__':
main()
client端
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
import random
import time
serverIp = input("请输入服务器的ip:")
connNum = input("请输入要链接服务器的次数(例如1000):")
g_socketList = []
for i in range(int(connNum)):
s = socket(AF_INET, SOCK_STREAM)
s.connect((serverIp, 7788))
g_socketList.append(s)
print(i)
while True:
for s in g_socketList:
s.send(str(random.randint(0,100)))
# 用来测试用
#time.sleep(1)
10. 进程
-
进程:一个程序运行起来后,代码+用到的资源 称之为进程,它是操作系统分配资源的基本单元。
10.1 进程的状态
-
工作中,任务数往往大于cpu的核数,即一定有一些任务正在执行,而另外一些任务在等待cpu进行执行,因此导致了有了不同的状态 -
就绪态:运行的条件都已经慢去,正在等在cpu执行 -
执行态:cpu正在执行其功能 -
等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态
11. epoll的优点
-
没有最大并发连接的限制,能打开的FD(指的是文件描述符,通俗的理解就是套接字对应的数字编号)的上限远大于1024。 -
效率提升,不是轮询的方式,不会随着FD数目的增加效率下降。只有活跃可用的FD才会调用callback函数;即epoll最大的优点就在于它只管你“活跃”的连接,而跟连接总数无关 -
因此在实际的网络环境中,epoll的效率就会远远高于select和poll。 -
EPOLLIN (可读) -
EPOLLOUT (可写) -
EPOLLET (ET模式) -
epoll对文件描述符的操作有两种模式:LT(level trigger)和ET(edge trigger)。 -
LT模式是默认模式,LT模式与ET模式的区别如下:
LT模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll时,会再次响应应用程序并通知此事件。 ET模式:当epoll检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll时,不会再次响应应用程序并通知此事件。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import socket
import select
# 创建套接字
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
# 设置可以重复使用绑定的信息
s.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
# 绑定本机信息
s.bind(("",7788))
# 变为被动
s.listen(10)
# 创建一个epoll对象
epoll=select.epoll()
# 测试,用来打印套接字对应的文件描述符
# print s.fileno()
# print select.EPOLLIN|select.EPOLLET
# 注册事件到epoll中
# epoll.register(fd[, eventmask])
# 注意,如果fd已经注册过,则会发生异常
# 将创建的套接字添加到epoll的事件监听中
epoll.register(s.fileno(),select.EPOLLIN|select.EPOLLET)
connections = {}
addresses = {}
# 循环等待客户端的到来或者对方发送数据
while True:
# epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待
epoll_list=epoll.poll()
# 对事件进行判断
for fd,events in epoll_list:
# print fd
# print events
# 如果是socket创建的套接字被激活
if fd == s.fileno():
conn,addr=s.accept()
print('有新的客户端到来%s'%str(addr))
# 将 conn 和 addr 信息分别保存起来
connections[conn.fileno()] = conn
addresses[conn.fileno()] = addr
# 向 epoll 中注册 连接 socket 的 可读 事件
epoll.register(conn.fileno(), select.EPOLLIN | select.EPOLLET)
elif events == select.EPOLLIN:
# 从激活 fd 上接收
recvData = connections[fd].recv(1024)
if len(recvData)>0:
print('recv:%s'%recvData)
else:
# 从 epoll 中移除该 连接 fd
epoll.unregister(fd)
# server 侧主动关闭该 连接 fd
connections[fd].close()
print("%s---offline---"%str(addresses[fd]))
12. 进程-fork创建多进程
12.1 fork(),linux上的fork
-
Python
的os模块
封装了常见的系统调用,其中就包括fork
。 -
fork函数是linux/unix系统中的一个系统调用。 -
fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 -
子进程永远返回0,而父进程返回子进程的ID。 -
这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID
一个基本的例子
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()
if pid == 0:
print('哈哈1')
else:
print('哈哈2')
getpid()、getppid()
-
getpid()
: 自己的进程ID -
getppid()
:得到自己父进程的ID。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
x=0
rpid = os.fork()
if rpid<0:
print("fork调用失败。")
elif rpid == 0:
print("我是子进程(%s),我的父进程是(%s)"%(os.getpid(),os.getppid()))
x+=1
else:
print("我是父进程(%s),我的子进程是(%s)"%(os.getpid(),rpid))
print("父子进程都可以执行这里的代码")
12.2 多进程修改全局变量
-
父子进程,各自拥有自己的全局变量
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
num = 0
# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()
if pid == 0:
num+=1
print('哈哈1---num=%d'%num)
else:
time.sleep(1)
num+=1
print('哈哈2---num=%d'%num)
12.3 多次fork
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import time
# 注意,fork函数,只在Unix/Linux/Mac上运行,windows不可以
pid = os.fork()
if pid == 0:
print('哈哈1')
else:
print('哈哈2')
pid = os.fork()
if pid == 0:
print('哈哈3')
else:
print('哈哈4')
time.sleep(1)
13. 单进程服务器-gevent协程
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
import time
import gevent
from gevent import socket,monkey
monkey.patch_all()
def handle_request(conn):
while True:
data = conn.recv(1024)
if not data:
conn.close()
break
print("recv:", data)
conn.send(data)
def server(port):
s = socket.socket()
s.bind(('', port))
s.listen(5)
while True:
cli, addr = s.accept()
gevent.spawn(handle_request, cli)
if __name__ == '__main__':
server(7788)
14. 进程-multiprocessing.Process创建多进程
14.1 multiprocessing.Process创建多进程
-
由于fork函数是unix/linux上的函数,无法做到跨平台。 -
multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情 -
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Process
import time
def f(name):
time.sleep(2)
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
print('__main__')
14.2 Porcess语法结构
结构
-
Process([group [, target [, name [, args [, kwargs]]]]])
target:表示这个进程实例所调用对象; args:表示调用对象的位置参数元组; kwargs:表示调用对象的关键字参数字典; name:为当前进程实例的别名; group:大多数情况下用不到;
Process类常用方法
方法 | 描述 |
---|---|
is_alive() | 判断进程实例是否还在执行; |
join([timeout]) | 是否等待进程实例执行结束,或等待多少秒; |
start() | 启动进程实例(创建子进程); |
run() | 如果没有给定target参数,对这个对象调用start()方法时,就将执行对象中的run()方法; |
terminate() | 不管任务是否完成,立即终止; |
Process类常用属性
属性 | 描述 |
---|---|
name | 当前进程实例别名,默认为Process-N,N为从1开始递增的整数; |
pid | 当前进程实例的PID值; |
14.3 例子
进程PID
# -*- coding:utf-8 -*-
from multiprocessing import Process
import os
import time
def run_proc():
"""子进程要执行的代码"""
print('子进程运行中,pid=%d...' % os.getpid()) # os.getpid获取当前进程的进程号
print('子进程将要结束...')
if __name__ == '__main__':
print('父进程pid: %d' % os.getpid()) # os.getpid获取当前进程的进程号
p = Process(target=run_proc)
p.start()
给子进程指定的函数传递参数
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process,freeze_support
import time
import os
#两个子进程将会调用的两个方法
def worker_1(interval):
print("worker_1,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval) #程序将会被挂起interval秒
t_end = time.time()
print("worker_1,执行时间为'%0.2f'秒"%(t_end - t_start))
def worker_2(interval):
print("worker_2,父进程(%s),当前进程(%s)"%(os.getppid(),os.getpid()))
t_start = time.time()
time.sleep(interval)
t_end = time.time()
print("worker_2,执行时间为'%0.2f'秒"%(t_end - t_start))
#输出当前程序的ID
print("进程ID:%s"%os.getpid())
if __name__ == '__main__':
freeze_support() # 为了能在windows上运行,必须添加这一个
#创建两个进程对象,target指向这个进程对象要执行的对象名称,
#args后面的元组中,是要传递给worker_1方法的参数,
#因为worker_1方法就一个interval参数,这里传递一个整数2给它,
#如果不指定name参数,默认的进程对象名称为Process-N,N为一个递增的整数
p1=Process(target=worker_1,args=(2,))
p2=Process(target=worker_2,name="dongGe",args=(1,))
#使用"进程对象名称.start()"来创建并执行一个子进程,
#这两个进程对象在start后,就会分别去执行worker_1和worker_2方法中的内容
p1.start()
p2.start()
#同时父进程仍然往下执行,如果p2进程还在执行,将会返回True
print("p2.is_alive=%s"%p2.is_alive())
#输出p1和p2进程的别名和pid
print("p1.name=%s"%p1.name)
print("p1.pid=%s"%p1.pid)
print("p2.name=%s"%p2.name)
print("p2.pid=%s"%p2.pid)
#join括号中不携带参数,表示父进程在这个位置要等待p1进程执行完成后,
#再继续执行下面的语句,一般用于进程间的数据同步,如果不写这一句,
#下面的is_alive判断将会是True,在shell(cmd)里面调用这个程序时
#可以完整的看到这个过程,大家可以尝试着将下面的这条语句改成p1.join(1),
#因为p2需要2秒以上才可能执行完成,父进程等待1秒很可能不能让p1完全执行完成,
#所以下面的print会输出True,即p1仍然在执行
p1.join()
print("p1.is_alive=%s"%p1.is_alive())
14.4 进程间不共享全局变量
import multiprocessing
import os
import time
nums = [11, 22, 33]
def test():
nums.append(44)
print("在进程中1中nums=%s" % str(nums))
time.sleep(3)
def test2():
print("在进程中2中nums=%s" % str(nums))
def main():
print("----in 主进程 pid=%d---父进程pid=%d----" % (os.getpid(), os.getppid()))
p = multiprocessing.Process(target=test)
p.start()
# time.sleep(1)
p.join()
p2 = multiprocessing.Process(target=test2)
p2.start()
if __name__ == "__main__":
main()
15. 多进程服务器
-
通过为每个客户端创建一个进程的方式,能够同时为多个客户端进行服务 -
当客户端不是特别多的时候,这种方式还行,如果有几百上千个,就不可取了,因为每次创建进程等过程需要好较大的资源。 -
注意点:
由于父进程得到一个客户端的socket以后,创建子进程的时候会把这个socket传递给子进程来处理; 由于父进程会复制一份运行空间给子进程,所以,父进程可以关闭接收到的这个 socket
。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
from multiprocessing import *
from time import sleep
# 处理客户端的请求并为其服务,作为子进程来运行
def dealWithClient(newSocket,destAddr):
while True:
recvData = newSocket.recv(1024)
if len(recvData)>0:
print('recv[%s]:%s'%(str(destAddr), recvData))
else:
print('[%s]客户端已经关闭'%str(destAddr))
break
newSocket.close()
def main():
serSocket = socket(AF_INET, SOCK_STREAM)
serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1)
localAddr = ('', 7788)
serSocket.bind(localAddr)
serSocket.listen(5)
try:
while True:
print('-----主进程,,等待新客户端的到来------')
newSocket,destAddr = serSocket.accept()
print('-----主进程,,接下来创建一个新的进程负责数据处理[%s]-----'%str(destAddr))
# 当有一个客户连接过来后,就启动一个子进程来处理这个连接
client = Process(target=dealWithClient, args=(newSocket,destAddr))
client.start() # 启动子进程
#因为已经向子进程中copy了一份(引用),并且父进程中这个套接字也没有用处了
#所以关闭
newSocket.close() # 关闭父进程的这个newSocket,因为这个socket已经交给了子进程处理了
finally:
#当为所有的客户端服务完之后再进行关闭,表示不再接收新的客户端的链接
serSocket.close()
if __name__ == '__main__':
main()
16. 进程-继承Porcess创建多进程
-
在子类构造方法中调用父类的构造方法的方式
Process.__init__(self)
,调用父类的初始化函数super(Process_Class, self).__init__()
,另一种调用父类构造函数的方法
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process
import time
import os
#继承Process类
class Process_Class(Process):
#因为Process类本身也有__init__方法,这个子类相当于重写了这个方法,
#但这样就会带来一个问题,我们并没有完全的初始化一个Process类,所以就不能使用从这个类继承的一些方法和属性,
#最好的方法就是将继承类本身传递给Process.__init__方法,完成这些初始化操作
def __init__(self,interval):
#Process.__init__(self) # 调用父类的初始化函数
super(Process_Class, self).__init__() # 另一种调用父类构造函数的方法
self.interval = interval
#重写了Process类的run()方法
def run(self):
print("子进程(%s) 开始执行,父进程为(%s)"%(os.getpid(),os.getppid()))
t_start = time.time()
time.sleep(self.interval)
t_stop = time.time()
print("(%s)执行结束,耗时%0.2f秒"%(os.getpid(),t_stop-t_start))
if __name__=="__main__":
t_start = time.time()
print("当前程序进程(%s)"%os.getpid())
p1 = Process_Class(2)
#对一个不包含target属性的Process类执行start()方法,就会运行这个类中的run()方法,所以这里会执行p1.run()
p1.start()
p1.join()
t_stop = time.time()
print("(%s)执行结束,耗时%0.2f"%(os.getpid(),t_stop-t_start))
17. 进程-进程池Pool
-
进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。 -
如果等待进程池中所有进程关闭后在关闭主进程,比如先pool.close(),再pool.join()
#_*_coding:utf-8_*_
from multiprocessing import Process, Pool, freeze_support
import time
def Foo(i):
time.sleep(2)
return i + 100
def Bar(arg):
print('-->exec done:', arg)
if __name__ == '__main__':
freeze_support() # 解决window下无法生成子进程而卡住
pool = Pool(5)
for i in range(10):
pool.apply_async(func=Foo, args=(i,), callback=Bar)
# pool.apply(func=Foo, args=(i,))
print('end')
pool.close()
pool.join() # 进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。
17.1 常用函数
函数 | 描述 |
---|---|
apply_async(func[, args[, kwds]]) | 使用非阻塞方式调用func(并行执行,堵塞方式必须等待上一个进程退出才能执行下一个进程),args为传递给func的参数列表,kwds为传递给func的关键字参数列表; |
apply(func[, args[, kwds]]) | 使用阻塞方式调用func |
close() | 关闭Pool,使其不再接受新的任务; |
terminate() | 不管任务是否完成,立即终止; |
join() | 主进程阻塞,等待子进程的退出, 必须在close或terminate之后使用; |
17.2 apply阻塞式
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Pool, freeze_support
import os,time,random
def worker(msg):
t_start = time.time()
print("%s开始执行,进程号为%d"%(msg,os.getpid()))
#random.random()随机生成0~1之间的浮点数
time.sleep(random.random()*2)
t_stop = time.time()
print(msg,"执行完毕,耗时%0.2f"%(t_stop-t_start))
if __name__ == '__main__':
freeze_support()
po=Pool(3) #定义一个进程池,最大进程数3
for i in range(0,10):
po.apply(worker,(i,))
print("----start----")
po.close() #关闭进程池,关闭后po不再接收新的请求
po.join() #等待po中所有子进程执行完成,必须放在close语句之后
print("-----end-----")
18. 进程-队列Queue
18.1 Queue
-
队列是Python标准库中线程安全的队列FIFO实现,实现了一个适用于多线程的先进先出的数据结构。 -
用来在生产者和消费者线程之间的信息传递。
18.2 基本FIFO队列
-
class Queue.Queue(maxsize=0)
-
FIFO即First in First Out,先进先出。 -
Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。 -
如果maxsize小于或者等于0,队列大小没有限制。
#_*_coding:utf-8_*_
import Queue # 导入序列
q = Queue.Queue() #得到序列对象
#向序列中存储数据
for i in range(5):
q.put(i)
#从序列中得到数据,先进先出,先存的谁,先取出来谁
while not q.empty():
print q.get()
18.3 LIFO队列:栈
-
class Queue.LifoQueue(maxsize=0)
-
LIFO即Last in First Out,后进先出。 -
与栈的类似,使用也很简单,maxsize用法同上
#_*_coding:utf-8_*_
import Queue
q = Queue.LifoQueue()
#向栈中存放数据
for i in range(5):
q.put(i)
#取数据,后存的先出
while not q.empty():
print q.get()
18.4 优先级队列
-
class Queue.PriorityQueue(maxsize=0)
-
构造一个优先队列。maxsize用法同上。
#_*_coding:utf-8_*_
import Queue
import threading
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'Job:',description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put(Job(3, 'level 3 job'))
q.put(Job(10, 'level 10 job'))
q.put(Job(1, 'level 1 job'))
def process_job(q):
while True:
next_job = q.get()
print 'for:', next_job.description
q.task_done()
workers = [threading.Thread(target=process_job, args=(q,)),
threading.Thread(target=process_job, args=(q,))
]
for w in workers:
w.setDaemon(True)
w.start()
q.join()
18.5 常用方法
task_done()
-
意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。 -
如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。
join()
-
阻塞调用线程,直到队列中的所有任务被处理掉。 -
只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。
put(item[, block[, timeout]])
-
将item放入队列中。 -
如果可选的参数block为True且timeout为空对象(默认的情况,阻塞调用,无超时)。 -
如果timeout是个正整数,阻塞调用进程最多timeout秒,如果一直无空空间可用,抛出Full异常(带超时的阻塞调用)。 -
如果block为False,如果有空闲空间可用将数据放入队列,否则立即抛出Full异常
如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常; 如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常; 其非阻塞版本为put_nowait等同于put(item, False)
get([block[, timeout]])
-
从队列中移除并返回一个数据。block跟timeout参数同put方法 -
其非阻塞方法为`get_nowait()`相当与get(False)
如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出”Queue.Empty”异常; 如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;
empty()
-
如果队列为空,返回True,反之返回False
18.6 Queue:使用的是multiprocessing中的Queue
-
初始化Queue()对象时(例如:q=Queue()),若括号中没有指定最大可接收的消息数量,或数量为负值,那么就代表可接受的消息数量没有上限(直到内存的尽头);
一些方法
-
Queue.qsize():返回当前队列包含的消息数量; -
Queue.empty():如果队列为空,返回True,反之False ; -
Queue.full():如果队列满了,返回True,反之False; -
Queue.get([block[, timeout]]):获取队列中的一条消息,然后将其从列队中移除,block默认值为True;
如果block使用默认值,且没有设置timeout(单位秒),消息列队如果为空,此时程序将被阻塞(停在读取状态),直到从消息列队读到消息为止,如果设置了timeout,则会等待timeout秒,若还没读取到任何消息,则抛出”Queue.Empty”异常; -如果block值为False,消息列队如果为空,则会立刻抛出”Queue.Empty”异常;
-
Queue.get_nowait():相当Queue.get(False); -
Queue.put(item,[block[, timeout]]):将item消息写入队列,block默认值为True;
如果block使用默认值,且没有设置timeout(单位秒),消息列队如果已经没有空间可写入,此时程序将被阻塞(停在写入状态),直到从消息列队腾出空间为止,如果设置了timeout,则会等待timeout秒,若还没空间,则抛出”Queue.Full”异常; 如果block值为False,消息列队如果没有空间可写入,则会立刻抛出”Queue.Full”异常;
-
Queue.put_nowait(item):相当Queue.put(item, False); -
例子: 在子进程中把数据存入到Queue中,然后从主线程中读取出来。
#_*_coding:utf-8_*_
# 这里导入的是multiprocessing中的Queue
from multiprocessing import Process,Queue
import os,time
# 这个方法会被子进程调用,在子进程中放入数据
def f(q,n):
q.put([n,'hello'])
if __name__ == '__main__':
#此queue不是直接导入的import Queue,这个是multiprocessing重新封装的
q=Queue() # 创建队列
#循环5个进程,并启动
for i in range(5):
p=Process(target=f,args=(q,i))
p.start()
#主线程 等待子进程完毕后在继续执行
p.join()
# 从主线程中取出子线程存到Queue中的数据
for i in range(q.qsize()):
print q.get()
例子:一个进程读,一个进程写
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
import os, time, random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
if not q.empty():
value = q.get(True)
print('Get %s from queue.' % value)
time.sleep(random.random())
else:
break
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 等待pw结束:
pw.join()
# 启动子进程pr,读取:
pr.start()
pr.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
print('')
print('所有数据都写入并且读完')
多进程之间通过Queue来实现数据共享
import multiprocessing
"""
一个进程向Queue中写入数据,另外一个进程从Queue中获取数据,
通过Queue完成了 多个需要配合的进程间的数据共享,从而能够 起到 解耦的作用
"""
def download_from_web(q):
"""下载数据"""
# 模拟从网上下载的数据
data = [11, 22, 33, 44]
# 向队列中写入数据
for temp in data:
q.put(temp)
print("---下载器已经下载完了数据并且存入到队列中----")
def analysis_data(q):
"""数据处理"""
waitting_analysis_data = list()
# 从队列中获取数据
while True:
data = q.get()
waitting_analysis_data.append(data)
if q.empty():
break
# 模拟数据处理
print(waitting_analysis_data)
def main():
# 1. 创建一个队列
q = multiprocessing.Queue()
# 2. 创建多个进程,将队列的引用当做实参进行传递到里面
p1 = multiprocessing.Process(target=download_from_web, args=(q,))
p2 = multiprocessing.Process(target=analysis_data, args=(q,))
p1.start()
p2.start()
if __name__ == "__main__":
main()
18.7 Pipe
-
pipe()返回一对连接对象,代表了pipe的两端。每个对象都有send()和recv()方法。 -
parent_conn, child_conn = Pipe()
返回两个对象,第一个是父,第二个是子。 每一个对象都有send和recv方法。可以使用父与子之间的相互发送与接收。
#_*_coding:utf-8_*_
from multiprocessing import Process, Pipe
# 子进程 调用Pipe的子进程对象的send()方法
def f(conn):
conn.send([42, None, 'hello'])
conn.close()
if __name__ == '__main__':
# Pipe()返回两个对象
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
print(parent_conn.recv()) # prints "[42, None, 'hello']"
p.join()
18.8 Value + Array
-
共享内存映射文件的方法。
#_*_coding:utf-8_*_
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = n.value + 1
for i in range(len(a)):
a[i] = a[i] * 10
if __name__ == '__main__':
num = Value('i', 1)
arr = Array('i', range(10))
p = Process(target=f, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
p2 = Process(target=f, args=(num, arr))
p2.start()
p2.join()
print(num.value)
print(arr[:])
# the output is :
# 2
# [0, 10, 20, 30, 40, 50, 60, 70, 80, 90]
# 3
# [0, 100, 200, 300, 400, 500, 600, 700, 800, 900]
18.9 Manager中的Queue来给线程池Pool通信
-
Manager()返回的manager对象控制了一个server进程,此进程包含的python对象可以被其他的进程通过proxies来访问。从而达到多进程间数据通信且安全。 -
Manager支持的类型有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#修改import中的Queue为Manager
from multiprocessing import Manager,Pool
import os,time,random
def reader(q):
print("reader启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
for i in range(q.qsize()):
print("reader从Queue获取到消息:%s"%q.get(True))
def writer(q):
print("writer启动(%s),父进程为(%s)"%(os.getpid(),os.getppid()))
for i in "dongGe":
q.put(i)
if __name__=="__main__":
print("(%s) start"%os.getpid())
q=Manager().Queue() #使用Manager中的Queue来初始化
po=Pool()
#使用阻塞模式创建进程,这样就不需要在reader中使用死循环了,可以让writer完全执行完成后,再用reader去读取
po.apply(writer,(q,))
po.apply(reader,(q,))
po.close()
po.join()
print("(%s) End"%os.getpid())
19. 进程与线程对比
-
进程,能够完成多任务,比如 在一台电脑上能够同时运行多个QQ -
线程,能够完成多任务,比如 一个QQ中的多个聊天窗口
定义的不同
-
进程是系统进行资源分配和调度的一个独立单位. -
线程是进程的一个实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位.线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈),但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源.
区别
-
一个程序至少有一个进程,一个进程至少有一个线程. -
线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。 -
进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率 -
线程不能够独立执行,必须依存在进程中
优缺点
-
线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护;而进程正相反。
20.进程-案例文件夹copy
多任务文件夹copy
import os
import multiprocessing
def copy_file(file_name, old_folder_name, new_folder_name):
"""完成文件的复制"""
print("======>模拟copy文件:从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name))
old_f = open(old_folder_name + "/" + file_name, "rb")
content = old_f.read()
old_f.close()
new_f = open(new_folder_name + "/" + file_name, "wb")
new_f.write(content)
new_f.close()
def main():
# 1. 获取用户要copy的文件夹的名字
old_folder_name = input("请输入要copy的文件夹的名字:")
# 2. 创建一个新的文件夹
try:
new_folder_name = old_folder_name + "[复件]"
os.mkdir(new_folder_name)
except:
pass
# 3. 获取文件夹的所有的待copy的文件名字 listdir()
file_names = os.listdir(old_folder_name)
print(file_names)
# 4. 创建进程池
po = multiprocessing.Pool(5)
# 5. 向进程池中添加 copy文件的任务
for file_name in file_names:
po.apply_async(copy_file, args=(file_name, old_folder_name, new_folder_name))
po.close()
po.join()
if __name__ == "__main__":
main()
多任务文件夹copy,显示进度
import os
import multiprocessing
def copy_file(q, file_name, old_folder_name, new_folder_name):
"""完成文件的复制"""
# print("======>模拟copy文件:从%s--->到%s 文件名是:%s" % (old_folder_name, new_folder_name, file_name))
old_f = open(old_folder_name + "/" + file_name, "rb")
content = old_f.read()
old_f.close()
new_f = open(new_folder_name + "/" + file_name, "wb")
new_f.write(content)
new_f.close()
# 如果拷贝完了文件,那么就向队列中写入一个消息,表示已经完成
q.put(file_name)
def main():
# 1. 获取用户要copy的文件夹的名字
old_folder_name = input("请输入要copy的文件夹的名字:")
# 2. 创建一个新的文件夹
try:
new_folder_name = old_folder_name + "[复件]"
os.mkdir(new_folder_name)
except:
pass
# 3. 获取文件夹的所有的待copy的文件名字 listdir()
file_names = os.listdir(old_folder_name)
# print(file_names)
# 4. 创建进程池
po = multiprocessing.Pool(5)
# 5. 创建一个队列
q = multiprocessing.Manager().Queue()
# 6. 向进程池中添加 copy文件的任务
for file_name in file_names:
po.apply_async(copy_file, args=(q, file_name, old_folder_name, new_folder_name))
po.close()
# po.join()
all_file_num = len(file_names) # 测一下所有的文件个数
copy_ok_num = 0
while True:
file_name = q.get()
# print("已经完成copy:%s" % file_name)
copy_ok_num+=1
print("r拷贝的进度为:%.2f %%" % (copy_ok_num*100/all_file_num), end="")
if copy_ok_num >= all_file_num:
break
print()
if __name__ == "__main__":
main()
21. 进程-单线程服务器
-
一次只能服务一个客户端,当再有其他客户端连接到服务器时,无法连接。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from socket import *
serSocket = socket(AF_INET, SOCK_STREAM)
# 重复使用绑定的信息
serSocket.setsockopt(SOL_SOCKET, SO_REUSEADDR , 1)
localAddr = ('', 7788)
serSocket.bind(localAddr)
serSocket.listen(5)
while True:
print('-----主进程,,等待新客户端的到来------')
newSocket,destAddr = serSocket.accept()
print('-----主进程,,接下来负责数据处理[%s]-----'%str(destAddr))
try:
while True:
recvData = newSocket.recv(1024)
if len(recvData)>0:
print('recv[%s]:%s'%(str(destAddr), recvData))
else:
print('[%s]客户端已经关闭'%str(destAddr))
break
finally:
newSocket.close()
serSocket.close()
22. 协程-迭代器
22.1 可迭代对象Iterable
-
我们已经知道可以对list、tuple、str等类型的数据使用 for...in...
的循环语法从其中依次拿到数据进行使用,我们把这样的过程称为遍历,也叫迭代。 -
但是,是否所有的数据类型都可以放到for…in…的语句中,然后让for…in…每次从中取出一条数据供我们使用,即供我们迭代吗? -
通过for…in…这类语句迭代读取一条数据供我们使用的对象称之为可迭代对象(Iterable)**
22.2 如何判断一个对象是否可以迭代
-
可以使用 isinstance()
判断一个对象是否是Iterable
对象
In [50]: from collections import Iterable
In [51]: isinstance([], Iterable)
Out[51]: True
In [52]: isinstance({}, Iterable)
Out[52]: True
In [53]: isinstance('abc', Iterable)
Out[53]: True
In [54]: isinstance(mylist, Iterable)
Out[54]: False
In [55]: isinstance(100, Iterable)
Out[55]: False
22.3 可迭代对象的本质
-
可迭代对象,每迭代一次(即在for…in…中每循环一次)都会返回对象中的下一条数据,一直向后读取数据直到迭代了所有数据后结束。 -
那么,在这个过程中就应该有一个“人”去记录每次访问到了第几条数据,以便每次迭代都可以返回下一条数据。 -
我们把这个能帮助我们进行数据迭代的“人”称为**迭代器(Iterator)**。
本质
-
可以向我们提供一个这样的中间“人”即迭代器帮助我们对其进行迭代遍历使用。 -
可迭代对象通过 __iter__
方法向我们提供一个迭代器, -
我们在迭代一个可迭代对象的时候,实际上就是先获取该对象提供的一个迭代器,然后通过这个迭代器来依次获取对象中的每一个数据. -
一个具备了 **__iter__**
方法的对象,就是一个可迭代对象
22.4 迭代器Iterator
-
迭代是访问集合元素的一种方式。 -
迭代器是一个可以记住遍历的位置的对象。 -
迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。 -
迭代器只能往前不会后退。 -
当我们对迭代器使用 next()
函数的时候,迭代器会向我们返回它所记录位置的下一个位置的数据。 -
实际上,在使用 next()
函数的时候,调用的就是迭代器对象的__next__
方法(Python3中是对象的next方法,Python2中是对象的next()方法)。 -
所以,我们要想构造一个迭代器,就要实现它的 __next__
方法。但这还不够,python要求迭代器本身也是可迭代的,所以我们还要为迭代器实现__iter__
方法,而iter方法要返回一个迭代器,迭代器自身正是一个迭代器,所以迭代器的__iter__
方法返回自身即可。 -
一个实现了 **__iter__**
方法和**__next__**
方法的对象,就是迭代器
class MyList(object):
"""自定义的一个可迭代对象"""
def __init__(self):
self.items = []
def add(self, val):
self.items.append(val)
def __iter__(self):
myiterator = MyIterator(self)
return myiterator
class MyIterator(object):
"""自定义的供上面可迭代对象使用的一个迭代器"""
def __init__(self, mylist):
self.mylist = mylist
# current用来记录当前访问到的位置
self.current = 0
def __next__(self):
if self.current < len(self.mylist.items):
item = self.mylist.items[self.current]
self.current += 1
return item
else:
raise StopIteration
def __iter__(self):
return self
if __name__ == '__main__':
mylist = MyList()
mylist.add(1)
mylist.add(2)
mylist.add(3)
mylist.add(4)
mylist.add(5)
for num in mylist:
print(num)
22.5 如何判断一个对象是否是迭代器
-
可以使用 isinstance()
判断一个对象是否是Iterator
对象:
In [56]: from collections import Iterator
In [57]: isinstance([], Iterator)
Out[57]: False
In [58]: isinstance(iter([]), Iterator)
Out[58]: True
In [59]: isinstance(iter("abc"), Iterator)
Out[59]: True
22.6 创建可迭代对象和迭代器
自己是迭代器对象,然后使用其他的迭代器
-
Classmate是一个迭代器对象,它有一个 __iter__
方法,返回一个迭代器对象 -
Classmate是一个迭代器ClassIterator,它有一个 __next__
方法用于返回下一个数据
import time
from collections import Iterable
from collections import Iterator
class Classmate(object):
def __init__(self):
self.names = list()
def add(self, name):
self.names.append(name)
def __iter__(self):
"""如果想要一个对象称为一个 可以迭代的对象,即可以使用for,那么必须实现__iter__方法"""
return ClassIterator(self)
class ClassIterator(object):
def __init__(self, obj):
self.obj = obj
self.current_num = 0
def __iter__(self):
pass
def __next__(self):
if self.current_num < len(self.obj.names):
ret = self.obj.names[self.current_num]
self.current_num += 1
return ret
else:
raise StopIteration
classmate = Classmate()
classmate.add("a")
classmate.add("b")
classmate.add("c")
# print("判断classmate是否是可以迭代的对象:", isinstance(classmate, Iterable))
# classmate_iterator = iter(classmate)
# print("判断classmate_iterator是否是迭代器:", isinstance(classmate_iterator, Iterator))
# print(next(classmate_iterator))
for name in classmate:
print(name)
time.sleep(1)
自己即时迭代器,也是迭代器对象的[推荐]
import time
from collections import Iterable
from collections import Iterator
class Classmate(object):
def __init__(self):
self.names = list()
self.current_num = 0
def add(self, name):
self.names.append(name)
def __iter__(self):
"""如果想要一个对象称为一个 可以迭代的对象,即可以使用for,那么必须实现__iter__方法"""
return self # 调用iter(xxobj)的时候 只要__iter__方法返回一个 迭代器即可,至于是自己 还是 别的对象都可以的, 但是要保证是一个迭代器(即实现了 __iter__ __next__方法)
def __next__(self):
if self.current_num < len(self.names):
ret = self.names[self.current_num]
self.current_num += 1
return ret
else:
raise StopIteration
classmate = Classmate()
classmate.add("a")
classmate.add("b")
classmate.add("c")
# print("判断classmate是否是可以迭代的对象:", isinstance(classmate, Iterable))
# classmate_iterator = iter(classmate)
# print("判断classmate_iterator是否是迭代器:", isinstance(classmate_iterator, Iterator))
# print(next(classmate_iterator))
for name in classmate:
print(name)
time.sleep(1)
22.7 iter()函数与next()函数
-
list、tuple等都是可迭代对象,我们可以通过 iter()
函数获取这些可迭代对象的迭代器。 -
然后我们可以对获取到的迭代器不断使用 next()
函数来获取下一条数据。 -
iter()
函数实际上就是调用了可迭代对象的__iter__
方法。
22.8 for...in...
循环的本质
-
for item in Iterable
循环的本质
先通过iter()函数获取可迭代对象Iterable的迭代器 然后对获取到的迭代器不断调用next()方法来获取下一个值并将其赋值给item 当遇到StopIteration的异常后循环结束
并不是只有for循环能接收可迭代对象
-
除了for循环能接收可迭代对象,list、tuple等也能接收。
li = list(FibIterator(15))
print(li)
tp = tuple(FibIterator(6))
print(tp)
22.9 迭代器的应用场景
-
迭代器最核心的功能就是可以通过next()函数的调用来返回下一个数据值。 -
如果每次返回的数据值不是在一个已有的数据集合中读取的,而是通过程序按照一定的规律计算生成的,那么也就意味着可以不用再依赖一个已有的数据集合,也就是说不用再将所有要迭代的数据都一次性缓存下来供后续依次读取,这样可以节省大量的存储(内存)空间。 -斐波拉契数列,数列中第一个数为0,第二个数为1,其后的每一个数都可由前两个数相加得到
class FibIterator(object):
"""斐波那契数列迭代器"""
def __init__(self, n):
"""
:param n: int, 指明生成数列的前n个数
"""
self.n = n
# current用来保存当前生成到数列中的第几个数了
self.current = 0
# num1用来保存前前一个数,初始值为数列中的第一个数0
self.num1 = 0
# num2用来保存前一个数,初始值为数列中的第二个数1
self.num2 = 1
def __next__(self):
"""被next()函数调用来获取下一个数"""
if self.current < self.n:
num = self.num1
self.num1, self.num2 = self.num2, self.num1+self.num2
self.current += 1
return num
else:
raise StopIteration
def __iter__(self):
"""迭代器的__iter__返回自身即可"""
return self
if __name__ == '__main__':
fib = FibIterator(10)
for num in fib:
print(num, end=" ")
23. 协程-生成器
23.1 生成器
-
利用迭代器,我们可以在每次迭代获取数据(通过next()方法)时按照特定的规律进行生成。但是我们在实现一个迭代器时,关于当前迭代到的状态需要我们自己记录,进而才能根据当前状态生成下一个数据。为了达到记录当前状态,并配合next()函数进行迭代使用,我们可以采用更简便的语法,即生成器( generator
)。 -
生成器是一类特殊的迭代器。 -
你只能对其迭代一次。这是因为它们并没有把所有的值都存在内存中,而是在运行时生成值。 -
生成器,就是需要之前,再通过算法推演出来的数据,就是生成器。 -
通过遍历来使用它们,要么使用for循环,要么将它们传递给任意可以进行迭代的函数和结构。 -
大多数的时候生成器是以函数来实现的。然后它们并不返回一个值,而是 yield
(可以理解为生出)一个值。
#!/usr/bin/env python
#coding:utf-8
def generator_function():
for i in range(10):
yield i
for item in generator_function():
print(item)
23.2 创建生成器
方式1
-
把列表生成式(推导式)的 **[]**
** 变成****()**
即可变成生成器。 -
可以通过next(生成器对象)依次获取生成器值,如果没有了还是next()就会报错。 -
通过next()得到值有一些不方便。 -
生成器也是迭代器,可以通过for循环来得到所有的值,且不会报错。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# l是一个列表生成器
l=[x*2 for x in range(5)]
print(l) #[0, 2, 4, 6, 8]
# g是一个生成器
g=(x*2 for x in range(5))
# <generator object <genexpr> at 0x0000000001DF0F68>
# 说明这是一个生成器
print(g)
print(next(g)) # 0
print(next(g)) # 2
print(next(g)) # 4
print('---')
#生成器生成了以后,再次通过for循环就得不到已经生成过的了
for x in g:
print(x)
''' 输出结果
0
2
4
---
6
8
'''
yield
-
你不想同一时间将所有计算出来的大量结果集分配到内存当中,特别是结果集里还包含循环; -
在使用生成器实现的方式中,我们将原本在迭代器 __next__
方法中实现的基本逻辑放到一个函数中来实现,但是将每次迭代返回数值的return
换成了yield
,此时新定义的函数便不再是函数,而是一个生成器了。 -
简单来说:只要在 **def**
中有**yield**
关键字的 就称为 生成器
#!/usr/bin/env python
# -*- coding: utf-8 -*-
def fibon(n):
a = b = 1 # 定义两个变量
for i in range(n):
yield a # 遍历的时候才会生成a的值
a, b = b, a + b # 菲波那切数列需要的两个值
return 'yield生成的生成器的返回值done'
for x in fibon(10):
print(x)
'''
1
1
2
3
5
8
13
21
34
55
'''
得到yeild函数返回值
-
但是用for循环调用generator时,发现拿不到generator的return语句的返回值。 -
如果想要拿到返回值,必须捕获StopIteration错误,返回值包含在StopIteration的value中:
def create_num(all_num):
# a = 0
# b = 1
a, b = 0, 1
current_num = 0
while current_num < all_num:
# print(a)
yield a # 如果一个函数中有yield语句,那么这个就不在是函数,而是一个生成器的模板
a, b = b, a+b
current_num += 1
return "ok...."
obj2 = create_num(50)
while True:
try:
ret = next(obj2)
print(ret)
except Exception as ret:
print(ret.value)
break
23.3 生成器总结
-
使用了 yield
关键字的函数不再是函数,而是生成器。(使用了yield
的函数就是生成器) -
yield关键字有两点作用:
保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起 将 yield
关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用
-
可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数) -
Python3中的生成器可以使用return返回最终运行的返回值,而Python2中的生成器不允许使用return返回一个返回值(即可以使用return从生成器中退出,但return后不能有任何表达式)。
23.4 使用send唤醒
-
执行到 yield
时,gen
函数作用暂时保存,返回i的值; -
temp接收下次 c.send("python")
,send发送过来的值,**c.next()**
等价c.send(None) -
send()
也具有next()
的方法,但是会在下一次temp = yeild i
这种类型的赋值语句时的temp赋值
#!/usr/bin/env python
# -*- coding: utf-8 -*-
def test():
i = 0
while i<5:
temp = yield i # 生成i
print(temp)
i+=1
f = test()
print(f.__next__())
f.send("ok") # 这里的ok会在下一次yeild的时候,赋值给temp
print(f.__next__())
print(f.__next__())
print(f.__next__())
'''
0
ok
None
2
None
3
None
4
'''
例子
def create_num(all_num):
a, b = 0, 1
current_num = 0
while current_num < all_num:
ret = yield a
print(">>>ret>>>>", ret)
a, b = b, a+b
current_num += 1
obj = create_num(10)
# obj.send(None) # send一般不会放到第一次启动生成器,如果非要这样做 那么传递None
ret = next(obj)
print(ret)
# send里面的数据会 传递给第5行,当做yield a的结果,然后ret保存这个结果,,,
# send的结果是下一次调用yield时 yield后面的值
ret = obj.send("hahahha")
print(ret)
23.5 生成器的next()
-
在 yield
掉所有的值后,next()
触发了一个StopIteration
的异常。 -
这个异常告诉我们,所有的值都被 yield
完了; -
next(a)
等价于a.__next__()
#!/usr/bin/env python
#coding:utf-8
def generator_function():
for i in range(3):
yield i
gen = generator_function()
print(next(gen))
# Output: 0
print(next(gen))
# Output: 1
print(next(gen))
# Output: 2
print(next(gen))
"""
Output: Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
"""
24. 协程-协程yield
24.1 协程
-
协程,又叫做 微线程。 -
协程是一种用户态的轻量级线程。 -
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合) 每次过程重入时,就相当于进入上一次调用的状态 换种说法:进入上一次离开时所处逻辑流的位置。
-
通俗的理解:在一个线程中的某个函数,可以在任何地方保存当前函数的一些临时变量等信息,然后切换到另外一个函数中执行,注意不是通过调用函数的方式做到的,并且切换的次数以及什么时候再切换到原来的函数都由开发者自己确定。
协程和线程差异
-
那么这个过程看起来比线程差不多。 -
其实不然, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 -
操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 – 所以线程的切换非常耗性能。 -
但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次,系统都抗的住。
协程的问题
-
但是协程有一个问题,就是系统并不感知,所以操作系统不会帮你做切换。 那么谁来帮你做切换?让需要执行的协程更多的获得CPU时间才是问题的关键。 -
目前的协程框架一般都是设计成 1:N 模式。所谓 1:N 就是一个线程作为一个容器里面放置多个协程。 那么谁来适时的切换这些协程?答案是有协程自己主动让出CPU,也就是每个协程池里面有一个调度器, 这个调度器是被动调度的。意思就是他不会主动调度。而且当一个协程发现自己执行不下去了(比如异步等待网络的数据回来,但是当前还没有数据到), 这个时候就可以由这个协程通知调度器,这个时候执行到调度器的代码,调度器根据事先设计好的调度算法找到当前最需要CPU的协程。 切换这个协程的CPU上下文把CPU的运行权交个这个协程,直到这个协程出现执行不下去需要等等的情况,或者它调用主动让出CPU的API之类,触发下一次调度。
24.2 使用yield实现协程
-
使用yield
便可获得了一个协程。 -
发送的值会被 yield接收
。 -
我们为什么要运行next()方法呢?
这样做正是为了启动一个协程。 就像协程中包含的生成器并不是立刻执行,而是通过 next()
方法来响应send()
方法。因此,你必须通过 next()
方法来执行yield
表达式。
#!/usr/bin/env python
# coding:utf-8
def grep(pattern):
print("Searching for", pattern)
while True:
line = (yield)
if pattern in line:
print(line)
search = grep('coroutine')
next(search) # 启动协程
#output: Searching for coroutine
search.send("I love you")
search.send("Don't you love me?")
search.send("I love coroutine instead!")
search.close() # 关闭协程
#output: I love coroutine instead!
24.3 使用yield完成多任务
import time
def task_1():
while True:
print("---1----")
time.sleep(0.1)
yield
def task_2():
while True:
print("---2----")
time.sleep(0.1)
yield
def main():
t1 = task_1()
t2 = task_2()
# 先让t1运行一会,当t1中遇到yield的时候,再返回到24行,然后
# 执行t2,当它遇到yield的时候,再次切换到t1中
# 这样t1/t2/t1/t2的交替运行,最终实现了多任务....协程
while True:
next(t1)
next(t2)
if __name__ == "__main__":
main()
25. 协程-协程greenlet
25.1 greenlet
-
为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单
25.2 安装
sudo pip install greenlet
25.3 基本使用
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from greenlet import greenlet
import time
def test1():
while True:
print ("---A--")
gr2.switch()
time.sleep(0.5)
def test2():
while True:
print("---B--")
gr1.switch()
time.sleep(0.5)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
#切换到gr1中运行
gr1.switch()
26. 协程-协程gevent网络库
26.1 gevent
-
greenlet
已经实现了协程,但是这个还的人工切换,是不是觉得太麻烦了,不要捉急, -
python还有一个比greenlet更强大的并且能够自动切换任务的模块 gevent
-
其原理是当一个greenlet遇到IO(指的是input output输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的 greenlet
,等到IO操作完成,再在适当的时候切换回来继续执行。 -
由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO
安装
pip3 install gevent
例子
import gevent
import time
def f1(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5)
def f2(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5)
def f3(n):
for i in range(n):
print(gevent.getcurrent(), i)
# time.sleep(0.5)
gevent.sleep(0.5)
print("----1---")
g1 = gevent.spawn(f1, 5)
print("----2---")
g2 = gevent.spawn(f2, 5)
print("----3---")
g3 = gevent.spawn(f3, 5)
print("----4---")
g1.join()
g2.join()
g3.join()
26.2 gevent切换执行
mport gevent
def f(n):
for i in range(n):
print(gevent.getcurrent(), i)
#用来模拟一个耗时操作,注意不是time模块中的sleep
gevent.sleep(1)
g1 = gevent.spawn(f, 5)
g2 = gevent.spawn(f, 5)
g3 = gevent.spawn(f, 5)
g1.join()
g2.join()
g3.join()
26.3 给程序打补丁
-
不改变源代码而对功能进行追加和变更,统称为“猴子补丁”。
from gevent import monkey
import gevent
import random
import time
# 有耗时操作时需要
monkey.patch_all() # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
def coroutine_work(coroutine_name):
for i in range(10):
print(coroutine_name, i)
time.sleep(random.random())
gevent.joinall([
gevent.spawn(coroutine_work, "work1"),
gevent.spawn(coroutine_work, "work2")
])
26.4 协程-并发下载器例子
并发下载原理
from gevent import monkey
import gevent
import urllib.request
# 有耗时操作时需要
monkey.patch_all()
def my_downLoad(url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(my_downLoad, 'http://www.baidu.com/'),
gevent.spawn(my_downLoad, 'http://www.itcast.cn/'),
gevent.spawn(my_downLoad, 'http://www.itheima.com/'),
])
运行结果
GET: http://www.baidu.com/
GET: http://www.itcast.cn/
GET: http://www.itheima.com/
111327 bytes received from http://www.baidu.com/.
172054 bytes received from http://www.itheima.com/.
215035 bytes received from http://www.itcast.cn/.
-
从上能够看到是先发送的获取baidu的相关信息,然后依次是itcast、itheima, -
但是收到数据的先后顺序不一定与发送顺序相同,这也就体现出了异步, -
即不确定什么时候会收到数据,顺序不一定
实现多个视频下载
from gevent import monkey
import gevent
import urllib.request
#有IO才做时需要这一句
monkey.patch_all()
def my_downLoad(file_name, url):
print('GET: %s' % url)
resp = urllib.request.urlopen(url)
data = resp.read()
with open(file_name, "wb") as f:
f.write(data)
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(my_downLoad, "1.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-08-%E3%80%90%E7%90%86%E8%A7%A3%E3%80%91%E5%87%BD%E6%95%B0%E4%BD%BF%E7%94%A8%E6%80%BB%E7%BB%93%EF%BC%88%E4%B8%80%EF%BC%89.mp4'),
gevent.spawn(my_downLoad, "2.mp4", 'http://oo52bgdsl.bkt.clouddn.com/05day-03-%E3%80%90%E6%8E%8C%E6%8F%A1%E3%80%91%E6%97%A0%E5%8F%82%E6%95%B0%E6%97%A0%E8%BF%94%E5%9B%9E%E5%80%BC%E5%87%BD%E6%95%B0%E7%9A%84%E5%AE%9A%E4%B9%89%E3%80%81%E8%B0%83%E7%94%A8%28%E4%B8%8B%29.mp4'),
])
原文始发于微信公众号(Python之家):Python基础-26-多任务
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。
文章由极客之音整理,本文链接:https://www.bmabk.com/index.php/post/198459.html