1.进程互斥锁
让并发变成串行,牺牲了效率,保证数据安全.
mutex = Lock()
# 加锁
? mutex.acquire()
# 释放锁
? mutex.release()
2.队列:
相当于在内存中开启了一个空间,可以存放一堆数据,这堆数据都得遵循"先进先出".
管道(阻塞) + 锁
q = Queue()
# 添加数据
? q.put(1)
# 若队列满了,会原地等待
? q.put(2)
# 若队列满了,不会等待直接报错
? q.put_nowait(2)
获取数据,遵循先进先出
若队列中没数据,会原地等待
q.get() # 1
若队列中没数据,会直接报错
q.get_nowait() # 1
q.empty() # 判断队列是否为空
q.full() # 判断队列是否满了
3.IPC进程间通信
通过队列让进程间实现通信.
4.生产者与消费者
生产者: 生产数据的
消费者; 使用数据的
目的: 解决供需不平衡问题.
通过队列来实现,生产者消费者供需不平衡问题.
5.线程
1.什么是线程?
进程: 资源单位
线程: 执行单位
注意: 只要开启一个进程就会有一个线程(主线程).
主线程会在进程结束时,一并销毁.
2.为什么要使用线程?
节省内存资源
开启进程:
1) 开辟一个新的内存空间
2) 会自带一个主线程
开启线程:
1) 一个进程内可以开启多个线程
2) 开启线程的资源远小于进程
创建线程的两种方式
一:
from threading import Thread
def task():
pass
t = Thread(target=task) # 异步提交任务,开启线程
t.start()
t.join() # 主线程等待子线程结束之后再结束.
二:
class MyThread(Thread):
def run(self):
执行任务
? pass
t = MyThread()
t.start()
t.join()
6.线程对象的属性
current_thread().name # 获取当前线程对象的名字
# 返回一个列表,列表中包含当前执行的所有线程对象
print(enumerate())
# 获取当前执行线程的个数
print(activeCount())
is_alive() # 判断线程是否存活
7.线程互斥锁
from threading import Lock()
mutex = Lock()
mutex.acquire()
t1
mutex.release()
''' 服务端的工作: 1.接收客户端的请求 2.24小时不间断提供服务 3.实现并发 ''' import socket import time from threading import Thread server = socket.socket() server.bind( ('127.0.0.1', 9527) ) server.listen(5) print('启动服务端...') # 线程任务,执行接收客户端消息与发送消息给客户端 def working(conn): while True: try: data = conn.recv(1024) if len(data) == 0: break print(data) time.sleep(1) conn.send(data.upper()) except Exception as e: print(e) break conn.close() while True: conn, addr = server.accept() print(addr) t = Thread(target=working, args=(conn, )) t.start()xxxxxxxxxx?'''服务端的工作: ? 1.接收客户端的请求 ? 2.24小时不间断提供服务 ? 3.实现并发'''import socketimport timefrom threading import Threadserver = socket.socket()server.bind( ? ('127.0.0.1', 9527))server.listen(5)print('启动服务端...')# 线程任务,执行接收客户端消息与发送消息给客户端def working(conn): ? while True: ? ? ? try: ? ? ? ? ? data = conn.recv(1024) ? ? ? ? ? if len(data) == 0: ? ? ? ? ? ? ? break ? ? ? ? ? print(data) ? ? ? ? ? time.sleep(1) ? ? ? ? ? conn.send(data.upper()) ? ? ? except Exception as e: ? ? ? ? ? print(e) ? ? ? ? ? break ? conn.close()while True: ? conn, addr = server.accept() ? print(addr) ? t = Thread(target=working, args=(conn, )) ? t.start()'''服务端的工作: ? 1.接收客户端的请求 ? 2.24小时不间断提供服务 ? 3.实现并发'''import socketimport timefrom threading import Threadserver = socket.socket()server.bind( ? ('127.0.0.1', 9527))server.listen(5)print('启动服务端...')# 线程任务,执行接收客户端消息与发送消息给客户端def working(conn): ? while True: ? ? ? try: ? ? ? ? ? data = conn.recv(1024) ? ? ? ? ? if len(data) == 0: ? ? ? ? ? ? ? break ? ? ? ? ? print(data) ? ? ? ? ? time.sleep(1) ? ? ? ? ? conn.send(data.upper()) ? ? ? except Exception as e: ? ? ? ? ? print(e) ? ? ? ? ? break ? conn.close()while True: ? conn, addr = server.accept() ? print(addr) ? t = Thread(target=working, args=(conn, )) ? t.start()
import socket import time client = socket.socket() client.connect( ('127.0.0.1', 9527) ) print('启动客户端...') while True: client.send(b'hello') data = client.recv(1024) print(data) time.sleep(1)
''' In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.) 在CPython中,全局解释器锁(GIL)是一个防止多个锁的互斥锁 本机线程从执行Python字节码一次。这把锁主要是必须的 因为CPython的内存管理不是线程安全的。(然而,自从GIL 存在时,其他功能已逐渐依赖于它所实施的保证。) ''' ''' python解释器: 1.Cpython C 2.Jpython java 3.Ppython Python GIL全局解释器锁: 基于Cpython来研究全局解释器锁. 1.GIL本质上是一个互斥锁. 2.GIL的为了阻止同一个进程内多个线程同时执行(并行) - 单个进程下的多个线程无法实现并行,但能实现并发 3.这把锁主要是因为CPython的内存管理不是 "线程安全" 的. - 内存管理 - 垃圾回收机制 GIL的存在就是为了保证线程安全的. 注意: 多个线程过来执行,一旦遇到IO操作,就会立马释放GIL解释器锁,交给下一个先进来的线程. ''' import time from threading import Thread, current_thread number = 100 def task(): global number number2 = number # time.sleep(1) number = number2 - 1 print(number, current_thread().name) for line in range(100): t = Thread(target=task) t.start()
''' 多线程的作用: 站在两个角度去看问题: - 四个任务, 计算密集型, 每个任务需要10s: 单核: - 开启进程 消耗资源过大 - 4个进程: 40s - 开启线程 消耗资源远小于进程 - 4个线程: 40s 多核: - 开启进程 并行执行,效率比较高 - 4个进程: 10s - 开启线程 并发执行,执行效率低. - 4个线程: 40s - 四个任务, IO密集型, 每个任务需要10s: 单核: - 开启进程 消耗资源过大 - 4个进程: 40s - 开启线程 消耗资源远小于进程 - 4个线程: 40s 多核: - 开启进程 并行执行,效率小于多线程,因为遇到IO会立马切换CPU的执行权限 - 4个进程: 40s + 开启进程消耗的额外时间 - 开启线程 并发执行,执行效率高于多进程 - 4个线程: 40s ''' from threading import Thread from multiprocessing import Process import os import time # 计算密集型 def work1(): number = 0 for line in range(100000000): number += 1 # IO密集型 def work2(): time.sleep(1) if __name__ == '__main__': # 测试计算密集型 # print(os.cpu_count()) # 6 # # 开始时间 # start_time = time.time() # list1 = [] # for line in range(6): # p = Process(target=work1) # 程序执行时间5.300818920135498 # # p = Thread(target=work1) # 程序执行时间24.000795602798462 # # list1.append(p) # p.start() # IO密集型 print(os.cpu_count()) # 6 # 开始时间 start_time = time.time() list1 = [] for line in range(40): # p = Process(target=work2) # 程序执行时间4.445072174072266 p = Thread(target=work2) # 程序执行时间1.009237289428711 list1.append(p) p.start() for p in list1: p.join() end_time = time.time() print(f'程序执行时间{end_time - start_time}') ''' 在计算密集型的情况下: 使用多进程 在IO密集型的情况下: 使用多线程 高效执行多个进程,内多个IO密集型的程序: 使用 多进程 + 多线程 '''
''' 死锁现象(了解): ''' from threading import Lock, Thread, current_thread import time mutex_a = Lock() mutex_b = Lock() # # print(id(mutex_a)) # print(id(mutex_b)) class MyThread(Thread): # 线程执行任务 def run(self): self.func1() self.func2() def func1(self): mutex_a.acquire() # print(f'用户{current_thread().name}抢到锁a') print(f'用户{self.name}抢到锁a') mutex_b.acquire() print(f'用户{self.name}抢到锁b') mutex_b.release() print(f'用户{self.name}释放锁b') mutex_a.release() print(f'用户{self.name}释放锁a') def func2(self): mutex_b.acquire() print(f'用户{self.name}抢到锁b') # IO操作 time.sleep(1) mutex_a.acquire() print(f'用户{self.name}抢到锁a') mutex_a.release() print(f'用户{self.name}释放锁a') mutex_b.release() print(f'用户{self.name}释放锁b') for line in range(10): t = MyThread() t.start() ''' 注意: 锁不能乱用. '''
''' 递归锁(了解): 用于解决死锁问题. RLock: 比喻成万能钥匙,可以提供给多个人去使用. 但是第一个使用的时候,会对该锁做一个引用计数. 只有引用计数为0, 才能真正释放让另一个人去使用 ''' from threading import RLock, Thread, Lock import time mutex_a = mutex_b = Lock() class MyThread(Thread): # 线程执行任务 def run(self): self.func1() self.func2() def func1(self): mutex_a.acquire() # print(f'用户{current_thread().name}抢到锁a') print(f'用户{self.name}抢到锁a') mutex_b.acquire() print(f'用户{self.name}抢到锁b') mutex_b.release() print(f'用户{self.name}释放锁b') mutex_a.release() print(f'用户{self.name}释放锁a') def func2(self): mutex_b.acquire() print(f'用户{self.name}抢到锁b') # IO操作 time.sleep(1) mutex_a.acquire() print(f'用户{self.name}抢到锁a') mutex_a.release() print(f'用户{self.name}释放锁a') mutex_b.release() print(f'用户{self.name}释放锁b') for line in range(10): t = MyThread() t.start()
''' 信号量(了解): 互斥锁: 比喻成一个家用马桶. 同一时间只能让一个人去使用 信号量: 比喻成公厕多个马桶. 同一时间可以让多个人去使用 ''' from threading import Semaphore, Lock from threading import current_thread from threading import Thread import time sm = Semaphore(5) # 5个马桶 mutex = Lock() # 5个马桶 def task(): # mutex.acquire() sm.acquire() print(f'{current_thread().name}执行任务') time.sleep(1) sm.release() # mutex.release() for line in range(20): t = Thread(target=task) t.start()
''' 线程Q(了解级别1): 线程队列 面试会问: FIFO - FIFO队列: 先进先出 - LIFO队列: 后进先出 - 优先级队列: 根据参数内,数字的大小进行分级,数字值越小,优先级越高 ''' import queue # 普通的线程队列: 先进先出 # q = queue.Queue() # q.put(1) # q.put(2) # q.put(3) # print(q.get()) # 1 # LIFO队列: 后进先出 # q = queue.LifoQueue() # q.put(1) # q.put(2) # q.put(3) # print(q.get()) # 3 # 优先级队列 q = queue.PriorityQueue() # 超级了解 # 若参数中传的是元组,会以元组中第一个数字参数为准 q.put(('a优', '先', '娃娃头', 4)) # a==97 q.put(('a先', '优', '娃娃头', 3)) # a==98 q.put(('a级', '级', '娃娃头', 2)) # a==99 ''' 1.首先根据第一个参数判断ascii表的数值大小 2.判断第个参数中的汉字顺序. 3.再判断第二参数中数字--> 字符串数字 ---> 中文 4.以此类推 ''' print(q.get())