網頁

2021/1/22

Java BlockingQueue 簡介

Java集合的BlockingQueue簡介。

Java的BlockingQueue介面屬於Java集合框架(Java Collections Framework)的一員,繼承Queue介面。

+---------------+
|  Collection   |
+---------------+
        ^
     extennds
        |
+---------------+
|     Queue     |
+---------------+
        ^
     extends
        |
+---------------+
| BlockingQueue |
+---------------+

BlockingQueue位在java.util.concurrent包,下有多個併行集合(Concurrent Collections)的實作類別:


BlockingQueue除了具有Queue先進先出(FIFO)特性及操作元素進出的方法,還提供多執行緒等待元素進出的操作。

一般Queue在放入或取出元素失敗時立即會拋出例外或返回特定值;BlockingQueue則多了執行緒在放入或取出元素無法立即滿足時會先被block(阻塞),直到該操作在之後可以成功。

下面是BlockingQueue各種操作元素進出的方法。紅色部分為多執行緒的阻塞操作。

拋出例外回傳特定值阻塞(Blocks)逾時
Insert 插入add(e)offer(e)put(e)offer(e, time, unit)
Remove 刪除(取出)remove()poll()take()poll(time, unit)
Examine 檢視element()peek()不適用不適用

put(e)如果隊列滿了插入失敗,則該執行緒會被阻塞直到有空間為止。

offer(e, time, unit)如果隊列滿了插入失敗,則該執行緒會被阻塞一段指定時間直到有空間,若超過時間仍無空間則返回false。

take()如果隊列是空的取出失敗,則該執行緒會被阻塞直到有元素加入。

poll(time, unit)如果隊列是空的取出失敗,則該執行緒會被阻塞一段指定時間直到有元素加入,若超過時間仍無空間則返回null。


BlockingQueue又分為有界隊列(bounded blocking queue)及無界隊列(unbounded blocking queue),差別為有無最大空間限制。
有界隊列空間若達到上限(空間滿了),當用put(e)放入新元素至隊列時會被阻塞;
無界隊列理論上沒有空間限制,實際上為Integer.MAX_VALUE(231-1),所以幾乎不會阻塞。

BlockingQueue的實作都是執行緒安全的(thread-safe),隊列的操作方法皆為原子操作,但屬於Collection的批次操作如addAllcontainsAllretainAllremoveAll 則不保證。

BlockingQueue的實作主要用在生產者-消費者情境的隊列。典型的範例為Java任務執行器ThreadPoolExecutor中的任務隊列。


下面以生產者-消費者演示BlockingQueue的阻塞效果。ProducerConsumer為執行緒,Producer生產東西放入BlockingQueue交給Consumer消費。Producer生產速度較快,每1秒生產一個;Consumer消費速度較慢,每5秒消費一個。BlockingQueue隊列空間為2。由於生產速度超過消費速度,可以看到隊列滿的時候生產者必須等待(被阻塞)。直到消費者消費完在從隊列中取出消費。

Producer

package com.abc.demo;

import java.util.concurrent.BlockingQueue;

public class Producer extends Thread {

    private final BlockingQueue<Integer> queue; // 與Consumer共用

    public Producer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        for (int i = 1; i <= 5; i++) {
            try {
                Thread.sleep(1000); // 模擬每1秒生產1個

                long startPutTime = System.currentTimeMillis();
                queue.put(i);
                long endPutTime = System.currentTimeMillis();

                long blockTime = (endPutTime - startPutTime) / 1000;
                System.out.printf("Producer:第%d個元素已放入隊列,等待時間%d秒%n", i , blockTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}

Consumer

package com.abc.demo;

import java.util.concurrent.BlockingQueue;

public class Consumer extends Thread {

    private final BlockingQueue<Integer> queue; // 與Producer共用

    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                long startTakeTime = System.currentTimeMillis();
                int i = queue.take();
                long endTakeTime = System.currentTimeMillis();

                long blockTime = (endTakeTime - startTakeTime) / 1000;
                System.out.printf("Consumer:第%d個元素已取出隊列,等待時間%d秒%n", i , blockTime);

                Thread.sleep(5000); // 模擬每5秒消費1個

                if (queue.isEmpty()) {
                    return;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

測試

Main

package com.abc.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Main {

    public static void main(String[] args) {
        
        BlockingQueue<Integer> boundedBlockingQueue = new ArrayBlockingQueue<>(2);
        Producer producer = new Producer(boundedBlockingQueue);
        Consumer consumer = new Consumer(boundedBlockingQueue);

        producer.start();
        consumer.start();
    }

}

測試結果印出以下。

Producer:第1個元素已放入隊列,等待時間0秒
Consumer:第1個元素已取出隊列,等待時間1秒
Producer:第2個元素已放入隊列,等待時間0秒
Producer:第3個元素已放入隊列,等待時間0秒
Consumer:第2個元素已取出隊列,等待時間0秒
Producer:第4個元素已放入隊列,等待時間2秒
Consumer:第3個元素已取出隊列,等待時間0秒
Producer:第5個元素已放入隊列,等待時間3秒
Consumer:第4個元素已取出隊列,等待時間0秒
Consumer:第5個元素已取出隊列,等待時間0秒


沒有留言:

張貼留言