Compare commits

..

17 Commits

Author SHA1 Message Date
Ken Hibino
6df2c3ae2b v0.6.2 2020-03-15 21:02:28 -07:00
Ken Hibino
37554fd23c Update readme example code 2020-03-15 14:56:00 -07:00
Ken Hibino
77f5a38453 Refactor payload_test to reduce cyclomatic complexities 2020-03-14 12:30:42 -07:00
Ken Hibino
8d2b9d6be7 Add comments to exported types and functions from internal/log package 2020-03-13 21:04:45 -07:00
Bo-Yi Wu
1b7d557c66 fix typo 2020-03-13 20:02:26 -07:00
Bo-Yi Wu
30b68728d4 chore(lint): fix from gofmt -s 2020-03-13 20:01:39 -07:00
Ken Hibino
310d38620d Minor tweak to readme example code 2020-03-13 17:27:20 -07:00
Ken Hibino
1a53bbf21b Update changelog 2020-03-13 17:27:20 -07:00
Ken Hibino
9c79a7d507 Simplify code with gofmt -s 2020-03-13 14:24:24 -07:00
Ken Hibino
516f95edff Add Use method to better support middlewares with ServeMux 2020-03-13 14:13:17 -07:00
Ken Hibino
cf7a677312 v0.6.1 2020-03-12 08:42:34 -07:00
Ken Hibino
0bc6eba021 Allow custom logger to be used in Background 2020-03-12 08:40:37 -07:00
Ken Hibino
d664d68fa4 Extract out log package 2020-03-09 07:17:52 -07:00
Ken Hibino
a425f54d23 [ci skip] Remove todo comment 2020-03-09 06:09:07 -07:00
Ken Hibino
3c722386b0 Add Deadline option when enqueuing tasks
Deadline option sets the deadline for the given task's context deadline.
2020-03-08 17:12:42 -07:00
Ken Hibino
25992c2781 [ci skip] Minor readme update 2020-03-03 21:39:16 -08:00
Ken Hibino
b9e3cad7a7 [ci skip] Update readme
- Added flow chart for task queue
- Reordered sections
2020-03-02 07:06:11 -08:00
28 changed files with 1268 additions and 567 deletions

View File

@@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.6.2] - 2020-03-15
### Added
- `Use` method was added to `ServeMux` to apply middlewares to all handlers.
## [0.6.1] - 2020-03-12
### Added
- `Client` can optionally schedule task with `asynq.Deadline(time)` to specify deadline for task's context. Default is no deadline.
- `Logger` option was added to config, which allows user to specify the logger used by the background instance.
## [0.6.0] - 2020-03-01
### Added

217
README.md
View File

@@ -12,15 +12,7 @@ It is backed by Redis and it is designed to have a low barrier to entry. It shou
**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before v1.0.0 release.
![Gif](/docs/assets/demo.gif)
## Installation
To install `asynq` library, run the following command:
```sh
go get -u github.com/hibiken/asynq
```
![Task Queue Diagram](/docs/assets/task-queue.png)
## Quickstart
@@ -30,62 +22,133 @@ First, make sure you are running a Redis server locally.
$ redis-server
```
To create and schedule tasks, use `Client` and provide a task and when to enqueue the task.
Next, write a package that encapslates task creation and task handling.
```go
func main() {
r := &asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
package tasks
import (
"fmt"
"github.com/hibiken/asynq"
)
// A list of background task types.
const (
EmailDelivery = "email:deliver"
ImageProcessing = "image:process"
)
// Write function NewXXXTask to create a task.
func NewEmailDeliveryTask(userID int, tmplID string) *asynq.Task {
payload := map[string]interface{}{"user_id": userID, "template_id": tmplID}
return asynq.NewTask(EmailDelivery, payload)
}
func NewImageProcessingTask(src, dst string) *asynq.Task {
payload := map[string]interface{}{"src": src, "dst": dst}
return asynq.NewTask(ImageProcessing, payload)
}
// Write function HandleXXXTask to handle the given task.
// NOTE: It satisfies the asynq.HandlerFunc interface.
func HandleEmailDeliveryTask(ctx context.Context, t *asynq.Task) error {
userID, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
tmplID, err := t.Payload.GetString("template_id")
if err != nil {
return err
}
fmt.Printf("Send Email to User: user_id = %d, template_id = %s\n", userID, tmplID)
// Email delivery logic ...
return nil
}
client := asynq.NewClient(r)
// Create a task with task type and payload
t1 := asynq.NewTask("email:signup", map[string]interface{}{"user_id": 42})
t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})
// Process immediately
err := client.Enqueue(t1)
// Process 24 hrs later
err = client.EnqueueIn(24*time.Hour, t2)
// Process at specified time.
target := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
err = client.EnqueueAt(target, t2)
// Pass options to specify processing behavior for a given task.
//
// MaxRetry specifies the maximum number of times this task will be retried (Default is 25).
// Queue specifies which queue to enqueue this task to (Default is "default").
// Timeout specifies the the timeout for the task's context (Default is no timeout).
err = client.Enqueue(t1, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
func HandleImageProcessingTask(ctx context.Context, t *asynq.Task) error {
src, err := t.Payload.GetString("src")
if err != nil {
return err
}
dst, err := t.Payload.GetString("dst")
if err != nil {
return err
}
fmt.Printf("Process image: src = %s, dst = %s\n", src, dst)
// Image processing logic ...
return nil
}
```
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
`Handler` is an interface with one method `ProcessTask` with the following signature.
In your web application code, import the above package and use [`Client`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Client) to enqueue tasks to the task queue.
A task will be processed by a background worker as soon as the task gets enqueued.
Scheduled tasks will be stored in Redis and will be enqueued at the specified time.
```go
// ProcessTask should return nil if the processing of a task
// is successful.
//
// If ProcessTask return a non-nil error or panics, the task
// will be retried after delay.
type Handler interface {
ProcessTask(context.Context, *asynq.Task) error
package main
import (
"time"
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
r := &asynq.RedisClientOpt{Addr: redisAddr}
c := asynq.NewClient(r)
// Example 1: Enqueue task to be processed immediately.
t := tasks.NewEmailDeliveryTask(42, "some:template:id")
err := c.Enqueue(t)
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}
// Example 2: Schedule task to be processed in the future.
t = tasks.NewEmailDeliveryTask(42, "other:template:id")
err = c.EnqueueIn(24*time.Hour, t)
if err != nil {
log.Fatal("could not schedule task: %v", err)
}
// Example 3: Pass options to tune task processing behavior.
// Options include MaxRetry, Queue, Timeout, Deadline, etc.
t = tasks.NewImageProcessingTask("some/blobstore/url", "other/blobstore/url")
err = c.Enqueue(t, asynq.MaxRetry(10), asynq.Queue("critical"), asynq.Timeout(time.Minute))
if err != nil {
log.Fatal("could not enqueue task: %v", err)
}
}
```
You can optionally use `ServeMux` to create a handler, just as you would with `"net/http"` Handler.
Next, create a binary to process these tasks in the background.
To start the background workers, use [`Background`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Background) and provide your [`Handler`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#Handler) to process the tasks.
You can optionally use [`ServeMux`](https://pkg.go.dev/github.com/hibiken/asynq?tab=doc#ServeMux) to create a handler, just as you would with [`"net/http"`](https://golang.org/pkg/net/http/) Handler.
```go
package main
import (
"github.com/hibiken/asynq"
"your/app/package/tasks"
)
const redisAddr = "127.0.0.1:6379"
func main() {
r := &asynq.RedisClientOpt{
Addr: "127.0.0.1:6379",
}
r := &asynq.RedisClientOpt{Addr: redisAddr}
bg := asynq.NewBackground(r, &asynq.Config{
// Specify how many concurrent workers to use
@@ -99,29 +162,43 @@ func main() {
// See the godoc for other configuration options
})
// mux maps a type to a handler
mux := asynq.NewServeMux()
mux.HandleFunc("email:signup", signupEmailHandler)
mux.HandleFunc("email:reminder", reminderEmailHandler)
mux.HandleFunc(tasks.EmailDelivery, tasks.HandleEmailDeliveryTask)
mux.HandleFunc(tasks.ImageProcessing, tasks.HandleImageProcessingTask)
// ...register other handlers...
bg.Run(mux)
}
// function with the same signature as the ProcessTask method for the Handler interface.
func signupEmailHandler(ctx context.Context, t *asynq.Task) error {
id, err := t.Payload.GetInt("user_id")
if err != nil {
return err
}
fmt.Printf("Send welcome email to user %d\n", id)
// ...your email sending logic...
return nil
}
```
For a more detailed walk-through of the library, see our [Getting Started Guide](https://github.com/hibiken/asynq/wiki/Getting-Started).
To Learn more about `asynq` features and APIs, see our [Wiki pages](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
To Learn more about `asynq` features and APIs, see our [Wiki](https://github.com/hibiken/asynq/wiki) and [godoc](https://godoc.org/github.com/hibiken/asynq).
## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks.
Here's an example of running the `stats` command.
![Gif](/docs/assets/demo.gif)
For details on how to use the tool, refer to the tool's [README](/tools/asynqmon/README.md).
## Installation
To install `asynq` library, run the following command:
```sh
go get -u github.com/hibiken/asynq
```
To install the CLI tool, run the following command:
```sh
go get -u github.com/hibiken/asynq/tools/asynqmon
```
## Requirements
@@ -130,18 +207,6 @@ To Learn more about `asynq` features and APIs, see our [Wiki pages](https://gith
| [Redis](https://redis.io/) | v2.8+ |
| [Go](https://golang.org/) | v1.12+ |
## Command Line Tool
Asynq ships with a command line tool to inspect the state of queues and tasks.
To install, run the following command:
```sh
go get -u github.com/hibiken/asynq/tools/asynqmon
```
For details on how to use the tool, refer to the tool's [README](/tools/asynqmon/README.md).
## Contributing
We are open to, and grateful for, any contributions (Github issues/pull-requests, feedback on Gitter channel, etc) made by the community.

View File

@@ -5,12 +5,14 @@
package asynq
import (
"os"
"sort"
"testing"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/log"
)
// This file defines test helper functions used by
@@ -22,6 +24,8 @@ const (
redisDB = 14
)
var testLogger = log.NewLogger(os.Stderr)
func setup(tb testing.TB) *redis.Client {
tb.Helper()
r := redis.NewClient(&redis.Options{

View File

@@ -16,6 +16,7 @@ import (
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
)
@@ -39,6 +40,8 @@ type Background struct {
// wait group to wait for all goroutines to finish.
wg sync.WaitGroup
logger Logger
rdb *rdb.RDB
scheduler *scheduler
processor *processor
@@ -104,6 +107,11 @@ type Config struct {
//
// ErrorHandler: asynq.ErrorHandlerFunc(reportError)
ErrorHandler ErrorHandler
// Logger specifies the logger used by the background instance.
//
// If unset, default logger is used.
Logger Logger
}
// An ErrorHandler handles errors returned by the task handler.
@@ -120,6 +128,25 @@ func (fn ErrorHandlerFunc) HandleError(task *Task, err error, retried, maxRetry
fn(task, err, retried, maxRetry)
}
// Logger implements logging with various log levels.
type Logger interface {
// Debug logs a message at Debug level.
Debug(format string, args ...interface{})
// Info logs a message at Info level.
Info(format string, args ...interface{})
// Warn logs a message at Warning level.
Warn(format string, args ...interface{})
// Error logs a message at Error level.
Error(format string, args ...interface{})
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
Fatal(format string, args ...interface{})
}
// Formula taken from https://github.com/mperham/sidekiq.
func defaultDelayFunc(n int, e error, t *Task) time.Duration {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
@@ -151,6 +178,10 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
if len(queues) == 0 {
queues = defaultQueueConfig
}
logger := cfg.Logger
if logger == nil {
logger = log.NewLogger(os.Stderr)
}
host, err := os.Hostname()
if err != nil {
@@ -162,12 +193,13 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background {
ps := base.NewProcessState(host, pid, n, queues, cfg.StrictPriority)
syncCh := make(chan *syncRequest)
cancels := base.NewCancelations()
syncer := newSyncer(syncCh, 5*time.Second)
heartbeater := newHeartbeater(rdb, ps, 5*time.Second)
scheduler := newScheduler(rdb, 5*time.Second, queues)
processor := newProcessor(rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(rdb, cancels)
syncer := newSyncer(logger, syncCh, 5*time.Second)
heartbeater := newHeartbeater(logger, rdb, ps, 5*time.Second)
scheduler := newScheduler(logger, rdb, 5*time.Second, queues)
processor := newProcessor(logger, rdb, ps, delayFunc, syncCh, cancels, cfg.ErrorHandler)
subscriber := newSubscriber(logger, rdb, cancels)
return &Background{
logger: logger,
rdb: rdb,
ps: ps,
scheduler: scheduler,
@@ -205,14 +237,20 @@ func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error {
// a signal, it gracefully shuts down all pending workers and other
// goroutines to process the tasks.
func (bg *Background) Run(handler Handler) {
logger.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
logger.info("Starting processing")
type prefixLogger interface {
SetPrefix(prefix string)
}
// If logger supports setting prefix, then set prefix for log output.
if l, ok := bg.logger.(prefixLogger); ok {
l.SetPrefix(fmt.Sprintf("asynq: pid=%d ", os.Getpid()))
}
bg.logger.Info("Starting processing")
bg.start(handler)
defer bg.stop()
logger.info("Send signal TSTP to stop processing new tasks")
logger.info("Send signal TERM or INT to terminate the process")
bg.logger.Info("Send signal TSTP to stop processing new tasks")
bg.logger.Info("Send signal TERM or INT to terminate the process")
// Wait for a signal to terminate.
sigs := make(chan os.Signal, 1)
@@ -227,7 +265,7 @@ func (bg *Background) Run(handler Handler) {
break
}
fmt.Println()
logger.info("Starting graceful shutdown")
bg.logger.Info("Starting graceful shutdown")
}
// starts the background-task processing.
@@ -271,5 +309,5 @@ func (bg *Background) stop() {
bg.rdb.Close()
bg.running = false
logger.info("Bye!")
bg.logger.Info("Bye!")
}

View File

@@ -34,9 +34,10 @@ type Option interface{}
// Internal option representations.
type (
retryOption int
queueOption string
timeoutOption time.Duration
retryOption int
queueOption string
timeoutOption time.Duration
deadlineOption time.Time
)
// MaxRetry returns an option to specify the max number of times
@@ -64,17 +65,24 @@ func Timeout(d time.Duration) Option {
return timeoutOption(d)
}
// Deadline returns an option to specify the deadline for the given task.
func Deadline(t time.Time) Option {
return deadlineOption(t)
}
type option struct {
retry int
queue string
timeout time.Duration
retry int
queue string
timeout time.Duration
deadline time.Time
}
func composeOptions(opts ...Option) option {
res := option{
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
timeout: 0,
retry: defaultMaxRetry,
queue: base.DefaultQueueName,
timeout: 0,
deadline: time.Time{},
}
for _, opt := range opts {
switch opt := opt.(type) {
@@ -84,6 +92,8 @@ func composeOptions(opts ...Option) option {
res.queue = string(opt)
case timeoutOption:
res.timeout = time.Duration(opt)
case deadlineOption:
res.deadline = time.Time(opt)
default:
// ignore unexpected option
}
@@ -105,12 +115,13 @@ const (
func (c *Client) EnqueueAt(t time.Time, task *Task, opts ...Option) error {
opt := composeOptions(opts...)
msg := &base.TaskMessage{
ID: xid.New(),
Type: task.Type,
Payload: task.Payload.data,
Queue: opt.queue,
Retry: opt.retry,
Timeout: opt.timeout.String(),
ID: xid.New(),
Type: task.Type,
Payload: task.Payload.data,
Queue: opt.queue,
Retry: opt.retry,
Timeout: opt.timeout.String(),
Deadline: opt.deadline.Format(time.RFC3339),
}
return c.enqueue(msg, t)
}

View File

@@ -25,6 +25,9 @@ func TestClientEnqueueAt(t *testing.T) {
var (
now = time.Now()
oneHourLater = now.Add(time.Hour)
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct {
@@ -41,13 +44,14 @@ func TestClientEnqueueAt(t *testing.T) {
processAt: now,
opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
@@ -62,11 +66,12 @@ func TestClientEnqueueAt(t *testing.T) {
wantScheduled: []h.ZSetEntry{
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
Score: float64(oneHourLater.Unix()),
},
@@ -106,6 +111,11 @@ func TestClientEnqueue(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct {
desc string
task *Task
@@ -119,13 +129,14 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(3),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: time.Duration(0).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: 3,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
@@ -137,13 +148,14 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(-2),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: time.Duration(0).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: 0, // Retry count should be set to zero
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
@@ -156,13 +168,14 @@ func TestClientEnqueue(t *testing.T) {
MaxRetry(10),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: time.Duration(0).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: 10, // Last option takes precedence
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
@@ -174,13 +187,14 @@ func TestClientEnqueue(t *testing.T) {
Queue("custom"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"custom": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: time.Duration(0).String(),
"custom": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "custom",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
@@ -192,31 +206,52 @@ func TestClientEnqueue(t *testing.T) {
Queue("HIGH"),
},
wantEnqueued: map[string][]*base.TaskMessage{
"high": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: time.Duration(0).String(),
"high": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "high",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},
},
{
desc: "Timeout option sets the timeout duration",
desc: "With timeout option",
task: task,
opts: []Option{
Timeout(20 * time.Second),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: (20 * time.Second).String(),
Deadline: noDeadline,
},
},
},
},
{
desc: "With deadline option",
task: task,
opts: []Option{
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Format(time.RFC3339),
},
},
},
@@ -250,6 +285,11 @@ func TestClientEnqueueIn(t *testing.T) {
task := NewTask("send_email", map[string]interface{}{"to": "customer@gmail.com", "from": "merchant@example.com"})
var (
noTimeout = time.Duration(0).String()
noDeadline = time.Time{}.Format(time.RFC3339)
)
tests := []struct {
desc string
task *Task
@@ -267,11 +307,12 @@ func TestClientEnqueueIn(t *testing.T) {
wantScheduled: []h.ZSetEntry{
{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
Score: float64(time.Now().Add(time.Hour).Unix()),
},
@@ -283,13 +324,14 @@ func TestClientEnqueueIn(t *testing.T) {
delay: 0,
opts: []Option{},
wantEnqueued: map[string][]*base.TaskMessage{
"default": []*base.TaskMessage{
&base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: time.Duration(0).String(),
"default": {
{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
Queue: "default",
Timeout: noTimeout,
Deadline: noDeadline,
},
},
},

BIN
docs/assets/task-queue.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 54 KiB

View File

@@ -15,7 +15,8 @@ import (
// heartbeater is responsible for writing process info to redis periodically to
// indicate that the background worker process is up.
type heartbeater struct {
rdb *rdb.RDB
logger Logger
rdb *rdb.RDB
ps *base.ProcessState
@@ -26,8 +27,9 @@ type heartbeater struct {
interval time.Duration
}
func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
func newHeartbeater(l Logger, rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration) *heartbeater {
return &heartbeater{
logger: l,
rdb: rdb,
ps: ps,
done: make(chan struct{}),
@@ -36,7 +38,7 @@ func newHeartbeater(rdb *rdb.RDB, ps *base.ProcessState, interval time.Duration)
}
func (h *heartbeater) terminate() {
logger.info("Heartbeater shutting down...")
h.logger.Info("Heartbeater shutting down...")
// Signal the heartbeater goroutine to stop.
h.done <- struct{}{}
}
@@ -52,7 +54,7 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
select {
case <-h.done:
h.rdb.ClearProcessState(h.ps)
logger.info("Heartbeater done")
h.logger.Info("Heartbeater done")
return
case <-time.After(h.interval):
h.beat()
@@ -66,6 +68,6 @@ func (h *heartbeater) beat() {
// and short enough to expire quickly once the process is shut down or killed.
err := h.rdb.WriteProcessState(h.ps, h.interval*2)
if err != nil {
logger.error("could not write heartbeat data: %v", err)
h.logger.Error("could not write heartbeat data: %v", err)
}
}

View File

@@ -36,7 +36,7 @@ func TestHeartbeater(t *testing.T) {
h.FlushDB(t, r)
state := base.NewProcessState(tc.host, tc.pid, tc.concurrency, tc.queues, false)
hb := newHeartbeater(rdbClient, state, tc.interval)
hb := newHeartbeater(testLogger, rdbClient, state, tc.interval)
var wg sync.WaitGroup
hb.start(&wg)

View File

@@ -90,6 +90,13 @@ type TaskMessage struct {
//
// Zero means no limit.
Timeout string
// Deadline specifies the deadline for the task.
// Task won't be processed if it exceeded its deadline.
// The string shoulbe be in RFC3339 format.
//
// time.Time's zero value means no deadline.
Deadline string
}
// ProcessState holds process level information.

View File

@@ -110,9 +110,9 @@ func TestProcessStateConcurrentAccess(t *testing.T) {
var wg sync.WaitGroup
started := time.Now()
msgs := []*TaskMessage{
&TaskMessage{ID: xid.New(), Type: "type1", Payload: map[string]interface{}{"user_id": 42}},
&TaskMessage{ID: xid.New(), Type: "type2"},
&TaskMessage{ID: xid.New(), Type: "type3"},
{ID: xid.New(), Type: "type1", Payload: map[string]interface{}{"user_id": 42}},
{ID: xid.New(), Type: "type2"},
{ID: xid.New(), Type: "type3"},
}
// Simulate hearbeater calling SetStatus and SetStarted.

57
internal/log/log.go Normal file
View File

@@ -0,0 +1,57 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
// Package log exports logging related types and functions.
package log
import (
"io"
stdlog "log"
"os"
)
// NewLogger creates and returns a new instance of Logger.
func NewLogger(out io.Writer) *Logger {
return &Logger{
stdlog.New(out, "", stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
}
}
// Logger is a wrapper object around log.Logger from the standard library.
// It supports logging at various log levels.
type Logger struct {
*stdlog.Logger
}
// Debug logs a message at Debug level.
func (l *Logger) Debug(format string, args ...interface{}) {
format = "DEBUG: " + format
l.Printf(format, args...)
}
// Info logs a message at Info level.
func (l *Logger) Info(format string, args ...interface{}) {
format = "INFO: " + format
l.Printf(format, args...)
}
// Warn logs a message at Warning level.
func (l *Logger) Warn(format string, args ...interface{}) {
format = "WARN: " + format
l.Printf(format, args...)
}
// Error logs a message at Error level.
func (l *Logger) Error(format string, args ...interface{}) {
format = "ERROR: " + format
l.Printf(format, args...)
}
// Fatal logs a message at Fatal level
// and process will exit with status set to 1.
func (l *Logger) Fatal(format string, args ...interface{}) {
format = "FATAL: " + format
l.Printf(format, args...)
os.Exit(1)
}

View File

@@ -1,4 +1,8 @@
package asynq
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package log
import (
"bytes"
@@ -20,6 +24,38 @@ type tester struct {
wantPattern string // regexp that log output must match
}
func TestLoggerDebug(t *testing.T) {
tests := []tester{
{
desc: "without trailing newline, logger adds newline",
message: "hello, world!",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
{
desc: "with trailing newline, logger preserves newline",
message: "hello, world!\n",
wantPattern: fmt.Sprintf("^%s %s%s DEBUG: hello, world!\n$", rgxdate, rgxtime, rgxmicroseconds),
},
}
for _, tc := range tests {
var buf bytes.Buffer
logger := NewLogger(&buf)
logger.Debug(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
if err != nil {
t.Fatal("pattern did not compile:", err)
}
if !matched {
t.Errorf("logger.info(%q) outputted %q, should match pattern %q",
tc.message, got, tc.wantPattern)
}
}
}
func TestLoggerInfo(t *testing.T) {
tests := []tester{
{
@@ -36,9 +72,9 @@ func TestLoggerInfo(t *testing.T) {
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger := NewLogger(&buf)
logger.info(tc.message)
logger.Info(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
@@ -68,9 +104,9 @@ func TestLoggerWarn(t *testing.T) {
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger := NewLogger(&buf)
logger.warn(tc.message)
logger.Warn(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)
@@ -100,9 +136,9 @@ func TestLoggerError(t *testing.T) {
for _, tc := range tests {
var buf bytes.Buffer
logger := newLogger(&buf)
logger := NewLogger(&buf)
logger.error(tc.message)
logger.Error(tc.message)
got := buf.String()
matched, err := regexp.MatchString(tc.wantPattern, got)

View File

@@ -67,7 +67,6 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
if len(qnames) == 1 {
data, err = r.dequeueSingle(base.QueueKey(qnames[0]))
} else {
// TODO(hibiken): Take keys are argument and don't compute every time
var keys []string
for _, q := range qnames {
keys = append(keys, base.QueueKey(q))

View File

@@ -884,7 +884,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
gotWorkers[key] = &w
}
wantWorkers := map[string]*base.WorkerInfo{
msg1.ID.String(): &base.WorkerInfo{
msg1.ID.String(): {
Host: host,
PID: pid,
ID: msg1.ID,
@@ -893,7 +893,7 @@ func TestWriteProcessStateWithWorkers(t *testing.T) {
Payload: msg1.Payload,
Started: w1Started,
},
msg2.ID.String(): &base.WorkerInfo{
msg2.ID.String(): {
Host: host,
PID: pid,
ID: msg2.ID,

View File

@@ -1,35 +0,0 @@
package asynq
import (
"io"
"log"
"os"
)
// global logger used in asynq package.
var logger = newLogger(os.Stderr)
func newLogger(out io.Writer) *asynqLogger {
return &asynqLogger{
log.New(out, "", log.Ldate|log.Ltime|log.Lmicroseconds|log.LUTC),
}
}
type asynqLogger struct {
*log.Logger
}
func (l *asynqLogger) info(format string, args ...interface{}) {
format = "INFO: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) warn(format string, args ...interface{}) {
format = "WARN: " + format
l.Printf(format, args...)
}
func (l *asynqLogger) error(format string, args ...interface{}) {
format = "ERROR: " + format
l.Printf(format, args...)
}

View File

@@ -14,333 +14,626 @@ import (
"github.com/hibiken/asynq/internal/base"
)
func TestPayloadGet(t *testing.T) {
names := []string{"luke", "anakin", "rey"}
primes := []int{2, 3, 5, 7, 11, 13, 17}
user := map[string]interface{}{"name": "Ken", "score": 3.14}
location := map[string]string{"address": "123 Main St.", "state": "NY", "zipcode": "10002"}
favs := map[string][]string{
"movies": []string{"forrest gump", "star wars"},
"tv_shows": []string{"game of thrones", "HIMYM", "breaking bad"},
type payloadTest struct {
data map[string]interface{}
key string
nonkey string
}
func TestPayloadString(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"name": "gopher"},
key: "name",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetString(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetString(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetString(tc.nonkey)
if err == nil || got != "" {
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
tc.key, got, err)
}
}
}
func TestPayloadInt(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"user_id": 42},
key: "user_id",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetInt(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetInt(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetInt(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
tc.key, got, err)
}
}
}
func TestPayloadFloat64(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"pi": 3.14},
key: "pi",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetFloat64(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetFloat64(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetFloat64(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetFloat64(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetFloat64(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetFloat64(%q) = %v, %v; want 0, error",
tc.key, got, err)
}
}
}
func TestPayloadBool(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"enabled": true},
key: "enabled",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetBool(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("Payload.GetBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetBool(tc.key)
if err != nil || got != tc.data[tc.key] {
t.Errorf("With Marshaling: Payload.GetBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetBool(tc.nonkey)
if err == nil || got != false {
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
tc.key, got, err)
}
}
}
func TestPayloadStringSlice(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"names": []string{"luke", "rey", "anakin"}},
key: "names",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadIntSlice(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"nums": []int{9, 8, 7}},
key: "nums",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetIntSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetIntSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetIntSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetIntSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMap(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"user": map[string]interface{}{"name": "Jon Doe", "score": 2.2}},
key: "user",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMap(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMap(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMap(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMap(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapString(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"address": map[string]string{"line": "123 Main St", "city": "San Francisco", "state": "CA"}},
key: "address",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapString(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapString(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapString(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapString(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapStringSlice(t *testing.T) {
favs := map[string][]string{
"movies": {"forrest gump", "star wars"},
"tv_shows": {"game of thrones", "HIMYM", "breaking bad"},
}
tests := []payloadTest{
{
data: map[string]interface{}{"favorites": favs},
key: "favorites",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapStringSlice(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapStringSlice(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapStringSlice(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapInt(t *testing.T) {
counter := map[string]int{
"a": 1,
"b": 101,
"c": 42,
}
tests := []payloadTest{
{
data: map[string]interface{}{"counts": counter},
key: "counts",
nonkey: "unknown",
},
}
for _, tc := range tests {
payload := Payload{tc.data}
got, err := payload.GetStringMapInt(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapInt(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapInt(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
// access non-existent key.
got, err = payload.GetStringMapInt(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadStringMapBool(t *testing.T) {
features := map[string]bool{
"A": false,
"B": true,
"C": true,
}
now := time.Now()
duration := 15 * time.Minute
data := map[string]interface{}{
"greeting": "Hello",
"user_id": 9876,
"pi": 3.1415,
"enabled": false,
"names": names,
"primes": primes,
"user": user,
"location": location,
"favs": favs,
"counter": counter,
"features": features,
"timestamp": now,
"duration": duration,
}
payload := Payload{data}
gotStr, err := payload.GetString("greeting")
if gotStr != "Hello" || err != nil {
t.Errorf("Payload.GetString(%q) = %v, %v, want %v, nil",
"greeting", gotStr, err, "Hello")
tests := []payloadTest{
{
data: map[string]interface{}{"features": features},
key: "features",
nonkey: "unknown",
},
}
gotInt, err := payload.GetInt("user_id")
if gotInt != 9876 || err != nil {
t.Errorf("Payload.GetInt(%q) = %v, %v, want, %v, nil",
"user_id", gotInt, err, 9876)
}
for _, tc := range tests {
payload := Payload{tc.data}
gotFloat, err := payload.GetFloat64("pi")
if gotFloat != 3.1415 || err != nil {
t.Errorf("Payload.GetFloat64(%q) = %v, %v, want, %v, nil",
"pi", gotFloat, err, 3.141592)
}
got, err := payload.GetStringMapBool(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotBool, err := payload.GetBool("enabled")
if gotBool != false || err != nil {
t.Errorf("Payload.GetBool(%q) = %v, %v, want, %v, nil",
"enabled", gotBool, err, false)
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetStringMapBool(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetStringMapBool(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotStrSlice, err := payload.GetStringSlice("names")
if diff := cmp.Diff(gotStrSlice, names); diff != "" {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"names", gotStrSlice, err, names, diff)
}
gotIntSlice, err := payload.GetIntSlice("primes")
if diff := cmp.Diff(gotIntSlice, primes); diff != "" {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"primes", gotIntSlice, err, primes, diff)
}
gotStrMap, err := payload.GetStringMap("user")
if diff := cmp.Diff(gotStrMap, user); diff != "" {
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"user", gotStrMap, err, user, diff)
}
gotStrMapStr, err := payload.GetStringMapString("location")
if diff := cmp.Diff(gotStrMapStr, location); diff != "" {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"location", gotStrMapStr, err, location, diff)
}
gotStrMapStrSlice, err := payload.GetStringMapStringSlice("favs")
if diff := cmp.Diff(gotStrMapStrSlice, favs); diff != "" {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"favs", gotStrMapStrSlice, err, favs, diff)
}
gotStrMapInt, err := payload.GetStringMapInt("counter")
if diff := cmp.Diff(gotStrMapInt, counter); diff != "" {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"counter", gotStrMapInt, err, counter, diff)
}
gotStrMapBool, err := payload.GetStringMapBool("features")
if diff := cmp.Diff(gotStrMapBool, features); diff != "" {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"features", gotStrMapBool, err, features, diff)
}
gotTime, err := payload.GetTime("timestamp")
if !gotTime.Equal(now) {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
"timestamp", gotTime, err, now)
}
gotDuration, err := payload.GetDuration("duration")
if gotDuration != duration {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
"duration", gotDuration, err, duration)
// access non-existent key.
got, err = payload.GetStringMapBool(tc.nonkey)
if err == nil || got != nil {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v; want nil, error",
tc.key, got, err)
}
}
}
func TestPayloadGetWithMarshaling(t *testing.T) {
names := []string{"luke", "anakin", "rey"}
primes := []int{2, 3, 5, 7, 11, 13, 17}
user := map[string]interface{}{"name": "Ken", "score": 3.14}
location := map[string]string{"address": "123 Main St.", "state": "NY", "zipcode": "10002"}
favs := map[string][]string{
"movies": []string{"forrest gump", "star wars"},
"tv_shows": []string{"game of throwns", "HIMYM", "breaking bad"},
}
counter := map[string]int{
"a": 1,
"b": 101,
"c": 42,
}
features := map[string]bool{
"A": false,
"B": true,
"C": true,
}
now := time.Now()
duration := 15 * time.Minute
in := Payload{map[string]interface{}{
"subject": "Hello",
"recipient_id": 9876,
"pi": 3.14,
"enabled": true,
"names": names,
"primes": primes,
"user": user,
"location": location,
"favs": favs,
"counter": counter,
"features": features,
"timestamp": now,
"duration": duration,
}}
// encode and then decode task messsage
inMsg := h.NewTaskMessage("testing", in.data)
data, err := json.Marshal(inMsg)
if err != nil {
t.Fatal(err)
}
var outMsg base.TaskMessage
err = json.Unmarshal(data, &outMsg)
if err != nil {
t.Fatal(err)
}
out := Payload{outMsg.Payload}
gotStr, err := out.GetString("subject")
if gotStr != "Hello" || err != nil {
t.Errorf("Payload.GetString(%q) = %v, %v; want %q, nil",
"subject", gotStr, err, "Hello")
func TestPayloadTime(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"current": time.Now()},
key: "current",
nonkey: "unknown",
},
}
gotInt, err := out.GetInt("recipient_id")
if gotInt != 9876 || err != nil {
t.Errorf("Payload.GetInt(%q) = %v, %v; want %v, nil",
"recipient_id", gotInt, err, 9876)
}
for _, tc := range tests {
payload := Payload{tc.data}
gotFloat, err := out.GetFloat64("pi")
if gotFloat != 3.14 || err != nil {
t.Errorf("Payload.GetFloat64(%q) = %v, %v; want %v, nil",
"pi", gotFloat, err, 3.14)
}
got, err := payload.GetTime(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotBool, err := out.GetBool("enabled")
if gotBool != true || err != nil {
t.Errorf("Payload.GetBool(%q) = %v, %v; want %v, nil",
"enabled", gotBool, err, true)
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetTime(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetTime(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotStrSlice, err := out.GetStringSlice("names")
if diff := cmp.Diff(gotStrSlice, names); diff != "" {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"names", gotStrSlice, err, names, diff)
}
gotIntSlice, err := out.GetIntSlice("primes")
if diff := cmp.Diff(gotIntSlice, primes); diff != "" {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"primes", gotIntSlice, err, primes, diff)
}
gotStrMap, err := out.GetStringMap("user")
if diff := cmp.Diff(gotStrMap, user); diff != "" {
t.Errorf("Payload.GetStringMap(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"user", gotStrMap, err, user, diff)
}
gotStrMapStr, err := out.GetStringMapString("location")
if diff := cmp.Diff(gotStrMapStr, location); diff != "" {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"location", gotStrMapStr, err, location, diff)
}
gotStrMapStrSlice, err := out.GetStringMapStringSlice("favs")
if diff := cmp.Diff(gotStrMapStrSlice, favs); diff != "" {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"favs", gotStrMapStrSlice, err, favs, diff)
}
gotStrMapInt, err := out.GetStringMapInt("counter")
if diff := cmp.Diff(gotStrMapInt, counter); diff != "" {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"counter", gotStrMapInt, err, counter, diff)
}
gotStrMapBool, err := out.GetStringMapBool("features")
if diff := cmp.Diff(gotStrMapBool, features); diff != "" {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want %v, nil;\n(-want,+got)\n%s",
"features", gotStrMapBool, err, features, diff)
}
gotTime, err := out.GetTime("timestamp")
if !gotTime.Equal(now) {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, nil",
"timestamp", gotTime, err, now)
}
gotDuration, err := out.GetDuration("duration")
if gotDuration != duration {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
"duration", gotDuration, err, duration)
// access non-existent key.
got, err = payload.GetTime(tc.nonkey)
if err == nil || !got.IsZero() {
t.Errorf("Payload.GetTime(%q) = %v, %v; want %v, error",
tc.key, got, err, time.Time{})
}
}
}
func TestPayloadKeyNotFound(t *testing.T) {
payload := Payload{nil}
key := "something"
gotStr, err := payload.GetString(key)
if err == nil || gotStr != "" {
t.Errorf("Payload.GetString(%q) = %v, %v; want '', error",
key, gotStr, err)
func TestPayloadDuration(t *testing.T) {
tests := []payloadTest{
{
data: map[string]interface{}{"duration": 15 * time.Minute},
key: "duration",
nonkey: "unknown",
},
}
gotInt, err := payload.GetInt(key)
if err == nil || gotInt != 0 {
t.Errorf("Payload.GetInt(%q) = %v, %v; want 0, error",
key, gotInt, err)
}
for _, tc := range tests {
payload := Payload{tc.data}
gotFloat, err := payload.GetFloat64(key)
if err == nil || gotFloat != 0 {
t.Errorf("Payload.GetFloat64(%q = %v, %v; want 0, error",
key, gotFloat, err)
}
got, err := payload.GetDuration(tc.key)
diff := cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotBool, err := payload.GetBool(key)
if err == nil || gotBool != false {
t.Errorf("Payload.GetBool(%q) = %v, %v; want false, error",
key, gotBool, err)
}
// encode and then decode task messsage.
in := h.NewTaskMessage("testing", tc.data)
b, err := json.Marshal(in)
if err != nil {
t.Fatal(err)
}
var out base.TaskMessage
err = json.Unmarshal(b, &out)
if err != nil {
t.Fatal(err)
}
payload = Payload{out.Payload}
got, err = payload.GetDuration(tc.key)
diff = cmp.Diff(got, tc.data[tc.key])
if err != nil || diff != "" {
t.Errorf("With Marshaling: Payload.GetDuration(%q) = %v, %v, want %v, nil",
tc.key, got, err, tc.data[tc.key])
}
gotStrSlice, err := payload.GetStringSlice(key)
if err == nil || gotStrSlice != nil {
t.Errorf("Payload.GetStringSlice(%q) = %v, %v; want nil, error",
key, gotStrSlice, err)
}
gotIntSlice, err := payload.GetIntSlice(key)
if err == nil || gotIntSlice != nil {
t.Errorf("Payload.GetIntSlice(%q) = %v, %v; want nil, error",
key, gotIntSlice, err)
}
gotStrMap, err := payload.GetStringMap(key)
if err == nil || gotStrMap != nil {
t.Errorf("Payload.GetStringMap(%q) = %v, %v; want nil, error",
key, gotStrMap, err)
}
gotStrMapStr, err := payload.GetStringMapString(key)
if err == nil || gotStrMapStr != nil {
t.Errorf("Payload.GetStringMapString(%q) = %v, %v; want nil, error",
key, gotStrMapStr, err)
}
gotStrMapStrSlice, err := payload.GetStringMapStringSlice(key)
if err == nil || gotStrMapStrSlice != nil {
t.Errorf("Payload.GetStringMapStringSlice(%q) = %v, %v; want nil, error",
key, gotStrMapStrSlice, err)
}
gotStrMapInt, err := payload.GetStringMapInt(key)
if err == nil || gotStrMapInt != nil {
t.Errorf("Payload.GetStringMapInt(%q) = %v, %v, want nil, error",
key, gotStrMapInt, err)
}
gotStrMapBool, err := payload.GetStringMapBool(key)
if err == nil || gotStrMapBool != nil {
t.Errorf("Payload.GetStringMapBool(%q) = %v, %v, want nil, error",
key, gotStrMapBool, err)
}
gotTime, err := payload.GetTime(key)
if err == nil || !gotTime.IsZero() {
t.Errorf("Payload.GetTime(%q) = %v, %v, want %v, error",
key, gotTime, err, time.Time{})
}
gotDuration, err := payload.GetDuration(key)
if err == nil || gotDuration != 0 {
t.Errorf("Payload.GetDuration(%q) = %v, %v, want 0, error",
key, gotDuration, err)
// access non-existent key.
got, err = payload.GetDuration(tc.nonkey)
if err == nil || got != 0 {
t.Errorf("Payload.GetDuration(%q) = %v, %v; want %v, error",
tc.key, got, err, time.Duration(0))
}
}
}

View File

@@ -18,7 +18,8 @@ import (
)
type processor struct {
rdb *rdb.RDB
logger Logger
rdb *rdb.RDB
ps *base.ProcessState
@@ -61,7 +62,7 @@ type processor struct {
type retryDelayFunc func(n int, err error, task *Task) time.Duration
// newProcessor constructs a new processor.
func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
func newProcessor(l Logger, r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
syncCh chan<- *syncRequest, c *base.Cancelations, errHandler ErrorHandler) *processor {
info := ps.Get()
qcfg := normalizeQueueCfg(info.Queues)
@@ -70,6 +71,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
orderedQueues = sortByPriority(qcfg)
}
return &processor{
logger: l,
rdb: r,
ps: ps,
queueConfig: qcfg,
@@ -91,7 +93,7 @@ func newProcessor(r *rdb.RDB, ps *base.ProcessState, fn retryDelayFunc,
// It's safe to call this method multiple times.
func (p *processor) stop() {
p.once.Do(func() {
logger.info("Processor shutting down...")
p.logger.Info("Processor shutting down...")
// Unblock if processor is waiting for sema token.
close(p.abort)
// Signal the processor goroutine to stop processing tasks
@@ -107,7 +109,7 @@ func (p *processor) terminate() {
// IDEA: Allow user to customize this timeout value.
const timeout = 8 * time.Second
time.AfterFunc(timeout, func() { close(p.quit) })
logger.info("Waiting for all workers to finish...")
p.logger.Info("Waiting for all workers to finish...")
// send cancellation signal to all in-progress task handlers
for _, cancel := range p.cancelations.GetAll() {
@@ -118,7 +120,7 @@ func (p *processor) terminate() {
for i := 0; i < cap(p.sema); i++ {
p.sema <- struct{}{}
}
logger.info("All workers have finished")
p.logger.Info("All workers have finished")
p.restore() // move any unfinished tasks back to the queue.
}
@@ -132,7 +134,7 @@ func (p *processor) start(wg *sync.WaitGroup) {
for {
select {
case <-p.done:
logger.info("Processor done")
p.logger.Info("Processor done")
return
default:
p.exec()
@@ -158,7 +160,7 @@ func (p *processor) exec() {
}
if err != nil {
if p.errLogLimiter.Allow() {
logger.error("Dequeue error: %v", err)
p.logger.Error("Dequeue error: %v", err)
}
return
}
@@ -188,7 +190,7 @@ func (p *processor) exec() {
select {
case <-p.quit:
// time is up, quit this worker goroutine.
logger.warn("Quitting worker to process task id=%s", msg.ID)
p.logger.Warn("Quitting worker. task id=%s", msg.ID)
return
case resErr := <-resCh:
// Note: One of three things should happen.
@@ -217,17 +219,17 @@ func (p *processor) exec() {
func (p *processor) restore() {
n, err := p.rdb.RequeueAll()
if err != nil {
logger.error("Could not restore unfinished tasks: %v", err)
p.logger.Error("Could not restore unfinished tasks: %v", err)
}
if n > 0 {
logger.info("Restored %d unfinished tasks back to queue", n)
p.logger.Info("Restored %d unfinished tasks back to queue", n)
}
}
func (p *processor) requeue(msg *base.TaskMessage) {
err := p.rdb.Requeue(msg)
if err != nil {
logger.error("Could not push task id=%s back to queue: %v", msg.ID, err)
p.logger.Error("Could not push task id=%s back to queue: %v", msg.ID, err)
}
}
@@ -235,7 +237,7 @@ func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.rdb.Done(msg)
if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Done(msg)
@@ -251,7 +253,7 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
err := p.rdb.Retry(msg, retryAt, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.RetryQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Retry(msg, retryAt, e.Error())
@@ -262,11 +264,11 @@ func (p *processor) retry(msg *base.TaskMessage, e error) {
}
func (p *processor) kill(msg *base.TaskMessage, e error) {
logger.warn("Retry exhausted for task id=%s", msg.ID)
p.logger.Warn("Retry exhausted for task id=%s", msg.ID)
err := p.rdb.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)
logger.warn("%s; Will retry syncing", errMsg)
p.logger.Warn("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{
fn: func() error {
return p.rdb.Kill(msg, e.Error())
@@ -391,14 +393,18 @@ func gcd(xs ...int) int {
}
// createContext returns a context and cancel function for a given task message.
func createContext(msg *base.TaskMessage) (context.Context, context.CancelFunc) {
func createContext(msg *base.TaskMessage) (ctx context.Context, cancel context.CancelFunc) {
ctx = context.Background()
timeout, err := time.ParseDuration(msg.Timeout)
if err != nil {
logger.error("cannot parse timeout duration for %+v", msg)
return context.WithCancel(context.Background())
if err == nil && timeout != 0 {
ctx, cancel = context.WithTimeout(ctx, timeout)
}
if timeout == 0 {
return context.WithCancel(context.Background())
deadline, err := time.Parse(time.RFC3339, msg.Deadline)
if err == nil && !deadline.IsZero() {
ctx, cancel = context.WithDeadline(ctx, deadline)
}
return context.WithTimeout(context.Background(), timeout)
if cancel == nil {
ctx, cancel = context.WithCancel(ctx)
}
return ctx, cancel
}

View File

@@ -17,6 +17,7 @@ import (
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
"github.com/rs/xid"
)
func TestProcessorSuccess(t *testing.T) {
@@ -68,7 +69,7 @@ func TestProcessorSuccess(t *testing.T) {
}
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler)
var wg sync.WaitGroup
@@ -166,7 +167,7 @@ func TestProcessorRetry(t *testing.T) {
}
ps := base.NewProcessState("localhost", 1234, 10, defaultQueueConfig, false)
cancelations := base.NewCancelations()
p := newProcessor(rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
p := newProcessor(testLogger, rdbClient, ps, delayFunc, nil, cancelations, ErrorHandlerFunc(errHandler))
p.handler = tc.handler
var wg sync.WaitGroup
@@ -232,7 +233,7 @@ func TestProcessorQueues(t *testing.T) {
for _, tc := range tests {
cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 10, tc.queueCfg, false)
p := newProcessor(nil, ps, defaultDelayFunc, nil, cancelations, nil)
p := newProcessor(testLogger, nil, ps, defaultDelayFunc, nil, cancelations, nil)
got := p.queues()
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
@@ -300,7 +301,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
// Note: Set concurrency to 1 to make sure tasks are processed one at a time.
cancelations := base.NewCancelations()
ps := base.NewProcessState("localhost", 1234, 1 /* concurrency */, queueCfg, true /*strict*/)
p := newProcessor(rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p := newProcessor(testLogger, rdbClient, ps, defaultDelayFunc, nil, cancelations, nil)
p.handler = HandlerFunc(handler)
var wg sync.WaitGroup
@@ -363,3 +364,85 @@ func TestPerform(t *testing.T) {
}
}
}
func TestCreateContextWithTimeRestrictions(t *testing.T) {
var (
noTimeout = time.Duration(0)
noDeadline = time.Time{}
)
tests := []struct {
desc string
timeout time.Duration
deadline time.Time
wantDeadline time.Time
}{
{"only with timeout", 10 * time.Second, noDeadline, time.Now().Add(10 * time.Second)},
{"only with deadline", noTimeout, time.Now().Add(time.Hour), time.Now().Add(time.Hour)},
{"with timeout and deadline (timeout < deadline)", 10 * time.Second, time.Now().Add(time.Hour), time.Now().Add(10 * time.Second)},
{"with timeout and deadline (timeout > deadline)", 10 * time.Minute, time.Now().Add(30 * time.Second), time.Now().Add(30 * time.Second)},
}
for _, tc := range tests {
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: tc.timeout.String(),
Deadline: tc.deadline.Format(time.RFC3339),
}
ctx, cancel := createContext(msg)
select {
case x := <-ctx.Done():
t.Errorf("%s: <-ctx.Done() == %v, want nothing (it should block)", tc.desc, x)
default:
}
got, ok := ctx.Deadline()
if !ok {
t.Errorf("%s: ctx.Deadline() returned false, want deadline to be set", tc.desc)
}
if !cmp.Equal(tc.wantDeadline, got, cmpopts.EquateApproxTime(time.Second)) {
t.Errorf("%s: ctx.Deadline() returned %v, want %v", tc.desc, got, tc.wantDeadline)
}
cancel()
select {
case <-ctx.Done():
default:
t.Errorf("ctx.Done() blocked, want it to be non-blocking")
}
}
}
func TestCreateContextWithoutTimeRestrictions(t *testing.T) {
msg := &base.TaskMessage{
Type: "something",
ID: xid.New(),
Timeout: time.Duration(0).String(), // zero value to indicate no timeout
Deadline: time.Time{}.Format(time.RFC3339), // zero value to indicate no deadline
}
ctx, cancel := createContext(msg)
select {
case x := <-ctx.Done():
t.Errorf("<-ctx.Done() == %v, want nothing (it should block)", x)
default:
}
_, ok := ctx.Deadline()
if ok {
t.Error("ctx.Deadline() returned true, want deadline to not be set")
}
cancel()
select {
case <-ctx.Done():
default:
t.Error("ctx.Done() blocked, want it to be non-blocking")
}
}

View File

@@ -12,7 +12,8 @@ import (
)
type scheduler struct {
rdb *rdb.RDB
logger Logger
rdb *rdb.RDB
// channel to communicate back to the long running "scheduler" goroutine.
done chan struct{}
@@ -24,12 +25,13 @@ type scheduler struct {
qnames []string
}
func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
func newScheduler(l Logger, r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *scheduler {
var qnames []string
for q := range qcfg {
qnames = append(qnames, q)
}
return &scheduler{
logger: l,
rdb: r,
done: make(chan struct{}),
avgInterval: avgInterval,
@@ -38,7 +40,7 @@ func newScheduler(r *rdb.RDB, avgInterval time.Duration, qcfg map[string]int) *s
}
func (s *scheduler) terminate() {
logger.info("Scheduler shutting down...")
s.logger.Info("Scheduler shutting down...")
// Signal the scheduler goroutine to stop polling.
s.done <- struct{}{}
}
@@ -51,7 +53,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
for {
select {
case <-s.done:
logger.info("Scheduler done")
s.logger.Info("Scheduler done")
return
case <-time.After(s.avgInterval):
s.exec()
@@ -62,6 +64,6 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
func (s *scheduler) exec() {
if err := s.rdb.CheckAndEnqueue(s.qnames...); err != nil {
logger.error("Could not enqueue scheduled tasks: %v", err)
s.logger.Error("Could not enqueue scheduled tasks: %v", err)
}
}

View File

@@ -19,7 +19,7 @@ func TestScheduler(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
s := newScheduler(rdbClient, pollInterval, defaultQueueConfig)
s := newScheduler(testLogger, rdbClient, pollInterval, defaultQueueConfig)
t1 := h.NewTaskMessage("gen_thumbnail", nil)
t2 := h.NewTaskMessage("send_email", nil)
t3 := h.NewTaskMessage("reindex", nil)

View File

@@ -23,9 +23,10 @@ import (
// "images:thumbnails" and the former will receive tasks with type name beginning
// with "images".
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
mws []MiddlewareFunc
}
type muxEntry struct {
@@ -33,6 +34,11 @@ type muxEntry struct {
pattern string
}
// MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
// Typically, the returned handler is a closure which does something with the context and task passed
// to it, and then calls the handler passed as parameter to the MiddlewareFunc.
type MiddlewareFunc func(Handler) Handler
// NewServeMux allocates and returns a new ServeMux.
func NewServeMux() *ServeMux {
return new(ServeMux)
@@ -60,6 +66,9 @@ func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
if h == nil {
h, pattern = NotFoundHandler(), ""
}
for i := len(mux.mws) - 1; i >= 0; i-- {
h = mux.mws[i](h)
}
return h, pattern
}
@@ -130,6 +139,16 @@ func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *T
mux.Handle(pattern, HandlerFunc(handler))
}
// Use appends a MiddlewareFunc to the chain.
// Middlewares are executed in the order that they are applied to the ServeMux.
func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
mux.mu.Lock()
defer mux.mu.Unlock()
for _, fn := range mws {
mux.mws = append(mux.mws, fn)
}
}
// NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("handler not found for task %q", task.Type)

View File

@@ -7,9 +7,12 @@ package asynq
import (
"context"
"testing"
"github.com/google/go-cmp/cmp"
)
var called string
var called string // identity of the handler that was called.
var invoked []string // list of middlewares in the order they were invoked.
// makeFakeHandler returns a handler that updates the global called variable
// to the given identity.
@@ -20,6 +23,17 @@ func makeFakeHandler(identity string) Handler {
})
}
// makeFakeMiddleware returns a middleware function that appends the given identity
//to the global invoked slice.
func makeFakeMiddleware(identity string) MiddlewareFunc {
return func(next Handler) Handler {
return HandlerFunc(func(ctx context.Context, t *Task) error {
invoked = append(invoked, identity)
return next.ProcessTask(ctx, t)
})
}
}
// A list of pattern, handler pair that is registered with mux.
var serveMuxRegister = []struct {
pattern string
@@ -114,3 +128,43 @@ func TestServeMuxNotFound(t *testing.T) {
}
}
}
var middlewareTests = []struct {
typename string // task's type name
middlewares []string // middlewares to use. They should be called in this order.
want string // identifier of the handler that should be called
}{
{"email:signup", []string{"logging", "expiration"}, "signup email handler"},
{"csv:export", []string{}, "csv export handler"},
{"email:daily", []string{"expiration", "logging"}, "default email handler"},
}
func TestServeMuxMiddlewares(t *testing.T) {
for _, tc := range middlewareTests {
mux := NewServeMux()
for _, e := range serveMuxRegister {
mux.Handle(e.pattern, e.h)
}
var mws []MiddlewareFunc
for _, s := range tc.middlewares {
mws = append(mws, makeFakeMiddleware(s))
}
mux.Use(mws...)
invoked = []string{} // reset to empty slice
called = "" // reset to zero value
task := NewTask(tc.typename, nil)
if err := mux.ProcessTask(context.Background(), task); err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(invoked, tc.middlewares); diff != "" {
t.Errorf("invoked middlewares were %v, want %v", invoked, tc.middlewares)
}
if called != tc.want {
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want)
}
}
}

View File

@@ -12,7 +12,8 @@ import (
)
type subscriber struct {
rdb *rdb.RDB
logger Logger
rdb *rdb.RDB
// channel to communicate back to the long running "subscriber" goroutine.
done chan struct{}
@@ -21,8 +22,9 @@ type subscriber struct {
cancelations *base.Cancelations
}
func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
func newSubscriber(l Logger, rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
return &subscriber{
logger: l,
rdb: rdb,
done: make(chan struct{}),
cancelations: cancelations,
@@ -30,7 +32,7 @@ func newSubscriber(rdb *rdb.RDB, cancelations *base.Cancelations) *subscriber {
}
func (s *subscriber) terminate() {
logger.info("Subscriber shutting down...")
s.logger.Info("Subscriber shutting down...")
// Signal the subscriber goroutine to stop.
s.done <- struct{}{}
}
@@ -39,7 +41,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
pubsub, err := s.rdb.CancelationPubSub()
cancelCh := pubsub.Channel()
if err != nil {
logger.error("cannot subscribe to cancelation channel: %v", err)
s.logger.Error("cannot subscribe to cancelation channel: %v", err)
return
}
wg.Add(1)
@@ -49,7 +51,7 @@ func (s *subscriber) start(wg *sync.WaitGroup) {
select {
case <-s.done:
pubsub.Close()
logger.info("Subscriber done")
s.logger.Info("Subscriber done")
return
case msg := <-cancelCh:
cancel, ok := s.cancelations.Get(msg.Payload)

View File

@@ -37,7 +37,7 @@ func TestSubscriber(t *testing.T) {
cancelations := base.NewCancelations()
cancelations.Add(tc.registeredID, fakeCancelFunc)
subscriber := newSubscriber(rdbClient, cancelations)
subscriber := newSubscriber(testLogger, rdbClient, cancelations)
var wg sync.WaitGroup
subscriber.start(&wg)

View File

@@ -12,6 +12,8 @@ import (
// syncer is responsible for queuing up failed requests to redis and retry
// those requests to sync state between the background process and redis.
type syncer struct {
logger Logger
requestsCh <-chan *syncRequest
// channel to communicate back to the long running "syncer" goroutine.
@@ -26,8 +28,9 @@ type syncRequest struct {
errMsg string // error message
}
func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
func newSyncer(l Logger, requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
return &syncer{
logger: l,
requestsCh: requestsCh,
done: make(chan struct{}),
interval: interval,
@@ -35,7 +38,7 @@ func newSyncer(requestsCh <-chan *syncRequest, interval time.Duration) *syncer {
}
func (s *syncer) terminate() {
logger.info("Syncer shutting down...")
s.logger.Info("Syncer shutting down...")
// Signal the syncer goroutine to stop.
s.done <- struct{}{}
}
@@ -51,10 +54,10 @@ func (s *syncer) start(wg *sync.WaitGroup) {
// Try sync one last time before shutting down.
for _, req := range requests {
if err := req.fn(); err != nil {
logger.error(req.errMsg)
s.logger.Error(req.errMsg)
}
}
logger.info("Syncer done")
s.logger.Info("Syncer done")
return
case req := <-s.requestsCh:
requests = append(requests, req)

View File

@@ -27,7 +27,7 @@ func TestSyncer(t *testing.T) {
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, interval)
syncer := newSyncer(testLogger, syncRequestCh, interval)
var wg sync.WaitGroup
syncer.start(&wg)
defer syncer.terminate()
@@ -52,7 +52,7 @@ func TestSyncer(t *testing.T) {
func TestSyncerRetry(t *testing.T) {
const interval = time.Second
syncRequestCh := make(chan *syncRequest)
syncer := newSyncer(syncRequestCh, interval)
syncer := newSyncer(testLogger, syncRequestCh, interval)
var wg sync.WaitGroup
syncer.start(&wg)

View File

@@ -22,7 +22,7 @@ var workersCmd = &cobra.Command{
Short: "Shows all running workers information",
Long: `Workers (asynqmon workers) will show all running workers information.
The command shows the follwoing for each worker:
The command shows the following for each worker:
* Process in which the worker is running
* ID of the task worker is processing
* Type of the task worker is processing