mirror of https://github.com/hibiken/asynq.git synced 2025-10-20 21:26:14 +08:00

Compare commits


24 Commits

Author SHA1 Message Date
Ken Hibino
04702ddfd2 Change ErrorHandler function signature 2020-07-04 05:53:50 -07:00
Ken Hibino
6705f7c27a Return Result struct to caller of Enqueue 2020-07-03 21:49:53 -07:00
Ken Hibino
e27ae0d33a Replace github.com/rs/xid with github.com/google/uuid 2020-07-02 06:38:13 -07:00
Ken Hibino
6cd0ab65a3 Add version command to CLI 2020-06-29 20:59:15 -07:00
Ken Hibino
83c9d5ae94 Add migrate command to CLI
The command converts all messages in redis to be compatible with asynq
v0.10.0
2020-06-29 06:11:47 -07:00
Ken Hibino
7eebbf181e Update docs 2020-06-29 06:11:47 -07:00
Ken Hibino
7b1770da96 Minor code cleanup 2020-06-29 06:11:47 -07:00
Ken Hibino
e2c5882368 Use int64 type for Timeout and Deadline in TaskMessage 2020-06-29 06:11:47 -07:00
Ken Hibino
50df107ace Clean up processor test 2020-06-29 06:11:47 -07:00
Ken Hibino
9699d196e5 Add recoverer 2020-06-29 06:11:47 -07:00
Ken Hibino
1c5f7a791b Add RDB.ListDeadlineExceeded 2020-06-29 06:11:47 -07:00
Ken Hibino
232efe8279 Fix processor 2020-06-29 06:11:47 -07:00
Ken Hibino
ef4a4a8334 Add deadline to syncRequest
- syncer will drop a request if its deadline has been exceeded
2020-06-29 06:11:47 -07:00
Ken Hibino
65e17a3469 Update processor to adapt for deadlines set change
- Processor dequeues tasks only when it's available to process
- Processor retries a task when its context's Done channel is closed
2020-06-29 06:11:47 -07:00
Ken Hibino
88d94a2a9d Update RDB.Requeue to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
7433b94aac Update RDB.Dequeue to return deadline as time.Time 2020-06-29 06:11:47 -07:00
Ken Hibino
08ac7793ab Update RDB.Kill to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
02b653df72 Update RDB.Retry to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
bee784c052 Update RDB.Done to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
4ea58052f8 Update RDB.Dequeue to return message and deadline 2020-06-29 06:11:47 -07:00
Ken Hibino
5afb4861a5 Add task message to deadlines set on dequeue
Updated dequeueCmd to decode the message and compute its deadline and add
the message to the Deadline set.
2020-06-29 06:11:47 -07:00
Ken Hibino
68e6b379fc Use default timeout of 30mins if both timeout and deadline are not
provided
2020-06-29 06:11:47 -07:00
Ken Hibino
0e70a14899 Change TaskMessage Timeout and Deadline to int
* This change breaks existing tasks in Redis
2020-06-29 06:11:47 -07:00
Ken Hibino
f01c7b8e66 Add redis key for deadlines in base package 2020-06-29 06:11:47 -07:00
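The commit "Use default timeout of 30mins if both timeout and deadline are not provided" describes a simple fallback rule. A minimal sketch of that rule in Go; the constant and function names here are illustrative, not asynq's internal identifiers:

package main

import (
	"fmt"
	"time"
)

// defaultTimeout mirrors the 30-minute fallback named in the commit message (assumed value).
const defaultTimeout = 30 * time.Minute

// effectiveTimeout picks the timeout to enforce for a task: if the caller set
// neither a timeout nor a deadline, fall back to the default.
func effectiveTimeout(timeout time.Duration, deadline time.Time) time.Duration {
	if timeout == 0 && deadline.IsZero() {
		return defaultTimeout
	}
	return timeout
}

func main() {
	fmt.Println(effectiveTimeout(0, time.Time{}))               // 30m0s
	fmt.Println(effectiveTimeout(5*time.Minute, time.Time{}))   // 5m0s
	fmt.Println(effectiveTimeout(0, time.Now().Add(time.Hour))) // 0s (the deadline governs instead)
}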
28 changed files with 966 additions and 3125 deletions

View File

@@ -7,16 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
-## [0.11.0] - 2020-07-28
-### Added
-- `Inspector` type was added to monitor and mutate state of queues and tasks.
-- `HealthCheckFunc` and `HealthCheckInterval` fields were added to `Config` to allow user to specify a callback
-  function to check for broker connection.
-## [0.10.0] - 2020-07-06
 ### Changed
 - All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.

View File

@@ -34,7 +34,6 @@ A system can consist of multiple worker servers and brokers, giving way to high
 - Scheduling of tasks
 - Durability since tasks are written to Redis
 - [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
-- Automatic recovery of tasks in the event of a worker crash
 - [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
 - [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
 - Low latency to add a task since writes are fast in Redis

View File

@@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) {
 		opts          []Option
 		wantRes       *Result
 		wantEnqueued  map[string][]*base.TaskMessage
-		wantScheduled []base.Z
+		wantScheduled []h.ZSetEntry
 	}{
 		{
 			desc: "Process task immediately",
@@ -75,9 +75,9 @@ func TestClientEnqueueAt(t *testing.T) {
 				Deadline: noDeadline,
 			},
 			wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
-			wantScheduled: []base.Z{
+			wantScheduled: []h.ZSetEntry{
 				{
-					Message: &base.TaskMessage{
+					Msg: &base.TaskMessage{
 						Type:     task.Type,
 						Payload:  task.Payload.data,
 						Retry:    defaultMaxRetry,
@@ -85,7 +85,7 @@ func TestClientEnqueueAt(t *testing.T) {
 						Timeout:  int64(defaultTimeout.Seconds()),
 						Deadline: noDeadline.Unix(),
 					},
-					Score: oneHourLater.Unix(),
+					Score: float64(oneHourLater.Unix()),
 				},
 			},
 		},
@@ -376,7 +376,7 @@ func TestClientEnqueueIn(t *testing.T) {
 		opts          []Option
 		wantRes       *Result
 		wantEnqueued  map[string][]*base.TaskMessage
-		wantScheduled []base.Z
+		wantScheduled []h.ZSetEntry
 	}{
 		{
 			desc: "schedule a task to be enqueued in one hour",
@@ -390,9 +390,9 @@ func TestClientEnqueueIn(t *testing.T) {
 				Deadline: noDeadline,
 			},
 			wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
-			wantScheduled: []base.Z{
+			wantScheduled: []h.ZSetEntry{
 				{
-					Message: &base.TaskMessage{
+					Msg: &base.TaskMessage{
 						Type:     task.Type,
 						Payload:  task.Payload.data,
 						Retry:    defaultMaxRetry,
@@ -400,7 +400,7 @@ func TestClientEnqueueIn(t *testing.T) {
 						Timeout:  int64(defaultTimeout.Seconds()),
 						Deadline: noDeadline.Unix(),
 					},
-					Score: time.Now().Add(time.Hour).Unix(),
+					Score: float64(time.Now().Add(time.Hour).Unix()),
 				},
 			},
 		},
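The wantRes *Result fields above come from the "Return Result struct to caller of Enqueue" commit. A hedged usage sketch of the resulting client API, assuming a local Redis instance and the v0.10.0-era Client shown in these tests:

package main

import (
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})

	task := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42})

	// Enqueue now returns a *Result describing the enqueued task in addition to an error.
	res, err := client.Enqueue(task)
	if err != nil {
		log.Fatalf("could not enqueue task: %v", err)
	}
	log.Printf("enqueued: %+v", res)
}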

View File

@@ -1,80 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
// healthchecker is responsible for pinging the broker periodically
// and calling the user-provided HealthCheckFunc with the ping result.
type healthchecker struct {
logger *log.Logger
broker base.Broker
// channel to communicate back to the long running "healthchecker" goroutine.
done chan struct{}
// interval between healthchecks.
interval time.Duration
// function to call periodically.
healthcheckFunc func(error)
}
type healthcheckerParams struct {
logger *log.Logger
broker base.Broker
interval time.Duration
healthcheckFunc func(error)
}
func newHealthChecker(params healthcheckerParams) *healthchecker {
return &healthchecker{
logger: params.logger,
broker: params.broker,
done: make(chan struct{}),
interval: params.interval,
healthcheckFunc: params.healthcheckFunc,
}
}
func (hc *healthchecker) terminate() {
if hc.healthcheckFunc == nil {
return
}
hc.logger.Debug("Healthchecker shutting down...")
// Signal the healthchecker goroutine to stop.
hc.done <- struct{}{}
}
func (hc *healthchecker) start(wg *sync.WaitGroup) {
if hc.healthcheckFunc == nil {
return
}
wg.Add(1)
go func() {
defer wg.Done()
timer := time.NewTimer(hc.interval)
for {
select {
case <-hc.done:
hc.logger.Debug("Healthchecker done")
timer.Stop()
return
case <-timer.C:
err := hc.broker.Ping()
hc.healthcheckFunc(err)
timer.Reset(hc.interval)
}
}
}()
}
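The healthchecker removed above is driven by the HealthCheckFunc and HealthCheckInterval Config fields mentioned in the CHANGELOG hunk earlier. A minimal sketch of wiring them up on the server side, assuming those Config fields exist as the CHANGELOG describes:

package main

import (
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"}, asynq.Config{
		Concurrency: 10,
		// Invoked periodically with the result of pinging the broker.
		HealthCheckFunc: func(err error) {
			if err != nil {
				log.Printf("broker health check failed: %v", err)
			}
		},
		// Interval between pings (assumed field; a default applies when omitted).
		HealthCheckInterval: 15 * time.Second,
	})
	_ = srv // in real code, srv.Run(mux) would start processing with a handler mux
}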

View File

@@ -1,101 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"testing"
"time"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
)
func TestHealthChecker(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
var (
// mu guards called and e variables.
mu sync.Mutex
called int
e error
)
checkFn := func(err error) {
mu.Lock()
defer mu.Unlock()
called++
e = err
}
hc := newHealthChecker(healthcheckerParams{
logger: testLogger,
broker: rdbClient,
interval: 1 * time.Second,
healthcheckFunc: checkFn,
})
hc.start(&sync.WaitGroup{})
time.Sleep(2 * time.Second)
mu.Lock()
if called == 0 {
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
}
if e != nil {
t.Errorf("HealthCheckFunc was called with non-nil error: %v", e)
}
mu.Unlock()
hc.terminate()
}
func TestHealthCheckerWhenRedisDown(t *testing.T) {
// Make sure that healthchecker goroutine doesn't panic
// if it cannot connect to redis.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
var (
// mu guards called and e variables.
mu sync.Mutex
called int
e error
)
checkFn := func(err error) {
mu.Lock()
defer mu.Unlock()
called++
e = err
}
hc := newHealthChecker(healthcheckerParams{
logger: testLogger,
broker: testBroker,
interval: 1 * time.Second,
healthcheckFunc: checkFn,
})
testBroker.Sleep()
hc.start(&sync.WaitGroup{})
time.Sleep(2 * time.Second)
mu.Lock()
if called == 0 {
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
}
if e == nil {
t.Errorf("HealthCheckFunc was called with nil; want non-nil error")
}
mu.Unlock()
hc.terminate()
}

View File

@@ -1,500 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
)
// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
}
// New returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector {
return &Inspector{
rdb: rdb.NewRDB(createRedisClient(r)),
}
}
// Stats represents a state of queues at a certain time.
type Stats struct {
Enqueued int
InProgress int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Queues []*QueueInfo
Timestamp time.Time
}
// QueueInfo holds information about a queue.
type QueueInfo struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
}
// CurrentStats returns current stats of the queues.
func (i *Inspector) CurrentStats() (*Stats, error) {
stats, err := i.rdb.CurrentStats()
if err != nil {
return nil, err
}
var qs []*QueueInfo
for _, q := range stats.Queues {
qs = append(qs, (*QueueInfo)(q))
}
return &Stats{
Enqueued: stats.Enqueued,
InProgress: stats.InProgress,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Queues: qs,
Timestamp: stats.Timestamp,
}, nil
}
// DailyStats holds aggregate data for a given day.
type DailyStats struct {
Processed int
Failed int
Date time.Time
}
// History returns a list of stats from the last n days.
func (i *Inspector) History(n int) ([]*DailyStats, error) {
stats, err := i.rdb.HistoricalStats(n)
if err != nil {
return nil, err
}
var res []*DailyStats
for _, s := range stats {
res = append(res, &DailyStats{
Processed: s.Processed,
Failed: s.Failed,
Date: s.Time,
})
}
return res, nil
}
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
*Task
ID string
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
*Task
ID string
}
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
score int64
}
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
MaxRetry int
Retried int
ErrorMsg string
// TODO: LastFailedAt time.Time
score int64
}
// DeadTask is a task that has exhausted its retries.
// DeadTask won't be retried automatically.
type DeadTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
ErrorMsg string
score int64
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *DeadTask) Key() string {
return fmt.Sprintf("d:%v:%v", t.ID, t.score)
}
// parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(key, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[1])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
// ListOption specifies behavior of list operation.
type ListOption interface{}
// Internal list option representations.
type (
pageSizeOpt int
pageNumOpt int
)
type listOption struct {
pageSize int
pageNum int
}
const (
// Page size used by default in list operation.
defaultPageSize = 30
// Page number used by default in list operation.
defaultPageNum = 1
)
func composeListOptions(opts ...ListOption) listOption {
res := listOption{
pageSize: defaultPageSize,
pageNum: defaultPageNum,
}
for _, opt := range opts {
switch opt := opt.(type) {
case pageSizeOpt:
res.pageSize = int(opt)
case pageNumOpt:
res.pageNum = int(opt)
default:
// ignore unexpected option
}
}
return res
}
// PageSize returns an option to specify the page size for list operation.
//
// Negative page size is treated as zero.
func PageSize(n int) ListOption {
if n < 0 {
n = 0
}
return pageSizeOpt(n)
}
// Page returns an option to specify the page number for list operation.
// The value 1 fetches the first page.
//
// Negative page number is treated as one.
func Page(n int) ListOption {
if n < 0 {
n = 1
}
return pageNumOpt(n)
}
// ListEnqueuedTasks retrieves enqueued tasks in the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListEnqueued(qname, pgn)
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, m := range msgs {
tasks = append(tasks, &EnqueuedTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
})
}
return tasks, err
}
// ListInProgressTasks retrieves tasks currently being processed.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(pgn)
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, m := range msgs {
tasks = append(tasks, &InProgressTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
})
}
return tasks, err
}
// ListScheduledTasks retrieves tasks in scheduled state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(pgn)
if err != nil {
return nil, err
}
var tasks []*ScheduledTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
score: z.Score,
})
}
return tasks, nil
}
// ListRetryTasks retrieves tasks in retry state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(pgn)
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
}
// ListDeadTasks retrieves tasks in dead state.
// Tasks are sorted by LastFailedAt field in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListDead(pgn)
if err != nil {
return nil, err
}
var tasks []*DeadTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &DeadTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
}
// DeleteAllScheduledTasks deletes all tasks in scheduled state,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks() (int, error) {
n, err := i.rdb.DeleteAllScheduledTasks()
return int(n), err
}
// DeleteAllRetryTasks deletes all tasks in retry state,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllRetryTasks() (int, error) {
n, err := i.rdb.DeleteAllRetryTasks()
return int(n), err
}
// DeleteAllDeadTasks deletes all tasks in dead state,
// and reports the number of tasks deleted.
func (i *Inspector) DeleteAllDeadTasks() (int, error) {
n, err := i.rdb.DeleteAllDeadTasks()
return int(n), err
}
// DeleteTaskByKey deletes a task with the given key.
func (i *Inspector) DeleteTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.DeleteScheduledTask(id, score)
case "r":
return i.rdb.DeleteRetryTask(id, score)
case "d":
return i.rdb.DeleteDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllScheduledTasks() (int, error) {
n, err := i.rdb.EnqueueAllScheduledTasks()
return int(n), err
}
// EnqueueAllRetryTasks enqueues all tasks in the retry state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllRetryTasks() (int, error) {
n, err := i.rdb.EnqueueAllRetryTasks()
return int(n), err
}
// EnqueueAllDeadTasks enqueues all tasks in the dead state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllDeadTasks() (int, error) {
n, err := i.rdb.EnqueueAllDeadTasks()
return int(n), err
}
// EnqueueTaskByKey enqueues a task with the given key.
func (i *Inspector) EnqueueTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.EnqueueScheduledTask(id, score)
case "r":
return i.rdb.EnqueueRetryTask(id, score)
case "d":
return i.rdb.EnqueueDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// KillAllScheduledTasks kills all tasks in scheduled state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllScheduledTasks() (int, error) {
n, err := i.rdb.KillAllScheduledTasks()
return int(n), err
}
// KillAllRetryTasks kills all tasks in retry state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllRetryTasks() (int, error) {
n, err := i.rdb.KillAllRetryTasks()
return int(n), err
}
// KillTaskByKey kills a task with the given key.
func (i *Inspector) KillTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.KillScheduledTask(id, score)
case "r":
return i.rdb.KillRetryTask(id, score)
case "d":
return fmt.Errorf("task already dead")
default:
return fmt.Errorf("invalid key")
}
}
// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error {
return i.rdb.Pause(qname)
}
// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error {
return i.rdb.Unpause(qname)
}
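For orientation, a short usage sketch of the Inspector API removed above, built only from the functions shown in this file (NewInspector, ListScheduledTasks with PageSize/Page, Key, DeleteTaskByKey); the Redis address is assumed:

package main

import (
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	inspector := asynq.NewInspector(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})

	// Fetch the first page of scheduled tasks, 10 per page.
	tasks, err := inspector.ListScheduledTasks(asynq.PageSize(10), asynq.Page(1))
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range tasks {
		fmt.Printf("%s scheduled for %v (key=%s)\n", t.Type, t.NextEnqueueAt, t.Key())
		// The key round-trips through parseTaskKey and can be passed to
		// DeleteTaskByKey, EnqueueTaskByKey, or KillTaskByKey.
	}
}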

File diff suppressed because it is too large

View File

@@ -17,6 +17,12 @@ import (
 	"github.com/hibiken/asynq/internal/base"
 )
+// ZSetEntry is an entry in redis sorted set.
+type ZSetEntry struct {
+	Msg   *base.TaskMessage
+	Score float64
+}
 // SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages.
 var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage {
 	out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it
@@ -27,10 +33,10 @@ var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage
 })
 // SortZSetEntryOpt is an cmp.Option to sort ZSetEntry for comparing slice of zset entries.
-var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []base.Z) []base.Z {
-	out := append([]base.Z(nil), in...) // Copy input to avoid mutating it
+var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) []ZSetEntry {
+	out := append([]ZSetEntry(nil), in...) // Copy input to avoid mutating it
 	sort.Slice(out, func(i, j int) bool {
-		return out[i].Message.ID.String() < out[j].Message.ID.String()
+		return out[i].Msg.ID.String() < out[j].Msg.ID.String()
 	})
 	return out
 })
@@ -171,15 +177,6 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage,
 	seedRedisList(tb, r, queue, msgs)
 }
-// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
-//
-// enqueued maps a queue name to a list of messages.
-func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) {
-	for q, msgs := range enqueued {
-		SeedEnqueuedQueue(tb, r, msgs, q)
-	}
-}
 // SeedInProgressQueue initializes the in-progress queue with the given messages.
 func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
 	tb.Helper()
@@ -187,25 +184,25 @@ func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessag
 }
 // SeedScheduledQueue initializes the scheduled queue with the given messages.
-func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
+func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
 	tb.Helper()
 	seedRedisZSet(tb, r, base.ScheduledQueue, entries)
 }
 // SeedRetryQueue initializes the retry queue with the given messages.
-func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
+func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
 	tb.Helper()
 	seedRedisZSet(tb, r, base.RetryQueue, entries)
 }
 // SeedDeadQueue initializes the dead queue with the given messages.
-func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
+func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
 	tb.Helper()
 	seedRedisZSet(tb, r, base.DeadQueue, entries)
 }
 // SeedDeadlines initializes the deadlines set with the given entries.
-func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z) {
+func SeedDeadlines(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
 	tb.Helper()
 	seedRedisZSet(tb, r, base.KeyDeadlines, entries)
 }
@@ -219,9 +216,9 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
 	}
 }
-func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
+func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry) {
 	for _, item := range items {
-		z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
+		z := &redis.Z{Member: MustMarshal(tb, item.Msg), Score: float64(item.Score)}
 		if err := c.ZAdd(key, z).Err(); err != nil {
 			tb.Fatal(err)
 		}
@@ -265,25 +262,25 @@ func GetDeadMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
 }
 // GetScheduledEntries returns all task messages and its score in the scheduled queue.
-func GetScheduledEntries(tb testing.TB, r *redis.Client) []base.Z {
+func GetScheduledEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
 	tb.Helper()
 	return getZSetEntries(tb, r, base.ScheduledQueue)
 }
 // GetRetryEntries returns all task messages and its score in the retry queue.
-func GetRetryEntries(tb testing.TB, r *redis.Client) []base.Z {
+func GetRetryEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
 	tb.Helper()
 	return getZSetEntries(tb, r, base.RetryQueue)
 }
 // GetDeadEntries returns all task messages and its score in the dead queue.
-func GetDeadEntries(tb testing.TB, r *redis.Client) []base.Z {
+func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
 	tb.Helper()
 	return getZSetEntries(tb, r, base.DeadQueue)
 }
 // GetDeadlinesEntries returns all task messages and its score in the deadlines set.
-func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []base.Z {
+func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
 	tb.Helper()
 	return getZSetEntries(tb, r, base.KeyDeadlines)
 }
@@ -298,13 +295,13 @@ func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMe
 	return MustUnmarshalSlice(tb, data)
 }
-func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z {
+func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry {
 	data := r.ZRangeWithScores(zset, 0, -1).Val()
-	var entries []base.Z
+	var entries []ZSetEntry
 	for _, z := range data {
-		entries = append(entries, base.Z{
-			Message: MustUnmarshal(tb, z.Member.(string)),
-			Score:   int64(z.Score),
+		entries = append(entries, ZSetEntry{
+			Msg:   MustUnmarshal(tb, z.Member.(string)),
+			Score: z.Score,
 		})
 	}
 	return entries
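The ZSetEntry helpers above are used by the package's tests (imported as h in the hunks below). A sketch of seeding and reading back a scheduled entry; the Redis address, DB number, test package placement, and the asynqtest FlushDB helper are assumptions:

package example_test

import (
	"testing"
	"time"

	"github.com/go-redis/redis/v7"
	"github.com/google/uuid"
	h "github.com/hibiken/asynq/internal/asynqtest"
	"github.com/hibiken/asynq/internal/base"
)

func TestSeedScheduledSketch(t *testing.T) {
	// Test database on a local Redis instance (assumed address and DB number).
	r := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379", DB: 14})
	h.FlushDB(t, r) // assumed asynqtest helper that clears the test DB

	msg := &base.TaskMessage{
		ID:      uuid.New(),
		Type:    "send_email",
		Payload: map[string]interface{}{"to": "user@example.com"},
		Queue:   "default",
	}
	h.SeedScheduledQueue(t, r, []h.ZSetEntry{
		{Msg: msg, Score: float64(time.Now().Add(time.Hour).Unix())},
	})

	got := h.GetScheduledEntries(t, r)
	if len(got) != 1 || got[0].Msg.ID != msg.ID {
		t.Errorf("unexpected scheduled entries: %+v", got)
	}
}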

View File

@@ -133,12 +133,6 @@ func DecodeMessage(s string) (*TaskMessage, error) {
 	return &msg, nil
 }
-// Z represents sorted set member.
-type Z struct {
-	Message *TaskMessage
-	Score   int64
-}
 // ServerStatus represents status of a server.
 // ServerStatus methods are concurrency safe.
 type ServerStatus struct {
@@ -263,7 +257,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
 //
 // See rdb.RDB as a reference implementation.
 type Broker interface {
-	Ping() error
 	Enqueue(msg *TaskMessage) error
 	EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
 	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)

View File

@@ -51,6 +51,56 @@ type DailyStats struct {
Time time.Time Time time.Time
} }
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// ScheduledTask is a task that's scheduled to be processed in the future.
type ScheduledTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
ProcessAt time.Time
Score int64
Queue string
}
// RetryTask is a task in the retry queue because a worker failed to process it.
type RetryTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
// TODO(hibiken): add LastFailedAt time.Time
ProcessAt time.Time
ErrorMsg string
Retried int
Retry int
Score int64
Queue string
}
// DeadTask is a task that has exhausted all retries.
type DeadTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
Score int64
Queue string
}
// KEYS[1] -> asynq:queues // KEYS[1] -> asynq:queues
// KEYS[2] -> asynq:in_progress // KEYS[2] -> asynq:in_progress
// KEYS[3] -> asynq:scheduled // KEYS[3] -> asynq:scheduled
@@ -239,79 +289,158 @@ func (p Pagination) stop() int64 {
} }
// ListEnqueued returns enqueued tasks that are ready to be processed. // ListEnqueued returns enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) { func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
qkey := base.QueueKey(qname) qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() { if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, fmt.Errorf("queue %q does not exist", qname)
} }
return r.listMessages(qkey, pgn)
}
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) {
return r.listMessages(base.InProgressQueue, pgn)
}
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the // Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination. // correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1 stop := -pgn.start() - 1
start := -pgn.stop() - 1 start := -pgn.stop() - 1
data, err := r.client.LRange(key, start, stop).Result() data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
reverse(data) reverse(data)
var msgs []*base.TaskMessage var tasks []*EnqueuedTask
for _, s := range data { for _, s := range data {
m, err := base.DecodeMessage(s) var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
msgs = append(msgs, m) tasks = append(tasks, &EnqueuedTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
})
}
return tasks, nil
} }
return msgs, nil
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*InProgressTask
for _, s := range data {
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
tasks = append(tasks, &InProgressTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
})
}
return tasks, nil
} }
// ListScheduled returns all tasks that are scheduled to be processed // ListScheduled returns all tasks that are scheduled to be processed
// in the future. // in the future.
func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) { func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
return r.listZSetEntries(base.ScheduledQueue, pgn) data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
}
// ListRetry returns all tasks that have failed before and will be retried
// in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.RetryQueue, pgn)
}
// ListDead returns all tasks that have exhausted their retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.DeadQueue, pgn)
}
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var res []base.Z var tasks []*ScheduledTask
for _, z := range data { for _, z := range data {
s, ok := z.Member.(string) s, ok := z.Member.(string)
if !ok { if !ok {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
msg, err := base.DecodeMessage(s) var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
res = append(res, base.Z{msg, int64(z.Score)}) processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &ScheduledTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
} }
return res, nil return tasks, nil
}
// ListRetry returns all tasks that have failed before and will be retried
// in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &RetryTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Retry: msg.Retry,
Retried: msg.Retried,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
}
// ListDead returns all tasks that have exhausted their retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
var tasks []*DeadTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
lastFailedAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &DeadTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Queue: msg.Queue,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
}
return tasks, nil
} }
// EnqueueDeadTask finds a task that matches the given id and score from dead queue // EnqueueDeadTask finds a task that matches the given id and score from dead queue
@@ -575,40 +704,19 @@ func (r *RDB) deleteTask(zset, id string, score float64) error {
return nil return nil
} }
// KEYS[1] -> queue to delete // DeleteAllDeadTasks deletes all tasks from the dead queue.
var deleteAllCmd = redis.NewScript(` func (r *RDB) DeleteAllDeadTasks() error {
local n = redis.call("ZCARD", KEYS[1]) return r.client.Del(base.DeadQueue).Err()
redis.call("DEL", KEYS[1])
return n`)
// DeleteAllDeadTasks deletes all tasks from the dead queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllDeadTasks() (int64, error) {
return r.deleteAll(base.DeadQueue)
} }
// DeleteAllRetryTasks deletes all tasks from the dead queue // DeleteAllRetryTasks deletes all tasks from the dead queue.
// and returns the number of tasks deleted. func (r *RDB) DeleteAllRetryTasks() error {
func (r *RDB) DeleteAllRetryTasks() (int64, error) { return r.client.Del(base.RetryQueue).Err()
return r.deleteAll(base.RetryQueue)
} }
// DeleteAllScheduledTasks deletes all tasks from the dead queue // DeleteAllScheduledTasks deletes all tasks from the dead queue.
// and returns the number of tasks deleted. func (r *RDB) DeleteAllScheduledTasks() error {
func (r *RDB) DeleteAllScheduledTasks() (int64, error) { return r.client.Del(base.ScheduledQueue).Err()
return r.deleteAll(base.ScheduledQueue)
}
func (r *RDB) deleteAll(key string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
} }
// ErrQueueNotFound indicates specified queue does not exist. // ErrQueueNotFound indicates specified queue does not exist.
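One detail worth calling out in the listing code above: the sorted-set score is simply a Unix timestamp (the process-at time for scheduled and retry tasks, the last-failure time for dead tasks). A tiny sketch of that convention:

package main

import (
	"fmt"
	"time"
)

func main() {
	processAt := time.Now().Add(15 * time.Minute)

	// What gets stored as the zset member's score.
	score := float64(processAt.Unix())

	// What ListScheduled/ListRetry/ListDead read back via time.Unix.
	restored := time.Unix(int64(score), 0)

	// Identical up to second precision; sub-second precision is dropped.
	fmt.Println(restored.Equal(processAt.Truncate(time.Second)))
}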

File diff suppressed because it is too large

View File

@@ -45,11 +45,6 @@ func (r *RDB) Close() error {
 	return r.client.Close()
 }
-// Ping checks the connection with redis server.
-func (r *RDB) Ping() error {
-	return r.client.Ping().Err()
-}
 // KEYS[1] -> asynq:queues:<qname>
 // KEYS[2] -> asynq:queues
 // ARGV[1] -> task message data

View File

@@ -148,7 +148,7 @@ func TestDequeue(t *testing.T) {
err error err error
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []base.Z wantDeadlines []h.ZSetEntry
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@@ -162,10 +162,10 @@ func TestDequeue(t *testing.T) {
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{t1}, wantInProgress: []*base.TaskMessage{t1},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{ {
Message: t1, Msg: t1,
Score: t1Deadline, Score: float64(t1Deadline),
}, },
}, },
}, },
@@ -181,7 +181,7 @@ func TestDequeue(t *testing.T) {
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{}, wantDeadlines: []h.ZSetEntry{},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@@ -199,10 +199,10 @@ func TestDequeue(t *testing.T) {
"low": {t3}, "low": {t3},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{ {
Message: t2, Msg: t2,
Score: t2Deadline, Score: float64(t2Deadline),
}, },
}, },
}, },
@@ -222,10 +222,10 @@ func TestDequeue(t *testing.T) {
"low": {t2, t1}, "low": {t2, t1},
}, },
wantInProgress: []*base.TaskMessage{t3}, wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{ {
Message: t3, Msg: t3,
Score: t3Deadline, Score: float64(t3Deadline),
}, },
}, },
}, },
@@ -245,7 +245,7 @@ func TestDequeue(t *testing.T) {
"low": {}, "low": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{}, wantDeadlines: []h.ZSetEntry{},
}, },
} }
@@ -412,70 +412,70 @@ func TestDone(t *testing.T) {
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []base.Z // initial state of deadlines set deadlines []h.ZSetEntry // initial state of deadlines set
target *base.TaskMessage // task to remove target *base.TaskMessage // task to remove
wantInProgress []*base.TaskMessage // final state of the in-progress list wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []base.Z // final state of the deadline set wantDeadlines []h.ZSetEntry // final state of the deadline set
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{ {
Message: t1, Msg: t1,
Score: t1Deadline, Score: float64(t1Deadline),
}, },
{ {
Message: t2, Msg: t2,
Score: t2Deadline, Score: float64(t2Deadline),
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{ {
Message: t2, Msg: t2,
Score: t2Deadline, Score: float64(t2Deadline),
}, },
}, },
}, },
{ {
inProgress: []*base.TaskMessage{t1}, inProgress: []*base.TaskMessage{t1},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{ {
Message: t1, Msg: t1,
Score: t1Deadline, Score: float64(t1Deadline),
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{}, wantDeadlines: []h.ZSetEntry{},
}, },
{ {
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{ {
Message: t1, Msg: t1,
Score: t1Deadline, Score: float64(t1Deadline),
}, },
{ {
Message: t2, Msg: t2,
Score: t2Deadline, Score: float64(t2Deadline),
}, },
{ {
Message: t3, Msg: t3,
Score: t3Deadline, Score: float64(t3Deadline),
}, },
}, },
target: t3, target: t3,
wantInProgress: []*base.TaskMessage{t1, t2}, wantInProgress: []*base.TaskMessage{t1, t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{ {
Message: t1, Msg: t1,
Score: t1Deadline, Score: float64(t1Deadline),
}, },
{ {
Message: t2, Msg: t2,
Score: t2Deadline, Score: float64(t2Deadline),
}, },
}, },
}, },
@@ -560,28 +560,28 @@ func TestRequeue(t *testing.T) {
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage // initial state of queues enqueued map[string][]*base.TaskMessage // initial state of queues
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []base.Z // initial state of the deadlines set deadlines []h.ZSetEntry // initial state of the deadlines set
target *base.TaskMessage // task to requeue target *base.TaskMessage // task to requeue
wantEnqueued map[string][]*base.TaskMessage // final state of queues wantEnqueued map[string][]*base.TaskMessage // final state of queues
wantInProgress []*base.TaskMessage // final state of the in-progress list wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []base.Z // final state of the deadlines set wantDeadlines []h.ZSetEntry // final state of the deadlines set
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {}, base.DefaultQueueName: {},
}, },
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t1, Score: t1Deadline}, {Msg: t1, Score: float64(t1Deadline)},
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
target: t1, target: t1,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1}, base.DefaultQueueName: {t1},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
}, },
{ {
@@ -589,15 +589,15 @@ func TestRequeue(t *testing.T) {
base.DefaultQueueName: {t1}, base.DefaultQueueName: {t1},
}, },
inProgress: []*base.TaskMessage{t2}, inProgress: []*base.TaskMessage{t2},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
target: t2, target: t2,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2}, base.DefaultQueueName: {t1, t2},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{}, wantDeadlines: []h.ZSetEntry{},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@@ -605,9 +605,9 @@ func TestRequeue(t *testing.T) {
"critical": {}, "critical": {},
}, },
inProgress: []*base.TaskMessage{t2, t3}, inProgress: []*base.TaskMessage{t2, t3},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
{Message: t3, Score: t3Deadline}, {Msg: t3, Score: float64(t3Deadline)},
}, },
target: t3, target: t3,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
@@ -615,8 +615,8 @@ func TestRequeue(t *testing.T) {
"critical": {t3}, "critical": {t3},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
}, },
} }
@@ -765,42 +765,42 @@ func TestRetry(t *testing.T) {
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
deadlines []base.Z deadlines []h.ZSetEntry
retry []base.Z retry []h.ZSetEntry
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
errMsg string errMsg string
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []base.Z wantDeadlines []h.ZSetEntry
wantRetry []base.Z wantRetry []h.ZSetEntry
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t1, Score: t1Deadline}, {Msg: t1, Score: float64(t1Deadline)},
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
retry: []base.Z{ retry: []h.ZSetEntry{
{ {
Message: t3, Msg: t3,
Score: now.Add(time.Minute).Unix(), Score: float64(now.Add(time.Minute).Unix()),
}, },
}, },
msg: t1, msg: t1,
processAt: now.Add(5 * time.Minute), processAt: now.Add(5 * time.Minute),
errMsg: errMsg, errMsg: errMsg,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
wantRetry: []base.Z{ wantRetry: []h.ZSetEntry{
{ {
Message: h.TaskMessageAfterRetry(*t1, errMsg), Msg: h.TaskMessageAfterRetry(*t1, errMsg),
Score: now.Add(5 * time.Minute).Unix(), Score: float64(now.Add(5 * time.Minute).Unix()),
}, },
{ {
Message: t3, Msg: t3,
Score: now.Add(time.Minute).Unix(), Score: float64(now.Add(time.Minute).Unix()),
}, },
}, },
}, },
@@ -891,59 +891,59 @@ func TestKill(t *testing.T) {
// TODO(hibiken): add test cases for trimming // TODO(hibiken): add test cases for trimming
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
deadlines []base.Z deadlines []h.ZSetEntry
dead []base.Z dead []h.ZSetEntry
target *base.TaskMessage // task to kill target *base.TaskMessage // task to kill
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []base.Z wantDeadlines []h.ZSetEntry
wantDead []base.Z wantDead []h.ZSetEntry
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t1, Score: t1Deadline}, {Msg: t1, Score: float64(t1Deadline)},
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
dead: []base.Z{ dead: []h.ZSetEntry{
{ {
Message: t3, Msg: t3,
Score: now.Add(-time.Hour).Unix(), Score: float64(now.Add(-time.Hour).Unix()),
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
}, },
wantDead: []base.Z{ wantDead: []h.ZSetEntry{
{ {
Message: h.TaskMessageWithError(*t1, errMsg), Msg: h.TaskMessageWithError(*t1, errMsg),
Score: now.Unix(), Score: float64(now.Unix()),
}, },
{ {
Message: t3, Msg: t3,
Score: now.Add(-time.Hour).Unix(), Score: float64(now.Add(-time.Hour).Unix()),
}, },
}, },
}, },
{ {
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{ deadlines: []h.ZSetEntry{
{Message: t1, Score: t1Deadline}, {Msg: t1, Score: float64(t1Deadline)},
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
{Message: t3, Score: t3Deadline}, {Msg: t3, Score: float64(t3Deadline)},
}, },
dead: []base.Z{}, dead: []h.ZSetEntry{},
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2, t3}, wantInProgress: []*base.TaskMessage{t2, t3},
wantDeadlines: []base.Z{ wantDeadlines: []h.ZSetEntry{
{Message: t2, Score: t2Deadline}, {Msg: t2, Score: float64(t2Deadline)},
{Message: t3, Score: t3Deadline}, {Msg: t3, Score: float64(t3Deadline)},
}, },
wantDead: []base.Z{ wantDead: []h.ZSetEntry{
{ {
Message: h.TaskMessageWithError(*t1, errMsg), Msg: h.TaskMessageWithError(*t1, errMsg),
Score: now.Unix(), Score: float64(now.Unix()),
}, },
}, },
}, },
@@ -1009,19 +1009,19 @@ func TestCheckAndEnqueue(t *testing.T) {
hourFromNow := time.Now().Add(time.Hour) hourFromNow := time.Now().Add(time.Hour)
tests := []struct { tests := []struct {
scheduled []base.Z scheduled []h.ZSetEntry
retry []base.Z retry []h.ZSetEntry
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage wantRetry []*base.TaskMessage
}{ }{
{ {
scheduled: []base.Z{ scheduled: []h.ZSetEntry{
{Message: t1, Score: secondAgo.Unix()}, {Msg: t1, Score: float64(secondAgo.Unix())},
{Message: t2, Score: secondAgo.Unix()}, {Msg: t2, Score: float64(secondAgo.Unix())},
}, },
retry: []base.Z{ retry: []h.ZSetEntry{
{Message: t3, Score: secondAgo.Unix()}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
@@ -1029,11 +1029,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
}, },
{ {
scheduled: []base.Z{ scheduled: []h.ZSetEntry{
{Message: t1, Score: hourFromNow.Unix()}, {Msg: t1, Score: float64(hourFromNow.Unix())},
{Message: t2, Score: secondAgo.Unix()}}, {Msg: t2, Score: float64(secondAgo.Unix())}},
retry: []base.Z{ retry: []h.ZSetEntry{
{Message: t3, Score: secondAgo.Unix()}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3}, "default": {t2, t3},
}, },
@@ -1041,11 +1041,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
}, },
{ {
scheduled: []base.Z{ scheduled: []h.ZSetEntry{
{Message: t1, Score: hourFromNow.Unix()}, {Msg: t1, Score: float64(hourFromNow.Unix())},
{Message: t2, Score: hourFromNow.Unix()}}, {Msg: t2, Score: float64(hourFromNow.Unix())}},
retry: []base.Z{ retry: []h.ZSetEntry{
{Message: t3, Score: hourFromNow.Unix()}}, {Msg: t3, Score: float64(hourFromNow.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
@@ -1053,12 +1053,12 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{t3}, wantRetry: []*base.TaskMessage{t3},
}, },
{ {
scheduled: []base.Z{ scheduled: []h.ZSetEntry{
{Message: t1, Score: secondAgo.Unix()}, {Msg: t1, Score: float64(secondAgo.Unix())},
{Message: t4, Score: secondAgo.Unix()}, {Msg: t4, Score: float64(secondAgo.Unix())},
}, },
retry: []base.Z{ retry: []h.ZSetEntry{
{Message: t5, Score: secondAgo.Unix()}}, {Msg: t5, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t4}, "critical": {t4},
@@ -1112,41 +1112,41 @@ func TestListDeadlineExceeded(t *testing.T) {
tests := []struct {
desc string
- deadlines []base.Z
+ deadlines []h.ZSetEntry
t time.Time
want []*base.TaskMessage
}{
{
desc: "with one task in-progress",
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: fiveMinutesAgo.Unix()},
+ {Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple tasks in-progress, and one expired",
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: oneHourAgo.Unix()},
+ {Msg: t1, Score: float64(oneHourAgo.Unix())},
- {Message: t2, Score: fiveMinutesFromNow.Unix()},
+ {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple expired tasks in-progress",
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: oneHourAgo.Unix()},
+ {Msg: t1, Score: float64(oneHourAgo.Unix())},
- {Message: t2, Score: fiveMinutesAgo.Unix()},
+ {Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1, t2},
},
{
desc: "with empty in-progress queue",
- deadlines: []base.Z{},
+ deadlines: []h.ZSetEntry{},
t: time.Now(),
want: []*base.TaskMessage{},
},
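The test table above pins down the semantics of ListDeadlineExceeded: a task counts as expired when its score in the deadlines sorted set is at or before the cutoff time. A rough standalone sketch of that query, assuming a sorted set scored by Unix deadline; the key name below is a placeholder rather than the library's constant, and the real code decodes the members into TaskMessage values:

```go
package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/go-redis/redis/v7"
)

func main() {
	client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	const deadlinesKey = "asynq:deadlines" // placeholder key name, not the library constant

	// Members whose score (deadline) is at or before "now" are the expired ones.
	now := strconv.FormatInt(time.Now().Unix(), 10)
	expired, err := client.ZRangeByScore(deadlinesKey, &redis.ZRangeBy{Min: "-inf", Max: now}).Result()
	if err != nil {
		panic(err)
	}
	fmt.Println("expired task messages:", expired)
}
```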


@@ -180,15 +180,6 @@ func (tb *TestBroker) PublishCancelation(id string) error {
return tb.real.PublishCancelation(id)
}
- func (tb *TestBroker) Ping() error {
- tb.mu.Lock()
- defer tb.mu.Unlock()
- if tb.sleeping {
- return errRedisDown
- }
- return tb.real.Ping()
- }
func (tb *TestBroker) Close() error {
tb.mu.Lock()
defer tb.mu.Unlock()
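The Ping method removed above follows the same pattern as the rest of TestBroker: a mutex-guarded sleeping flag that makes calls fail with errRedisDown while the fake broker is "down". A generic sketch of that test-double pattern, with illustrative names that are not asynq's own:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errDown = errors.New("redis is down")

// flakyPinger wraps a real call and can be toggled into a failing state.
type flakyPinger struct {
	mu       sync.Mutex
	sleeping bool
	real     func() error
}

func (f *flakyPinger) Ping() error {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.sleeping {
		return errDown
	}
	return f.real()
}

func main() {
	p := &flakyPinger{real: func() error { return nil }}
	fmt.Println(p.Ping()) // <nil>

	p.mu.Lock()
	p.sleeping = true
	p.mu.Unlock()
	fmt.Println(p.Ping()) // redis is down
}
```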


@@ -191,19 +191,9 @@ func (p *processor) exec() {
p.cancelations.Delete(msg.ID.String())
}()
- // check context before starting a worker goroutine.
- select {
- case <-ctx.Done():
- // already canceled (e.g. deadline exceeded).
- p.retryOrKill(ctx, msg, ctx.Err())
- return
- default:
- }
resCh := make(chan error, 1)
- go func() {
+ task := NewTask(msg.Type, msg.Payload)
- resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
+ go func() { resCh <- perform(ctx, task, p.handler) }()
- }()
select {
case <-p.abort:
@@ -212,6 +202,7 @@ func (p *processor) exec() {
p.requeue(msg)
return
case <-ctx.Done():
- p.logger.Debugf("Retrying task. task id=%s", msg.ID)
+ // TODO: Improve this log message and above
p.retryOrKill(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
@@ -220,6 +211,9 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
+ if p.errHandler != nil {
+ p.errHandler.HandleError(ctx, task, resErr)
+ }
p.retryOrKill(ctx, msg, resErr)
return
}
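Both sides of this hunk share the same dispatch shape: build the task, run the handler in a goroutine, and race its result channel against the shutdown signal and the task's context. A self-contained sketch of that pattern in isolation; handle stands in for perform, and none of the names here are asynq internals:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// handle simulates a task handler that honors context cancellation.
func handle(ctx context.Context) error {
	select {
	case <-time.After(2 * time.Second): // simulated work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	resCh := make(chan error, 1)
	go func() { resCh <- handle(ctx) }()

	select {
	case <-ctx.Done():
		fmt.Println("retry or kill:", ctx.Err()) // timeout/deadline path
	case err := <-resCh:
		if err != nil {
			fmt.Println("retry or kill:", err)
			return
		}
		fmt.Println("mark as done")
	}
}
```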
@@ -258,11 +252,7 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
}
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
- if p.errHandler != nil {
- p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
- }
if msg.Retried >= msg.Retry {
- p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.kill(ctx, msg, err)
} else {
p.retry(ctx, msg, err)
@@ -291,6 +281,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
}
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
+ p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)


@@ -223,7 +223,7 @@ func TestProcessorRetry(t *testing.T) {
delay time.Duration // retry delay duration
handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case
- wantRetry []base.Z // tasks in retry queue at the end
+ wantRetry []h.ZSetEntry // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end
wantErrCount int // number of times error handler should be called
}{
@@ -235,10 +235,10 @@ func TestProcessorRetry(t *testing.T) {
return fmt.Errorf(errMsg)
}),
wait: 2 * time.Second,
- wantRetry: []base.Z{
+ wantRetry: []h.ZSetEntry{
- {Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()},
+ {Msg: h.TaskMessageAfterRetry(*m2, errMsg), Score: float64(now.Add(time.Minute).Unix())},
- {Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
+ {Msg: h.TaskMessageAfterRetry(*m3, errMsg), Score: float64(now.Add(time.Minute).Unix())},
- {Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
+ {Msg: h.TaskMessageAfterRetry(*m4, errMsg), Score: float64(now.Add(time.Minute).Unix())},
},
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
wantErrCount: 4,


@@ -34,24 +34,24 @@ func TestRecoverer(t *testing.T) {
tests := []struct {
desc string
inProgress []*base.TaskMessage
- deadlines []base.Z
+ deadlines []h.ZSetEntry
- retry []base.Z
+ retry []h.ZSetEntry
- dead []base.Z
+ dead []h.ZSetEntry
wantInProgress []*base.TaskMessage
- wantDeadlines []base.Z
+ wantDeadlines []h.ZSetEntry
wantRetry []*base.TaskMessage
wantDead []*base.TaskMessage
}{
{
desc: "with one task in-progress",
inProgress: []*base.TaskMessage{t1},
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: fiveMinutesAgo.Unix()},
+ {Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
},
- retry: []base.Z{},
+ retry: []h.ZSetEntry{},
- dead: []base.Z{},
+ dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
- wantDeadlines: []base.Z{},
+ wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
},
@@ -60,30 +60,30 @@ func TestRecoverer(t *testing.T) {
{
desc: "with a task with max-retry reached",
inProgress: []*base.TaskMessage{t4},
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t4, Score: fiveMinutesAgo.Unix()},
+ {Msg: t4, Score: float64(fiveMinutesAgo.Unix())},
},
- retry: []base.Z{},
+ retry: []h.ZSetEntry{},
- dead: []base.Z{},
+ dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
- wantDeadlines: []base.Z{},
+ wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")},
},
{
desc: "with multiple tasks in-progress, and one expired",
inProgress: []*base.TaskMessage{t1, t2, t3},
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: oneHourAgo.Unix()},
+ {Msg: t1, Score: float64(oneHourAgo.Unix())},
- {Message: t2, Score: fiveMinutesFromNow.Unix()},
+ {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
- retry: []base.Z{},
+ retry: []h.ZSetEntry{},
- dead: []base.Z{},
+ dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{t2, t3},
- wantDeadlines: []base.Z{
+ wantDeadlines: []h.ZSetEntry{
- {Message: t2, Score: fiveMinutesFromNow.Unix()},
+ {Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@@ -93,16 +93,16 @@ func TestRecoverer(t *testing.T) {
{
desc: "with multiple expired tasks in-progress",
inProgress: []*base.TaskMessage{t1, t2, t3},
- deadlines: []base.Z{
+ deadlines: []h.ZSetEntry{
- {Message: t1, Score: oneHourAgo.Unix()},
+ {Msg: t1, Score: float64(oneHourAgo.Unix())},
- {Message: t2, Score: fiveMinutesAgo.Unix()},
+ {Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
- retry: []base.Z{},
+ retry: []h.ZSetEntry{},
- dead: []base.Z{},
+ dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{t3},
- wantDeadlines: []base.Z{
+ wantDeadlines: []h.ZSetEntry{
- {Message: t3, Score: oneHourFromNow.Unix()},
+ {Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@@ -113,11 +113,11 @@ func TestRecoverer(t *testing.T) {
{
desc: "with empty in-progress queue",
inProgress: []*base.TaskMessage{},
- deadlines: []base.Z{},
+ deadlines: []h.ZSetEntry{},
- retry: []base.Z{},
+ retry: []h.ZSetEntry{},
- dead: []base.Z{},
+ dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
- wantDeadlines: []base.Z{},
+ wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{},
},


@@ -31,8 +31,8 @@ func TestScheduler(t *testing.T) {
now := time.Now()
tests := []struct {
- initScheduled []base.Z // scheduled queue initial state
+ initScheduled []h.ZSetEntry // scheduled queue initial state
- initRetry []base.Z // retry queue initial state
+ initRetry []h.ZSetEntry // retry queue initial state
initQueue []*base.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state
wantScheduled []*base.TaskMessage // schedule queue final state
@@ -40,12 +40,12 @@ func TestScheduler(t *testing.T) {
wantQueue []*base.TaskMessage // default queue final state
}{
{
- initScheduled: []base.Z{
+ initScheduled: []h.ZSetEntry{
- {Message: t1, Score: now.Add(time.Hour).Unix()},
+ {Msg: t1, Score: float64(now.Add(time.Hour).Unix())},
- {Message: t2, Score: now.Add(-2 * time.Second).Unix()},
+ {Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
},
- initRetry: []base.Z{
+ initRetry: []h.ZSetEntry{
- {Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()},
+ {Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())},
},
initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2,
@@ -54,12 +54,12 @@ func TestScheduler(t *testing.T) {
wantQueue: []*base.TaskMessage{t2, t3, t4},
},
{
- initScheduled: []base.Z{
+ initScheduled: []h.ZSetEntry{
- {Message: t1, Score: now.Unix()},
+ {Msg: t1, Score: float64(now.Unix())},
- {Message: t2, Score: now.Add(-2 * time.Second).Unix()},
+ {Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
- {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
+ {Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())},
},
- initRetry: []base.Z{},
+ initRetry: []h.ZSetEntry{},
initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2,
wantScheduled: []*base.TaskMessage{},


@@ -47,7 +47,6 @@ type Server struct {
heartbeater *heartbeater
subscriber *subscriber
recoverer *recoverer
- healthchecker *healthchecker
}
// Config specifies the server's background-task processing behavior.
@@ -124,18 +123,9 @@ type Config struct {
//
// If unset or zero, default timeout of 8 seconds is used.
ShutdownTimeout time.Duration
- // HealthCheckFunc is called periodically with any errors encountered during ping to the
- // connected redis server.
- HealthCheckFunc func(error)
- // HealthCheckInterval specifies the interval between healthchecks.
- //
- // If unset or zero, the interval is set to 15 seconds.
- HealthCheckInterval time.Duration
}
- // An ErrorHandler handles an error occured during task processing.
+ // An ErrorHandler handles errors returned by the task handler.
type ErrorHandler interface {
HandleError(ctx context.Context, task *Task, err error)
}
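The left-hand column of this hunk shows the HealthCheckFunc and HealthCheckInterval Config fields being dropped, while the ErrorHandler interface keeps the context-aware HandleError signature on both sides. A minimal sketch of wiring these together, assuming the left-hand (healthcheck-enabled) Config and the ErrorHandler field that Config exposes; the handler bodies are illustrative only:

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

// loggingErrorHandler satisfies the ErrorHandler interface shown above.
type loggingErrorHandler struct{}

func (loggingErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
	log.Printf("task %q failed: %v", task.Type, err)
}

func main() {
	// HealthCheckFunc/HealthCheckInterval come from the left-hand column of the
	// Config hunk above; they are not present on the right-hand side of this compare.
	srv := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, asynq.Config{
		Concurrency:  10,
		ErrorHandler: loggingErrorHandler{},
		HealthCheckFunc: func(err error) {
			if err != nil {
				log.Printf("healthcheck: broker ping failed: %v", err)
			}
		},
		HealthCheckInterval: 15 * time.Second,
	})
	_ = srv // in a real program, call srv.Start(handler) and later srv.Stop()
}
```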
@@ -260,11 +250,7 @@ var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1,
}
- const (
+ const defaultShutdownTimeout = 8 * time.Second
- defaultShutdownTimeout = 8 * time.Second
- defaultHealthCheckInterval = 15 * time.Second
- )
// NewServer returns a new Server given a redis connection option
// and background processing configuration.
@@ -290,10 +276,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if shutdownTimeout == 0 {
shutdownTimeout = defaultShutdownTimeout
}
- healthcheckInterval := cfg.HealthCheckInterval
- if healthcheckInterval == 0 {
- healthcheckInterval = defaultHealthCheckInterval
- }
logger := log.NewLogger(cfg.Logger)
loglevel := cfg.LogLevel
if loglevel == level_unspecified {
@@ -354,12 +336,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
retryDelayFunc: delayFunc,
interval: 1 * time.Minute,
})
- healthchecker := newHealthChecker(healthcheckerParams{
- logger: logger,
- broker: rdb,
- interval: healthcheckInterval,
- healthcheckFunc: cfg.HealthCheckFunc,
- })
return &Server{
logger: logger,
broker: rdb,
@@ -370,7 +346,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
- healthchecker: healthchecker,
}
}
@@ -438,7 +413,6 @@ func (srv *Server) Start(handler Handler) error {
srv.logger.Info("Starting processing")
srv.heartbeater.start(&srv.wg)
- srv.healthchecker.start(&srv.wg)
srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg)
srv.recoverer.start(&srv.wg)
@@ -468,7 +442,6 @@ func (srv *Server) Stop() {
srv.recoverer.terminate()
srv.syncer.terminate()
srv.subscriber.terminate()
- srv.healthchecker.terminate()
srv.heartbeater.terminate()
srv.wg.Wait()


@@ -8,14 +8,15 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// delCmd represents the del command
var delCmd = &cobra.Command{
- Use: "del [task key]",
+ Use: "del [task id]",
Short: "Deletes a task given an identifier",
Long: `Del (asynq del) will delete a task given an identifier.
@@ -43,12 +44,27 @@ func init() {
}
func del(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ id, score, qtype, err := parseQueryID(args[0])
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+ r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
- })
+ }))
- err := i.DeleteTaskByKey(args[0])
+ switch qtype {
+ case "s":
+ err = r.DeleteScheduledTask(id, score)
+ case "r":
+ err = r.DeleteRetryTask(id, score)
+ case "d":
+ err = r.DeleteDeadTask(id, score)
+ default:
+ fmt.Println("invalid argument")
+ os.Exit(1)
+ }
if err != nil {
fmt.Println(err)
os.Exit(1)


@@ -8,7 +8,8 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -44,22 +45,20 @@ func init() {
}
func delall(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
- var (
+ r := rdb.NewRDB(c)
- n int
+ var err error
- err error
- )
switch args[0] {
case "scheduled":
- n, err = i.DeleteAllScheduledTasks()
+ err = r.DeleteAllScheduledTasks()
case "retry":
- n, err = i.DeleteAllRetryTasks()
+ err = r.DeleteAllRetryTasks()
case "dead":
- n, err = i.DeleteAllDeadTasks()
+ err = r.DeleteAllDeadTasks()
default:
fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs)
os.Exit(1)
@@ -68,5 +67,5 @@ func delall(cmd *cobra.Command, args []string) {
fmt.Println(err)
os.Exit(1)
}
- fmt.Printf("Deleted all %d tasks in %q state\n", n, args[0])
+ fmt.Printf("Deleted all tasks in %q state\n", args[0])
}


@@ -8,14 +8,15 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// enqCmd represents the enq command
var enqCmd = &cobra.Command{
- Use: "enq [task key]",
+ Use: "enq [task id]",
Short: "Enqueues a task given an identifier",
Long: `Enq (asynq enq) will enqueue a task given an identifier.
@@ -46,12 +47,27 @@ func init() {
}
func enq(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ id, score, qtype, err := parseQueryID(args[0])
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+ r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
- })
+ }))
- err := i.EnqueueTaskByKey(args[0])
+ switch qtype {
+ case "s":
+ err = r.EnqueueScheduledTask(id, score)
+ case "r":
+ err = r.EnqueueRetryTask(id, score)
+ case "d":
+ err = r.EnqueueDeadTask(id, score)
+ default:
+ fmt.Println("invalid argument")
+ os.Exit(1)
+ }
if err != nil {
fmt.Println(err)
os.Exit(1)


@@ -8,7 +8,8 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -47,22 +48,21 @@ func init() {
}
func enqall(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
- var (
+ r := rdb.NewRDB(c)
- n int
+ var n int64
- err error
+ var err error
- )
switch args[0] {
case "scheduled":
- n, err = i.EnqueueAllScheduledTasks()
+ n, err = r.EnqueueAllScheduledTasks()
case "retry":
- n, err = i.EnqueueAllRetryTasks()
+ n, err = r.EnqueueAllRetryTasks()
case "dead":
- n, err = i.EnqueueAllDeadTasks()
+ n, err = r.EnqueueAllDeadTasks()
default:
fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs)
os.Exit(1)


@@ -10,7 +10,8 @@ import (
"strings"
"text/tabwriter"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -37,13 +38,14 @@ func init() {
}
func history(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
+ r := rdb.NewRDB(c)
- stats, err := i.History(days)
+ stats, err := r.HistoricalStats(days)
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -51,7 +53,7 @@ func history(cmd *cobra.Command, args []string) {
printDailyStats(stats)
}
- func printDailyStats(stats []*asynq.DailyStats) {
+ func printDailyStats(stats []*rdb.DailyStats) {
format := strings.Repeat("%v\t", 4) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate")
@@ -63,7 +65,7 @@ func printDailyStats(stats []*asynq.DailyStats) {
} else {
errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)
}
- fmt.Fprintf(tw, format, s.Date.Format("2006-01-02"), s.Processed, s.Failed, errrate)
+ fmt.Fprintf(tw, format, s.Time.Format("2006-01-02"), s.Processed, s.Failed, errrate)
}
tw.Flush()
}
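For example, a day with Processed = 1250 and Failed = 25 prints an error rate of 25/1250*100 = 2.00%; the else branch above is what keeps that division from running on a day with nothing processed.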


@@ -8,14 +8,15 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// killCmd represents the kill command
var killCmd = &cobra.Command{
- Use: "kill [task key]",
+ Use: "kill [task id]",
Short: "Kills a task given an identifier",
Long: `Kill (asynq kill) will put a task in dead state given an identifier.
@@ -43,12 +44,25 @@ func init() {
}
func kill(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ id, score, qtype, err := parseQueryID(args[0])
+ if err != nil {
+ fmt.Println(err)
+ os.Exit(1)
+ }
+ r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
- })
+ }))
- err := i.KillTaskByKey(args[0])
+ switch qtype {
+ case "s":
+ err = r.KillScheduledTask(id, score)
+ case "r":
+ err = r.KillRetryTask(id, score)
+ default:
+ fmt.Println("invalid argument")
+ os.Exit(1)
+ }
if err != nil {
fmt.Println(err)
os.Exit(1)


@@ -8,7 +8,8 @@ import (
"fmt"
"os"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -44,20 +45,19 @@ func init() {
}
func killall(cmd *cobra.Command, args []string) {
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
- var (
+ r := rdb.NewRDB(c)
- n int
+ var n int64
- err error
+ var err error
- )
switch args[0] {
case "scheduled":
- n, err = i.KillAllScheduledTasks()
+ n, err = r.KillAllScheduledTasks()
case "retry":
- n, err = i.KillAllRetryTasks()
+ n, err = r.KillAllRetryTasks()
default:
fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs)
os.Exit(1)


@@ -8,10 +8,13 @@ import (
"fmt"
"io"
"os"
+ "strconv"
"strings"
"time"
- "github.com/hibiken/asynq"
+ "github.com/go-redis/redis/v7"
+ "github.com/google/uuid"
+ "github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -59,11 +62,12 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Println("page number cannot be negative.")
os.Exit(1)
}
- i := asynq.NewInspector(asynq.RedisClientOpt{
+ c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
+ r := rdb.NewRDB(c)
parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued":
@@ -71,23 +75,54 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n")
os.Exit(1)
}
- listEnqueued(i, parts[1])
+ listEnqueued(r, parts[1])
case "inprogress":
- listInProgress(i)
+ listInProgress(r)
case "scheduled":
- listScheduled(i)
+ listScheduled(r)
case "retry":
- listRetry(i)
+ listRetry(r)
case "dead":
- listDead(i)
+ listDead(r)
default:
fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
os.Exit(1)
}
}
- func listEnqueued(i *asynq.Inspector, qname string) {
+ // queryID returns an identifier used for "enq" command.
- tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
+ // score is the zset score and queryType should be one
+ // of "s", "r" or "d" (scheduled, retry, dead respectively).
+ func queryID(id uuid.UUID, score int64, qtype string) string {
+ const format = "%v:%v:%v"
+ return fmt.Sprintf(format, qtype, score, id)
+ }
+ // parseQueryID is a reverse operation of queryID function.
+ // It takes a queryID and return each part of id with proper
+ // type if valid, otherwise it reports an error.
+ func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
+ parts := strings.Split(queryID, ":")
+ if len(parts) != 3 {
+ return uuid.Nil, 0, "", fmt.Errorf("invalid id")
+ }
+ id, err = uuid.Parse(parts[2])
+ if err != nil {
+ return uuid.Nil, 0, "", fmt.Errorf("invalid id")
+ }
+ score, err = strconv.ParseInt(parts[1], 10, 64)
+ if err != nil {
+ return uuid.Nil, 0, "", fmt.Errorf("invalid id")
+ }
+ qtype = parts[0]
+ if len(qtype) != 1 || !strings.Contains("srd", qtype) {
+ return uuid.Nil, 0, "", fmt.Errorf("invalid id")
+ }
+ return id, score, qtype, nil
+ }
+ func listEnqueued(r *rdb.RDB, qname string) {
+ tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -97,16 +132,17 @@ func listEnqueued(i *asynq.Inspector, qname string) {
return
}
cols := []string{"ID", "Type", "Payload", "Queue"}
- printTable(cols, func(w io.Writer, tmpl string) {
+ printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
}
- })
+ }
+ printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
- func listInProgress(i *asynq.Inspector) {
+ func listInProgress(r *rdb.RDB) {
- tasks, err := i.ListInProgressTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
+ tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -116,16 +152,17 @@ func listInProgress(i *asynq.Inspector) {
return
}
cols := []string{"ID", "Type", "Payload"}
- printTable(cols, func(w io.Writer, tmpl string) {
+ printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
}
- })
+ }
+ printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
- func listScheduled(i *asynq.Inspector) {
+ func listScheduled(r *rdb.RDB) {
- tasks, err := i.ListScheduledTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
+ tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -134,19 +171,19 @@ func listScheduled(i *asynq.Inspector) {
fmt.Println("No scheduled tasks")
return
}
- cols := []string{"Key", "Type", "Payload", "Process In", "Queue"}
+ cols := []string{"ID", "Type", "Payload", "Process In", "Queue"}
- printTable(cols, func(w io.Writer, tmpl string) {
+ printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
- processIn := fmt.Sprintf("%.0f seconds",
+ processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
- t.NextEnqueueAt.Sub(time.Now()).Seconds())
+ fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue)
- fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn, t.Queue)
}
- })
+ }
+ printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
- func listRetry(i *asynq.Inspector) {
+ func listRetry(r *rdb.RDB) {
- tasks, err := i.ListRetryTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
+ tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -155,23 +192,24 @@ func listRetry(i *asynq.Inspector) {
fmt.Println("No retry tasks")
return
}
- cols := []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
+ cols := []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
- printTable(cols, func(w io.Writer, tmpl string) {
+ printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
var nextRetry string
- if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 {
+ if d := t.ProcessAt.Sub(time.Now()); d > 0 {
nextRetry = fmt.Sprintf("in %v", d.Round(time.Second))
} else {
nextRetry = "right now"
}
- fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry, t.Queue)
+ fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.Retry, t.Queue)
}
- })
+ }
+ printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
- func listDead(i *asynq.Inspector) {
+ func listDead(r *rdb.RDB) {
- tasks, err := i.ListDeadTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
+ tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -180,11 +218,12 @@ func listDead(i *asynq.Inspector) {
fmt.Println("No dead tasks")
return
}
- cols := []string{"Key", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
+ cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
- printTable(cols, func(w io.Writer, tmpl string) {
+ printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
- fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
+ fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
}
- })
+ }
+ printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
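The identifier that ls now prints, and that del/enq/kill parse back, is the three-part string built by queryID above. An illustrative round trip; the score value is made up, and parseQueryID lives in this cmd package, so it is only described in a comment:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

func main() {
	// Same "%v:%v:%v" layout as queryID above: qtype, zset score, task UUID.
	id := uuid.New()
	key := fmt.Sprintf("%v:%v:%v", "r", int64(1593417600), id)
	fmt.Println(key) // e.g. r:1593417600:9f37c14b-...
	// Feeding key back through parseQueryID would yield (id, 1593417600, "r", nil);
	// anything that is not exactly three ":"-separated parts, or whose qtype is not
	// one of "s", "r", "d", comes back as "invalid id".
}
```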


@@ -166,7 +166,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
- golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=