
Asynq

Simple and efficient asynchronous task processing library in Go.

Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while gathering early feedback from users. The public API could change without a major version update before the release of version 1.0.0.

Overview

Asynq provides a simple interface to asynchronous task processing.

It also ships with a tool to monitor the queues and take manual actions if needed.

Asynq provides:

  • Clear separation of task producer and consumer
  • Ability to schedule task processing in the future
  • Automatic retry of failed tasks with exponential backoff
  • Automatic failover using Redis sentinels
  • Ability to configure max retry count per task
  • Ability to configure max number of worker goroutines to process tasks
  • Support for priority queues
  • Unix signal handling to gracefully shut down background processing
  • CLI tool to query and mutate queue state for monitoring and administrative purposes
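
The retry bullet above mentions exponential backoff. The exact delay formula Asynq uses is not given in this README, so the following is only a generic sketch of the idea: the delay doubles with each attempt and is capped. The names retryDelay, baseDelay, and maxDelay are hypothetical and are not part of the asynq API.

```go
package main

import (
	"fmt"
	"time"
)

// Hypothetical tuning values chosen for illustration only.
const (
	baseDelay = 1 * time.Second
	maxDelay  = 1 * time.Hour
)

// retryDelay returns how long to wait before retry number n (n starts at 0).
// The delay doubles on every retry and never exceeds maxDelay.
func retryDelay(n int) time.Duration {
	if n < 0 {
		n = 0
	}
	d := baseDelay
	for i := 0; i < n; i++ {
		d *= 2
		if d >= maxDelay {
			return maxDelay
		}
	}
	return d
}

func main() {
	for n := 0; n < 5; n++ {
		fmt.Printf("retry %d: wait %v\n", n, retryDelay(n))
	}
}
```

Capping the delay keeps a task with a large retry count from being postponed indefinitely. Production schedulers often add random jitter as well so that many failed tasks do not retry at the same instant.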

Requirements

Dependency   Version
Redis        v2.8+
Go           v1.12+

Installation

go get -u github.com/hibiken/asynq

Getting Started

  1. Import asynq in your file.
import "github.com/hibiken/asynq"
  2. Asynq uses Redis as a message broker. Use one of the RedisConnOpt types to specify how to connect to Redis.
var redis = &asynq.RedisClientOpt{
    Addr: "localhost:6379",
    // Omit if no password is required
    Password: "mypassword",
    // Use a dedicated db number for asynq.
    // By default, Redis offers 16 databases (0..15)
    DB: 0,
}
  3. Create a Client instance to create and schedule tasks.
// Requires the "log" and "time" packages from the standard library.
func main() {
    client := asynq.NewClient(redis)

    // Create a task with typename and payload.
    t1 := asynq.NewTask(
        "send_welcome_email",
        map[string]interface{}{"user_id": 42})

    t2 := asynq.NewTask(
        "send_reminder_email",
        map[string]interface{}{"user_id": 42})

    // Process the task immediately.
    if err := client.Schedule(t1, time.Now()); err != nil {
        log.Fatal(err)
    }

    // Process the task 24 hours later.
    if err := client.Schedule(t2, time.Now().Add(24 * time.Hour)); err != nil {
        log.Fatal(err)
    }

    // Specify the max number of retries (default: 25).
    if err := client.Schedule(t1, time.Now(), asynq.MaxRetry(1)); err != nil {
        log.Fatal(err)
    }
}
  4. Create a Background instance to process tasks.
func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Blocks until signal TERM or INT is received.
    // For graceful shutdown, send signal TSTP to stop processing more tasks
    // before sending TERM or INT signal to terminate the process.
    // handler is any value that implements asynq.Handler (described below).
    bg.Run(handler)
}
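
The Concurrency field in step 4 caps how many worker goroutines process tasks at once. How Asynq enforces that cap internally is not described here. The sketch below shows one common Go technique for the same goal, a buffered channel used as a counting semaphore. runBounded and its parameters are hypothetical names for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runBounded calls work(i) for every i in [0, n), running at most limit
// calls at the same time (limit must be positive). It returns the highest
// number of calls that were observed in flight simultaneously.
func runBounded(n, limit int, work func(i int)) int32 {
	sem := make(chan struct{}, limit) // counting semaphore with limit slots
	var wg sync.WaitGroup
	var inFlight, peak int32

	for i := 0; i < n; i++ {
		sem <- struct{}{} // blocks while all slots are taken
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot

			cur := atomic.AddInt32(&inFlight, 1)
			// Record the peak with a compare-and-swap loop.
			for {
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			work(i)
			atomic.AddInt32(&inFlight, -1)
		}(i)
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	var done int32
	peak := runBounded(100, 10, func(int) { atomic.AddInt32(&done, 1) })
	fmt.Printf("processed %d tasks, peak concurrency %d\n",
		atomic.LoadInt32(&done), peak)
}
```

Because a slot is acquired before each goroutine starts, the number of running workers can never exceed limit, regardless of how many tasks are waiting.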

Note that Client and Background are intended to be used in separate executable binaries.
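
In step 3, asynq.MaxRetry(1) is passed as a trailing argument to Schedule. This style is commonly known in Go as functional options. The following is a self-contained sketch of that pattern and is not Asynq's source: the settings, option, maxRetry, and resolve names are hypothetical, and only the default of 25 retries comes from the example above.

```go
package main

import "fmt"

// settings holds per-task configuration (hypothetical).
type settings struct {
	retry int
}

// option mutates settings; callers pass zero or more of them.
type option func(*settings)

// maxRetry returns an option that sets the retry limit.
// Negative values are treated as zero.
func maxRetry(n int) option {
	if n < 0 {
		n = 0
	}
	return func(s *settings) { s.retry = n }
}

// resolve starts from the defaults and applies opts in order,
// so a later option overrides an earlier one.
func resolve(opts ...option) settings {
	s := settings{retry: 25}
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	fmt.Println(resolve().retry)            // default retry limit
	fmt.Println(resolve(maxRetry(1)).retry) // overridden per call
}
```

The appeal of this design is that new settings can be added later without changing the signature of the function that accepts them.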

The argument to (*asynq.Background).Run is an asynq.Handler, an interface with a single method, ProcessTask.

// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask returns a non-nil error or panics, the task
// will be retried.
type Handler interface {
    ProcessTask(*Task) error
}
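
The comment above says that a task is retried if ProcessTask returns an error or panics. One way a worker can treat both cases uniformly is to recover from the panic and convert it into an error. The sketch below uses local stand-in types (Task, Handler) so that it compiles on its own. safeProcess and the three example handlers are hypothetical, and whether Asynq handles panics internally in this way is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// Task and Handler are local stand-ins that mirror the shapes shown above.
type Task struct{ Type string }

type Handler interface {
	ProcessTask(*Task) error
}

// Three hypothetical handlers: one succeeds, one fails, one panics.
type okHandler struct{}

func (okHandler) ProcessTask(*Task) error { return nil }

type failingHandler struct{}

func (failingHandler) ProcessTask(*Task) error {
	return errors.New("mail server unavailable")
}

type panickyHandler struct{}

func (panickyHandler) ProcessTask(*Task) error { panic("boom") }

// safeProcess runs h.ProcessTask and turns a panic into an ordinary error,
// so the caller only has to check one value to decide whether to retry.
func safeProcess(h Handler, t *Task) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic while processing %q: %v", t.Type, r)
		}
	}()
	return h.ProcessTask(t)
}

func main() {
	t := &Task{Type: "send_welcome_email"}
	for _, h := range []Handler{okHandler{}, failingHandler{}, panickyHandler{}} {
		if err := safeProcess(h, t); err != nil {
			fmt.Println("retry needed:", err)
		} else {
			fmt.Println("done")
		}
	}
}
```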

The simplest way to implement a handler is to define a function with the same signature and wrap it in the asynq.HandlerFunc adapter type when passing it to Run.

func handler(t *asynq.Task) error {
    switch t.Type {
    case "send_welcome_email":
        id, err := t.Payload.GetInt("user_id")
        if err != nil {
            return err
        }
        fmt.Printf("Send Welcome Email to %d\n", id)

    // ... handle other types ...

    default:
        return fmt.Errorf("unexpected task type: %s", t.Type)
    }
    return nil
}

func main() {
    bg := asynq.NewBackground(redis, &asynq.Config{
        Concurrency: 10,
    })

    // Use the asynq.HandlerFunc adapter to pass a plain function as a Handler.
    bg.Run(asynq.HandlerFunc(handler))
}
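
An adapter type such as asynq.HandlerFunc lets an ordinary function satisfy an interface, the same idea as http.HandlerFunc in the standard library. The sketch below reproduces the pattern with local stand-in types and adds a hypothetical mux type that replaces the switch statement with a lookup by task type. None of these definitions are taken from Asynq's source, and the map-based Payload is a simplification for illustration.

```go
package main

import "fmt"

// Task and Handler are local stand-ins, not the asynq types.
type Task struct {
	Type    string
	Payload map[string]interface{}
}

type Handler interface {
	ProcessTask(*Task) error
}

// HandlerFunc adapts a plain function to the Handler interface.
type HandlerFunc func(*Task) error

// ProcessTask calls fn(t).
func (fn HandlerFunc) ProcessTask(t *Task) error { return fn(t) }

// mux routes a task to the handler registered for its type.
// It is itself a Handler, so it can be used wherever one is expected.
type mux map[string]Handler

func (m mux) ProcessTask(t *Task) error {
	h, ok := m[t.Type]
	if !ok {
		return fmt.Errorf("unexpected task type: %s", t.Type)
	}
	return h.ProcessTask(t)
}

func sendWelcomeEmail(t *Task) error {
	fmt.Printf("Send Welcome Email to %v\n", t.Payload["user_id"])
	return nil
}

func main() {
	m := mux{"send_welcome_email": HandlerFunc(sendWelcomeEmail)}

	known := &Task{
		Type:    "send_welcome_email",
		Payload: map[string]interface{}{"user_id": 42},
	}
	fmt.Println("known type error:", m.ProcessTask(known))
	fmt.Println("unknown type error:", m.ProcessTask(&Task{Type: "unknown"}))
}
```

Registering one function per task type keeps each handler small and makes the unexpected-type case a single, shared code path.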

Monitoring CLI

Asynq ships with a CLI tool to inspect the state of queues and tasks.

To install the CLI, run the following command:

go get github.com/hibiken/asynq/tools/asynqmon

For details on how to use the tool, see the README for the asynqmon CLI.

Acknowledgements

  • Sidekiq: Many of the design ideas are taken from Sidekiq and its Web UI
  • Cobra: The asynqmon CLI is built with Cobra

License

Asynq is released under the MIT license. See LICENSE.
