2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-25 10:56:12 +08:00

Rename InProgress to Active

This commit is contained in:
Ken Hibino
2020-09-05 12:43:15 -07:00
parent d849d33d0b
commit 8385383760
17 changed files with 304 additions and 305 deletions

View File

@@ -37,12 +37,12 @@ type QueueStats struct {
// Name of the queue.
Queue string
// Size is the total number of tasks in the queue.
// The value is the sum of Pending, InProgress, Scheduled, Retry, and Dead.
// The value is the sum of Pending, Active, Scheduled, Retry, and Dead.
Size int
// Number of pending tasks.
Pending int
// Number of in-progress tasks.
InProgress int
// Number of active tasks.
Active int
// Number of scheduled tasks.
Scheduled int
// Number of retry tasks.
@@ -71,17 +71,17 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
return nil, err
}
return &QueueStats{
Queue: stats.Queue,
Size: stats.Size,
Pending: stats.Pending,
InProgress: stats.InProgress,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
Queue: stats.Queue,
Size: stats.Size,
Pending: stats.Pending,
Active: stats.Active,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Paused: stats.Paused,
Timestamp: stats.Timestamp,
}, nil
}
@@ -126,8 +126,8 @@ type PendingTask struct {
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*Task
ID string
Queue string
@@ -293,22 +293,22 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
return tasks, err
}
// ListInProgressTasks retrieves in-progress tasks from the specified queue.
// ListActiveTasks retrieves active tasks from the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(qname string, opts ...ListOption) ([]*InProgressTask, error) {
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) {
if err := validateQueueName(qname); err != nil {
return nil, err
}
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(qname, pgn)
msgs, err := i.rdb.ListActive(qname, pgn)
if err != nil {
return nil, err
}
var tasks []*InProgressTask
var tasks []*ActiveTask
for _, m := range msgs {
tasks = append(tasks, &InProgressTask{
tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,

View File

@@ -114,17 +114,17 @@ func TestInspectorCurrentStats(t *testing.T) {
},
qname: "default",
want: &QueueStats{
Queue: "default",
Size: 4,
Pending: 1,
InProgress: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Paused: false,
Timestamp: now,
Queue: "default",
Size: 4,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Paused: false,
Timestamp: now,
},
},
}
@@ -132,7 +132,7 @@ func TestInspectorCurrentStats(t *testing.T) {
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllPendingQueues(t, r, tc.pending)
asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress)
asynqtest.SeedAllActiveQueues(t, r, tc.inProgress)
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
@@ -291,7 +291,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
}
}
func TestInspectorListInProgressTasks(t *testing.T) {
func TestInspectorListActiveTasks(t *testing.T) {
r := setup(t)
m1 := asynqtest.NewTaskMessage("task1", nil)
m2 := asynqtest.NewTaskMessage("task2", nil)
@@ -300,8 +300,8 @@ func TestInspectorListInProgressTasks(t *testing.T) {
inspector := NewInspector(getRedisConnOpt(t))
createInProgressTask := func(msg *base.TaskMessage) *InProgressTask {
return &InProgressTask{
createActiveTask := func(msg *base.TaskMessage) *ActiveTask {
return &ActiveTask{
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
@@ -312,34 +312,34 @@ func TestInspectorListInProgressTasks(t *testing.T) {
desc string
inProgress map[string][]*base.TaskMessage
qname string
want []*InProgressTask
want []*ActiveTask
}{
{
desc: "with a few in-progress tasks",
desc: "with a few active tasks",
inProgress: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
qname: "default",
want: []*InProgressTask{
createInProgressTask(m1),
createInProgressTask(m2),
want: []*ActiveTask{
createActiveTask(m1),
createActiveTask(m2),
},
},
}
for _, tc := range tests {
asynqtest.FlushDB(t, r)
asynqtest.SeedAllInProgressQueues(t, r, tc.inProgress)
asynqtest.SeedAllActiveQueues(t, r, tc.inProgress)
got, err := inspector.ListInProgressTasks(tc.qname)
got, err := inspector.ListActiveTasks(tc.qname)
if err != nil {
t.Errorf("%s; ListInProgressTasks(%q) returned error: %v", tc.qname, tc.desc, err)
t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(Payload{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListInProgressTask(%q) = %v, want %v; (-want,+got)\n%s",
t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
}
}

View File

@@ -181,11 +181,11 @@ func SeedPendingQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskM
seedRedisList(tb, r, base.QueueKey(qname), msgs)
}
// SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
// SeedActiveQueue initializes the active queue with the given messages.
func SeedActiveQueue(tb testing.TB, r redis.UniversalClient, msgs []*base.TaskMessage, qname string) {
tb.Helper()
r.SAdd(base.AllQueues, qname)
seedRedisList(tb, r, base.InProgressKey(qname), msgs)
seedRedisList(tb, r, base.ActiveKey(qname), msgs)
}
// SeedScheduledQueue initializes the scheduled queue with the given messages.
@@ -225,10 +225,10 @@ func SeedAllPendingQueues(tb testing.TB, r redis.UniversalClient, pending map[st
}
}
// SeedAllInProgressQueues initializes all of the specified in-progress queues with the given messages.
func SeedAllInProgressQueues(tb testing.TB, r redis.UniversalClient, inprogress map[string][]*base.TaskMessage) {
for q, msgs := range inprogress {
SeedInProgressQueue(tb, r, msgs, q)
// SeedAllActiveQueues initializes all of the specified active queues with the given messages.
func SeedAllActiveQueues(tb testing.TB, r redis.UniversalClient, active map[string][]*base.TaskMessage) {
for q, msgs := range active {
SeedActiveQueue(tb, r, msgs, q)
}
}
@@ -284,10 +284,10 @@ func GetPendingMessages(tb testing.TB, r redis.UniversalClient, qname string) []
return getListMessages(tb, r, base.QueueKey(qname))
}
// GetInProgressMessages returns all in-progress messages in the given queue.
func GetInProgressMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
// GetActiveMessages returns all active messages in the given queue.
func GetActiveMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.InProgressKey(qname))
return getListMessages(tb, r, base.ActiveKey(qname))
}
// GetScheduledMessages returns all scheduled task messages in the given queue.

View File

@@ -40,10 +40,9 @@ func QueueKey(qname string) string {
return fmt.Sprintf("asynq:{%s}", qname)
}
// TODO: Should we rename this to "active"?
// InProgressKey returns a redis key for the in-progress tasks.
func InProgressKey(qname string) string {
return fmt.Sprintf("asynq:{%s}:in_progress", qname)
// ActiveKey returns a redis key for the active tasks.
func ActiveKey(qname string) string {
return fmt.Sprintf("asynq:{%s}:active", qname)
}
// ScheduledKey returns a redis key for the scheduled tasks.
@@ -274,7 +273,7 @@ type WorkerInfo struct {
Started time.Time
}
// Cancelations is a collection that holds cancel functions for all in-progress tasks.
// Cancelations is a collection that holds cancel functions for all active tasks.
//
// Cancelations are safe for concurrent use by multipel goroutines.
type Cancelations struct {

View File

@@ -32,19 +32,19 @@ func TestQueueKey(t *testing.T) {
}
}
func TestInProgressKey(t *testing.T) {
func TestActiveKey(t *testing.T) {
tests := []struct {
qname string
want string
}{
{"default", "asynq:{default}:in_progress"},
{"custom", "asynq:{custom}:in_progress"},
{"default", "asynq:{default}:active"},
{"custom", "asynq:{custom}:active"},
}
for _, tc := range tests {
got := InProgressKey(tc.qname)
got := ActiveKey(tc.qname)
if got != tc.want {
t.Errorf("InProgressKey(%q) = %q, want %q", tc.qname, got, tc.want)
t.Errorf("ActiveKey(%q) = %q, want %q", tc.qname, got, tc.want)
}
}
}

View File

@@ -31,11 +31,11 @@ type Stats struct {
// Size is the total number of tasks in the queue.
Size int
// Number of tasks in each state.
Pending int
InProgress int
Scheduled int
Retry int
Dead int
Pending int
Active int
Scheduled int
Retry int
Dead int
// Total number of tasks processed during the current date.
// The number includes both succeeded and failed tasks.
Processed int
@@ -59,7 +59,7 @@ type DailyStats struct {
}
// KEYS[1] -> asynq:<qname>
// KEYS[2] -> asynq:<qname>:in_progress
// KEYS[2] -> asynq:<qname>:active
// KEYS[3] -> asynq:<qname>:scheduled
// KEYS[4] -> asynq:<qname>:retry
// KEYS[5] -> asynq:<qname>:dead
@@ -108,7 +108,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
base.QueueKey(qname),
base.InProgressKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
base.DeadKey(qname),
@@ -135,8 +135,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
case base.QueueKey(qname):
stats.Pending = val
size += val
case base.InProgressKey(qname):
stats.InProgress = val
case base.ActiveKey(qname):
stats.Active = val
size += val
case base.ScheduledKey(qname):
stats.Scheduled = val
@@ -266,12 +266,12 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
return r.listMessages(base.QueueKey(qname), pgn)
}
// ListInProgress returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListInProgress(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
// ListActive returns all tasks that are currently being processed for the given queue.
func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.InProgressKey(qname), pgn)
return r.listMessages(base.ActiveKey(qname), pgn)
}
// listMessages returns a list of TaskMessage in Redis list with the given key.
@@ -652,17 +652,17 @@ func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
// Only check whether in-progress queue is empty before removing.
// Only check whether active queue is empty before removing.
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:dead
// KEYS[6] -> asynq:{<qname>}:deadlines
var removeQueueForceCmd = redis.NewScript(`
local inprogress = redis.call("LLEN", KEYS[2])
if inprogress > 0 then
return redis.error_reply("Queue has tasks in-progress")
local active = redis.call("LLEN", KEYS[2])
if active > 0 then
return redis.error_reply("Queue has tasks active")
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
@@ -674,18 +674,18 @@ return redis.status_reply("OK")`)
// Checks whether queue is empty before removing.
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:in_progress
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:dead
// KEYS[6] -> asynq:{<qname>}:deadlines
var removeQueueCmd = redis.NewScript(`
local pending = redis.call("LLEN", KEYS[1])
local inprogress = redis.call("LLEN", KEYS[2])
local active = redis.call("LLEN", KEYS[2])
local scheduled = redis.call("SCARD", KEYS[3])
local retry = redis.call("SCARD", KEYS[4])
local dead = redis.call("SCARD", KEYS[5])
local total = pending + inprogress + scheduled + retry + dead
local total = pending + active + scheduled + retry + dead
if total > 0 then
return redis.error_reply("QUEUE NOT EMPTY")
end
@@ -700,7 +700,7 @@ return redis.status_reply("OK")`)
// RemoveQueue removes the specified queue.
//
// If force is set to true, it will remove the queue regardless
// as long as no tasks are in-progress for the queue.
// as long as no tasks are active for the queue.
// If force is set to false, it will only remove the queue if
// the queue is empty.
func (r *RDB) RemoveQueue(qname string, force bool) error {
@@ -719,7 +719,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
}
keys := []string{
base.QueueKey(qname),
base.InProgressKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
base.DeadKey(qname),

View File

@@ -110,17 +110,17 @@ func TestCurrentStats(t *testing.T) {
paused: []string{},
qname: "default",
want: &Stats{
Queue: "default",
Paused: false,
Size: 4,
Pending: 1,
InProgress: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Timestamp: now,
Queue: "default",
Paused: false,
Size: 4,
Pending: 1,
Active: 1,
Scheduled: 2,
Retry: 0,
Dead: 0,
Processed: 120,
Failed: 2,
Timestamp: now,
},
},
{
@@ -165,17 +165,17 @@ func TestCurrentStats(t *testing.T) {
paused: []string{"critical", "low"},
qname: "critical",
want: &Stats{
Queue: "critical",
Paused: true,
Size: 1,
Pending: 1,
InProgress: 0,
Scheduled: 0,
Retry: 0,
Dead: 0,
Processed: 100,
Failed: 0,
Timestamp: now,
Queue: "critical",
Paused: true,
Size: 1,
Pending: 1,
Active: 0,
Scheduled: 0,
Retry: 0,
Dead: 0,
Processed: 100,
Failed: 0,
Timestamp: now,
},
},
}
@@ -188,7 +188,7 @@ func TestCurrentStats(t *testing.T) {
}
}
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
@@ -433,7 +433,7 @@ func TestListPendingPagination(t *testing.T) {
}
}
func TestListInProgress(t *testing.T) {
func TestListActive(t *testing.T) {
r := setup(t)
m1 := h.NewTaskMessage("task1", nil)
@@ -466,10 +466,10 @@ func TestListInProgress(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
got, err := r.ListInProgress(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: 20, Page: 0})", tc.qname)
got, err := r.ListActive(tc.qname, Pagination{Size: 20, Page: 0})
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: 20, Page: 0})", tc.qname)
if err != nil {
t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.inProgress)
continue
@@ -481,14 +481,14 @@ func TestListInProgress(t *testing.T) {
}
}
func TestListInProgressPagination(t *testing.T) {
func TestListActivePagination(t *testing.T) {
r := setup(t)
var msgs []*base.TaskMessage
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
msgs = append(msgs, msg)
}
h.SeedInProgressQueue(t, r.client, msgs, "default")
h.SeedActiveQueue(t, r.client, msgs, "default")
tests := []struct {
desc string
@@ -507,8 +507,8 @@ func TestListInProgressPagination(t *testing.T) {
}
for _, tc := range tests {
got, err := r.ListInProgress(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListInProgress(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
got, err := r.ListActive(tc.qname, Pagination{Size: tc.size, Page: tc.page})
op := fmt.Sprintf("r.ListActive(%q, Pagination{Size: %d, Page: %d})", tc.qname, tc.size, tc.page)
if err != nil {
t.Errorf("%s; %s returned error %v", tc.desc, op, err)
continue
@@ -2669,7 +2669,7 @@ func TestRemoveQueue(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
@@ -2686,7 +2686,7 @@ func TestRemoveQueue(t *testing.T) {
keys := []string{
base.QueueKey(tc.qname),
base.InProgressKey(tc.qname),
base.ActiveKey(tc.qname),
base.DeadlinesKey(tc.qname),
base.ScheduledKey(tc.qname),
base.RetryKey(tc.qname),
@@ -2768,7 +2768,7 @@ func TestRemoveQueueError(t *testing.T) {
force: false,
},
{
desc: "force removing queue with tasks in-progress",
desc: "force removing queue with active tasks",
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
@@ -2790,7 +2790,7 @@ func TestRemoveQueueError(t *testing.T) {
"custom": {},
},
qname: "custom",
// Even with force=true, it should error if there are tasks in-progress.
// Even with force=true, it should error if there are active tasks.
force: true,
},
}
@@ -2798,7 +2798,7 @@ func TestRemoveQueueError(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllDeadQueues(t, r.client, tc.dead)
@@ -2817,9 +2817,9 @@ func TestRemoveQueueError(t *testing.T) {
}
}
for qname, want := range tc.inProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.InProgressKey(qname), diff)
gotActive := h.GetActiveMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ActiveKey(qname), diff)
}
}
for qname, want := range tc.scheduled {

View File

@@ -120,7 +120,7 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:in_progress
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:deadlines
// ARGV[1] -> current time in Unix time
//
@@ -156,7 +156,7 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
keys := []string{
base.QueueKey(qname),
base.PausedKey(qname),
base.InProgressKey(qname),
base.ActiveKey(qname),
base.DeadlinesKey(qname),
}
res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result()
@@ -183,7 +183,7 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
return "", 0, ErrNoProcessableTask
}
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value
@@ -202,7 +202,7 @@ end
return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
@@ -226,7 +226,7 @@ end
return redis.status_reply("OK")
`)
// Done removes the task from in-progress queue to mark the task as done.
// Done removes the task from active queue to mark the task as done.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
@@ -236,7 +236,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
now := time.Now()
expireAt := now.Add(statsTTL)
keys := []string{
base.InProgressKey(msg.Queue),
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
}
@@ -249,7 +249,7 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
return doneCmd.Run(r.client, keys, args...).Err()
}
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}
// ARGV[1] -> base.TaskMessage value
@@ -264,14 +264,14 @@ end
redis.call("RPUSH", KEYS[3], ARGV[1])
return redis.status_reply("OK")`)
// Requeue moves the task from in-progress queue to the specified queue.
// Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
return requeueCmd.Run(r.client,
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)},
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)},
encoded).Err()
}
@@ -330,12 +330,12 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
return nil
}
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:retry
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
// ARGV[2] -> base.TaskMessage value to add to Retry queue
// ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp
@@ -357,7 +357,7 @@ if tonumber(m) == 1 then
end
return redis.status_reply("OK")`)
// Retry moves the task from in-progress to retry queue, incrementing retry count
// Retry moves the task from active to retry queue, incrementing retry count
// and assigning error message to the task message.
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
msgToRemove, err := base.EncodeMessage(msg)
@@ -376,7 +376,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
failedKey := base.FailedKey(msg.Queue, now)
expireAt := now.Add(statsTTL)
return retryCmd.Run(r.client,
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err()
}
@@ -385,12 +385,12 @@ const (
deadExpirationInDays = 90
)
// KEYS[1] -> asynq:{<qname>}:in_progress
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:dead
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
// ARGV[2] -> base.TaskMessage value to add to Dead queue
// ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
@@ -416,7 +416,7 @@ if tonumber(m) == 1 then
end
return redis.status_reply("OK")`)
// Kill sends the task to "dead" queue from in-progress queue, assigning
// Kill sends the task to "dead" queue from active queue, assigning
// the error message to the task.
// It also trims the set by timestamp and set size.
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
@@ -436,7 +436,7 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
failedKey := base.FailedKey(msg.Queue, now)
expireAt := now.Add(statsTTL)
return killCmd.Run(r.client,
[]string{base.InProgressKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey},
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey},
msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
}

View File

@@ -167,14 +167,14 @@ func TestDequeue(t *testing.T) {
t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest
tests := []struct {
pending map[string][]*base.TaskMessage
args []string // list of queues to query
wantMsg *base.TaskMessage
wantDeadline time.Time
err error
wantPending map[string][]*base.TaskMessage
wantInProgress map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
pending map[string][]*base.TaskMessage
args []string // list of queues to query
wantMsg *base.TaskMessage
wantDeadline time.Time
err error
wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
@@ -187,7 +187,7 @@ func TestDequeue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t1},
},
wantDeadlines: map[string][]base.Z{
@@ -205,7 +205,7 @@ func TestDequeue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
@@ -227,7 +227,7 @@ func TestDequeue(t *testing.T) {
"critical": {},
"low": {t3},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {t2},
"low": {},
@@ -253,7 +253,7 @@ func TestDequeue(t *testing.T) {
"critical": {},
"low": {t2, t1},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t3},
"critical": {},
"low": {},
@@ -279,7 +279,7 @@ func TestDequeue(t *testing.T) {
"critical": {},
"low": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
@@ -319,10 +319,10 @@ func TestDequeue(t *testing.T) {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
}
}
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff)
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
@@ -354,13 +354,13 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
}
tests := []struct {
paused []string // list of paused queues
pending map[string][]*base.TaskMessage
args []string // list of queues to query
wantMsg *base.TaskMessage
err error
wantPending map[string][]*base.TaskMessage
wantInProgress map[string][]*base.TaskMessage
paused []string // list of paused queues
pending map[string][]*base.TaskMessage
args []string // list of queues to query
wantMsg *base.TaskMessage
err error
wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage
}{
{
paused: []string{"default"},
@@ -375,7 +375,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
"default": {t1},
"critical": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {t2},
},
@@ -391,7 +391,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{
"default": {t1},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
},
@@ -408,7 +408,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
"default": {t1},
"critical": {t2},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
@@ -437,10 +437,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
}
}
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressKey(queue), diff)
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
}
}
}
@@ -479,12 +479,12 @@ func TestDone(t *testing.T) {
t3Deadline := now.Unix() + t3.Deadline
tests := []struct {
desc string
inProgress map[string][]*base.TaskMessage // initial state of the in-progress list
deadlines map[string][]base.Z // initial state of deadlines set
target *base.TaskMessage // task to remove
wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list
wantDeadlines map[string][]base.Z // final state of the deadline set
desc string
inProgress map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of deadlines set
target *base.TaskMessage // task to remove
wantActive map[string][]*base.TaskMessage // final state of the active list
wantDeadlines map[string][]base.Z // final state of the deadline set
}{
{
desc: "removes message from the correct queue",
@@ -497,7 +497,7 @@ func TestDone(t *testing.T) {
"custom": {{Message: t2, Score: t2Deadline}},
},
target: t1,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"custom": {t2},
},
@@ -515,7 +515,7 @@ func TestDone(t *testing.T) {
"default": {{Message: t1, Score: t1Deadline}},
},
target: t1,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
@@ -533,7 +533,7 @@ func TestDone(t *testing.T) {
"custom": {{Message: t2, Score: t2Deadline}},
},
target: t3,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t1},
"custom": {t2},
},
@@ -547,7 +547,7 @@ func TestDone(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
for _, msgs := range tc.inProgress {
for _, msg := range msgs {
// Set uniqueness lock if unique key is present.
@@ -566,10 +566,10 @@ func TestDone(t *testing.T) {
continue
}
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.InProgressKey(queue), diff)
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q: (-want, +got):\n%s", tc.desc, base.ActiveKey(queue), diff)
continue
}
}
@@ -627,13 +627,13 @@ func TestRequeue(t *testing.T) {
t3Deadline := now.Unix() + t3.Timeout
tests := []struct {
pending map[string][]*base.TaskMessage // initial state of queues
inProgress map[string][]*base.TaskMessage // initial state of the in-progress list
deadlines map[string][]base.Z // initial state of the deadlines set
target *base.TaskMessage // task to requeue
wantPending map[string][]*base.TaskMessage // final state of queues
wantInProgress map[string][]*base.TaskMessage // final state of the in-progress list
wantDeadlines map[string][]base.Z // final state of the deadlines set
pending map[string][]*base.TaskMessage // initial state of queues
inProgress map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
target *base.TaskMessage // task to requeue
wantPending map[string][]*base.TaskMessage // final state of queues
wantActive map[string][]*base.TaskMessage // final state of the active list
wantDeadlines map[string][]base.Z // final state of the deadlines set
}{
{
pending: map[string][]*base.TaskMessage{
@@ -652,7 +652,7 @@ func TestRequeue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{
"default": {t1},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
@@ -677,7 +677,7 @@ func TestRequeue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
@@ -702,7 +702,7 @@ func TestRequeue(t *testing.T) {
"default": {t1},
"critical": {t3},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
"critical": {},
},
@@ -716,7 +716,7 @@ func TestRequeue(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
err := r.Requeue(tc.target)
@@ -731,10 +731,10 @@ func TestRequeue(t *testing.T) {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
for qname, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressKey(qname), diff)
for qname, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.ActiveKey(qname), diff)
}
}
for qname, want := range tc.wantDeadlines {
@@ -877,15 +877,15 @@ func TestRetry(t *testing.T) {
errMsg := "SMTP server is not responding"
tests := []struct {
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
retry map[string][]base.Z
msg *base.TaskMessage
processAt time.Time
errMsg string
wantInProgress map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantRetry map[string][]base.Z
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
retry map[string][]base.Z
msg *base.TaskMessage
processAt time.Time
errMsg string
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantRetry map[string][]base.Z
}{
{
inProgress: map[string][]*base.TaskMessage{
@@ -900,7 +900,7 @@ func TestRetry(t *testing.T) {
msg: t1,
processAt: now.Add(5 * time.Minute),
errMsg: errMsg,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
@@ -929,7 +929,7 @@ func TestRetry(t *testing.T) {
msg: t4,
processAt: now.Add(5 * time.Minute),
errMsg: errMsg,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t1, t2},
"custom": {},
},
@@ -948,7 +948,7 @@ func TestRetry(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllRetryQueues(t, r.client, tc.retry)
@@ -958,10 +958,10 @@ func TestRetry(t *testing.T) {
continue
}
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressKey(queue), diff)
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
@@ -1046,13 +1046,13 @@ func TestKill(t *testing.T) {
// TODO(hibiken): add test cases for trimming
tests := []struct {
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
dead map[string][]base.Z
target *base.TaskMessage // task to kill
wantInProgress map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantDead map[string][]base.Z
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
dead map[string][]base.Z
target *base.TaskMessage // task to kill
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantDead map[string][]base.Z
}{
{
inProgress: map[string][]*base.TaskMessage{
@@ -1070,7 +1070,7 @@ func TestKill(t *testing.T) {
},
},
target: t1,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
},
wantDeadlines: map[string][]base.Z{
@@ -1098,7 +1098,7 @@ func TestKill(t *testing.T) {
"default": {},
},
target: t1,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2, t3},
},
wantDeadlines: map[string][]base.Z{
@@ -1131,7 +1131,7 @@ func TestKill(t *testing.T) {
"custom": {},
},
target: t4,
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t1},
"custom": {},
},
@@ -1150,7 +1150,7 @@ func TestKill(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllInProgressQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllDeadQueues(t, r.client, tc.dead)
@@ -1160,10 +1160,10 @@ func TestKill(t *testing.T) {
continue
}
for queue, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressKey(queue), diff)
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
@@ -1363,7 +1363,7 @@ func TestListDeadlineExceeded(t *testing.T) {
want []*base.TaskMessage
}{
{
desc: "with one task in-progress",
desc: "with a single active task",
deadlines: map[string][]base.Z{
"default": {{Message: t1, Score: fiveMinutesAgo.Unix()}},
},
@@ -1372,7 +1372,7 @@ func TestListDeadlineExceeded(t *testing.T) {
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple tasks in-progress, and one expired",
desc: "with multiple active tasks, and one expired",
deadlines: map[string][]base.Z{
"default": {
{Message: t1, Score: oneHourAgo.Unix()},
@@ -1387,7 +1387,7 @@ func TestListDeadlineExceeded(t *testing.T) {
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple expired tasks in-progress",
desc: "with multiple expired active tasks",
deadlines: map[string][]base.Z{
"default": {
{Message: t1, Score: oneHourAgo.Unix()},
@@ -1402,7 +1402,7 @@ func TestListDeadlineExceeded(t *testing.T) {
want: []*base.TaskMessage{t1, t3},
},
{
desc: "with empty in-progress queue",
desc: "with empty active queue",
deadlines: map[string][]base.Z{
"default": {},
"critical": {},

View File

@@ -56,7 +56,7 @@ type processor struct {
// abort channel communicates to the in-flight worker goroutines to stop.
abort chan struct{}
// cancelations is a set of cancel functions for all in-progress tasks.
// cancelations is a set of cancel functions for all active tasks.
cancelations *base.Cancelations
starting chan<- *base.TaskMessage
@@ -216,9 +216,9 @@ func (p *processor) exec() {
return
case resErr := <-resCh:
// Note: One of three things should happen.
// 1) Done -> Removes the message from InProgress
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
// 1) Done -> Removes the message from Active
// 2) Retry -> Removes the message from Active & Adds the message to Retry
// 3) Kill -> Removes the message from Active & Adds the message to Dead
if resErr != nil {
p.retryOrKill(ctx, msg, resErr)
return
@@ -241,7 +241,7 @@ func (p *processor) requeue(msg *base.TaskMessage) {
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
err := p.broker.Done(msg)
if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressKey(msg.Queue), err)
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.ActiveKey(msg.Queue), err)
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
@@ -274,7 +274,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
retryAt := time.Now().Add(d)
err := p.broker.Retry(msg, retryAt, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.RetryKey(msg.Queue))
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.RetryKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")
@@ -293,7 +293,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.DeadKey(msg.Queue))
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.DeadKey(msg.Queue))
deadline, ok := ctx.Deadline()
if !ok {
panic("asynq: internal error: missing deadline in context")

View File

@@ -118,8 +118,8 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
}
}
time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed.
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
}
p.terminate()
@@ -207,10 +207,10 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
p.start(&sync.WaitGroup{})
// Wait for two second to allow all pending tasks to be processed.
time.Sleep(2 * time.Second)
// Make sure no messages are stuck in in-progress list.
// Make sure no messages are stuck in active list.
for _, qname := range tc.queues {
if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l)
if l := r.LLen(base.ActiveKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
}
}
p.terminate()
@@ -283,8 +283,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
p.start(&sync.WaitGroup{})
time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed.
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
}
p.terminate()
@@ -397,8 +397,8 @@ func TestProcessorRetry(t *testing.T) {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff)
}
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(base.DefaultQueueName), l)
}
if n != tc.wantErrCount {
@@ -548,10 +548,10 @@ func TestProcessorWithStrictPriority(t *testing.T) {
p.start(&sync.WaitGroup{})
time.Sleep(tc.wait)
// Make sure no tasks are stuck in in-progress list.
// Make sure no tasks are stuck in active list.
for _, qname := range tc.queues {
if l := r.LLen(base.InProgressKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(qname), l)
if l := r.LLen(base.ActiveKey(qname)).Val(); l != 0 {
t.Errorf("%q has %d tasks, want 0", base.ActiveKey(qname), l)
}
}
p.terminate()

View File

@@ -32,18 +32,18 @@ func TestRecoverer(t *testing.T) {
oneHourAgo := now.Add(-1 * time.Hour)
tests := []struct {
desc string
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
wantInProgress map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantRetry map[string][]*base.TaskMessage
wantDead map[string][]*base.TaskMessage
desc string
inProgress map[string][]*base.TaskMessage
deadlines map[string][]base.Z
retry map[string][]base.Z
dead map[string][]base.Z
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
wantRetry map[string][]*base.TaskMessage
wantDead map[string][]*base.TaskMessage
}{
{
desc: "with one task in-progress",
desc: "with one active task",
inProgress: map[string][]*base.TaskMessage{
"default": {t1},
},
@@ -56,7 +56,7 @@ func TestRecoverer(t *testing.T) {
dead: map[string][]base.Z{
"default": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
@@ -87,7 +87,7 @@ func TestRecoverer(t *testing.T) {
"default": {},
"critical": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
@@ -105,7 +105,7 @@ func TestRecoverer(t *testing.T) {
},
},
{
desc: "with multiple tasks in-progress, and one expired",
desc: "with multiple active tasks, and one expired",
inProgress: map[string][]*base.TaskMessage{
"default": {t1, t2},
"critical": {t3},
@@ -127,7 +127,7 @@ func TestRecoverer(t *testing.T) {
"default": {},
"critical": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
"critical": {t3},
},
@@ -145,7 +145,7 @@ func TestRecoverer(t *testing.T) {
},
},
{
desc: "with multiple expired tasks in-progress",
desc: "with multiple expired active tasks",
inProgress: map[string][]*base.TaskMessage{
"default": {t1, t2},
"critical": {t3},
@@ -167,7 +167,7 @@ func TestRecoverer(t *testing.T) {
"default": {},
"cricial": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {t2},
"critical": {},
},
@@ -184,7 +184,7 @@ func TestRecoverer(t *testing.T) {
},
},
{
desc: "with empty in-progress queue",
desc: "with empty active queue",
inProgress: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
@@ -201,7 +201,7 @@ func TestRecoverer(t *testing.T) {
"default": {},
"critical": {},
},
wantInProgress: map[string][]*base.TaskMessage{
wantActive: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
@@ -222,7 +222,7 @@ func TestRecoverer(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r)
h.SeedAllInProgressQueues(t, r, tc.inProgress)
h.SeedAllActiveQueues(t, r, tc.inProgress)
h.SeedAllDeadlines(t, r, tc.deadlines)
h.SeedAllRetryQueues(t, r, tc.retry)
h.SeedAllDeadQueues(t, r, tc.dead)
@@ -240,10 +240,10 @@ func TestRecoverer(t *testing.T) {
time.Sleep(2 * time.Second)
recoverer.terminate()
for qname, want := range tc.wantInProgress {
gotInProgress := h.GetInProgressMessages(t, r, qname)
if diff := cmp.Diff(want, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.InProgressKey(qname), diff)
for qname, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r, qname)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want,+got)\n%s", tc.desc, base.ActiveKey(qname), diff)
}
}
for qname, want := range tc.wantDeadlines {

View File

@@ -20,7 +20,7 @@ type subscriber struct {
// channel to communicate back to the long running "subscriber" goroutine.
done chan struct{}
// cancelations hold cancel functions for all in-progress tasks.
// cancelations hold cancel functions for all active tasks.
cancelations *base.Cancelations
// time to wait before retrying to connect to redis.

View File

@@ -23,7 +23,7 @@ func TestSyncer(t *testing.T) {
}
r := setup(t)
rdbClient := rdb.NewRDB(r)
h.SeedInProgressQueue(t, r, inProgress, base.DefaultQueueName)
h.SeedActiveQueue(t, r, inProgress, base.DefaultQueueName)
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
@@ -48,9 +48,9 @@ func TestSyncer(t *testing.T) {
time.Sleep(2 * interval) // ensure that syncer runs at least once
gotInProgress := h.GetInProgressMessages(t, r, base.DefaultQueueName)
if l := len(gotInProgress); l != 0 {
t.Errorf("%q has length %d; want 0", base.InProgressKey(base.DefaultQueueName), l)
gotActive := h.GetActiveMessages(t, r, base.DefaultQueueName)
if l := len(gotActive); l != 0 {
t.Errorf("%q has length %d; want 0", base.ActiveKey(base.DefaultQueueName), l)
}
}

View File

@@ -145,9 +145,9 @@ func printQueueStats(s *asynq.QueueStats) {
fmt.Printf("Paused: %t\n\n", s.Paused)
fmt.Println("Task Breakdown:")
printTable(
[]string{"InProgress", "Pending", "Scheduled", "Retry", "Dead"},
[]string{"Active", "Pending", "Scheduled", "Retry", "Dead"},
func(w io.Writer, tmpl string) {
fmt.Fprintf(w, tmpl, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead)
fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead)
},
)
fmt.Println()

View File

@@ -51,14 +51,14 @@ func init() {
}
type AggregateStats struct {
InProgress int
Pending int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Timestamp time.Time
Active int
Pending int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Timestamp time.Time
}
func stats(cmd *cobra.Command, args []string) {
@@ -78,7 +78,7 @@ func stats(cmd *cobra.Command, args []string) {
fmt.Println(err)
os.Exit(1)
}
aggStats.InProgress += s.InProgress
aggStats.Active += s.Active
aggStats.Pending += s.Pending
aggStats.Scheduled += s.Scheduled
aggStats.Retry += s.Retry
@@ -113,9 +113,9 @@ func stats(cmd *cobra.Command, args []string) {
func printStatsByState(s *AggregateStats) {
format := strings.Repeat("%v\t", 5) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "InProgress", "Pending", "Scheduled", "Retry", "Dead")
fmt.Fprintf(tw, format, "Active", "Pending", "Scheduled", "Retry", "Dead")
fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----")
fmt.Fprintf(tw, format, s.InProgress, s.Pending, s.Scheduled, s.Retry, s.Dead)
fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead)
tw.Flush()
}

View File

@@ -74,7 +74,7 @@ var taskListCmd = &cobra.Command{
Long: `List tasks of the given state from the specified queue.
The value for the state flag should be one of:
- in-progress
- active
- pending
- scheduled
- retry
@@ -95,7 +95,7 @@ To list the tasks from the second page, run
var taskCancelCmd = &cobra.Command{
Use: "cancel TASK_ID [TASK_ID...]",
Short: "Cancel one or more in-progress tasks",
Short: "Cancel one or more active tasks",
Args: cobra.MinimumNArgs(1),
Run: taskCancel,
}
@@ -165,8 +165,8 @@ func taskList(cmd *cobra.Command, args []string) {
}
switch state {
case "in-progress":
listInProgressTasks(qname, pageNum, pageSize)
case "active":
listActiveTasks(qname, pageNum, pageSize)
case "pending":
listPendingTasks(qname, pageNum, pageSize)
case "scheduled":
@@ -181,15 +181,15 @@ func taskList(cmd *cobra.Command, args []string) {
}
}
func listInProgressTasks(qname string, pageNum, pageSize int) {
func listActiveTasks(qname string, pageNum, pageSize int) {
i := createInspector()
tasks, err := i.ListInProgressTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
tasks, err := i.ListActiveTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(tasks) == 0 {
fmt.Printf("No in-progress tasks in %q queue\n", qname)
fmt.Printf("No active tasks in %q queue\n", qname)
return
}
printTable(