網頁

2021/12/25

Java 本機讀取Kinesis Data Streams的資料

本範例在locahost的Java應用程式以AWS的SDK aws-sdk-java-v2從Kinesis Data Streams取得資料。


範例環境:

  • Java 11
  • Maven


事前要求

參考「Java 從本機發送資料到Kinesis Data Streams」來發送資料到KinesisDataStreamDemo


下載AWS SDK for Java 2.x

在專案的pom.xml加入以下需要的AWS SDK for Java的Kinesis依賴。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.15.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>kinesis</artifactId>
</dependency>

設定好pom.xml在專案執行mvn install下載依賴。


從Kinesis Data Stream取得資料

在Java程式呼叫KinesisClient.builder()輸入region參數建立KinesisClient來存取Kinesis Data Stream。AWS SDK預設會讀取$HOME/.aws/credentials的access keys來通過權限驗證。

調用KinesisClient.describeStream()並傳入帶有stream名稱的DescribeStreamRequest物件取得DescribeStreamResponse

DescribeStreamResponse取得最後一筆shard的id,以此為參數建立GetShardIteratorRequest物件傳入KinesisClient.getShardIterator()取得GetShardIteratorResponse

建立GetRecordsRequest物件並傳入KinesisClient.getRecords()取得GetRecordsResponse物件,遍歷並印出其中全部Record的內容。

Main.java

package com.abc.demo;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.*;

import java.util.ArrayList;
import java.util.List;

public class Main {

    public static void main(String[] arges) {
        KinesisClient kinesisClient = KinesisClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
        getDataFromKinesisDataStream(kinesisClient, "KinesisDataStreamDemo");
        kinesisClient.close();
    }

    public static void getDataFromKinesisDataStream(KinesisClient kinesisClient, String streamName) {
        DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                .streamName(streamName)
                .build();

        DescribeStreamResponse streamResponse = kinesisClient.describeStream(describeStreamRequest);
        List<Shard> shards = new ArrayList<>(streamResponse.streamDescription().shards());
        String lastShardId = shards.get(shards.size() - 1).shardId();

        GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
                .streamName(streamName)
                .shardIteratorType(ShardIteratorType.TRIM_HORIZON)
                .shardId(lastShardId)
                .build();

        GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
        String shardIterator = shardIteratorResult.shardIterator();

        GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
                .shardIterator(shardIterator)
                .limit(1000) // Set maximum records to return to 1000.
                .build();

        GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);

        List<Record> records = result.records();
        for (Record record : records) {
            System.out.println(new String(record.data().asByteArray()));
        }
    }

}

github


測試

執行Java應用程式印出Kinesis Data Streams的資料。

Hello world

在AWS console Kinesis Data Streams檢視KinesisDataStreamDemo stream的[Monitoring],在[Get records]面板可看到送出的資料。





沒有留言:

張貼留言