2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-22 16:50:18 +08:00

Fix build

This commit is contained in:
Ken Hibino 2021-03-20 06:54:00 -07:00
parent 4c53446c10
commit 6171a6400d
10 changed files with 40 additions and 47 deletions

View File

@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed ### Changed
- NewTask function now takes array of bytes as payload. - NewTask function now takes array of bytes as payload.
- Task `Type` and `Payload` should be accessed by a method call.
- Requires redis v4.0+ for multiple field/value pair support - Requires redis v4.0+ for multiple field/value pair support
- Renamed pending key (TODO: need migration script) - Renamed pending key (TODO: need migration script)

View File

@ -41,8 +41,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
t := NewTask(fmt.Sprintf("task%d", i), h.KV(map[string]interface{}{"data": i})) if _, err := client.Enqueue(makeTask(i)); err != nil {
if _, err := client.Enqueue(t); err != nil {
b.Fatalf("could not enqueue a task: %v", err) b.Fatalf("could not enqueue a task: %v", err)
} }
} }
@ -223,7 +222,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
b.Log("Starting enqueueing") b.Log("Starting enqueueing")
enqueued := 0 enqueued := 0
for enqueued < 100000 { for enqueued < 100000 {
t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.KV(map[string]interface{}{"data": enqueued})) t := NewTask(fmt.Sprintf("enqueued%d", enqueued), h.JSON(map[string]interface{}{"data": enqueued}))
if _, err := client.Enqueue(t); err != nil { if _, err := client.Enqueue(t); err != nil {
b.Logf("could not enqueue task %d: %v", enqueued, err) b.Logf("could not enqueue task %d: %v", enqueued, err)
continue continue

View File

@ -20,7 +20,7 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
var ( var (
now = time.Now() now = time.Now()
@ -137,7 +137,7 @@ func TestClientEnqueue(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -390,7 +390,7 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
@ -501,7 +501,7 @@ func TestClientEnqueueError(t *testing.T) {
client := NewClient(getRedisConnOpt(t)) client := NewClient(getRedisConnOpt(t))
defer client.Close() defer client.Close()
task := NewTask("send_email", h.KV(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})) task := NewTask("send_email", h.JSON(map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}))
tests := []struct { tests := []struct {
desc string desc string
@ -650,7 +650,7 @@ func TestClientEnqueueUnique(t *testing.T) {
ttl time.Duration ttl time.Duration
}{ }{
{ {
NewTask("email", h.KV(map[string]interface{}{"user_id": 123})), NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})),
time.Hour, time.Hour,
}, },
} }

View File

@ -2598,7 +2598,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
{ {
Spec: "@every 20m", Spec: "@every 20m",
Type: "bar", Type: "bar",
Payload: h.KV(map[string]interface{}{"fiz": "baz"}), Payload: h.JSON(map[string]interface{}{"fiz": "baz"}),
Opts: []string{`Queue("bar")`, `MaxRetry(20)`}, Opts: []string{`Queue("bar")`, `MaxRetry(20)`},
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),
@ -2614,7 +2614,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
}, },
{ {
Spec: "@every 20m", Spec: "@every 20m",
Task: asynq.NewTask("bar", h.KV(map[string]interface{}{"fiz": "baz"})), Task: asynq.NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})),
Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)},
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),

View File

@ -112,8 +112,8 @@ func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *bas
} }
} }
// KV serializes the given key-value pairs into stream of bytes. // JSON serializes the given key-value pairs into stream of bytes in JSON.
func KV(kv map[string]interface{}) []byte { func JSON(kv map[string]interface{}) []byte {
b, err := json.Marshal(kv) b, err := json.Marshal(kv)
if err != nil { if err != nil {
panic(err) panic(err)

View File

@ -765,17 +765,10 @@ func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
n, ok := res.(int64) n, ok := res.(int64)
if !ok { if !ok {
return fmt.Errorf("command error: unexpected return value %v", res) return fmt.Errorf("command error: unexpected return value %v", res)
<<<<<<< HEAD
} }
if n == 0 { if n == 0 {
return ErrTaskNotFound return ErrTaskNotFound
} }
=======
}
if n == 0 {
return ErrTaskNotFound
}
>>>>>>> e0402fe... Refactor redis keys and store messages in protobuf
return nil return nil
} }

View File

@ -50,9 +50,9 @@ func TestAllQueues(t *testing.T) {
func TestCurrentStats(t *testing.T) { func TestCurrentStats(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/path/to/img"})) m3 := h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/path/to/img"}))
m4 := h.NewTaskMessage("sync", nil) m4 := h.NewTaskMessage("sync", nil)
m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m5 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") m6 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
@ -312,7 +312,7 @@ func TestListPending(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
m1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) m1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
m2 := h.NewTaskMessage("reindex", nil) m2 := h.NewTaskMessage("reindex", nil)
m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical") m3 := h.NewTaskMessageWithQueue("important_notification", nil, "critical")
m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low") m4 := h.NewTaskMessageWithQueue("minor_notification", nil, "low")
@ -3282,9 +3282,9 @@ func TestListWorkers(t *testing.T) {
pid = 4567 pid = 4567
serverID = "server123" serverID = "server123"
m1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "abc123"})) m1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "abc123"}))
m2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/image/file"})) m2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/image/file"}))
m3 = h.NewTaskMessage("reindex", h.KV(map[string]interface{}{})) m3 = h.NewTaskMessage("reindex", h.JSON(map[string]interface{}{}))
) )
tests := []struct { tests := []struct {
@ -3367,7 +3367,7 @@ func TestWriteListClearSchedulerEntries(t *testing.T) {
{ {
Spec: "@every 20m", Spec: "@every 20m",
Type: "bar", Type: "bar",
Payload: h.KV(map[string]interface{}{"fiz": "baz"}), Payload: h.JSON(map[string]interface{}{"fiz": "baz"}),
Opts: nil, Opts: nil,
Next: now.Add(1 * time.Minute), Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute), Prev: now.Add(-19 * time.Minute),

View File

@ -61,8 +61,8 @@ func setup(tb testing.TB) (r *RDB) {
func TestEnqueue(t *testing.T) { func TestEnqueue(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
t1 := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"})) t1 := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}))
t2 := h.NewTaskMessageWithQueue("generate_csv", h.KV(map[string]interface{}{}), "csv") t2 := h.NewTaskMessageWithQueue("generate_csv", h.JSON(map[string]interface{}{}), "csv")
t3 := h.NewTaskMessageWithQueue("sync", nil, "low") t3 := h.NewTaskMessageWithQueue("sync", nil, "low")
tests := []struct { tests := []struct {
@ -101,9 +101,9 @@ func TestEnqueueUnique(t *testing.T) {
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
Payload: h.KV(map[string]interface{}{"user_id": json.Number("123")}), Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}),
Queue: base.DefaultQueueName, Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})),
} }
tests := []struct { tests := []struct {
@ -157,7 +157,7 @@ func TestDequeue(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: h.KV(map[string]interface{}{"subject": "hello!"}), Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
Queue: "default", Queue: "default",
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
@ -355,7 +355,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: h.KV(map[string]interface{}{"subject": "hello!"}), Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
Queue: "default", Queue: "default",
Timeout: 1800, Timeout: 1800,
Deadline: 0, Deadline: 0,
@ -767,7 +767,7 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) { func TestSchedule(t *testing.T) {
r := setup(t) r := setup(t)
defer r.Close() defer r.Close()
msg := h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"subject": "hello"})) msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
tests := []struct { tests := []struct {
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
@ -808,9 +808,9 @@ func TestScheduleUnique(t *testing.T) {
m1 := base.TaskMessage{ m1 := base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "email", Type: "email",
Payload: h.KV(map[string]interface{}{"user_id": 123}), Payload: h.JSON(map[string]interface{}{"user_id": 123}),
Queue: base.DefaultQueueName, Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.KV(map[string]interface{}{"user_id": 123})), UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})),
} }
tests := []struct { tests := []struct {
@ -866,7 +866,7 @@ func TestRetry(t *testing.T) {
t1 := &base.TaskMessage{ t1 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "send_email", Type: "send_email",
Payload: h.KV(map[string]interface{}{"subject": "Hola!"}), Payload: h.JSON(map[string]interface{}{"subject": "Hola!"}),
Retried: 10, Retried: 10,
Timeout: 1800, Timeout: 1800,
Queue: "default", Queue: "default",
@ -874,7 +874,7 @@ func TestRetry(t *testing.T) {
t2 := &base.TaskMessage{ t2 := &base.TaskMessage{
ID: uuid.New(), ID: uuid.New(),
Type: "gen_thumbnail", Type: "gen_thumbnail",
Payload: h.KV(map[string]interface{}{"path": "some/path/to/image.jpg"}), Payload: h.JSON(map[string]interface{}{"path": "some/path/to/image.jpg"}),
Timeout: 3000, Timeout: 3000,
Queue: "default", Queue: "default",
} }
@ -1530,8 +1530,8 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
pid = 4242 pid = 4242
serverID = "server123" serverID = "server123"
msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"}))
msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"}))
ttl = 5 * time.Second ttl = 5 * time.Second
) )
@ -1642,8 +1642,8 @@ func TestClearServerState(t *testing.T) {
otherPID = 9876 otherPID = 9876
otherServerID = "server987" otherServerID = "server987"
msg1 = h.NewTaskMessage("send_email", h.KV(map[string]interface{}{"user_id": "123"})) msg1 = h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"user_id": "123"}))
msg2 = h.NewTaskMessage("gen_thumbnail", h.KV(map[string]interface{}{"path": "some/path/to/imgfile"})) msg2 = h.NewTaskMessage("gen_thumbnail", h.JSON(map[string]interface{}{"path": "some/path/to/imgfile"}))
ttl = 5 * time.Second ttl = 5 * time.Second
) )

View File

@ -228,7 +228,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
r := setup(t) r := setup(t)
rdbClient := rdb.NewRDB(r) rdbClient := rdb.NewRDB(r)
m1 := h.NewTaskMessage("large_number", h.KV(map[string]interface{}{"data": 111111111111111111})) m1 := h.NewTaskMessage("large_number", h.JSON(map[string]interface{}{"data": 111111111111111111}))
t1 := NewTask(m1.Type, m1.Payload) t1 := NewTask(m1.Type, m1.Payload)
tests := []struct { tests := []struct {
@ -615,7 +615,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return nil return nil
}, },
task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
wantErr: false, wantErr: false,
}, },
{ {
@ -623,7 +623,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
return fmt.Errorf("something went wrong") return fmt.Errorf("something went wrong")
}, },
task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
wantErr: true, wantErr: true,
}, },
{ {
@ -631,7 +631,7 @@ func TestProcessorPerform(t *testing.T) {
handler: func(ctx context.Context, t *Task) error { handler: func(ctx context.Context, t *Task) error {
panic("something went terribly wrong") panic("something went terribly wrong")
}, },
task: NewTask("gen_thumbnail", h.KV(map[string]interface{}{"src": "some/img/path"})), task: NewTask("gen_thumbnail", h.JSON(map[string]interface{}{"src": "some/img/path"})),
wantErr: true, wantErr: true,
}, },
} }

View File

@ -40,12 +40,12 @@ func TestServer(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
_, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 123}))) _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 123})))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }
_, err = c.Enqueue(NewTask("send_email", asynqtest.KV(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour)) _, err = c.Enqueue(NewTask("send_email", asynqtest.JSON(map[string]interface{}{"recipient_id": 456})), ProcessIn(1*time.Hour))
if err != nil { if err != nil {
t.Errorf("could not enqueue a task: %v", err) t.Errorf("could not enqueue a task: %v", err)
} }