Mirror of https://github.com/hibiken/asynq.git

Merge branch 'master' into mongo
@@ -14,16 +14,16 @@ import (
"sync"
"time"

"github.com/go-redis/redis/v8"
"github.com/golang/protobuf/ptypes"
"github.com/hibiken/asynq/internal/errors"
pb "github.com/hibiken/asynq/internal/proto"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"google.golang.org/protobuf/proto"
)

// Version of asynq library and CLI.
const Version = "0.23.0"
const Version = "0.24.1"

// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
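Reviewer note: the bulk of this change swaps the client library from github.com/go-redis/redis/v8 to github.com/redis/go-redis/v9, whose sorted-set commands take redis.Z values instead of pointers; that is why the hunks below replace []*redis.Z with []redis.Z and drop the & at call sites. A minimal sketch of the call-site difference (assumes a locally reachable Redis; the key and member names are made up for illustration):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9" // previously github.com/go-redis/redis/v8
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// In v8, ZAdd took members as ...*redis.Z; in v9 it takes ...redis.Z by value,
	// which is why this diff removes the & and changes []*redis.Z fields to []redis.Z.
	err := rdb.ZAdd(ctx, "example:zset", redis.Z{Score: 1, Member: "task-id"}).Err()
	fmt.Println(err)
}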
@@ -10,7 +10,7 @@ import (
"strings"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/spf13/cast"
@@ -12,7 +12,6 @@ import (
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
@@ -20,6 +19,7 @@ import (
"github.com/hibiken/asynq/internal/errors"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
)

func TestAllQueues(t *testing.T) {
@@ -73,11 +73,11 @@ func TestCurrentStats(t *testing.T) {
allGroups map[string][]string
pending map[string][]string
active map[string][]string
scheduled map[string][]*redis.Z
retry map[string][]*redis.Z
archived map[string][]*redis.Z
completed map[string][]*redis.Z
groups map[string][]*redis.Z
scheduled map[string][]redis.Z
retry map[string][]redis.Z
archived map[string][]redis.Z
completed map[string][]redis.Z
groups map[string][]redis.Z
processed map[string]int
failed map[string]int
processedTotal map[string]int
@@ -111,7 +111,7 @@ func TestCurrentStats(t *testing.T) {
base.ActiveKey("critical"): {},
base.ActiveKey("low"): {},
},
scheduled: map[string][]*redis.Z{
scheduled: map[string][]redis.Z{
base.ScheduledKey("default"): {
{Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())},
{Member: m4.ID, Score: float64(now.Unix())},
@@ -119,22 +119,22 @@ func TestCurrentStats(t *testing.T) {
base.ScheduledKey("critical"): {},
base.ScheduledKey("low"): {},
},
retry: map[string][]*redis.Z{
retry: map[string][]redis.Z{
base.RetryKey("default"): {},
base.RetryKey("critical"): {},
base.RetryKey("low"): {},
},
archived: map[string][]*redis.Z{
archived: map[string][]redis.Z{
base.ArchivedKey("default"): {},
base.ArchivedKey("critical"): {},
base.ArchivedKey("low"): {},
},
completed: map[string][]*redis.Z{
completed: map[string][]redis.Z{
base.CompletedKey("default"): {},
base.CompletedKey("critical"): {},
base.CompletedKey("low"): {},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "sms:user1"): {
{Member: m7.ID, Score: float64(now.Add(-3 * time.Second).Unix())},
},
@@ -205,7 +205,7 @@ func TestCurrentStats(t *testing.T) {
base.ActiveKey("critical"): {},
base.ActiveKey("low"): {},
},
scheduled: map[string][]*redis.Z{
scheduled: map[string][]redis.Z{
base.ScheduledKey("default"): {
{Member: m3.ID, Score: float64(now.Add(time.Hour).Unix())},
{Member: m4.ID, Score: float64(now.Unix())},
@@ -213,17 +213,17 @@ func TestCurrentStats(t *testing.T) {
base.ScheduledKey("critical"): {},
base.ScheduledKey("low"): {},
},
retry: map[string][]*redis.Z{
retry: map[string][]redis.Z{
base.RetryKey("default"): {},
base.RetryKey("critical"): {},
base.RetryKey("low"): {},
},
archived: map[string][]*redis.Z{
archived: map[string][]redis.Z{
base.ArchivedKey("default"): {},
base.ArchivedKey("critical"): {},
base.ArchivedKey("low"): {},
},
completed: map[string][]*redis.Z{
completed: map[string][]redis.Z{
base.CompletedKey("default"): {},
base.CompletedKey("critical"): {},
base.CompletedKey("low"): {},
@@ -250,7 +250,7 @@ func TestCurrentStats(t *testing.T) {
},
oldestPendingMessageEnqueueTime: map[string]time.Time{
"default": now.Add(-15 * time.Second),
"critical": time.Time{}, // zero value since there's no pending task in this queue
"critical": {}, // zero value since there's no pending task in this queue
"low": now.Add(-30 * time.Second),
},
paused: []string{"critical", "low"},
@@ -392,7 +392,6 @@ func TestHistoricalStats(t *testing.T) {
}
}
}

}

func TestRedisInfo(t *testing.T) {
@@ -436,7 +435,7 @@ func TestGroupStats(t *testing.T) {
fixtures := struct {
tasks []*h.TaskSeedData
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -449,7 +448,7 @@ func TestGroupStats(t *testing.T) {
base.AllGroups("default"): {"group1", "group2"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
@@ -487,7 +486,7 @@ func TestGroupStats(t *testing.T) {
},
}

var sortGroupStatsOpt = cmp.Transformer(
sortGroupStatsOpt := cmp.Transformer(
"SortGroupStats",
func(in []*GroupStat) []*GroupStat {
out := append([]*GroupStat(nil), in...)
@@ -1509,7 +1508,7 @@ func TestListCompleted(t *testing.T) {
continue
}
}

}

func TestListCompletedPagination(t *testing.T) {
@@ -1585,7 +1583,7 @@ func TestListAggregating(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -1598,7 +1596,7 @@ func TestListAggregating(t *testing.T) {
base.AllGroups("default"): {"group1", "group2"},
base.AllGroups("custom"): {"group3"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-30 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
@@ -1665,14 +1663,14 @@ func TestListAggregatingPagination(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{}, // will be populated below
allQueues: []string{"default"},
allGroups: map[string][]string{
base.AllGroups("default"): {"mygroup"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
groupkey: {}, // will be populated below
},
}
@@ -1683,7 +1681,7 @@ func TestListAggregatingPagination(t *testing.T) {
fxt.tasks = append(fxt.tasks, &h.TaskSeedData{
Msg: msg, State: base.TaskStateAggregating,
})
fxt.groups[groupkey] = append(fxt.groups[groupkey], &redis.Z{
fxt.groups[groupkey] = append(fxt.groups[groupkey], redis.Z{
Member: msg.ID,
Score: float64(now.Add(-time.Duration(100-i) * time.Second).Unix()),
})
@@ -1999,7 +1997,7 @@ func TestRunAggregatingTask(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -2011,7 +2009,7 @@ func TestRunAggregatingTask(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -2324,7 +2322,6 @@ func TestRunTaskError(t *testing.T) {
}
}
}

}

func TestRunAllScheduledTasks(t *testing.T) {
@@ -2691,7 +2688,7 @@ func TestRunAllAggregatingTasks(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -2703,7 +2700,7 @@ func TestRunAllAggregatingTasks(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group2"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -3001,7 +2998,7 @@ func TestArchiveAggregatingTask(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -3013,7 +3010,7 @@ func TestArchiveAggregatingTask(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -3335,6 +3332,7 @@ func TestArchiveTaskError(t *testing.T) {
}
}
}

func TestArchiveAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -3485,7 +3483,7 @@ func TestArchiveAllAggregatingTasks(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -3497,7 +3495,7 @@ func TestArchiveAllAggregatingTasks(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group2"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -4124,7 +4122,7 @@ func TestDeleteAggregatingTask(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -4136,7 +4134,7 @@ func TestDeleteAggregatingTask(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -4758,7 +4756,7 @@ func TestDeleteAllAggregatingTasks(t *testing.T) {
tasks []*h.TaskSeedData
allQueues []string
allGroups map[string][]string
groups map[string][]*redis.Z
groups map[string][]redis.Z
}{
tasks: []*h.TaskSeedData{
{Msg: m1, State: base.TaskStateAggregating},
@@ -4770,7 +4768,7 @@ func TestDeleteAllAggregatingTasks(t *testing.T) {
base.AllGroups("default"): {"group1"},
base.AllGroups("custom"): {"group1"},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "group1"): {
{Member: m1.ID, Score: float64(now.Add(-20 * time.Second).Unix())},
{Member: m2.ID, Score: float64(now.Add(-25 * time.Second).Unix())},
@@ -11,11 +11,11 @@ import (
"math"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
"github.com/spf13/cast"
)

@@ -67,7 +67,7 @@ func (r *RDB) runScript(ctx context.Context, op errors.Op, script *redis.Script,
return nil
}

// Runs the given script with keys and args and retuns the script's return value as int64.
// Runs the given script with keys and args and returns the script's return value as int64.
func (r *RDB) runScriptWithErrorCode(ctx context.Context, op errors.Op, script *redis.Script, keys []string, args ...interface{}) (int64, error) {
res, err := script.Run(ctx, r.client, keys, args...).Result()
if err != nil {
@@ -152,7 +152,7 @@ func (r *RDB) Enqueue(ctx context.Context, msg *base.TaskMessage) error {
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return -1
return -1
end
if redis.call("EXISTS", KEYS[2]) == 1 then
return 0
@@ -368,7 +368,7 @@ func (r *RDB) Done(ctx context.Context, msg *base.TaskMessage) error {
//
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task exipration time in unix time
// ARGV[3] -> task expiration time in unix time
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteCmd = redis.NewScript(`
@@ -379,7 +379,7 @@ if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
redis.redis.error_reply("INTERNAL")
return redis.error_reply("INTERNAL")
end
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
local n = redis.call("INCR", KEYS[5])
@@ -405,7 +405,7 @@ return redis.status_reply("OK")
//
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task exipration time in unix time
// ARGV[3] -> task expiration time in unix time
// ARGV[4] -> task message data
// ARGV[5] -> max int64 value
var markAsCompleteUniqueCmd = redis.NewScript(`
@@ -416,7 +416,7 @@ if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1]) ~= 1 then
redis.redis.error_reply("INTERNAL")
return redis.error_reply("INTERNAL")
end
redis.call("HSET", KEYS[4], "msg", ARGV[4], "state", "completed")
local n = redis.call("INCR", KEYS[5])
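Reviewer note: the two hunks above also fix a Lua bug. The old scripts called redis.redis.error_reply("INTERNAL"), which indexes a nil field and aborts the script with a runtime error, and in any case never returned the reply; the fix returns redis.error_reply directly so the caller gets a clean "INTERNAL" error. A rough sketch of how such an error reply surfaces on the Go side (hypothetical script, assumes a local Redis):

package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// An error reply produced with `return redis.error_reply(...)` reaches the Go
	// caller as an ordinary error, which helpers like runScript can wrap and report.
	script := redis.NewScript(`return redis.error_reply("INTERNAL")`)
	err := script.Run(ctx, rdb, []string{}).Err()
	fmt.Println(err) // expected to print: INTERNAL
}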
@@ -1086,7 +1086,7 @@ const aggregationTimeout = 2 * time.Minute
// The time for gracePeriod and maxDelay is computed relative to the time t.
//
// Note: It assumes that this function is called at frequency less than or equal to the gracePeriod. In other words,
// the function only checks the most recently added task aganist the given gracePeriod.
// the function only checks the most recently added task against the given gracePeriod.
func (r *RDB) AggregationCheck(qname, gname string, t time.Time, gracePeriod, maxDelay time.Duration, maxSize int) (string, error) {
var op errors.Op = "RDB.AggregationCheck"
aggregationSetID := uuid.NewString()
@@ -1319,9 +1319,9 @@ func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.Task
// It returns a new expiration time if the operation was successful.
func (r *RDB) ExtendLease(qname string, ids ...string) (expirationTime time.Time, err error) {
expireAt := r.clock.Now().Add(LeaseDuration)
var zs []*redis.Z
var zs []redis.Z
for _, id := range ids {
zs = append(zs, &redis.Z{Member: id, Score: float64(expireAt.Unix())})
zs = append(zs, redis.Z{Member: id, Score: float64(expireAt.Unix())})
}
// Use XX option to only update elements that already exist; Don't add new elements
// TODO: Consider adding GT option to ensure we only "extend" the lease. Ceveat is that GT is supported from redis v6.2.0 or above.
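Reviewer note: ExtendLease relies on ZAdd's XX flag so that only existing lease members are updated, and the TODO above considers adding GT so a lease can only move forward. Assuming go-redis v9's ZAddArgs helper, both flags can be combined; a minimal sketch under that assumption (the key and IDs are made up, and a locally reachable Redis 6.2+ is assumed):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	expireAt := time.Now().Add(30 * time.Second)

	// XX: only update members that already exist, never add new leases.
	// GT: only move a score forward, so a lease is never shortened.
	// GT requires Redis 6.2+, which is the caveat the TODO above points out.
	err := rdb.ZAddArgs(ctx, "example:lease", redis.ZAddArgs{
		XX: true,
		GT: true,
		Members: []redis.Z{
			{Member: "task-id-1", Score: float64(expireAt.Unix())},
			{Member: "task-id-2", Score: float64(expireAt.Unix())},
		},
	}).Err()
	fmt.Println(err)
}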
@@ -1367,10 +1367,10 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
}
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
if err := r.client.ZAdd(ctx, base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
if err := r.client.ZAdd(ctx, base.AllServers, redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
}
if err := r.client.ZAdd(ctx, base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
if err := r.client.ZAdd(ctx, base.AllWorkers, redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
}
return r.runScript(ctx, op, writeServerStateCmd, []string{skey, wkey}, args...)
@@ -1423,7 +1423,7 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
}
exp := r.clock.Now().Add(ttl).UTC()
key := base.SchedulerEntriesKey(schedulerID)
err := r.client.ZAdd(ctx, base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
err := r.client.ZAdd(ctx, base.AllSchedulers, redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
if err != nil {
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
}
@@ -15,7 +15,6 @@ import (
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
@@ -23,6 +22,7 @@ import (
"github.com/hibiken/asynq/internal/errors"
h "github.com/hibiken/asynq/internal/testutil"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
)

// variables used for package testing.
@@ -1272,7 +1272,6 @@ func TestAddToGroupeTaskIdConflictError(t *testing.T) {
continue
}
}

}

func TestAddToGroupUnique(t *testing.T) {
@@ -1356,7 +1355,6 @@ func TestAddToGroupUnique(t *testing.T) {
continue
}
}

}

func TestAddToGroupUniqueTaskIdConflictError(t *testing.T) {
@@ -1398,7 +1396,6 @@ func TestAddToGroupUniqueTaskIdConflictError(t *testing.T) {
continue
}
}

}

func TestSchedule(t *testing.T) {
@@ -3122,7 +3119,7 @@ func TestAggregationCheck(t *testing.T) {
desc string
// initial data
tasks []*h.TaskSeedData
groups map[string][]*redis.Z
groups map[string][]redis.Z
allGroups map[string][]string

// args
@@ -3141,7 +3138,7 @@ func TestAggregationCheck(t *testing.T) {
{
desc: "with an empty group",
tasks: []*h.TaskSeedData{},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {},
},
allGroups: map[string][]string{
@@ -3168,7 +3165,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3201,7 +3198,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3235,7 +3232,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg2, State: base.TaskStateAggregating},
{Msg: msg3, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3266,7 +3263,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3299,7 +3296,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3338,7 +3335,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3371,7 +3368,7 @@ func TestAggregationCheck(t *testing.T) {
{Msg: msg4, State: base.TaskStateAggregating},
{Msg: msg5, State: base.TaskStateAggregating},
},
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "mygroup"): {
{Member: msg1.ID, Score: float64(now.Add(-15 * time.Minute).Unix())},
{Member: msg2.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
@@ -3473,8 +3470,8 @@ func TestDeleteAggregationSet(t *testing.T) {
desc string
// initial data
tasks []*h.TaskSeedData
aggregationSets map[string][]*redis.Z
allAggregationSets map[string][]*redis.Z
aggregationSets map[string][]redis.Z
allAggregationSets map[string][]redis.Z

// args
ctx context.Context
@@ -3494,14 +3491,14 @@ func TestDeleteAggregationSet(t *testing.T) {
{Msg: m2, State: base.TaskStateAggregating},
{Msg: m3, State: base.TaskStateAggregating},
},
aggregationSets: map[string][]*redis.Z{
aggregationSets: map[string][]redis.Z{
base.AggregationSetKey("default", "mygroup", setID): {
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
},
},
allAggregationSets: map[string][]*redis.Z{
allAggregationSets: map[string][]redis.Z{
base.AllAggregationSets("default"): {
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
},
@@ -3528,7 +3525,7 @@ func TestDeleteAggregationSet(t *testing.T) {
{Msg: m2, State: base.TaskStateAggregating},
{Msg: m3, State: base.TaskStateAggregating},
},
aggregationSets: map[string][]*redis.Z{
aggregationSets: map[string][]redis.Z{
base.AggregationSetKey("default", "mygroup", setID): {
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
},
@@ -3537,7 +3534,7 @@ func TestDeleteAggregationSet(t *testing.T) {
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
},
},
allAggregationSets: map[string][]*redis.Z{
allAggregationSets: map[string][]redis.Z{
base.AllAggregationSets("default"): {
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
{Member: base.AggregationSetKey("default", "mygroup", otherSetID), Score: float64(now.Add(aggregationTimeout).Unix())},
@@ -3602,8 +3599,8 @@ func TestDeleteAggregationSetError(t *testing.T) {
desc string
// initial data
tasks []*h.TaskSeedData
aggregationSets map[string][]*redis.Z
allAggregationSets map[string][]*redis.Z
aggregationSets map[string][]redis.Z
allAggregationSets map[string][]redis.Z

// args
ctx context.Context
@@ -3622,14 +3619,14 @@ func TestDeleteAggregationSetError(t *testing.T) {
{Msg: m2, State: base.TaskStateAggregating},
{Msg: m3, State: base.TaskStateAggregating},
},
aggregationSets: map[string][]*redis.Z{
aggregationSets: map[string][]redis.Z{
base.AggregationSetKey("default", "mygroup", setID): {
{Member: m1.ID, Score: float64(now.Add(-5 * time.Minute).Unix())},
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
{Member: m3.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
},
},
allAggregationSets: map[string][]*redis.Z{
allAggregationSets: map[string][]redis.Z{
base.AllAggregationSets("default"): {
{Member: base.AggregationSetKey("default", "mygroup", setID), Score: float64(now.Add(aggregationTimeout).Unix())},
},
@@ -3688,23 +3685,23 @@ func TestReclaimStaleAggregationSets(t *testing.T) {
// Note: In this test, we're trying out a new way to test RDB by exactly describing how
// keys and values are represented in Redis.
tests := []struct {
groups map[string][]*redis.Z // map redis-key to redis-zset
aggregationSets map[string][]*redis.Z
allAggregationSets map[string][]*redis.Z
groups map[string][]redis.Z // map redis-key to redis-zset
aggregationSets map[string][]redis.Z
allAggregationSets map[string][]redis.Z
qname string
wantGroups map[string][]redis.Z
wantAggregationSets map[string][]redis.Z
wantAllAggregationSets map[string][]redis.Z
}{
{
groups: map[string][]*redis.Z{
groups: map[string][]redis.Z{
base.GroupKey("default", "foo"): {},
base.GroupKey("default", "bar"): {},
base.GroupKey("default", "qux"): {
{Member: m4.ID, Score: float64(now.Add(-10 * time.Second).Unix())},
},
},
aggregationSets: map[string][]*redis.Z{
aggregationSets: map[string][]redis.Z{
base.AggregationSetKey("default", "foo", "set1"): {
{Member: m1.ID, Score: float64(now.Add(-3 * time.Minute).Unix())},
{Member: m2.ID, Score: float64(now.Add(-4 * time.Minute).Unix())},
@@ -3713,7 +3710,7 @@ func TestReclaimStaleAggregationSets(t *testing.T) {
{Member: m3.ID, Score: float64(now.Add(-1 * time.Minute).Unix())},
},
},
allAggregationSets: map[string][]*redis.Z{
allAggregationSets: map[string][]redis.Z{
base.AllAggregationSets("default"): {
{Member: base.AggregationSetKey("default", "foo", "set1"), Score: float64(now.Add(-10 * time.Second).Unix())}, // set1 is expired
{Member: base.AggregationSetKey("default", "bar", "set2"), Score: float64(now.Add(40 * time.Second).Unix())}, // set2 is not expired
@@ -11,7 +11,7 @@ import (
"sync"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/hibiken/asynq/internal/base"
)
@@ -13,12 +13,12 @@ import (
"testing"
"time"

"github.com/go-redis/redis/v8"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/timeutil"
"github.com/redis/go-redis/v9"
)

// EquateInt64Approx returns a Comparer option that treats int64 values
@@ -252,7 +252,7 @@ func SeedLease(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname s
seedRedisZSet(tb, r, base.LeaseKey(qname), entries, base.TaskStateActive)
}

// SeedCompletedQueue initializes the completed set witht the given entries.
// SeedCompletedQueue initializes the completed set with the given entries.
func SeedCompletedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) {
tb.Helper()
r.SAdd(context.Background(), base.AllQueues, qname)
@@ -377,7 +377,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string,
for _, item := range items {
msg := item.Message
encoded := MustMarshal(tb, msg)
z := &redis.Z{Member: msg.ID, Score: float64(item.Score)}
z := redis.Z{Member: msg.ID, Score: float64(item.Score)}
if err := c.ZAdd(context.Background(), key, z).Err(); err != nil {
tb.Fatal(err)
}
@@ -570,7 +570,7 @@ func SeedTasks(tb testing.TB, r redis.UniversalClient, taskData []*TaskSeedData)
}
}

func SeedRedisZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]*redis.Z) {
func SeedRedisZSets(tb testing.TB, r redis.UniversalClient, zsets map[string][]redis.Z) {
for key, zs := range zsets {
// FIXME: How come we can't simply do ZAdd(ctx, key, zs...) here?
for _, z := range zs {
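Reviewer note on the FIXME above: now that the helper takes []redis.Z values and the v9 ZAdd is variadic over redis.Z, the per-element loop could presumably collapse into one call per key. A sketch of that hypothetical simplification (seedRedisZSetsAtOnce is a made-up name, and it assumes each fixture fits comfortably in a single ZADD):

package testutil

import (
	"context"
	"testing"

	"github.com/redis/go-redis/v9"
)

// seedRedisZSetsAtOnce is a hypothetical variant of SeedRedisZSets: since the v9
// ZAdd is variadic over redis.Z values, each key's members can be added in a
// single round trip instead of one ZADD per element.
func seedRedisZSetsAtOnce(tb testing.TB, r redis.UniversalClient, zsets map[string][]redis.Z) {
	tb.Helper()
	for key, zs := range zsets {
		if len(zs) == 0 {
			continue // ZADD with zero members is rejected by Redis, so skip empty fixtures.
		}
		if err := r.ZAdd(context.Background(), key, zs...).Err(); err != nil {
			tb.Fatal(err)
		}
	}
}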