From be746d01a5fbb34d741baeefdeccd27ccea2fa31 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 11 Dec 2019 19:56:19 -0800 Subject: [PATCH] Add delete methods to rdb --- asynq.go | 1 + internal/rdb/inspect.go | 47 ++++++++ internal/rdb/inspect_test.go | 215 +++++++++++++++++++++++++++++++++++ 3 files changed, 263 insertions(+) diff --git a/asynq.go b/asynq.go index daa2f76..0ae9833 100644 --- a/asynq.go +++ b/asynq.go @@ -6,6 +6,7 @@ import "github.com/go-redis/redis/v7" TODOs: - [P0] asynqmon del , asynqmon delall - [P0] asynqmon kill , asynqmon killall +- [P0] Test refactor - helpers to initialize queues and read queue contents - [P0] Redis Memory Usage, Connection info in stats - [P0] Processed, Failed count for today - [P0] Go docs + CONTRIBUTION.md + Github issue template diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 72557e5..0d0dba3 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -333,3 +333,50 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) { } return n, nil } + +// DeleteDeadTask finds a task that matches the given id and score from dead queue +// and deletes it. If a task that matches the id and score does not exist, +// it returns ErrTaskNotFound. +func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error { + return r.deleteTask(deadQ, id.String(), float64(score)) +} + +// DeleteRetryTask finds a task that matches the given id and score from retry queue +// and deletes it. If a task that matches the id and score does not exist, +// it returns ErrTaskNotFound. +func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error { + return r.deleteTask(retryQ, id.String(), float64(score)) +} + +// DeleteScheduledTask finds a task that matches the given id and score from +// scheduled queue and deletes it. If a task that matches the id and score +//does not exist, it returns ErrTaskNotFound. +func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error { + return r.deleteTask(scheduledQ, id.String(), float64(score)) +} + +func (r *RDB) deleteTask(zset, id string, score float64) error { + script := redis.NewScript(` + local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) + for _, msg in ipairs(msgs) do + local decoded = cjson.decode(msg) + if decoded["ID"] == ARGV[2] then + redis.call("ZREM", KEYS[1], msg) + return 1 + end + end + return 0 + `) + res, err := script.Run(r.client, []string{zset}, score, id).Result() + if err != nil { + return err + } + n, ok := res.(int64) + if !ok { + return fmt.Errorf("could not cast %v to int64", res) + } + if n == 0 { + return ErrTaskNotFound + } + return nil +} diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 295ed33..348951a 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -881,3 +881,218 @@ func TestEnqueueAllDeadTasks(t *testing.T) { } } } + +func TestDeleteDeadTask(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + t1 := time.Now().Add(-5 * time.Minute) + t2 := time.Now().Add(-time.Hour) + + type deadEntry struct { + msg *TaskMessage + score int64 + } + tests := []struct { + dead []deadEntry + id xid.ID + score int64 + want error + wantDead []*TaskMessage + }{ + { + dead: []deadEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + id: m1.ID, + score: t1.Unix(), + want: nil, + wantDead: []*TaskMessage{m2}, + }, + { + dead: []deadEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + id: m1.ID, + score: t2.Unix(), // id and score mismatch + want: ErrTaskNotFound, + wantDead: []*TaskMessage{m1, m2}, + }, + { + dead: []deadEntry{}, + id: m1.ID, + score: t1.Unix(), + want: ErrTaskNotFound, + wantDead: []*TaskMessage{}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize dead queue. + for _, d := range tc.dead { + err := r.client.ZAdd(deadQ, &redis.Z{ + Member: mustMarshal(t, d.msg), + Score: float64(d.score), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + got := r.DeleteDeadTask(tc.id, tc.score) + if got != tc.want { + t.Errorf("r.DeleteDeadTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + continue + } + + gotDeadRaw := r.client.ZRange(deadQ, 0, -1).Val() + gotDead := mustUnmarshalSlice(t, gotDeadRaw) + if diff := cmp.Diff(tc.wantDead, gotDead, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", deadQ, diff) + } + } +} + +func TestDeleteRetryTask(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + t1 := time.Now().Add(5 * time.Minute) + t2 := time.Now().Add(time.Hour) + + type retryEntry struct { + msg *TaskMessage + score int64 + } + tests := []struct { + retry []retryEntry + id xid.ID + score int64 + want error + wantRetry []*TaskMessage + }{ + { + retry: []retryEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + id: m1.ID, + score: t1.Unix(), + want: nil, + wantRetry: []*TaskMessage{m2}, + }, + { + retry: []retryEntry{ + {m1, t1.Unix()}, + }, + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantRetry: []*TaskMessage{m1}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize retry queue. + for _, e := range tc.retry { + err := r.client.ZAdd(retryQ, &redis.Z{ + Member: mustMarshal(t, e.msg), + Score: float64(e.score), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + got := r.DeleteRetryTask(tc.id, tc.score) + if got != tc.want { + t.Errorf("r.DeleteRetryTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + continue + } + + gotRetryRaw := r.client.ZRange(retryQ, 0, -1).Val() + gotRetry := mustUnmarshalSlice(t, gotRetryRaw) + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", retryQ, diff) + } + } +} + +func TestDeleteScheduledTask(t *testing.T) { + r := setup(t) + m1 := randomTask("send_email", "default", nil) + m2 := randomTask("reindex", "default", nil) + t1 := time.Now().Add(5 * time.Minute) + t2 := time.Now().Add(time.Hour) + + type scheduledEntry struct { + msg *TaskMessage + score int64 + } + tests := []struct { + scheduled []scheduledEntry + id xid.ID + score int64 + want error + wantScheduled []*TaskMessage + }{ + { + scheduled: []scheduledEntry{ + {m1, t1.Unix()}, + {m2, t2.Unix()}, + }, + id: m1.ID, + score: t1.Unix(), + want: nil, + wantScheduled: []*TaskMessage{m2}, + }, + { + scheduled: []scheduledEntry{ + {m1, t1.Unix()}, + }, + id: m2.ID, + score: t2.Unix(), + want: ErrTaskNotFound, + wantScheduled: []*TaskMessage{m1}, + }, + } + + for _, tc := range tests { + // clean up db before each test case. + if err := r.client.FlushDB().Err(); err != nil { + t.Fatal(err) + } + // initialize scheduled queue. + for _, e := range tc.scheduled { + err := r.client.ZAdd(scheduledQ, &redis.Z{ + Member: mustMarshal(t, e.msg), + Score: float64(e.score), + }).Err() + if err != nil { + t.Fatal(err) + } + } + + got := r.DeleteScheduledTask(tc.id, tc.score) + if got != tc.want { + t.Errorf("r.DeleteScheduledTask(%v, %v) = %v, want %v", tc.id, tc.score, got, tc.want) + continue + } + + gotScheduledRaw := r.client.ZRange(scheduledQ, 0, -1).Val() + gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw) + if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", scheduledQ, diff) + } + } +}