網頁

2024/5/3

Golang Redis 令牌桶限流器 token bucket rate limiter

Go利用Redis實作簡單的「令牌桶限流器(Token bucket rate limiter)」。


注意,沒有詳細測試且未考慮周全。

main.go

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/go-redis/redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := NewRedisClient()

    userId := "user123" // request's user id
    reqCount := 10      // intense request count of the user

    for i := 0; i < reqCount; i++ { // send intense requests
        allowed, err := isAllowed(ctx, rdb, userId, time.Now())
        if err != nil {
            panic(err)
        }
        if allowed {
            fmt.Println("pass")
        } else {
            fmt.Println("reject")
        }
    }

}

const MAX_TOKEN_COUNT int64 = 10
const MAX_RETRIES = 100

func isAllowed(ctx context.Context, rdb *redis.Client, userId string, currentTime time.Time) (bool, error) {
    key := userId // e.g. key=user123
    currUnixTime := currentTime.Unix()

    currTokenCount, _ := rdb.HGet(ctx, key, "count").Int64()
    lastUnixTime, _ := rdb.HGet(ctx, key, "time").Int64()

    // initual bucket tokens
    if currTokenCount == 0 && lastUnixTime == 0 {
        rdb.HSet(ctx, key, "count", MAX_TOKEN_COUNT, "time", currUnixTime)
    }
    var pass bool
    
    // go-redis transaction
    txf := func(tx *redis.Tx) error {
        lastTokenCount, _ := tx.HGet(ctx, key, "count").Int64()
        lastUnixTime, _ := tx.HGet(ctx, key, "time").Int64()

        currTokenCount = (currUnixTime-lastUnixTime)/2 + lastTokenCount // increment token per 2 second

        // maximum bucket token count
        if currTokenCount > MAX_TOKEN_COUNT {
            currTokenCount = MAX_TOKEN_COUNT
        }

        currTokenCount-- // consume one token per request
        pass = currTokenCount > -1

        // minimum bucket token count
        if currTokenCount < 0 {
            currTokenCount = 0
        }

        tx.HSet(ctx, key, "count", currTokenCount, "time", currUnixTime)
        return nil
    }

    // retry if the key has been changed
    for i := 0; i < MAX_RETRIES; i++ {
        err := rdb.Watch(ctx, txf, key)
        if err == nil {
            break // transaction success
        }
        if err == redis.TxFailedErr {
            continue // optimistic lock lost, retry
        }

        return false, err
    }

    return pass, nil
}

func NewRedisClient() *redis.Client {
    return redis.NewClient(&redis.Options{
        Addr:     "localhost:6379",
        Password: "12345",
        DB:       0, // use default DB
    })
}

github



說明

main函式中的for迴圈代表連續發送的請求,reqCount控制發送次數。

isAllow函式內即為令牌桶限流器(token bucket rate limiter),請求通過回傳true,反之回傳false。

常數MAX_TOKEN_COUNT代表桶內的令牌(token)數上限。

常數MAX_RETRIES代表redis交易樂觀索丟失重試次數。

isAllow函式中,使用發送請求的客戶端的userId作為redis的hash資料型態的key,而hash即代表令牌桶,也就是說每個使用者的令牌桶都是各自獨立的。

令牌桶內會存有兩個資訊,即hash的兩個欄位值,一個是key為"count"的令牌數(token count),一個是key為"time"的上次更新的Unix時間(Unix time)。

若令牌桶取出的token數為0,且上一次更新時間為0,代表令牌桶第一次被使用,所以用HSET將桶子token填滿及更新時間。

func(tx *redis.Tx) error函式是go-redis的一次交易的範圍。一開始先取得上一次請求的token數和更新時間。

設定token自動增長速率為每2秒桶內增加1個令牌。所以(currUnixTime-lastUnixTime)/2代表目前請求時間和上次請求時間的差異秒數除以2,即為時間內的令牌增長數量。增長的令牌數加上桶內原有的令牌數代表目前的令牌數,但令牌數不可超過設定的令牌數上限MAX_TOKEN_COUNT

currTokenCount--代表每次請求扣除一個令牌。扣除令牌後若數量大於-1,代表此次請求有令牌可以通過,反之拒絕。

桶內令牌數不可為負,所以扣除令牌數後若少於0,必須重設為0。

將更新後的令牌數和時間設定回hash。


沒有留言:

張貼留言