網頁

2022/3/12

Golang 本機發送資料到Kinesis Data Firehose

本範例在locahost的Go應用程式以AWS的SDK aws-sdk-go-v2發送資料到Kinesis Data Firehose。


範例環境:

  • Go 1.17
  • github.com/aws/aws-sdk-go-v2/service/firehose v1.14.0


Prerequisites

參考「AWS 建立IAM管理使用者及credentials」設定存取AWS需要的IAM管理員credentials。

參考「AWS console建立Kinesis Data Firehose範例」建立一個名稱為PUT-s3-demo-bucket-202112151320 的Kinesis Data Firehose delivery stream。


下載AWS SDK Go V2 modules

在專案根目錄執行以下命令下載需要的aws-sdk-go-v2 modules。



發送資料到Kinesis Data Firehose

在Go程式中呼叫config.LoadDefaultConfig()傳入region參數建立aws.Conifg物件,AWS SDK預設會讀取$HOME/.aws/credentials的access keys來通過權限驗證。

Config參數傳入firehose.NewFromConfig()建立firehose.Client,然後建立firehose.PutRecordInput把要發送資料(data)放入Record,調用firehose.Client.PutRecord()發送到Kinesis Data Firehose的delivery stream。

main.go

package main

import (
    "context"
    "fmt"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/firehose"
    "github.com/aws/aws-sdk-go-v2/service/firehose/types"
)

func CreateInput(data string) *firehose.PutRecordInput {
    deliveryStream := "PUT-s3-demo-bucket-202112151320" // delivery stream name
    return &firehose.PutRecordInput{
        DeliveryStreamName: &deliveryStream,
        Record: &types.Record{
            Data: []byte(data),
        },
    }
}

func main() {
    ctx := context.TODO()
    cfg, err := config.LoadDefaultConfig(
        ctx,
        config.WithRegion("ap-northeast-1"),
    )
    if err != nil {
        panic(err)
    }

    client := firehose.NewFromConfig(cfg)

    data := "Hello world"
    input := CreateInput(data)
    out, err := client.PutRecord(ctx, input)
    if err != nil {
        fmt.Println(err.Error())
    }
    fmt.Println(out.RecordId) // 0xc000021ae0

}

github


測試

執行Go應用程式印出發送資料回傳的firehose.PutRecordOutput.RecordId(pointer)。

0xc000021ae0

在AWS console Kinesis Data Firehose delivery stream設定的目的地S3 bucket s3-demo-bucket-202112151320可看到送來的資料。



下載並以文字編輯器開啟內容如下。




沒有留言:

張貼留言