mirror of
https://github.com/hibiken/asynq.git
synced 2025-08-19 15:08:55 +08:00
Update redis/go-redis to v9
Version v9 implements the support for Redis v7 and has some other improvements.
This commit is contained in:
committed by
Ken Hibino
parent
cc777ebdaa
commit
0275df8df4
@@ -10,7 +10,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
"github.com/spf13/cast"
|
||||
|
@@ -12,7 +12,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/google/uuid"
|
||||
@@ -20,6 +19,7 @@ import (
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
h "github.com/hibiken/asynq/internal/testutil"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func TestAllQueues(t *testing.T) {
|
||||
@@ -250,7 +250,7 @@ func TestCurrentStats(t *testing.T) {
|
||||
},
|
||||
oldestPendingMessageEnqueueTime: map[string]time.Time{
|
||||
"default": now.Add(-15 * time.Second),
|
||||
"critical": time.Time{}, // zero value since there's no pending task in this queue
|
||||
"critical": {}, // zero value since there's no pending task in this queue
|
||||
"low": now.Add(-30 * time.Second),
|
||||
},
|
||||
paused: []string{"critical", "low"},
|
||||
@@ -392,7 +392,6 @@ func TestHistoricalStats(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRedisInfo(t *testing.T) {
|
||||
@@ -487,7 +486,7 @@ func TestGroupStats(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
var sortGroupStatsOpt = cmp.Transformer(
|
||||
sortGroupStatsOpt := cmp.Transformer(
|
||||
"SortGroupStats",
|
||||
func(in []*GroupStat) []*GroupStat {
|
||||
out := append([]*GroupStat(nil), in...)
|
||||
@@ -1509,7 +1508,6 @@ func TestListCompleted(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestListCompletedPagination(t *testing.T) {
|
||||
@@ -2324,7 +2322,6 @@ func TestRunTaskError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRunAllScheduledTasks(t *testing.T) {
|
||||
@@ -3335,6 +3332,7 @@ func TestArchiveTaskError(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestArchiveAllPendingTasks(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
@@ -11,11 +11,11 @@ import (
|
||||
"math"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/uuid"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"github.com/spf13/cast"
|
||||
)
|
||||
|
||||
@@ -152,7 +152,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
|
||||
var enqueueUniqueCmd = redis.NewScript(`
|
||||
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
||||
if not ok then
|
||||
return -1
|
||||
return -1
|
||||
end
|
||||
if redis.call("EXISTS", KEYS[2]) == 1 then
|
||||
return 0
|
||||
@@ -1319,9 +1319,9 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task
|
||||
// It returns a new expiration time if the operation was successful.
|
||||
func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) {
|
||||
expireAt := r.clock.Now().Add(LeaseDuration)
|
||||
var zs []*redis.Z
|
||||
var zs []redis.Z
|
||||
for _, id := range ids {
|
||||
zs = append(zs, &redis.Z{Member: id, Score: float64(expireAt.Unix())})
|
||||
zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
|
||||
}
|
||||
// Use XX option to only update elements that already exist; Don't add new elements
|
||||
// TODO: Consider adding GT option to ensure we only "extend" the lease. Ceveat is that GT is supported from redis v6.2.0 or above.
|
||||
@@ -1367,10 +1367,10 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
|
||||
}
|
||||
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||
if err := r.client.ZAdd(ctx, base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
|
||||
if err := r.client.ZAdd(ctx, base.AllServers, redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||
}
|
||||
if err := r.client.ZAdd(ctx, base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
|
||||
if err := r.client.ZAdd(ctx, base.AllWorkers, redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
|
||||
}
|
||||
return r.runScript(ctx, op, writeServerStateCmd, []string{skey, wkey}, args...)
|
||||
@@ -1423,7 +1423,7 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
|
||||
}
|
||||
exp := r.clock.Now().Add(ttl).UTC()
|
||||
key := base.SchedulerEntriesKey(schedulerID)
|
||||
err := r.client.ZAdd(ctx, base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
||||
err := r.client.ZAdd(ctx, base.AllSchedulers, redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
||||
if err != nil {
|
||||
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
|
||||
}
|
||||
|
@@ -15,7 +15,6 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/google/uuid"
|
||||
@@ -23,6 +22,7 @@ import (
|
||||
"github.com/hibiken/asynq/internal/errors"
|
||||
h "github.com/hibiken/asynq/internal/testutil"
|
||||
"github.com/hibiken/asynq/internal/timeutil"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// variables used for package testing.
|
||||
@@ -1272,7 +1272,6 @@ func TestAddToGroupeTaskIdConflictError(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAddToGroupUnique(t *testing.T) {
|
||||
@@ -1356,7 +1355,6 @@ func TestAddToGroupUnique(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestAddToGroupUniqueTaskIdConflictError(t *testing.T) {
|
||||
@@ -1398,7 +1396,6 @@ func TestAddToGroupUniqueTaskIdConflictError(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSchedule(t *testing.T) {
|
||||
|
Reference in New Issue
Block a user