2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-20 21:26:14 +08:00

Compare commits

..

11 Commits

Author SHA1 Message Date
Ken Hibino
9884d5f2fa v0.8.2 2020-05-03 16:55:34 -07:00
Ken Hibino
826f1ecff4 Update docs 2020-05-03 16:54:39 -07:00
Ken Hibino
24f2b64c6c Make sure to invoke CancelFunc in all cases 2020-05-03 15:58:23 -07:00
Ken Hibino
1c1474c55c Add tests to simulate cases where server cannot talk to redis 2020-05-02 07:05:26 -07:00
Ken Hibino
5161b9368a Clean up tests 2020-05-02 07:05:26 -07:00
Ken Hibino
0c998a8e17 Add test for signal handling 2020-04-28 06:56:05 -07:00
Ken Hibino
49160f2536 v0.8.1 2020-04-27 06:49:12 -07:00
Ken Hibino
e33d297d8e Add SetDefaultOptions method to Client 2020-04-27 06:45:13 -07:00
Ken Hibino
eb8ced6bdd Add ParseRedisURI helper function 2020-04-25 13:06:20 -07:00
Ken Hibino
789a9fd711 Update readme 2020-04-20 07:52:26 -07:00
Ken Hibino
5924cdac33 Add example tests 2020-04-19 11:36:43 -07:00
11 changed files with 623 additions and 82 deletions

View File

@@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
## [0.8.2] - 2020-05-03
### Fixed
- [Fixed cancelfunc leak](https://github.com/hibiken/asynq/pull/145)
## [0.8.1] - 2020-04-27
### Added
- `ParseRedisURI` helper function is added to create a `RedisConnOpt` from a URI string.
- `SetDefaultOptions` method is added to `Client`.
## [0.8.0] - 2020-04-19 ## [0.8.0] - 2020-04-19
### Changed ### Changed

View File

@@ -15,7 +15,7 @@ Highlevel overview of how Asynq works:
- Client puts task on a queue - Client puts task on a queue
- Server pulls task off queues and starts a worker goroutine for each task - Server pulls task off queues and starts a worker goroutine for each task
- Workers process tasks concurrently - Tasks are processed concurrently by multiple workers
Task queues are used as a mechanism to distribute work across multiple machines. Task queues are used as a mechanism to distribute work across multiple machines.
A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling. A system can consist of multiple worker servers and brokers, giving way to high availability and horizontal scaling.
@@ -24,24 +24,24 @@ A system can consist of multiple worker servers and brokers, giving way to high
## Stability and Compatibility ## Stability and Compatibility
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release. **Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users (Feedback on APIs are appreciated!). The public API could change without a major version update before v1.0.0 release.
**Status**: The library is currently undergoing heavy development with frequent, breaking API changes. **Status**: The library is currently undergoing heavy development with frequent, breaking API changes.
## Features ## Features
- Guaranteed at least one execution of a task - Guaranteed [at least one execution](https://www.cloudcomputingpatterns.org/at_least_once_delivery/) of a task
- Scheduling of tasks - Scheduling of tasks
- Durability since tasks are written to Redis - Durability since tasks are written to Redis
- Retries of failed tasks - [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
- Concurrency management via configuration - [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
- Weighted priority queues - [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
- Strict priority queues
- Low latency to add a task since writes are fast in Redis - Low latency to add a task since writes are fast in Redis
- De-duplication of tasks using unique option - De-duplication of tasks using [unique option](https://github.com/hibiken/asynq/wiki/Unique-Tasks)
- Allow timeout and deadline per task - Allow [timeout and deadline per task](https://github.com/hibiken/asynq/wiki/Task-Timeout-and-Cancelation)
- Flexible handler interface with support for middlewares - [Flexible handler interface with support for middlewares](https://github.com/hibiken/asynq/wiki/Handler-Deep-Dive)
- CLI to inspect and remote-control queues and tasks - [Support Redis Sentinels](https://github.com/hibiken/asynq/wiki/Automatic-Failover) for HA
- [CLI](#command-line-tool) to inspect and remote-control queues and tasks
## Quickstart ## Quickstart
@@ -62,13 +62,15 @@ import (
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
) )
// A list of background task types. // A list of task types.
const ( const (
EmailDelivery = "email:deliver" EmailDelivery = "email:deliver"
ImageProcessing = "image:process" ImageProcessing = "image:process"
) )
//--------------------------------------------
// Write function NewXXXTask to create a task. // Write function NewXXXTask to create a task.
//--------------------------------------------
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task { func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID} payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
@@ -80,8 +82,13 @@ func NewImageProcessingTask(src, dst string) *asynq.Task {
return asynq.NewTask(ImageProcessing, payload) return asynq.NewTask(ImageProcessing, payload)
} }
//-------------------------------------------------------------
// Write function HandleXXXTask to handle the given task. // Write function HandleXXXTask to handle the given task.
// NOTE: It satisfies the asynq.HandlerFunc interface. // NOTE: It satisfies the asynq.HandlerFunc interface.
//
// Handler doesn't need to be a function. You can define a type
// that satisfies asynq.Handler interface. See example below.
//-------------------------------------------------------------
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error { func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
userID, err := t.Payload.GetInt("user_id") userID, err := t.Payload.GetInt("user_id")
@@ -97,7 +104,12 @@ func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
return nil return nil
} }
func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error { type ImageProcesser struct {
// ... fields for struct
}
// ImageProcessor implements asynq.Handler.
func (p *ImageProcessor) ProcessTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src") src, err := t.Payload.GetString("src")
if err != nil { if err != nil {
return err return err
@@ -110,6 +122,10 @@ func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
// Image processing logic ... // Image processing logic ...
return nil return nil
} }
func NewImageProcessor() *ImageProcessor {
// ... return an instance
}
``` ```
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue. In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to put tasks on the queue.
@@ -132,7 +148,10 @@ func main() {
r := asynq.RedisClientOpt{Addr: redisAddr} r := asynq.RedisClientOpt{Addr: redisAddr}
c := asynq.NewClient(r) c := asynq.NewClient(r)
// ----------------------------------------------------
// Example 1: Enqueue task to be processed immediately. // Example 1: Enqueue task to be processed immediately.
// Use (*Client).Enqueue method.
// ----------------------------------------------------
t := tasks.NewEmailDeliveryTask(42, "some:template:id") t := tasks.NewEmailDeliveryTask(42, "some:template:id")
err := c.Enqueue(t) err := c.Enqueue(t)
@@ -141,7 +160,10 @@ func main() {
} }
// ----------------------------------------------------------
// Example 2: Schedule task to be processed in the future. // Example 2: Schedule task to be processed in the future.
// Use (*Client).EnqueueIn or (*Client).EnqueueAt.
// ----------------------------------------------------------
t = tasks.NewEmailDeliveryTask(42, "other:template:id") t = tasks.NewEmailDeliveryTask(42, "other:template:id")
err = c.EnqueueIn(24*time.Hour, t) err = c.EnqueueIn(24*time.Hour, t)
@@ -150,18 +172,33 @@ func main() {
} }
// Example 3: Pass options to tune task processing behavior. // --------------------------------------------------------------------------
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc. // Example 3: Set options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, Unique etc.
// --------------------------------------------------------------------------
c.SetDefaultOptions(tasks.ImageProcessing, asynq.MaxRetry(10), asynq.Timeout(time.Minute))
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url") t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute)) err = c.Enqueue(t)
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}
// --------------------------------------------------------------------------
// Example 4: Pass options to tune task processing behavior at enqueue time.
// Options passed at enqueue time override default ones, if any.
// --------------------------------------------------------------------------
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
err = c.Enqueue(t, asynq.Queue("critical"), asynq.Timeout(30*time.Second))
if err != nil { if err != nil {
log.Fatal("could not enqueue task: %v", err) log.Fatal("could not enqueue task: %v", err)
} }
} }
``` ```
Next, create a work server binary to process these tasks in the background. Next, create a worker server to process these tasks in the background.
To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks. To start the background workers, use [`Server`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Server) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler. You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
@@ -170,6 +207,8 @@ You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?
package main package main
import ( import (
"log"
"github.com/hibiken/asynq" "github.com/hibiken/asynq"
"your/app/package/tasks" "your/app/package/tasks"
) )
@@ -194,7 +233,7 @@ func main() {
// mux maps a type to a handler // mux maps a type to a handler
mux := asynq.NewServeMux() mux := asynq.NewServeMux()
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask) mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask) mux.Handle(tasks.ImageProcessing, tasks.NewImageProcessor())
// ...register other handlers... // ...register other handlers...
if err := srv.Run(mux); err != nil { if err := srv.Run(mux); err != nil {

View File

@@ -7,6 +7,9 @@ package asynq
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net/url"
"strconv"
"strings"
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
) )
@@ -94,6 +97,79 @@ type RedisFailoverClientOpt struct {
TLSConfig *tls.Config TLSConfig *tls.Config
} }
// ParseRedisURI parses redis uri string and returns RedisConnOpt if uri is valid.
// It returns a non-nil error if uri cannot be parsed.
//
// Three URI schemes are supported, which are redis:, redis-socket:, and redis-sentinel:.
// Supported formats are:
// redis://[:password@]host[:port][/dbnumber]
// redis-socket://[:password@]path[?db=dbnumber]
// redis-sentinel://[:password@]host1[:port][,host2:[:port]][,hostN:[:port]][?master=masterName]
func ParseRedisURI(uri string) (RedisConnOpt, error) {
u, err := url.Parse(uri)
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: %v", err)
}
switch u.Scheme {
case "redis":
return parseRedisURI(u)
case "redis-socket":
return parseRedisSocketURI(u)
case "redis-sentinel":
return parseRedisSentinelURI(u)
default:
return nil, fmt.Errorf("asynq: unsupported uri scheme: %q", u.Scheme)
}
}
func parseRedisURI(u *url.URL) (RedisConnOpt, error) {
var db int
var err error
if len(u.Path) > 0 {
xs := strings.Split(strings.Trim(u.Path, "/"), "/")
db, err = strconv.Atoi(xs[0])
if err != nil {
return nil, fmt.Errorf("asynq: could not parse redis uri: database number should be the first segment of the path")
}
}
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisClientOpt{Addr: u.Host, DB: db, Password: password}, nil
}
func parseRedisSocketURI(u *url.URL) (RedisConnOpt, error) {
const errPrefix = "asynq: could not parse redis socket uri"
if len(u.Path) == 0 {
return nil, fmt.Errorf("%s: path does not exist", errPrefix)
}
q := u.Query()
var db int
var err error
if n := q.Get("db"); n != "" {
db, err = strconv.Atoi(n)
if err != nil {
return nil, fmt.Errorf("%s: query param `db` should be a number", errPrefix)
}
}
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisClientOpt{Network: "unix", Addr: u.Path, DB: db, Password: password}, nil
}
func parseRedisSentinelURI(u *url.URL) (RedisConnOpt, error) {
addrs := strings.Split(u.Host, ",")
master := u.Query().Get("master")
var password string
if v, ok := u.User.Password(); ok {
password = v
}
return RedisFailoverClientOpt{MasterName: master, SentinelAddrs: addrs, Password: password}, nil
}
// createRedisClient returns a redis client given a redis connection configuration. // createRedisClient returns a redis client given a redis connection configuration.
// //
// Passing an unexpected type as a RedisConnOpt argument will cause panic. // Passing an unexpected type as a RedisConnOpt argument will cause panic.

View File

@@ -44,3 +44,106 @@ var sortTaskOpt = cmp.Transformer("SortMsg", func(in []*Task) []*Task {
}) })
return out return out
}) })
func TestParseRedisURI(t *testing.T) {
tests := []struct {
uri string
want RedisConnOpt
}{
{
"redis://localhost:6379",
RedisClientOpt{Addr: "localhost:6379"},
},
{
"redis://localhost:6379/3",
RedisClientOpt{Addr: "localhost:6379", DB: 3},
},
{
"redis://:mypassword@localhost:6379",
RedisClientOpt{Addr: "localhost:6379", Password: "mypassword"},
},
{
"redis://:mypassword@127.0.0.1:6379/11",
RedisClientOpt{Addr: "127.0.0.1:6379", Password: "mypassword", DB: 11},
},
{
"redis-socket:///var/run/redis/redis.sock",
RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock"},
},
{
"redis-socket://:mypassword@/var/run/redis/redis.sock",
RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", Password: "mypassword"},
},
{
"redis-socket:///var/run/redis/redis.sock?db=7",
RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", DB: 7},
},
{
"redis-socket://:mypassword@/var/run/redis/redis.sock?db=12",
RedisClientOpt{Network: "unix", Addr: "/var/run/redis/redis.sock", Password: "mypassword", DB: 12},
},
{
"redis-sentinel://localhost:5000,localhost:5001,localhost:5002?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
},
},
{
"redis-sentinel://:mypassword@localhost:5000,localhost:5001,localhost:5002?master=mymaster",
RedisFailoverClientOpt{
MasterName: "mymaster",
SentinelAddrs: []string{"localhost:5000", "localhost:5001", "localhost:5002"},
Password: "mypassword",
},
},
}
for _, tc := range tests {
got, err := ParseRedisURI(tc.uri)
if err != nil {
t.Errorf("ParseRedisURI(%q) returned an error: %v", tc.uri, err)
continue
}
if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("ParseRedisURI(%q) = %+v, want %+v\n(-want,+got)\n%s", tc.uri, got, tc.want, diff)
}
}
}
func TestParseRedisURIErrors(t *testing.T) {
tests := []struct {
desc string
uri string
}{
{
"unsupported scheme",
"rdb://localhost:6379",
},
{
"missing scheme",
"localhost:6379",
},
{
"multiple db numbers",
"redis://localhost:6379/1,2,3",
},
{
"missing path for socket connection",
"redis-socket://?db=one",
},
{
"non integer for db numbers for socket",
"redis-socket:///some/path/to/redis?db=one",
},
}
for _, tc := range tests {
_, err := ParseRedisURI(tc.uri)
if err == nil {
t.Errorf("%s: ParseRedisURI(%q) succeeded for malformed input, want error",
tc.desc, tc.uri)
}
}
}

View File

@@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
@@ -23,13 +24,18 @@ import (
// //
// Clients are safe for concurrent use by multiple goroutines. // Clients are safe for concurrent use by multiple goroutines.
type Client struct { type Client struct {
rdb *rdb.RDB mu sync.Mutex
opts map[string][]Option
rdb *rdb.RDB
} }
// NewClient and returns a new Client given a redis connection option. // NewClient and returns a new Client given a redis connection option.
func NewClient(r RedisConnOpt) *Client { func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(createRedisClient(r))
return &Client{rdb} return &Client{
opts: make(map[string][]Option),
rdb: rdb,
}
} }
// Option specifies the task processing behavior. // Option specifies the task processing behavior.
@@ -159,10 +165,19 @@ func serializePayload(payload map[string]interface{}) string {
return b.String() return b.String()
} }
const ( // Default max retry count used if nothing is specified.
// Max retry count by default const defaultMaxRetry = 25
defaultMaxRetry = 25
) // SetDefaultOptions sets options to be used for a given task type.
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
//
// Default options can be overridden by options passed at enqueue time.
func (c *Client) SetDefaultOptions(taskType string, opts ...Option) {
c.mu.Lock()
defer c.mu.Unlock()
c.opts[taskType] = opts
}
// EnqueueAt schedules task to be enqueued at the specified time. // EnqueueAt schedules task to be enqueued at the specified time.
// //
@@ -171,6 +186,35 @@ const (
// The argument opts specifies the behavior of task processing. // The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others. // If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error { func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
return c.enqueueAt(t, task, opts...)
}
// Enqueue enqueues task to be processed immediately.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.enqueueAt(time.Now(), task, opts...)
}
// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.enqueueAt(time.Now().Add(d), task, opts...)
}
func (c *Client) enqueueAt(t time.Time, task *Task, opts ...Option) error {
c.mu.Lock()
defer c.mu.Unlock()
if defaults, ok := c.opts[task.Type]; ok {
opts = append(defaults, opts...)
}
opt := composeOptions(opts...) opt := composeOptions(opts...)
msg := &base.TaskMessage{ msg := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
@@ -194,26 +238,6 @@ func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
return err return err
} }
// Enqueue enqueues task to be processed immediately.
//
// Enqueue returns nil if the task is enqueued successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) Enqueue(task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now(), task, opts...)
}
// EnqueueIn schedules task to be enqueued after the specified delay.
//
// EnqueueIn returns nil if the task is scheduled successfully, otherwise returns a non-nil error.
//
// The argument opts specifies the behavior of task processing.
// If there are conflicting Option values the last one overrides others.
func (c *Client) EnqueueIn(d time.Duration, task *Task, opts ...Option) error {
return c.EnqueueAt(time.Now().Add(d), task, opts...)
}
func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error { func (c *Client) enqueue(msg *base.TaskMessage, uniqueTTL time.Duration) error {
if uniqueTTL > 0 { if uniqueTTL > 0 {
return c.rdb.EnqueueUnique(msg, uniqueTTL) return c.rdb.EnqueueUnique(msg, uniqueTTL)

View File

@@ -15,6 +15,11 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
func TestClientEnqueueAt(t *testing.T) { func TestClientEnqueueAt(t *testing.T) {
r := setup(t) r := setup(t)
client := NewClient(RedisClientOpt{ client := NewClient(RedisClientOpt{
@@ -27,9 +32,6 @@ func TestClientEnqueueAt(t *testing.T) {
var ( var (
now = time.Now() now = time.Now()
oneHourLater = now.Add(time.Hour) oneHourLater = now.Add(time.Hour)
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
) )
tests := []struct { tests := []struct {
@@ -113,11 +115,6 @@ func TestClientEnqueue(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct { tests := []struct {
desc string desc string
task *Task task *Task
@@ -287,11 +284,6 @@ func TestClientEnqueueIn(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"}) task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct { tests := []struct {
desc string desc string
task *Task task *Task
@@ -364,6 +356,86 @@ func TestClientEnqueueIn(t *testing.T) {
} }
} }
func TestClientDefaultOptions(t *testing.T) {
r := setup(t)
tests := []struct {
desc string
defaultOpts []Option // options set at the client level.
opts []Option // options used at enqueue time.
task *Task
queue string // queue that the message should go into.
want *base.TaskMessage
}{
{
desc: "With queue routing option",
defaultOpts: []Option{Queue("feed")},
opts: []Option{},
task: NewTask("feed:import", nil),
queue: "feed",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: defaultMaxRetry,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
{
desc: "With multiple options",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{},
task: NewTask("feed:import", nil),
queue: "feed",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: 5,
Queue: "feed",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
{
desc: "With overriding options at enqueue time",
defaultOpts: []Option{Queue("feed"), MaxRetry(5)},
opts: []Option{Queue("critical")},
task: NewTask("feed:import", nil),
queue: "critical",
want: &base.TaskMessage{
Type: "feed:import",
Payload: nil,
Retry: 5,
Queue: "critical",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
}
for _, tc := range tests {
h.FlushDB(t, r)
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
c.SetDefaultOptions(tc.task.Type, tc.defaultOpts...)
err := c.Enqueue(tc.task, tc.opts...)
if err != nil {
t.Fatal(err)
}
enqueued := h.GetEnqueuedMessages(t, r, tc.queue)
if len(enqueued) != 1 {
t.Errorf("%s;\nexpected queue %q to have one message; got %d messages in the queue.",
tc.desc, tc.queue, len(enqueued))
continue
}
got := enqueued[0]
if diff := cmp.Diff(tc.want, got, h.IgnoreIDOpt); diff != "" {
t.Errorf("%s;\nmismatch found in enqueued task message; (-want,+got)\n%s",
tc.desc, diff)
}
}
}
func TestUniqueKey(t *testing.T) { func TestUniqueKey(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string

95
example_test.go Normal file
View File

@@ -0,0 +1,95 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq_test
import (
"fmt"
"log"
"os"
"os/signal"
"github.com/hibiken/asynq"
"golang.org/x/sys/unix"
)
func ExampleServer_Run() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
// Run blocks and waits for os signal to terminate the program.
if err := srv.Run(h); err != nil {
log.Fatal(err)
}
}
func ExampleServer_Stop() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
if err := srv.Start(h); err != nil {
log.Fatal(err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
<-sigs // wait for termination signal
srv.Stop()
}
func ExampleServer_Quiet() {
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: ":6379"},
asynq.Config{Concurrency: 20},
)
h := asynq.NewServeMux()
// ... Register handlers
if err := srv.Start(h); err != nil {
log.Fatal(err)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT, unix.SIGTSTP)
// Handle SIGTERM, SIGINT to exit the program.
// Handle SIGTSTP to stop processing new tasks.
for {
s := <-sigs
if s == unix.SIGTSTP {
srv.Quiet() // stop processing new tasks
continue
}
break
}
srv.Stop()
}
func ExampleParseRedisURI() {
rconn, err := asynq.ParseRedisURI("redis://localhost:6379/10")
if err != nil {
log.Fatal(err)
}
r, ok := rconn.(asynq.RedisClientOpt)
if !ok {
log.Fatal("unexpected type")
}
fmt.Println(r.Addr)
fmt.Println(r.DB)
// Output:
// localhost:6379
// 10
}

View File

@@ -82,7 +82,8 @@ func TestServerInfoKey(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
got := ServerInfoKey(tc.hostname, tc.pid, tc.sid) got := ServerInfoKey(tc.hostname, tc.pid, tc.sid)
if got != tc.want { if got != tc.want {
t.Errorf("ServerInfoKey(%q, %d) = %q, want %q", tc.hostname, tc.pid, got, tc.want) t.Errorf("ServerInfoKey(%q, %d, %q) = %q, want %q",
tc.hostname, tc.pid, tc.sid, got, tc.want)
} }
} }
} }
@@ -101,7 +102,8 @@ func TestWorkersKey(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
got := WorkersKey(tc.hostname, tc.pid, tc.sid) got := WorkersKey(tc.hostname, tc.pid, tc.sid)
if got != tc.want { if got != tc.want {
t.Errorf("WorkersKey(%q, %d) = %q, want = %q", tc.hostname, tc.pid, got, tc.want) t.Errorf("WorkersKey(%q, %d, %q) = %q, want = %q",
tc.hostname, tc.pid, tc.sid, got, tc.want)
} }
} }
} }

View File

@@ -188,15 +188,17 @@ func (p *processor) exec() {
<-p.sema /* release token */ <-p.sema /* release token */
}() }()
resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload)
ctx, cancel := createContext(msg) ctx, cancel := createContext(msg)
p.cancelations.Add(msg.ID.String(), cancel) p.cancelations.Add(msg.ID.String(), cancel)
go func() { defer func() {
resCh <- perform(ctx, task, p.handler) cancel()
p.cancelations.Delete(msg.ID.String()) p.cancelations.Delete(msg.ID.String())
}() }()
resCh := make(chan error, 1)
task := NewTask(msg.Type, msg.Payload)
go func() { resCh <- perform(ctx, task, p.handler) }()
select { select {
case <-p.quit: case <-p.quit:
// time is up, quit this worker goroutine. // time is up, quit this worker goroutine.

View File

@@ -37,19 +37,16 @@ func TestProcessorSuccess(t *testing.T) {
tests := []struct { tests := []struct {
enqueued []*base.TaskMessage // initial default queue state enqueued []*base.TaskMessage // initial default queue state
incoming []*base.TaskMessage // tasks to be enqueued during run incoming []*base.TaskMessage // tasks to be enqueued during run
wait time.Duration // wait duration between starting and stopping processor for this test case
wantProcessed []*Task // tasks to be processed at the end wantProcessed []*Task // tasks to be processed at the end
}{ }{
{ {
enqueued: []*base.TaskMessage{m1}, enqueued: []*base.TaskMessage{m1},
incoming: []*base.TaskMessage{m2, m3, m4}, incoming: []*base.TaskMessage{m2, m3, m4},
wait: time.Second,
wantProcessed: []*Task{t1, t2, t3, t4}, wantProcessed: []*Task{t1, t2, t3, t4},
}, },
{ {
enqueued: []*base.TaskMessage{}, enqueued: []*base.TaskMessage{},
incoming: []*base.TaskMessage{m1}, incoming: []*base.TaskMessage{m1},
wait: time.Second,
wantProcessed: []*Task{t1}, wantProcessed: []*Task{t1},
}, },
} }
@@ -68,21 +65,19 @@ func TestProcessorSuccess(t *testing.T) {
return nil return nil
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
@@ -90,7 +85,7 @@ func TestProcessorSuccess(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
} }
time.Sleep(tc.wait) time.Sleep(time.Second) // wait for one second to allow all enqueued tasks to be processed.
p.terminate() p.terminate()
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" { if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Payload{})); diff != "" {
@@ -175,21 +170,19 @@ func TestProcessorRetry(t *testing.T) {
n++ n++
} }
ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false) ss := base.NewServerState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
ss: ss, ss: ss,
retryDelayFunc: delayFunc, retryDelayFunc: delayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: ErrorHandlerFunc(errHandler), errHandler: ErrorHandlerFunc(errHandler),
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = tc.handler p.handler = tc.handler
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
err := rdbClient.Enqueue(msg) err := rdbClient.Enqueue(msg)
if err != nil { if err != nil {
@@ -200,7 +193,7 @@ func TestProcessorRetry(t *testing.T) {
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()
cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to second difference in zset score cmpOpt := cmpopts.EquateApprox(0, float64(time.Second)) // allow up to a second difference in zset score
gotRetry := h.GetRetryEntries(t, r) gotRetry := h.GetRetryEntries(t, r)
if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" { if diff := cmp.Diff(tc.wantRetry, gotRetry, h.SortZSetEntryOpt, cmpOpt); diff != "" {
t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff) t.Errorf("mismatch found in %q after running processor; (-want, +got)\n%s", base.RetryQueue, diff)
@@ -249,7 +242,6 @@ func TestProcessorQueues(t *testing.T) {
} }
for _, tc := range tests { for _, tc := range tests {
cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false) ss := base.NewServerState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
@@ -257,7 +249,7 @@ func TestProcessorQueues(t *testing.T) {
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
@@ -326,7 +318,6 @@ func TestProcessorWithStrictPriority(t *testing.T) {
"low": 1, "low": 1,
} }
// Note: Set concurrency to 1 to make sure tasks are processed one at a time. // Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations()
ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/) ss := base.NewServerState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(newProcessorParams{ p := newProcessor(newProcessorParams{
logger: testLogger, logger: testLogger,
@@ -334,14 +325,13 @@ func TestProcessorWithStrictPriority(t *testing.T) {
ss: ss, ss: ss,
retryDelayFunc: defaultDelayFunc, retryDelayFunc: defaultDelayFunc,
syncCh: nil, syncCh: nil,
cancelations: cancelations, cancelations: base.NewCancelations(),
errHandler: nil, errHandler: nil,
shutdownTimeout: defaultShutdownTimeout, shutdownTimeout: defaultShutdownTimeout,
}) })
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
var wg sync.WaitGroup p.start(&sync.WaitGroup{})
p.start(&wg)
time.Sleep(tc.wait) time.Sleep(tc.wait)
p.terminate() p.terminate()

View File

@@ -6,9 +6,13 @@ package asynq
import ( import (
"context" "context"
"fmt"
"syscall"
"testing" "testing"
"time" "time"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
"go.uber.org/goleak" "go.uber.org/goleak"
) )
@@ -49,6 +53,35 @@ func TestServer(t *testing.T) {
srv.Stop() srv.Stop()
} }
func TestServerRun(t *testing.T) {
// https://github.com/go-redis/redis/issues/1029
ignoreOpt := goleak.IgnoreTopFunction("github.com/go-redis/redis/v7/internal/pool.(*ConnPool).reaper")
defer goleak.VerifyNoLeaks(t, ignoreOpt)
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{})
done := make(chan struct{})
// Make sure server exits when receiving TERM signal.
go func() {
time.Sleep(2 * time.Second)
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
done <- struct{}{}
}()
go func() {
select {
case <-time.After(10 * time.Second):
t.Fatal("server did not stop after receiving TERM signal")
case <-done:
}
}()
mux := NewServeMux()
if err := srv.Run(mux); err != nil {
t.Fatal(err)
}
}
func TestServerErrServerStopped(t *testing.T) { func TestServerErrServerStopped(t *testing.T) {
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{}) srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{})
handler := NewServeMux() handler := NewServeMux()
@@ -83,3 +116,95 @@ func TestServerErrServerRunning(t *testing.T) {
} }
srv.Stop() srv.Stop()
} }
func TestServerWithRedisDown(t *testing.T) {
// Make sure that server does not panic and exit if redis is down.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{})
srv.broker = testBroker
srv.scheduler.broker = testBroker
srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker
srv.subscriber.broker = testBroker
testBroker.Sleep()
// no-op handler
h := func(ctx context.Context, task *Task) error {
return nil
}
err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
time.Sleep(3 * time.Second)
srv.Stop()
}
func TestServerWithFlakyBroker(t *testing.T) {
// Make sure that server does not panic and exit if redis is down.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
srv := NewServer(RedisClientOpt{Addr: redisAddr, DB: redisDB}, Config{})
srv.broker = testBroker
srv.scheduler.broker = testBroker
srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker
srv.subscriber.broker = testBroker
c := NewClient(RedisClientOpt{Addr: redisAddr, DB: redisDB})
h := func(ctx context.Context, task *Task) error {
// force task retry.
if task.Type == "bad_task" {
return fmt.Errorf("could not process %q", task.Type)
}
time.Sleep(2 * time.Second)
return nil
}
err := srv.Start(HandlerFunc(h))
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
err := c.Enqueue(NewTask("enqueued", nil), MaxRetry(i))
if err != nil {
t.Fatal(err)
}
err = c.Enqueue(NewTask("bad_task", nil))
if err != nil {
t.Fatal(err)
}
err = c.EnqueueIn(time.Duration(i)*time.Second, NewTask("scheduled", nil))
if err != nil {
t.Fatal(err)
}
}
// simulate redis going down.
testBroker.Sleep()
time.Sleep(3 * time.Second)
// simulate redis comes back online.
testBroker.Wakeup()
time.Sleep(3 * time.Second)
srv.Stop()
}