網頁

2021/12/25

Java 本機發送資料到Kinesis Data Streams

本範例在locahost的Java應用程式以AWS的SDK aws-sdk-java-v2發送資料到Kinesis Data Streams。


範例環境:

  • Java 11
  • Maven


事前要求

參考「AWS 建立IAM管理使用者及credentials」設定存取AWS需要的IAM管理員credentials。

參考「AWS console 建立Kinesis Data Stream範例」建立名稱為KinesisDataStreamDemo的Kinesis Data Stream。


下載AWS SDK for Java 2.x

在專案的pom.xml加入以下需要的AWS SDK for Java的Kinesis依賴。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>bom</artifactId>
            <version>2.15.0</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependency>
    <groupId>software.amazon.awssdk</groupId>
    <artifactId>kinesis</artifactId>
</dependency>

設定好pom.xml在專案執行mvn install下載依賴。


發送資料到Kinesis Data Stream

在Java程式呼叫KinesisClient.builder()輸入region參數建立KinesisClient來存取Kinesis Data Stream。AWS SDK預設會讀取$HOME/.aws/credentials的access keys來通過權限驗證。然後調用KinesisClient.putRecord()把要發送的資料放入PutRecordRequest物件發送到Kinesis Data Streams。

Main.java

package com.abc.demo;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;

import java.nio.charset.StandardCharsets;

public class Main {

    public static void main(String[] arges) {
        KinesisClient kinesisClient = KinesisClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();

        String data = "Hello world";

        PutRecordRequest putRecordRequest = PutRecordRequest.builder()
                .streamName("KinesisDataStreamDemo")
                .partitionKey("demo-001")
                .data(SdkBytes.fromString(data, StandardCharsets.UTF_8))
                .build();

        PutRecordResponse putRecordResponse = kinesisClient.putRecord(putRecordRequest);
        String sequenceNumber = putRecordResponse.sequenceNumber();
        System.out.println(sequenceNumber);
    }
}

github


測試

執行Java應用程式印出發送資料後返回的序列號(sequence number)。

49625171637511262628927173699067760533852647995979333634

在AWS console Kinesis Data Streams檢視KinesisDataStreamDemo stream的[Monitoring],在[Incoming Data]及[Put record]面板可看到送來的資料。




讀取資料

參考「Java 本機讀取Kinesis Data Streams的資料」。


沒有留言:

張貼留言