diff --git a/README.md b/README.md index 26672b9..b588f01 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ Highlevel overview of how Asynq works: - Client puts task on a queue - Server pulls task off queues and starts a worker goroutine for each task -- Workers process tasks concurrently +- Tasks are processed concurrently by multiple workers Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling. @@ -24,24 +24,24 @@ A system can consist of multiple worker servers and brokers, giving way to high ## Stability and Compatibility -**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release. +**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release. **Status**: The library is currently undergoing heavy development with frequent, breaking API changes. ## Features -- Guaranteed at least one execution of a task +- Guaranteed [at least one execution](https://www.cloudcomputingpatterns.org/at_least_once_delivery/) of a task - Scheduling of tasks - Durability since tasks are written to Redis -- Retries of failed tasks -- Concurrency management via configuration -- Weighted priority queues -- Strict priority queues +- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks +- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues) +- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues) - Low latency to add a task since writes are fast in Redis -- De-duplication of tasks using unique option +- De-duplication of tasks using [unique option](https://github.com/hibiken/asynq/wiki/Unique-Tasks) - Allow timeout and deadline per task - Flexible handler interface with support for middlewares -- CLI to inspect and remote-control queues and tasks +- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for HA +- [CLI](#command-line-tool) to inspect and remote-control queues and tasks ## Quickstart @@ -62,13 +62,15 @@ import ( "github.com/hibiken/asynq" ) -// A list of background task types. +// A list of task types. const ( EmailDelivery = "email:deliver" ImageProcessing = "image:process" ) +//-------------------------------------------- // Write function NewXXXTask to create a task. +//-------------------------------------------- func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} @@ -80,8 +82,13 @@ func NewImageProcessingTask(src, dst string) *asynq.Task { return asynq.NewTask(ImageProcessing, payload) } +//------------------------------------------------------------- // Write function HandleXXXTask to handle the given task. // NOTE: It satisfies the asynq.HandlerFunc interface. +// +// Handler doesn't need to be a function. You can define a type +// that satisfies asynq.Handler interface. See example below. +//------------------------------------------------------------- func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { userID, err := t.Payload.GetInt("user_id") @@ -97,7 +104,12 @@ func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { return nil } -func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error { +type ImageProcesser struct { + // ... fields for struct +} + +// ImageProcessor implements asynq.Handler. +func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error { src, err := t.Payload.GetString("src") if err != nil { return err @@ -110,6 +122,10 @@ func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error { // Image processing logic ... return nil } + +func NewImageProcessor() *ImageProcessor { + // ... return an instance +} ``` In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue. @@ -132,7 +148,10 @@ func main() { r := asynq.RedisClientOpt{Addr: redisAddr} c := asynq.NewClient(r) + // ---------------------------------------------------- // Example 1: Enqueue task to be processed immediately. + // Use (*Client).Enqueue method. + // ---------------------------------------------------- t := tasks.NewEmailDeliveryTask(42, "some:template:id") err := c.Enqueue(t) @@ -141,7 +160,10 @@ func main() { } + // ---------------------------------------------------------- // Example 2: Schedule task to be processed in the future. + // Use (*Client).EnqueueIn or (*Client).EnqueueAt. + // ---------------------------------------------------------- t = tasks.NewEmailDeliveryTask(42, "other:template:id") err = c.EnqueueIn(24*time.Hour, t) @@ -150,8 +172,10 @@ func main() { } + // -------------------------------------------------------------------------- // Example 3: Pass options to tune task processing behavior. - // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. + // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. + // -------------------------------------------------------------------------- t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute)) @@ -161,7 +185,7 @@ func main() { } ``` -Next, create a work server binary to process these tasks in the background. +Next, create a worker server to process these tasks in the background. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks. You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler. @@ -194,7 +218,7 @@ func main() { // mux maps a type to a handler mux := asynq.NewServeMux() mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask) - mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask) + mux.Handle(tasks.ImageProcessing, tasks.NewImageProcessor()) // ...register other handlers... if err := srv.Run(mux); err != nil {