观察Java的并发编程时,手写“出产者-消费者模子”是一个经典问题。有如下几个考点:
本文主要归纳了4种写法,阅读后,最亏得白板上操练几遍,查抄本身是否把握。这4种写法可能编程接口差异,可能并发粒度差异,但本质是沟通的——都是在利用或实现BlockingQueue。
出产者-消费者模子
网上有许多出产者-消费者模子的界说和实现。本文研究最常用的有界出产者-消费者模子,简朴归纳综合如下:
可通过如下条件验证模子实现的正确性:
该模子的应用和变种很是多,不赘述。
几种写法
筹备
口试时可语言说明以下筹备代码。要害部门需要实现,如AbstractConsumer。
下面会涉及多种出产者-消费者模子的实现,可以先抽象出要害的接口,并实现一些抽象类:
public interface Consumer { void consume() throws InterruptedException; }
public interface Producer { void produce() throws InterruptedException; }
abstract class AbstractConsumer implements Consumer, Runnable { @Override public void run() { while (true) { try { consume(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } }
abstract class AbstractProducer implements Producer, Runnable { @Override public void run() { while (true) { try { produce(); } catch (InterruptedException e) { e.printStackTrace(); break; } } } }
差异的模子实现中,出产者、消费者的详细实现也差异,所以需要为模子界说抽象工场要领:
public interface Model { Runnable newRunnableConsumer(); Runnable newRunnableProducer(); }
我们将Task作为出产和消费的单元:
public class Task { public int no; public Task(int no) { this.no = no; } }
假如需求还不明晰(这切合大部门工程事情的实际环境),发起边实现边抽象,不要“面向将来编程”。
实现一:BlockingQueue
BlockingQueue的写法最简朴。焦点思想是,把并发和容量节制封装在缓冲区中。而BlockingQueue的性质天生满意这个要求。
public class BlockingQueueModel implements Model { private final BlockingQueue<Task> queue; private final AtomicInteger increTaskNo = new AtomicInteger(0); public BlockingQueueModel(int cap) { // LinkedBlockingQueue 的行列是 lazy-init 的,但 ArrayBlockingQueue 在建设时就已经 init this.queue = new LinkedBlockingQueue<>(cap); } @Override public Runnable newRunnableConsumer() { return new ConsumerImpl(); } @Override public Runnable newRunnableProducer() { return new ProducerImpl(); } private class ConsumerImpl extends AbstractConsumer implements Consumer, Runnable { @Override public void consume() throws InterruptedException { Task task = queue.take(); // 固按时间范畴的消费,模仿相对不变的处事器处理惩罚进程 Thread.sleep(500 + (long) (Math.random() * 500)); System.out.println("consume: " + task.no); } } private class ProducerImpl extends AbstractProducer implements Producer, Runnable { @Override public void produce() throws InterruptedException { // 不按期出产,模仿随机的用户请求 Thread.sleep((long) (Math.random() * 1000)); Task task = new Task(increTaskNo.getAndIncrement()); queue.put(task); System.out.println("produce: " + task.no); } } public static void main(String[] args) { Model model = new BlockingQueueModel(3); for (int i = 0; i < 2; i++) { new Thread(model.newRunnableConsumer()).start(); } for (int i = 0; i < 5; i++) { new Thread(model.newRunnableProducer()).start(); } } }
截取前面的一部门输出:
produce: 0 produce: 4 produce: 2 produce: 3 produce: 5 consume: 0 produce: 1 consume: 4 produce: 7 consume: 2 produce: 8 consume: 3 produce: 6 consume: 5 produce: 9 consume: 1 produce: 10 consume: 7