Go利用Redis實作簡單的「令牌桶限流器(Token bucket rate limiter)」。
注意,沒有詳細測試且未考慮周全。
main.go
package main
import (
"context"
"fmt"
"time"
"github.com/go-redis/redis/v9"
)
func main() {
ctx := context.Background()
rdb := NewRedisClient()
userId := "user123" // request's user id
reqCount := 10 // intense request count of the user
for i := 0; i < reqCount; i++ { // send intense requests
allowed, err := isAllowed(ctx, rdb, userId, time.Now())
if err != nil {
panic(err)
}
if allowed {
fmt.Println("pass")
} else {
fmt.Println("reject")
}
}
}
const MAX_TOKEN_COUNT int64 = 10
const MAX_RETRIES = 100
func isAllowed(ctx context.Context, rdb *redis.Client, userId string, currentTime time.Time) (bool, error) {
key := userId // e.g. key=user123
currUnixTime := currentTime.Unix()
currTokenCount, _ := rdb.HGet(ctx, key, "count").Int64()
lastUnixTime, _ := rdb.HGet(ctx, key, "time").Int64()
// initual bucket tokens
if currTokenCount == 0 && lastUnixTime == 0 {
rdb.HSet(ctx, key, "count", MAX_TOKEN_COUNT, "time", currUnixTime)
}
var pass bool
// go-redis transaction
txf := func(tx *redis.Tx) error {
lastTokenCount, _ := tx.HGet(ctx, key, "count").Int64()
lastUnixTime, _ := tx.HGet(ctx, key, "time").Int64()
currTokenCount = (currUnixTime-lastUnixTime)/2 + lastTokenCount // increment token per 2 second
// maximum bucket token count
if currTokenCount > MAX_TOKEN_COUNT {
currTokenCount = MAX_TOKEN_COUNT
}
currTokenCount-- // consume one token per request
pass = currTokenCount > -1
// minimum bucket token count
if currTokenCount < 0 {
currTokenCount = 0
}
tx.HSet(ctx, key, "count", currTokenCount, "time", currUnixTime)
return nil
}
// retry if the key has been changed
for i := 0; i < MAX_RETRIES; i++ {
err := rdb.Watch(ctx, txf, key)
if err == nil {
break // transaction success
}
if err == redis.TxFailedErr {
continue // optimistic lock lost, retry
}
return false, err
}
return pass, nil
}
func NewRedisClient() *redis.Client {
return redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "12345",
DB: 0, // use default DB
})
}
說明
main
函式中的for迴圈代表連續發送的請求,reqCount
控制發送次數。
isAllow
函式內即為令牌桶限流器(token bucket rate limiter),請求通過回傳true,反之回傳false。
常數MAX_TOKEN_COUNT
代表桶內的令牌(token)數上限。
常數MAX_RETRIES
代表redis交易樂觀索丟失重試次數。
isAllow
函式中,使用發送請求的客戶端的userId
作為redis的hash資料型態的key,而hash即代表令牌桶,也就是說每個使用者的令牌桶都是各自獨立的。
令牌桶內會存有兩個資訊,即hash的兩個欄位值,一個是key為"count"的令牌數(token count),一個是key為"time"的上次更新的Unix時間(Unix time)。
若令牌桶取出的token數為0,且上一次更新時間為0,代表令牌桶第一次被使用,所以用HSET
將桶子token填滿及更新時間。
func(tx *redis.Tx) error
函式是go-redis的一次交易的範圍。一開始先取得上一次請求的token數和更新時間。
設定token自動增長速率為每2秒桶內增加1個令牌。所以(currUnixTime-lastUnixTime)/2
代表目前請求時間和上次請求時間的差異秒數除以2,即為時間內的令牌增長數量。增長的令牌數加上桶內原有的令牌數代表目前的令牌數,但令牌數不可超過設定的令牌數上限MAX_TOKEN_COUNT
。
currTokenCount--
代表每次請求扣除一個令牌。扣除令牌後若數量大於-1,代表此次請求有令牌可以通過,反之拒絕。
桶內令牌數不可為負,所以扣除令牌數後若少於0,必須重設為0。
將更新後的令牌數和時間設定回hash。
沒有留言:
張貼留言