原创

基于ReenTrantLock 和 Condition 实现 生产者-消费者模式


任何一个Java对象,都拥有一组监视器Monitor 方法,主要包括wait()、notify()、notifyAll()方法,这些方法与synchronized关键字配合使用可以实现等待/通知机制。

类似地,Condition接口也提供类似的Object的监视器的方法,主要包括await()、signal()、signalAll()方法,这些方法与Lock锁配合使用也可以实现等待/通知机制。

Lock 替代了 synchronized 方法和语句的使用,Condition 替代了 Object 监视器方法的使用。在Condition中,用await()替换wait(),用signal()替换notify(),用signalAll()替换notifyAll()。传统线程的通信方式,Condition都可以实现。

这里注意,Condition是被绑定到Lock上的,要创建一个Lock的Condition必须用newCondition()方法。


import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class ProductAndConsumer {
    
    final Lock lock = new ReentrantLock();
    
    final Condition notFull = lock.newCondition();
    
    final Condition notEmpty = lock.newCondition();
    
    private Queue<Integer> queue = new LinkedList<Integer>();
    
    public void produce() {
        lock.lock();
        try {
            while(queue.size() == 3) {
                System.out.println("队列已满,生产者等待...");
                notFull.await();
            }
            int num = (int) (Math.random() * 100 + 1);
            System.out.println("生产者生产数字: " + num);
            queue.offer(num);
            notEmpty.signalAll();
        }catch(Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
    }
    
    public void consum() {
        lock.lock();
        try {
            while(queue.size() == 0) {
                System.out.println("队列为空,消费者等待...");
                notEmpty.await();
            }
            System.out.println("消费者消费数字: " + queue.poll());
            notFull.signalAll();
        }catch(Exception e) {
            e.printStackTrace();
        }finally {
            lock.unlock();
        }
        
    }
}
 

//测试代码
package ProductAndConsumer;

public class ProductAndConsumerTest {

    public static void main(String[] args) {
        ProductAndConsumer productAndConsumer = new ProductAndConsumer();
        

        Thread product = new Thread(new Runnable() {
            
            @Override
            public void run() {
                for(int i=0;i<50;i++) {
                    productAndConsumer.produce();
                }
                
            }
        });
        
        Thread consumer = new Thread(new Runnable() {
            
            @Override
            public void run() {
                for(int i=0;i<50;i++) {
                    productAndConsumer.consum();
                }
                
            }
        });
        
        product.start();
        consumer.start();
        
    }
}

输出结果

生产者生产数字: 58
生产者生产数字: 52
生产者生产数字: 77
队列已满,生产者等待...
消费者消费数字: 58
消费者消费数字: 52
消费者消费数字: 77
队列为空,消费者等待...
生产者生产数字: 52
生产者生产数字: 48
生产者生产数字: 53
队列已满,生产者等待...
消费者消费数字: 52
消费者消费数字: 48
消费者消费数字: 53
队列为空,消费者等待...
生产者生产数字: 78
生产者生产数字: 51
生产者生产数字: 34
队列已满,生产者等待...
消费者消费数字: 78
消费者消费数字: 51
消费者消费数字: 34
队列为空,消费者等待...
生产者生产数字: 47
生产者生产数字: 70
生产者生产数字: 84
队列已满,生产者等待...
消费者消费数字: 47
消费者消费数字: 70
消费者消费数字: 84
队列为空,消费者等待...
生产者生产数字: 82
生产者生产数字: 45
生产者生产数字: 80
队列已满,生产者等待...
消费者消费数字: 82
消费者消费数字: 45
消费者消费数字: 80
队列为空,消费者等待...
生产者生产数字: 50
生产者生产数字: 75
生产者生产数字: 92
Java
多线程
设计模式