mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Fix more build errors
This commit is contained in:
parent
2a18181501
commit
3f26122ac0
@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
opts []Option
|
opts []Option
|
||||||
wantRes *Result
|
wantRes *Result
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantScheduled []base.Z
|
wantScheduled map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "Process task immediately",
|
desc: "Process task immediately",
|
||||||
@ -61,7 +61,9 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
|
wantScheduled: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "Schedule task to be processed in the future",
|
desc: "Schedule task to be processed in the future",
|
||||||
@ -74,18 +76,22 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
Timeout: defaultTimeout,
|
Timeout: defaultTimeout,
|
||||||
Deadline: noDeadline,
|
Deadline: noDeadline,
|
||||||
},
|
},
|
||||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
wantScheduled: []base.Z{
|
"default": {},
|
||||||
{
|
},
|
||||||
Message: &base.TaskMessage{
|
wantScheduled: map[string][]base.Z{
|
||||||
Type: task.Type,
|
"default": {
|
||||||
Payload: task.Payload.data,
|
{
|
||||||
Retry: defaultMaxRetry,
|
Message: &base.TaskMessage{
|
||||||
Queue: "default",
|
Type: task.Type,
|
||||||
Timeout: int64(defaultTimeout.Seconds()),
|
Payload: task.Payload.data,
|
||||||
Deadline: noDeadline.Unix(),
|
Retry: defaultMaxRetry,
|
||||||
|
Queue: "default",
|
||||||
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
|
Deadline: noDeadline.Unix(),
|
||||||
|
},
|
||||||
|
Score: oneHourLater.Unix(),
|
||||||
},
|
},
|
||||||
Score: oneHourLater.Unix(),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -110,10 +116,11 @@ func TestClientEnqueueAt(t *testing.T) {
|
|||||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for qname, want := range tc.wantScheduled {
|
||||||
gotScheduled := h.GetScheduledEntries(t, r)
|
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||||
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" {
|
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt); diff != "" {
|
||||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff)
|
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -376,7 +383,7 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
opts []Option
|
opts []Option
|
||||||
wantRes *Result
|
wantRes *Result
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantScheduled []base.Z
|
wantScheduled map[string][]base.Z
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
desc: "schedule a task to be enqueued in one hour",
|
desc: "schedule a task to be enqueued in one hour",
|
||||||
@ -389,18 +396,22 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
Timeout: defaultTimeout,
|
Timeout: defaultTimeout,
|
||||||
Deadline: noDeadline,
|
Deadline: noDeadline,
|
||||||
},
|
},
|
||||||
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
wantScheduled: []base.Z{
|
"default": {},
|
||||||
{
|
},
|
||||||
Message: &base.TaskMessage{
|
wantScheduled: map[string][]base.Z{
|
||||||
Type: task.Type,
|
"default": {
|
||||||
Payload: task.Payload.data,
|
{
|
||||||
Retry: defaultMaxRetry,
|
Message: &base.TaskMessage{
|
||||||
Queue: "default",
|
Type: task.Type,
|
||||||
Timeout: int64(defaultTimeout.Seconds()),
|
Payload: task.Payload.data,
|
||||||
Deadline: noDeadline.Unix(),
|
Retry: defaultMaxRetry,
|
||||||
|
Queue: "default",
|
||||||
|
Timeout: int64(defaultTimeout.Seconds()),
|
||||||
|
Deadline: noDeadline.Unix(),
|
||||||
|
},
|
||||||
|
Score: time.Now().Add(time.Hour).Unix(),
|
||||||
},
|
},
|
||||||
Score: time.Now().Add(time.Hour).Unix(),
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -427,7 +438,9 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil
|
wantScheduled: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -450,10 +463,11 @@ func TestClientEnqueueIn(t *testing.T) {
|
|||||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.QueueKey(qname), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for qname, want := range tc.wantScheduled {
|
||||||
gotScheduled := h.GetScheduledEntries(t, r)
|
gotScheduled := h.GetScheduledEntries(t, r, qname)
|
||||||
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, h.IgnoreIDOpt); diff != "" {
|
if diff := cmp.Diff(want, gotScheduled, h.IgnoreIDOpt); diff != "" {
|
||||||
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledQueue, diff)
|
t.Errorf("%s;\nmismatch found in %q; (-want,+got)\n%s", tc.desc, base.ScheduledKey(qname), diff)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -587,7 +601,7 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
|
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val()
|
||||||
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
|
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
|
||||||
continue
|
continue
|
||||||
@ -634,7 +648,7 @@ func TestEnqueueInUnique(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
|
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val()
|
||||||
wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second
|
wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second
|
||||||
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
||||||
@ -682,7 +696,7 @@ func TestEnqueueAtUnique(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
gotTTL := r.TTL(uniqueKey(tc.task, tc.ttl, base.DefaultQueueName)).Val()
|
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val()
|
||||||
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
|
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
|
||||||
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
||||||
|
@ -63,12 +63,12 @@ func TestInspectorCurrentStats(t *testing.T) {
|
|||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
retry: []base.Z{
|
retry: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
dead: []base.Z{
|
dead: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
"low": {},
|
"low": {},
|
||||||
@ -108,11 +108,11 @@ func TestInspectorCurrentStats(t *testing.T) {
|
|||||||
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
||||||
for qname, n := range tc.processed {
|
for qname, n := range tc.processed {
|
||||||
processedKey := base.ProcessedKey(qname, now)
|
processedKey := base.ProcessedKey(qname, now)
|
||||||
r.client.Set(processedKey, n, 0)
|
r.Set(processedKey, n, 0)
|
||||||
}
|
}
|
||||||
for qname, n := range tc.failed {
|
for qname, n := range tc.failed {
|
||||||
failedKey := base.FailedKey(qname, now)
|
failedKey := base.FailedKey(qname, now)
|
||||||
r.client.Set(failedKey, n, 0)
|
r.Set(failedKey, n, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := inspector.CurrentStats(tc.qname)
|
got, err := inspector.CurrentStats(tc.qname)
|
||||||
@ -337,7 +337,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
|
|||||||
m1 := asynqtest.NewTaskMessage("task1", nil)
|
m1 := asynqtest.NewTaskMessage("task1", nil)
|
||||||
m2 := asynqtest.NewTaskMessage("task2", nil)
|
m2 := asynqtest.NewTaskMessage("task2", nil)
|
||||||
m3 := asynqtest.NewTaskMessage("task3", nil)
|
m3 := asynqtest.NewTaskMessage("task3", nil)
|
||||||
m3 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
|
m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom")
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
|
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
|
||||||
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
|
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
|
||||||
@ -383,7 +383,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
|
|||||||
asynqtest.FlushDB(t, r)
|
asynqtest.FlushDB(t, r)
|
||||||
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
|
asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled)
|
||||||
|
|
||||||
got, err := inspector.ListScheduledTasks()
|
got, err := inspector.ListScheduledTasks(tc.qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
|
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
|
||||||
continue
|
continue
|
||||||
@ -461,7 +461,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
|
|||||||
asynqtest.FlushDB(t, r)
|
asynqtest.FlushDB(t, r)
|
||||||
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
|
asynqtest.SeedAllRetryQueues(t, r, tc.retry)
|
||||||
|
|
||||||
got, err := inspector.ListRetryTasks()
|
got, err := inspector.ListRetryTasks(tc.qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
|
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
|
||||||
continue
|
continue
|
||||||
@ -536,7 +536,7 @@ func TestInspectorListDeadTasks(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
asynqtest.FlushDB(t, r)
|
asynqtest.FlushDB(t, r)
|
||||||
asynqtest.SeedAllDeadQueues(t, r, tc.retry)
|
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
||||||
|
|
||||||
got, err := inspector.ListDeadTasks(tc.qname)
|
got, err := inspector.ListDeadTasks(tc.qname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -559,7 +559,7 @@ func TestInspectorListPagination(t *testing.T) {
|
|||||||
asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil))
|
asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil))
|
||||||
}
|
}
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
asynqtest.SeedEnqueuedQueue(t, r, msgs)
|
asynqtest.SeedEnqueuedQueue(t, r, msgs, base.DefaultQueueName)
|
||||||
|
|
||||||
inspector := NewInspector(RedisClientOpt{
|
inspector := NewInspector(RedisClientOpt{
|
||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
@ -841,26 +841,56 @@ func TestInspectorKillAllScheduledTasks(t *testing.T) {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
scheduled: []base.Z{z1, z2},
|
scheduled: map[string][]base.Z{
|
||||||
dead: []base.Z{z3},
|
"default": {z1, z2},
|
||||||
want: 2,
|
},
|
||||||
wantDead: []base.Z{
|
dead: map[string][]base.Z{
|
||||||
z3,
|
"default": {z3},
|
||||||
base.Z{Message: m1, Score: now.Unix()},
|
},
|
||||||
base.Z{Message: m2, Score: now.Unix()},
|
qname: "default",
|
||||||
|
want: 2,
|
||||||
|
wantScheduled: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
wantDead: map[string][]base.Z{
|
||||||
|
"default": {
|
||||||
|
z3,
|
||||||
|
base.Z{Message: m1, Score: now.Unix()},
|
||||||
|
base.Z{Message: m2, Score: now.Unix()},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
scheduled: []base.Z(nil),
|
scheduled: map[string][]base.Z{
|
||||||
dead: []base.Z(nil),
|
"default": {},
|
||||||
want: 0,
|
},
|
||||||
wantDead: []base.Z(nil),
|
dead: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
want: 0,
|
||||||
|
wantScheduled: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
wantDead: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
scheduled: []base.Z(nil),
|
scheduled: map[string][]base.Z{
|
||||||
dead: []base.Z{z1, z2},
|
"default": {},
|
||||||
want: 0,
|
},
|
||||||
wantDead: []base.Z{z1, z2},
|
dead: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
},
|
||||||
|
qname: "default",
|
||||||
|
want: 0,
|
||||||
|
wantScheduled: map[string][]base.Z{
|
||||||
|
"default": {},
|
||||||
|
},
|
||||||
|
wantDead: map[string][]base.Z{
|
||||||
|
"default": {z1, z2},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -976,7 +1006,7 @@ func TestInspectorKillAllRetryTasks(t *testing.T) {
|
|||||||
wantRetry: map[string][]base.Z{
|
wantRetry: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
wantdead: map[string][]base.Z{
|
wantDead: map[string][]base.Z{
|
||||||
"default": {z1, z2},
|
"default": {z1, z2},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1506,7 +1536,7 @@ func TestInspectorDeleteTaskByKeyDeletesDeadTask(t *testing.T) {
|
|||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
asynqtest.FlushDB(t, r)
|
asynqtest.FlushDB(t, r)
|
||||||
asynqtest.SeedDAlleadQueues(t, r, tc.dead)
|
asynqtest.SeedAllDeadQueues(t, r, tc.dead)
|
||||||
|
|
||||||
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil {
|
||||||
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err)
|
||||||
@ -1758,6 +1788,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
|
|||||||
scheduled map[string][]base.Z
|
scheduled map[string][]base.Z
|
||||||
dead map[string][]base.Z
|
dead map[string][]base.Z
|
||||||
qname string
|
qname string
|
||||||
|
key string
|
||||||
want string
|
want string
|
||||||
wantScheduled map[string][]base.Z
|
wantScheduled map[string][]base.Z
|
||||||
wantDead map[string][]base.Z
|
wantDead map[string][]base.Z
|
||||||
@ -1773,7 +1804,7 @@ func TestInspectorKillTaskByKeyKillsScheduledTask(t *testing.T) {
|
|||||||
},
|
},
|
||||||
qname: "custom",
|
qname: "custom",
|
||||||
key: createScheduledTask(z2).Key(),
|
key: createScheduledTask(z2).Key(),
|
||||||
scheduled: map[string][]base.Z{
|
wantScheduled: map[string][]base.Z{
|
||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
@ -1849,7 +1880,7 @@ func TestInspectorKillTaskByKeyKillsRetryTask(t *testing.T) {
|
|||||||
"default": {z1},
|
"default": {z1},
|
||||||
"custom": {z3},
|
"custom": {z3},
|
||||||
},
|
},
|
||||||
wantDead: []base.Z{
|
wantDead: map[string][]base.Z{
|
||||||
"default": {},
|
"default": {},
|
||||||
"custom": {{m2, now.Unix()}},
|
"custom": {{m2, now.Unix()}},
|
||||||
},
|
},
|
||||||
|
@ -241,7 +241,7 @@ func (p *processor) requeue(msg *base.TaskMessage) {
|
|||||||
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
|
||||||
err := p.broker.Done(msg)
|
err := p.broker.Done(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressQueue, err)
|
errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q err: %+v", msg.ID, msg.Type, base.InProgressKey(msg.Queue), err)
|
||||||
deadline, ok := ctx.Deadline()
|
deadline, ok := ctx.Deadline()
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("asynq: internal error: missing deadline in context")
|
panic("asynq: internal error: missing deadline in context")
|
||||||
@ -274,7 +274,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
|||||||
retryAt := time.Now().Add(d)
|
retryAt := time.Now().Add(d)
|
||||||
err := p.broker.Retry(msg, retryAt, e.Error())
|
err := p.broker.Retry(msg, retryAt, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.RetryKey(msg.Queue))
|
||||||
deadline, ok := ctx.Deadline()
|
deadline, ok := ctx.Deadline()
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("asynq: internal error: missing deadline in context")
|
panic("asynq: internal error: missing deadline in context")
|
||||||
@ -293,7 +293,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
|
|||||||
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
|
||||||
err := p.broker.Kill(msg, e.Error())
|
err := p.broker.Kill(msg, e.Error())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
|
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressKey(msg.Queue), base.DeadKey(msg.Queue))
|
||||||
deadline, ok := ctx.Deadline()
|
deadline, ok := ctx.Deadline()
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("asynq: internal error: missing deadline in context")
|
panic("asynq: internal error: missing deadline in context")
|
||||||
|
@ -74,8 +74,8 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||||
|
|
||||||
// instantiate a new processor
|
// instantiate a new processor
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
@ -118,8 +118,8 @@ func TestProcessorSuccess(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
||||||
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
|
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||||
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
|
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||||
}
|
}
|
||||||
p.terminate()
|
p.terminate()
|
||||||
|
|
||||||
@ -150,8 +150,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||||
|
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var processed []*Task
|
var processed []*Task
|
||||||
@ -191,8 +191,8 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
|||||||
|
|
||||||
p.start(&sync.WaitGroup{})
|
p.start(&sync.WaitGroup{})
|
||||||
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
time.Sleep(2 * time.Second) // wait for two second to allow all enqueued tasks to be processed.
|
||||||
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
|
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||||
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
|
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||||
}
|
}
|
||||||
p.terminate()
|
p.terminate()
|
||||||
|
|
||||||
@ -246,8 +246,8 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r) // clean up db before each test case.
|
h.FlushDB(t, r) // clean up db before each test case.
|
||||||
h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
|
h.SeedEnqueuedQueue(t, r, tc.enqueued, base.DefaultQueueName) // initialize default queue.
|
||||||
|
|
||||||
// instantiate a new processor
|
// instantiate a new processor
|
||||||
delayFunc := func(n int, e error, t *Task) time.Duration {
|
delayFunc := func(n int, e error, t *Task) time.Duration {
|
||||||
@ -295,18 +295,18 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
p.terminate()
|
p.terminate()
|
||||||
|
|
||||||
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score
|
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score
|
||||||
gotRetry := h.GetRetryEntries(t, r)
|
gotRetry := h.GetRetryEntries(t, r, base.DefaultQueueName)
|
||||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff)
|
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryKey(base.DefaultQueueName), diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
gotDead := h.GetDeadMessages(t, r)
|
gotDead := h.GetDeadMessages(t, r, base.DefaultQueueName)
|
||||||
if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantDead, gotDead, h.SortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadQueue, diff)
|
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.DeadKey(base.DefaultQueueName), diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
|
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||||
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
|
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n != tc.wantErrCount {
|
if n != tc.wantErrCount {
|
||||||
@ -455,8 +455,8 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
if l := r.LLen(base.InProgressQueue).Val(); l != 0 {
|
if l := r.LLen(base.InProgressKey(base.DefaultQueueName)).Val(); l != 0 {
|
||||||
t.Errorf("%q has %d tasks, want 0", base.InProgressQueue, l)
|
t.Errorf("%q has %d tasks, want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -139,7 +139,10 @@ func TestRecoverer(t *testing.T) {
|
|||||||
"default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")},
|
"default": {h.TaskMessageAfterRetry(*t1, "deadline exceeded")},
|
||||||
"critical": {},
|
"critical": {},
|
||||||
},
|
},
|
||||||
wantDead: []*base.TaskMessage{},
|
wantDead: map[string][]*base.TaskMessage{
|
||||||
|
"default": {},
|
||||||
|
"critical": {},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "with multiple expired tasks in-progress",
|
desc: "with multiple expired tasks in-progress",
|
||||||
|
@ -129,7 +129,7 @@ func TestScheduler(t *testing.T) {
|
|||||||
for qname, want := range tc.wantEnqueued {
|
for qname, want := range tc.wantEnqueued {
|
||||||
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
gotEnqueued := h.GetEnqueuedMessages(t, r, qname)
|
||||||
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.DefaultKey(qname), diff)
|
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,7 +23,7 @@ func TestSyncer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
h.SeedInProgressQueue(t, r, inProgress)
|
h.SeedInProgressQueue(t, r, inProgress, base.DefaultQueueName)
|
||||||
|
|
||||||
const interval = time.Second
|
const interval = time.Second
|
||||||
syncRequestCh := make(chan *syncRequest)
|
syncRequestCh := make(chan *syncRequest)
|
||||||
@ -48,9 +48,9 @@ func TestSyncer(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
time.Sleep(2 * interval) // ensure that syncer runs at least once
|
||||||
|
|
||||||
gotInProgress := h.GetInProgressMessages(t, r)
|
gotInProgress := h.GetInProgressMessages(t, r, base.DefaultQueueName)
|
||||||
if l := len(gotInProgress); l != 0 {
|
if l := len(gotInProgress); l != 0 {
|
||||||
t.Errorf("%q has length %d; want 0", base.InProgressQueue, l)
|
t.Errorf("%q has length %d; want 0", base.InProgressKey(base.DefaultQueueName), l)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user