2021-04-14 20:21:47 +03:00
< img src = "https://user-images.githubusercontent.com/11155743/114697792-ffbfa580-9d26-11eb-8e5b-33bef69476dc.png" alt = "Asynq logo" width = "360px" / >
# Simple, reliable & efficient distributed task queue in Go
2020-01-05 09:06:23 -08:00
2020-01-20 15:17:41 -08:00
[](https://godoc.org/github.com/hibiken/asynq)
2021-01-14 06:26:01 -08:00
[](https://goreportcard.com/report/github.com/hibiken/asynq)
2021-04-14 20:21:47 +03:00

2021-01-14 06:26:01 -08:00
[](https://opensource.org/licenses/MIT)
2020-01-20 15:17:41 -08:00
[](https://gitter.im/go-asynq/community)
2019-11-30 09:38:46 -08:00
2021-04-14 20:21:47 +03:00
Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by [Redis ](https://redis.io/ ) and is designed to be scalable yet easy to get started.
2020-04-11 16:33:35 -07:00
Highlevel overview of how Asynq works:
2021-03-20 13:42:13 -07:00
- Client puts tasks on a queue
- Server pulls tasks off queues and starts a worker goroutine for each task
2020-04-20 07:39:52 -07:00
- Tasks are processed concurrently by multiple workers
2020-04-11 16:33:35 -07:00
2021-04-14 20:21:47 +03:00
Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
2020-04-11 16:33:35 -07:00
2021-04-14 20:21:47 +03:00
**Example use case**
2019-11-30 09:38:46 -08:00
2021-04-28 20:25:34 +03:00

2020-04-11 09:26:51 -07:00
2020-03-28 08:37:50 -07:00
## Features
2020-04-20 07:39:52 -07:00
- Guaranteed [at least one execution ](https://www.cloudcomputingpatterns.org/at_least_once_delivery/ ) of a task
2020-03-28 08:37:50 -07:00
- Scheduling of tasks
2020-04-20 07:39:52 -07:00
- [Retries ](https://github.com/hibiken/asynq/wiki/Task-Retry ) of failed tasks
2020-07-05 16:55:33 -07:00
- Automatic recovery of tasks in the event of a worker crash
2021-07-29 11:20:51 -03:00
- [Weighted priority queues ](https://github.com/hibiken/asynq/wiki/Queue-Priority#weighted-priority )
- [Strict priority queues ](https://github.com/hibiken/asynq/wiki/Queue-Priority#strict-priority )
2020-03-28 08:37:50 -07:00
- Low latency to add a task since writes are fast in Redis
2020-04-20 07:39:52 -07:00
- De-duplication of tasks using [unique option ](https://github.com/hibiken/asynq/wiki/Unique-Tasks )
2020-05-03 16:47:55 -07:00
- Allow [timeout and deadline per task ](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation )
2022-04-11 17:07:24 -07:00
- Allow [aggregating group of tasks ](https://github.com/hibiken/asynq/wiki/Task-aggregation ) to batch multiple successive operations
2020-05-03 16:47:55 -07:00
- [Flexible handler interface with support for middlewares ](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive )
2020-06-08 06:15:45 -07:00
- [Ability to pause queue ](/tools/asynq/README.md#pause ) to stop processing tasks from the queue
2020-10-12 06:47:43 -07:00
- [Periodic Tasks ](https://github.com/hibiken/asynq/wiki/Periodic-Tasks )
2020-09-06 08:03:30 -07:00
- [Support Redis Cluster ](https://github.com/hibiken/asynq/wiki/Redis-Cluster ) for automatic sharding and high availability
- [Support Redis Sentinels ](https://github.com/hibiken/asynq/wiki/Automatic-Failover ) for high availability
2021-12-20 05:51:51 -08:00
- Integration with [Prometheus ](https://prometheus.io/ ) to collect and visualize queue metrics
2021-01-31 22:27:59 -08:00
- [Web UI ](#web-ui ) to inspect and remote-control queues and tasks
2020-04-20 07:39:52 -07:00
- [CLI ](#command-line-tool ) to inspect and remote-control queues and tasks
2020-03-28 08:37:50 -07:00
2021-04-14 20:21:47 +03:00
## Stability and Compatibility
**Status**: The library is currently undergoing **heavy development** with frequent, breaking API changes.
2023-01-05 15:23:05 +08:00
> ☝️ **Important Note**: Current major version is zero (`v0.x.x`) to accommodate rapid development and fast iteration while getting early feedback from users (_feedback on APIs are appreciated!_). The public API could change without a major version update before `v1.0.0` release.
2021-04-14 20:21:47 +03:00
2023-07-07 21:00:05 -07:00
## Sponsoring
If you are using this package in production, **please consider sponsoring the project to show your support!**
2020-02-05 21:58:05 -08:00
## Quickstart
2020-01-20 15:17:41 -08:00
2023-12-07 10:34:48 +03:00
Make sure you have Go installed ([download ](https://golang.org/dl/ )). Latest two Go versions are supported (See https://go.dev/dl).
2021-04-14 20:21:47 +03:00
Initialize your project by creating a folder and then running `go mod init github.com/your/repo` ([learn more ](https://blog.golang.org/using-go-modules )) inside the folder. Then install Asynq library with the [`go get` ](https://golang.org/cmd/go/#hdr-Add_dependencies_to_current_module_and_install_them ) command:
2020-01-23 06:33:34 -08:00
```sh
2021-04-14 20:21:47 +03:00
go get -u github.com/hibiken/asynq
2020-01-23 06:33:34 -08:00
```
2021-06-30 06:26:14 -07:00
Make sure you're running a Redis server locally or from a [Docker ](https://hub.docker.com/_/redis ) container. Version `4.0` or higher is required.
2021-04-14 20:21:47 +03:00
2020-03-23 17:35:17 +00:00
Next, write a package that encapsulates task creation and task handling.
2020-01-14 21:19:06 -08:00
```go
2020-03-14 15:51:23 -07:00
package tasks
import (
2021-12-27 14:40:10 +01:00
"context"
"encoding/json"
2020-03-14 15:51:23 -07:00
"fmt"
2021-12-27 14:40:10 +01:00
"log"
"time"
2020-03-14 15:51:23 -07:00
"github.com/hibiken/asynq"
)
2020-04-20 07:39:52 -07:00
// A list of task types.
2020-03-14 15:51:23 -07:00
const (
2020-09-06 08:03:30 -07:00
TypeEmailDelivery = "email:deliver"
TypeImageResize = "image:resize"
2020-03-14 15:51:23 -07:00
)
2021-03-20 13:42:13 -07:00
type EmailDeliveryPayload struct {
UserID int
TemplateID string
}
type ImageResizePayload struct {
SourceURL string
}
2020-05-16 10:44:39 -07:00
//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------
2020-03-14 15:51:23 -07:00
2021-03-20 13:42:13 -07:00
func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
2021-07-18 09:24:57 -07:00
payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
2021-03-20 13:42:13 -07:00
if err != nil {
return nil, err
}
2021-06-23 05:37:35 -07:00
return asynq.NewTask(TypeEmailDelivery, payload), nil
2020-03-14 15:51:23 -07:00
}
2021-03-20 13:42:13 -07:00
func NewImageResizeTask(src string) (*asynq.Task, error) {
2021-06-23 05:37:35 -07:00
payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
2021-03-20 13:42:13 -07:00
if err != nil {
return nil, err
}
2021-11-06 06:38:36 -07:00
// task options can be passed to NewTask, which can be overridden at enqueue time.
return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20 * time.Minute)), nil
2020-03-14 15:51:23 -07:00
}
2020-05-16 10:44:39 -07:00
//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
2020-09-06 08:03:30 -07:00
//
// Handler doesn't need to be a function. You can define a type
2020-05-16 10:44:39 -07:00
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------
2020-03-14 15:51:23 -07:00
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
2021-03-20 13:42:13 -07:00
var p EmailDeliveryPayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
2020-03-14 15:51:23 -07:00
}
2021-06-30 06:26:14 -07:00
log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
2020-09-06 08:03:30 -07:00
// Email delivery code ...
2020-03-14 15:51:23 -07:00
return nil
}
2020-01-14 21:19:06 -08:00
2020-05-16 10:44:39 -07:00
// ImageProcessor implements asynq.Handler interface.
2020-11-25 06:11:00 -08:00
type ImageProcessor struct {
2020-04-20 07:39:52 -07:00
// ... fields for struct
}
2021-07-18 09:24:57 -07:00
func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
2021-03-20 13:42:13 -07:00
var p ImageResizePayload
if err := json.Unmarshal(t.Payload(), &p); err != nil {
return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
2020-03-14 15:51:23 -07:00
}
2021-06-30 06:26:14 -07:00
log.Printf("Resizing image: src=%s", p.SourceURL)
2020-09-06 08:03:30 -07:00
// Image resizing code ...
2020-03-14 15:51:23 -07:00
return nil
}
2020-04-20 07:39:52 -07:00
func NewImageProcessor() *ImageProcessor {
2021-07-18 09:24:57 -07:00
return & ImageProcessor{}
2020-04-20 07:39:52 -07:00
}
2020-03-14 15:51:23 -07:00
```
2021-06-30 06:26:14 -07:00
In your application code, import the above package and use [`Client` ](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client ) to put tasks on queues.
2020-03-14 15:51:23 -07:00
```go
package main
import (
2020-12-17 23:05:16 +09:00
"log"
2020-03-14 15:51:23 -07:00
"time"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
2021-06-23 05:37:35 -07:00
client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
defer client.Close()
2019-11-30 09:38:46 -08:00
2020-05-16 10:44:39 -07:00
// ------------------------------------------------------
2020-03-13 14:50:03 -07:00
// Example 1: Enqueue task to be processed immediately.
2020-04-20 07:39:52 -07:00
// Use (*Client).Enqueue method.
2020-05-16 10:44:39 -07:00
// ------------------------------------------------------
2020-03-13 14:50:03 -07:00
2021-06-23 05:37:35 -07:00
task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
2021-03-20 13:42:13 -07:00
if err != nil {
log.Fatalf("could not create task: %v", err)
}
2021-06-23 05:37:35 -07:00
info, err := client.Enqueue(task)
2020-03-13 14:50:03 -07:00
if err != nil {
2021-03-20 13:42:13 -07:00
log.Fatalf("could not enqueue task: %v", err)
2020-03-14 15:51:23 -07:00
}
2021-06-30 06:26:14 -07:00
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
2019-11-30 09:38:46 -08:00
2020-01-23 06:33:34 -08:00
2020-05-16 10:44:39 -07:00
// ------------------------------------------------------------
2020-03-13 14:50:03 -07:00
// Example 2: Schedule task to be processed in the future.
2020-09-06 08:03:30 -07:00
// Use ProcessIn or ProcessAt option.
2020-05-16 10:44:39 -07:00
// ------------------------------------------------------------
2020-01-23 06:33:34 -08:00
2021-06-23 05:37:35 -07:00
info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
2020-03-13 14:50:03 -07:00
if err != nil {
2021-03-20 13:42:13 -07:00
log.Fatalf("could not schedule task: %v", err)
2020-03-13 14:50:03 -07:00
}
2021-06-30 06:26:14 -07:00
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
2020-02-23 15:40:04 -08:00
2020-05-16 10:44:39 -07:00
// ----------------------------------------------------------------------------
2020-09-06 08:03:30 -07:00
// Example 3: Set other options to tune task processing behavior.
2020-04-20 07:39:52 -07:00
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
2020-05-16 10:44:39 -07:00
// ----------------------------------------------------------------------------
2020-03-13 14:50:03 -07:00
2021-06-23 05:37:35 -07:00
task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
2021-03-20 13:42:13 -07:00
if err != nil {
log.Fatalf("could not create task: %v", err)
}
2021-11-06 06:38:36 -07:00
info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3 * time.Minute))
2020-03-13 14:50:03 -07:00
if err != nil {
2021-08-28 02:17:03 +04:30
log.Fatalf("could not enqueue task: %v", err)
2020-03-13 14:50:03 -07:00
}
2021-06-30 06:26:14 -07:00
log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
2020-01-23 06:33:34 -08:00
}
```
2021-04-14 20:21:47 +03:00
Next, start a worker server to process these tasks in the background. To start the background workers, use [`Server` ](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server ) and provide your [`Handler` ](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler ) to process the tasks.
2019-11-30 09:38:46 -08:00
2021-04-14 20:21:47 +03:00
You can optionally use [`ServeMux` ](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux ) to create a handler, just as you would with [`net/http` ](https://golang.org/pkg/net/http/ ) Handler.
2020-03-01 11:08:00 -08:00
```go
2020-03-14 15:51:23 -07:00
package main
2020-03-01 11:08:00 -08:00
2020-03-14 15:51:23 -07:00
import (
2020-04-26 07:48:38 -07:00
"log"
2020-03-14 15:51:23 -07:00
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
2020-03-01 11:08:00 -08:00
2019-11-30 09:38:46 -08:00
func main() {
2021-06-23 05:37:35 -07:00
srv := asynq.NewServer(
2021-08-28 02:17:03 +04:30
asynq.RedisClientOpt{Addr: redisAddr},
2021-06-23 05:37:35 -07:00
asynq.Config{
// Specify how many concurrent workers to use
Concurrency: 10,
// Optionally specify multiple queues with different priority.
Queues: map[string]int{
"critical": 6,
"default": 3,
"low": 1,
},
// See the godoc for other configuration options
2020-01-26 19:58:48 -08:00
},
2021-06-23 05:37:35 -07:00
)
2019-11-30 09:38:46 -08:00
2020-03-02 06:40:53 -08:00
// mux maps a type to a handler
2020-03-01 11:08:00 -08:00
mux := asynq.NewServeMux()
2020-09-06 08:03:30 -07:00
mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
2020-03-01 11:08:00 -08:00
// ...register other handlers...
2019-12-27 16:26:11 -08:00
2020-04-13 08:14:55 -07:00
if err := srv.Run(mux); err != nil {
log.Fatalf("could not run server: %v", err)
}
2020-03-01 11:08:00 -08:00
}
2019-12-27 16:26:11 -08:00
```
2021-04-14 20:21:47 +03:00
For a more detailed walk-through of the library, see our [Getting Started ](https://github.com/hibiken/asynq/wiki/Getting-Started ) guide.
2020-01-23 06:33:34 -08:00
2021-04-14 20:21:47 +03:00
To learn more about `asynq` features and APIs, see the package [godoc ](https://godoc.org/github.com/hibiken/asynq ).
2020-01-20 15:17:41 -08:00
2021-01-31 22:27:59 -08:00
## Web UI
[Asynqmon ](https://github.com/hibiken/asynqmon ) is a web based tool for monitoring and administrating Asynq queues and tasks.
2021-04-14 20:21:47 +03:00
Here's a few screenshots of the Web UI:
2021-01-31 22:27:59 -08:00
2021-03-12 16:23:08 -08:00
**Queues view**
2021-01-31 22:27:59 -08:00
2021-04-14 20:21:47 +03:00

2020-01-14 21:19:06 -08:00
2021-03-12 16:23:08 -08:00
**Tasks view**
2020-01-18 20:31:22 -08:00
2021-04-14 20:21:47 +03:00

2020-03-02 06:40:53 -08:00
2021-12-20 05:51:51 -08:00
**Metrics view**
< img width = "1532" alt = "Screen Shot 2021-12-19 at 4 37 19 PM" src = "https://user-images.githubusercontent.com/10953044/146777420-cae6c476-bac6-469c-acce-b2f6584e8707.png" >
2021-04-14 20:21:47 +03:00
**Settings and adaptive dark mode**
2020-03-02 06:40:53 -08:00
2021-04-14 20:21:47 +03:00

2020-03-02 06:40:53 -08:00
2021-03-12 16:23:08 -08:00
For details on how to use the tool, refer to the tool's [README ](https://github.com/hibiken/asynqmon#readme ).
2020-03-02 06:40:53 -08:00
2021-04-14 20:21:47 +03:00
## Command Line Tool
2020-03-02 06:40:53 -08:00
2021-04-14 20:21:47 +03:00
Asynq ships with a command line tool to inspect the state of queues and tasks.
2020-03-02 06:40:53 -08:00
To install the CLI tool, run the following command:
2020-01-18 20:31:22 -08:00
2020-02-07 06:45:36 -08:00
```sh
2024-01-29 07:46:47 +01:00
go install github.com/hibiken/asynq/tools/asynq@latest
2020-02-07 06:45:36 -08:00
```
2020-01-18 20:31:22 -08:00
2022-06-03 04:14:45 -07:00
Here's an example of running the `asynq dash` command:
2020-03-02 06:40:53 -08:00
2022-06-03 04:14:45 -07:00

2020-01-14 21:19:06 -08:00
2021-04-14 20:21:47 +03:00
For details on how to use the tool, refer to the tool's [README ](/tools/asynq/README.md ).
2020-02-08 09:34:14 -08:00
2021-04-14 20:21:47 +03:00
## Contributing
2020-02-08 09:34:14 -08:00
2021-04-14 20:21:47 +03:00
We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on [Gitter channel ](https://gitter.im/go-asynq/community ), etc) made by the community.
2020-01-05 09:06:23 -08:00
2021-04-14 20:21:47 +03:00
Please see the [Contribution Guide ](/CONTRIBUTING.md ) before contributing.
2020-01-05 09:06:23 -08:00
2019-12-27 16:26:11 -08:00
## License
2021-04-14 20:21:47 +03:00
Copyright (c) 2019-present [Ken Hibino ](https://github.com/hibiken ) and [Contributors ](https://github.com/hibiken/asynq/graphs/contributors ). `Asynq` is free and open-source software licensed under the [MIT License ](https://github.com/hibiken/asynq/blob/master/LICENSE ). Official logo was created by [Vic Shóstak ](https://github.com/koddr ) and distributed under [Creative Commons ](https://creativecommons.org/publicdomain/zero/1.0/ ) license (CC0 1.0 Universal).