本範例在locahost的Java應用程式以AWS的SDK aws-sdk-java-v2
從Kinesis Data Streams取得資料。
範例環境:
- Java 11
- Maven
事前要求
參考「Java 從本機發送資料到Kinesis Data Streams」來發送資料到KinesisDataStreamDemo
。
下載AWS SDK for Java 2.x
在專案的pom.xml
加入以下需要的AWS SDK for Java的Kinesis依賴。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.15.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
設定好pom.xml
在專案執行mvn install
下載依賴。
從Kinesis Data Stream取得資料
在Java程式呼叫KinesisClient.builder()
輸入region參數建立KinesisClient
來存取Kinesis Data Stream。AWS SDK預設會讀取$HOME/.aws/credentials
的access keys來通過權限驗證。
調用KinesisClient.describeStream()
並傳入帶有stream名稱的DescribeStreamRequest
物件取得DescribeStreamResponse
。
從DescribeStreamResponse
取得最後一筆shard的id,以此為參數建立GetShardIteratorRequest
物件傳入KinesisClient.getShardIterator()
取得GetShardIteratorResponse
。
建立GetRecordsRequest
物件並傳入KinesisClient.getRecords()
取得GetRecordsResponse
物件,遍歷並印出其中全部Record
的內容。
Main.java
package com.abc.demo;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.*;
import java.util.ArrayList;
import java.util.List;
public class Main {
public static void main(String[] arges) {
KinesisClient kinesisClient = KinesisClient.builder()
.region(Region.AP_NORTHEAST_1)
.build();
getDataFromKinesisDataStream(kinesisClient, "KinesisDataStreamDemo");
kinesisClient.close();
}
public static void getDataFromKinesisDataStream(KinesisClient kinesisClient, String streamName) {
DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
.streamName(streamName)
.build();
DescribeStreamResponse streamResponse = kinesisClient.describeStream(describeStreamRequest);
List<Shard> shards = new ArrayList<>(streamResponse.streamDescription().shards());
String lastShardId = shards.get(shards.size() - 1).shardId();
GetShardIteratorRequest itReq = GetShardIteratorRequest.builder()
.streamName(streamName)
.shardIteratorType(ShardIteratorType.TRIM_HORIZON)
.shardId(lastShardId)
.build();
GetShardIteratorResponse shardIteratorResult = kinesisClient.getShardIterator(itReq);
String shardIterator = shardIteratorResult.shardIterator();
GetRecordsRequest recordsRequest = GetRecordsRequest.builder()
.shardIterator(shardIterator)
.limit(1000) // Set maximum records to return to 1000.
.build();
GetRecordsResponse result = kinesisClient.getRecords(recordsRequest);
List<Record> records = result.records();
for (Record record : records) {
System.out.println(new String(record.data().asByteArray()));
}
}
}
測試
執行Java應用程式印出Kinesis Data Streams的資料。
Hello world
在AWS console Kinesis Data Streams檢視KinesisDataStreamDemo
stream的[Monitoring],在[Get records]面板可看到送出的資料。
沒有留言:
張貼留言