diff --git a/inspector.go b/inspector.go index e4f173e..9ada89d 100644 --- a/inspector.go +++ b/inspector.go @@ -37,12 +37,12 @@ type QueueStats struct { // Name of the queue. Queue string // Size is the total number of tasks in the queue. - // The value is the sum of Pending, InProgress, Scheduled, Retry, and Dead. + // The value is the sum of Pending, Active, Scheduled, Retry, and Dead. Size int // Number of pending tasks. Pending int - // Number of in-progress tasks. - InProgress int + // Number of active tasks. + Active int // Number of scheduled tasks. Scheduled int // Number of retry tasks. @@ -71,17 +71,17 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { return nil, err } return &QueueStats{ - Queue: stats.Queue, - Size: stats.Size, - Pending: stats.Pending, - InProgress: stats.InProgress, - Scheduled: stats.Scheduled, - Retry: stats.Retry, - Dead: stats.Dead, - Processed: stats.Processed, - Failed: stats.Failed, - Paused: stats.Paused, - Timestamp: stats.Timestamp, + Queue: stats.Queue, + Size: stats.Size, + Pending: stats.Pending, + Active: stats.Active, + Scheduled: stats.Scheduled, + Retry: stats.Retry, + Dead: stats.Dead, + Processed: stats.Processed, + Failed: stats.Failed, + Paused: stats.Paused, + Timestamp: stats.Timestamp, }, nil } @@ -126,8 +126,8 @@ type PendingTask struct { Queue string } -// InProgressTask is a task that's currently being processed. -type InProgressTask struct { +// ActiveTask is a task that's currently being processed. +type ActiveTask struct { *Task ID string Queue string @@ -293,22 +293,22 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi return tasks, err } -// ListInProgressTasks retrieves in-progress tasks from the specified queue. +// ListActiveTasks retrieves active tasks from the specified queue. // // By default, it retrieves the first 30 tasks. -func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) { +func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { if err := validateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - msgs, err := i.rdb.ListInProgress(qname, pgn) + msgs, err := i.rdb.ListActive(qname, pgn) if err != nil { return nil, err } - var tasks []*InProgressTask + var tasks []*ActiveTask for _, m := range msgs { - tasks = append(tasks, &InProgressTask{ + tasks = append(tasks, &ActiveTask{ Task: NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, diff --git a/inspector_test.go b/inspector_test.go index 53ae940..fb9b99c 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -114,17 +114,17 @@ func TestInspectorCurrentStats(t *testing.T) { }, qname: "default", want: &QueueStats{ - Queue: "default", - Size: 4, - Pending: 1, - InProgress: 1, - Scheduled: 2, - Retry: 0, - Dead: 0, - Processed: 120, - Failed: 2, - Paused: false, - Timestamp: now, + Queue: "default", + Size: 4, + Pending: 1, + Active: 1, + Scheduled: 2, + Retry: 0, + Dead: 0, + Processed: 120, + Failed: 2, + Paused: false, + Timestamp: now, }, }, } @@ -132,7 +132,7 @@ func TestInspectorCurrentStats(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress) + asynqtest.SeedAllActiveQueues(t, r, tc.inProgress) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllRetryQueues(t, r, tc.retry) asynqtest.SeedAllDeadQueues(t, r, tc.dead) @@ -291,7 +291,7 @@ func TestInspectorListPendingTasks(t *testing.T) { } } -func TestInspectorListInProgressTasks(t *testing.T) { +func TestInspectorListActiveTasks(t *testing.T) { r := setup(t) m1 := asynqtest.NewTaskMessage("task1", nil) m2 := asynqtest.NewTaskMessage("task2", nil) @@ -300,8 +300,8 @@ func TestInspectorListInProgressTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) - createInProgressTask := func(msg *base.TaskMessage) *InProgressTask { - return &InProgressTask{ + createActiveTask := func(msg *base.TaskMessage) *ActiveTask { + return &ActiveTask{ Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, @@ -312,34 +312,34 @@ func TestInspectorListInProgressTasks(t *testing.T) { desc string inProgress map[string][]*base.TaskMessage qname string - want []*InProgressTask + want []*ActiveTask }{ { - desc: "with a few in-progress tasks", + desc: "with a few active tasks", inProgress: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3, m4}, }, qname: "default", - want: []*InProgressTask{ - createInProgressTask(m1), - createInProgressTask(m2), + want: []*ActiveTask{ + createActiveTask(m1), + createActiveTask(m2), }, }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress) + asynqtest.SeedAllActiveQueues(t, r, tc.inProgress) - got, err := inspector.ListInProgressTasks(tc.qname) + got, err := inspector.ListActiveTasks(tc.qname) if err != nil { - t.Errorf("%s; ListInProgressTasks(%q) returned error: %v", tc.qname, tc.desc, err) + t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListInProgressTask(%q) = %v, want %v; (-want,+got)\n%s", + t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index f33210c..8e02358 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -181,11 +181,11 @@ func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskM seedRedisList(tb, r, base.QueueKey(qname), msgs) } -// SeedInProgressQueue initializes the in-progress queue with the given messages. -func SeedInProgressQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { +// SeedActiveQueue initializes the active queue with the given messages. +func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisList(tb, r, base.InProgressKey(qname), msgs) + seedRedisList(tb, r, base.ActiveKey(qname), msgs) } // SeedScheduledQueue initializes the scheduled queue with the given messages. @@ -225,10 +225,10 @@ func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[st } } -// SeedAllInProgressQueues initializes all of the specified in-progress queues with the given messages. -func SeedAllInProgressQueues(tb testing.TB, r redis.UniversalClient, inprogress map[string][]*base.TaskMessage) { - for q, msgs := range inprogress { - SeedInProgressQueue(tb, r, msgs, q) +// SeedAllActiveQueues initializes all of the specified active queues with the given messages. +func SeedAllActiveQueues(tb testing.TB, r redis.UniversalClient, active map[string][]*base.TaskMessage) { + for q, msgs := range active { + SeedActiveQueue(tb, r, msgs, q) } } @@ -284,10 +284,10 @@ func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) [] return getListMessages(tb, r, base.QueueKey(qname)) } -// GetInProgressMessages returns all in-progress messages in the given queue. -func GetInProgressMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { +// GetActiveMessages returns all active messages in the given queue. +func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getListMessages(tb, r, base.InProgressKey(qname)) + return getListMessages(tb, r, base.ActiveKey(qname)) } // GetScheduledMessages returns all scheduled task messages in the given queue. diff --git a/internal/base/base.go b/internal/base/base.go index 271f49e..35ec0f3 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -40,10 +40,9 @@ func QueueKey(qname string) string { return fmt.Sprintf("asynq:{%s}", qname) } -// TODO: Should we rename this to "active"? -// InProgressKey returns a redis key for the in-progress tasks. -func InProgressKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:in_progress", qname) +// ActiveKey returns a redis key for the active tasks. +func ActiveKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:active", qname) } // ScheduledKey returns a redis key for the scheduled tasks. @@ -274,7 +273,7 @@ type WorkerInfo struct { Started time.Time } -// Cancelations is a collection that holds cancel functions for all in-progress tasks. +// Cancelations is a collection that holds cancel functions for all active tasks. // // Cancelations are safe for concurrent use by multipel goroutines. type Cancelations struct { diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 728d2e3..c12fc80 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -32,19 +32,19 @@ func TestQueueKey(t *testing.T) { } } -func TestInProgressKey(t *testing.T) { +func TestActiveKey(t *testing.T) { tests := []struct { qname string want string }{ - {"default", "asynq:{default}:in_progress"}, - {"custom", "asynq:{custom}:in_progress"}, + {"default", "asynq:{default}:active"}, + {"custom", "asynq:{custom}:active"}, } for _, tc := range tests { - got := InProgressKey(tc.qname) + got := ActiveKey(tc.qname) if got != tc.want { - t.Errorf("InProgressKey(%q) = %q, want %q", tc.qname, got, tc.want) + t.Errorf("ActiveKey(%q) = %q, want %q", tc.qname, got, tc.want) } } } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index c3e7f86..6383c01 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -31,11 +31,11 @@ type Stats struct { // Size is the total number of tasks in the queue. Size int // Number of tasks in each state. - Pending int - InProgress int - Scheduled int - Retry int - Dead int + Pending int + Active int + Scheduled int + Retry int + Dead int // Total number of tasks processed during the current date. // The number includes both succeeded and failed tasks. Processed int @@ -59,7 +59,7 @@ type DailyStats struct { } // KEYS[1] -> asynq: -// KEYS[2] -> asynq::in_progress +// KEYS[2] -> asynq::active // KEYS[3] -> asynq::scheduled // KEYS[4] -> asynq::retry // KEYS[5] -> asynq::dead @@ -108,7 +108,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { now := time.Now() res, err := currentStatsCmd.Run(r.client, []string{ base.QueueKey(qname), - base.InProgressKey(qname), + base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), base.DeadKey(qname), @@ -135,8 +135,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { case base.QueueKey(qname): stats.Pending = val size += val - case base.InProgressKey(qname): - stats.InProgress = val + case base.ActiveKey(qname): + stats.Active = val size += val case base.ScheduledKey(qname): stats.Scheduled = val @@ -266,12 +266,12 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er return r.listMessages(base.QueueKey(qname), pgn) } -// ListInProgress returns all tasks that are currently being processed for the given queue. -func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) { +// ListActive returns all tasks that are currently being processed for the given queue. +func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listMessages(base.InProgressKey(qname), pgn) + return r.listMessages(base.ActiveKey(qname), pgn) } // listMessages returns a list of TaskMessage in Redis list with the given key. @@ -652,17 +652,17 @@ func (e *ErrQueueNotEmpty) Error() string { return fmt.Sprintf("queue %q is not empty", e.qname) } -// Only check whether in-progress queue is empty before removing. +// Only check whether active queue is empty before removing. // KEYS[1] -> asynq:{} -// KEYS[2] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:dead // KEYS[6] -> asynq:{}:deadlines var removeQueueForceCmd = redis.NewScript(` -local inprogress = redis.call("LLEN", KEYS[2]) -if inprogress > 0 then - return redis.error_reply("Queue has tasks in-progress") +local active = redis.call("LLEN", KEYS[2]) +if active > 0 then + return redis.error_reply("Queue has tasks active") end redis.call("DEL", KEYS[1]) redis.call("DEL", KEYS[2]) @@ -674,18 +674,18 @@ return redis.status_reply("OK")`) // Checks whether queue is empty before removing. // KEYS[1] -> asynq:{} -// KEYS[2] -> asynq:{}:in_progress +// KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry // KEYS[5] -> asynq:{}:dead // KEYS[6] -> asynq:{}:deadlines var removeQueueCmd = redis.NewScript(` local pending = redis.call("LLEN", KEYS[1]) -local inprogress = redis.call("LLEN", KEYS[2]) +local active = redis.call("LLEN", KEYS[2]) local scheduled = redis.call("SCARD", KEYS[3]) local retry = redis.call("SCARD", KEYS[4]) local dead = redis.call("SCARD", KEYS[5]) -local total = pending + inprogress + scheduled + retry + dead +local total = pending + active + scheduled + retry + dead if total > 0 then return redis.error_reply("QUEUE NOT EMPTY") end @@ -700,7 +700,7 @@ return redis.status_reply("OK")`) // RemoveQueue removes the specified queue. // // If force is set to true, it will remove the queue regardless -// as long as no tasks are in-progress for the queue. +// as long as no tasks are active for the queue. // If force is set to false, it will only remove the queue if // the queue is empty. func (r *RDB) RemoveQueue(qname string, force bool) error { @@ -719,7 +719,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { } keys := []string{ base.QueueKey(qname), - base.InProgressKey(qname), + base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), base.DeadKey(qname), diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 7620e28..7f44794 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -110,17 +110,17 @@ func TestCurrentStats(t *testing.T) { paused: []string{}, qname: "default", want: &Stats{ - Queue: "default", - Paused: false, - Size: 4, - Pending: 1, - InProgress: 1, - Scheduled: 2, - Retry: 0, - Dead: 0, - Processed: 120, - Failed: 2, - Timestamp: now, + Queue: "default", + Paused: false, + Size: 4, + Pending: 1, + Active: 1, + Scheduled: 2, + Retry: 0, + Dead: 0, + Processed: 120, + Failed: 2, + Timestamp: now, }, }, { @@ -165,17 +165,17 @@ func TestCurrentStats(t *testing.T) { paused: []string{"critical", "low"}, qname: "critical", want: &Stats{ - Queue: "critical", - Paused: true, - Size: 1, - Pending: 1, - InProgress: 0, - Scheduled: 0, - Retry: 0, - Dead: 0, - Processed: 100, - Failed: 0, - Timestamp: now, + Queue: "critical", + Paused: true, + Size: 1, + Pending: 1, + Active: 0, + Scheduled: 0, + Retry: 0, + Dead: 0, + Processed: 100, + Failed: 0, + Timestamp: now, }, }, } @@ -188,7 +188,7 @@ func TestCurrentStats(t *testing.T) { } } h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllDeadQueues(t, r.client, tc.dead) @@ -433,7 +433,7 @@ func TestListPendingPagination(t *testing.T) { } } -func TestListInProgress(t *testing.T) { +func TestListActive(t *testing.T) { r := setup(t) m1 := h.NewTaskMessage("task1", nil) @@ -466,10 +466,10 @@ func TestListInProgress(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) - got, err := r.ListInProgress(tc.qname, Pagination{Size: 20, Page: 0}) - op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: 20, Page: 0})", tc.qname) + got, err := r.ListActive(tc.qname, Pagination{Size: 20, Page: 0}) + op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress) continue @@ -481,14 +481,14 @@ func TestListInProgress(t *testing.T) { } } -func TestListInProgressPagination(t *testing.T) { +func TestListActivePagination(t *testing.T) { r := setup(t) var msgs []*base.TaskMessage for i := 0; i < 100; i++ { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msgs = append(msgs, msg) } - h.SeedInProgressQueue(t, r.client, msgs, "default") + h.SeedActiveQueue(t, r.client, msgs, "default") tests := []struct { desc string @@ -507,8 +507,8 @@ func TestListInProgressPagination(t *testing.T) { } for _, tc := range tests { - got, err := r.ListInProgress(tc.qname, Pagination{Size: tc.size, Page: tc.page}) - op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) + got, err := r.ListActive(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page) if err != nil { t.Errorf("%s; %s returned error %v", tc.desc, op, err) continue @@ -2669,7 +2669,7 @@ func TestRemoveQueue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllDeadQueues(t, r.client, tc.dead) @@ -2686,7 +2686,7 @@ func TestRemoveQueue(t *testing.T) { keys := []string{ base.QueueKey(tc.qname), - base.InProgressKey(tc.qname), + base.ActiveKey(tc.qname), base.DeadlinesKey(tc.qname), base.ScheduledKey(tc.qname), base.RetryKey(tc.qname), @@ -2768,7 +2768,7 @@ func TestRemoveQueueError(t *testing.T) { force: false, }, { - desc: "force removing queue with tasks in-progress", + desc: "force removing queue with active tasks", pending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {m3}, @@ -2790,7 +2790,7 @@ func TestRemoveQueueError(t *testing.T) { "custom": {}, }, qname: "custom", - // Even with force=true, it should error if there are tasks in-progress. + // Even with force=true, it should error if there are active tasks. force: true, }, } @@ -2798,7 +2798,7 @@ func TestRemoveQueueError(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) h.SeedAllDeadQueues(t, r.client, tc.dead) @@ -2817,9 +2817,9 @@ func TestRemoveQueueError(t *testing.T) { } } for qname, want := range tc.inProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.InProgressKey(qname), diff) + gotActive := h.GetActiveMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ActiveKey(qname), diff) } } for qname, want := range tc.scheduled { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index d7a03a5..baf3280 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -120,7 +120,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti // KEYS[1] -> asynq:{} // KEYS[2] -> asynq:{}:paused -// KEYS[3] -> asynq:{}:in_progress +// KEYS[3] -> asynq:{}:active // KEYS[4] -> asynq:{}:deadlines // ARGV[1] -> current time in Unix time // @@ -156,7 +156,7 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err keys := []string{ base.QueueKey(qname), base.PausedKey(qname), - base.InProgressKey(qname), + base.ActiveKey(qname), base.DeadlinesKey(qname), } res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result() @@ -183,7 +183,7 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err return "", 0, ErrNoProcessableTask } -// KEYS[1] -> asynq:{}:in_progress +// KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:processed: // ARGV[1] -> base.TaskMessage value @@ -202,7 +202,7 @@ end return redis.status_reply("OK") `) -// KEYS[1] -> asynq:{}:in_progress +// KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:processed: // KEYS[4] -> unique key @@ -226,7 +226,7 @@ end return redis.status_reply("OK") `) -// Done removes the task from in-progress queue to mark the task as done. +// Done removes the task from active queue to mark the task as done. // It removes a uniqueness lock acquired by the task, if any. func (r *RDB) Done(msg *base.TaskMessage) error { encoded, err := base.EncodeMessage(msg) @@ -236,7 +236,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { now := time.Now() expireAt := now.Add(statsTTL) keys := []string{ - base.InProgressKey(msg.Queue), + base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ProcessedKey(msg.Queue, now), } @@ -249,7 +249,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error { return doneCmd.Run(r.client, keys, args...).Err() } -// KEYS[1] -> asynq:{}:in_progress +// KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{} // ARGV[1] -> base.TaskMessage value @@ -264,14 +264,14 @@ end redis.call("RPUSH", KEYS[3], ARGV[1]) return redis.status_reply("OK")`) -// Requeue moves the task from in-progress queue to the specified queue. +// Requeue moves the task from active queue to the specified queue. func (r *RDB) Requeue(msg *base.TaskMessage) error { encoded, err := base.EncodeMessage(msg) if err != nil { return err } return requeueCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)}, + []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)}, encoded).Err() } @@ -330,12 +330,12 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim return nil } -// KEYS[1] -> asynq:{}:in_progress +// KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:retry // KEYS[4] -> asynq:{}:processed: // KEYS[5] -> asynq:{}:failed: -// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue +// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue // ARGV[2] -> base.TaskMessage value to add to Retry queue // ARGV[3] -> retry_at UNIX timestamp // ARGV[4] -> stats expiration timestamp @@ -357,7 +357,7 @@ if tonumber(m) == 1 then end return redis.status_reply("OK")`) -// Retry moves the task from in-progress to retry queue, incrementing retry count +// Retry moves the task from active to retry queue, incrementing retry count // and assigning error message to the task message. func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { msgToRemove, err := base.EncodeMessage(msg) @@ -376,7 +376,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e failedKey := base.FailedKey(msg.Queue, now) expireAt := now.Add(statsTTL) return retryCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey}, + []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey}, msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err() } @@ -385,12 +385,12 @@ const ( deadExpirationInDays = 90 ) -// KEYS[1] -> asynq:{}:in_progress +// KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines // KEYS[3] -> asynq:{}:dead // KEYS[4] -> asynq:{}:processed: // KEYS[5] -> asynq:{}:failed: -// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue +// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue // ARGV[2] -> base.TaskMessage value to add to Dead queue // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) @@ -416,7 +416,7 @@ if tonumber(m) == 1 then end return redis.status_reply("OK")`) -// Kill sends the task to "dead" queue from in-progress queue, assigning +// Kill sends the task to "dead" queue from active queue, assigning // the error message to the task. // It also trims the set by timestamp and set size. func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { @@ -436,7 +436,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { failedKey := base.FailedKey(msg.Queue, now) expireAt := now.Add(statsTTL) return killCmd.Run(r.client, - []string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey}, + []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey}, msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index d3c2af6..5202f79 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -167,14 +167,14 @@ func TestDequeue(t *testing.T) { t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest tests := []struct { - pending map[string][]*base.TaskMessage - args []string // list of queues to query - wantMsg *base.TaskMessage - wantDeadline time.Time - err error - wantPending map[string][]*base.TaskMessage - wantInProgress map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z + pending map[string][]*base.TaskMessage + args []string // list of queues to query + wantMsg *base.TaskMessage + wantDeadline time.Time + err error + wantPending map[string][]*base.TaskMessage + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z }{ { pending: map[string][]*base.TaskMessage{ @@ -187,7 +187,7 @@ func TestDequeue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t1}, }, wantDeadlines: map[string][]base.Z{ @@ -205,7 +205,7 @@ func TestDequeue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, }, wantDeadlines: map[string][]base.Z{ @@ -227,7 +227,7 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {t3}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {t2}, "low": {}, @@ -253,7 +253,7 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {t2, t1}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t3}, "critical": {}, "low": {}, @@ -279,7 +279,7 @@ func TestDequeue(t *testing.T) { "critical": {}, "low": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, @@ -319,10 +319,10 @@ func TestDequeue(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } - for queue, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff) + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff) } } for queue, want := range tc.wantDeadlines { @@ -354,13 +354,13 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { } tests := []struct { - paused []string // list of paused queues - pending map[string][]*base.TaskMessage - args []string // list of queues to query - wantMsg *base.TaskMessage - err error - wantPending map[string][]*base.TaskMessage - wantInProgress map[string][]*base.TaskMessage + paused []string // list of paused queues + pending map[string][]*base.TaskMessage + args []string // list of queues to query + wantMsg *base.TaskMessage + err error + wantPending map[string][]*base.TaskMessage + wantActive map[string][]*base.TaskMessage }{ { paused: []string{"default"}, @@ -375,7 +375,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {t2}, }, @@ -391,7 +391,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -408,7 +408,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { "default": {t1}, "critical": {t2}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -437,10 +437,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } - for queue, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff) + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff) } } } @@ -479,12 +479,12 @@ func TestDone(t *testing.T) { t3Deadline := now.Unix() + t3.Deadline tests := []struct { - desc string - inProgress map[string][]*base.TaskMessage // initial state of the in-progress list - deadlines map[string][]base.Z // initial state of deadlines set - target *base.TaskMessage // task to remove - wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list - wantDeadlines map[string][]base.Z // final state of the deadline set + desc string + inProgress map[string][]*base.TaskMessage // initial state of the active list + deadlines map[string][]base.Z // initial state of deadlines set + target *base.TaskMessage // task to remove + wantActive map[string][]*base.TaskMessage // final state of the active list + wantDeadlines map[string][]base.Z // final state of the deadline set }{ { desc: "removes message from the correct queue", @@ -497,7 +497,7 @@ func TestDone(t *testing.T) { "custom": {{Message: t2, Score: t2Deadline}}, }, target: t1, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "custom": {t2}, }, @@ -515,7 +515,7 @@ func TestDone(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}}, }, target: t1, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, }, wantDeadlines: map[string][]base.Z{ @@ -533,7 +533,7 @@ func TestDone(t *testing.T) { "custom": {{Message: t2, Score: t2Deadline}}, }, target: t3, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t1}, "custom": {t2}, }, @@ -547,7 +547,7 @@ func TestDone(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllDeadlines(t, r.client, tc.deadlines) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) for _, msgs := range tc.inProgress { for _, msg := range msgs { // Set uniqueness lock if unique key is present. @@ -566,10 +566,10 @@ func TestDone(t *testing.T) { continue } - for queue, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.InProgressKey(queue), diff) + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.ActiveKey(queue), diff) continue } } @@ -627,13 +627,13 @@ func TestRequeue(t *testing.T) { t3Deadline := now.Unix() + t3.Timeout tests := []struct { - pending map[string][]*base.TaskMessage // initial state of queues - inProgress map[string][]*base.TaskMessage // initial state of the in-progress list - deadlines map[string][]base.Z // initial state of the deadlines set - target *base.TaskMessage // task to requeue - wantPending map[string][]*base.TaskMessage // final state of queues - wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list - wantDeadlines map[string][]base.Z // final state of the deadlines set + pending map[string][]*base.TaskMessage // initial state of queues + inProgress map[string][]*base.TaskMessage // initial state of the active list + deadlines map[string][]base.Z // initial state of the deadlines set + target *base.TaskMessage // task to requeue + wantPending map[string][]*base.TaskMessage // final state of queues + wantActive map[string][]*base.TaskMessage // final state of the active list + wantDeadlines map[string][]base.Z // final state of the deadlines set }{ { pending: map[string][]*base.TaskMessage{ @@ -652,7 +652,7 @@ func TestRequeue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {t1}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, wantDeadlines: map[string][]base.Z{ @@ -677,7 +677,7 @@ func TestRequeue(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, }, wantDeadlines: map[string][]base.Z{ @@ -702,7 +702,7 @@ func TestRequeue(t *testing.T) { "default": {t1}, "critical": {t3}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, "critical": {}, }, @@ -716,7 +716,7 @@ func TestRequeue(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllDeadlines(t, r.client, tc.deadlines) err := r.Requeue(tc.target) @@ -731,10 +731,10 @@ func TestRequeue(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff) } } - for qname, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, qname) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressKey(qname), diff) + for qname, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, qname) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.ActiveKey(qname), diff) } } for qname, want := range tc.wantDeadlines { @@ -877,15 +877,15 @@ func TestRetry(t *testing.T) { errMsg := "SMTP server is not responding" tests := []struct { - inProgress map[string][]*base.TaskMessage - deadlines map[string][]base.Z - retry map[string][]base.Z - msg *base.TaskMessage - processAt time.Time - errMsg string - wantInProgress map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantRetry map[string][]base.Z + inProgress map[string][]*base.TaskMessage + deadlines map[string][]base.Z + retry map[string][]base.Z + msg *base.TaskMessage + processAt time.Time + errMsg string + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantRetry map[string][]base.Z }{ { inProgress: map[string][]*base.TaskMessage{ @@ -900,7 +900,7 @@ func TestRetry(t *testing.T) { msg: t1, processAt: now.Add(5 * time.Minute), errMsg: errMsg, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, wantDeadlines: map[string][]base.Z{ @@ -929,7 +929,7 @@ func TestRetry(t *testing.T) { msg: t4, processAt: now.Add(5 * time.Minute), errMsg: errMsg, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t1, t2}, "custom": {}, }, @@ -948,7 +948,7 @@ func TestRetry(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllRetryQueues(t, r.client, tc.retry) @@ -958,10 +958,10 @@ func TestRetry(t *testing.T) { continue } - for queue, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressKey(queue), diff) + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff) } } for queue, want := range tc.wantDeadlines { @@ -1046,13 +1046,13 @@ func TestKill(t *testing.T) { // TODO(hibiken): add test cases for trimming tests := []struct { - inProgress map[string][]*base.TaskMessage - deadlines map[string][]base.Z - dead map[string][]base.Z - target *base.TaskMessage // task to kill - wantInProgress map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantDead map[string][]base.Z + inProgress map[string][]*base.TaskMessage + deadlines map[string][]base.Z + dead map[string][]base.Z + target *base.TaskMessage // task to kill + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantDead map[string][]base.Z }{ { inProgress: map[string][]*base.TaskMessage{ @@ -1070,7 +1070,7 @@ func TestKill(t *testing.T) { }, }, target: t1, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, }, wantDeadlines: map[string][]base.Z{ @@ -1098,7 +1098,7 @@ func TestKill(t *testing.T) { "default": {}, }, target: t1, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2, t3}, }, wantDeadlines: map[string][]base.Z{ @@ -1131,7 +1131,7 @@ func TestKill(t *testing.T) { "custom": {}, }, target: t4, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t1}, "custom": {}, }, @@ -1150,7 +1150,7 @@ func TestKill(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllInProgressQueues(t, r.client, tc.inProgress) + h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllDeadlines(t, r.client, tc.deadlines) h.SeedAllDeadQueues(t, r.client, tc.dead) @@ -1160,10 +1160,10 @@ func TestKill(t *testing.T) { continue } - for queue, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r.client, queue) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressKey(queue), diff) + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.ActiveKey(queue), diff) } } for queue, want := range tc.wantDeadlines { @@ -1363,7 +1363,7 @@ func TestListDeadlineExceeded(t *testing.T) { want []*base.TaskMessage }{ { - desc: "with one task in-progress", + desc: "with a single active task", deadlines: map[string][]base.Z{ "default": {{Message: t1, Score: fiveMinutesAgo.Unix()}}, }, @@ -1372,7 +1372,7 @@ func TestListDeadlineExceeded(t *testing.T) { want: []*base.TaskMessage{t1}, }, { - desc: "with multiple tasks in-progress, and one expired", + desc: "with multiple active tasks, and one expired", deadlines: map[string][]base.Z{ "default": { {Message: t1, Score: oneHourAgo.Unix()}, @@ -1387,7 +1387,7 @@ func TestListDeadlineExceeded(t *testing.T) { want: []*base.TaskMessage{t1}, }, { - desc: "with multiple expired tasks in-progress", + desc: "with multiple expired active tasks", deadlines: map[string][]base.Z{ "default": { {Message: t1, Score: oneHourAgo.Unix()}, @@ -1402,7 +1402,7 @@ func TestListDeadlineExceeded(t *testing.T) { want: []*base.TaskMessage{t1, t3}, }, { - desc: "with empty in-progress queue", + desc: "with empty active queue", deadlines: map[string][]base.Z{ "default": {}, "critical": {}, diff --git a/processor.go b/processor.go index fc27e82..faf509d 100644 --- a/processor.go +++ b/processor.go @@ -56,7 +56,7 @@ type processor struct { // abort channel communicates to the in-flight worker goroutines to stop. abort chan struct{} - // cancelations is a set of cancel functions for all in-progress tasks. + // cancelations is a set of cancel functions for all active tasks. cancelations *base.Cancelations starting chan<- *base.TaskMessage @@ -216,9 +216,9 @@ func (p *processor) exec() { return case resErr := <-resCh: // Note: One of three things should happen. - // 1) Done -> Removes the message from InProgress - // 2) Retry -> Removes the message from InProgress & Adds the message to Retry - // 3) Kill -> Removes the message from InProgress & Adds the message to Dead + // 1) Done -> Removes the message from Active + // 2) Retry -> Removes the message from Active & Adds the message to Retry + // 3) Kill -> Removes the message from Active & Adds the message to Dead if resErr != nil { p.retryOrKill(ctx, msg, resErr) return @@ -241,7 +241,7 @@ func (p *processor) requeue(msg *base.TaskMessage) { func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) { err := p.broker.Done(msg) if err != nil { - errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressKey(msg.Queue), err) + errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") @@ -274,7 +274,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { retryAt := time.Now().Add(d) err := p.broker.Retry(msg, retryAt, e.Error()) if err != nil { - errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.RetryKey(msg.Queue)) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue)) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") @@ -293,7 +293,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) { func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) { err := p.broker.Kill(msg, e.Error()) if err != nil { - errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.DeadKey(msg.Queue)) + errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.DeadKey(msg.Queue)) deadline, ok := ctx.Deadline() if !ok { panic("asynq: internal error: missing deadline in context") diff --git a/processor_test.go b/processor_test.go index 7ae498c..268a62b 100644 --- a/processor_test.go +++ b/processor_test.go @@ -118,8 +118,8 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) { } } time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed. - if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) + if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) } p.terminate() @@ -207,10 +207,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) { p.start(&sync.WaitGroup{}) // Wait for two second to allow all pending tasks to be processed. time.Sleep(2 * time.Second) - // Make sure no messages are stuck in in-progress list. + // Make sure no messages are stuck in active list. for _, qname := range tc.queues { - if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l) + if l := r.LLen(base.ActiveKey(qname)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l) } } p.terminate() @@ -283,8 +283,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) { p.start(&sync.WaitGroup{}) time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed. - if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) + if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) } p.terminate() @@ -397,8 +397,8 @@ func TestProcessorRetry(t *testing.T) { t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff) } - if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l) + if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l) } if n != tc.wantErrCount { @@ -548,10 +548,10 @@ func TestProcessorWithStrictPriority(t *testing.T) { p.start(&sync.WaitGroup{}) time.Sleep(tc.wait) - // Make sure no tasks are stuck in in-progress list. + // Make sure no tasks are stuck in active list. for _, qname := range tc.queues { - if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 { - t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l) + if l := r.LLen(base.ActiveKey(qname)).Val(); l != 0 { + t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l) } } p.terminate() diff --git a/recoverer_test.go b/recoverer_test.go index 057dd61..c9773be 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -32,18 +32,18 @@ func TestRecoverer(t *testing.T) { oneHourAgo := now.Add(-1 * time.Hour) tests := []struct { - desc string - inProgress map[string][]*base.TaskMessage - deadlines map[string][]base.Z - retry map[string][]base.Z - dead map[string][]base.Z - wantInProgress map[string][]*base.TaskMessage - wantDeadlines map[string][]base.Z - wantRetry map[string][]*base.TaskMessage - wantDead map[string][]*base.TaskMessage + desc string + inProgress map[string][]*base.TaskMessage + deadlines map[string][]base.Z + retry map[string][]base.Z + dead map[string][]base.Z + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + wantRetry map[string][]*base.TaskMessage + wantDead map[string][]*base.TaskMessage }{ { - desc: "with one task in-progress", + desc: "with one active task", inProgress: map[string][]*base.TaskMessage{ "default": {t1}, }, @@ -56,7 +56,7 @@ func TestRecoverer(t *testing.T) { dead: map[string][]base.Z{ "default": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, }, wantDeadlines: map[string][]base.Z{ @@ -87,7 +87,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -105,7 +105,7 @@ func TestRecoverer(t *testing.T) { }, }, { - desc: "with multiple tasks in-progress, and one expired", + desc: "with multiple active tasks, and one expired", inProgress: map[string][]*base.TaskMessage{ "default": {t1, t2}, "critical": {t3}, @@ -127,7 +127,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, "critical": {t3}, }, @@ -145,7 +145,7 @@ func TestRecoverer(t *testing.T) { }, }, { - desc: "with multiple expired tasks in-progress", + desc: "with multiple expired active tasks", inProgress: map[string][]*base.TaskMessage{ "default": {t1, t2}, "critical": {t3}, @@ -167,7 +167,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "cricial": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {t2}, "critical": {}, }, @@ -184,7 +184,7 @@ func TestRecoverer(t *testing.T) { }, }, { - desc: "with empty in-progress queue", + desc: "with empty active queue", inProgress: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, @@ -201,7 +201,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantInProgress: map[string][]*base.TaskMessage{ + wantActive: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -222,7 +222,7 @@ func TestRecoverer(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) - h.SeedAllInProgressQueues(t, r, tc.inProgress) + h.SeedAllActiveQueues(t, r, tc.inProgress) h.SeedAllDeadlines(t, r, tc.deadlines) h.SeedAllRetryQueues(t, r, tc.retry) h.SeedAllDeadQueues(t, r, tc.dead) @@ -240,10 +240,10 @@ func TestRecoverer(t *testing.T) { time.Sleep(2 * time.Second) recoverer.terminate() - for qname, want := range tc.wantInProgress { - gotInProgress := h.GetInProgressMessages(t, r, qname) - if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.InProgressKey(qname), diff) + for qname, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r, qname) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.ActiveKey(qname), diff) } } for qname, want := range tc.wantDeadlines { diff --git a/subscriber.go b/subscriber.go index e9edd8d..aa895f9 100644 --- a/subscriber.go +++ b/subscriber.go @@ -20,7 +20,7 @@ type subscriber struct { // channel to communicate back to the long running "subscriber" goroutine. done chan struct{} - // cancelations hold cancel functions for all in-progress tasks. + // cancelations hold cancel functions for all active tasks. cancelations *base.Cancelations // time to wait before retrying to connect to redis. diff --git a/syncer_test.go b/syncer_test.go index bed924e..6c67d12 100644 --- a/syncer_test.go +++ b/syncer_test.go @@ -23,7 +23,7 @@ func TestSyncer(t *testing.T) { } r := setup(t) rdbClient := rdb.NewRDB(r) - h.SeedInProgressQueue(t, r, inProgress, base.DefaultQueueName) + h.SeedActiveQueue(t, r, inProgress, base.DefaultQueueName) const interval = time.Second syncRequestCh := make(chan *syncRequest) @@ -48,9 +48,9 @@ func TestSyncer(t *testing.T) { time.Sleep(2 * interval) // ensure that syncer runs at least once - gotInProgress := h.GetInProgressMessages(t, r, base.DefaultQueueName) - if l := len(gotInProgress); l != 0 { - t.Errorf("%q has length %d; want 0", base.InProgressKey(base.DefaultQueueName), l) + gotActive := h.GetActiveMessages(t, r, base.DefaultQueueName) + if l := len(gotActive); l != 0 { + t.Errorf("%q has length %d; want 0", base.ActiveKey(base.DefaultQueueName), l) } } diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index 61a3b96..ef1e0dc 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -145,9 +145,9 @@ func printQueueStats(s *asynq.QueueStats) { fmt.Printf("Paused: %t\n\n", s.Paused) fmt.Println("Task Breakdown:") printTable( - []string{"InProgress", "Pending", "Scheduled", "Retry", "Dead"}, + []string{"Active", "Pending", "Scheduled", "Retry", "Dead"}, func(w io.Writer, tmpl string) { - fmt.Fprintf(w, tmpl, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead) }, ) fmt.Println() diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index 3103b35..1a9cb73 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -51,14 +51,14 @@ func init() { } type AggregateStats struct { - InProgress int - Pending int - Scheduled int - Retry int - Dead int - Processed int - Failed int - Timestamp time.Time + Active int + Pending int + Scheduled int + Retry int + Dead int + Processed int + Failed int + Timestamp time.Time } func stats(cmd *cobra.Command, args []string) { @@ -78,7 +78,7 @@ func stats(cmd *cobra.Command, args []string) { fmt.Println(err) os.Exit(1) } - aggStats.InProgress += s.InProgress + aggStats.Active += s.Active aggStats.Pending += s.Pending aggStats.Scheduled += s.Scheduled aggStats.Retry += s.Retry @@ -113,9 +113,9 @@ func stats(cmd *cobra.Command, args []string) { func printStatsByState(s *AggregateStats) { format := strings.Repeat("%v\t", 5) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) - fmt.Fprintf(tw, format, "InProgress", "Pending", "Scheduled", "Retry", "Dead") + fmt.Fprintf(tw, format, "Active", "Pending", "Scheduled", "Retry", "Dead") fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----") - fmt.Fprintf(tw, format, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead) tw.Flush() } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index 4dcf5d0..dc90e3b 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -74,7 +74,7 @@ var taskListCmd = &cobra.Command{ Long: `List tasks of the given state from the specified queue. The value for the state flag should be one of: -- in-progress +- active - pending - scheduled - retry @@ -95,7 +95,7 @@ To list the tasks from the second page, run var taskCancelCmd = &cobra.Command{ Use: "cancel TASK_ID [TASK_ID...]", - Short: "Cancel one or more in-progress tasks", + Short: "Cancel one or more active tasks", Args: cobra.MinimumNArgs(1), Run: taskCancel, } @@ -165,8 +165,8 @@ func taskList(cmd *cobra.Command, args []string) { } switch state { - case "in-progress": - listInProgressTasks(qname, pageNum, pageSize) + case "active": + listActiveTasks(qname, pageNum, pageSize) case "pending": listPendingTasks(qname, pageNum, pageSize) case "scheduled": @@ -181,15 +181,15 @@ func taskList(cmd *cobra.Command, args []string) { } } -func listInProgressTasks(qname string, pageNum, pageSize int) { +func listActiveTasks(qname string, pageNum, pageSize int) { i := createInspector() - tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) + tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { fmt.Println(err) os.Exit(1) } if len(tasks) == 0 { - fmt.Printf("No in-progress tasks in %q queue\n", qname) + fmt.Printf("No active tasks in %q queue\n", qname) return } printTable(