本範例在go web以HTTP POST新增資料時用goroutine發送已新增的資料到AWS Kinesis Data Firehose。
範例環境:
- Go 1.17
- github.com/aws/aws-sdk-go-v2 v1.15.0
- github.com/aws/aws-sdk-go-v2/service/firehose v1.14.0
事前要求
參考「AWS console建立Kinesis Data Firehose範例」建立一個名稱為PUT-s3-demo-bucket-202112151320
的Kinesis Data Firehose delivery stream。
範例
下面Save()
用計數器模擬新增資料後產生的ID,然後發送訊息到Firehose。
main.go
package main
import (
"context"
"encoding/json"
"fmt"
"net/http"
"sync/atomic"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/firehose"
"github.com/aws/aws-sdk-go-v2/service/firehose/types"
)
type Employee struct {
ID int64
Name string
Age int
}
func main() {
http.HandleFunc("/employee", func(rw http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodPost {
var emp Employee
decoder := json.NewDecoder(r.Body)
err := decoder.Decode(&emp)
if err != nil {
http.Error(rw, "error", http.StatusBadRequest)
}
id := Save(&emp)
data := fmt.Sprintf("employee.id=%d created\n", id)
go PutRecord(context.TODO(), data)
fmt.Fprint(rw, id)
} else {
http.Error(rw, "not allowed", http.StatusMethodNotAllowed)
}
})
http.ListenAndServe(":8080", nil)
}
var count int64
func Save(emp *Employee) int64 {
fmt.Printf("save employee=%v\n", emp)
return atomic.AddInt64(&count, 1)
}
func PutRecord(ctx context.Context, data string) {
fmt.Printf("put data=%s\n", data)
cfg, err := config.LoadDefaultConfig(
ctx,
config.WithRegion("ap-northeast-1"),
)
if err != nil {
panic(err)
}
client := firehose.NewFromConfig(cfg)
input := CreateInput(data)
out, err := client.PutRecord(ctx, input)
if err != nil {
fmt.Println("put record error")
}
fmt.Printf("recordId=%s\n", *out.RecordId)
}
func CreateInput(data string) *firehose.PutRecordInput {
deliveryStream := "PUT-s3-demo-bucket-202112151320"
return &firehose.PutRecordInput{
DeliveryStreamName: &deliveryStream,
Record: &types.Record{
Data: []byte(data),
},
}
}
以上每次有個請求就會開一個goroutine,若短時間內有大量請求可能會造成問題。
測試
啟動應用程式後在命令列以cURL發出以下兩個請求。
$ curl -X POST "http://localhost:8080/employee" -H 'content-type: application/json' d '{"name": "john", "age": 33}'
$ curl -X POST "http://localhost:8080/employee" -H 'content-type: application/json' d '{"name": "mary", "age": 28}'
在console印出以下。
save employee=&{0 john 33}
put data=employee.id=1 created
recordId=4oQN2jYtIiti4ITW8lH4MiRBAmyDv7OVkwkfxRW16RT0H5tAMPoXwJjiXP1tiW9VVHrfh9UVA5oNOhoqezv0RNce7OjlbMY8sBr7lOXZXmZ9NYVcLhJL0FmjshHYbhpr/Wb9b7nOP/Vad0hU18xg+caZtzysAED30zMKPmi6xQ75CdLFQLJrZBasBKg7uxtLzeDUhFC52lAFuQpmYu1DMAR+d+HOqzVj
save employee=&{0 mary 28}
put data=employee.id=2 created
recordId=rOIPvA2gjfDTTeO+G7XorT3TIC2uMHs9SN+DDLcWzAyCqgXf+uNCdZd4vJ4xkiHd+Hu5FRL49W0EmmXq7ltcZ4aaa4GSa7kdLvW/+QeZ/G7TXjSrbGkXh08U8o9K6jB7oEEzRfhi/6qRN4tWes0bI0SHtrR6kF1WWu44I1EJsSLDdjX864Fn8NPw1kOBgVOfapLjT1zyLSsfhXEZ5TS/MOZhOB6OnESN
在AWS console Kinesis Data Firhose delivery stream PUT-s3-demo-bucket-202112151320Info
設定的目的地S3 bucket s3-demo-bucket-202112151320
5分鐘後可看到送來的資料。
下載並以文字編輯器開啟內容如下。
沒有留言:
張貼留言