diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 935bc1f..061b6ef 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -323,43 +323,94 @@ func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []* // GetScheduledMessages returns all scheduled task messages in the given queue. func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.ScheduledKey(qname)) + ids := r.ZRange(base.ScheduledKey(qname), 0, -1).Val() + var msgs []*base.TaskMessage + for _, id := range ids { + msg := r.Get(base.TaskKey(qname, id)).Val() + msgs = append(msgs, MustUnmarshal(tb, msg)) + } + return msgs } // GetRetryMessages returns all retry messages in the given queue. func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.RetryKey(qname)) + ids := r.ZRange(base.RetryKey(qname), 0, -1).Val() + var msgs []*base.TaskMessage + for _, id := range ids { + msg := r.Get(base.TaskKey(qname, id)).Val() + msgs = append(msgs, MustUnmarshal(tb, msg)) + } + return msgs } // GetArchivedMessages returns all archived messages in the given queue. func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.ArchivedKey(qname)) + ids := r.ZRange(base.ArchivedKey(qname), 0, -1).Val() + var msgs []*base.TaskMessage + for _, id := range ids { + msg := r.Get(base.TaskKey(qname, id)).Val() + msgs = append(msgs, MustUnmarshal(tb, msg)) + } + return msgs } // GetScheduledEntries returns all scheduled messages and its score in the given queue. func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.ScheduledKey(qname)) + zs := r.ZRangeWithScores(base.ScheduledKey(qname), 0, -1).Val() + var res []base.Z + for _, z := range zs { + msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)}) + } + return res } // GetRetryEntries returns all retry messages and its score in the given queue. func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.RetryKey(qname)) + zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val() + var res []base.Z + for _, z := range zs { + msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + res = append(res, base.Z{ + Message: MustUnmarshal(tb, msg), + Score: int64(z.Score), + }) + } + return res } // GetArchivedEntries returns all archived messages and its score in the given queue. func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.ArchivedKey(qname)) + zs := r.ZRangeWithScores(base.ArchivedKey(qname), 0, -1).Val() + var res []base.Z + for _, z := range zs { + msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + res = append(res, base.Z{ + Message: MustUnmarshal(tb, msg), + Score: int64(z.Score), + }) + } + return res } // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.DeadlinesKey(qname)) + zs := r.ZRangeWithScores(base.DeadlinesKey(qname), 0, -1).Val() + var res []base.Z + for _, z := range zs { + msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val() + res = append(res, base.Z{ + Message: MustUnmarshal(tb, msg), + Score: int64(z.Score), + }) + } + return res } func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5666be6..35d5ad3 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -288,7 +288,18 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { encoded).Err() } -// Schedule adds the task to the backlog queue to be processed in the future. +// KEYS[1] -> asynq:{}:t: +// KEYS[2] -> asynq:{}:scheduled +// ARGV[1] -> task message data +// ARGV[2] -> process_at time in Unix time +// ARGV[3] -> task ID +var scheduleCmd = redis.NewScript(` +redis.call("SET", KEYS[1], ARGV[1]) +redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3]) +return 1 +`) + +// Schedule adds the task to the scheduled set to be processed in the future. func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { encoded, err := base.EncodeMessage(msg) if err != nil { @@ -297,8 +308,9 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { return err } - score := float64(processAt.Unix()) - return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err() + keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.ScheduledKey(msg.Queue)} + args := []interface{}{encoded, processAt.Unix(), msg.ID.String()} + return scheduleCmd.Run(r.client, keys, args...).Err() } // KEYS[1] -> unique key diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 40b2885..a24aebf 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -765,12 +765,12 @@ func TestRequeue(t *testing.T) { func TestSchedule(t *testing.T) { r := setup(t) defer r.Close() - t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) + msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) tests := []struct { msg *base.TaskMessage processAt time.Time }{ - {t1, time.Now().Add(15 * time.Minute)}, + {msg, time.Now().Add(15 * time.Minute)}, } for _, tc := range tests {