本範例在locahost的Java應用程式以AWS的SDK aws-sdk-java-v2
發送資料到Kinesis Data Streams。
範例環境:
- Java 11
- Maven
事前要求
參考「AWS 建立IAM管理使用者及credentials」設定存取AWS需要的IAM管理員credentials。
參考「AWS console 建立Kinesis Data Stream範例」建立名稱為KinesisDataStreamDemo
的Kinesis Data Stream。
下載AWS SDK for Java 2.x
在專案的pom.xml
加入以下需要的AWS SDK for Java的Kinesis依賴。
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.15.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>
設定好pom.xml
在專案執行mvn install
下載依賴。
發送資料到Kinesis Data Stream
在Java程式呼叫KinesisClient.builder()
輸入region參數建立KinesisClient
來存取Kinesis Data Stream。AWS SDK預設會讀取$HOME/.aws/credentials
的access keys來通過權限驗證。然後調用KinesisClient.putRecord()
把要發送的資料放入PutRecordRequest
物件發送到Kinesis Data Streams。
Main.java
package com.abc.demo;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
import java.nio.charset.StandardCharsets;
public class Main {
public static void main(String[] arges) {
KinesisClient kinesisClient = KinesisClient.builder()
.region(Region.AP_NORTHEAST_1)
.build();
String data = "Hello world";
PutRecordRequest putRecordRequest = PutRecordRequest.builder()
.streamName("KinesisDataStreamDemo")
.partitionKey("demo-001")
.data(SdkBytes.fromString(data, StandardCharsets.UTF_8))
.build();
PutRecordResponse putRecordResponse = kinesisClient.putRecord(putRecordRequest);
String sequenceNumber = putRecordResponse.sequenceNumber();
System.out.println(sequenceNumber);
}
}
測試
執行Java應用程式印出發送資料後返回的序列號(sequence number)。
49625171637511262628927173699067760533852647995979333634
在AWS console Kinesis Data Streams檢視KinesisDataStreamDemo
stream的[Monitoring],在[Incoming Data]及[Put record]面板可看到送來的資料。
讀取資料
參考「Java 本機讀取Kinesis Data Streams的資料」。
沒有留言:
張貼留言