Java集合的BlockingQueue
簡介。
Java的BlockingQueue
介面屬於Java集合框架(Java Collections Framework)的一員,繼承Queue
介面。
+---------------+
| Collection |
+---------------+
^
extennds
|
+---------------+
| Queue |
+---------------+
^
extends
|
+---------------+
| BlockingQueue |
+---------------+
BlockingQueue
位在java.util.concurrent
包,下有多個併行集合(Concurrent Collections)的實作類別:
ArrayBlockingQueue
DelayQueue
LinkedBlockingDeque
LinkedBlockingQueue
LinkedTransferQueue
PriorityBlockingQueue
SynchronousQueue
BlockingQueue
除了具有Queue
先進先出(FIFO)特性及操作元素進出的方法,還提供多執行緒等待元素進出的操作。
一般Queue
在放入或取出元素失敗時立即會拋出例外或返回特定值;BlockingQueue
則多了執行緒在放入或取出元素無法立即滿足時會先被block(阻塞),直到該操作在之後可以成功。
下面是BlockingQueue
各種操作元素進出的方法。紅色部分為多執行緒的阻塞操作。
拋出例外 | 回傳特定值 | 阻塞(Blocks) | 逾時 | |
---|---|---|---|---|
Insert 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
Remove 刪除(取出) | remove() | poll() | take() | poll(time, unit) |
Examine 檢視 | element() | peek() | 不適用 | 不適用 |
put(e)
如果隊列滿了插入失敗,則該執行緒會被阻塞直到有空間為止。
offer(e, time, unit)
如果隊列滿了插入失敗,則該執行緒會被阻塞一段指定時間直到有空間,若超過時間仍無空間則返回false。
take()
如果隊列是空的取出失敗,則該執行緒會被阻塞直到有元素加入。
poll(time, unit)
如果隊列是空的取出失敗,則該執行緒會被阻塞一段指定時間直到有元素加入,若超過時間仍無空間則返回null。
BlockingQueue
又分為有界隊列(bounded blocking queue)及無界隊列(unbounded blocking queue),差別為有無最大空間限制。
有界隊列空間若達到上限(空間滿了),當用put(e)
放入新元素至隊列時會被阻塞;
無界隊列理論上沒有空間限制,實際上為Integer.MAX_VALUE
(231-1),所以幾乎不會阻塞。
BlockingQueue
的實作都是執行緒安全的(thread-safe),隊列的操作方法皆為原子操作,但屬於Collection的批次操作如addAll
、containsAll
、retainAll
、removeAll
則不保證。
BlockingQueue
的實作主要用在生產者-消費者情境的隊列。典型的範例為Java任務執行器ThreadPoolExecutor
中的任務隊列。
下面以生產者-消費者演示BlockingQueue
的阻塞效果。Producer
及Consumer
為執行緒,Producer
生產東西放入BlockingQueue
交給Consumer
消費。Producer
生產速度較快,每1秒生產一個;Consumer
消費速度較慢,每5秒消費一個。BlockingQueue
隊列空間為2。由於生產速度超過消費速度,可以看到隊列滿的時候生產者必須等待(被阻塞)。直到消費者消費完在從隊列中取出消費。
Producer
package com.abc.demo;
import java.util.concurrent.BlockingQueue;
public class Producer extends Thread {
private final BlockingQueue<Integer> queue; // 與Consumer共用
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
for (int i = 1; i <= 5; i++) {
try {
Thread.sleep(1000); // 模擬每1秒生產1個
long startPutTime = System.currentTimeMillis();
queue.put(i);
long endPutTime = System.currentTimeMillis();
long blockTime = (endPutTime - startPutTime) / 1000;
System.out.printf("Producer:第%d個元素已放入隊列,等待時間%d秒%n", i , blockTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Consumer
package com.abc.demo;
import java.util.concurrent.BlockingQueue;
public class Consumer extends Thread {
private final BlockingQueue<Integer> queue; // 與Producer共用
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true) {
long startTakeTime = System.currentTimeMillis();
int i = queue.take();
long endTakeTime = System.currentTimeMillis();
long blockTime = (endTakeTime - startTakeTime) / 1000;
System.out.printf("Consumer:第%d個元素已取出隊列,等待時間%d秒%n", i , blockTime);
Thread.sleep(5000); // 模擬每5秒消費1個
if (queue.isEmpty()) {
return;
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
測試
Main
package com.abc.demo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Main {
public static void main(String[] args) {
BlockingQueue<Integer> boundedBlockingQueue = new ArrayBlockingQueue<>(2);
Producer producer = new Producer(boundedBlockingQueue);
Consumer consumer = new Consumer(boundedBlockingQueue);
producer.start();
consumer.start();
}
}
測試結果印出以下。
Producer:第1個元素已放入隊列,等待時間0秒
Consumer:第1個元素已取出隊列,等待時間1秒
Producer:第2個元素已放入隊列,等待時間0秒
Producer:第3個元素已放入隊列,等待時間0秒
Consumer:第2個元素已取出隊列,等待時間0秒
Producer:第4個元素已放入隊列,等待時間2秒
Consumer:第3個元素已取出隊列,等待時間0秒
Producer:第5個元素已放入隊列,等待時間3秒
Consumer:第4個元素已取出隊列,等待時間0秒
Consumer:第5個元素已取出隊列,等待時間0秒
沒有留言:
張貼留言