mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Add IsOrphaned field to TaskInfo
This commit is contained in:
parent
9b63e23274
commit
cea5110d15
@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
### Added
|
||||
|
||||
- `BaseContext` is introduced in `Config` to specify callback hook to provide a base `context` from which `Handler` `context` is derived
|
||||
- `IsOrphaned` field is added to `TaskInfo` to describe a task left in active state with no worker processing it.
|
||||
|
||||
### Changed
|
||||
|
||||
|
8
asynq.go
8
asynq.go
@ -101,6 +101,14 @@ type TaskInfo struct {
|
||||
// zero if not applicable.
|
||||
NextProcessAt time.Time
|
||||
|
||||
// IsOrphaned describes whether the task is left in active state with no worker processing it.
|
||||
// An orphaned task indicates that the worker has crashed or experienced network failures and was not able to
|
||||
// extend its lease on the task.
|
||||
//
|
||||
// This task will be recovered by running a server against the queue the task is in.
|
||||
// This field is only applicable to tasks with TaskStateActive.
|
||||
IsOrphaned bool
|
||||
|
||||
// Retention is duration of the retention period after the task is successfully processed.
|
||||
Retention time.Duration
|
||||
|
||||
|
18
inspector.go
18
inspector.go
@ -308,16 +308,28 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn
|
||||
case err != nil:
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
expired, err := i.rdb.ListLeaseExpired(time.Now(), qname)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("asynq: %v", err)
|
||||
}
|
||||
expiredSet := make(map[string]struct{}) // set of expired message IDs
|
||||
for _, msg := range expired {
|
||||
expiredSet[msg.ID] = struct{}{}
|
||||
}
|
||||
var tasks []*TaskInfo
|
||||
for _, i := range infos {
|
||||
tasks = append(tasks, newTaskInfo(
|
||||
t := newTaskInfo(
|
||||
i.Message,
|
||||
i.State,
|
||||
i.NextProcessAt,
|
||||
i.Result,
|
||||
))
|
||||
)
|
||||
if _, ok := expiredSet[i.Message.ID]; ok {
|
||||
t.IsOrphaned = true
|
||||
}
|
||||
return tasks, err
|
||||
tasks = append(tasks, t)
|
||||
}
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
// ListScheduledTasks retrieves scheduled tasks from the specified queue.
|
||||
|
@ -745,6 +745,12 @@ func TestInspectorListPendingTasks(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func newOrphanedTaskInfo(msg *base.TaskMessage) *TaskInfo {
|
||||
info := newTaskInfo(msg, base.TaskStateActive, time.Time{}, nil)
|
||||
info.IsOrphaned = true
|
||||
return info
|
||||
}
|
||||
|
||||
func TestInspectorListActiveTasks(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
@ -754,10 +760,12 @@ func TestInspectorListActiveTasks(t *testing.T) {
|
||||
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
|
||||
|
||||
inspector := NewInspector(getRedisConnOpt(t))
|
||||
now := time.Now()
|
||||
|
||||
tests := []struct {
|
||||
desc string
|
||||
active map[string][]*base.TaskMessage
|
||||
lease map[string][]base.Z
|
||||
qname string
|
||||
want []*TaskInfo
|
||||
}{
|
||||
@ -767,10 +775,42 @@ func TestInspectorListActiveTasks(t *testing.T) {
|
||||
"default": {m1, m2},
|
||||
"custom": {m3, m4},
|
||||
},
|
||||
lease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: now.Add(20 * time.Second).Unix()},
|
||||
{Message: m2, Score: now.Add(20 * time.Second).Unix()},
|
||||
},
|
||||
"custom": {
|
||||
{Message: m3, Score: now.Add(20 * time.Second).Unix()},
|
||||
{Message: m4, Score: now.Add(20 * time.Second).Unix()},
|
||||
},
|
||||
},
|
||||
qname: "custom",
|
||||
want: []*TaskInfo{
|
||||
newTaskInfo(m3, base.TaskStateActive, time.Time{}, nil),
|
||||
newTaskInfo(m4, base.TaskStateActive, time.Time{}, nil),
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with an orphaned task",
|
||||
active: map[string][]*base.TaskMessage{
|
||||
"default": {m1, m2},
|
||||
"custom": {m3, m4},
|
||||
},
|
||||
lease: map[string][]base.Z{
|
||||
"default": {
|
||||
{Message: m1, Score: now.Add(20 * time.Second).Unix()},
|
||||
{Message: m2, Score: now.Add(-10 * time.Second).Unix()}, // orphaned task
|
||||
},
|
||||
"custom": {
|
||||
{Message: m3, Score: now.Add(20 * time.Second).Unix()},
|
||||
{Message: m4, Score: now.Add(20 * time.Second).Unix()},
|
||||
},
|
||||
},
|
||||
qname: "default",
|
||||
want: []*TaskInfo{
|
||||
newTaskInfo(m1, base.TaskStateActive, time.Time{}, nil),
|
||||
newTaskInfo(m2, base.TaskStateActive, time.Time{}, nil),
|
||||
newOrphanedTaskInfo(m2),
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -778,6 +818,7 @@ func TestInspectorListActiveTasks(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllActiveQueues(t, r, tc.active)
|
||||
h.SeedAllLease(t, r, tc.lease)
|
||||
|
||||
got, err := inspector.ListActiveTasks(tc.qname)
|
||||
if err != nil {
|
||||
|
Loading…
Reference in New Issue
Block a user