diff --git a/README.md b/README.md index 5286791..744871a 100644 --- a/README.md +++ b/README.md @@ -12,8 +12,8 @@ Asynq is a Go library for queueing tasks and processing them asynchronously with Highlevel overview of how Asynq works: -- Client puts task on a queue -- Server pulls task off queues and starts a worker goroutine for each task +- Client puts tasks on a queue +- Server pulls tasks off queues and starts a worker goroutine for each task - Tasks are processed concurrently by multiple workers Task queues are used as a mechanism to distribute work across multiple machines. @@ -72,19 +72,36 @@ const ( TypeImageResize = "image:resize" ) +type EmailDeliveryPayload struct { + UserID int + TemplateID string +} + +type ImageResizePayload struct { + SourceURL string +} + //---------------------------------------------- // Write a function NewXXXTask to create a task. // A task consists of a type and a payload. //---------------------------------------------- -func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { - payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} - return asynq.NewTask(TypeEmailDelivery, payload) +func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) { + payload := EmailDeliveryPayload{UserID: userID, TemplateID: templID} + bytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return asynq.NewTask(TypeEmailDelivery, bytes), nil } -func NewImageResizeTask(src string) *asynq.Task { - payload := map[string]interface{}{"src": src} - return asynq.NewTask(TypeImageResize, payload) +func NewImageResizeTask(src string) (*asynq.Task, error) { + payload := ImageResizePayload{SourceURL: src} + bytes, err := json.Marshal(payload) + if err != nil { + return nil, err + } + return asynq.NewTask(TypeImageResize, bytes), nil } //--------------------------------------------------------------- @@ -96,15 +113,11 @@ func NewImageResizeTask(src string) *asynq.Task { //--------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { - userID, err := t.Payload.GetInt("user_id") - if err != nil { - return err + var p EmailDeliveryPayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - tmplID, err := t.Payload.GetString("template_id") - if err != nil { - return err - } - fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID) + log.Printf("Sending Email to User: user_id = %d, template_id = %s\n", p.UserID, p.TemplateID) // Email delivery code ... return nil } @@ -115,11 +128,11 @@ type ImageProcessor struct { } func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { - src, err := t.Payload.GetString("src") - if err != nil { - return err + var p ImageResizePayload + if err := json.Unmarshal(t.Payload(), &p); err != nil { + return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry) } - fmt.Printf("Resize image: src = %s\n", src) + log.Printf("Resizing image: src = %s\n", p.SourceURL) // Image resizing code ... return nil } @@ -155,10 +168,13 @@ func main() { // Use (*Client).Enqueue method. // ------------------------------------------------------ - t := tasks.NewEmailDeliveryTask(42, "some:template:id") + t, err := tasks.NewEmailDeliveryTask(42, "some:template:id") + if err != nil { + log.Fatalf("could not create task: %v", err) + } res, err := c.Enqueue(t) if err != nil { - log.Fatal("could not enqueue task: %v", err) + log.Fatalf("could not enqueue task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) @@ -168,10 +184,9 @@ func main() { // Use ProcessIn or ProcessAt option. // ------------------------------------------------------------ - t = tasks.NewEmailDeliveryTask(42, "other:template:id") res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour)) if err != nil { - log.Fatal("could not schedule task: %v", err) + log.Fatalf("could not schedule task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) @@ -183,19 +198,21 @@ func main() { c.SetDefaultOptions(tasks.TypeImageResize, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute)) - t = tasks.NewImageResizeTask("some/blobstore/path") + t, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg") + if err != nil { + log.Fatalf("could not create task: %v", err) + } res, err = c.Enqueue(t) if err != nil { - log.Fatal("could not enqueue task: %v", err) + log.Fatalf("could not enqueue task: %v", err) } fmt.Printf("Enqueued Result: %+v\n", res) // --------------------------------------------------------------------------- // Example 4: Pass options to tune task processing behavior at enqueue time. - // Options passed at enqueue time override default ones, if any. + // Options passed at enqueue time override default ones. // --------------------------------------------------------------------------- - t = tasks.NewImageResizeTask("some/blobstore/path") res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) if err != nil { log.Fatal("could not enqueue task: %v", err) diff --git a/doc.go b/doc.go index edd3865..06ede25 100644 --- a/doc.go +++ b/doc.go @@ -11,7 +11,7 @@ specify the connection using one of RedisConnOpt types. redisConnOpt = asynq.RedisClientOpt{ Addr: "127.0.0.1:6379", Password: "xxxxx", - DB: 3, + DB: 2, } The Client is used to enqueue a task. @@ -20,12 +20,16 @@ The Client is used to enqueue a task. client := asynq.NewClient(redisConnOpt) // Task is created with two parameters: its type and payload. - t := asynq.NewTask( - "send_email", - map[string]interface{}{"user_id": 42}) + // Payload data is simply an array of bytes. It can be encoded in JSON, Protocol Buffer, Gob, etc. + b, err := json.Marshal(ExamplePayload{UserID: 42}) + if err != nil { + log.Fatal(err) + } + + task := asynq.NewTask("example", b) // Enqueue the task to be processed immediately. - res, err := client.Enqueue(t) + res, err := client.Enqueue(task) // Schedule the task to be processed after one minute. res, err = client.Enqueue(t, asynq.ProcessIn(1*time.Minute)) @@ -52,10 +56,13 @@ Example of a type that implements the Handler interface. func (h *TaskHandler) ProcessTask(ctx context.Context, task *asynq.Task) error { switch task.Type { - case "send_email": - id, err := task.Payload.GetInt("user_id") - // send email - //... + case "example": + var data ExamplePayload + if err := json.Unmarshal(task.Payload(), &data); err != nil { + return err + } + // perform task with the data + default: return fmt.Errorf("unexpected task type %q", task.Type) }