2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00
golang基于redis的异步队列
Go to file
2020-09-12 12:59:03 -07:00
.github/ISSUE_TEMPLATE Update issue templates 2019-12-27 10:45:45 -08:00
.travis Limit the number of tasks moved by CheckAndEnqueue to prevent a long 2020-06-08 06:06:18 -07:00
docs/assets Update demo.gif for CLI demo 2020-09-12 12:59:03 -07:00
internal Minor test updates 2020-09-12 12:59:03 -07:00
tools Use color package to bold fonts in CLI output 2020-09-12 12:59:03 -07:00
.gitignore Update all reference to asynqmon to Asynq CLI 2020-04-19 08:51:17 -07:00
.travis.yml Run CI builds using go v1.15.x 2020-09-02 06:34:58 -07:00
asynq_test.go Add test flags to run tests using redis cluster 2020-09-12 12:59:03 -07:00
asynq.go Add MaxRedirects field in RedisClusterClientOpt 2020-09-12 12:59:03 -07:00
benchmark_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
CHANGELOG.md Update changelog 2020-09-12 12:59:03 -07:00
client_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
client.go Add Close to Inspector 2020-09-12 12:59:03 -07:00
context_test.go Add GetQueueName helper to extract queue name from context 2020-09-12 12:59:03 -07:00
context.go Add GetQueueName helper to extract queue name from context 2020-09-12 12:59:03 -07:00
CONTRIBUTING.md Mention about testing using redis cluster in CONTRIBUTING.md 2020-09-12 12:59:03 -07:00
doc.go Update package documentation 2020-09-12 12:59:03 -07:00
example_test.go Add ParseRedisURI helper function 2020-04-25 13:06:20 -07:00
go.mod Upgrade redis client lib to v7.4.0 2020-09-12 12:59:03 -07:00
go.sum Upgrade redis client lib to v7.4.0 2020-09-12 12:59:03 -07:00
healthcheck_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
healthcheck.go Add healthchecker to check broker connection 2020-07-28 22:45:57 -07:00
heartbeat_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
heartbeat.go Replace github.com/rs/xid with github.com/google/uuid 2020-07-06 05:48:31 -07:00
inspector_test.go Minor test updates 2020-09-12 12:59:03 -07:00
inspector.go Add Close to Inspector 2020-09-12 12:59:03 -07:00
LICENSE Add MIT License 2019-11-30 10:21:25 -08:00
payload_test.go Fix JSON number ovewflow issue 2020-06-12 06:29:36 -07:00
payload.go Fix JSON number ovewflow issue 2020-06-12 06:29:36 -07:00
processor_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
processor.go Rename InProgress to Active 2020-09-12 12:59:03 -07:00
README.md Update README 2020-09-12 12:59:03 -07:00
recoverer_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
recoverer.go Update recoverer 2020-09-12 12:59:03 -07:00
scheduler_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
scheduler.go Update scheduler to check and enqueue for only the specified queues. 2020-09-12 12:59:03 -07:00
servemux_test.go Add Use method to better support middlewares with ServeMux 2020-03-13 14:13:17 -07:00
servemux.go remove typo and redundant code 2020-05-22 05:11:54 -07:00
server_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
server.go Update recoverer 2020-09-12 12:59:03 -07:00
signals_unix.go Export Start, Stop and Quiet method on Server type 2020-04-19 08:51:17 -07:00
signals_windows.go Rename Background to Server 2020-04-19 08:51:17 -07:00
subscriber_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
subscriber.go Rename InProgress to Active 2020-09-12 12:59:03 -07:00
syncer_test.go Close redis client after each test run 2020-09-12 12:59:03 -07:00
syncer.go Add deadline to syncRequest 2020-07-06 05:48:31 -07:00

Asynq

Build Status License: MIT Go Report Card GoDoc Gitter chat codecov

Overview

Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started.

Highlevel overview of how Asynq works:

  • Client puts task on a queue
  • Server pulls task off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers

Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.

Task Queue Diagram

Stability and Compatibility

Important Note: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.

Status: The library is currently undergoing heavy development with frequent, breaking API changes.

Features

Quickstart

First, make sure you are running a Redis server locally.

$ redis-server

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "fmt"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
    payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
    return asynq.NewTask(TypeEmailDelivery, payload)
}

func NewImageResizeTask(src string) *asynq.Task {
    payload := map[string]interface{}{"src": src}
    return asynq.NewTask(TypeImageResize, payload)
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that it satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    userID, err := t.Payload.GetInt("user_id")
    if err != nil {
        return err
    }
    tmplID, err := t.Payload.GetString("template_id")
    if err != nil {
        return err
    }
    fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements asynq.Handler interface.
type ImageProcesser struct {
    // ... fields for struct
}

func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    src, err := t.Payload.GetString("src")
    if err != nil {
        return err
    }
    fmt.Printf("Resize image: src = %s\n", src)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    // ... return an instance
}

In your application code, import the above package and use Client to put tasks on the queue.

package main

import (
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}
    c := asynq.NewClient(r)
    defer c.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    t := tasks.NewEmailDeliveryTask(42, "some:template:id")
    res, err := c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    t = tasks.NewEmailDeliveryTask(42, "other:template:id")
    res, err = c.Enqueue(t, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatal("could not schedule task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t)
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)

    // ---------------------------------------------------------------------------
    // Example 4: Pass options to tune task processing behavior at enqueue time.
    //            Options passed at enqueue time override default ones, if any.
    // ---------------------------------------------------------------------------

    t = tasks.NewImageResizeTask("some/blobstore/path")
    res, err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
    if err != nil {
        log.Fatal("could not enqueue task: %v", err)
    }
    fmt.Printf("Enqueued Result: %+v\n", res)
}

Next, start a worker server to process these tasks in the background.
To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with "net/http" Handler.

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    r := asynq.RedisClientOpt{Addr: redisAddr}

    srv := asynq.NewServer(r, asynq.Config{
        // Specify how many concurrent workers to use
        Concurrency: 10,
        // Optionally specify multiple queues with different priority.
        Queues: map[string]int{
            "critical": 6,
            "default":  3,
            "low":      1,
        },
        // See the godoc for other configuration options
    })

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}

For a more detailed walk-through of the library, see our Getting Started Guide.

To Learn more about asynq features and APIs, see our Wiki and godoc.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

Here's an example of running the stats command.

Gif

For details on how to use the tool, refer to the tool's README.

Installation

To install asynq library, run the following command:

go get -u github.com/hibiken/asynq

To install the CLI tool, run the following command:

go get -u github.com/hibiken/asynq/tools/asynq

Requirements

Dependency Version
Redis v3.0+
Go v1.13+

Contributing

We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community. Please see the Contribution Guide before contributing.

Acknowledgements

  • Sidekiq : Many of the design ideas are taken from sidekiq and its Web UI
  • RQ : Client APIs are inspired by rq library.
  • Cobra : Asynq CLI is built with cobra

License

Asynq is released under the MIT license. See LICENSE.