- 创建生产者。
public class Producer implements Runnable {
/**
* 是否循环
*/
private volatile boolean isRunning = true;
/**
* 休眠时间
*/
private final static int SLEEP_TIME = 1000;
/**
* 阻塞队列
*/
private BlockingQueue<String> strQueue;
/**
* 序号
*/
private static AtomicInteger serial = new AtomicInteger();
/**
* 构造方法
*/
public Producer(BlockingQueue<String> strQueue) {
this.strQueue = strQueue;
}
/**
* 重写run方法
*/
@Override
public void run() {
// 当isRunning为true的时候,循环进行生产
while (isRunning) {
try {
if (!strQueue.offer("" + serial.getAndAdd(1), 2, TimeUnit.SECONDS)) {
System.out.println("队列的数量为:" + strQueue.size() + ",无法加入队列");
} else {
System.out.println("【生产者】" + Thread.currentThread().getName() + "生产" + serial.get());
}
// 线程休眠
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 停止生产
*/
public void stopProduct() {
isRunning = false;
}
}
- 创建消费者。
public class Consumer implements Runnable {
/**
* 阻塞队列
*/
private BlockingQueue<String> strQueue;
/**
* 是否支持消费
*/
private volatile boolean isConsumer = true;
/**
* 休眠时间
*/
private final static int SLEEP_TIME = 5000;
/**
* 构造方法
*/
public Consumer(BlockingQueue<String> strQueue) {
this.strQueue = strQueue;
}
/**
* 重写run方法
*/
@Override
public void run() {
// 当isConsumer为true的时候,循环进行消费
while (isConsumer) {
// 从队列中获取字符串
String str = strQueue.poll();
// 若字符串存在,则消费,若不存在则提示
if (str != null) {
System.out.println("【消费者】" + Thread.currentThread().getName() + "消费" + str);
} else {
System.out.println("队列数量剩余" + strQueue.size() + ",没有字符串可以消费");
}
// 线程休眠
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 停止消费
*/
public void stopConsumer() {
isConsumer = false;
}
}
- 测试方法。
public static void main(String[] args) throws InterruptedException {
// 创建一个阻塞队列,容量为200
BlockingQueue<String> strQueue = new LinkedBlockingQueue<>(200);
// 创建单个生产者
Producer producer = new Producer(strQueue);
// 创建多个消费者
Consumer consumer1 = new Consumer(strQueue);
Consumer consumer2 = new Consumer(strQueue);
// 使用线程池执行
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(producer);
executorService.execute(consumer1);
executorService.execute(consumer2);
// 让主线程休眠,这里是为了查看执行的结果
Thread.sleep(20000);
// 停止生产和消费
producer.stopProduct();
consumer1.stopConsumer();
consumer2.stopConsumer();
// 最后关闭线程池
executorService.shutdown();
}