Bootstrap

为您收录的操作系统系列 - 进程管理(加餐)

生产者和消费者问题是相互合作进程关系的一种抽象。例如输入进程和计算进程的关系,输入进程是生产进程,计算进程是消费者进程。计算进程和输出进程的关系中,计算进程是生产进程,输出进程是消费进程。

1.问题描述

生产者生产消息,并将消息提供给消费者消费。在生产者进程和消费者进程之间设置一个具有n个缓冲区的缓冲池(如下图所示),生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区,消费者进程可以用从缓冲区中取得一个消息消费。任意两个进程必须以互斥的方式访问公共缓冲池。当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。

2.需要解决的问题
  • 实现任意两个进程对缓冲池的互斥访问,实现对生产者进程和消费者进程的“协调”,即缓冲区有消息消费者进程才能执行取消息的操作。

  • 无消息时,阻塞消费者进程。

  • 缓冲区中有空闲缓冲区时,生产者才能执行放消息的操作。

  • 无空间缓冲区时,阻塞生产进程。

3.信号量的设置
  • 设置一个互斥信号量 mutex ,用户实现对公共缓冲区的互斥访问,初始值为 1。

  • 设置两个资源信号量(分别表示可用资源数量)

empty 表示缓冲区中空缓冲区数,初值为 n。

full 表示装有消息的缓冲区数,初始值为 0 (一个缓冲区放一个消息) 。

JAVA 代码如下:主要完成共享资源的互斥访问。代码采用线程模拟进程操作。

package message;


import java.util.concurrent.atomic.AtomicInteger;

/**
 * 生产者进程可以用将它说生产的消息放入缓冲池的一个缓冲区,
 * 消费者进程可以用从缓冲区中取得一个消息消费。
 * 任意两个进程必须以互斥的方式访问公共缓冲池。
 * 当缓冲池空,没有任何消费者的消息时,消费者必须阻塞等待。
 * 当缓冲池装满消息,没有空闲缓冲区,生产者必须阻塞等待。
 */
public class MsgQueue {


    //缓冲区大小
    public static final int capacity = 4;

    /**
     * 设置一个互斥信号量  mutex ,用户实现对公共缓冲区的互斥访问,初始值为 1.
     */
    private  static AtomicInteger mutex = new AtomicInteger(1);

    /**
     * 缓冲内容区域
     */
    public static String[] emptyContent = new String[capacity];
    /**
     *  设置一个具有n个缓冲区的缓冲池
     *  empty 表示缓冲区中空缓冲区数,初值为n。
     */
    public static volatile AtomicInteger empty = new AtomicInteger(capacity);

    /**
     *     full 表示装有消息的缓冲区数,初始值为0.(一个缓冲区放一个消息)
     *     声明 Using volatile variables reduces the risk of memory consistency errors
     *     https://docs.oracle.com/javase/tutorial/essential/concurrency/atomic.html
     */
    public static volatile AtomicInteger full  = new AtomicInteger(0);


    public synchronized static void waitMsgQueue(String msg){
        while (mutex.get() <= 0){
            //System.out.println(msg + ",等待消息缓冲区释放");
            try {
                Thread.sleep(1000);
            }catch (Exception e){

            }
        }
        mutex.set(0);
        //System.out.println(msg + ",锁定消息缓冲区");
    }


    public static void signalMsgQueue(String msg){
        mutex.set(1);
        //System.out.println(msg +",释放消息缓冲区");
    }


    /**
     * 将消息放入 in 指针指向的缓冲区
     * @param in
     * @param msg
     */
    public static int addContent(int in,String msg){

        while (empty.get() <= 0){
            System.out.println("队列已满!");
        }

        //将消息放入 in 指针指向的缓冲区
        MsgQueue.emptyContent[in] = msg;

        //System.out.println(msg+",消息队列下标位置="+in+",内容:"+msg);

        System.out.println("生产者-"+MsgQueue.msg());

        return empty.addAndGet(-1);

    }

    /**
     * 从 out 指针指向的缓冲区中区消息
     * @param out
     */
    public static String getContent(int out){

        //将消息放入 in 指针指向的缓冲区
        String content = MsgQueue.emptyContent[out];



        return content;

    }


    /**
     * 输出消息内容
     */
    public static String msg(){
        StringBuffer msg = new StringBuffer("当前队列数据:");
        for (int i = 0; i < emptyContent.length; i++) {
            msg.append("[").append(emptyContent[i]).append("]");
        }
        return msg.toString();
    }


    /**
     * 申请空缓冲区
     */
    public synchronized static void waitProducer(String msg){
        while (MsgQueue.full.get() >= MsgQueue.capacity){
            System.out.println(msg + ",申请空缓冲区失败,缓冲区已满");
            try {
                Thread.sleep(1000);
            }catch (Exception e){

            }
        }
        MsgQueue.full.addAndGet(1);
       // System.out.println(msg + ",申请空缓冲区成功");
    }


    /**
     * 释放消息资源
     */
    public synchronized static void signalProducer(String msg){
        MsgQueue.full.addAndGet(-1);
        //System.out.println(msg+",释放消息资源 full = "+MsgQueue.full.get());
    }
}
4.同步程序

利用记录信号量机制实现 Producer。

  • 申请空缓冲区

  • 申请公共缓冲区池的互斥访问权限

  • 将消息放入 in 指针指向的缓冲区

  • in 指针指向下一个空缓冲区

  • 释放对公共缓冲区的互斥访问权

  • 释放消息资源

Producer 使用 JAVA 代码实现如下:

package message;

import java.util.concurrent.atomic.AtomicInteger;


/**
 *     申请空缓冲区
 *     申请公共缓冲区池的互斥访问权限
 *     将消息放入 in 指针指向的缓冲区
 *     in 指针指向下一个空缓冲区
 *     释放对公共缓冲区的互斥访问权
 *     释放消息资源
 *
 */
public class ProducerDemo {

    /**
     * 缓冲区地址
     */
    private static AtomicInteger in  = new AtomicInteger(0);

    /**
     * 生产消息:
     * @param msg
     */
    public static void produce(String msg){

        //申请空缓冲区
        MsgQueue.waitProducer(msg);

        //申请公共缓冲区池的互斥访问权限
        MsgQueue.waitMsgQueue(msg);

        //将消息放入 in 指针指向的缓冲区
        int n = MsgQueue.addContent(in.get(),msg);

        //System.out.println(msg + ",当前队列大小 n = "+n);

        in.addAndGet(1);

        //in 指针指向下一个空缓冲区
        int next = in.get() % MsgQueue.capacity;

        //System.out.println(msg + ",下一个空缓冲区 next = "+next);

        in.set(next);

        //释放对公共缓冲区的互斥访问权
        MsgQueue.signalMsgQueue(msg);

        //释放消息资源
        MsgQueue.signalProducer(msg);

    }
}

Consumer 申请消息

  • 申请公共资源的互斥访问权限

  • 从 out 指针指向的缓冲区中区消息

  • out 指针指向下一个装有消息的缓冲区

  • 释放对公共缓冲池额互斥访问权

  • 释放缓冲区

Consumer 使用 JAVA 代码实现如下:

package message;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 申请消息
 * 申请公共资源的互斥访问权限
 * 从 out 指针指向的缓冲区中区消息
 * out 指针指向下一个装有消息的缓冲区
 * 释放对公共缓冲池额互斥访问权
 * 释放缓冲区
 */
public class ConsumerDemo {

    /**
     * 缓冲区地址
     */
    private static volatile AtomicInteger out  = new AtomicInteger(0);

    /**
     * 消费消息:
     */
    public static void consumer(String msg){

        //申请空缓冲区
        waitConsumer(msg);

        //申请公共缓冲区池的互斥访问权限
        MsgQueue.waitMsgQueue(msg);

        //System.out.println("消费者,消费前-"+MsgQueue.msg());

        //从 out 指针指向的缓冲区中区消息
        String content = MsgQueue.getContent(out.get());

        System.out.println(msg+",获取队列中的内容"+content);

        MsgQueue.emptyContent[out.get()] = null;

        System.out.println("消费者,消费后-"+MsgQueue.msg());

        //out 指针指向下一个装有消息的缓冲区
        int outIdx = out.addAndGet(1) % MsgQueue.capacity;

        out.set(outIdx);

        //释放对公共缓冲区的互斥访问权
        MsgQueue.signalMsgQueue(msg);

        //释放消息资源å
        signalConsumer();

    }


    /**
     * 申请空缓冲区
     */
    public synchronized static void waitConsumer(String msg){
        while (MsgQueue.empty.get() == MsgQueue.capacity){
            //System.out.println(msg + ",消息队列中无数据,等待中...");
            System.out.println(".");
            try {
                Thread.sleep(500);
            }catch (Exception e){

            }
        }
        //System.out.println(msg + ",获取缓冲区数据");
    }


    /**
     * 释放消息资源
     */
    public synchronized static void signalConsumer(){
        //to do nothing
        MsgQueue.empty.addAndGet(1);
    }


}

测试用例执行代码如下:

package message;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 测试用例
 */
public class ProducerAndConsumerTestDemo {


    public static void main(String[] args) {


        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(
                        10,
                        10,
                        10,
                        TimeUnit.SECONDS,
                        new ArrayBlockingQueue<>(100)) ;



        for (int i = 0; i < MsgQueue.capacity; i++) {
            System.out.println("生产者-提交生产任务[" +i+"]次");
            final int id = i;
            Runnable p1 =  new Runnable() {
                public void run() {
                    //
                    long threadId = Thread.currentThread().getId();
                    try {
                        Thread.sleep(2000);
                    }catch (Exception e){

                    }
                    ProducerDemo.produce("线程ID:"+threadId+",生产者-息序列ID = "+ id);
                }
            };

            executor.execute(p1);
        }


        try {
            Thread.sleep(10000);
        }catch (Exception e){

        }

       for (int i = 0; i < MsgQueue.capacity; i++) {
            System.out.println("消费者-提交消费任务[" +i+"]次");
            final int id = i;
            Runnable p1 =  new Runnable() {
                public void run() {
                    //
                    long threadId = Thread.currentThread().getId();
                    try {
                        Thread.sleep(2000);
                    }catch (Exception e){

                    }
                    ConsumerDemo.consumer("线程ID:"+threadId+",消费者-提交消费任务ID = "+ id);
                }
            };

            executor.execute(p1);
        }

        //executor.shutdown();

    }
}

执行输出:

5.说明
  • wait 和 signal 必须成对出现 。

  • wait 的操作顺序不能颠倒,必须现对资源型号量(empty ,full 进程)进行 wait 操作。然后再对互斥信号量进行 wait 操作。

  • 用记录信号量机制解决生产者-消费者问题,对就有相互合作关系的进程,提供解决问题的模型。

欢迎大家的留言讨论。按惯例最后分享一首诗给大家。

鼓声响起。

它的节奏,就像我的心跳。

在鼓点中,一个声音说道:

“我知道你累了,

但是,来吧,这就是你的道路。”

相关收藏

(操作系统-进程控制与同步)

(操作系统-进程概述)

(操作系统-发展简介)