diff --git a/CHANGELOG.md b/CHANGELOG.md index 84cd41c..927880a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - `BaseContext` is introduced in `Config` to specify callback hook to provide a base `context` from which `Handler` `context` is derived +- `IsOrphaned` field is added to `TaskInfo` to describe a task left in active state with no worker processing it. ### Changed diff --git a/asynq.go b/asynq.go index 752c6e7..234d4b9 100644 --- a/asynq.go +++ b/asynq.go @@ -101,6 +101,14 @@ type TaskInfo struct { // zero if not applicable. NextProcessAt time.Time + // IsOrphaned describes whether the task is left in active state with no worker processing it. + // An orphaned task indicates that the worker has crashed or experienced network failures and was not able to + // extend its lease on the task. + // + // This task will be recovered by running a server against the queue the task is in. + // This field is only applicable to tasks with TaskStateActive. + IsOrphaned bool + // Retention is duration of the retention period after the task is successfully processed. Retention time.Duration diff --git a/inspector.go b/inspector.go index f9bf17b..d636b34 100644 --- a/inspector.go +++ b/inspector.go @@ -308,16 +308,28 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn case err != nil: return nil, fmt.Errorf("asynq: %v", err) } + expired, err := i.rdb.ListLeaseExpired(time.Now(), qname) + if err != nil { + return nil, fmt.Errorf("asynq: %v", err) + } + expiredSet := make(map[string]struct{}) // set of expired message IDs + for _, msg := range expired { + expiredSet[msg.ID] = struct{}{} + } var tasks []*TaskInfo for _, i := range infos { - tasks = append(tasks, newTaskInfo( + t := newTaskInfo( i.Message, i.State, i.NextProcessAt, i.Result, - )) + ) + if _, ok := expiredSet[i.Message.ID]; ok { + t.IsOrphaned = true + } + tasks = append(tasks, t) } - return tasks, err + return tasks, nil } // ListScheduledTasks retrieves scheduled tasks from the specified queue. diff --git a/inspector_test.go b/inspector_test.go index f826a6a..4af0941 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -745,6 +745,12 @@ func TestInspectorListPendingTasks(t *testing.T) { } } +func newOrphanedTaskInfo(msg *base.TaskMessage) *TaskInfo { + info := newTaskInfo(msg, base.TaskStateActive, time.Time{}, nil) + info.IsOrphaned = true + return info +} + func TestInspectorListActiveTasks(t *testing.T) { r := setup(t) defer r.Close() @@ -754,10 +760,12 @@ func TestInspectorListActiveTasks(t *testing.T) { m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") inspector := NewInspector(getRedisConnOpt(t)) + now := time.Now() tests := []struct { desc string active map[string][]*base.TaskMessage + lease map[string][]base.Z qname string want []*TaskInfo }{ @@ -767,10 +775,42 @@ func TestInspectorListActiveTasks(t *testing.T) { "default": {m1, m2}, "custom": {m3, m4}, }, + lease: map[string][]base.Z{ + "default": { + {Message: m1, Score: now.Add(20 * time.Second).Unix()}, + {Message: m2, Score: now.Add(20 * time.Second).Unix()}, + }, + "custom": { + {Message: m3, Score: now.Add(20 * time.Second).Unix()}, + {Message: m4, Score: now.Add(20 * time.Second).Unix()}, + }, + }, + qname: "custom", + want: []*TaskInfo{ + newTaskInfo(m3, base.TaskStateActive, time.Time{}, nil), + newTaskInfo(m4, base.TaskStateActive, time.Time{}, nil), + }, + }, + { + desc: "with an orphaned task", + active: map[string][]*base.TaskMessage{ + "default": {m1, m2}, + "custom": {m3, m4}, + }, + lease: map[string][]base.Z{ + "default": { + {Message: m1, Score: now.Add(20 * time.Second).Unix()}, + {Message: m2, Score: now.Add(-10 * time.Second).Unix()}, // orphaned task + }, + "custom": { + {Message: m3, Score: now.Add(20 * time.Second).Unix()}, + {Message: m4, Score: now.Add(20 * time.Second).Unix()}, + }, + }, qname: "default", want: []*TaskInfo{ newTaskInfo(m1, base.TaskStateActive, time.Time{}, nil), - newTaskInfo(m2, base.TaskStateActive, time.Time{}, nil), + newOrphanedTaskInfo(m2), }, }, } @@ -778,6 +818,7 @@ func TestInspectorListActiveTasks(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r) h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllLease(t, r, tc.lease) got, err := inspector.ListActiveTasks(tc.qname) if err != nil {