From 29e542e591b37caee76fc14126cbd8bc57962a38 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 5 Sep 2020 13:35:52 -0700 Subject: [PATCH] Rename Enqueue methods in Inspector to Run --- inspector.go | 41 +++++++++++++++--------------- inspector_test.go | 42 +++++++++++++++---------------- internal/rdb/inspect.go | 48 ++++++++++++++++++------------------ internal/rdb/inspect_test.go | 48 ++++++++++++++++++------------------ tools/asynq/cmd/task.go | 8 +++--- 5 files changed, 93 insertions(+), 94 deletions(-) diff --git a/inspector.go b/inspector.go index 9ada89d..4c31ac8 100644 --- a/inspector.go +++ b/inspector.go @@ -171,17 +171,17 @@ type DeadTask struct { score int64 } -// Key returns a key used to delete, enqueue, and kill the task. +// Key returns a key used to delete, run, and kill the task. func (t *ScheduledTask) Key() string { return fmt.Sprintf("s:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, enqueue, and kill the task. +// Key returns a key used to delete, run, and kill the task. func (t *RetryTask) Key() string { return fmt.Sprintf("r:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, enqueue, and kill the task. +// Key returns a key used to delete, run, and kill the task. func (t *DeadTask) Key() string { return fmt.Sprintf("d:%v:%v", t.ID, t.score) } @@ -463,39 +463,38 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { } } -// TODO(hibiken): Use different verb here. Idea: Run or Stage -// EnqueueAllScheduledTasks enqueues all scheduled tasks for immediate processing within the given queue, -// and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllScheduledTasks(qname string) (int, error) { +// RunAllScheduledTasks transition all scheduled tasks to pending state within the given queue, +// and reports the number of tasks transitioned. +func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.EnqueueAllScheduledTasks(qname) + n, err := i.rdb.RunAllScheduledTasks(qname) return int(n), err } -// EnqueueAllRetryTasks enqueues all retry tasks for immediate processing within the given queue, -// and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllRetryTasks(qname string) (int, error) { +// RunAllRetryTasks transition all retry tasks to pending state within the given queue, +// and reports the number of tasks transitioned. +func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.EnqueueAllRetryTasks(qname) + n, err := i.rdb.RunAllRetryTasks(qname) return int(n), err } -// EnqueueAllDeadTasks enqueues all dead tasks for immediate processing within the given queue, -// and reports the number of tasks enqueued. -func (i *Inspector) EnqueueAllDeadTasks(qname string) (int, error) { +// RunAllDeadTasks transition all dead tasks to pending state within the given queue, +// and reports the number of tasks transitioned. +func (i *Inspector) RunAllDeadTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.EnqueueAllDeadTasks(qname) + n, err := i.rdb.RunAllDeadTasks(qname) return int(n), err } -// EnqueueTaskByKey enqueues a task with the given key in the given queue. -func (i *Inspector) EnqueueTaskByKey(qname, key string) error { +// RunTaskByKey transition a task to pending state given task key and queue name. +func (i *Inspector) RunTaskByKey(qname, key string) error { if err := validateQueueName(qname); err != nil { return err } @@ -505,11 +504,11 @@ func (i *Inspector) EnqueueTaskByKey(qname, key string) error { } switch state { case "s": - return i.rdb.EnqueueScheduledTask(qname, id, score) + return i.rdb.RunScheduledTask(qname, id, score) case "r": - return i.rdb.EnqueueRetryTask(qname, id, score) + return i.rdb.RunRetryTask(qname, id, score) case "d": - return i.rdb.EnqueueDeadTask(qname, id, score) + return i.rdb.RunDeadTask(qname, id, score) default: return fmt.Errorf("invalid key") } diff --git a/inspector_test.go b/inspector_test.go index fb9b99c..1d4ad12 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -1068,7 +1068,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { } } -func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { +func TestInspectorRunAllScheduledTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") @@ -1161,13 +1161,13 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - got, err := inspector.EnqueueAllScheduledTasks(tc.qname) + got, err := inspector.RunAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllScheduledTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("RunAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("RunAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantScheduled { gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) @@ -1184,7 +1184,7 @@ func TestInspectorEnqueueAllScheduledTasks(t *testing.T) { } } -func TestInspectorEnqueueAllRetryTasks(t *testing.T) { +func TestInspectorRunAllRetryTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") @@ -1277,13 +1277,13 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { asynqtest.SeedAllRetryQueues(t, r, tc.retry) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - got, err := inspector.EnqueueAllRetryTasks(tc.qname) + got, err := inspector.RunAllRetryTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllRetryTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("RunAllRetryTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("RunAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantRetry { gotRetry := asynqtest.GetRetryEntries(t, r, qname) @@ -1300,7 +1300,7 @@ func TestInspectorEnqueueAllRetryTasks(t *testing.T) { } } -func TestInspectorEnqueueAllDeadTasks(t *testing.T) { +func TestInspectorRunAllDeadTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") @@ -1389,13 +1389,13 @@ func TestInspectorEnqueueAllDeadTasks(t *testing.T) { asynqtest.SeedAllDeadQueues(t, r, tc.dead) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - got, err := inspector.EnqueueAllDeadTasks(tc.qname) + got, err := inspector.RunAllDeadTasks(tc.qname) if err != nil { - t.Errorf("EnqueueAllDeadTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("RunAllDeadTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("EnqueueAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("RunAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantDead { gotDead := asynqtest.GetDeadEntries(t, r, qname) @@ -1560,7 +1560,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { } } -func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { +func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) @@ -1607,8 +1607,8 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantScheduled { @@ -1629,7 +1629,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesScheduledTask(t *testing.T) { } } -func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { +func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") @@ -1676,8 +1676,8 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { asynqtest.SeedAllRetryQueues(t, r, tc.retry) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantRetry { @@ -1697,7 +1697,7 @@ func TestInspectorEnqueueTaskByKeyEnqueuesRetryTask(t *testing.T) { } } -func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { +func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") @@ -1748,8 +1748,8 @@ func TestInspectorEnqueueTaskByKeyEnqueuesDeadTask(t *testing.T) { asynqtest.SeedAllDeadQueues(t, r, tc.dead) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - if err := inspector.EnqueueTaskByKey(tc.qname, tc.key); err != nil { - t.Errorf("EnqueueTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) + if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { + t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantDead { diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index 6383c01..a17535a 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -345,11 +345,11 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) { return res, nil } -// EnqueueDeadTask finds a dead task that matches the given id and score from +// RunDeadTask finds a dead task that matches the given id and score from // the given queue and enqueues it for processing. //If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) EnqueueDeadTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndEnqueue(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score)) +func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndRun(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -359,11 +359,11 @@ func (r *RDB) EnqueueDeadTask(qname string, id uuid.UUID, score int64) error { return nil } -// EnqueueRetryTask finds a retry task that matches the given id and score from +// RunRetryTask finds a retry task that matches the given id and score from // the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) EnqueueRetryTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndEnqueue(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score)) +func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndRun(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -373,11 +373,11 @@ func (r *RDB) EnqueueRetryTask(qname string, id uuid.UUID, score int64) error { return nil } -// EnqueueScheduledTask finds a scheduled task that matches the given id and score from +// RunScheduledTask finds a scheduled task that matches the given id and score from // from the given queue and enqueues it for processing. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) EnqueueScheduledTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndEnqueue(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score)) +func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndRun(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -387,25 +387,25 @@ func (r *RDB) EnqueueScheduledTask(qname string, id uuid.UUID, score int64) erro return nil } -// EnqueueAllScheduledTasks enqueues all scheduled tasks from the given queue +// RunAllScheduledTasks enqueues all scheduled tasks from the given queue // and returns the number of tasks enqueued. -func (r *RDB) EnqueueAllScheduledTasks(qname string) (int64, error) { - return r.removeAndEnqueueAll(base.ScheduledKey(qname), base.QueueKey(qname)) +func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) { + return r.removeAndRunAll(base.ScheduledKey(qname), base.QueueKey(qname)) } -// EnqueueAllRetryTasks enqueues all retry tasks from the given queue +// RunAllRetryTasks enqueues all retry tasks from the given queue // and returns the number of tasks enqueued. -func (r *RDB) EnqueueAllRetryTasks(qname string) (int64, error) { - return r.removeAndEnqueueAll(base.RetryKey(qname), base.QueueKey(qname)) +func (r *RDB) RunAllRetryTasks(qname string) (int64, error) { + return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname)) } -// EnqueueAllDeadTasks enqueues all tasks from dead queue +// RunAllDeadTasks enqueues all tasks from dead queue // and returns the number of tasks enqueued. -func (r *RDB) EnqueueAllDeadTasks(qname string) (int64, error) { - return r.removeAndEnqueueAll(base.DeadKey(qname), base.QueueKey(qname)) +func (r *RDB) RunAllDeadTasks(qname string) (int64, error) { + return r.removeAndRunAll(base.DeadKey(qname), base.QueueKey(qname)) } -var removeAndEnqueueCmd = redis.NewScript(` +var removeAndRunCmd = redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) for _, msg in ipairs(msgs) do local decoded = cjson.decode(msg) @@ -417,8 +417,8 @@ for _, msg in ipairs(msgs) do end return 0`) -func (r *RDB) removeAndEnqueue(zset, qkey, id string, score float64) (int64, error) { - res, err := removeAndEnqueueCmd.Run(r.client, []string{zset, qkey}, score, id).Result() +func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error) { + res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, score, id).Result() if err != nil { return 0, err } @@ -429,7 +429,7 @@ func (r *RDB) removeAndEnqueue(zset, qkey, id string, score float64) (int64, err return n, nil } -var removeAndEnqueueAllCmd = redis.NewScript(` +var removeAndRunAllCmd = redis.NewScript(` local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) for _, msg in ipairs(msgs) do redis.call("LPUSH", KEYS[2], msg) @@ -437,8 +437,8 @@ for _, msg in ipairs(msgs) do end return table.getn(msgs)`) -func (r *RDB) removeAndEnqueueAll(zset, qkey string) (int64, error) { - res, err := removeAndEnqueueAllCmd.Run(r.client, []string{zset, qkey}).Result() +func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { + res, err := removeAndRunAllCmd.Run(r.client, []string{zset, qkey}).Result() if err != nil { return 0, err } diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 7f44794..3bd5a1c 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -981,7 +981,7 @@ var ( zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score ) -func TestEnqueueDeadTask(t *testing.T) { +func TestRunDeadTask(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -994,7 +994,7 @@ func TestEnqueueDeadTask(t *testing.T) { qname string score int64 id uuid.UUID - want error // expected return value from calling EnqueueDeadTask + want error // expected return value from calling RunDeadTask wantDead map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1063,9 +1063,9 @@ func TestEnqueueDeadTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllDeadQueues(t, r.client, tc.dead) - got := r.EnqueueDeadTask(tc.qname, tc.id, tc.score) + got := r.RunDeadTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.EnqueueDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } @@ -1085,7 +1085,7 @@ func TestEnqueueDeadTask(t *testing.T) { } } -func TestEnqueueRetryTask(t *testing.T) { +func TestRunRetryTask(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) @@ -1098,7 +1098,7 @@ func TestEnqueueRetryTask(t *testing.T) { qname string score int64 id uuid.UUID - want error // expected return value from calling EnqueueRetryTask + want error // expected return value from calling RunRetryTask wantRetry map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1167,9 +1167,9 @@ func TestEnqueueRetryTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue - got := r.EnqueueRetryTask(tc.qname, tc.id, tc.score) + got := r.RunRetryTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.EnqueueRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } @@ -1189,7 +1189,7 @@ func TestEnqueueRetryTask(t *testing.T) { } } -func TestEnqueueScheduledTask(t *testing.T) { +func TestRunScheduledTask(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1202,7 +1202,7 @@ func TestEnqueueScheduledTask(t *testing.T) { qname string score int64 id uuid.UUID - want error // expected return value from calling EnqueueScheduledTask + want error // expected return value from calling RunScheduledTask wantScheduled map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1271,9 +1271,9 @@ func TestEnqueueScheduledTask(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got := r.EnqueueScheduledTask(tc.qname, tc.id, tc.score) + got := r.RunScheduledTask(tc.qname, tc.id, tc.score) if got != tc.want { - t.Errorf("r.EnqueueRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) + t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } @@ -1293,7 +1293,7 @@ func TestEnqueueScheduledTask(t *testing.T) { } } -func TestEnqueueAllScheduledTasks(t *testing.T) { +func TestRunAllScheduledTasks(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1371,15 +1371,15 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - got, err := r.EnqueueAllScheduledTasks(tc.qname) + got, err := r.RunAllScheduledTasks(tc.qname) if err != nil { - t.Errorf("%s; r.EnqueueAllScheduledTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllScheduledTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) continue } if got != tc.want { - t.Errorf("%s; r.EnqueueAllScheduledTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllScheduledTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) } @@ -1398,7 +1398,7 @@ func TestEnqueueAllScheduledTasks(t *testing.T) { } } -func TestEnqueueAllRetryTasks(t *testing.T) { +func TestRunAllRetryTasks(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1476,15 +1476,15 @@ func TestEnqueueAllRetryTasks(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllRetryQueues(t, r.client, tc.retry) - got, err := r.EnqueueAllRetryTasks(tc.qname) + got, err := r.RunAllRetryTasks(tc.qname) if err != nil { - t.Errorf("%s; r.EnqueueAllRetryTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllRetryTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) continue } if got != tc.want { - t.Errorf("%s; r.EnqueueAllRetryTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllRetryTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) } @@ -1503,7 +1503,7 @@ func TestEnqueueAllRetryTasks(t *testing.T) { } } -func TestEnqueueAllDeadTasks(t *testing.T) { +func TestRunAllDeadTasks(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("gen_thumbnail", nil) @@ -1581,15 +1581,15 @@ func TestEnqueueAllDeadTasks(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllDeadQueues(t, r.client, tc.dead) - got, err := r.EnqueueAllDeadTasks(tc.qname) + got, err := r.RunAllDeadTasks(tc.qname) if err != nil { - t.Errorf("%s; r.EnqueueAllDeadTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) continue } if got != tc.want { - t.Errorf("%s; r.EnqueueAllDeadTasks(%q) = %v, %v; want %v, nil", + t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index dc90e3b..c727fd1 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -360,7 +360,7 @@ func taskRun(cmd *cobra.Command, args []string) { } i := createInspector() - err = i.EnqueueTaskByKey(qname, key) + err = i.RunTaskByKey(qname, key) if err != nil { fmt.Printf("error: %v\n", err) os.Exit(1) @@ -446,11 +446,11 @@ func taskRunAll(cmd *cobra.Command, args []string) { var n int switch state { case "scheduled": - n, err = i.EnqueueAllScheduledTasks(qname) + n, err = i.RunAllScheduledTasks(qname) case "retry": - n, err = i.EnqueueAllRetryTasks(qname) + n, err = i.RunAllRetryTasks(qname) case "dead": - n, err = i.EnqueueAllDeadTasks(qname) + n, err = i.RunAllDeadTasks(qname) default: fmt.Printf("error: unsupported state %q\n", state) os.Exit(1)