mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 05:17:30 +08:00
Update Client.Enqueue to return TaskInfo
This commit is contained in:
300
client_test.go
300
client_test.go
@@ -32,7 +32,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
task *Task
|
||||
processAt time.Time // value for ProcessAt option
|
||||
opts []Option // other options
|
||||
wantRes *Result
|
||||
wantInfo *TaskInfo
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
@@ -41,13 +41,16 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
task: task,
|
||||
processAt: now,
|
||||
opts: []Option{},
|
||||
wantRes: &Result{
|
||||
EnqueuedAt: now.UTC(),
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -70,13 +73,17 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
task: task,
|
||||
processAt: oneHourLater,
|
||||
opts: []Option{},
|
||||
wantRes: &Result{
|
||||
EnqueuedAt: now.UTC(),
|
||||
ProcessAt: oneHourLater,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStateScheduled,
|
||||
nextProcessAt: oneHourLater,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
@@ -103,18 +110,19 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
|
||||
opts := append(tc.opts, ProcessAt(tc.processAt))
|
||||
gotRes, err := client.Enqueue(tc.task, opts...)
|
||||
gotInfo, err := client.Enqueue(tc.task, opts...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(Result{}, "ID"),
|
||||
cmp.AllowUnexported(TaskInfo{}),
|
||||
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, tc.processAt, gotRes, tc.wantRes, diff)
|
||||
tc.desc, tc.processAt, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantPending {
|
||||
@@ -144,7 +152,7 @@ func TestClientEnqueue(t *testing.T) {
|
||||
desc string
|
||||
task *Task
|
||||
opts []Option
|
||||
wantRes *Result
|
||||
wantInfo *TaskInfo
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
}{
|
||||
{
|
||||
@@ -153,12 +161,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
MaxRetry(3),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: 3,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: 3,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -179,12 +192,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
MaxRetry(-2),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: 0,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: 0, // Retry count should be set to zero
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -206,12 +224,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
MaxRetry(2),
|
||||
MaxRetry(10),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: 10,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: 10, // Last option takes precedence
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -232,12 +255,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
Queue("custom"),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "custom",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "custom",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"custom": {
|
||||
@@ -258,12 +286,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
Queue("HIGH"),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "high",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "high",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"high": {
|
||||
@@ -284,12 +317,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
Timeout(20 * time.Second),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: 20 * time.Second,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: 20,
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -310,12 +348,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
opts: []Option{
|
||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: noTimeout,
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(noTimeout.Seconds()),
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -337,12 +380,17 @@ func TestClientEnqueue(t *testing.T) {
|
||||
Timeout(20 * time.Second),
|
||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
||||
},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: 20 * time.Second,
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: 20,
|
||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -362,18 +410,19 @@ func TestClientEnqueue(t *testing.T) {
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
|
||||
gotRes, err := client.Enqueue(tc.task, tc.opts...)
|
||||
gotInfo, err := client.Enqueue(tc.task, tc.opts...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
|
||||
cmp.AllowUnexported(TaskInfo{}),
|
||||
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
tc.desc, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantPending {
|
||||
@@ -398,7 +447,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
task *Task
|
||||
delay time.Duration // value for ProcessIn option
|
||||
opts []Option // other options
|
||||
wantRes *Result
|
||||
wantInfo *TaskInfo
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantScheduled map[string][]base.Z
|
||||
}{
|
||||
@@ -407,12 +456,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
task: task,
|
||||
delay: 1 * time.Hour,
|
||||
opts: []Option{},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now.Add(1 * time.Hour),
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStateScheduled,
|
||||
nextProcessAt: time.Now().Add(1 * time.Hour),
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
@@ -438,12 +492,17 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
task: task,
|
||||
delay: 0,
|
||||
opts: []Option{},
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "default",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: task.Type(),
|
||||
Payload: task.Payload(),
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "default",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {
|
||||
@@ -467,18 +526,19 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
|
||||
h.FlushDB(t, r) // clean up db before each test case.
|
||||
|
||||
opts := append(tc.opts, ProcessIn(tc.delay))
|
||||
gotRes, err := client.Enqueue(tc.task, opts...)
|
||||
gotInfo, err := client.Enqueue(tc.task, opts...)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
continue
|
||||
}
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
|
||||
cmp.AllowUnexported(TaskInfo{}),
|
||||
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, tc.delay, gotRes, tc.wantRes, diff)
|
||||
tc.desc, tc.delay, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantPending {
|
||||
@@ -537,7 +597,7 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
defaultOpts []Option // options set at the client level.
|
||||
opts []Option // options used at enqueue time.
|
||||
task *Task
|
||||
wantRes *Result
|
||||
wantInfo *TaskInfo
|
||||
queue string // queue that the message should go into.
|
||||
want *base.TaskMessage
|
||||
}{
|
||||
@@ -546,12 +606,17 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
defaultOpts: []Option{Queue("feed")},
|
||||
opts: []Option{},
|
||||
task: NewTask("feed:import", nil),
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "feed",
|
||||
Retry: defaultMaxRetry,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: "feed:import",
|
||||
Payload: nil,
|
||||
Retry: defaultMaxRetry,
|
||||
Queue: "feed",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
queue: "feed",
|
||||
want: &base.TaskMessage{
|
||||
@@ -568,12 +633,17 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
||||
opts: []Option{},
|
||||
task: NewTask("feed:import", nil),
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "feed",
|
||||
Retry: 5,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: "feed:import",
|
||||
Payload: nil,
|
||||
Retry: 5,
|
||||
Queue: "feed",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
queue: "feed",
|
||||
want: &base.TaskMessage{
|
||||
@@ -590,12 +660,17 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
|
||||
opts: []Option{Queue("critical")},
|
||||
task: NewTask("feed:import", nil),
|
||||
wantRes: &Result{
|
||||
ProcessAt: now,
|
||||
Queue: "critical",
|
||||
Retry: 5,
|
||||
Timeout: defaultTimeout,
|
||||
Deadline: noDeadline,
|
||||
wantInfo: &TaskInfo{
|
||||
msg: &base.TaskMessage{
|
||||
Type: "feed:import",
|
||||
Payload: nil,
|
||||
Retry: 5,
|
||||
Queue: "critical",
|
||||
Timeout: int64(defaultTimeout.Seconds()),
|
||||
Deadline: noDeadline.Unix(),
|
||||
},
|
||||
state: base.TaskStatePending,
|
||||
nextProcessAt: now,
|
||||
},
|
||||
queue: "critical",
|
||||
want: &base.TaskMessage{
|
||||
@@ -614,17 +689,18 @@ func TestClientDefaultOptions(t *testing.T) {
|
||||
c := NewClient(getRedisConnOpt(t))
|
||||
defer c.Close()
|
||||
c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...)
|
||||
gotRes, err := c.Enqueue(tc.task, tc.opts...)
|
||||
gotInfo, err := c.Enqueue(tc.task, tc.opts...)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cmpOptions := []cmp.Option{
|
||||
cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
|
||||
cmp.AllowUnexported(TaskInfo{}),
|
||||
cmpopts.IgnoreFields(base.TaskMessage{}, "ID"),
|
||||
cmpopts.EquateApproxTime(500 * time.Millisecond),
|
||||
}
|
||||
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantInfo, gotInfo, cmpOptions...); diff != "" {
|
||||
t.Errorf("%s;\nEnqueue(task, opts...) returned %v, want %v; (-want,+got)\n%s",
|
||||
tc.desc, gotRes, tc.wantRes, diff)
|
||||
tc.desc, gotInfo, tc.wantInfo, diff)
|
||||
}
|
||||
pending := h.GetPendingMessages(t, r, tc.queue)
|
||||
if len(pending) != 1 {
|
||||
|
Reference in New Issue
Block a user