From 267493ccefe02abcacd369c459fe9850f5f49416 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 28 Apr 2021 07:27:35 -0700 Subject: [PATCH] Update RDB.RunTask with task state --- inspeq/inspector.go | 6 ++-- internal/base/base.go | 27 ++++++++------ internal/rdb/inspect.go | 70 ++++++++++++++++-------------------- internal/rdb/inspect_test.go | 12 +++---- 4 files changed, 55 insertions(+), 60 deletions(-) diff --git a/inspeq/inspector.go b/inspeq/inspector.go index 49a99c8..cf622bc 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -613,11 +613,11 @@ func (i *Inspector) RunTaskByKey(qname, key string) error { } switch prefix { case keyPrefixScheduled: - return i.rdb.RunScheduledTask(qname, id) + return i.rdb.RunTask(qname, id) case keyPrefixRetry: - return i.rdb.RunRetryTask(qname, id) + return i.rdb.RunTask(qname, id) case keyPrefixArchived: - return i.rdb.RunArchivedTask(qname, id) + return i.rdb.RunTask(qname, id) case keyPrefixPending: return fmt.Errorf("task is already pending for run") default: diff --git a/internal/base/base.go b/internal/base/base.go index 3a51f44..db9c145 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -45,9 +45,14 @@ func ValidateQueueName(qname string) error { return nil } +// QueueKeyPrefix returns a prefix for all keys in the given queue. +func QueueKeyPrefix(qname string) string { + return fmt.Sprintf("asynq:{%s}:", qname) +} + // TaskKeyPrefix returns a prefix for task key. func TaskKeyPrefix(qname string) string { - return fmt.Sprintf("asynq:{%s}:t:", qname) + return fmt.Sprintf("%st:", QueueKeyPrefix(qname)) } // TaskKey returns a redis key for the given task message. @@ -57,47 +62,47 @@ func TaskKey(qname, id string) string { // PendingKey returns a redis key for the given queue name. func PendingKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:pending", qname) + return fmt.Sprintf("%spending", QueueKeyPrefix(qname)) } // ActiveKey returns a redis key for the active tasks. func ActiveKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:active", qname) + return fmt.Sprintf("%sactive", QueueKeyPrefix(qname)) } // ScheduledKey returns a redis key for the scheduled tasks. func ScheduledKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:scheduled", qname) + return fmt.Sprintf("%sscheduled", QueueKeyPrefix(qname)) } // RetryKey returns a redis key for the retry tasks. func RetryKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:retry", qname) + return fmt.Sprintf("%sretry", QueueKeyPrefix(qname)) } // ArchivedKey returns a redis key for the archived tasks. func ArchivedKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:archived", qname) + return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname)) } // DeadlinesKey returns a redis key for the deadlines. func DeadlinesKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:deadlines", qname) + return fmt.Sprintf("%sdeadlines", QueueKeyPrefix(qname)) } // PausedKey returns a redis key to indicate that the given queue is paused. func PausedKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:paused", qname) + return fmt.Sprintf("%spaused", QueueKeyPrefix(qname)) } // ProcessedKey returns a redis key for processed count for the given day for the queue. func ProcessedKey(qname string, t time.Time) string { - return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02")) + return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) } // FailedKey returns a redis key for failure count for the given day for the queue. func FailedKey(qname string, t time.Time) string { - return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02")) + return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02")) } // ServerInfoKey returns a redis key for process info. @@ -122,7 +127,7 @@ func SchedulerHistoryKey(entryID string) string { // UniqueKey returns a redis key with the given type, payload, and queue name. func UniqueKey(qname, tasktype string, payload []byte) string { - return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload)) + return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, string(payload)) } // TaskMessage is the internal representation of a task with additional metadata fields. diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 8f5bc27..45017a7 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -439,39 +439,10 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro return zs, nil } -// RunArchivedTask finds an archived task that matches the given id and score from -// the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - -// RunRetryTask finds a retry task that matches the given id and score from -// the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String()) - if err != nil { - return err - } - if n == 0 { - return ErrTaskNotFound - } - return nil -} - -// RunScheduledTask finds a scheduled task that matches the given id and score from -// from the given queue and enqueues it for processing. -// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error { - n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String()) +// RunTask finds a task that matches the id from the given queue and stages it for processing. +// If a task that matches the id does not exist, it returns ErrTaskNotFound. +func (r *RDB) RunTask(qname string, id uuid.UUID) error { + n, err := r.runTask(qname, id.String()) if err != nil { return err } @@ -499,26 +470,45 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname)) } -// KEYS[1] -> sorted set to remove the id from +// KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:pending // ARGV[1] -> task ID -var removeAndRunCmd = redis.NewScript(` -local n = redis.call("ZREM", KEYS[1], ARGV[1]) -if n == 0 then +// ARGV[2] -> queue key prefix; asynq:{}: +var runTaskCmd = redis.NewScript(` +if redis.call("EXISTS", KEYS[1]) == 0 then return 0 end +local state = redis.call("HGET", KEYS[1], "state") +if state == "active" then + return redis.error_reply("task is already running") +elseif state == "pending" then + return redis.error_reply("task is already pending to be run") +end +local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1]) +if n == 0 then + return redis.error_reply("internal error: task id not found in zset " .. tostring(state)) +end redis.call("LPUSH", KEYS[2], ARGV[1]) +redis.call("HSET", KEYS[1], "state", "pending") return 1 `) -func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) { - res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result() +func (r *RDB) runTask(qname, id string) (int64, error) { + keys := []string{ + base.TaskKey(qname, id), + base.PendingKey(qname), + } + argv := []interface{}{ + id, + base.QueueKeyPrefix(qname), + } + res, err := runTaskCmd.Run(r.client, keys, argv...).Result() if err != nil { return 0, err } n, ok := res.(int64) if !ok { - return 0, fmt.Errorf("could not cast %v to int64", res) + return 0, fmt.Errorf("internal error: could not cast %v to int64", res) } return n, nil } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 75cca75..75c8fcc 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -1075,9 +1075,9 @@ func TestRunArchivedTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.RunArchivedTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1176,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue - got := r.RunRetryTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue } @@ -1277,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.RunScheduledTask(tc.qname, tc.id) + got := r.RunTask(tc.qname, tc.id) if got != tc.want { - t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) + t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want) continue }