網頁

2021/12/24

Golang 本機發送資料到Kinesis Data Streams

在locahost的Go應用程式以AWS的SDK aws-sdk-go-v2發送資料到Kinesis Data Streams。


範例環境:

  • Go 1.17


事前要求

參考「AWS 建立IAM管理使用者及credentials」設定存取AWS需要的IAM管理員credentials。

參考「AWS console 建立Kinesis Data Stream範例」建立一個名稱為KinesisDataStreamDemo的Kinesis Data Stream。


下載AWS SDK Go V2 modules

在專案根目錄執行以下命令下載需要的aws-sdk-go-v2 modules。。



發送資料

在Go程式中呼叫config.LoadDefaultConfig()傳入region參數建立aws.Conifg物件,AWS SDK預設會讀取$HOME/.aws/credentials的access keys來通過權限驗證,然後依此參數建立kinesis.Client,然後調用kinesis.Client.PutRecord()把要發送資料(payload)放入kinesis.PutRecordInput發送到Kinesis Data Streams。

KinesisPutRecordAPI介面是用來在單元測試時可以mock kinesis.Client.PutRecord()的結果。

main.go

package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/kinesis"
)

type KinesisPutRecordAPI interface {
    PutRecord(ctx context.Context,
        params *kinesis.PutRecordInput,
        optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error)
}

func MakePutRecord(
    ctx context.Context,
    api KinesisPutRecordAPI,
    input *kinesis.PutRecordInput) (*kinesis.PutRecordOutput, error) {

    return api.PutRecord(ctx, input)
}

func CreateInput(payload string) *kinesis.PutRecordInput {
    stream := "KinesisDataStreamDemo" // stream name
    partition := "demo-001"

    return &kinesis.PutRecordInput{
        Data:         []byte(payload),
        PartitionKey: &partition,
        StreamName:   &stream,
    }
}

func main() {
    ctx := context.TODO()
    cfg, err := config.LoadDefaultConfig(
        ctx,
        config.WithRegion("ap-northeast-1"),
    )
    if err != nil {
        panic(err)
    }

    client := kinesis.NewFromConfig(cfg)

    payload := "Hello world"
    input := CreateInput(payload)

    results, err := MakePutRecord(ctx, client, input)
    if err != nil {
        fmt.Println(err.Error())
    }
    fmt.Println(*results.SequenceNumber)
}

github


測試

執行Go應用程式印出發送資料的序列號(sequence number)。

49629408056911650938108365663703629406391528159210110978

在AWS console Kinesis Data Streams檢視KinesisDataStreamDemo stream的[Monitoring],在[Incoming Data]及[Put record]的面板可看到送來的資料。




沒有留言:

張貼留言