mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
[ci skip] Update readme
This commit is contained in:
parent
f43c51ce8b
commit
b02e4e6b09
150
README.md
150
README.md
@ -1,6 +1,10 @@
|
|||||||
# Asynq
|
# Asynq
|
||||||
|
|
||||||
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) [![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq) [![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq) [![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
|
[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq)
|
||||||
|
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
|
||||||
|
[![Go Report Card](https://goreportcard.com/badge/github.com/hibiken/asynq)](https://goreportcard.com/report/github.com/hibiken/asynq)
|
||||||
|
[![GoDoc](https://godoc.org/github.com/hibiken/asynq?status.svg)](https://godoc.org/github.com/hibiken/asynq)
|
||||||
|
[![Gitter chat](https://badges.gitter.im/go-asynq/gitter.svg)](https://gitter.im/go-asynq/community)
|
||||||
|
|
||||||
Simple and efficent asynchronous task processing library in Go.
|
Simple and efficent asynchronous task processing library in Go.
|
||||||
|
|
||||||
@ -29,12 +33,12 @@ Asynq provides:
|
|||||||
- Clear separation of task producer and consumer
|
- Clear separation of task producer and consumer
|
||||||
- Ability to schedule task processing in the future
|
- Ability to schedule task processing in the future
|
||||||
- Automatic retry of failed tasks with exponential backoff
|
- Automatic retry of failed tasks with exponential backoff
|
||||||
- Automatic failover using Redis sentinels
|
- [Automatic failover](https://github.com/hibiken/asynq/wiki/Automatic-Failover) using Redis sentinels
|
||||||
- Ability to configure max retry count per task
|
- [Ability to configure](https://github.com/hibiken/asynq/wiki/Task-Retry) max retry count per task
|
||||||
- Ability to configure max number of worker goroutines to process tasks
|
- Ability to configure max number of worker goroutines to process tasks
|
||||||
- Support for priority queues
|
- Support for [priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)
|
||||||
- Unix signal handling to gracefully shutdown background processing
|
- [Unix signal handling](https://github.com/hibiken/asynq/wiki/Signals) to gracefully shutdown background processing
|
||||||
- CLI tool to query and mutate queues state for mointoring and administrative purposes
|
- [CLI tool](/tools/asynqmon/README.md) to query and mutate queues state for mointoring and administrative purposes
|
||||||
|
|
||||||
## Requirements
|
## Requirements
|
||||||
|
|
||||||
@ -45,22 +49,35 @@ Asynq provides:
|
|||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
|
To install both `asynq` library and `asynqmon` CLI tool, run the following command:
|
||||||
|
|
||||||
```
|
```
|
||||||
go get -u github.com/hibiken/asynq
|
go get -u github.com/hibiken/asynq
|
||||||
|
go get -u github.com/hibiken/asynq/tools/asynqmon
|
||||||
```
|
```
|
||||||
|
|
||||||
## Getting Started
|
## Getting Started
|
||||||
|
|
||||||
1. Import `asynq` in your file.
|
In this quick tour of `asynq`, we are going to create two programs.
|
||||||
|
|
||||||
|
- `producer.go` will create and schedule tasks to be processed asynchronously by the consumer.
|
||||||
|
- `consumer.go` will process the tasks created by the producer.
|
||||||
|
|
||||||
|
**This guide assumes that you are running a Redis server at `localhost:6379`**.
|
||||||
|
Before we start, make sure you have Redis installed and running.
|
||||||
|
|
||||||
|
1. Import `asynq` in both files.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
import "github.com/hibiken/asynq"
|
import "github.com/hibiken/asynq"
|
||||||
```
|
```
|
||||||
|
|
||||||
2. Asynq uses redis as a message broker.
|
2. Asynq uses Redis as a message broker.
|
||||||
Use one of `RedisConnOpt` types to specify how to connect to Redis.
|
Use one of `RedisConnOpt` types to specify how to connect to Redis.
|
||||||
|
We are going to use `RedisClientOpt` here.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// both in producer.go and consumer.go
|
||||||
var redis = &asynq.RedisClientOpt{
|
var redis = &asynq.RedisClientOpt{
|
||||||
Addr: "localhost:6379",
|
Addr: "localhost:6379",
|
||||||
// Omit if no password is required
|
// Omit if no password is required
|
||||||
@ -71,9 +88,10 @@ var redis = &asynq.RedisClientOpt{
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
3. Create a `Client` instance to create and schedule tasks.
|
3. In `producer.go`, create a `Client` instance to create and schedule tasks.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// producer.go
|
||||||
func main() {
|
func main() {
|
||||||
client := asynq.NewClient(redis)
|
client := asynq.NewClient(redis)
|
||||||
|
|
||||||
@ -88,32 +106,31 @@ func main() {
|
|||||||
|
|
||||||
// Process the task immediately.
|
// Process the task immediately.
|
||||||
err := client.Schedule(t1, time.Now())
|
err := client.Schedule(t1, time.Now())
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
// Process the task 24 hours later.
|
// Process the task 24 hours later.
|
||||||
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
|
err = client.Schedule(t2, time.Now().Add(24 * time.Hour))
|
||||||
|
if err != nil {
|
||||||
// Specify the max number of retry (default: 25)
|
log.Fatal(err)
|
||||||
err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1))
|
}
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
4. Create a `Background` instance to process tasks.
|
4. In `consumer.go`, create a `Background` instance to process tasks.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
|
// consumer.go
|
||||||
func main() {
|
func main() {
|
||||||
bg := asynq.NewBackground(redis, &asynq.Config{
|
bg := asynq.NewBackground(redis, &asynq.Config{
|
||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
})
|
})
|
||||||
|
|
||||||
// Blocks until signal TERM or INT is received.
|
|
||||||
// For graceful shutdown, send signal TSTP to stop processing more tasks
|
|
||||||
// before sending TERM or INT signal to terminate the process.
|
|
||||||
bg.Run(handler)
|
bg.Run(handler)
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
Note that `Client` and `Background` are intended to be used in separate executable binaries.
|
|
||||||
|
|
||||||
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
|
The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
@ -137,9 +154,14 @@ func handler(t *asynq.Task) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fmt.Printf("Send Welcome Email to %d\n", id)
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
|
||||||
// ... handle other types ...
|
case "send_reminder_email":
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Reminder Email to User %d\n", id)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unexpected task type: %s", t.Type)
|
return fmt.Errorf("unexpected task type: %s", t.Type)
|
||||||
@ -157,6 +179,94 @@ func main() {
|
|||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
We could kep adding cases to this handler function, but in a realistic application, it's convenient to define the logic for each case in a separate function.
|
||||||
|
|
||||||
|
To refactor our code, let's create a simple dispatcher which maps task type to its handler.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// consumer.go
|
||||||
|
|
||||||
|
// Dispatcher is used to dispatch tasks to registered handlers.
|
||||||
|
type Dispatcher struct {
|
||||||
|
mapping map[string]asynq.HandlerFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleFunc registers a task handler
|
||||||
|
func (d *Dispatcher) HandleFunc(taskType string, fn asynq.HandlerFunc) {
|
||||||
|
d.mapping[taskType] = fn
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessTask processes a task.
|
||||||
|
//
|
||||||
|
// NOTE: Dispatcher satisfies asynq.Handler interface.
|
||||||
|
func (d *Dispatcher) ProcessTask(task *asynq.Task) error {
|
||||||
|
fn, ok := d.mapping[task.Type]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("no handler registered for %q", task.Type)
|
||||||
|
}
|
||||||
|
return fn(task)
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
d := &Dispatcher{mapping: make(map[string]asynq.HandlerFunc)}
|
||||||
|
d.HandleFunc("send_welcome_email", sendWelcomeEmail)
|
||||||
|
d.HandleFunc("send_reminder_email", sendReminderEmail)
|
||||||
|
|
||||||
|
bg := asynq.NewBackground(redis, &asynq.Config{
|
||||||
|
Concurrency: 10,
|
||||||
|
})
|
||||||
|
bg.Run(d)
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendWelcomeEmail(t *asynq.Task) error {
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func sendReminderEmail(t *asynq.Task) error {
|
||||||
|
id, err := t.Payload.GetInt("user_id")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
fmt.Printf("Send Welcome Email to User %d\n", id)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
Now that we have both task producer and consumer, we can run both programs.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
go run consumer.go
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note**: This will not exit until you send a signal to terminate the program. See [Signal Wiki page](https://github.com/hibiken/asynq/wiki/Signals) for best practice on how to safely terminate background processing.
|
||||||
|
|
||||||
|
With our consumer running, also run
|
||||||
|
|
||||||
|
```sh
|
||||||
|
go run producer.go
|
||||||
|
```
|
||||||
|
|
||||||
|
This will create a task and the first task will get processed immediately by the consumer. The second task will be processed 24 hours later.
|
||||||
|
|
||||||
|
Let's use `asynqmon` tool to inspect the tasks.
|
||||||
|
|
||||||
|
```sh
|
||||||
|
asynqmon stats
|
||||||
|
```
|
||||||
|
|
||||||
|
This command will show the number of tasks in each state and stats for the current date as well as redis information.
|
||||||
|
|
||||||
|
To understand the meaning of each state, see [Life of a Task Wiki page](https://github.com/hibiken/asynq/wiki/Life-of-a-Task).
|
||||||
|
|
||||||
|
For in-depth guide on `asynqmon` tool, see the [README](/tools/asynqmon/README.md) for the CLI.
|
||||||
|
|
||||||
|
This was a quick tour of `asynq` basics. To see all of its features such as **[priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues)** and **[custom retry](https://github.com/hibiken/asynq/wiki/Task-Retry)**, see [the Wiki page](https://github.com/hibiken/asynq/wiki).
|
||||||
|
|
||||||
## Monitoring CLI
|
## Monitoring CLI
|
||||||
|
|
||||||
Asynq ships with a CLI tool to inspect the state of queues and tasks.
|
Asynq ships with a CLI tool to inspect the state of queues and tasks.
|
||||||
|
Loading…
Reference in New Issue
Block a user