diff --git a/inspector.go b/inspector.go index eacc99d..818af65 100644 --- a/inspector.go +++ b/inspector.go @@ -20,7 +20,7 @@ type Inspector struct { rdb *rdb.RDB } -// New returns a new instance of Inspector. +// NewInspector returns a new instance of Inspector. func NewInspector(r RedisConnOpt) *Inspector { return &Inspector{ rdb: rdb.NewRDB(createRedisClient(r)), @@ -42,7 +42,7 @@ type QueueStats struct { // Name of the queue. Queue string // Size is the total number of tasks in the queue. - // The value is the sum of Pending, Active, Scheduled, Retry, and Dead. + // The value is the sum of Pending, Active, Scheduled, Retry, and Archived. Size int // Number of pending tasks. Pending int @@ -52,8 +52,8 @@ type QueueStats struct { Scheduled int // Number of retry tasks. Retry int - // Number of dead tasks. - Dead int + // Number of archived tasks. + Archived int // Total number of tasks being processed during the given date. // The number includes both succeeded and failed tasks. Processed int @@ -82,7 +82,7 @@ func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { Active: stats.Active, Scheduled: stats.Scheduled, Retry: stats.Retry, - Dead: stats.Dead, + Archived: stats.Archived, Processed: stats.Processed, Failed: stats.Failed, Paused: stats.Paused, @@ -201,9 +201,11 @@ type RetryTask struct { score int64 } -// DeadTask is a task exhausted its retries. -// DeadTask won't be retried automatically. -type DeadTask struct { +// ArchivedTask is a task archived for debugging and inspection purposes; it +// won't be retried automatically. +// A task is archived when it exhausts its retry count, or when a user +// archives it manually via the CLI or Inspector. +type ArchivedTask struct { *Task ID string Queue string @@ -215,19 +217,19 @@ type DeadTask struct { score int64 } -// Key returns a key used to delete, run, and kill the task. +// Key returns a key used to delete, run, and archive the task. func (t *ScheduledTask) Key() string { return fmt.Sprintf("s:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, run, and kill the task. +// Key returns a key used to delete, run, and archive the task. func (t *RetryTask) Key() string { return fmt.Sprintf("r:%v:%v", t.ID, t.score) } -// Key returns a key used to delete, run, and kill the task. -func (t *DeadTask) Key() string { - return fmt.Sprintf("d:%v:%v", t.ID, t.score) +// Key returns a key used to delete, run, and archive the task. +func (t *ArchivedTask) Key() string { + return fmt.Sprintf("a:%v:%v", t.ID, t.score) } // parseTaskKey parses a key string and returns each part of key with proper @@ -246,7 +248,7 @@ func parseTaskKey(key string) (id uuid.UUID, score int64, state string, err erro return uuid.Nil, 0, "", fmt.Errorf("invalid id") } state = parts[0] - if len(state) != 1 || !strings.Contains("srd", state) { + if len(state) != 1 || !strings.Contains("sra", state) { return uuid.Nil, 0, "", fmt.Errorf("invalid id") } return id, score, state, nil @@ -423,25 +425,25 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa return tasks, nil } -// ListDeadTasks retrieves dead tasks from the specified queue. +// ListArchivedTasks retrieves archived tasks from the specified queue. // Tasks are sorted by LastFailedAt field in descending order. // // By default, it retrieves the first 30 tasks.
-func (i *Inspector) ListDeadTasks(qname string, opts ...ListOption) ([]*DeadTask, error) { +func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { if err := validateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1} - zs, err := i.rdb.ListDead(qname, pgn) + zs, err := i.rdb.ListArchived(qname, pgn) if err != nil { return nil, err } - var tasks []*DeadTask + var tasks []*ArchivedTask for _, z := range zs { failedAt := time.Unix(z.Score, 0) t := NewTask(z.Message.Type, z.Message.Payload) - tasks = append(tasks, &DeadTask{ + tasks = append(tasks, &ArchivedTask{ Task: t, ID: z.Message.ID.String(), Queue: z.Message.Queue, @@ -475,13 +477,13 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { return int(n), err } -// DeleteAllDeadTasks deletes all dead tasks from the specified queue, +// DeleteAllArchivedTasks deletes all archived tasks from the specified queue, // and reports the number tasks deleted. -func (i *Inspector) DeleteAllDeadTasks(qname string) (int, error) { +func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.DeleteAllDeadTasks(qname) + n, err := i.rdb.DeleteAllArchivedTasks(qname) return int(n), err } @@ -499,8 +501,8 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { return i.rdb.DeleteScheduledTask(qname, id, score) case "r": return i.rdb.DeleteRetryTask(qname, id, score) - case "d": - return i.rdb.DeleteDeadTask(qname, id, score) + case "a": + return i.rdb.DeleteArchivedTask(qname, id, score) default: return fmt.Errorf("invalid key") } @@ -526,13 +528,13 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { return int(n), err } -// RunAllDeadTasks transition all dead tasks to pending state within the given queue, +// RunAllArchivedTasks transitions all archived tasks to pending state within the given queue, // and reports the number of tasks transitioned. -func (i *Inspector) RunAllDeadTasks(qname string) (int, error) { +func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.RunAllDeadTasks(qname) + n, err := i.rdb.RunAllArchivedTasks(qname) return int(n), err } @@ -550,35 +552,35 @@ func (i *Inspector) RunTaskByKey(qname, key string) error { return i.rdb.RunScheduledTask(qname, id, score) case "r": return i.rdb.RunRetryTask(qname, id, score) - case "d": - return i.rdb.RunDeadTask(qname, id, score) + case "a": + return i.rdb.RunArchivedTask(qname, id, score) default: return fmt.Errorf("invalid key") } } -// KillAllScheduledTasks kills all scheduled tasks within the given queue, -// and reports the number of tasks killed. -func (i *Inspector) KillAllScheduledTasks(qname string) (int, error) { +// ArchiveAllScheduledTasks archives all scheduled tasks within the given queue, +// and reports the number of tasks archived. +func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.KillAllScheduledTasks(qname) + n, err := i.rdb.ArchiveAllScheduledTasks(qname) return int(n), err } -// KillAllRetryTasks kills all retry tasks within the given queue, -// and reports the number of tasks killed.
-func (i *Inspector) KillAllRetryTasks(qname string) (int, error) { +// ArchiveAllRetryTasks archives all retry tasks within the given queue, +// and reports the number of tasks archived. +func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { if err := validateQueueName(qname); err != nil { return 0, err } - n, err := i.rdb.KillAllRetryTasks(qname) + n, err := i.rdb.ArchiveAllRetryTasks(qname) return int(n), err } -// KillTaskByKey kills a task with the given key in the given queue. -func (i *Inspector) KillTaskByKey(qname, key string) error { +// ArchiveTaskByKey archives a task with the given key in the given queue. +func (i *Inspector) ArchiveTaskByKey(qname, key string) error { if err := validateQueueName(qname); err != nil { return err } @@ -588,11 +590,11 @@ func (i *Inspector) KillTaskByKey(qname, key string) error { switch state { case "s": - return i.rdb.KillScheduledTask(qname, id, score) + return i.rdb.ArchiveScheduledTask(qname, id, score) case "r": - return i.rdb.KillRetryTask(qname, id, score) - case "d": - return fmt.Errorf("task already dead") + return i.rdb.ArchiveRetryTask(qname, id, score) + case "a": + return fmt.Errorf("task already archived") default: return fmt.Errorf("invalid key") } } @@ -716,7 +718,7 @@ type ClusterNode struct { Addr string } -// ClusterNode returns a list of nodes the given queue belongs to. +// ClusterNodes returns a list of nodes the given queue belongs to. func (i *Inspector) ClusterNodes(qname string) ([]ClusterNode, error) { nodes, err := i.rdb.ClusterNodes(qname) if err != nil { return nil, err } diff --git a/inspector_test.go b/inspector_test.go index 830c79b..19c117a 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -67,7 +67,7 @@ func TestInspectorDeleteQueue(t *testing.T) { active map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string // queue to remove force bool }{ { @@ -88,7 +88,7 @@ func TestInspectorDeleteQueue(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -112,7 +112,7 @@ func TestInspectorDeleteQueue(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -127,7 +127,7 @@ func TestInspectorDeleteQueue(t *testing.T) { h.SeedAllActiveQueues(t, r, tc.active) h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllRetryQueues(t, r, tc.retry) - h.SeedAllDeadQueues(t, r, tc.dead) + h.SeedAllArchivedQueues(t, r, tc.archived) err := inspector.DeleteQueue(tc.qname, tc.force) if err != nil { @@ -156,7 +156,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { active map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string // queue to remove force bool }{ { @@ -173,7 +173,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -187,7 +187,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { h.SeedAllActiveQueues(t, r, tc.active) h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllRetryQueues(t, r, tc.retry) - h.SeedAllDeadQueues(t, r, tc.dead) + h.SeedAllArchivedQueues(t, r, tc.archived) err := inspector.DeleteQueue(tc.qname, tc.force) if _, ok :=
err.(*ErrQueueNotEmpty); !ok { @@ -212,7 +212,7 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { active map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string // queue to remove force bool }{ @@ -229,7 +229,7 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "nonexistent", @@ -243,7 +243,7 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { h.SeedAllActiveQueues(t, r, tc.active) h.SeedAllScheduledQueues(t, r, tc.scheduled) h.SeedAllRetryQueues(t, r, tc.retry) - h.SeedAllDeadQueues(t, r, tc.dead) + h.SeedAllArchivedQueues(t, r, tc.archived) err := inspector.DeleteQueue(tc.qname, tc.force) if _, ok := err.(*ErrQueueNotFound); !ok { @@ -272,7 +272,7 @@ func TestInspectorCurrentStats(t *testing.T) { active map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z processed map[string]int failed map[string]int qname string @@ -302,7 +302,7 @@ func TestInspectorCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, @@ -325,7 +325,7 @@ func TestInspectorCurrentStats(t *testing.T) { Active: 1, Scheduled: 2, Retry: 0, - Dead: 0, + Archived: 0, Processed: 120, Failed: 2, Paused: false, @@ -340,7 +340,7 @@ func TestInspectorCurrentStats(t *testing.T) { asynqtest.SeedAllActiveQueues(t, r, tc.active) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) for qname, n := range tc.processed { processedKey := base.ProcessedKey(qname, now) r.Set(processedKey, n, 0) @@ -702,9 +702,9 @@ func TestInspectorListRetryTasks(t *testing.T) { } } -func createDeadTask(z base.Z) *DeadTask { +func createArchivedTask(z base.Z) *ArchivedTask { msg := z.Message - return &DeadTask{ + return &ArchivedTask{ Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, @@ -716,7 +716,7 @@ func createDeadTask(z base.Z) *DeadTask { } } -func TestInspectorListDeadTasks(t *testing.T) { +func TestInspectorListArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -732,47 +732,47 @@ func TestInspectorListDeadTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - desc string - dead map[string][]base.Z - qname string - want []*DeadTask + desc string + archived map[string][]base.Z + qname string + want []*ArchivedTask }{ { - desc: "with a few dead tasks", - dead: map[string][]base.Z{ + desc: "with a few archived tasks", + archived: map[string][]base.Z{ "default": {z1, z2, z3}, "custom": {z4}, }, qname: "default", // Should be sorted by LastFailedAt. 
- want: []*DeadTask{ - createDeadTask(z2), - createDeadTask(z1), - createDeadTask(z3), + want: []*ArchivedTask{ + createArchivedTask(z2), + createArchivedTask(z1), + createArchivedTask(z3), }, }, { - desc: "with empty dead queue", - dead: map[string][]base.Z{ + desc: "with empty archived queue", + archived: map[string][]base.Z{ "default": {}, }, qname: "default", - want: []*DeadTask(nil), + want: []*ArchivedTask(nil), }, } for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - got, err := inspector.ListDeadTasks(tc.qname) + got, err := inspector.ListArchivedTasks(tc.qname) if err != nil { - t.Errorf("%s; ListDeadTasks(%q) returned error: %v", tc.desc, tc.qname, err) + t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, DeadTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { - t.Errorf("%s; ListDeadTask(%q) = %v, want %v; (-want,+got)\n%s", + t.Errorf("%s; ListArchivedTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) } } @@ -971,7 +971,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { } } -func TestInspectorDeleteAllDeadTasks(t *testing.T) { +func TestInspectorDeleteAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -987,30 +987,30 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - qname string - want int - wantDead map[string][]base.Z + archived map[string][]base.Z + qname string + want int + wantArchived map[string][]base.Z }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1, z2, z3}, "custom": {z4}, }, qname: "default", want: 3, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": {z4}, }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", want: 0, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, }, }, @@ -1018,20 +1018,20 @@ func TestInspectorDeleteAllDeadTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - got, err := inspector.DeleteAllDeadTasks(tc.qname) + got, err := inspector.DeleteAllArchivedTasks(tc.qname) if err != nil { - t.Errorf("DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("DeleteAllArchivedTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("DeleteAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("DeleteAllArchivedTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1054,18 +1054,18 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
tests := []struct { scheduled map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string want int wantScheduled map[string][]base.Z - wantDead map[string][]base.Z + wantArchived map[string][]base.Z }{ { scheduled: map[string][]base.Z{ "default": {z1, z2, z3}, "custom": {z4}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1075,7 +1075,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { "default": {}, "custom": {z4}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { base.Z{Message: m1, Score: now.Unix()}, base.Z{Message: m2, Score: now.Unix()}, @@ -1088,7 +1088,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {z1, z2}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z3}, }, qname: "default", @@ -1096,7 +1096,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { z3, base.Z{Message: m1, Score: now.Unix()}, @@ -1108,7 +1108,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -1116,7 +1116,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, }, }, @@ -1124,7 +1124,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1, z2}, }, qname: "default", @@ -1132,7 +1132,7 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {z1, z2}, }, }, @@ -1141,9 +1141,9 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - got, err := inspector.KillAllScheduledTasks(tc.qname) + got, err := inspector.ArchiveAllScheduledTasks(tc.qname) if err != nil { t.Errorf("KillAllScheduledTasks(%q) returned error: %v", tc.qname, err) continue @@ -1157,14 +1157,14 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } - for qname, want := range tc.wantDead { + for qname, want := range tc.wantArchived { // Allow Z.Score to differ by up to 2. 
approxOpt := cmp.Comparer(func(a, b int64) bool { return math.Abs(float64(a-b)) < 2 }) - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1186,19 +1186,19 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - retry map[string][]base.Z - dead map[string][]base.Z - qname string - want int - wantRetry map[string][]base.Z - wantDead map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + qname string + want int + wantRetry map[string][]base.Z + wantArchived map[string][]base.Z }{ { retry: map[string][]base.Z{ "default": {z1, z2, z3}, "custom": {z4}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1208,7 +1208,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { "default": {}, "custom": {z4}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { base.Z{Message: m1, Score: now.Unix()}, base.Z{Message: m2, Score: now.Unix()}, @@ -1221,7 +1221,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { retry: map[string][]base.Z{ "default": {z1, z2}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z3}, }, qname: "default", @@ -1229,7 +1229,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { z3, base.Z{Message: m1, Score: now.Unix()}, @@ -1241,7 +1241,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1, z2}, }, qname: "default", @@ -1249,7 +1249,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {z1, z2}, }, }, @@ -1258,9 +1258,9 @@ func TestInspectorKillAllRetryTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - got, err := inspector.KillAllRetryTasks(tc.qname) + got, err := inspector.ArchiveAllRetryTasks(tc.qname) if err != nil { t.Errorf("KillAllRetryTasks(%q) returned error: %v", tc.qname, err) continue @@ -1275,10 +1275,10 @@ } } cmpOpt := asynqtest.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt, cmpOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt, cmpOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1518,7 +1518,7 @@
func TestInspectorRunAllRetryTasks(t *testing.T) { } } -func TestInspectorRunAllDeadTasks(t *testing.T) { +func TestInspectorRunAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1534,15 +1534,15 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - pending map[string][]*base.TaskMessage - qname string - want int - wantDead map[string][]base.Z - wantPending map[string][]*base.TaskMessage + archived map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + want int + wantArchived map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1, z4}, "critical": {z2}, "low": {z3}, @@ -1554,7 +1554,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { }, qname: "default", want: 2, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "critical": {z2}, "low": {z3}, @@ -1566,7 +1566,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1}, "critical": {z2}, }, @@ -1576,7 +1576,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { }, qname: "default", want: 1, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "critical": {z2}, }, @@ -1586,7 +1586,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, pending: map[string][]*base.TaskMessage{ @@ -1594,7 +1594,7 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { }, qname: "default", want: 0, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, }, wantPending: map[string][]*base.TaskMessage{ @@ -1605,21 +1605,21 @@ func TestInspectorRunAllDeadTasks(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) asynqtest.SeedAllPendingQueues(t, r, tc.pending) - got, err := inspector.RunAllDeadTasks(tc.qname) + got, err := inspector.RunAllArchivedTasks(tc.qname) if err != nil { - t.Errorf("RunAllDeadTasks(%q) returned error: %v", tc.qname, err) + t.Errorf("RunAllArchivedTasks(%q) returned error: %v", tc.qname, err) continue } if got != tc.want { - t.Errorf("RunAllDeadTasks(%q) = %d, want %d", tc.qname, got, tc.want) + t.Errorf("RunAllArchivedTasks(%q) = %d, want %d", tc.qname, got, tc.want) } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1732,7 +1732,7 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { } } -func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { +func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1746,19 +1746,19 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { inspector :=
NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - qname string - key string - wantDead map[string][]base.Z + archived map[string][]base.Z + qname string + key string + wantArchived map[string][]base.Z }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1, z2}, "custom": {z3}, }, qname: "default", - key: createDeadTask(z2).Key(), - wantDead: map[string][]base.Z{ + key: createArchivedTask(z2).Key(), + wantArchived: map[string][]base.Z{ "default": {z1}, "custom": {z3}, }, @@ -1767,16 +1767,16 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", qname, diff) + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } } @@ -1921,7 +1921,7 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { } } -func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { +func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { r := setup(t) defer r.Close() m1 := asynqtest.NewTaskMessage("task1", nil) @@ -1935,15 +1935,15 @@ func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - dead map[string][]base.Z - pending map[string][]*base.TaskMessage - qname string - key string - wantDead map[string][]base.Z - wantPending map[string][]*base.TaskMessage + archived map[string][]base.Z + pending map[string][]*base.TaskMessage + qname string + key string + wantArchived map[string][]base.Z + wantPending map[string][]*base.TaskMessage }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {z1}, "critical": {z2}, "low": {z3}, @@ -1954,8 +1954,8 @@ func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { "low": {}, }, qname: "critical", - key: createDeadTask(z2).Key(), - wantDead: map[string][]base.Z{ + key: createArchivedTask(z2).Key(), + wantArchived: map[string][]base.Z{ "default": {z1}, "critical": {}, "low": {z3}, @@ -1970,17 +1970,17 @@ func TestInspectorRunTaskByKeyRunsDeadTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) asynqtest.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived
tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -2009,19 +2009,19 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string key string want string wantScheduled map[string][]base.Z - wantDead map[string][]base.Z + wantArchived map[string][]base.Z }{ { scheduled: map[string][]base.Z{ "default": {z1}, "custom": {z2, z3}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2031,7 +2031,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { "default": {z1}, "custom": {z3}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": { { @@ -2046,9 +2046,9 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil { + if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } @@ -2060,10 +2060,10 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -2084,19 +2084,19 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { - retry map[string][]base.Z - dead map[string][]base.Z - qname string - key string - wantRetry map[string][]base.Z - wantDead map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + qname string + key string + wantRetry map[string][]base.Z + wantArchived map[string][]base.Z }{ { retry: map[string][]base.Z{ "default": {z1}, "custom": {z2, z3}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2106,7 +2106,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { "default": {z1}, "custom": {z3}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": { { @@ -2121,9 +2121,9 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { for _, tc := range tests { asynqtest.FlushDB(t, r) asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllDeadQueues(t, r, tc.dead) + asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - if err := inspector.KillTaskByKey(tc.qname, tc.key); err != nil { + if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("KillTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } @@ -2134,10 +2134,10 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) { qname, diff) } } - for qname, want := range tc.wantDead { - gotDead := asynqtest.GetDeadEntries(t, r, qname) - if diff := cmp.Diff(want, gotDead, asynqtest.SortZSetEntryOpt); diff != "" { - t.Errorf("unexpected dead
tasks in queue %q: (-want, +got)\n%s", + for qname, want := range tc.wantArchived { + gotArchived := asynqtest.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index d1d5f12..c1ef6f1 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -220,11 +220,11 @@ func SeedRetryQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qn seedRedisZSet(tb, r, base.RetryKey(qname), entries) } -// SeedDeadQueue initializes the dead queue with the given messages. -func SeedDeadQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { +// SeedArchivedQueue initializes the archived queue with the given messages. +func SeedArchivedQueue(tb testing.TB, r redis.UniversalClient, entries []base.Z, qname string) { tb.Helper() r.SAdd(base.AllQueues, qname) - seedRedisZSet(tb, r, base.DeadKey(qname), entries) + seedRedisZSet(tb, r, base.ArchivedKey(qname), entries) } // SeedDeadlines initializes the deadlines set with the given entries. @@ -264,10 +264,10 @@ func SeedAllRetryQueues(tb testing.TB, r redis.UniversalClient, retry map[string } } -// SeedAllDeadQueues initializes all of the specified dead queues with the given entries. -func SeedAllDeadQueues(tb testing.TB, r redis.UniversalClient, dead map[string][]base.Z) { - for q, entries := range dead { - SeedDeadQueue(tb, r, entries, q) +// SeedAllArchivedQueues initializes all of the specified archived queues with the given entries. +func SeedAllArchivedQueues(tb testing.TB, r redis.UniversalClient, archived map[string][]base.Z) { + for q, entries := range archived { + SeedArchivedQueue(tb, r, entries, q) } } @@ -320,10 +320,10 @@ func GetRetryMessages(tb testing.TB, r redis.UniversalClient, qname string) []*b return getZSetMessages(tb, r, base.RetryKey(qname)) } -// GetDeadMessages returns all dead messages in the given queue. -func GetDeadMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { +// GetArchivedMessages returns all archived messages in the given queue. +func GetArchivedMessages(tb testing.TB, r redis.UniversalClient, qname string) []*base.TaskMessage { tb.Helper() - return getZSetMessages(tb, r, base.DeadKey(qname)) + return getZSetMessages(tb, r, base.ArchivedKey(qname)) } // GetScheduledEntries returns all scheduled messages and its score in the given queue. @@ -338,10 +338,10 @@ func GetRetryEntries(tb testing.TB, r redis.UniversalClient, qname string) []bas return getZSetEntries(tb, r, base.RetryKey(qname)) } -// GetDeadEntries returns all dead messages and its score in the given queue. -func GetDeadEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { +// GetArchivedEntries returns all archived messages and its score in the given queue. +func GetArchivedEntries(tb testing.TB, r redis.UniversalClient, qname string) []base.Z { tb.Helper() - return getZSetEntries(tb, r, base.DeadKey(qname)) + return getZSetEntries(tb, r, base.ArchivedKey(qname)) } // GetDeadlinesEntries returns all task messages and its score in the deadlines set for the given queue.
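For orientation, a minimal caller-side sketch of the renamed Inspector API (illustrative only, not part of the patch; it assumes a Redis server on localhost:6379, and asynq.RedisClientOpt is the library's standard connection option):

	package main

	import (
		"fmt"
		"log"

		"github.com/hibiken/asynq"
	)

	func main() {
		inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})

		// ListArchivedTasks replaces ListDeadTasks; archived-task keys now
		// carry the "a:" prefix instead of "d:".
		tasks, err := inspector.ListArchivedTasks("default")
		if err != nil {
			log.Fatal(err)
		}
		for _, t := range tasks {
			fmt.Printf("archived: id=%s key=%s\n", t.ID, t.Key())
		}

		// RunAllArchivedTasks replaces RunAllDeadTasks: it transitions every
		// archived task in the queue back to the pending state.
		n, err := inspector.RunAllArchivedTasks("default")
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%d archived tasks moved back to pending\n", n)
	}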
diff --git a/internal/base/base.go b/internal/base/base.go index 44559cb..769cdfa 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -56,9 +56,9 @@ func RetryKey(qname string) string { return fmt.Sprintf("asynq:{%s}:retry", qname) } -// DeadKey returns a redis key for the dead tasks. -func DeadKey(qname string) string { - return fmt.Sprintf("asynq:{%s}:dead", qname) +// ArchivedKey returns a redis key for the archived tasks. +func ArchivedKey(qname string) string { + return fmt.Sprintf("asynq:{%s}:archived", qname) } // DeadlinesKey returns a redis key for the deadlines. @@ -156,7 +156,7 @@ type TaskMessage struct { // Timeout specifies timeout in seconds. // If task processing doesn't complete within the timeout, the task will be retried - // if retry count is remaining. Otherwise it will be moved to the dead queue. + // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no timeout. Timeout int64 @@ -164,7 +164,7 @@ type TaskMessage struct { // Deadline specifies the deadline for the task in Unix time, // the number of seconds elapsed since January 1, 1970 UTC. // If task processing doesn't complete before the deadline, the task will be retried - // if retry count is remaining. Otherwise it will be moved to the dead queue. + // if retry count is remaining. Otherwise it will be moved to the archive. // // Use zero to indicate no deadline. Deadline int64 @@ -369,7 +369,7 @@ type Broker interface { Schedule(msg *TaskMessage, processAt time.Time) error ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error - Kill(msg *TaskMessage, errMsg string) error + Archive(msg *TaskMessage, errMsg string) error CheckAndEnqueue(qnames ...string) error ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error) WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 37d87e3..a75baea 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -100,19 +100,19 @@ func TestRetryKey(t *testing.T) { } } -func TestDeadKey(t *testing.T) { +func TestArchivedKey(t *testing.T) { tests := []struct { qname string want string }{ - {"default", "asynq:{default}:dead"}, - {"custom", "asynq:{custom}:dead"}, + {"default", "asynq:{default}:archived"}, + {"custom", "asynq:{custom}:archived"}, } for _, tc := range tests { - got := DeadKey(tc.qname) + got := ArchivedKey(tc.qname) if got != tc.want { - t.Errorf("DeadKey(%q) = %q, want %q", tc.qname, got, tc.want) + t.Errorf("ArchivedKey(%q) = %q, want %q", tc.qname, got, tc.want) } } } diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index df6a0aa..a3298d1 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -35,7 +35,7 @@ type Stats struct { Active int Scheduled int Retry int - Dead int + Archived int // Total number of tasks processed during the current date. // The number includes both succeeded and failed tasks. 
Processed int @@ -62,7 +62,7 @@ type DailyStats struct { // KEYS[2] -> asynq::active // KEYS[3] -> asynq::scheduled // KEYS[4] -> asynq::retry -// KEYS[5] -> asynq::dead +// KEYS[5] -> asynq::archived // KEYS[6] -> asynq::processed: // KEYS[7] -> asynq::failed: // KEYS[8] -> asynq::paused @@ -111,7 +111,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), - base.DeadKey(qname), + base.ArchivedKey(qname), base.ProcessedKey(qname, now), base.FailedKey(qname, now), base.PausedKey(qname), @@ -144,8 +144,8 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) { case base.RetryKey(qname): stats.Retry = val size += val - case base.DeadKey(qname): - stats.Dead = val + case base.ArchivedKey(qname): + stats.Archived = val size += val case base.ProcessedKey(qname, now): stats.Processed = val @@ -328,12 +328,12 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) { return r.listZSetEntries(base.RetryKey(qname), pgn) } -// ListDead returns all tasks from the given queue that have exhausted its retry limit. -func (r *RDB) ListDead(qname string, pgn Pagination) ([]base.Z, error) { +// ListArchived returns all tasks from the given queue that have exhausted their retry limit. +func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) { if !r.client.SIsMember(base.AllQueues, qname).Val() { return nil, fmt.Errorf("queue %q does not exist", qname) } - return r.listZSetEntries(base.DeadKey(qname), pgn) + return r.listZSetEntries(base.ArchivedKey(qname), pgn) } // listZSetEntries returns a list of message and score pairs in Redis sorted-set @@ -358,11 +358,11 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) { return res, nil } -// RunDeadTask finds a dead task that matches the given id and score from +// RunArchivedTask finds an archived task that matches the given id and score from // the given queue and enqueues it for processing. -//If a task that matches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) RunDeadTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndRun(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score)) +// If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -412,10 +412,10 @@ func (r *RDB) RunAllRetryTasks(qname string) (int64, error) { return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname)) } -// RunAllDeadTasks enqueues all tasks from dead queue +// RunAllArchivedTasks enqueues all archived tasks from the given queue // and returns the number of tasks enqueued. -func (r *RDB) RunAllDeadTasks(qname string) (int64, error) { - return r.removeAndRunAll(base.DeadKey(qname), base.QueueKey(qname)) +func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) { + return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname)) } var removeAndRunCmd = redis.NewScript(` @@ -462,10 +462,10 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) { return n, nil } -// KillRetryTask finds a retry task that matches the given id and score from the given queue -// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
-func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndKill(base.RetryKey(qname), base.DeadKey(qname), id.String(), float64(score)) +// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue +// and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -475,10 +475,10 @@ func (r *RDB) KillRetryTask(qname string, id uuid.UUID, score int64) error { return nil } -// KillScheduledTask finds a scheduled task that matches the given id and score from the given queue -// and kills it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound. -func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error { - n, err := r.removeAndKill(base.ScheduledKey(qname), base.DeadKey(qname), id.String(), float64(score)) +// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue +// and archives it. If a task that matches the id and score does not exist, it returns ErrTaskNotFound. +func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error { + n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score)) if err != nil { return err } @@ -488,26 +488,26 @@ func (r *RDB) KillScheduledTask(qname string, id uuid.UUID, score int64) error { return nil } -// KillAllRetryTasks kills all retry tasks from the given queue and // returns the number of tasks that were moved. -func (r *RDB) KillAllRetryTasks(qname string) (int64, error) { - return r.removeAndKillAll(base.RetryKey(qname), base.DeadKey(qname)) +// ArchiveAllRetryTasks archives all retry tasks from the given queue and +// returns the number of tasks that were moved. +func (r *RDB) ArchiveAllRetryTasks(qname string) (int64, error) { + return r.removeAndArchiveAll(base.RetryKey(qname), base.ArchivedKey(qname)) } -// KillAllScheduledTasks kills all scheduled tasks from the given queue and +// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue and // returns the number of tasks that were moved.
-func (r *RDB) KillAllScheduledTasks(qname string) (int64, error) { - return r.removeAndKillAll(base.ScheduledKey(qname), base.DeadKey(qname)) +func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) { + return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname)) } // KEYS[1] -> ZSET to move task from (e.g., retry queue) -// KEYS[2] -> asynq:{}:dead -// ARGV[1] -> score of the task to kill -// ARGV[2] -> id of the task to kill +// KEYS[2] -> asynq:{}:archived +// ARGV[1] -> score of the task to archive +// ARGV[2] -> id of the task to archive // ARGV[3] -> current timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) -// ARGV[5] -> max number of tasks in dead queue (e.g., 100) -var removeAndKillCmd = redis.NewScript(` +// ARGV[5] -> max number of tasks in archived state (e.g., 100) +var removeAndArchiveCmd = redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1]) for _, msg in ipairs(msgs) do local decoded = cjson.decode(msg) @@ -521,12 +521,12 @@ for _, msg in ipairs(msgs) do end return 0`) -func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) { +func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) { now := time.Now() - limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - res, err := removeAndKillCmd.Run(r.client, + limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago + res, err := removeAndArchiveCmd.Run(r.client, []string{src, dst}, - score, id, now.Unix(), limit, maxDeadTasks).Result() + score, id, now.Unix(), limit, maxArchiveSize).Result() if err != nil { return 0, err } @@ -538,11 +538,11 @@ func (r *RDB) removeAndKill(src, dst, id string, score float64) (int64, error) { } // KEYS[1] -> ZSET to move task from (e.g., retry queue) -// KEYS[2] -> asynq:{}:dead +// KEYS[2] -> asynq:{}:archived // ARGV[1] -> current timestamp // ARGV[2] -> cutoff timestamp (e.g., 90 days ago) -// ARGV[3] -> max number of tasks in dead queue (e.g., 100) -var removeAndKillAllCmd = redis.NewScript(` +// ARGV[3] -> max number of tasks in archive (e.g., 100) +var removeAndArchiveAllCmd = redis.NewScript(` local msgs = redis.call("ZRANGE", KEYS[1], 0, -1) for _, msg in ipairs(msgs) do redis.call("ZADD", KEYS[2], ARGV[1], msg) @@ -552,11 +552,11 @@ for _, msg in ipairs(msgs) do end return table.getn(msgs)`) -func (r *RDB) removeAndKillAll(src, dst string) (int64, error) { +func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) { now := time.Now() - limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - res, err := removeAndKillAllCmd.Run(r.client, []string{src, dst}, - now.Unix(), limit, maxDeadTasks).Result() + limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago + res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst}, + now.Unix(), limit, maxArchiveSize).Result() if err != nil { return 0, err } @@ -567,10 +567,10 @@ func (r *RDB) removeAndKillAll(src, dst string) (int64, error) { return n, nil } -// DeleteDeadTask deletes a dead task that matches the given id and score from the given queue. +// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue. // If a task that matches the id and score does not exist, it returns ErrTaskNotFound. 
-func (r *RDB) DeleteDeadTask(qname string, id uuid.UUID, score int64) error { - return r.deleteTask(base.DeadKey(qname), id.String(), float64(score)) +func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error { + return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score)) } // DeleteRetryTask deletes a retry task that matches the given id and score from the given queue. @@ -617,10 +617,10 @@ local n = redis.call("ZCARD", KEYS[1]) redis.call("DEL", KEYS[1]) return n`) -// DeleteAllDeadTasks deletes all dead tasks from the given queue +// DeleteAllArchivedTasks deletes all archived tasks from the given queue // and returns the number of tasks deleted. -func (r *RDB) DeleteAllDeadTasks(qname string) (int64, error) { - return r.deleteAll(base.DeadKey(qname)) +func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) { + return r.deleteAll(base.ArchivedKey(qname)) } // DeleteAllRetryTasks deletes all retry tasks from the given queue @@ -670,7 +670,7 @@ func (e *ErrQueueNotEmpty) Error() string { // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry -// KEYS[5] -> asynq:{}:dead +// KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines var removeQueueForceCmd = redis.NewScript(` local active = redis.call("LLEN", KEYS[2]) @@ -690,15 +690,15 @@ return redis.status_reply("OK")`) // KEYS[2] -> asynq:{}:active // KEYS[3] -> asynq:{}:scheduled // KEYS[4] -> asynq:{}:retry -// KEYS[5] -> asynq:{}:dead +// KEYS[5] -> asynq:{}:archived // KEYS[6] -> asynq:{}:deadlines var removeQueueCmd = redis.NewScript(` local pending = redis.call("LLEN", KEYS[1]) local active = redis.call("LLEN", KEYS[2]) local scheduled = redis.call("SCARD", KEYS[3]) local retry = redis.call("SCARD", KEYS[4]) -local dead = redis.call("SCARD", KEYS[5]) -local total = pending + active + scheduled + retry + dead +local archived = redis.call("SCARD", KEYS[5]) +local total = pending + active + scheduled + retry + archived if total > 0 then return redis.error_reply("QUEUE NOT EMPTY") end @@ -735,7 +735,7 @@ func (r *RDB) RemoveQueue(qname string, force bool) error { base.ActiveKey(qname), base.ScheduledKey(qname), base.RetryKey(qname), - base.DeadKey(qname), + base.ArchivedKey(qname), base.DeadlinesKey(qname), } if err := script.Run(r.client, keys).Err(); err != nil { diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 552b88c..3dd53d4 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -63,7 +63,7 @@ func TestCurrentStats(t *testing.T) { inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z processed map[string]int failed map[string]int paused []string @@ -94,7 +94,7 @@ func TestCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, @@ -119,7 +119,7 @@ func TestCurrentStats(t *testing.T) { Active: 1, Scheduled: 2, Retry: 0, - Dead: 0, + Archived: 0, Processed: 120, Failed: 2, Timestamp: now, @@ -149,7 +149,7 @@ func TestCurrentStats(t *testing.T) { "critical": {}, "low": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, "low": {}, @@ -174,7 +174,7 @@ func TestCurrentStats(t *testing.T) { Active: 0, Scheduled: 0, Retry: 0, - Dead: 0, + Archived: 0, Processed: 100, Failed: 0, Timestamp: now, @@ -193,7 +193,7 @@ func TestCurrentStats(t 
*testing.T) { h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) for qname, n := range tc.processed { processedKey := base.ProcessedKey(qname, now) r.client.Set(processedKey, n, 0) @@ -869,12 +869,12 @@ func TestListDead(t *testing.T) { f3 := time.Now().Add(-4 * time.Hour) tests := []struct { - dead map[string][]base.Z - qname string - want []base.Z + archived map[string][]base.Z + qname string + want []base.Z }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: f1.Unix()}, {Message: m2, Score: f2.Unix()}, @@ -890,7 +890,7 @@ func TestListDead(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: f1.Unix()}, {Message: m2, Score: f2.Unix()}, @@ -905,7 +905,7 @@ func TestListDead(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -915,9 +915,9 @@ func TestListDead(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got, err := r.ListDead(tc.qname, Pagination{Size: 20, Page: 0}) + got, err := r.ListArchived(tc.qname, Pagination{Size: 20, Page: 0}) op := fmt.Sprintf("r.ListDead(%q, Pagination{Size: 20, Page: 0})", tc.qname) if err != nil { t.Errorf("%s = %v, %v, want %v, nil", op, got, err, tc.want) @@ -939,7 +939,7 @@ func TestListDeadPagination(t *testing.T) { msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) entries = append(entries, base.Z{Message: msg, Score: int64(i)}) } - h.SeedDeadQueue(t, r.client, entries, "default") + h.SeedArchivedQueue(t, r.client, entries, "default") tests := []struct { desc string @@ -958,7 +958,7 @@ func TestListDeadPagination(t *testing.T) { } for _, tc := range tests { - got, err := r.ListDead(tc.qname, Pagination{Size: tc.size, Page: tc.page}) + got, err := r.ListArchived(tc.qname, Pagination{Size: tc.size, Page: tc.page}) op := fmt.Sprintf("r.ListDead(Pagination{Size: %d, Page: %d})", tc.size, tc.page) if err != nil { @@ -1005,16 +1005,16 @@ func TestRunDeadTask(t *testing.T) { s2 := time.Now().Add(-time.Hour).Unix() tests := []struct { - dead map[string][]base.Z - qname string - score int64 - id uuid.UUID - want error // expected return value from calling RunDeadTask - wantDead map[string][]*base.TaskMessage - wantPending map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + score int64 + id uuid.UUID + want error // expected return value from calling RunArchivedTask + wantArchived map[string][]*base.TaskMessage + wantPending map[string][]*base.TaskMessage }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: t1, Score: s1}, {Message: t2, Score: s2}, }, }, qname: "default", score: s2, id: t2.ID, want: nil, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {t1}, }, wantPending: map[string][]*base.TaskMessage{ @@ -1032,7 +1032,7 @@ func TestRunDeadTask(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: t1, Score: s1}, {Message: t2, Score: s2}, }, }, qname: "default", score: 123, id: t2.ID, want:
ErrTaskNotFound, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {t1, t2}, }, wantPending: map[string][]*base.TaskMessage{ @@ -1050,7 +1050,7 @@ func TestRunDeadTask(t *testing.T) { }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: t1, Score: s1}, {Message: t2, Score: s2}, @@ -1063,7 +1063,7 @@ func TestRunDeadTask(t *testing.T) { score: s1, id: t3.ID, want: nil, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {t1, t2}, "critical": {}, }, @@ -1076,9 +1076,9 @@ func TestRunDeadTask(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.RunDeadTask(tc.qname, tc.id, tc.score) + got := r.RunArchivedTask(tc.qname, tc.id, tc.score) if got != tc.want { t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue @@ -1091,10 +1091,10 @@ func TestRunDeadTask(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadMessages(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedMessages(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.DeadKey(qname), diff) + t.Errorf("mismatch found in %q, (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } } @@ -1532,16 +1532,16 @@ func TestRunAllDeadTasks(t *testing.T) { t5 := h.NewTaskMessageWithQueue("minor_notification", nil, "custom") tests := []struct { - desc string - dead map[string][]base.Z - qname string - want int64 - wantPending map[string][]*base.TaskMessage - wantDead map[string][]*base.TaskMessage + desc string + archived map[string][]base.Z + qname string + want int64 + wantPending map[string][]*base.TaskMessage + wantArchived map[string][]*base.TaskMessage }{ { - desc: "with tasks in dead queue", - dead: map[string][]base.Z{ + desc: "with tasks in archived queue", + archived: map[string][]base.Z{ "default": { {Message: t1, Score: time.Now().Add(-time.Minute).Unix()}, {Message: t2, Score: time.Now().Add(-time.Minute).Unix()}, @@ -1553,13 +1553,13 @@ func TestRunAllDeadTasks(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, }, }, { - desc: "with empty dead queue", - dead: map[string][]base.Z{ + desc: "with empty archived queue", + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -1567,13 +1567,13 @@ func TestRunAllDeadTasks(t *testing.T) { wantPending: map[string][]*base.TaskMessage{ "default": {}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, }, }, { desc: "with custom queues", - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: t1, Score: time.Now().Add(-time.Minute).Unix()}, {Message: t2, Score: time.Now().Add(-time.Minute).Unix()}, @@ -1590,7 +1590,7 @@ func TestRunAllDeadTasks(t *testing.T) { "default": {}, "custom": {t4, t5}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {t1, t2, t3}, "custom": {}, }, @@ -1599,9 +1599,9 @@ func TestRunAllDeadTasks(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db 
before each test case - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got, err := r.RunAllDeadTasks(tc.qname) + got, err := r.RunAllArchivedTasks(tc.qname) if err != nil { t.Errorf("%s; r.RunAllDeadTasks(%q) = %v, %v; want %v, nil", tc.desc, tc.qname, got, err, tc.want) @@ -1619,10 +1619,10 @@ func TestRunAllDeadTasks(t *testing.T) { t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff) } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadMessages(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedMessages(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff) + t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff) } } } @@ -1641,14 +1641,14 @@ func TestKillRetryTask(t *testing.T) { t4 := time.Now().Add(3 * time.Hour) tests := []struct { - retry map[string][]base.Z - dead map[string][]base.Z - qname string - id uuid.UUID - score int64 - want error - wantRetry map[string][]base.Z - wantDead map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + qname string + id uuid.UUID + score int64 + want error + wantRetry map[string][]base.Z + wantArchived map[string][]base.Z }{ { retry: map[string][]base.Z{ @@ -1657,7 +1657,7 @@ func TestKillRetryTask(t *testing.T) { {Message: m2, Score: t2.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -1667,7 +1667,7 @@ func TestKillRetryTask(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, @@ -1675,7 +1675,7 @@ func TestKillRetryTask(t *testing.T) { retry: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", @@ -1685,7 +1685,7 @@ func TestKillRetryTask(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, }, @@ -1700,7 +1700,7 @@ func TestKillRetryTask(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1717,7 +1717,7 @@ func TestKillRetryTask(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": {{Message: m3, Score: time.Now().Unix()}}, }, @@ -1727,9 +1727,9 @@ func TestKillRetryTask(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.KillRetryTask(tc.qname, tc.id, tc.score) + got := r.ArchiveRetryTask(tc.qname, tc.id, tc.score) if got != tc.want { t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) @@ -1744,11 +1744,11 @@ func TestKillRetryTask(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadEntries(t, r.client, qname) + for qname, want := range 
tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadKey(qname), diff) + base.ArchivedKey(qname), diff) } } } @@ -1768,13 +1768,13 @@ func TestKillScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string id uuid.UUID score int64 want error wantScheduled map[string][]base.Z - wantDead map[string][]base.Z + wantArchived map[string][]base.Z }{ { scheduled: map[string][]base.Z{ @@ -1783,7 +1783,7 @@ func TestKillScheduledTask(t *testing.T) { {Message: m2, Score: t2.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -1793,7 +1793,7 @@ func TestKillScheduledTask(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {{Message: m1, Score: time.Now().Unix()}}, }, }, @@ -1801,7 +1801,7 @@ func TestKillScheduledTask(t *testing.T) { scheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", @@ -1811,7 +1811,7 @@ func TestKillScheduledTask(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, }, @@ -1826,7 +1826,7 @@ func TestKillScheduledTask(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1843,7 +1843,7 @@ func TestKillScheduledTask(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": {{Message: m3, Score: time.Now().Unix()}}, }, @@ -1853,9 +1853,9 @@ func TestKillScheduledTask(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.KillScheduledTask(tc.qname, tc.id, tc.score) + got := r.ArchiveScheduledTask(tc.qname, tc.id, tc.score) if got != tc.want { t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) @@ -1870,11 +1870,11 @@ func TestKillScheduledTask(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadEntries(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadKey(qname), diff) + base.ArchivedKey(qname), diff) } } } @@ -1893,12 +1893,12 @@ func TestKillAllRetryTasks(t *testing.T) { t4 := time.Now().Add(3 * time.Hour) tests := []struct { - retry map[string][]base.Z - dead map[string][]base.Z - qname string - want int64 - wantRetry map[string][]base.Z - wantDead map[string][]base.Z + retry map[string][]base.Z + archived map[string][]base.Z + qname string + want int64 + wantRetry map[string][]base.Z + wantArchived map[string][]base.Z }{ { retry: map[string][]base.Z{ @@ -1907,7 +1907,7 @@ func 
TestKillAllRetryTasks(t *testing.T) { {Message: m2, Score: t2.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -1915,7 +1915,7 @@ func TestKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: time.Now().Unix()}, {Message: m2, Score: time.Now().Unix()}, @@ -1926,7 +1926,7 @@ func TestKillAllRetryTasks(t *testing.T) { retry: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", @@ -1934,7 +1934,7 @@ func TestKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: time.Now().Unix()}, {Message: m2, Score: t2.Unix()}, @@ -1945,7 +1945,7 @@ func TestKillAllRetryTasks(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -1956,7 +1956,7 @@ func TestKillAllRetryTasks(t *testing.T) { wantRetry: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -1974,7 +1974,7 @@ func TestKillAllRetryTasks(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1987,7 +1987,7 @@ func TestKillAllRetryTasks(t *testing.T) { }, "custom": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": { {Message: m3, Score: time.Now().Unix()}, @@ -2000,9 +2000,9 @@ func TestKillAllRetryTasks(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got, err := r.KillAllRetryTasks(tc.qname) + got, err := r.ArchiveAllRetryTasks(tc.qname) if got != tc.want || err != nil { t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil", tc.qname, got, err, tc.want) @@ -2017,11 +2017,11 @@ func TestKillAllRetryTasks(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadEntries(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadKey(qname), diff) + base.ArchivedKey(qname), diff) } } } @@ -2041,11 +2041,11 @@ func TestKillAllScheduledTasks(t *testing.T) { tests := []struct { scheduled map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string want int64 wantScheduled map[string][]base.Z - wantDead map[string][]base.Z + wantArchived map[string][]base.Z }{ { scheduled: map[string][]base.Z{ @@ -2054,7 +2054,7 @@ func TestKillAllScheduledTasks(t *testing.T) { {Message: m2, Score: t2.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", @@ -2062,7 +2062,7 @@ func TestKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: 
map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: time.Now().Unix()}, {Message: m2, Score: time.Now().Unix()}, @@ -2073,7 +2073,7 @@ func TestKillAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {{Message: m1, Score: t1.Unix()}}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", @@ -2081,7 +2081,7 @@ func TestKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: time.Now().Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2092,7 +2092,7 @@ func TestKillAllScheduledTasks(t *testing.T) { scheduled: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2103,7 +2103,7 @@ func TestKillAllScheduledTasks(t *testing.T) { wantScheduled: map[string][]base.Z{ "default": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2121,7 +2121,7 @@ func TestKillAllScheduledTasks(t *testing.T) { {Message: m4, Score: t4.Unix()}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2134,7 +2134,7 @@ func TestKillAllScheduledTasks(t *testing.T) { }, "custom": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": { {Message: m3, Score: time.Now().Unix()}, @@ -2147,9 +2147,9 @@ func TestKillAllScheduledTasks(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got, err := r.KillAllScheduledTasks(tc.qname) + got, err := r.ArchiveAllScheduledTasks(tc.qname) if got != tc.want || err != nil { t.Errorf("(*RDB).KillAllScheduledTasks(%q) = %v, %v; want %v, nil", tc.qname, got, err, tc.want) @@ -2164,11 +2164,11 @@ func TestKillAllScheduledTasks(t *testing.T) { } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadEntries(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedEntries(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { t.Errorf("mismatch found in %q; (-want,+got)\n%s", - base.DeadKey(qname), diff) + base.ArchivedKey(qname), diff) } } } @@ -2185,15 +2185,15 @@ func TestDeleteDeadTask(t *testing.T) { t3 := time.Now().Add(-time.Hour) tests := []struct { - dead map[string][]base.Z - qname string - id uuid.UUID - score int64 - want error - wantDead map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + id uuid.UUID + score int64 + want error + wantArchived map[string][]*base.TaskMessage }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2203,12 +2203,12 @@ func TestDeleteDeadTask(t *testing.T) { id: m1.ID, score: t1.Unix(), want: nil, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {m2}, }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2221,13 +2221,13 @@ func 
TestDeleteDeadTask(t *testing.T) { id: m3.ID, score: t3.Unix(), want: nil, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: t1.Unix()}, {Message: m2, Score: t2.Unix()}, @@ -2237,19 +2237,19 @@ func TestDeleteDeadTask(t *testing.T) { id: m1.ID, score: t2.Unix(), // id and score mismatch want: ErrTaskNotFound, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {m1, m2}, }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", id: m1.ID, score: t1.Unix(), want: ErrTaskNotFound, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -2257,18 +2257,18 @@ func TestDeleteDeadTask(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got := r.DeleteDeadTask(tc.qname, tc.id, tc.score) + got := r.DeleteArchivedTask(tc.qname, tc.id, tc.score) if got != tc.want { t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want) continue } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadMessages(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedMessages(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } } @@ -2452,13 +2452,13 @@ func TestDeleteAllDeadTasks(t *testing.T) { m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") tests := []struct { - dead map[string][]base.Z - qname string - want int64 - wantDead map[string][]*base.TaskMessage + archived map[string][]base.Z + qname string + want int64 + wantArchived map[string][]*base.TaskMessage }{ { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: m1, Score: time.Now().Unix()}, {Message: m2, Score: time.Now().Unix()}, @@ -2469,18 +2469,18 @@ func TestDeleteAllDeadTasks(t *testing.T) { }, qname: "default", want: 2, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, "custom": {m3}, }, }, { - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, qname: "default", want: 0, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -2488,19 +2488,19 @@ func TestDeleteAllDeadTasks(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) - got, err := r.DeleteAllDeadTasks(tc.qname) + got, err := r.DeleteAllArchivedTasks(tc.qname) if err != nil { t.Errorf("r.DeleteAllDeadTasks(%q) returned error: %v", tc.qname, err) } if got != tc.want { t.Errorf("r.DeleteAllDeadTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want) } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadMessages(t, r.client, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedMessages(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { - 
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadKey(qname), diff) + t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ArchivedKey(qname), diff) } } } @@ -2643,7 +2643,7 @@ func TestRemoveQueue(t *testing.T) { inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string // queue to remove force bool }{ @@ -2664,7 +2664,7 @@ func TestRemoveQueue(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2688,7 +2688,7 @@ func TestRemoveQueue(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2703,7 +2703,7 @@ func TestRemoveQueue(t *testing.T) { h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) err := r.RemoveQueue(tc.qname, tc.force) if err != nil { @@ -2721,7 +2721,7 @@ func TestRemoveQueue(t *testing.T) { base.DeadlinesKey(tc.qname), base.ScheduledKey(tc.qname), base.RetryKey(tc.qname), - base.DeadKey(tc.qname), + base.ArchivedKey(tc.qname), } for _, key := range keys { if r.client.Exists(key).Val() != 0 { @@ -2745,7 +2745,7 @@ func TestRemoveQueueError(t *testing.T) { inProgress map[string][]*base.TaskMessage scheduled map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z qname string // queue to remove force bool }{ @@ -2767,7 +2767,7 @@ func TestRemoveQueueError(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2792,7 +2792,7 @@ func TestRemoveQueueError(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2817,7 +2817,7 @@ func TestRemoveQueueError(t *testing.T) { "default": {}, "custom": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -2833,7 +2833,7 @@ func TestRemoveQueueError(t *testing.T) { h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllScheduledQueues(t, r.client, tc.scheduled) h.SeedAllRetryQueues(t, r.client, tc.retry) - h.SeedAllDeadQueues(t, r.client, tc.dead) + h.SeedAllArchivedQueues(t, r.client, tc.archived) got := r.RemoveQueue(tc.qname, tc.force) if got == nil { @@ -2866,10 +2866,10 @@ func TestRemoveQueueError(t *testing.T) { t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.RetryKey(qname), diff) } } - for qname, want := range tc.dead { - gotDead := h.GetDeadEntries(t, r.client, qname) + for qname, want := range tc.archived { + gotDead := h.GetArchivedEntries(t, r.client, qname) if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt); diff != "" { - t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.DeadKey(qname), diff) + t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.ArchivedKey(qname), diff) } } } @@ -3080,9 +3080,9 @@ func TestSchedulerEnqueueEvents(t *testing.T) { }{ { entryID: "entry123", - events: []*base.SchedulerEnqueueEvent{ - {TaskID: "task123", EnqueuedAt: oneDayAgo}, - {TaskID: "task789", EnqueuedAt: oneHourAgo}, + events: []*base.SchedulerEnqueueEvent{ + {TaskID: "task123", EnqueuedAt: oneDayAgo}, + 
{TaskID: "task789", EnqueuedAt: oneHourAgo}, {TaskID: "task456", EnqueuedAt: fiveHoursAgo}, }, // Recent events first diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 09a962d..9c514b6 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -381,22 +381,22 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e } const ( - maxDeadTasks = 10000 - deadExpirationInDays = 90 + maxArchiveSize = 10000 // maximum number of tasks in archive + archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently ) // KEYS[1] -> asynq:{}:active // KEYS[2] -> asynq:{}:deadlines -// KEYS[3] -> asynq:{}:dead +// KEYS[3] -> asynq:{}:archived // KEYS[4] -> asynq:{}:processed: // KEYS[5] -> asynq:{}:failed: -// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue -// ARGV[2] -> base.TaskMessage value to add to Dead queue +// ARGV[1] -> base.TaskMessage value to remove +// ARGV[2] -> base.TaskMessage value to add // ARGV[3] -> died_at UNIX timestamp // ARGV[4] -> cutoff timestamp (e.g., 90 days ago) -// ARGV[5] -> max number of tasks in dead queue (e.g., 100) +// ARGV[5] -> max number of tasks in archive (e.g., 100) // ARGV[6] -> stats expiration timestamp -var killCmd = redis.NewScript(` +var archiveCmd = redis.NewScript(` if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then return redis.error_reply("NOT FOUND") end @@ -416,10 +416,9 @@ if tonumber(m) == 1 then end return redis.status_reply("OK")`) -// Kill sends the task to "dead" queue from active queue, assigning -// the error message to the task. -// It also trims the set by timestamp and set size. -func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { +// Archive sends the given task to archive, attaching the error message to the task. +// It also trims the archive by timestamp and set size. 
+func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { msgToRemove, err := base.EncodeMessage(msg) if err != nil { return err @@ -431,13 +430,13 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { return err } now := time.Now() - limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago processedKey := base.ProcessedKey(msg.Queue, now) failedKey := base.FailedKey(msg.Queue, now) expireAt := now.Add(statsTTL) - return killCmd.Run(r.client, - []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.DeadKey(msg.Queue), processedKey, failedKey}, - msgToRemove, msgToAdd, now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() + return archiveCmd.Run(r.client, + []string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey}, + msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err() } // CheckAndEnqueue checks for scheduled/retry tasks for the given queues diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index db777d8..7cef73b 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1008,7 +1008,7 @@ func TestRetry(t *testing.T) { } } -func TestKill(t *testing.T) { +func TestArchive(t *testing.T) { r := setup(t) defer r.Close() now := time.Now() @@ -1058,11 +1058,11 @@ func TestKill(t *testing.T) { tests := []struct { inProgress map[string][]*base.TaskMessage deadlines map[string][]base.Z - dead map[string][]base.Z - target *base.TaskMessage // task to kill + archived map[string][]base.Z + target *base.TaskMessage // task to archive wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z - wantDead map[string][]base.Z + wantArchived map[string][]base.Z }{ { inProgress: map[string][]*base.TaskMessage{ @@ -1074,7 +1074,7 @@ func TestKill(t *testing.T) { {Message: t2, Score: t2Deadline}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": { {Message: t3, Score: now.Add(-time.Hour).Unix()}, }, @@ -1086,7 +1086,7 @@ func TestKill(t *testing.T) { wantDeadlines: map[string][]base.Z{ "default": {{Message: t2, Score: t2Deadline}}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, {Message: t3, Score: now.Add(-time.Hour).Unix()}, @@ -1104,7 +1104,7 @@ func TestKill(t *testing.T) { {Message: t3, Score: t3Deadline}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, target: t1, @@ -1117,7 +1117,7 @@ func TestKill(t *testing.T) { {Message: t3, Score: t3Deadline}, }, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": { {Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()}, }, @@ -1136,7 +1136,7 @@ func TestKill(t *testing.T) { {Message: t4, Score: t4Deadline}, }, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "custom": {}, }, @@ -1149,7 +1149,7 @@ func TestKill(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}}, "custom": {}, }, - wantDead: map[string][]base.Z{ + wantArchived: map[string][]base.Z{ "default": {}, "custom": { {Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()}, @@ -1162,11 +1162,11 @@ func TestKill(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllActiveQueues(t, r.client, tc.inProgress) h.SeedAllDeadlines(t, r.client, tc.deadlines) 
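The archive Lua script reads as three steps: remove the message from the active list, ZADD it into the archived sorted set scored by its died_at timestamp, then prune the set. As a rough orientation, here is a Go sketch of just the pruning step, written against the go-redis v7-style client (no context argument) that this codebase used around the time of this change. The helper name, parameters, and key string are illustrative; the real trimming happens atomically inside the Lua script, not in client-side Go.

```go
package main

import (
	"fmt"
	"time"

	"github.com/go-redis/redis/v7"
)

// trimArchive sketches the pruning half of archiveCmd. It assumes the task
// was already ZADD-ed into the archived set with its died_at time as the
// score; maxSize and retention stand in for maxArchiveSize and
// archivedExpirationInDays.
func trimArchive(c *redis.Client, qname string, maxSize int64, retention time.Duration) error {
	key := fmt.Sprintf("asynq:{%s}:archived", qname)
	cutoff := time.Now().Add(-retention).Unix()
	// Drop entries that have been in the archive longer than the retention period.
	if err := c.ZRemRangeByScore(key, "-inf", fmt.Sprintf("%d", cutoff)).Err(); err != nil {
		return err
	}
	// Cap the set: remove all but the maxSize most recently archived entries.
	return c.ZRemRangeByRank(key, 0, -maxSize-1).Err()
}

func main() {
	c := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	if err := trimArchive(c, "default", 10000, 90*24*time.Hour); err != nil {
		fmt.Println("trim failed:", err)
	}
}
```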
diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go
index db777d8..7cef73b 100644
--- a/internal/rdb/rdb_test.go
+++ b/internal/rdb/rdb_test.go
@@ -1008,7 +1008,7 @@ func TestRetry(t *testing.T) {
 	}
 }

-func TestKill(t *testing.T) {
+func TestArchive(t *testing.T) {
 	r := setup(t)
 	defer r.Close()
 	now := time.Now()
@@ -1058,11 +1058,11 @@
 	tests := []struct {
 		inProgress    map[string][]*base.TaskMessage
 		deadlines     map[string][]base.Z
-		dead          map[string][]base.Z
-		target        *base.TaskMessage // task to kill
+		archived      map[string][]base.Z
+		target        *base.TaskMessage // task to archive
 		wantActive    map[string][]*base.TaskMessage
 		wantDeadlines map[string][]base.Z
-		wantDead      map[string][]base.Z
+		wantArchived  map[string][]base.Z
 	}{
 		{
 			inProgress: map[string][]*base.TaskMessage{
@@ -1074,7 +1074,7 @@ func TestKill(t *testing.T) {
 				{Message: t2, Score: t2Deadline},
 				},
 			},
-			dead: map[string][]base.Z{
+			archived: map[string][]base.Z{
 				"default": {
 					{Message: t3, Score: now.Add(-time.Hour).Unix()},
 				},
@@ -1086,7 +1086,7 @@ func TestKill(t *testing.T) {
 			wantDeadlines: map[string][]base.Z{
 				"default": {{Message: t2, Score: t2Deadline}},
 			},
-			wantDead: map[string][]base.Z{
+			wantArchived: map[string][]base.Z{
 				"default": {
 					{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
 					{Message: t3, Score: now.Add(-time.Hour).Unix()},
@@ -1104,7 +1104,7 @@ func TestKill(t *testing.T) {
 				{Message: t3, Score: t3Deadline},
 				},
 			},
-			dead: map[string][]base.Z{
+			archived: map[string][]base.Z{
 				"default": {},
 			},
 			target: t1,
@@ -1117,7 +1117,7 @@ func TestKill(t *testing.T) {
 				{Message: t3, Score: t3Deadline},
 				},
 			},
-			wantDead: map[string][]base.Z{
+			wantArchived: map[string][]base.Z{
 				"default": {
 					{Message: h.TaskMessageWithError(*t1, errMsg), Score: now.Unix()},
 				},
@@ -1136,7 +1136,7 @@ func TestKill(t *testing.T) {
 				{Message: t4, Score: t4Deadline},
 				},
 			},
-			dead: map[string][]base.Z{
+			archived: map[string][]base.Z{
 				"default": {},
 				"custom":  {},
 			},
@@ -1149,7 +1149,7 @@ func TestKill(t *testing.T) {
 				"default": {{Message: t1, Score: t1Deadline}},
 				"custom":  {},
 			},
-			wantDead: map[string][]base.Z{
+			wantArchived: map[string][]base.Z{
 				"default": {},
 				"custom": {
 					{Message: h.TaskMessageWithError(*t4, errMsg), Score: now.Unix()},
@@ -1162,11 +1162,11 @@ func TestKill(t *testing.T) {
 		h.FlushDB(t, r.client) // clean up db before each test case
 		h.SeedAllActiveQueues(t, r.client, tc.inProgress)
 		h.SeedAllDeadlines(t, r.client, tc.deadlines)
-		h.SeedAllDeadQueues(t, r.client, tc.dead)
+		h.SeedAllArchivedQueues(t, r.client, tc.archived)

-		err := r.Kill(tc.target, errMsg)
+		err := r.Archive(tc.target, errMsg)
 		if err != nil {
-			t.Errorf("(*RDB).Kill(%v, %v) = %v, want nil", tc.target, errMsg, err)
+			t.Errorf("(*RDB).Archive(%v, %v) = %v, want nil", tc.target, errMsg, err)
 			continue
 		}
@@ -1179,13 +1179,13 @@ func TestKill(t *testing.T) {
 		for queue, want := range tc.wantDeadlines {
 			gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
 			if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
-				t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
+				t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.DeadlinesKey(queue), diff)
 			}
 		}

-		for queue, want := range tc.wantDead {
-			gotDead := h.GetDeadEntries(t, r.client, queue)
-			if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
-				t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadKey(queue), diff)
+		for queue, want := range tc.wantArchived {
+			gotArchived := h.GetArchivedEntries(t, r.client, queue)
+			if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
+				t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
 			}
 		}
diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go
index c74679e..3696401 100644
--- a/internal/testbroker/testbroker.go
+++ b/internal/testbroker/testbroker.go
@@ -117,13 +117,13 @@ func (tb *TestBroker) Retry(msg *base.TaskMessage, processAt time.Time, errMsg s
 	return tb.real.Retry(msg, processAt, errMsg)
 }

-func (tb *TestBroker) Kill(msg *base.TaskMessage, errMsg string) error {
+func (tb *TestBroker) Archive(msg *base.TaskMessage, errMsg string) error {
 	tb.mu.Lock()
 	defer tb.mu.Unlock()
 	if tb.sleeping {
 		return errRedisDown
 	}
-	return tb.real.Kill(msg, errMsg)
+	return tb.real.Archive(msg, errMsg)
 }

 func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error {
diff --git a/processor.go b/processor.go
index 8c57466..a0d5bcc 100644
--- a/processor.go
+++ b/processor.go
@@ -223,7 +223,7 @@ func (p *processor) exec() {
 			// Note: One of three things should happen.
 			// 1) Done     -> Removes the message from Active
 			// 2) Retry    -> Removes the message from Active & Adds the message to Retry
-			// 3) Kill     -> Removes the message from Active & Adds the message to Dead
+			// 3) Archive  -> Removes the message from Active & Adds the message to the archive
 			if resErr != nil {
 				p.retryOrKill(ctx, msg, resErr)
 				return
@@ -272,7 +272,7 @@ func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err
 	}
 	if msg.Retried >= msg.Retry || errors.Is(err, SkipRetry) {
 		p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
-		p.kill(ctx, msg, err)
+		p.archive(ctx, msg, err)
 	} else {
 		p.retry(ctx, msg, err)
 	}
@@ -299,10 +299,10 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
 	}
 }

-func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
-	err := p.broker.Kill(msg, e.Error())
+func (p *processor) archive(ctx context.Context, msg *base.TaskMessage, e error) {
+	err := p.broker.Archive(msg, e.Error())
 	if err != nil {
-		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.DeadKey(msg.Queue))
+		errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.ActiveKey(msg.Queue), base.ArchivedKey(msg.Queue))
 		deadline, ok := ctx.Deadline()
 		if !ok {
 			panic("asynq: internal error: missing deadline in context")
@@ -310,7 +310,7 @@ func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
 		p.logger.Warnf("%s; Will retry syncing", errMsg)
 		p.syncRequestCh <- &syncRequest{
 			fn: func() error {
-				return p.broker.Kill(msg, e.Error())
+				return p.broker.Archive(msg, e.Error())
 			},
 			errMsg:   errMsg,
 			deadline: deadline,
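Note that retryOrKill archives a task in two cases: its retry count is exhausted, or the handler's error wraps SkipRetry. The second case means a handler can route a task straight to the archive when retrying would be pointless. A minimal hypothetical handler sketch follows, assuming the Payload accessors of the v0.x API of this period; the task type and payload key are invented for illustration.

```go
package tasks

import (
	"context"
	"fmt"

	"github.com/hibiken/asynq"
)

// HandleImageResize gives up immediately on malformed input: wrapping
// asynq.SkipRetry makes retryOrKill archive the task instead of retrying it,
// even if the task still has retries left.
func HandleImageResize(ctx context.Context, t *asynq.Task) error {
	url, err := t.Payload.GetString("source_url")
	if err != nil {
		// A bad payload will never succeed; send it straight to the archive.
		return fmt.Errorf("malformed payload: %v: %w", err, asynq.SkipRetry)
	}
	fmt.Println("resizing", url)
	return nil
}
```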
diff != "" { - t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.DeadKey(base.DefaultQueueName), diff) + gotDead := h.GetArchivedMessages(t, r, base.DefaultQueueName) + if diff := cmp.Diff(tc.wantArchived, gotDead, h.SortMsgOpt); diff != "" { + t.Errorf("%s: mismatch found in %q after running processor; (-want, +got)\n%s", tc.desc, base.ArchivedKey(base.DefaultQueueName), diff) } if l := r.LLen(base.ActiveKey(base.DefaultQueueName)).Val(); l != 0 { diff --git a/recoverer.go b/recoverer.go index c0dce5e..43265b8 100644 --- a/recoverer.go +++ b/recoverer.go @@ -75,7 +75,7 @@ func (r *recoverer) start(wg *sync.WaitGroup) { const errMsg = "deadline exceeded" // TODO: better error message for _, msg := range msgs { if msg.Retried >= msg.Retry { - r.kill(msg, errMsg) + r.archive(msg, errMsg) } else { r.retry(msg, errMsg) } @@ -94,8 +94,8 @@ func (r *recoverer) retry(msg *base.TaskMessage, errMsg string) { } } -func (r *recoverer) kill(msg *base.TaskMessage, errMsg string) { - if err := r.broker.Kill(msg, errMsg); err != nil { - r.logger.Warnf("recoverer: could not move task to dead queue: %v", err) +func (r *recoverer) archive(msg *base.TaskMessage, errMsg string) { + if err := r.broker.Archive(msg, errMsg); err != nil { + r.logger.Warnf("recoverer: could not move task to archive: %v", err) } } diff --git a/recoverer_test.go b/recoverer_test.go index 02aa5ad..ae32674 100644 --- a/recoverer_test.go +++ b/recoverer_test.go @@ -37,11 +37,11 @@ func TestRecoverer(t *testing.T) { inProgress map[string][]*base.TaskMessage deadlines map[string][]base.Z retry map[string][]base.Z - dead map[string][]base.Z + archived map[string][]base.Z wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z wantRetry map[string][]*base.TaskMessage - wantDead map[string][]*base.TaskMessage + wantArchived map[string][]*base.TaskMessage }{ { desc: "with one active task", @@ -54,7 +54,7 @@ func TestRecoverer(t *testing.T) { retry: map[string][]base.Z{ "default": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, }, wantActive: map[string][]*base.TaskMessage{ @@ -66,7 +66,7 @@ func TestRecoverer(t *testing.T) { wantRetry: map[string][]*base.TaskMessage{ "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, }, }, @@ -84,7 +84,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -100,7 +100,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {h.TaskMessageWithError(*t4, "deadline exceeded")}, "critical": {}, }, @@ -124,7 +124,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -140,7 +140,7 @@ func TestRecoverer(t *testing.T) { "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, "critical": {}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -164,7 +164,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "cricial": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "cricial": {}, }, @@ -179,7 +179,7 @@ func TestRecoverer(t 
*testing.T) { "default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")}, "critical": {h.TaskMessageAfterRetry(*t3, "deadline exceeded")}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -198,7 +198,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - dead: map[string][]base.Z{ + archived: map[string][]base.Z{ "default": {}, "critical": {}, }, @@ -214,7 +214,7 @@ func TestRecoverer(t *testing.T) { "default": {}, "critical": {}, }, - wantDead: map[string][]*base.TaskMessage{ + wantArchived: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, }, @@ -226,7 +226,7 @@ func TestRecoverer(t *testing.T) { h.SeedAllActiveQueues(t, r, tc.inProgress) h.SeedAllDeadlines(t, r, tc.deadlines) h.SeedAllRetryQueues(t, r, tc.retry) - h.SeedAllDeadQueues(t, r, tc.dead) + h.SeedAllArchivedQueues(t, r, tc.archived) recoverer := newRecoverer(recovererParams{ logger: testLogger, @@ -259,10 +259,10 @@ func TestRecoverer(t *testing.T) { t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.RetryKey(qname), diff) } } - for qname, want := range tc.wantDead { - gotDead := h.GetDeadMessages(t, r, qname) + for qname, want := range tc.wantArchived { + gotDead := h.GetArchivedMessages(t, r, qname) if diff := cmp.Diff(want, gotDead, h.SortMsgOpt); diff != "" { - t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.DeadKey(qname), diff) + t.Errorf("%s; mismatch found in %q: (-want, +got)\n%s", tc.desc, base.ArchivedKey(qname), diff) } } } diff --git a/server.go b/server.go index 261031b..fad354f 100644 --- a/server.go +++ b/server.go @@ -20,18 +20,17 @@ import ( "github.com/hibiken/asynq/internal/rdb" ) -// Server is responsible for managing the background-task processing. +// Server is responsible for managing the task processing. // // Server pulls tasks off queues and processes them. -// If the processing of a task is unsuccessful, server will -// schedule it for a retry. +// If the processing of a task is unsuccessful, server will schedule it for a retry. // A task will be retried until either the task gets processed successfully // or until it reaches its max retry count. // -// If a task exhausts its retries, it will be moved to the "dead" queue and -// will be kept in the queue for some time until a certain condition is met -// (e.g., queue size reaches a certain limit, or the task has been in the -// queue for a certain amount of time). +// If a task exhausts its retries, it will be moved to the archive and +// will be kept in the archive for some time until a certain condition is met +// (e.g., archive size reaches a certain limit, or the task has been in the +// archive for a certain amount of time). type Server struct { logger *log.Logger diff --git a/tools/asynq/README.md b/tools/asynq/README.md index 4545833..5384f5c 100644 --- a/tools/asynq/README.md +++ b/tools/asynq/README.md @@ -24,7 +24,7 @@ To view details on any command, use `asynq help `. 
- `asynq stats` - `asynq queue [ls inspect history rm pause unpause]` -- `asynq task [ls cancel delete kill run delete-all kill-all run-all]` +- `asynq task [ls cancel delete archive run delete-all archive-all run-all]` - `asynq server [ls]` ### Global flags diff --git a/tools/asynq/cmd/queue.go b/tools/asynq/cmd/queue.go index 7426f50..66f71af 100644 --- a/tools/asynq/cmd/queue.go +++ b/tools/asynq/cmd/queue.go @@ -149,9 +149,9 @@ func printQueueStats(s *asynq.QueueStats) { fmt.Printf("Paused: %t\n\n", s.Paused) bold.Println("Task Count by State") printTable( - []string{"active", "pending", "scheduled", "retry", "dead"}, + []string{"active", "pending", "scheduled", "retry", "archived"}, func(w io.Writer, tmpl string) { - fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(w, tmpl, s.Active, s.Pending, s.Scheduled, s.Retry, s.Archived) }, ) fmt.Println() diff --git a/tools/asynq/cmd/stats.go b/tools/asynq/cmd/stats.go index 810783b..d569ddb 100644 --- a/tools/asynq/cmd/stats.go +++ b/tools/asynq/cmd/stats.go @@ -57,7 +57,7 @@ type AggregateStats struct { Pending int Scheduled int Retry int - Dead int + Archived int Processed int Failed int Timestamp time.Time @@ -84,7 +84,7 @@ func stats(cmd *cobra.Command, args []string) { aggStats.Pending += s.Pending aggStats.Scheduled += s.Scheduled aggStats.Retry += s.Retry - aggStats.Dead += s.Dead + aggStats.Archived += s.Archived aggStats.Processed += s.Processed aggStats.Failed += s.Failed aggStats.Timestamp = s.Timestamp @@ -126,9 +126,9 @@ func stats(cmd *cobra.Command, args []string) { func printStatsByState(s *AggregateStats) { format := strings.Repeat("%v\t", 5) + "\n" tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) - fmt.Fprintf(tw, format, "active", "pending", "scheduled", "retry", "dead") + fmt.Fprintf(tw, format, "active", "pending", "scheduled", "retry", "archived") fmt.Fprintf(tw, format, "----------", "--------", "---------", "-----", "----") - fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Dead) + fmt.Fprintf(tw, format, s.Active, s.Pending, s.Scheduled, s.Retry, s.Archived) tw.Flush() } diff --git a/tools/asynq/cmd/task.go b/tools/asynq/cmd/task.go index c727fd1..d625a6c 100644 --- a/tools/asynq/cmd/task.go +++ b/tools/asynq/cmd/task.go @@ -26,11 +26,11 @@ func init() { taskCmd.AddCommand(taskCancelCmd) - taskCmd.AddCommand(taskKillCmd) - taskKillCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") - taskKillCmd.Flags().StringP("key", "k", "", "key of the task") - taskKillCmd.MarkFlagRequired("queue") - taskKillCmd.MarkFlagRequired("key") + taskCmd.AddCommand(taskArchiveCmd) + taskArchiveCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") + taskArchiveCmd.Flags().StringP("key", "k", "", "key of the task") + taskArchiveCmd.MarkFlagRequired("queue") + taskArchiveCmd.MarkFlagRequired("key") taskCmd.AddCommand(taskDeleteCmd) taskDeleteCmd.Flags().StringP("queue", "q", "", "queue to which the task belongs") @@ -44,11 +44,11 @@ func init() { taskRunCmd.MarkFlagRequired("queue") taskRunCmd.MarkFlagRequired("key") - taskCmd.AddCommand(taskKillAllCmd) - taskKillAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") - taskKillAllCmd.Flags().StringP("state", "s", "", "state of the tasks") - taskKillAllCmd.MarkFlagRequired("queue") - taskKillAllCmd.MarkFlagRequired("state") + taskCmd.AddCommand(taskArchiveAllCmd) + taskArchiveAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") + 
taskArchiveAllCmd.Flags().StringP("state", "s", "", "state of the tasks") + taskArchiveAllCmd.MarkFlagRequired("queue") + taskArchiveAllCmd.MarkFlagRequired("state") taskCmd.AddCommand(taskDeleteAllCmd) taskDeleteAllCmd.Flags().StringP("queue", "q", "", "queue to which the tasks belong") @@ -78,7 +78,7 @@ The value for the state flag should be one of: - pending - scheduled - retry -- dead +- archived List opeartion paginates the result set. By default, the command fetches the first 30 tasks. @@ -100,9 +100,9 @@ var taskCancelCmd = &cobra.Command{ Run: taskCancel, } -var taskKillCmd = &cobra.Command{ - Use: "kill --queue=QUEUE --key=KEY", - Short: "Kill a task with the given key", +var taskArchiveCmd = &cobra.Command{ + Use: "archive --queue=QUEUE --key=KEY", + Short: "Archive a task with the given key", Args: cobra.NoArgs, Run: taskKill, } @@ -121,9 +121,9 @@ var taskRunCmd = &cobra.Command{ Run: taskRun, } -var taskKillAllCmd = &cobra.Command{ - Use: "kill-all --queue=QUEUE --state=STATE", - Short: "Kill all tasks in the given state", +var taskArchiveAllCmd = &cobra.Command{ + Use: "archive-all --queue=QUEUE --state=STATE", + Short: "Archive all tasks in the given state", Args: cobra.NoArgs, Run: taskKillAll, } @@ -173,8 +173,8 @@ func taskList(cmd *cobra.Command, args []string) { listScheduledTasks(qname, pageNum, pageSize) case "retry": listRetryTasks(qname, pageNum, pageSize) - case "dead": - listDeadTasks(qname, pageNum, pageSize) + case "archived": + listArchivedTasks(qname, pageNum, pageSize) default: fmt.Printf("error: state=%q is not supported\n", state) os.Exit(1) @@ -273,7 +273,7 @@ func listRetryTasks(qname string, pageNum, pageSize int) { ) } -func listDeadTasks(qname string, pageNum, pageSize int) { +func listArchivedTasks(qname string, pageNum, pageSize int) { i := createInspector() tasks, err := i.ListDeadTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum)) if err != nil { @@ -281,7 +281,7 @@ func listDeadTasks(qname string, pageNum, pageSize int) { os.Exit(1) } if len(tasks) == 0 { - fmt.Printf("No dead tasks in %q queue\n", qname) + fmt.Printf("No archived tasks in %q queue\n", qname) return } printTable( @@ -323,7 +323,7 @@ func taskKill(cmd *cobra.Command, args []string) { fmt.Printf("error: %v\n", err) os.Exit(1) } - fmt.Println("task transitioned to dead state") + fmt.Println("task transitioned to archived state") } func taskDelete(cmd *cobra.Command, args []string) { @@ -395,7 +395,7 @@ func taskKillAll(cmd *cobra.Command, args []string) { fmt.Printf("error: %v\n", err) os.Exit(1) } - fmt.Printf("%d tasks transitioned to dead state\n", n) + fmt.Printf("%d tasks transitioned to archived state\n", n) } func taskDeleteAll(cmd *cobra.Command, args []string) { @@ -417,8 +417,8 @@ func taskDeleteAll(cmd *cobra.Command, args []string) { n, err = i.DeleteAllScheduledTasks(qname) case "retry": n, err = i.DeleteAllRetryTasks(qname) - case "dead": - n, err = i.DeleteAllDeadTasks(qname) + case "archived": + n, err = i.DeleteAllArchivedTasks(qname) default: fmt.Printf("error: unsupported state %q\n", state) os.Exit(1) @@ -449,8 +449,8 @@ func taskRunAll(cmd *cobra.Command, args []string) { n, err = i.RunAllScheduledTasks(qname) case "retry": n, err = i.RunAllRetryTasks(qname) - case "dead": - n, err = i.RunAllDeadTasks(qname) + case "archived": + n, err = i.RunAllArchivedTasks(qname) default: fmt.Printf("error: unsupported state %q\n", state) os.Exit(1)
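Taken together, the CLI now fronts the archived-task API end to end. For completeness, here is a hedged sketch of driving the same methods programmatically; the queue name and Redis address are placeholders, and the output lines are illustrative rather than the CLI's exact wording.

```go
package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	i := asynq.NewInspector(asynq.RedisClientOpt{Addr: "localhost:6379"})

	// Same call the CLI makes for `asynq task ls --queue=default --state=archived`.
	tasks, err := i.ListArchivedTasks("default", asynq.PageSize(30), asynq.Page(1))
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range tasks {
		fmt.Println(t.Type) // each entry is an ArchivedTask wrapping the original Task
	}

	// Same call the CLI makes for `asynq task run-all --queue=default --state=archived`.
	n, err := i.RunAllArchivedTasks("default")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%d tasks transitioned to pending state\n", n)
}
```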