mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Update readme
This commit is contained in:
parent
5924cdac33
commit
789a9fd711
52
README.md
52
README.md
@ -15,7 +15,7 @@ Highlevel overview of how Asynq works:
|
|||||||
|
|
||||||
- Client puts task on a queue
|
- Client puts task on a queue
|
||||||
- Server pulls task off queues and starts a worker goroutine for each task
|
- Server pulls task off queues and starts a worker goroutine for each task
|
||||||
- Workers process tasks concurrently
|
- Tasks are processed concurrently by multiple workers
|
||||||
|
|
||||||
Task queues are used as a mechanism to distribute work across multiple machines.
|
Task queues are used as a mechanism to distribute work across multiple machines.
|
||||||
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
|
||||||
@ -24,24 +24,24 @@ A system can consist of multiple worker servers and brokers, giving way to high
|
|||||||
|
|
||||||
## Stability and Compatibility
|
## Stability and Compatibility
|
||||||
|
|
||||||
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
|
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.
|
||||||
|
|
||||||
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
|
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
- Guaranteed at least one execution of a task
|
- Guaranteed [at least one execution](https://www.cloudcomputingpatterns.org/at_least_once_delivery/) of a task
|
||||||
- Scheduling of tasks
|
- Scheduling of tasks
|
||||||
- Durability since tasks are written to Redis
|
- Durability since tasks are written to Redis
|
||||||
- Retries of failed tasks
|
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
|
||||||
- Concurrency management via configuration
|
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
|
||||||
- Weighted priority queues
|
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
|
||||||
- Strict priority queues
|
|
||||||
- Low latency to add a task since writes are fast in Redis
|
- Low latency to add a task since writes are fast in Redis
|
||||||
- De-duplication of tasks using unique option
|
- De-duplication of tasks using [unique option](https://github.com/hibiken/asynq/wiki/Unique-Tasks)
|
||||||
- Allow timeout and deadline per task
|
- Allow timeout and deadline per task
|
||||||
- Flexible handler interface with support for middlewares
|
- Flexible handler interface with support for middlewares
|
||||||
- CLI to inspect and remote-control queues and tasks
|
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for HA
|
||||||
|
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
|
||||||
|
|
||||||
## Quickstart
|
## Quickstart
|
||||||
|
|
||||||
@ -62,13 +62,15 @@ import (
|
|||||||
"github.com/hibiken/asynq"
|
"github.com/hibiken/asynq"
|
||||||
)
|
)
|
||||||
|
|
||||||
// A list of background task types.
|
// A list of task types.
|
||||||
const (
|
const (
|
||||||
EmailDelivery = "email:deliver"
|
EmailDelivery = "email:deliver"
|
||||||
ImageProcessing = "image:process"
|
ImageProcessing = "image:process"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//--------------------------------------------
|
||||||
// Write function NewXXXTask to create a task.
|
// Write function NewXXXTask to create a task.
|
||||||
|
//--------------------------------------------
|
||||||
|
|
||||||
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
|
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
|
||||||
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
|
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
|
||||||
@ -80,8 +82,13 @@ func NewImageProcessingTask(src, dst string) *asynq.Task {
|
|||||||
return asynq.NewTask(ImageProcessing, payload)
|
return asynq.NewTask(ImageProcessing, payload)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-------------------------------------------------------------
|
||||||
// Write function HandleXXXTask to handle the given task.
|
// Write function HandleXXXTask to handle the given task.
|
||||||
// NOTE: It satisfies the asynq.HandlerFunc interface.
|
// NOTE: It satisfies the asynq.HandlerFunc interface.
|
||||||
|
//
|
||||||
|
// Handler doesn't need to be a function. You can define a type
|
||||||
|
// that satisfies asynq.Handler interface. See example below.
|
||||||
|
//-------------------------------------------------------------
|
||||||
|
|
||||||
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
|
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
|
||||||
userID, err := t.Payload.GetInt("user_id")
|
userID, err := t.Payload.GetInt("user_id")
|
||||||
@ -97,7 +104,12 @@ func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
|
type ImageProcesser struct {
|
||||||
|
// ... fields for struct
|
||||||
|
}
|
||||||
|
|
||||||
|
// ImageProcessor implements asynq.Handler.
|
||||||
|
func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
|
||||||
src, err := t.Payload.GetString("src")
|
src, err := t.Payload.GetString("src")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -110,6 +122,10 @@ func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
|
|||||||
// Image processing logic ...
|
// Image processing logic ...
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewImageProcessor() *ImageProcessor {
|
||||||
|
// ... return an instance
|
||||||
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue.
|
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue.
|
||||||
@ -132,7 +148,10 @@ func main() {
|
|||||||
r := asynq.RedisClientOpt{Addr: redisAddr}
|
r := asynq.RedisClientOpt{Addr: redisAddr}
|
||||||
c := asynq.NewClient(r)
|
c := asynq.NewClient(r)
|
||||||
|
|
||||||
|
// ----------------------------------------------------
|
||||||
// Example 1: Enqueue task to be processed immediately.
|
// Example 1: Enqueue task to be processed immediately.
|
||||||
|
// Use (*Client).Enqueue method.
|
||||||
|
// ----------------------------------------------------
|
||||||
|
|
||||||
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
|
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
|
||||||
err := c.Enqueue(t)
|
err := c.Enqueue(t)
|
||||||
@ -141,7 +160,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// ----------------------------------------------------------
|
||||||
// Example 2: Schedule task to be processed in the future.
|
// Example 2: Schedule task to be processed in the future.
|
||||||
|
// Use (*Client).EnqueueIn or (*Client).EnqueueAt.
|
||||||
|
// ----------------------------------------------------------
|
||||||
|
|
||||||
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
|
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
|
||||||
err = c.EnqueueIn(24*time.Hour, t)
|
err = c.EnqueueIn(24*time.Hour, t)
|
||||||
@ -150,8 +172,10 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// --------------------------------------------------------------------------
|
||||||
// Example 3: Pass options to tune task processing behavior.
|
// Example 3: Pass options to tune task processing behavior.
|
||||||
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
|
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
|
||||||
|
// --------------------------------------------------------------------------
|
||||||
|
|
||||||
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
|
||||||
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
|
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
|
||||||
@ -161,7 +185,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Next, create a work server binary to process these tasks in the background.
|
Next, create a worker server to process these tasks in the background.
|
||||||
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
|
||||||
|
|
||||||
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
|
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
|
||||||
@ -194,7 +218,7 @@ func main() {
|
|||||||
// mux maps a type to a handler
|
// mux maps a type to a handler
|
||||||
mux := asynq.NewServeMux()
|
mux := asynq.NewServeMux()
|
||||||
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
|
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
|
||||||
mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask)
|
mux.Handle(tasks.ImageProcessing, tasks.NewImageProcessor())
|
||||||
// ...register other handlers...
|
// ...register other handlers...
|
||||||
|
|
||||||
if err := srv.Run(mux); err != nil {
|
if err := srv.Run(mux); err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user