2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-24 23:02:18 +08:00
golang基于redis的异步队列
Go to file
2020-04-19 08:51:17 -07:00
.github/ISSUE_TEMPLATE Update issue templates 2019-12-27 10:45:45 -08:00
.travis Run and compare benchmarks after successful ci-build 2020-01-30 21:38:16 -08:00
docs/assets Update all reference to asynqmon to Asynq CLI 2020-04-19 08:51:17 -07:00
internal Rename internal ProcessState to ServerState 2020-04-19 08:51:17 -07:00
tools Update all reference to asynqmon to Asynq CLI 2020-04-19 08:51:17 -07:00
.gitignore Update all reference to asynqmon to Asynq CLI 2020-04-19 08:51:17 -07:00
.travis.yml Allow client to enqueue a task with unique option 2020-03-21 11:40:40 -07:00
asynq_test.go Extract out log package 2020-03-09 07:17:52 -07:00
asynq.go Add ServeMux type 2020-03-01 15:53:18 -08:00
benchmark_test.go Export Start, Stop and Quiet method on Server type 2020-04-19 08:51:17 -07:00
CHANGELOG.md v0.7.1 2020-04-05 14:56:06 -07:00
client_test.go Allow client to enqueue a task with unique option 2020-03-21 11:40:40 -07:00
client.go Allow client to enqueue a task with unique option 2020-03-21 11:40:40 -07:00
CONTRIBUTING.md [ci skip] Add contribution doc 2020-02-08 09:48:04 -08:00
doc.go Change Client APIs 2020-02-23 20:40:40 -08:00
go.mod Fix singnal handling for different systems 2020-04-05 14:37:23 -07:00
go.sum Update redis package to v7.2.0 2020-02-22 21:21:55 -08:00
heartbeat_test.go Rename internal ProcessState to ServerState 2020-04-19 08:51:17 -07:00
heartbeat.go Refactor server state 2020-04-19 08:51:17 -07:00
LICENSE Add MIT License 2019-11-30 10:21:25 -08:00
payload_test.go Refactor payload_test to reduce cyclomatic complexities 2020-03-14 12:30:42 -07:00
payload.go [ci skip] Update docs 2020-01-16 21:04:46 -08:00
processor_test.go Rename internal ProcessState to ServerState 2020-04-19 08:51:17 -07:00
processor.go Rename internal ProcessState to ServerState 2020-04-19 08:51:17 -07:00
README.md Update all reference to asynqmon to Asynq CLI 2020-04-19 08:51:17 -07:00
scheduler_test.go Extract out log package 2020-03-09 07:17:52 -07:00
scheduler.go Allow custom logger to be used in Background 2020-03-12 08:40:37 -07:00
servemux_test.go Add Use method to better support middlewares with ServeMux 2020-03-13 14:13:17 -07:00
servemux.go Add Use method to better support middlewares with ServeMux 2020-03-13 14:13:17 -07:00
server_test.go Export Start, Stop and Quiet method on Server type 2020-04-19 08:51:17 -07:00
server.go Rename internal ProcessState to ServerState 2020-04-19 08:51:17 -07:00
signals_unix.go Export Start, Stop and Quiet method on Server type 2020-04-19 08:51:17 -07:00
signals_windows.go Rename Background to Server 2020-04-19 08:51:17 -07:00
subscriber_test.go Extract out log package 2020-03-09 07:17:52 -07:00
subscriber.go Allow custom logger to be used in Background 2020-03-12 08:40:37 -07:00
syncer_test.go Extract out log package 2020-03-09 07:17:52 -07:00
syncer.go Allow custom logger to be used in Background 2020-03-12 08:40:37 -07:00

Asynq

Build Status License: MIT Go Report Card GoDoc Gitter chat codecov

Overview

Asynq is a Go library for queueing tasks and processing them in the background with workers. It is backed by Redis and it is designed to have a low barrier to entry. It should be integrated in your web stack easily.

Highlevel overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Workers process tasks concurrently

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

  • Guaranteed at least one execution of a task
  • Scheduling of tasks
  • Durability since tasks are written to Redis
  • Retries of failed tasks
  • Concurrency management via configuration
  • Weighted priority queues
  • Strict priority queues
  • Low latency to add a task since writes are fast in Redis
  • De-duplication of tasks using unique option
  • Allow timeout and deadline per task
  • Flexible handler interface with support for middlewares
  • CLI to inspect and remote-control queues and tasks

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of background task types.
const (
    EmailDelivery   = "email:deliver"
    ImageProcessing = "image:process"
)

// Write function NewXXXTask to create a task.

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(EmailDelivery, payload)
}

func NewImageProcessingTask(src, dst string) *asynq.Task {
    payload := map[string]interface{}{"src": src, "dst": dst}
    return asynq.NewTask(ImageProcessing, payload)
}

// Write function HandleXXXTask to handle the given task.
// NOTE: It satisfies the asynq.HandlerFunc interface.

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery logic ...
    return nil
}

func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    dst, err := t.Payload.GetString("dst")
    if err != nil {
        return err
    }
    fmt.Printf("Process image: src = %s, dst = %s\n", src, dst)
    // Image processing logic ...
    return nil
}

In your web application code, import the above package and use Client to enqueue tasks to the task queue.
A task will be processed by a background worker as soon as the task gets enqueued.
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.

package main

import (
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := &asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)

    // Example 1: Enqueue task to be processed immediately.

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }


    // Example 2: Schedule task to be processed in the future.

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    err = c.EnqueueIn(24*time.Hour, t)
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }


    // Example 3: Pass options to tune task processing behavior.
    // Options include MaxRetry, Queue, Timeout, Deadline, etc.

    t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
    err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
}

Next, create a binary to process these tasks in the background.
To start the background workers, use Background and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := &asynq.RedisClientOpt{Addr: redisAddr}

    bg := asynq.NewBackground(r, &asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask)
    // ...register other handlers...

    bg.Run(mux)
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To Learn more about asynq features and APIs, see our Wiki and godoc.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

Gif

For details on how to use the tool, refer to the tool's README.

Installation

To install asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency Version
Redis v2.8+
Go v1.13+

Contributing

We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from sidekiq and its Web UI
  • RQ : Client APIs are inspired by rq library.
  • Cobra : Asynq CLI is built with cobra

License

Asynq is released under the MIT license. See LICENSE.