2022/3/14

Golang goroutine send record to Kinesis Data Firehose

This example shows a Go web application that, when a record is created through HTTP POST, uses a goroutine to send the newly created data to AWS Kinesis Data Firehose.


Environment:

  • Go 1.17
  • github.com/aws/aws-sdk-go-v2 v1.15.0
  • github.com/aws/aws-sdk-go-v2/service/firehose v1.14.0


Prerequisites

Following the post "Creating a Kinesis Data Firehose delivery stream in the AWS console", create a Kinesis Data Firehose delivery stream named PUT-s3-demo-bucket-202112151320.


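Example

In the code below, Save() uses a counter to simulate the ID generated when a record is inserted. The handler then sends a message containing that ID to Firehose.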

main.go

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "net/http"
    "sync/atomic"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/firehose"
    "github.com/aws/aws-sdk-go-v2/service/firehose/types"
)

type Employee struct {
    ID   int64
    Name string
    Age  int
}

func main() {
    http.HandleFunc("/employee", func(rw http.ResponseWriter, r *http.Request) {
        if r.Method == http.MethodPost {
            var emp Employee
            decoder := json.NewDecoder(r.Body)
            err := decoder.Decode(&emp)
            if err != nil {
                http.Error(rw, "error", http.StatusBadRequest)
                return // stop here; otherwise the invalid request would still be saved
            }

            id := Save(&emp)

            data := fmt.Sprintf("employee.id=%d created\n", id)
            go PutRecord(context.TODO(), data)

            fmt.Fprint(rw, id)
        } else {
            http.Error(rw, "not allowed", http.StatusMethodNotAllowed)
        }
    })

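    // ListenAndServe only returns on failure (for example, port already in use).
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }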
}

var count int64

func Save(emp *Employee) int64 {
    fmt.Printf("save employee=%v\n", emp)
    return atomic.AddInt64(&count, 1)
}

func PutRecord(ctx context.Context, data string) {
    fmt.Printf("put data=%s\n", data)
    cfg, err := config.LoadDefaultConfig(
        ctx,
        config.WithRegion("ap-northeast-1"),
    )
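    if err != nil {
        // Report and return: a panic inside this goroutine would crash the whole server.
        fmt.Printf("load config error: %v\n", err)
        return
    }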

    client := firehose.NewFromConfig(cfg)

    input := CreateInput(data)
    out, err := client.PutRecord(ctx, input)
    if err != nil {
        // out is nil on failure, so return before dereferencing it.
        fmt.Printf("put record error: %v\n", err)
        return
    }
    fmt.Printf("recordId=%s\n", *out.RecordId)
}

func CreateInput(data string) *firehose.PutRecordInput {
    deliveryStream := "PUT-s3-demo-bucket-202112151320"
    return &firehose.PutRecordInput{
        DeliveryStreamName: &deliveryStream,
        Record: &types.Record{
            Data: []byte(data),
        },
    }
}
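
The goroutine in the listing above is started with a context that is never cancelled, so a hung Firehose call would keep it alive indefinitely. The following is a minimal sketch of one way to bound it, assuming a helper named sendWithTimeout (hypothetical, not part of the original code); slowSend and fastSend only simulate a remote call so the sketch runs without AWS.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithTimeout (hypothetical helper) runs send with a context that is
// detached from any HTTP request but expires after timeout.
func sendWithTimeout(timeout time.Duration, send func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel() // release the timer as soon as send returns
	return send(ctx)
}

// slowSend simulates a call that hangs until its context is cancelled.
func slowSend(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// fastSend simulates a call that succeeds immediately.
func fastSend(ctx context.Context) error {
	return nil
}

func main() {
	err := sendWithTimeout(50*time.Millisecond, slowSend)
	fmt.Println(errors.Is(err, context.DeadlineExceeded))
	fmt.Println(sendWithTimeout(50*time.Millisecond, fastSend))
}
```

The request's own context (r.Context()) is not a good fit here, because it is cancelled once the handler returns, which is usually before the background send finishes.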


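The code above starts a new goroutine for every request, and each goroutine loads the AWS configuration and builds its own Firehose client. A burst of requests in a short time therefore creates an unbounded number of goroutines and clients, which may cause problems such as high memory use.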
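
One common way to bound the number of goroutines is a fixed pool of workers reading from a buffered channel. The sketch below is an illustration under assumptions, not the original author's code: Dispatcher, NewDispatcher, Enqueue, and Close are hypothetical names, and the send function stands in for PutRecord so the example runs without AWS.

```go
package main

import (
	"fmt"
	"sync"
)

// Dispatcher (hypothetical) fans messages out to a fixed number of workers.
type Dispatcher struct {
	queue chan string
	wg    sync.WaitGroup
}

// NewDispatcher starts workers goroutines that call send for each queued
// message. queueSize bounds how many messages may wait at once.
func NewDispatcher(workers, queueSize int, send func(string)) *Dispatcher {
	d := &Dispatcher{queue: make(chan string, queueSize)}
	for i := 0; i < workers; i++ {
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for msg := range d.queue { // exits when the queue is closed and drained
				send(msg)
			}
		}()
	}
	return d
}

// Enqueue queues msg without blocking and reports false if the queue is full.
func (d *Dispatcher) Enqueue(msg string) bool {
	select {
	case d.queue <- msg:
		return true
	default:
		return false
	}
}

// Close stops accepting messages and waits for the workers to drain the queue.
func (d *Dispatcher) Close() {
	close(d.queue)
	d.wg.Wait()
}

func main() {
	var mu sync.Mutex
	sent := 0
	d := NewDispatcher(4, 100, func(msg string) {
		mu.Lock()
		sent++
		mu.Unlock()
	})
	for i := 1; i <= 10; i++ {
		d.Enqueue(fmt.Sprintf("employee.id=%d created\n", i))
	}
	d.Close()
	fmt.Println("sent:", sent)
}
```

In the handler, `go PutRecord(...)` would become a call to Enqueue, and a false return value lets the application decide whether to drop, log, or retry the message. A single Firehose client could also be created once and shared by the workers instead of being rebuilt per message.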


Testing

After starting the application, send the following two requests from the command line with cURL.

$ curl -X POST "http://localhost:8080/employee" -H 'content-type: application/json' -d '{"name": "john", "age": 33}'
$ curl -X POST "http://localhost:8080/employee" -H 'content-type: application/json' -d '{"name": "mary", "age": 28}'

The console prints the following.

save employee=&{0 john 33}
put data=employee.id=1 created
recordId=4oQN2jYtIiti4ITW8lH4MiRBAmyDv7OVkwkfxRW16RT0H5tAMPoXwJjiXP1tiW9VVHrfh9UVA5oNOhoqezv0RNce7OjlbMY8sBr7lOXZXmZ9NYVcLhJL0FmjshHYbhpr/Wb9b7nOP/Vad0hU18xg+caZtzysAED30zMKPmi6xQ75CdLFQLJrZBasBKg7uxtLzeDUhFC52lAFuQpmYu1DMAR+d+HOqzVj
save employee=&{0 mary 28}
put data=employee.id=2 created
recordId=rOIPvA2gjfDTTeO+G7XorT3TIC2uMHs9SN+DDLcWzAyCqgXf+uNCdZd4vJ4xkiHd+Hu5FRL49W0EmmXq7ltcZ4aaa4GSa7kdLvW/+QeZ/G7TXjSrbGkXh08U8o9K6jB7oEEzRfhi/6qRN4tWes0bI0SHtrR6kF1WWu44I1EJsSLDdjX864Fn8NPw1kOBgVOfapLjT1zyLSsfhXEZ5TS/MOZhOB6OnESN
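
The two requests can also be replayed in-process with net/http/httptest, which exercises the handler logic without a running server or AWS credentials. This is a sketch under assumptions: newHandler and post are hypothetical helpers, the Firehose call is replaced by an injected notify function, and notify is called synchronously (the original uses `go`) to keep the result deterministic.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync/atomic"
)

type Employee struct {
	ID   int64
	Name string
	Age  int
}

// newHandler (hypothetical) mirrors the article's handler, with the Firehose
// call replaced by notify so it can run without AWS.
func newHandler(notify func(string)) http.HandlerFunc {
	var count int64
	return func(rw http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(rw, "not allowed", http.StatusMethodNotAllowed)
			return
		}
		var emp Employee
		if err := json.NewDecoder(r.Body).Decode(&emp); err != nil {
			http.Error(rw, "error", http.StatusBadRequest)
			return
		}
		id := atomic.AddInt64(&count, 1) // simulated ID, as in Save()
		notify(fmt.Sprintf("employee.id=%d created\n", id))
		fmt.Fprint(rw, id)
	}
}

// post sends body to h in-process and returns the status code and response body.
func post(h http.Handler, body string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, "/employee", strings.NewReader(body))
	req.Header.Set("Content-Type", "application/json")
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	var sent []string
	h := newHandler(func(msg string) { sent = append(sent, msg) })
	fmt.Println(post(h, `{"name": "john", "age": 33}`))
	fmt.Println(post(h, `{"name": "mary", "age": 28}`))
	fmt.Printf("%q\n", sent)
}
```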

After about 5 minutes, the delivered data appears in s3-demo-bucket-202112151320, the destination S3 bucket configured for the Kinesis Data Firehose delivery stream PUT-s3-demo-bucket-202112151320 in the AWS console.



Download the object and open it in a text editor; its contents are as follows.



