IPC生产者与消费者模型加线程

 

共享内存方式(IPC进程间通信)

一个进程在内存中就是对应着一块独立的内存空间

进程间通信的方式:

1.管道:只能单向通讯,数据都是二进制

2.文件:在硬盘上创建共享文件

  优点:数据量无限制

  缺点:传输速度慢

3.socket:编程复杂度高

4.共享内存:必须由操作系统来分配

  优点;传输速度快

  缺点:数据量较小

 

队列:

  是一种特殊的数据结构,先进先出

堆栈:先进后出,像叠衣服一样

扩展:函数嵌套调用时,顺序是先进后出,也称之为函数栈

分享图片
from multiprocessing import Queue

q = Queue(3)    # 建队列  不指定maxsize 则没有数量限制
q.put(1)
q.put(2)
q.put(3)
print(q.get())   # 1
q.put(4)    # 如果容量已经满了,在调用put时将进入阻塞状态 直到有人从队列中拿走数据有空位置 才会继续执行

print(q.get())
print(q.get())
print(q.get())
print(q.get())    # 如果队列已经空了,在调用get时将进入阻塞状态 直到有人从存储了新的数据到队列中 才会继续

#block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列为空时 抛出异常
q.get(block=True,timeout=2)
# block 表示是否阻塞 默认是阻塞的   # 当设置为False 并且队列满了时 抛出异常
# q.put("123",block=False,)
# timeout 表示阻塞的超时时间 ,超过时间还是没有值或还是没位置则抛出异常  仅在block为True有效
View Code

生产者消费者模型:

生产者:生产数据的一方

消费者:处理数据的一方

出现问题的原因:就是因为生产者与消费者之间的供需不平衡

解决问题:用来双方是耦合,消费者必须等待生产者生产完毕才能够开始处理。

选择将双方分开来,一方只负责生产,另一方指负责取,需要一个共同的容器作为中间介质。

分享图片
import time
import random
from multiprocessing import Process,Queue
def task1(q):
    for i in range(10):
        time.sleep(random.randint(0, 2))
        print("正在烧%s盘菜" % i)

        rose = "%s盘菜" % i
        q.put(rose)


def task2(q):
    for i in range(10):
        rose = q.get()
        print(rose,"正在消费")
        time.sleep(random.randint(0, 2))
        print(rose,"消费完成")


if __name__ == __main__:
    q = Queue()
    p1 = Process(target=task1, args=(q,))
    p2 = Process(target=task2, args=(q,))
    p1.start()
    p2.start()
View Code

还有问题:

生产者结束后,消费者还在循环吃,并不知道已经结束了。会在原地一致等待生产者生产数据。

解决:

用到了joinablequeue这个模块

意思:可以被join的队列

import time
import random
from multiprocessing import JoinableQueue, Process


def make_hotdog(name, q):
    for i in range(5):
        time.sleep(random.randint(1, 3))
        print("%s生产了热狗%s" % (name, i))
        q.put("%s的%s号热狗" % (name, i))

def eat_hotdog(name, q):
    while True:
        hotdog = q.get()
        time.sleep(random.randint(1, 3))
        print("%s吃掉了%s" % (name, hotdog))
        # 每次处理完成一个数据,必须记录该数据
        q.task_done()

if __name__ == __main__:
    q = JoinableQueue()

    p1 = Process(target=make_hotdog, args=("jerry", q))
    p2 = Process(target=make_hotdog, args=("monkey", q))
    p3 = Process(target=make_hotdog, args=("owen", q))

    c1 = Process(target=eat_hotdog, args=("思聪", q))
    c1.daemon = True
    c2 = Process(target=eat_hotdog, args=("健林", q))
    c2.daemon = True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    # 明确商家生产完毕,在明确消费者吃完了,就算结束
    p1.join()
    print("第一家生产完毕")

    p2.join()
    print("第二家生产完毕")

    p3.join()
    print("第三家生产完毕")

    # 消费者吃完了
    q.join()
    print("消费者吃完了")

多线程:

什么是线程:线程指的是一条流水线的工作过程。

线程是cpu最小的执行单位,是操作系统最小的调度单位。

对比于进程而言,进程只是一个资源单位,为线程在执行过程需要用到的数据提供资源,就好比进程是一个车间,而线程就是车间里面的流水线。

特点:
1.每个进程都会至少有一个线程,是由操作系统分配的。

2.进程内可以有多个线程

3.一个进程内的多个线程资源是共享的

4.线程的创建开销远比进程小的多。

主线程与子线程的区别:

线程之间是平等的

2.主线程由操作系统分配,子线程由程序开启

3.主线程的代码执行完毕,主线程并没有结束而是会等其他子线程代码运行完毕后,线程才会结束。

 开启线程的2种方式(和进程一样的,只是不用写在__main__里面

分享图片
# 方式一 直接实例化Thread类
from threading import Thread

def task():
    print("子线程 is running")

t = Thread(target=task)
t.start()
# 执行的顺序不固定,一般来说,开启子线程的速度远远高于继续执行主线程的速度
print("主线程 is runnning")
#
# # 方式二:自定义类继承thread类
class MyThread(Thread):
    def run(self):
        print("子线程 is running")

t = MyThread()
t.start()
print("主线程 is running")
View Code

同一个进程之间数据共享

import time,os
from threading import Thread

x = 100

def task():
    global x
    x = 10
    print("线程",os.getpid())   # 与主线程的pid相同


t = Thread(target=task)
t.start()
# 主线程等待子线程执行完毕
time.sleep(3)
print(x)   # 10
print("主线程",os.getpid())

守护线程:

与守护进程的区别就是,守护进程在被守护进程结束后会立即结束并不会等其他非守护进程,

而守护线程则是会等待其他非守护线程结束而结束,当然2个都可以提前结束、

import time
from threading import Thread


def task1():
    print("子进程1 is running")
    time.sleep(3)
    print("子进程1 is over")

def task2():
    print("子进程2 is running")
    time.sleep(2)
    print("子进程2 is over")

t1 = Thread(target=task1)
t2 = Thread(target=task2)
t1.daemon = True
t1.start()
t2.start()
print("主进程 Game Over")

互斥锁:

多线程最主要的特征之一就是:一个进程内的多个线程资源共享。

资源共享就是带来竞争问题,所以要加锁

from threading import Lock, enumerate,Thread

import time

num = 10
lock = Lock()
def task():

for i in range(10):
    t = Thread(target=tas
    global num
    lock.acquire()
    a = num
    time.sleep(0.1)
    num = a - 1
    lock.release()k)
    t.start()
for t in enumerate()[1:]:
    t.join()
print(num)
相关文章
相关标签/搜索