mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Update RDB.RunTask with task state
This commit is contained in:
parent
5d7f1b6a80
commit
267493ccef
@ -613,11 +613,11 @@ func (i *Inspector) RunTaskByKey(qname, key string) error {
|
|||||||
}
|
}
|
||||||
switch prefix {
|
switch prefix {
|
||||||
case keyPrefixScheduled:
|
case keyPrefixScheduled:
|
||||||
return i.rdb.RunScheduledTask(qname, id)
|
return i.rdb.RunTask(qname, id)
|
||||||
case keyPrefixRetry:
|
case keyPrefixRetry:
|
||||||
return i.rdb.RunRetryTask(qname, id)
|
return i.rdb.RunTask(qname, id)
|
||||||
case keyPrefixArchived:
|
case keyPrefixArchived:
|
||||||
return i.rdb.RunArchivedTask(qname, id)
|
return i.rdb.RunTask(qname, id)
|
||||||
case keyPrefixPending:
|
case keyPrefixPending:
|
||||||
return fmt.Errorf("task is already pending for run")
|
return fmt.Errorf("task is already pending for run")
|
||||||
default:
|
default:
|
||||||
|
@ -45,9 +45,14 @@ func ValidateQueueName(qname string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// QueueKeyPrefix returns a prefix for all keys in the given queue.
|
||||||
|
func QueueKeyPrefix(qname string) string {
|
||||||
|
return fmt.Sprintf("asynq:{%s}:", qname)
|
||||||
|
}
|
||||||
|
|
||||||
// TaskKeyPrefix returns a prefix for task key.
|
// TaskKeyPrefix returns a prefix for task key.
|
||||||
func TaskKeyPrefix(qname string) string {
|
func TaskKeyPrefix(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:t:", qname)
|
return fmt.Sprintf("%st:", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskKey returns a redis key for the given task message.
|
// TaskKey returns a redis key for the given task message.
|
||||||
@ -57,47 +62,47 @@ func TaskKey(qname, id string) string {
|
|||||||
|
|
||||||
// PendingKey returns a redis key for the given queue name.
|
// PendingKey returns a redis key for the given queue name.
|
||||||
func PendingKey(qname string) string {
|
func PendingKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:pending", qname)
|
return fmt.Sprintf("%spending", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ActiveKey returns a redis key for the active tasks.
|
// ActiveKey returns a redis key for the active tasks.
|
||||||
func ActiveKey(qname string) string {
|
func ActiveKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:active", qname)
|
return fmt.Sprintf("%sactive", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ScheduledKey returns a redis key for the scheduled tasks.
|
// ScheduledKey returns a redis key for the scheduled tasks.
|
||||||
func ScheduledKey(qname string) string {
|
func ScheduledKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:scheduled", qname)
|
return fmt.Sprintf("%sscheduled", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// RetryKey returns a redis key for the retry tasks.
|
// RetryKey returns a redis key for the retry tasks.
|
||||||
func RetryKey(qname string) string {
|
func RetryKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:retry", qname)
|
return fmt.Sprintf("%sretry", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ArchivedKey returns a redis key for the archived tasks.
|
// ArchivedKey returns a redis key for the archived tasks.
|
||||||
func ArchivedKey(qname string) string {
|
func ArchivedKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:archived", qname)
|
return fmt.Sprintf("%sarchived", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeadlinesKey returns a redis key for the deadlines.
|
// DeadlinesKey returns a redis key for the deadlines.
|
||||||
func DeadlinesKey(qname string) string {
|
func DeadlinesKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:deadlines", qname)
|
return fmt.Sprintf("%sdeadlines", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// PausedKey returns a redis key to indicate that the given queue is paused.
|
// PausedKey returns a redis key to indicate that the given queue is paused.
|
||||||
func PausedKey(qname string) string {
|
func PausedKey(qname string) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:paused", qname)
|
return fmt.Sprintf("%spaused", QueueKeyPrefix(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
||||||
func ProcessedKey(qname string, t time.Time) string {
|
func ProcessedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// FailedKey returns a redis key for failure count for the given day for the queue.
|
// FailedKey returns a redis key for failure count for the given day for the queue.
|
||||||
func FailedKey(qname string, t time.Time) string {
|
func FailedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerInfoKey returns a redis key for process info.
|
// ServerInfoKey returns a redis key for process info.
|
||||||
@ -122,7 +127,7 @@ func SchedulerHistoryKey(entryID string) string {
|
|||||||
|
|
||||||
// UniqueKey returns a redis key with the given type, payload, and queue name.
|
// UniqueKey returns a redis key with the given type, payload, and queue name.
|
||||||
func UniqueKey(qname, tasktype string, payload []byte) string {
|
func UniqueKey(qname, tasktype string, payload []byte) string {
|
||||||
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, string(payload))
|
return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, string(payload))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskMessage is the internal representation of a task with additional metadata fields.
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
||||||
|
@ -439,39 +439,10 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, erro
|
|||||||
return zs, nil
|
return zs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunArchivedTask finds an archived task that matches the given id and score from
|
// RunTask finds a task that matches the id from the given queue and stages it for processing.
|
||||||
// the given queue and enqueues it for processing.
|
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
func (r *RDB) RunTask(qname string, id uuid.UUID) error {
|
||||||
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error {
|
n, err := r.runTask(qname, id.String())
|
||||||
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
return ErrTaskNotFound
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RunRetryTask finds a retry task that matches the given id and score from
|
|
||||||
// the given queue and enqueues it for processing.
|
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error {
|
|
||||||
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String())
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if n == 0 {
|
|
||||||
return ErrTaskNotFound
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// RunScheduledTask finds a scheduled task that matches the given id and score from
|
|
||||||
// from the given queue and enqueues it for processing.
|
|
||||||
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
|
|
||||||
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error {
|
|
||||||
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -499,26 +470,45 @@ func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
|
|||||||
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
|
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> sorted set to remove the id from
|
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||||
// KEYS[2] -> asynq:{<qname>}:pending
|
// KEYS[2] -> asynq:{<qname>}:pending
|
||||||
// ARGV[1] -> task ID
|
// ARGV[1] -> task ID
|
||||||
var removeAndRunCmd = redis.NewScript(`
|
// ARGV[2] -> queue key prefix; asynq:{<qname>}:
|
||||||
local n = redis.call("ZREM", KEYS[1], ARGV[1])
|
var runTaskCmd = redis.NewScript(`
|
||||||
if n == 0 then
|
if redis.call("EXISTS", KEYS[1]) == 0 then
|
||||||
return 0
|
return 0
|
||||||
end
|
end
|
||||||
|
local state = redis.call("HGET", KEYS[1], "state")
|
||||||
|
if state == "active" then
|
||||||
|
return redis.error_reply("task is already running")
|
||||||
|
elseif state == "pending" then
|
||||||
|
return redis.error_reply("task is already pending to be run")
|
||||||
|
end
|
||||||
|
local n = redis.call("ZREM", ARGV[2] .. state, ARGV[1])
|
||||||
|
if n == 0 then
|
||||||
|
return redis.error_reply("internal error: task id not found in zset " .. tostring(state))
|
||||||
|
end
|
||||||
redis.call("LPUSH", KEYS[2], ARGV[1])
|
redis.call("LPUSH", KEYS[2], ARGV[1])
|
||||||
|
redis.call("HSET", KEYS[1], "state", "pending")
|
||||||
return 1
|
return 1
|
||||||
`)
|
`)
|
||||||
|
|
||||||
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
|
func (r *RDB) runTask(qname, id string) (int64, error) {
|
||||||
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result()
|
keys := []string{
|
||||||
|
base.TaskKey(qname, id),
|
||||||
|
base.PendingKey(qname),
|
||||||
|
}
|
||||||
|
argv := []interface{}{
|
||||||
|
id,
|
||||||
|
base.QueueKeyPrefix(qname),
|
||||||
|
}
|
||||||
|
res, err := runTaskCmd.Run(r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
n, ok := res.(int64)
|
n, ok := res.(int64)
|
||||||
if !ok {
|
if !ok {
|
||||||
return 0, fmt.Errorf("could not cast %v to int64", res)
|
return 0, fmt.Errorf("internal error: could not cast %v to int64", res)
|
||||||
}
|
}
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
@ -1075,9 +1075,9 @@ func TestRunArchivedTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
h.SeedAllArchivedQueues(t, r.client, tc.archived)
|
||||||
|
|
||||||
got := r.RunArchivedTask(tc.qname, tc.id)
|
got := r.RunTask(tc.qname, tc.id)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1176,9 +1176,9 @@ func TestRunRetryTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
|
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
|
||||||
|
|
||||||
got := r.RunRetryTask(tc.qname, tc.id)
|
got := r.RunTask(tc.qname, tc.id)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1277,9 +1277,9 @@ func TestRunScheduledTask(t *testing.T) {
|
|||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
|
||||||
|
|
||||||
got := r.RunScheduledTask(tc.qname, tc.id)
|
got := r.RunTask(tc.qname, tc.id)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
t.Errorf("r.RunTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user