mirror of https://github.com/hibiken/asynq.git synced 2025-01-18 10:53:39 +08:00
A Redis-based asynchronous queue for Go

Asynq logo

Simple, reliable & efficient distributed task queue in Go


Asynq is a Go library for queueing tasks and processing them asynchronously with workers. It's backed by Redis and is designed to be scalable yet easy to get started with.

High-level overview of how Asynq works:

  • Client puts tasks on a queue
  • Server pulls tasks off queues and starts a worker goroutine for each task
  • Tasks are processed concurrently by multiple workers
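
The flow in the list above can be pictured with nothing but the standard library. The following is a minimal sketch, not Asynq's implementation: a buffered channel stands in for the Redis-backed queue, and the names `task` and `runWorkers` are hypothetical, chosen only for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for a queued unit of work (not asynq.Task).
type task struct {
	Type    string
	Payload string
}

// runWorkers drains queue, starting one goroutine per task while keeping at
// most concurrency tasks in flight (concurrency must be at least 1).
// It returns how many tasks were handled.
func runWorkers(queue <-chan task, concurrency int, handle func(task)) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)
	sem := make(chan struct{}, concurrency) // bounds the number of live workers
	for t := range queue {
		sem <- struct{}{} // wait for a free worker slot
		wg.Add(1)
		go func(t task) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			handle(t)
			mu.Lock()
			processed++
			mu.Unlock()
		}(t)
	}
	wg.Wait()
	return processed
}

func main() {
	// "Client" side: put tasks on the queue, then signal that no more will come.
	queue := make(chan task, 3)
	queue <- task{Type: "email:deliver", Payload: "user 42"}
	queue <- task{Type: "image:resize", Payload: "image.jpg"}
	queue <- task{Type: "email:deliver", Payload: "user 7"}
	close(queue)

	// "Server" side: process tasks concurrently with up to two workers.
	n := runWorkers(queue, 2, func(t task) {
		fmt.Printf("processing %s (%s)\n", t.Type, t.Payload)
	})
	fmt.Println("processed:", n)
}
```

Because the workers run concurrently, the order of the "processing" lines can differ between runs; only the final count is stable.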

Task queues are used as a mechanism to distribute work across multiple machines. A system can consist of multiple worker servers and brokers, which allows for high availability and horizontal scaling.

Example use case

Task Queue Diagram

Features

Stability and Compatibility

Status: The library is relatively stable and is currently undergoing moderate development, with less frequent breaking API changes.

☝️ Important Note: The current major version is zero (v0.x.x) to accommodate rapid development and fast iteration while getting early feedback from users (feedback on APIs is appreciated!). The public API could change without a major version update before the v1.0.0 release.

Redis Cluster Compatibility

Some of the Lua scripts in this library may not be compatible with Redis Cluster.

Sponsoring

If you are using this package in production, please consider sponsoring the project to show your support!

Quickstart

Make sure you have Go installed (download). The last two Go versions are supported (see https://go.dev/dl).

Initialize your project by creating a folder and then running go mod init github.com/your/repo (learn more) inside the folder. Then install the Asynq library with the go get command:

go get -u github.com/hibiken/asynq

Make sure you're running a Redis server locally or from a Docker container. Version 4.0 or higher is required.

Next, write a package that encapsulates task creation and task handling.

package tasks

import (
    "context"
    "encoding/json"
    "fmt"
    "log"
    "time"

    "github.com/hibiken/asynq"
)

// A list of task types.
const (
    TypeEmailDelivery   = "email:deliver"
    TypeImageResize     = "image:resize"
)

type EmailDeliveryPayload struct {
    UserID     int
    TemplateID string
}

type ImageResizePayload struct {
    SourceURL string
}

//----------------------------------------------
// Write a function NewXXXTask to create a task.
// A task consists of a type and a payload.
//----------------------------------------------

func NewEmailDeliveryTask(userID int, tmplID string) (*asynq.Task, error) {
    payload, err := json.Marshal(EmailDeliveryPayload{UserID: userID, TemplateID: tmplID})
    if err != nil {
        return nil, err
    }
    return asynq.NewTask(TypeEmailDelivery, payload), nil
}

func NewImageResizeTask(src string) (*asynq.Task, error) {
    payload, err := json.Marshal(ImageResizePayload{SourceURL: src})
    if err != nil {
        return nil, err
    }
    // Task options can be passed to NewTask; they can be overridden at enqueue time.
    return asynq.NewTask(TypeImageResize, payload, asynq.MaxRetry(5), asynq.Timeout(20*time.Minute)), nil
}

//---------------------------------------------------------------
// Write a function HandleXXXTask to handle the input task.
// Note that its signature matches the asynq.HandlerFunc type.
//
// A handler doesn't need to be a function. You can define a type
// that satisfies the asynq.Handler interface. See examples below.
//---------------------------------------------------------------

func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
    var p EmailDeliveryPayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Sending Email to User: user_id=%d, template_id=%s", p.UserID, p.TemplateID)
    // Email delivery code ...
    return nil
}

// ImageProcessor implements the asynq.Handler interface.
type ImageProcessor struct {
    // ... fields for struct
}

func (processor *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
    var p ImageResizePayload
    if err := json.Unmarshal(t.Payload(), &p); err != nil {
        return fmt.Errorf("json.Unmarshal failed: %v: %w", err, asynq.SkipRetry)
    }
    log.Printf("Resizing image: src=%s", p.SourceURL)
    // Image resizing code ...
    return nil
}

func NewImageProcessor() *ImageProcessor {
    return &ImageProcessor{}
}
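
The handlers above decode a JSON payload and, on failure, wrap asynq.SkipRetry with %w. The sketch below reproduces that pattern using only the standard library so it can be run on its own. It makes two assumptions: `errSkipRetry` is a local stand-in for asynq.SkipRetry, and `decodeEmailPayload` and `isSkipRetry` are hypothetical helpers that are not part of the library.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errSkipRetry is a local stand-in for asynq.SkipRetry so that this sketch
// runs without the library.
var errSkipRetry = errors.New("skip retry for the task")

type EmailDeliveryPayload struct {
	UserID     int
	TemplateID string
}

// decodeEmailPayload parses a task payload. On malformed input it formats the
// JSON error with %v and wraps the sentinel with %w, as the handlers above do.
func decodeEmailPayload(data []byte) (EmailDeliveryPayload, error) {
	var p EmailDeliveryPayload
	if err := json.Unmarshal(data, &p); err != nil {
		return p, fmt.Errorf("json.Unmarshal failed: %v: %w", err, errSkipRetry)
	}
	return p, nil
}

// isSkipRetry reports whether the sentinel is anywhere in err's chain.
func isSkipRetry(err error) bool {
	return errors.Is(err, errSkipRetry)
}

func main() {
	// Encode a payload the same way NewEmailDeliveryTask does.
	data, err := json.Marshal(EmailDeliveryPayload{UserID: 42, TemplateID: "some:template:id"})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))

	// A well-formed payload round-trips without error.
	p, err := decodeEmailPayload(data)
	fmt.Println(p.UserID, p.TemplateID, err)

	// A malformed payload yields an error that carries the sentinel.
	_, err = decodeEmailPayload([]byte("not json"))
	fmt.Println(isSkipRetry(err))
}
```

The final check reports true because %w keeps the sentinel in the error chain, while %v only contributes the text of the JSON error to the message.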

In your application code, import the above package and use Client to put tasks on queues.

package main

import (
    "log"
    "time"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    client := asynq.NewClient(asynq.RedisClientOpt{Addr: redisAddr})
    defer client.Close()

    // ------------------------------------------------------
    // Example 1: Enqueue task to be processed immediately.
    //            Use (*Client).Enqueue method.
    // ------------------------------------------------------

    task, err := tasks.NewEmailDeliveryTask(42, "some:template:id")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err := client.Enqueue(task)
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ------------------------------------------------------------
    // Example 2: Schedule task to be processed in the future.
    //            Use ProcessIn or ProcessAt option.
    // ------------------------------------------------------------

    info, err = client.Enqueue(task, asynq.ProcessIn(24*time.Hour))
    if err != nil {
        log.Fatalf("could not schedule task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)


    // ----------------------------------------------------------------------------
    // Example 3: Set other options to tune task processing behavior.
    //            Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
    // ----------------------------------------------------------------------------

    task, err = tasks.NewImageResizeTask("https://example.com/myassets/image.jpg")
    if err != nil {
        log.Fatalf("could not create task: %v", err)
    }
    info, err = client.Enqueue(task, asynq.MaxRetry(10), asynq.Timeout(3*time.Minute))
    if err != nil {
        log.Fatalf("could not enqueue task: %v", err)
    }
    log.Printf("enqueued task: id=%s queue=%s", info.ID, info.Queue)
}
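
Example 3 passes MaxRetry and Timeout at enqueue time for a task that was already created with its own MaxRetry and Timeout; as the comment in NewImageResizeTask notes, the enqueue-time values override the task-level ones. One way to picture that precedence is a functional-options sketch. Everything here (`settings`, `option`, `maxRetry`, `timeout`, `resolve`) is hypothetical and only illustrates the ordering, not how the library stores options.

```go
package main

import (
	"fmt"
	"time"
)

// settings and option are hypothetical stand-ins for the library's task options.
type settings struct {
	MaxRetry int
	Timeout  time.Duration
}

type option func(*settings)

func maxRetry(n int) option          { return func(s *settings) { s.MaxRetry = n } }
func timeout(d time.Duration) option { return func(s *settings) { s.Timeout = d } }

// resolve applies the task-level options first and the enqueue-time options
// second, so a value given at enqueue time wins.
func resolve(taskOpts, enqueueOpts []option) settings {
	var s settings
	for _, o := range taskOpts {
		o(&s)
	}
	for _, o := range enqueueOpts {
		o(&s)
	}
	return s
}

func main() {
	// Options attached when the task is created.
	taskOpts := []option{maxRetry(5), timeout(20 * time.Minute)}
	// Options supplied when the task is enqueued.
	enqueueOpts := []option{maxRetry(10), timeout(3 * time.Minute)}

	s := resolve(taskOpts, enqueueOpts)
	fmt.Printf("retries=%d timeout=%s\n", s.MaxRetry, s.Timeout)
}
```

With the values from the example, the resolved settings are 10 retries and a 3-minute timeout; dropping the enqueue-time options would leave the task-level 5 retries and 20 minutes in effect.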

Next, start a worker server to process these tasks in the background. To start the background workers, use Server and provide your Handler to process the tasks.

You can optionally use ServeMux to create a handler, just as you would with a net/http Handler.
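
To make the net/http analogy concrete, here is a standard-library-only sketch of a multiplexer keyed by task type. It is a simplification under stated assumptions: `miniMux` and `handlerFunc` are hypothetical names, the handler signature is reduced to a payload, and lookup is by exact match only, which may be stricter than the library's own matching rules.

```go
package main

import "fmt"

// handlerFunc is a hypothetical, simplified handler signature.
type handlerFunc func(payload []byte) error

// miniMux maps a task type to its handler (exact match only).
type miniMux struct {
	handlers map[string]handlerFunc
}

func newMiniMux() *miniMux {
	return &miniMux{handlers: make(map[string]handlerFunc)}
}

// HandleFunc registers h for the given task type, replacing any earlier entry.
func (m *miniMux) HandleFunc(taskType string, h handlerFunc) {
	m.handlers[taskType] = h
}

// Process dispatches to the registered handler or reports an unknown type.
func (m *miniMux) Process(taskType string, payload []byte) error {
	h, ok := m.handlers[taskType]
	if !ok {
		return fmt.Errorf("no handler registered for task type %q", taskType)
	}
	return h(payload)
}

func main() {
	mux := newMiniMux()
	mux.HandleFunc("email:deliver", func(payload []byte) error {
		fmt.Printf("delivering email, payload=%s\n", payload)
		return nil
	})

	// A registered type is routed to its handler.
	fmt.Println(mux.Process("email:deliver", []byte(`{"UserID":42}`)))
	// An unregistered type produces an error instead of being dropped silently.
	fmt.Println(mux.Process("image:resize", nil))
}
```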

package main

import (
    "log"

    "github.com/hibiken/asynq"
    "your/app/package/tasks"
)

const redisAddr = "127.0.0.1:6379"

func main() {
    srv := asynq.NewServer(
        asynq.RedisClientOpt{Addr: redisAddr},
        asynq.Config{
            // Specify how many concurrent workers to use
            Concurrency: 10,
            // Optionally specify multiple queues with different priority.
            Queues: map[string]int{
                "critical": 6,
                "default":  3,
                "low":      1,
            },
            // See the godoc for other configuration options
        },
    )

    // mux maps a type to a handler
    mux := asynq.NewServeMux()
    mux.HandleFunc(tasks.TypeEmailDelivery, tasks.HandleEmailDeliveryTask)
    mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
    // ...register other handlers...

    if err := srv.Run(mux); err != nil {
        log.Fatalf("could not run server: %v", err)
    }
}
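
The Queues map in the server configuration assigns each queue a priority value. Assuming these values act as relative weights when every queue has work waiting (an assumption made here for illustration; check the package godoc for the exact semantics), the share each queue receives can be estimated with a few lines of arithmetic. `sharePercent` is a hypothetical helper, not part of the library.

```go
package main

import (
	"fmt"
	"sort"
)

// sharePercent returns each queue's weight as an integer percentage of the
// total weight, rounded down. An empty or all-zero map yields an empty result.
func sharePercent(weights map[string]int) map[string]int {
	total := 0
	for _, w := range weights {
		total += w
	}
	shares := make(map[string]int, len(weights))
	if total == 0 {
		return shares
	}
	for name, w := range weights {
		shares[name] = w * 100 / total
	}
	return shares
}

func main() {
	shares := sharePercent(map[string]int{
		"critical": 6,
		"default":  3,
		"low":      1,
	})

	// Map iteration order is not fixed, so sort the names for stable output.
	names := make([]string, 0, len(shares))
	for name := range shares {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%-8s %d%%\n", name, shares[name])
	}
}
```

For the weights 6, 3, and 1 used above, this works out to 60%, 30%, and 10% respectively.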

For a more detailed walk-through of the library, see our Getting Started guide.

To learn more about asynq features and APIs, see the package godoc.

Web UI

Asynqmon is a web-based tool for monitoring and administering Asynq queues and tasks.

Here are a few screenshots of the Web UI:

Queues view

Web UI Queues View

Tasks view

Web UI TasksView

Metrics view

Settings and adaptive dark mode

Web UI Settings and adaptive dark mode

For details on how to use the tool, refer to the tool's README.

Command Line Tool

Asynq ships with a command line tool to inspect the state of queues and tasks.

To install the CLI tool, run the following command:

go install github.com/hibiken/asynq/tools/asynq@latest

Here's an example of running the asynq dash command:

Gif

For details on how to use the tool, refer to the tool's README.

Contributing

We are open to, and grateful for, any contributions (GitHub issues/PRs, feedback on the Gitter channel, etc.) made by the community.

Please see the Contribution Guide before contributing.

License

Copyright (c) 2019-present Ken Hibino and Contributors. Asynq is free and open-source software licensed under the MIT License. The official logo was created by Vic Shóstak and is distributed under a Creative Commons license (CC0 1.0 Universal).