From 0d74c518bfe928ad076c72c64f1b37d54619bcba Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 10 Dec 2019 20:28:31 -0800 Subject: [PATCH] Add methods to rdb to enqueue all tasks from dead, retry and scheduled queue --- asynq.go | 7 +- internal/rdb/inspect.go | 31 +++++++ internal/rdb/inspect_test.go | 153 +++++++++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 1 deletion(-) diff --git a/asynq.go b/asynq.go index bed372e..30af831 100644 --- a/asynq.go +++ b/asynq.go @@ -5,13 +5,18 @@ import "github.com/go-redis/redis/v7" /* TODOs: - [P0] enqall command to enq all tasks from "scheduled" "retry", "dead" queue +- [P0] asynqmon del , asynqmon delall +- [P0] asynqmon kill , asynqmon killall +- [P0] Redis Memory Usage, Connection info in stats +- [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template +- [P0] Redis Sentinel support - [P1] Add Support for multiple queues and priority - [P1] User defined max-retry count */ // Max retry count by default -const defaultMaxRetry = 25 +const defaultMaxRetry = 1 // Task represents a task to be performed. type Task struct { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 38984be..79e3674 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -272,6 +272,21 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error { return nil } +// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue. +func (r *RDB) EnqueueAllScheduledTasks() error { + return r.removeAndEnqueueAll(scheduledQ) +} + +// EnqueueAllRetryTasks enqueues all tasks from retry queue. +func (r *RDB) EnqueueAllRetryTasks() error { + return r.removeAndEnqueueAll(retryQ) +} + +// EnqueueAllDeadTasks enqueues all tasks from dead queue. +func (r *RDB) EnqueueAllDeadTasks() error { + return r.removeAndEnqueueAll(deadQ) +} + func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) @@ -295,3 +310,19 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) { } return n, nil } + +func (r *RDB) removeAndEnqueueAll(zset string) error { + script := redis.NewScript(` + local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) + for _, msg in ipairs(msgs) do + redis.call("ZREM", KEYS[1], msg) + redis.call("LPUSH", KEYS[2], msg) + end + return table.getn(msgs) + `) + _, err := script.Run(r.client, []string{zset, defaultQ}).Result() + if err != nil { + return err + } + return nil +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 33b6a39..eddf6ac 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -698,3 +698,156 @@ func TestEnqueueScheduledTask(t *testing.T) { } } } + +func TestEnqueueAllScheduledTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + scheduled []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in scheduled queue", + scheduled: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty scheduled queue", + scheduled: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize scheduled queue + for _, msg := range tc.scheduled { + err := r.client.ZAdd(scheduledQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllScheduledTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllScheduledTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +} + +func TestEnqueueAllRetryTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + retry []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in retry queue", + retry: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty retry queue", + retry: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize retry queue + for _, msg := range tc.retry { + err := r.client.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllRetryTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllRetryTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +} + +func TestEnqueueAllDeadTasks(t *testing.T) { + r := setup(t) + t1 := randomTask("send_email", "default", nil) + t2 := randomTask("gen_thumbnail", "default", nil) + t3 := randomTask("reindex", "default", nil) + + tests := []struct { + description string + dead []*TaskMessage + wantEnqueued []*TaskMessage + }{ + { + description: "with tasks in dead queue", + dead: []*TaskMessage{t1, t2, t3}, + wantEnqueued: []*TaskMessage{t1, t2, t3}, + }, + { + description: "with empty dead queue", + dead: []*TaskMessage{}, + wantEnqueued: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize dead queue + for _, msg := range tc.dead { + err := r.client.ZAdd(deadQ, &redis.Z{ + Member: mustMarshal(t, msg), + Score: float64(time.Now().Add(time.Hour).Unix())}).Err() + if err != nil { + t.Fatal(err) + } + } + + err := r.EnqueueAllDeadTasks() + if err != nil { + t.Errorf("%s; r.EnqueueAllDeadTasks = %v, want nil", tc.description, err) + } + + gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val() + gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) + if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.description, defaultQ, diff) + } + } +}