2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Update README

This commit is contained in:
Ken Hibino 2020-09-06 08:03:30 -07:00
parent 29e542e591
commit ac3d5b126a
3 changed files with 48 additions and 166 deletions

View File

@ -9,7 +9,7 @@
## Overview ## Overview
Asynq is a Go library for queueing tasks and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily. Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.
Highlevel overview of how Asynq works: Highlevel overview of how Asynq works:
@ -42,7 +42,8 @@ A system can consist of multiple worker servers and brokers, giving way to high
- Allow [timeout and deadline per task](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation) - Allow [timeout and deadline per task](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation)
- [Flexible handler interface with support for middlewares](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive) - [Flexible handler interface with support for middlewares](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive)
- [Ability to pause queue](/tools/asynq/README.md#pause) to stop processing tasks from the queue - [Ability to pause queue](/tools/asynq/README.md#pause) to stop processing tasks from the queue
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for HA - [Support Redis Cluster](https://github.com/hibiken/asynq/wiki/Redis-Cluster) for automatic sharding and high availability
- [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for high availability
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks - [CLI](#command-line-tool) to inspect and remote-control queues and tasks
## Quickstart ## Quickstart
@ -66,8 +67,8 @@ import (
// A list of task types. // A list of task types.
const ( const (
EmailDelivery = "email:deliver" TypeEmailDelivery = "email:deliver"
ImageProcessing = "image:process" TypeImageResize = "image:resize"
) )
//---------------------------------------------- //----------------------------------------------
@ -77,19 +78,19 @@ const (
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
return asynq.NewTask(EmailDelivery, payload) return asynq.NewTask(TypeEmailDelivery, payload)
} }
func NewImageProcessingTask(src, dst string) *asynq.Task { func NewImageResizeTask(src string) *asynq.Task {
payload := map[string]interface{}{"src": src, "dst": dst} payload := map[string]interface{}{"src": src}
return asynq.NewTask(ImageProcessing, payload) return asynq.NewTask(TypeImageResize, payload)
} }
//--------------------------------------------------------------- //---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task. // Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface. // Note that it satisfies the asynq.HandlerFunc interface.
// //
// Handler doesn't need to be a function. You can define a type // Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below. // that satisfies asynq.Handler interface. See examples below.
//--------------------------------------------------------------- //---------------------------------------------------------------
@ -103,7 +104,7 @@ func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
return err return err
} }
fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID) fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
// Email delivery logic ... // Email delivery code ...
return nil return nil
} }
@ -117,12 +118,8 @@ func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
if err != nil { if err != nil {
return err return err
} }
dst, err := t.Payload.GetString("dst") fmt.Printf("Resize image: src = %s\n", src)
if err != nil { // Image resizing code ...
return err
}
fmt.Printf("Process image: src = %s, dst = %s\n", src, dst)
// Image processing logic ...
return nil return nil
} }
@ -131,10 +128,7 @@ func NewImageProcessor() *ImageProcessor {
} }
``` ```
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue. In your application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue.
// TODO: This description needs to be updated.
A task will be processed asynchronously by a background worker as soon as the task gets enqueued.
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.
```go ```go
package main package main
@ -168,11 +162,11 @@ func main() {
// ------------------------------------------------------------ // ------------------------------------------------------------
// Example 2: Schedule task to be processed in the future. // Example 2: Schedule task to be processed in the future.
// Use (*Client).EnqueueIn or (*Client).EnqueueAt. // Use ProcessIn or ProcessAt option.
// ------------------------------------------------------------ // ------------------------------------------------------------
t = tasks.NewEmailDeliveryTask(42, "other:template:id") t = tasks.NewEmailDeliveryTask(42, "other:template:id")
res, err = c.EnqueueIn(24*time.Hour, t) res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
if err != nil { if err != nil {
log.Fatal("could not schedule task: %v", err) log.Fatal("could not schedule task: %v", err)
} }
@ -180,13 +174,13 @@ func main() {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Example 3: Set options to tune task processing behavior. // Example 3: Set other options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. // Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute)) c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") t = tasks.NewImageResizeTask("some/blobstore/path")
res, err = c.Enqueue(t) res, err = c.Enqueue(t)
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatal("could not enqueue task: %v", err)
@ -198,7 +192,7 @@ func main() {
// Options passed at enqueue time override default ones, if any. // Options passed at enqueue time override default ones, if any.
// --------------------------------------------------------------------------- // ---------------------------------------------------------------------------
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") t = tasks.NewImageResizeTask("some/blobstore/path")
res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second)) res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatal("could not enqueue task: %v", err)
@ -207,7 +201,7 @@ func main() {
} }
``` ```
Next, create a worker server to process these tasks in the background. Next, start a worker server to process these tasks in the background.
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler. You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
@ -241,8 +235,8 @@ func main() {
// mux maps a type to a handler // mux maps a type to a handler
mux := asynq.NewServeMux() mux := asynq.NewServeMux()
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask) mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
mux.Handle(tasks.ImageProcessing, tasks.NewImageProcessor()) mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
// ...register other handlers... // ...register other handlers...
if err := srv.Run(mux); err != nil { if err := srv.Run(mux); err != nil {
@ -283,7 +277,7 @@ go get -u github.com/hibiken/asynq/tools/asynq
| Dependency | Version | | Dependency | Version |
| -------------------------- | ------- | | -------------------------- | ------- |
| [Redis](https://redis.io/) | v2.8+ | | [Redis](https://redis.io/) | v3.0+ |
| [Go](https://golang.org/) | v1.13+ | | [Go](https://golang.org/) | v1.13+ |
## Contributing ## Contributing

BIN
docs/assets/cluster.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 60 KiB

View File

@ -1,20 +1,11 @@
# Asynq CLI # Asynq CLI
Asynq CLI is a command line tool to monitor the tasks managed by `asynq` package. Asynq CLI is a command line tool to monitor the queues and tasks managed by `asynq` package.
## Table of Contents ## Table of Contents
- [Installation](#installation) - [Installation](#installation)
- [Quick Start](#quick-start) - [Usage](#usage)
- [Stats](#stats)
- [History](#history)
- [Servers](#servers)
- [List](#list)
- [Enqueue](#enqueue)
- [Delete](#delete)
- [Kill](#kill)
- [Cancel](#cancel)
- [Pause](#pause)
- [Config File](#config-file) - [Config File](#config-file)
## Installation ## Installation
@ -25,144 +16,41 @@ In order to use the tool, compile it using the following command:
This will create the asynq executable under your `$GOPATH/bin` directory. This will create the asynq executable under your `$GOPATH/bin` directory.
## Quickstart ## Usage
The tool has a few commands to inspect the state of tasks and queues. ### Commands
Run `asynq help` to see all the available commands. To view details on any command, use `asynq help <command> <subcommand>`.
- `asynq stats`
- `asynq queue [ls inspect history rm pause unpause]`
- `asynq task [ls cancel delete kill run delete-all kill-all run-all]`
- `asynq server [ls]`
### Global flags
Asynq CLI needs to connect to a redis-server to inspect the state of queues and tasks. Use flags to specify the options to connect to the redis-server used by your application. Asynq CLI needs to connect to a redis-server to inspect the state of queues and tasks. Use flags to specify the options to connect to the redis-server used by your application.
To connect to a redis cluster, pass `--cluster` and `--cluster_addrs` flags.
By default, CLI will try to connect to a redis server running at `localhost:6379`. By default, CLI will try to connect to a redis server running at `localhost:6379`.
### Stats ```
--config string config file to set flag defaut values (default is $HOME/.asynq.yaml)
-n, --db int redis database number (default is 0)
-h, --help help for asynq
-p, --password string password to use when connecting to redis server
-u, --uri string redis server URI (default "127.0.0.1:6379")
Stats command gives the overview of the current state of tasks and queues. You can run it in conjunction with `watch` command to repeatedly run `stats`. --cluster connect to redis cluster
--cluster_addrs string list of comma-separated redis server addresses
Example: ```
watch -n 3 asynq stats
This will run `asynq stats` command every 3 seconds.
![Gif](/docs/assets/asynq_stats.gif)
### History
History command shows the number of processed and failed tasks from the last x days.
By default, it shows the stats from the last 10 days. Use `--days` to specify the number of days.
Example:
asynq history --days=30
![Gif](/docs/assets/asynq_history.gif)
### Servers
Servers command shows the list of running worker servers pulling tasks from the given redis instance.
Example:
asynq servers
### List
List command shows all tasks in the specified state in a table format
Example:
asynq ls retry
asynq ls scheduled
asynq ls dead
asynq ls enqueued:default
asynq ls inprogress
### Enqueue
There are two commands to enqueue tasks.
Command `enq` takes a task ID and moves the task to **Enqueued** state. You can obtain the task ID by running `ls` command.
Example:
asynq enq d:1575732274:bnogo8gt6toe23vhef0g
Command `enqall` moves all tasks to **Enqueued** state from the specified state.
Example:
asynq enqall retry
Running the above command will move all **Retry** tasks to **Enqueued** state.
### Delete
There are two commands for task deletion.
Command `del` takes a task ID and deletes the task. You can obtain the task ID by running `ls` command.
Example:
asynq del r:1575732274:bnogo8gt6toe23vhef0g
Command `delall` deletes all tasks which are in the specified state.
Example:
asynq delall retry
Running the above command will delete all **Retry** tasks.
### Kill
There are two commands to kill (i.e. move to dead state) tasks.
Command `kill` takes a task ID and kills the task. You can obtain the task ID by running `ls` command.
Example:
asynq kill r:1575732274:bnogo8gt6toe23vhef0g
Command `killall` kills all tasks which are in the specified state.
Example:
asynq killall retry
Running the above command will move all **Retry** tasks to **Dead** state.
### Cancel
Command `cancel` takes a task ID and sends a cancelation signal to the goroutine processing the specified task.
You can obtain the task ID by running `ls` command.
The task should be in "in-progress" state.
Handler implementation needs to be context aware in order to actually stop processing.
Example:
asynq cancel bnogo8gt6toe23vhef0g
### Pause
Command `pause` pauses the spcified queue. Tasks in paused queues are not processed by servers.
To resume processing from the queue, use `unpause` command.
To see which queues are currently paused, use `stats` command.
Example:
asynq pause email
asynq unpause email
## Config File ## Config File
You can use a config file to set default values for the flags. You can use a config file to set default values for the flags.
This is useful, for example when you have to connect to a remote redis server.
By default, `asynq` will try to read config file located in By default, `asynq` will try to read config file located in
`$HOME/.asynq.(yaml|json)`. You can specify the file location via `--config` flag. `$HOME/.asynq.(yml|json)`. You can specify the file location via `--config` flag.
Config file example: Config file example: