diff --git a/asynq.go b/asynq.go index 5e452ea..a918cc4 100644 --- a/asynq.go +++ b/asynq.go @@ -38,71 +38,87 @@ func NewTask(typename string, payload []byte) *Task { // A TaskInfo describes a task and its metadata. type TaskInfo struct { - msg *base.TaskMessage - state base.TaskState - nextProcessAt time.Time + // ID is the identifier of the task. + ID string + + // Queue is the name of the queue in which the task belongs. + Queue string + + // Type is the type name of the task. + Type string + + // Payload is the payload data of the task. + Payload []byte + + // State indicates the task state. + State TaskState + + // MaxRetry is the maximum number of times the task can be retried. + MaxRetry int + + // Retried is the number of times the task has retried so far. + Retried int + + // LastErr is the error message from the last failure. + LastErr string + + // LastFailedAt is the time time of the last failure if any. + // If the task has no failures, LastFailedAt is zero time (i.e. time.Time{}). + LastFailedAt time.Time + + // Timeout is the duration the task can be processed by Handler before being retried, + // zero if not specified + Timeout time.Duration + + // Deadline is the deadline for the task, zero value if not specified. + Deadline time.Time + + // NextProcessAt is the time the task is scheduled to be processed, + // zero if not applicable. + NextProcessAt time.Time } -// ID returns the id of the task. -func (info *TaskInfo) ID() string { return info.msg.ID.String() } +func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo { + info := TaskInfo{ + ID: msg.ID.String(), + Queue: msg.Queue, + Type: msg.Type, + Payload: msg.Payload, // Do we need to make a copy? + MaxRetry: msg.Retry, + Retried: msg.Retried, + LastErr: msg.ErrorMsg, + Timeout: time.Duration(msg.Timeout) * time.Second, + NextProcessAt: nextProcessAt, + } + if msg.LastFailedAt == 0 { + info.LastFailedAt = time.Time{} + } else { + info.LastFailedAt = time.Unix(msg.LastFailedAt, 0) + } -// Queue returns the name of the queue in which the task belongs. -func (info *TaskInfo) Queue() string { return info.msg.Queue } + if msg.Deadline == 0 { + info.Deadline = time.Time{} + } else { + info.Deadline = time.Unix(msg.Deadline, 0) + } -// Type returns the type name of the task. -func (info *TaskInfo) Type() string { return info.msg.Type } - -// Payload returns the payload data of the task. -func (info *TaskInfo) Payload() []byte { return info.msg.Payload } - -func (info *TaskInfo) State() TaskState { - switch info.state { + switch state { case base.TaskStateActive: - return TaskStateActive + info.State = TaskStateActive case base.TaskStatePending: - return TaskStatePending + info.State = TaskStatePending case base.TaskStateScheduled: - return TaskStateScheduled + info.State = TaskStateScheduled case base.TaskStateRetry: - return TaskStateRetry + info.State = TaskStateRetry case base.TaskStateArchived: - return TaskStateArchived + info.State = TaskStateArchived + default: + panic(fmt.Sprintf("internal error: unknown state: %d", state)) } - panic("internal error: unknown state in TaskInfo") + return &info } -// MaxRetry returns the maximum number of times the task can be retried. -func (info *TaskInfo) MaxRetry() int { return info.msg.Retry } - -// Retried returns the number of times the task has retried so far. -func (info *TaskInfo) Retried() int { return info.msg.Retried } - -// LastErr returns the error message from the last failure. -// If the task has no failures, returns an empty string. -func (info *TaskInfo) LastErr() string { return info.msg.ErrorMsg } - -// LastFailedAt returns the time of the last failure if any. -// If the task has no failures, returns zero time. -func (info *TaskInfo) LastFailedAt() time.Time { return time.Unix(info.msg.LastFailedAt, 0) } - -// Timeout returns the duration the task can be processed by Handler before being retried, -// zero if not specified -func (info *TaskInfo) Timeout() time.Duration { - return time.Duration(info.msg.Timeout) * time.Second -} - -// Deadline returns the deadline for the task, zero value if not specified. -func (info *TaskInfo) Deadline() time.Time { - if info.msg.Deadline == 0 { - return time.Time{} - } - return time.Unix(info.msg.Deadline, 0) -} - -// NextProcessAt returns the time the task is scheduled to be processed, -// zero if not applicable. -func (info *TaskInfo) NextProcessAt() time.Time { return info.nextProcessAt } - // TaskState denotes the state of a task. type TaskState int diff --git a/client.go b/client.go index 10720bc..cf67461 100644 --- a/client.go +++ b/client.go @@ -320,7 +320,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { case err != nil: return nil, err } - return &TaskInfo{msg, state, opt.processAt}, nil + return newTaskInfo(msg, state, opt.processAt), nil } func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { diff --git a/client_test.go b/client_test.go index a186ade..bc2a4ee 100644 --- a/client_test.go +++ b/client_test.go @@ -42,15 +42,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { processAt: now, opts: []Option{}, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Queue: "default", - Retry: defaultMaxRetry, - Timeout: int64(defaultTimeout.Seconds()), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -74,16 +76,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { processAt: oneHourLater, opts: []Option{}, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStateScheduled, - nextProcessAt: oneHourLater, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStateScheduled, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: oneHourLater, }, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -116,8 +119,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { continue } cmpOptions := []cmp.Option{ - cmp.AllowUnexported(TaskInfo{}), - cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), + cmpopts.IgnoreFields(TaskInfo{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { @@ -162,16 +164,17 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(3), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: 3, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: 3, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -193,16 +196,17 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(-2), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: 0, // Retry count should be set to zero - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: 0, // Retry count should be set to zero + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -225,16 +229,17 @@ func TestClientEnqueue(t *testing.T) { MaxRetry(10), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: 10, // Last option takes precedence - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: 10, // Last option takes precedence + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -256,16 +261,17 @@ func TestClientEnqueue(t *testing.T) { Queue("custom"), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "custom", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "custom", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "custom": { @@ -287,16 +293,17 @@ func TestClientEnqueue(t *testing.T) { Queue("HIGH"), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "high", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "high", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "high": { @@ -318,16 +325,17 @@ func TestClientEnqueue(t *testing.T) { Timeout(20 * time.Second), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: 20, - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: 20 * time.Second, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -349,16 +357,17 @@ func TestClientEnqueue(t *testing.T) { Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(noTimeout.Seconds()), - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: noTimeout, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -381,16 +390,17 @@ func TestClientEnqueue(t *testing.T) { Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)), }, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: 20, - Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: 20 * time.Second, + Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC), + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -416,8 +426,7 @@ func TestClientEnqueue(t *testing.T) { continue } cmpOptions := []cmp.Option{ - cmp.AllowUnexported(TaskInfo{}), - cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), + cmpopts.IgnoreFields(TaskInfo{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { @@ -457,16 +466,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { delay: 1 * time.Hour, opts: []Option{}, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStateScheduled, - nextProcessAt: time.Now().Add(1 * time.Hour), + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStateScheduled, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: time.Now().Add(1 * time.Hour), }, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -493,16 +503,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { delay: 0, opts: []Option{}, wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: task.Type(), - Payload: task.Payload(), - Retry: defaultMaxRetry, - Queue: "default", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "default", + Type: task.Type(), + Payload: task.Payload(), + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -532,8 +543,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { continue } cmpOptions := []cmp.Option{ - cmp.AllowUnexported(TaskInfo{}), - cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), + cmpopts.IgnoreFields(TaskInfo{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { @@ -607,16 +617,17 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{}, task: NewTask("feed:import", nil), wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: "feed:import", - Payload: nil, - Retry: defaultMaxRetry, - Queue: "feed", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "feed", + Type: "feed:import", + Payload: nil, + State: TaskStatePending, + MaxRetry: defaultMaxRetry, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, queue: "feed", want: &base.TaskMessage{ @@ -634,16 +645,17 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{}, task: NewTask("feed:import", nil), wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: "feed:import", - Payload: nil, - Retry: 5, - Queue: "feed", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "feed", + Type: "feed:import", + Payload: nil, + State: TaskStatePending, + MaxRetry: 5, + Retried: 0, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, queue: "feed", want: &base.TaskMessage{ @@ -661,16 +673,16 @@ func TestClientDefaultOptions(t *testing.T) { opts: []Option{Queue("critical")}, task: NewTask("feed:import", nil), wantInfo: &TaskInfo{ - msg: &base.TaskMessage{ - Type: "feed:import", - Payload: nil, - Retry: 5, - Queue: "critical", - Timeout: int64(defaultTimeout.Seconds()), - Deadline: noDeadline.Unix(), - }, - state: base.TaskStatePending, - nextProcessAt: now, + Queue: "critical", + Type: "feed:import", + Payload: nil, + State: TaskStatePending, + MaxRetry: 5, + LastErr: "", + LastFailedAt: time.Time{}, + Timeout: defaultTimeout, + Deadline: time.Time{}, + NextProcessAt: now, }, queue: "critical", want: &base.TaskMessage{ @@ -694,8 +706,7 @@ func TestClientDefaultOptions(t *testing.T) { t.Fatal(err) } cmpOptions := []cmp.Option{ - cmp.AllowUnexported(TaskInfo{}), - cmpopts.IgnoreFields(base.TaskMessage{}, "ID"), + cmpopts.IgnoreFields(TaskInfo{}, "ID"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" { diff --git a/inspector.go b/inspector.go index 39bd586..725c664 100644 --- a/inspector.go +++ b/inspector.go @@ -48,11 +48,14 @@ func (i *Inspector) Queues() ([]string, error) { type QueueInfo struct { // Name of the queue. Queue string + // Total number of bytes that the queue and its tasks require to be stored in redis. MemoryUsage int64 + // Size is the total number of tasks in the queue. // The value is the sum of Pending, Active, Scheduled, Retry, and Archived. Size int + // Number of pending tasks. Pending int // Number of active tasks. @@ -63,15 +66,18 @@ type QueueInfo struct { Retry int // Number of archived tasks. Archived int + // Total number of tasks being processed during the given date. // The number includes both succeeded and failed tasks. Processed int // Total number of tasks failed to be processed during the given date. Failed int + // Paused indicates whether the queue is paused. // If true, tasks in the queue will not be processed. Paused bool - // Time when this stats was taken. + + // Time when this queue info snapshot was taken. Timestamp time.Time } @@ -184,11 +190,7 @@ func (i *Inspector) GetTaskInfo(qname, id string) (*TaskInfo, error) { case err != nil: return nil, fmt.Errorf("asynq: %v", err) } - return &TaskInfo{ - msg: info.Message, - state: info.State, - nextProcessAt: info.NextProcessAt, - }, nil + return newTaskInfo(info.Message, info.State, info.NextProcessAt), nil } // ListOption specifies behavior of list operation. @@ -271,11 +273,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*TaskI now := time.Now() var tasks []*TaskInfo for _, m := range msgs { - tasks = append(tasks, &TaskInfo{ - msg: m, - state: base.TaskStatePending, - nextProcessAt: now, - }) + tasks = append(tasks, newTaskInfo(m, base.TaskStatePending, now)) } return tasks, err } @@ -298,10 +296,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*TaskIn } var tasks []*TaskInfo for _, m := range msgs { - tasks = append(tasks, &TaskInfo{ - msg: m, - state: base.TaskStateActive, - }) + tasks = append(tasks, newTaskInfo(m, base.TaskStateActive, time.Time{})) } return tasks, err } @@ -325,11 +320,11 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Tas } var tasks []*TaskInfo for _, z := range zs { - tasks = append(tasks, &TaskInfo{ - msg: z.Message, - state: base.TaskStateScheduled, - nextProcessAt: time.Unix(z.Score, 0), - }) + tasks = append(tasks, newTaskInfo( + z.Message, + base.TaskStateScheduled, + time.Unix(z.Score, 0), + )) } return tasks, nil } @@ -353,11 +348,11 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*TaskInf } var tasks []*TaskInfo for _, z := range zs { - tasks = append(tasks, &TaskInfo{ - msg: z.Message, - state: base.TaskStateRetry, - nextProcessAt: time.Unix(z.Score, 0), - }) + tasks = append(tasks, newTaskInfo( + z.Message, + base.TaskStateRetry, + time.Unix(z.Score, 0), + )) } return tasks, nil } @@ -381,10 +376,11 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Task } var tasks []*TaskInfo for _, z := range zs { - tasks = append(tasks, &TaskInfo{ - msg: z.Message, - state: base.TaskStateArchived, - }) + tasks = append(tasks, newTaskInfo( + z.Message, + base.TaskStateArchived, + time.Time{}, + )) } return tasks, nil } diff --git a/inspector_test.go b/inspector_test.go index 4bf62d9..8b4ecb2 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -423,11 +423,7 @@ func TestInspectorHistory(t *testing.T) { } func createPendingTask(msg *base.TaskMessage) *TaskInfo { - return &TaskInfo{ - msg: msg, - state: base.TaskStatePending, - nextProcessAt: time.Now(), - } + return newTaskInfo(msg, base.TaskStatePending, time.Now()) } func TestInspectorGetTaskInfo(t *testing.T) { @@ -488,47 +484,47 @@ func TestInspectorGetTaskInfo(t *testing.T) { { qname: "default", id: m1.ID.String(), - want: &TaskInfo{ - msg: m1, - state: base.TaskStateActive, - nextProcessAt: time.Time{}, // zero value for n/a - }, + want: newTaskInfo( + m1, + base.TaskStateActive, + time.Time{}, // zero value for n/a + ), }, { qname: "default", id: m2.ID.String(), - want: &TaskInfo{ - msg: m2, - state: base.TaskStateScheduled, - nextProcessAt: fiveMinsFromNow, - }, + want: newTaskInfo( + m2, + base.TaskStateScheduled, + fiveMinsFromNow, + ), }, { qname: "custom", id: m3.ID.String(), - want: &TaskInfo{ - msg: m3, - state: base.TaskStateRetry, - nextProcessAt: oneHourFromNow, - }, + want: newTaskInfo( + m3, + base.TaskStateRetry, + oneHourFromNow, + ), }, { qname: "custom", id: m4.ID.String(), - want: &TaskInfo{ - msg: m4, - state: base.TaskStateArchived, - nextProcessAt: time.Time{}, // zero value for n/a - }, + want: newTaskInfo( + m4, + base.TaskStateArchived, + time.Time{}, // zero value for n/a + ), }, { qname: "custom", id: m5.ID.String(), - want: &TaskInfo{ - msg: m5, - state: base.TaskStatePending, - nextProcessAt: now, - }, + want: newTaskInfo( + m5, + base.TaskStatePending, + now, + ), }, } @@ -725,8 +721,8 @@ func TestInspectorListActiveTasks(t *testing.T) { }, qname: "default", want: []*TaskInfo{ - {msg: m1, state: base.TaskStateActive, nextProcessAt: time.Time{}}, - {msg: m2, state: base.TaskStateActive, nextProcessAt: time.Time{}}, + newTaskInfo(m1, base.TaskStateActive, time.Time{}), + newTaskInfo(m2, base.TaskStateActive, time.Time{}), }, }, } @@ -748,11 +744,11 @@ func TestInspectorListActiveTasks(t *testing.T) { } func createScheduledTask(z base.Z) *TaskInfo { - return &TaskInfo{ - msg: z.Message, - state: base.TaskStateScheduled, - nextProcessAt: time.Unix(z.Score, 0), - } + return newTaskInfo( + z.Message, + base.TaskStateScheduled, + time.Unix(z.Score, 0), + ) } func TestInspectorListScheduledTasks(t *testing.T) { @@ -817,11 +813,11 @@ func TestInspectorListScheduledTasks(t *testing.T) { } func createRetryTask(z base.Z) *TaskInfo { - return &TaskInfo{ - msg: z.Message, - state: base.TaskStateRetry, - nextProcessAt: time.Unix(z.Score, 0), - } + return newTaskInfo( + z.Message, + base.TaskStateRetry, + time.Unix(z.Score, 0), + ) } func TestInspectorListRetryTasks(t *testing.T) { @@ -887,11 +883,11 @@ func TestInspectorListRetryTasks(t *testing.T) { } func createArchivedTask(z base.Z) *TaskInfo { - return &TaskInfo{ - msg: z.Message, - state: base.TaskStateArchived, - nextProcessAt: time.Time{}, - } + return newTaskInfo( + z.Message, + base.TaskStateArchived, + time.Time{}, // zero value for n/a + ) } func TestInspectorListArchivedTasks(t *testing.T) { @@ -2047,7 +2043,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "default", - id: createPendingTask(m2).ID(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2059,7 +2055,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { "custom": {m3}, }, qname: "custom", - id: createPendingTask(m3).ID(), + id: createPendingTask(m3).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1, m2}, "custom": {}, @@ -2112,7 +2108,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createScheduledTask(z2).ID(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2162,7 +2158,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createRetryTask(z2).ID(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2212,7 +2208,7 @@ func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) { "custom": {z3}, }, qname: "default", - id: createArchivedTask(z2).ID(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2263,7 +2259,7 @@ func TestInspectorDeleteTaskError(t *testing.T) { "custom": {z3}, }, qname: "nonexistent", - id: createArchivedTask(z2).ID(), + id: createArchivedTask(z2).ID, wantErr: ErrQueueNotFound, wantArchived: map[string][]base.Z{ "default": {z1, z2}, @@ -2333,7 +2329,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createScheduledTask(z2).ID(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2403,7 +2399,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2474,7 +2470,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) { "low": {}, }, qname: "critical", - id: createArchivedTask(z2).ID(), + id: createArchivedTask(z2).ID, wantArchived: map[string][]base.Z{ "default": {z1}, "critical": {}, @@ -2548,7 +2544,7 @@ func TestInspectorRunTaskError(t *testing.T) { "low": {}, }, qname: "nonexistent", - id: createArchivedTask(z2).ID(), + id: createArchivedTask(z2).ID, wantErr: ErrQueueNotFound, wantArchived: map[string][]base.Z{ "default": {z1}, @@ -2641,7 +2637,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "default", - id: createPendingTask(m1).ID(), + id: createPendingTask(m1).ID, wantPending: map[string][]*base.TaskMessage{ "default": {}, "custom": {m2, m3}, @@ -2663,7 +2659,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createPendingTask(m2).ID(), + id: createPendingTask(m2).ID, wantPending: map[string][]*base.TaskMessage{ "default": {m1}, "custom": {m3}, @@ -2736,7 +2732,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createScheduledTask(z2).ID(), + id: createScheduledTask(z2).ID, wantScheduled: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2811,7 +2807,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { "custom": {}, }, qname: "custom", - id: createRetryTask(z2).ID(), + id: createRetryTask(z2).ID, wantRetry: map[string][]base.Z{ "default": {z1}, "custom": {z3}, @@ -2886,7 +2882,7 @@ func TestInspectorArchiveTaskError(t *testing.T) { "custom": {}, }, qname: "nonexistent", - id: createRetryTask(z2).ID(), + id: createRetryTask(z2).ID, wantErr: ErrQueueNotFound, wantRetry: map[string][]base.Z{ "default": {z1}, diff --git a/scheduler.go b/scheduler.go index c394b6a..a85899d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -127,7 +127,7 @@ func (j *enqueueJob) Run() { } j.logger.Debugf("scheduler enqueued a task: %+v", info) event := &base.SchedulerEnqueueEvent{ - TaskID: info.ID(), + TaskID: info.ID, EnqueuedAt: time.Now().In(j.location), } err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)