2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-23 01:00:17 +08:00

Update test to match the new API

This commit is contained in:
Ken Hibino 2021-03-17 09:07:43 -07:00
parent 848a03dc16
commit b3b50d26a2
12 changed files with 161 additions and 121 deletions

View File

@ -85,7 +85,7 @@ func getRedisConnOpt(tb testing.TB) RedisConnOpt {
var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task { var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
out := append([]*Task(nil), in...) // Copy input to avoid mutating it out := append([]*Task(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool { sort.Slice(out, func(i, j int) bool {
return out[i].Type < out[j].Type return out[i].Type() < out[j].Type()
}) })
return out return out
}) })

View File

@ -6,12 +6,24 @@ package asynq
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
h "github.com/hibiken/asynq/internal/asynqtest"
) )
// Creates a new task of type "task<n>" with payload {"data": n}.
func makeTask(n int) *Task {
b, err := json.Marshal(map[string]int{"data": n})
if err != nil {
panic(err)
}
return NewTask(fmt.Sprintf("task%d", n), b)
}
// Simple E2E Benchmark testing with no scheduled tasks and retries. // Simple E2E Benchmark testing with no scheduled tasks and retries.
func BenchmarkEndToEndSimple(b *testing.B) { func BenchmarkEndToEndSimple(b *testing.B) {
const count = 100000 const count = 100000
@ -29,7 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) t := NewTask(fmt.Sprintf("task%d", i), h.KV(map[string]interface{}{"data": i}))
if _, err := client.Enqueue(t); err != nil { if _, err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
@ -70,14 +82,12 @@ func BenchmarkEndToEnd(b *testing.B) {
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i)); err != nil {
if _, err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil {
if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
@ -86,13 +96,18 @@ func BenchmarkEndToEnd(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count * 2) wg.Add(count * 2)
handler := func(ctx context.Context, t *Task) error { handler := func(ctx context.Context, t *Task) error {
n, err := t.Payload.GetInt("data") var p map[string]int
if err != nil { if err := json.Unmarshal(t.Payload(), &p); err != nil {
b.Logf("internal error: %v", err) b.Logf("internal error: %v", err)
} }
n, ok := p["data"]
if !ok {
n = 1
b.Logf("internal error: could not get data from payload")
}
retried, ok := GetRetryCount(ctx) retried, ok := GetRetryCount(ctx)
if !ok { if !ok {
b.Logf("internal error: %v", err) b.Logf("internal error: could not get retry count from context")
} }
// Fail 1% of tasks for the first attempt. // Fail 1% of tasks for the first attempt.
if retried == 0 && n%100 == 0 { if retried == 0 && n%100 == 0 {
@ -136,20 +151,17 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < highCount; i++ { for i := 0; i < highCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i), Queue("high")); err != nil {
if _, err := client.Enqueue(t, Queue("high")); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
for i := 0; i < defaultCount; i++ { for i := 0; i < defaultCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i)); err != nil {
if _, err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
for i := 0; i < lowCount; i++ { for i := 0; i < lowCount; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i), Queue("low")); err != nil {
if _, err := client.Enqueue(t, Queue("low")); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
@ -190,15 +202,13 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
}) })
// Enqueue 10,000 tasks. // Enqueue 10,000 tasks.
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i)); err != nil {
if _, err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
// Schedule 10,000 tasks. // Schedule 10,000 tasks.
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("scheduled%d", i), map[string]interface{}{"data": i}) if _, err := client.Enqueue(makeTask(i), ProcessIn(1*time.Second)); err != nil {
if _, err := client.Enqueue(t, ProcessIn(1*time.Second)); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
@ -213,7 +223,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.Log("Starting enqueueing") b.Log("Starting enqueueing")
enqueued := 0 enqueued := 0
for enqueued < 100000 { for enqueued < 100000 {
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), map[string]interface{}{"data": enqueued}) t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.KV(map[string]interface{}{"data": enqueued}))
if _, err := client.Enqueue(t); err != nil { if _, err := client.Enqueue(t); err != nil {
b.Logf("could not enqueue task %d: %v", enqueued, err) b.Logf("could not enqueue task %d: %v", enqueued, err)
continue continue

View File

@ -20,7 +20,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
var ( var (
now = time.Now() now = time.Now()
@ -52,8 +52,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -85,8 +85,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
"default": { "default": {
{ {
Message: &base.TaskMessage{ Message: &base.TaskMessage{
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -137,7 +137,7 @@ func TestClientEnqueue(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -163,8 +163,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: 3, Retry: 3,
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -189,8 +189,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: 0, // Retry count should be set to zero Retry: 0, // Retry count should be set to zero
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -216,8 +216,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: 10, // Last option takes precedence Retry: 10, // Last option takes precedence
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -242,8 +242,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"custom": { "custom": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "custom", Queue: "custom",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -268,8 +268,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"high": { "high": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "high", Queue: "high",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -294,8 +294,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: 20, Timeout: 20,
@ -320,8 +320,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: int64(noTimeout.Seconds()), Timeout: int64(noTimeout.Seconds()),
@ -347,8 +347,8 @@ func TestClientEnqueue(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: 20, Timeout: 20,
@ -390,7 +390,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -421,8 +421,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
"default": { "default": {
{ {
Message: &base.TaskMessage{ Message: &base.TaskMessage{
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -448,8 +448,8 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
{ {
Type: task.Type, Type: task.Type(),
Payload: task.Payload.data, Payload: task.Payload(),
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
Queue: "default", Queue: "default",
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
@ -501,7 +501,7 @@ func TestClientEnqueueError(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
tests := []struct { tests := []struct {
desc string desc string
@ -613,7 +613,7 @@ func TestClientDefaultOptions(t *testing.T) {
h.FlushDB(t, r) h.FlushDB(t, r)
c := NewClient(getRedisConnOpt(t)) c := NewClient(getRedisConnOpt(t))
defer c.Close() defer c.Close()
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...) c.SetDefaultOptions(tc.task.Type(), tc.defaultOpts...)
gotRes, err := c.Enqueue(tc.task, tc.opts...) gotRes, err := c.Enqueue(tc.task, tc.opts...)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -650,7 +650,7 @@ func TestClientEnqueueUnique(t *testing.T) {
ttl time.Duration ttl time.Duration
}{ }{
{ {
NewTask("email", map[string]interface{}{"user_id": 123}), NewTask("email", h.KV(map[string]interface{}{"user_id": 123})),
time.Hour, time.Hour,
}, },
} }
@ -664,7 +664,7 @@ func TestClientEnqueueUnique(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl)
continue continue
@ -709,7 +709,7 @@ func TestClientEnqueueUniqueWithProcessInOption(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second wantTTL := time.Duration(tc.ttl.Seconds()+tc.d.Seconds()) * time.Second
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
@ -755,7 +755,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type, tc.task.Payload.data)).Val() gotTTL := r.TTL(base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
@ -774,4 +774,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
} }
} }
} }

View File

@ -574,7 +574,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
tc.desc, tc.qname, err) tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
@ -632,7 +632,7 @@ func TestInspectorListActiveTasks(t *testing.T) {
t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
@ -708,7 +708,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ScheduledTask{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
@ -785,7 +785,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, RetryTask{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
@ -861,7 +861,7 @@ func TestInspectorListArchivedTasks(t *testing.T) {
t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ArchivedTask{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff) tc.desc, tc.qname, got, tc.want, diff)
@ -922,7 +922,7 @@ func TestInspectorListPagination(t *testing.T) {
t.Errorf("ListPendingTask('default') returned error: %v", err) t.Errorf("ListPendingTask('default') returned error: %v", err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff) got, tc.want, diff)
@ -2598,7 +2598,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
{ {
Spec: "@every 20m", Spec: "@every 20m",
Type: "bar", Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"}, Payload: h.KV(map[string]interface{}{"fiz": "baz"}),
Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, Opts: []string{`Queue("bar")`, `MaxRetry(20)`},
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),
@ -2614,7 +2614,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
}, },
{ {
Spec: "@every 20m", Spec: "@every 20m",
Task: asynq.NewTask("bar", map[string]interface{}{"fiz": "baz"}), Task: asynq.NewTask("bar", h.KV(map[string]interface{}{"fiz": "baz"})),
Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)},
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),
@ -2634,7 +2634,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
t.Errorf("SchedulerEntries() returned error: %v", err) t.Errorf("SchedulerEntries() returned error: %v", err)
continue continue
} }
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" {
t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff) got, tc.want, diff)

View File

@ -6,6 +6,7 @@
package asynqtest package asynqtest
import ( import (
"encoding/json"
"math" "math"
"sort" "sort"
"testing" "testing"
@ -111,6 +112,15 @@ func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *bas
} }
} }
// KV serializes the given key-value pairs into stream of bytes.
func KV(kv map[string]interface{}) []byte {
b, err := json.Marshal(kv)
if err != nil {
panic(err)
}
return b
}
// TaskMessageAfterRetry returns an updated copy of t after retry. // TaskMessageAfterRetry returns an updated copy of t after retry.
// It increments retry count and sets the error message. // It increments retry count and sets the error message.
func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage { func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage {

View File

@ -267,52 +267,68 @@ func TestSchedulerHistoryKey(t *testing.T) {
} }
} }
func toBytes(m map[string]interface{}) []byte {
b, err := json.Marshal(m)
if err != nil {
panic(err)
}
return b
}
func TestUniqueKey(t *testing.T) { func TestUniqueKey(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
qname string qname string
tasktype string tasktype string
payload map[string]interface{} payload []byte
want string want string
}{ }{
{ {
"with primitive types", "with primitive types",
"default", "default",
"email:send", "email:send",
map[string]interface{}{"a": 123, "b": "hello", "c": true}, toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}),
"asynq:{default}:unique:email:send:a=123,b=hello,c=true", fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{"a": 123, "b": "hello", "c": true}))),
}, },
{ {
"with unsorted keys", "with unsorted keys",
"default", "default",
"email:send", "email:send",
map[string]interface{}{"b": "hello", "c": true, "a": 123}, toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}),
"asynq:{default}:unique:email:send:a=123,b=hello,c=true", fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{"b": "hello", "c": true, "a": 123}))),
}, },
{ {
"with composite types", "with composite types",
"default", "default",
"email:send", "email:send",
map[string]interface{}{ toBytes(map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"}, "address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}, "names": []string{"bob", "mike", "rob"}}),
"asynq:{default}:unique:email:send:address=map[city:Boston line:123 Main St state:MA],names=[bob mike rob]", fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
"names": []string{"bob", "mike", "rob"}}))),
}, },
{ {
"with complex types", "with complex types",
"default", "default",
"email:send", "email:send",
map[string]interface{}{ toBytes(map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC), "time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}, "duration": time.Hour}),
"asynq:{default}:unique:email:send:duration=1h0m0s,time=2020-07-28 00:00:00 +0000 UTC", fmt.Sprintf("asynq:{default}:unique:email:send:%s",
string(toBytes(map[string]interface{}{
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
"duration": time.Hour}))),
}, },
{ {
"with nil payload", "with nil payload",
"default", "default",
"reindex", "reindex",
nil, nil,
"asynq:{default}:unique:reindex:nil", "asynq:{default}:unique:reindex:",
}, },
} }
@ -333,7 +349,7 @@ func TestMessageEncoding(t *testing.T) {
{ {
in: &TaskMessage{ in: &TaskMessage{
Type: "task1", Type: "task1",
Payload: map[string]interface{}{"a": 1, "b": "hello!", "c": true}, Payload: toBytes(map[string]interface{}{"a": 1, "b": "hello!", "c": true}),
ID: id, ID: id,
Queue: "default", Queue: "default",
Retry: 10, Retry: 10,
@ -343,7 +359,7 @@ func TestMessageEncoding(t *testing.T) {
}, },
out: &TaskMessage{ out: &TaskMessage{
Type: "task1", Type: "task1",
Payload: map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}, Payload: toBytes(map[string]interface{}{"a": json.Number("1"), "b": "hello!", "c": true}),
ID: id, ID: id,
Queue: "default", Queue: "default",
Retry: 10, Retry: 10,
@ -420,7 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) {
ServerID: "abc123", ServerID: "abc123",
ID: uuid.NewString(), ID: uuid.NewString(),
Type: "taskA", Type: "taskA",
Payload: map[string]interface{}{"foo": "bar"}, Payload: toBytes(map[string]interface{}{"foo": "bar"}),
Queue: "default", Queue: "default",
Started: time.Now().Add(-3 * time.Hour), Started: time.Now().Add(-3 * time.Hour),
Deadline: time.Now().Add(30 * time.Second), Deadline: time.Now().Add(30 * time.Second),
@ -455,7 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) {
ID: uuid.NewString(), ID: uuid.NewString(),
Spec: "* * * * *", Spec: "* * * * *",
Type: "task_A", Type: "task_A",
Payload: map[string]interface{}{"foo": "bar"}, Payload: toBytes(map[string]interface{}{"foo": "bar"}),
Opts: []string{"Queue('email')"}, Opts: []string{"Queue('email')"},
Next: time.Now().Add(30 * time.Second).UTC(), Next: time.Now().Add(30 * time.Second).UTC(),
Prev: time.Now().Add(-2 * time.Minute).UTC(), Prev: time.Now().Add(-2 * time.Minute).UTC(),

View File

@ -50,9 +50,9 @@ func TestAllQueues(t *testing.T) {
func TestCurrentStats(t *testing.T) { func TestCurrentStats(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"}))
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"src": "some/path/to/img"}) m3 := h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/path/to/img"}))
m4 := h.NewTaskMessage("sync", nil) m4 := h.NewTaskMessage("sync", nil)
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
@ -312,7 +312,7 @@ func TestListPending(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"}))
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
@ -3282,9 +3282,9 @@ func TestListWorkers(t *testing.T) {
pid = 4567 pid = 4567
serverID = "server123" serverID = "server123"
m1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "abc123"}) m1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "abc123"}))
m2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/image/file"}) m2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/image/file"}))
m3 = h.NewTaskMessage("reindex", map[string]interface{}{}) m3 = h.NewTaskMessage("reindex", h.KV(map[string]interface{}{}))
) )
tests := []struct { tests := []struct {
@ -3367,7 +3367,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
{ {
Spec: "@every 20m", Spec: "@every 20m",
Type: "bar", Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"}, Payload: h.KV(map[string]interface{}{"fiz": "baz"}),
Opts: nil, Opts: nil,
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),

View File

@ -61,8 +61,8 @@ func setup(tb testing.TB) (r *RDB) {
func TestEnqueue(t *testing.T) { func TestEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) t1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}))
t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") t2 := h.NewTaskMessageWithQueue("generate_csv", h.KV(map[string]interface{}{}), "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low") t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
tests := []struct { tests := []struct {
@ -101,9 +101,9 @@ func TestEnqueueUnique(t *testing.T) {
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
Payload: map[string]interface{}{"user_id": json.Number("123")}, Payload: h.KV(map[string]interface{}{"user_id": json.Number("123")}),
Queue: base.DefaultQueueName, Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})),
} }
tests := []struct { tests := []struct {
@ -157,7 +157,7 @@ func TestDequeue(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: map[string]interface{}{"subject": "hello!"}, Payload: h.KV(map[string]interface{}{"subject": "hello!"}),
Queue: "default", Queue: "default",
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
@ -355,7 +355,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: map[string]interface{}{"subject": "hello!"}, Payload: h.KV(map[string]interface{}{"subject": "hello!"}),
Queue: "default", Queue: "default",
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
@ -767,7 +767,7 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"}) msg := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"}))
tests := []struct { tests := []struct {
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
@ -808,9 +808,9 @@ func TestScheduleUnique(t *testing.T) {
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
Payload: map[string]interface{}{"user_id": 123}, Payload: h.KV(map[string]interface{}{"user_id": 123}),
Queue: base.DefaultQueueName, Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})),
} }
tests := []struct { tests := []struct {
@ -866,7 +866,7 @@ func TestRetry(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: map[string]interface{}{"subject": "Hola!"}, Payload: h.KV(map[string]interface{}{"subject": "Hola!"}),
Retried: 10, Retried: 10,
Timeout: 1800, Timeout: 1800,
Queue: "default", Queue: "default",
@ -874,7 +874,7 @@ func TestRetry(t *testing.T) {
t2 := &base.TaskMessage{ t2 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "gen_thumbnail", Type: "gen_thumbnail",
Payload: map[string]interface{}{"path": "some/path/to/image.jpg"}, Payload: h.KV(map[string]interface{}{"path": "some/path/to/image.jpg"}),
Timeout: 3000, Timeout: 3000,
Queue: "default", Queue: "default",
} }
@ -1530,8 +1530,8 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
pid = 4242 pid = 4242
serverID = "server123" serverID = "server123"
msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"}))
msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"}))
ttl = 5 * time.Second ttl = 5 * time.Second
) )
@ -1642,8 +1642,8 @@ func TestClearServerState(t *testing.T) {
otherPID = 9876 otherPID = 9876
otherServerID = "server987" otherServerID = "server987"
msg1 = h.NewTaskMessage("send_email", map[string]interface{}{"user_id": "123"}) msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"}))
msg2 = h.NewTaskMessage("gen_thumbnail", map[string]interface{}{"path": "some/path/to/imgfile"}) msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"}))
ttl = 5 * time.Second ttl = 5 * time.Second
) )

View File

@ -6,6 +6,7 @@ package asynq
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"sort" "sort"
"sync" "sync"
@ -13,7 +14,6 @@ import (
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
h "github.com/hibiken/asynq/internal/asynqtest" h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
@ -124,7 +124,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
p.terminate() p.terminate()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
mu.Unlock() mu.Unlock()
@ -216,7 +216,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
p.terminate() p.terminate()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
mu.Unlock() mu.Unlock()
@ -228,7 +228,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("large_number", map[string]interface{}{"data": 111111111111111111}) m1 := h.NewTaskMessage("large_number", h.KV(map[string]interface{}{"data": 111111111111111111}))
t1 := NewTask(m1.Type, m1.Payload) t1 := NewTask(m1.Type, m1.Payload)
tests := []struct { tests := []struct {
@ -250,10 +250,14 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
handler := func(ctx context.Context, task *Task) error { handler := func(ctx context.Context, task *Task) error {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if data, err := task.Payload.GetInt("data"); err != nil { var payload map[string]int
t.Errorf("coult not get data from payload: %v", err) if err := json.Unmarshal(task.Payload(), &payload); err != nil {
} else { t.Errorf("coult not decode payload: %v", err)
}
if data, ok := payload["data"]; ok {
t.Logf("data == %d", data) t.Logf("data == %d", data)
} else {
t.Errorf("could not get data from payload")
} }
processed = append(processed, task) processed = append(processed, task)
return nil return nil
@ -289,7 +293,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
p.terminate() p.terminate()
mu.Lock() mu.Lock()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmpopts.IgnoreUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
mu.Unlock() mu.Unlock()
@ -592,7 +596,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
} }
p.terminate() p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff) t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
} }
@ -611,7 +615,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return nil return nil
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})),
wantErr: false, wantErr: false,
}, },
{ {
@ -619,7 +623,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return fmt.Errorf("something went wrong") return fmt.Errorf("something went wrong")
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})),
wantErr: true, wantErr: true,
}, },
{ {
@ -627,7 +631,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
panic("something went terribly wrong") panic("something went terribly wrong")
}, },
task: NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})),
wantErr: true, wantErr: true,
}, },
} }

View File

@ -151,7 +151,7 @@ func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
// NotFound returns an error indicating that the handler was not found for the given task. // NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error { func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("handler not found for task %q", task.Type) return fmt.Errorf("handler not found for task %q", task.Type())
} }
// NotFoundHandler returns a simple task handler that returns a ``not found`` error. // NotFoundHandler returns a simple task handler that returns a ``not found`` error.

View File

@ -68,7 +68,7 @@ func TestServeMux(t *testing.T) {
} }
if called != tc.want { if called != tc.want {
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want)
} }
} }
} }
@ -124,7 +124,7 @@ func TestServeMuxNotFound(t *testing.T) {
task := NewTask(tc.typename, nil) task := NewTask(tc.typename, nil)
err := mux.ProcessTask(context.Background(), task) err := mux.ProcessTask(context.Background(), task)
if err == nil { if err == nil {
t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type) t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type())
} }
} }
} }
@ -164,7 +164,7 @@ func TestServeMuxMiddlewares(t *testing.T) {
} }
if called != tc.want { if called != tc.want {
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type(), tc.want)
} }
} }
} }

View File

@ -11,6 +11,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker" "github.com/hibiken/asynq/internal/testbroker"
"go.uber.org/goleak" "go.uber.org/goleak"
@ -39,12 +40,12 @@ func TestServer(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
_, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 123})) _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 123})))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
_, err = c.Enqueue(NewTask("send_email", map[string]interface{}{"recipient_id": 456}), ProcessIn(1*time.Hour)) _, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
@ -169,8 +170,8 @@ func TestServerWithFlakyBroker(t *testing.T) {
h := func(ctx context.Context, task *Task) error { h := func(ctx context.Context, task *Task) error {
// force task retry. // force task retry.
if task.Type == "bad_task" { if task.Type() == "bad_task" {
return fmt.Errorf("could not process %q", task.Type) return fmt.Errorf("could not process %q", task.Type())
} }
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
return nil return nil