2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Update RDB.Schedule

This commit is contained in:
Ken Hibino 2021-02-21 16:54:11 -08:00
parent 4ba0e46f96
commit 69b7aa22cd
3 changed files with 75 additions and 12 deletions

View File

@ -323,43 +323,94 @@ func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*
// GetScheduledMessages returns all scheduled task messages in the given queue. // GetScheduledMessages returns all scheduled task messages in the given queue.
func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetScheduledMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.ScheduledKey(qname)) ids := r.ZRange(base.ScheduledKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
} }
// GetRetryMessages returns all retry messages in the given queue. // GetRetryMessages returns all retry messages in the given queue.
func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.RetryKey(qname)) ids := r.ZRange(base.RetryKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
} }
// GetArchivedMessages returns all archived messages in the given queue. // GetArchivedMessages returns all archived messages in the given queue.
func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper() tb.Helper()
return getZSetMessages(tb, r, base.ArchivedKey(qname)) ids := r.ZRange(base.ArchivedKey(qname), 0, -1).Val()
var msgs []*base.TaskMessage
for _, id := range ids {
msg := r.Get(base.TaskKey(qname, id)).Val()
msgs = append(msgs, MustUnmarshal(tb, msg))
}
return msgs
} }
// GetScheduledEntries returns all scheduled messages and its score in the given queue. // GetScheduledEntries returns all scheduled messages and its score in the given queue.
func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetScheduledEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.ScheduledKey(qname)) zs := r.ZRangeWithScores(base.ScheduledKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
res = append(res, base.Z{Message: MustUnmarshal(tb, msg), Score: int64(z.Score)})
}
return res
} }
// GetRetryEntries returns all retry messages and its score in the given queue. // GetRetryEntries returns all retry messages and its score in the given queue.
func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.RetryKey(qname)) zs := r.ZRangeWithScores(base.RetryKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),
})
}
return res
} }
// GetArchivedEntries returns all archived messages and its score in the given queue. // GetArchivedEntries returns all archived messages and its score in the given queue.
func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.ArchivedKey(qname)) zs := r.ZRangeWithScores(base.ArchivedKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),
})
}
return res
} }
// GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue. // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { func GetDeadlinesEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.DeadlinesKey(qname)) zs := r.ZRangeWithScores(base.DeadlinesKey(qname), 0, -1).Val()
var res []base.Z
for _, z := range zs {
msg := r.Get(base.TaskKey(qname, z.Member.(string))).Val()
res = append(res, base.Z{
Message: MustUnmarshal(tb, msg),
Score: int64(z.Score),
})
}
return res
} }
func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage { func getListMessages(tb testing.TB, r redis.UniversalClient, list string) []*base.TaskMessage {

View File

@ -288,7 +288,18 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error {
encoded).Err() encoded).Err()
} }
// Schedule adds the task to the backlog queue to be processed in the future. // KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:scheduled
// ARGV[1] -> task message data
// ARGV[2] -> process_at time in Unix time
// ARGV[3] -> task ID
var scheduleCmd = redis.NewScript(`
redis.call("SET", KEYS[1], ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
`)
// Schedule adds the task to the scheduled set to be processed in the future.
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
encoded, err := base.EncodeMessage(msg) encoded, err := base.EncodeMessage(msg)
if err != nil { if err != nil {
@ -297,8 +308,9 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err return err
} }
score := float64(processAt.Unix()) keys := []string{base.TaskKey(msg.Queue, msg.ID.String()), base.ScheduledKey(msg.Queue)}
return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err() args := []interface{}{encoded, processAt.Unix(), msg.ID.String()}
return scheduleCmd.Run(r.client, keys, args...).Err()
} }
// KEYS[1] -> unique key // KEYS[1] -> unique key

View File

@ -765,12 +765,12 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
tests := []struct { tests := []struct {
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
}{ }{
{t1, time.Now().Add(15 * time.Minute)}, {msg, time.Now().Add(15 * time.Minute)},
} }
for _, tc := range tests { for _, tc := range tests {