From 0f09f936a9a5407fe7d477eceeed51a7dab8b9e2 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 26 Feb 2021 06:40:28 -0800 Subject: [PATCH] Update RDB.RunArchivedTask, RDB.RunRetryTask, RDB.ScheduledTask --- internal/rdb/inspect.go | 34 +++++++++++++++++----------------- internal/rdb/inspect_test.go | 34 +++++++++++----------------------- 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index a9e84f4..1aa6c58 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -431,8 +431,8 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro // RunArchivedTask finds an archived task that matches the given id and score from // the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String(), float64(score)) +func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error { + n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String()) if err != nil { return err } @@ -445,8 +445,8 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error { // RunRetryTask finds a retry task that matches the given id and score from // the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String(), float64(score)) +func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error { + n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String()) if err != nil { return err } @@ -459,8 +459,8 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error { // RunScheduledTask finds a scheduled task that matches the given id and score from // from the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String(), float64(score)) +func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error { + n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String()) if err != nil { return err } @@ -488,20 +488,20 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname)) } +// KEYS[1] -> sorted set to remove the id from +// KEYS[2] -> asynq:{}:pending +// ARGV[1] -> task ID var removeAndRunCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) -for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - if decoded["ID"] == ARGV[2] then - redis.call("LPUSH", KEYS[2], msg) - redis.call("ZREM", KEYS[1], msg) - return 1 + local n = redis.call("ZREM", KEYS[1], ARGV[1]) + if n == 0 then + return 0 end -end -return 0`) + redis.call("LPUSH", KEYS[2], ARGV[1]) + return 1 +`) -func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error) { - res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, score, id).Result() +func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) { + res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result() if err != nil { return 0, err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index a4ab7ac..9abe434 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -996,7 +996,7 @@ var ( zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score ) -func TestRunDeadTask(t *testing.T) { +func TestRunArchivedTask(t *testing.T) { r := setup(t) defer r.Close() t1 := h.NewTaskMessage("send_email", nil) @@ -1008,9 +1008,8 @@ func TestRunDeadTask(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - score int64 id uuid.UUID - want error // expected return value from calling RunDeadTask + want error // expected return value from calling RunArchivedTask wantArchived map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1022,7 +1021,6 @@ func TestRunDeadTask(t *testing.T) { }, }, qname: "default", - score: s2, id: t2.ID, want: nil, wantArchived: map[string][]*base.TaskMessage{ @@ -1040,8 +1038,7 @@ func TestRunDeadTask(t *testing.T) { }, }, qname: "default", - score: 123, - id: t2.ID, + id: uuid.New(), want: ErrTaskNotFound, wantArchived: map[string][]*base.TaskMessage{ "default": {t1, t2}, @@ -1061,7 +1058,6 @@ func TestRunDeadTask(t *testing.T) { }, }, qname: "critical", - score: s1, id: t3.ID, want: nil, wantArchived: map[string][]*base.TaskMessage{ @@ -1079,9 +1075,9 @@ func TestRunDeadTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.RunArchivedTask(tc.qname, tc.id, tc.score) + got := r.RunArchivedTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1113,7 +1109,6 @@ func TestRunRetryTask(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - score int64 id uuid.UUID want error // expected return value from calling RunRetryTask wantRetry map[string][]*base.TaskMessage @@ -1127,7 +1122,6 @@ func TestRunRetryTask(t *testing.T) { }, }, qname: "default", - score: s2, id: t2.ID, want: nil, wantRetry: map[string][]*base.TaskMessage{ @@ -1145,8 +1139,7 @@ func TestRunRetryTask(t *testing.T) { }, }, qname: "default", - score: 123, - id: t2.ID, + id: uuid.New(), want: ErrTaskNotFound, wantRetry: map[string][]*base.TaskMessage{ "default": {t1, t2}, @@ -1166,7 +1159,6 @@ func TestRunRetryTask(t *testing.T) { }, }, qname: "low", - score: s2, id: t3.ID, want: nil, wantRetry: map[string][]*base.TaskMessage{ @@ -1184,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue - got := r.RunRetryTask(tc.qname, tc.id, tc.score) + got := r.RunRetryTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1218,7 +1210,6 @@ func TestRunScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - score int64 id uuid.UUID want error // expected return value from calling RunScheduledTask wantScheduled map[string][]*base.TaskMessage @@ -1232,7 +1223,6 @@ func TestRunScheduledTask(t *testing.T) { }, }, qname: "default", - score: s2, id: t2.ID, want: nil, wantScheduled: map[string][]*base.TaskMessage{ @@ -1250,8 +1240,7 @@ func TestRunScheduledTask(t *testing.T) { }, }, qname: "default", - score: 123, - id: t2.ID, + id: uuid.New(), want: ErrTaskNotFound, wantScheduled: map[string][]*base.TaskMessage{ "default": {t1, t2}, @@ -1271,7 +1260,6 @@ func TestRunScheduledTask(t *testing.T) { }, }, qname: "notifications", - score: s1, id: t3.ID, want: nil, wantScheduled: map[string][]*base.TaskMessage{ @@ -1289,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.RunScheduledTask(tc.qname, tc.id, tc.score) + got := r.RunScheduledTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue }