2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00

Update CheckAndEnqueue method in RDB

This commit is contained in:
Ken Hibino 2020-08-09 06:26:14 -07:00
parent f6d504939e
commit 650d7fdbe9
2 changed files with 116 additions and 68 deletions

View File

@ -419,48 +419,56 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
} }
// CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that // CheckAndEnqueue checks for scheduled/retry tasks for the given queues
// are ready to be processed. //and enqueues any tasks that are ready to be processed.
func (r *RDB) CheckAndEnqueue() (err error) { func (r *RDB) CheckAndEnqueue(qnames ...string) error {
delayed := []string{base.ScheduledQueue, base.RetryQueue} for _, qname := range qnames {
for _, zset := range delayed { if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil {
n := 1
for n != 0 {
n, err = r.forward(zset)
if err != nil {
return err return err
} }
if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil {
return err
} }
} }
return nil return nil
} }
// KEYS[1] -> source queue (e.g. scheduled or retry queue) // KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})
// KEYS[2] -> destination queue (e.g. asynq:{<qname>})
// ARGV[1] -> current unix time // ARGV[1] -> current unix time
// ARGV[2] -> queue prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short. // Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg) redis.call("LPUSH", KEYS[2], msg)
local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
redis.call("ZREM", KEYS[1], msg) redis.call("ZREM", KEYS[1], msg)
end end
return table.getn(msgs)`) return table.getn(msgs)`)
// forward moves tasks with a score less than the current unix time // forward moves tasks with a score less than the current unix time
// from the src zset. It returns the number of tasks moved. // from the src zset to the dst list. It returns the number of tasks moved.
func (r *RDB) forward(src string) (int, error) { func (r *RDB) forward(src, dst string) (int, error) {
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
res, err := forwardCmd.Run(r.client, res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result()
[]string{src}, now, base.QueuePrefix).Result()
if err != nil { if err != nil {
return 0, err return 0, err
} }
return cast.ToInt(res), nil return cast.ToInt(res), nil
} }
// forwardAll moves tasks with a score less than the current unix time from the src zset,
// until there's no more tasks.
func (r *RDB) forwardAll(src, dst string) error {
n := 1
for n != 0 {
n, err = r.forward(src, dst)
if err != nil {
return err
}
}
return nil
}
// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. // ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.
func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage var msgs []*base.TaskMessage

View File

@ -1161,82 +1161,120 @@ func TestCheckAndEnqueue(t *testing.T) {
t1 := h.NewTaskMessage("send_email", nil) t1 := h.NewTaskMessage("send_email", nil)
t2 := h.NewTaskMessage("generate_csv", nil) t2 := h.NewTaskMessage("generate_csv", nil)
t3 := h.NewTaskMessage("gen_thumbnail", nil) t3 := h.NewTaskMessage("gen_thumbnail", nil)
t4 := h.NewTaskMessage("important_task", nil) t4 := h.NewTaskMessageWithQueue("important_task", nil, "critical")
t4.Queue = "critical" t5 := h.NewTaskMessageWithQueue("minor_task", nil, "low")
t5 := h.NewTaskMessage("minor_task", nil)
t5.Queue = "low"
secondAgo := time.Now().Add(-time.Second) secondAgo := time.Now().Add(-time.Second)
hourFromNow := time.Now().Add(time.Hour) hourFromNow := time.Now().Add(time.Hour)
tests := []struct { tests := []struct {
scheduled []base.Z scheduled map[string][]base.Z
retry []base.Z retry map[string][]base.Z
qnames []string
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage wantScheduled map[string][]*base.TaskMessage
wantRetry []*base.TaskMessage wantRetry map[string][]*base.TaskMessage
}{ }{
{ {
scheduled: []base.Z{ scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: secondAgo.Unix()}, {Message: t1, Score: secondAgo.Unix()},
{Message: t2, Score: secondAgo.Unix()}, {Message: t2, Score: secondAgo.Unix()},
}, },
retry: []base.Z{ },
{Message: t3, Score: secondAgo.Unix()}}, retry: map[string][]base.Z{
"default": {{Message: t3, Score: secondAgo.Unix()}},
},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
wantScheduled: []*base.TaskMessage{}, wantScheduled: map[string][]*base.TaskMessage{
wantRetry: []*base.TaskMessage{}, "default": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
},
}, },
{ {
scheduled: []base.Z{ scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: hourFromNow.Unix()}, {Message: t1, Score: hourFromNow.Unix()},
{Message: t2, Score: secondAgo.Unix()}}, {Message: t2, Score: secondAgo.Unix()},
retry: []base.Z{ },
{Message: t3, Score: secondAgo.Unix()}}, },
retry: map[string][]base.Z{
"default": {{Message: t3, Score: secondAgo.Unix()}},
},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3}, "default": {t2, t3},
}, },
wantScheduled: []*base.TaskMessage{t1}, wantScheduled: map[string][]*base.TaskMessage{
wantRetry: []*base.TaskMessage{}, "default": {t1},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
},
}, },
{ {
scheduled: []base.Z{ scheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: hourFromNow.Unix()}, {Message: t1, Score: hourFromNow.Unix()},
{Message: t2, Score: hourFromNow.Unix()}}, {Message: t2, Score: hourFromNow.Unix()},
retry: []base.Z{ },
{Message: t3, Score: hourFromNow.Unix()}}, },
retry: map[string][]base.Z{
"default": {{Message: t3, Score: hourFromNow.Unix()}},
},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
wantScheduled: []*base.TaskMessage{t1, t2}, wantScheduled: map[string][]*base.TaskMessage{
wantRetry: []*base.TaskMessage{t3}, "default": {t1, t2},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {t3},
},
}, },
{ {
scheduled: []base.Z{ scheduled: map[string][]base.Z{
{Message: t1, Score: secondAgo.Unix()}, "default": {{Message: t1, Score: secondAgo.Unix()}},
{Message: t4, Score: secondAgo.Unix()}, "critical": {{Message: t4, Score: secondAgo.Unix()}},
"low": {},
}, },
retry: []base.Z{ retry: map[string][]base.Z{
{Message: t5, Score: secondAgo.Unix()}}, "default": {},
"critical": {},
"low": {{Message: t5, Score: secondAgo.Unix()}},
},
qnames: []string{"default", "critical", "low"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t4}, "critical": {t4},
"low": {t5}, "low": {t5},
}, },
wantScheduled: []*base.TaskMessage{}, wantScheduled: map[string][]*base.TaskMessage{
wantRetry: []*base.TaskMessage{}, "default": {},
"critical": {},
"low": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry) h.SeedAllRetryQueues(t, r.client, tc.retry)
err := r.CheckAndEnqueue() err := r.CheckAndEnqueue(tc.qnames...)
if err != nil { if err != nil {
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err)
continue continue
} }
@ -1246,15 +1284,17 @@ func TestCheckAndEnqueue(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
} }
} }
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client) gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.SortMsgOpt); diff != "" { if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledQueue, diff) t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(qname), diff)
} }
gotRetry := h.GetRetryMessages(t, r.client)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
} }
} }
} }