2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-20 09:16:12 +08:00

Compare commits

..

24 Commits

Author SHA1 Message Date
Ken Hibino
04702ddfd2 Change ErrorHandler function signature 2020-07-04 05:53:50 -07:00
Ken Hibino
6705f7c27a Return Result struct to caller of Enqueue 2020-07-03 21:49:53 -07:00
Ken Hibino
e27ae0d33a Replace github.com/rs/xid with github.com/google/uuid 2020-07-02 06:38:13 -07:00
Ken Hibino
6cd0ab65a3 Add version command to CLI 2020-06-29 20:59:15 -07:00
Ken Hibino
83c9d5ae94 Add migrate command to CLI
The command converts all messages in redis to be compatible for asynq
v0.10.0
2020-06-29 06:11:47 -07:00
Ken Hibino
7eebbf181e Update docs 2020-06-29 06:11:47 -07:00
Ken Hibino
7b1770da96 Minor code cleanup 2020-06-29 06:11:47 -07:00
Ken Hibino
e2c5882368 Use int64 type for Timeout and Deadline in TaskMessage 2020-06-29 06:11:47 -07:00
Ken Hibino
50df107ace Clean up processor test 2020-06-29 06:11:47 -07:00
Ken Hibino
9699d196e5 Add recoverer 2020-06-29 06:11:47 -07:00
Ken Hibino
1c5f7a791b Add RDB.ListDeadlineExceeded 2020-06-29 06:11:47 -07:00
Ken Hibino
232efe8279 Fix processor 2020-06-29 06:11:47 -07:00
Ken Hibino
ef4a4a8334 Add deadline to syncRequest
- syncer will drop a request if its deadline has been exceeded
2020-06-29 06:11:47 -07:00
Ken Hibino
65e17a3469 Update processor to adapt for deadlines set change
- Processor dequeues tasks only when it's available to process
- Processor retries a task when its context's Done channel is closed
2020-06-29 06:11:47 -07:00
Ken Hibino
88d94a2a9d Update RDB.Requeue to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
7433b94aac Update RDB.Dequeue to return deadline as time.Time 2020-06-29 06:11:47 -07:00
Ken Hibino
08ac7793ab Update RDB.Kill to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
02b653df72 Update RDB.Retry to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
bee784c052 Update RDB.Done to remove message from deadlines set 2020-06-29 06:11:47 -07:00
Ken Hibino
4ea58052f8 Update RDB.Dequeue to return message and deadline 2020-06-29 06:11:47 -07:00
Ken Hibino
5afb4861a5 Add task message to deadlines set on dequeue
Updated dequeueCmd to decode the message and compute its deadline and add
the message to the Deadline set.
2020-06-29 06:11:47 -07:00
Ken Hibino
68e6b379fc Use default timeout of 30mins if both timeout and deadline are not
provided
2020-06-29 06:11:47 -07:00
Ken Hibino
0e70a14899 Change TaskMessage Timeout and Deadline to int
* This change breaks existing tasks in Redis
2020-06-29 06:11:47 -07:00
Ken Hibino
f01c7b8e66 Add redis key for deadlines in base package 2020-06-29 06:11:47 -07:00
28 changed files with 966 additions and 3125 deletions

View File

@@ -7,16 +7,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
## [0.11.0] - 2020-07-28
### Added
- `Inspector` type was added to monitor and mutate state of queues and tasks.
- `HealthCheckFunc` and `HealthCheckInterval` fields were added to `Config` to allow user to specify a callback
function to check for broker connection.
## [0.10.0] - 2020-07-06
### Changed
- All tasks now requires timeout or deadline. By default, timeout is set to 30 mins.

View File

@@ -34,7 +34,6 @@ A system can consist of multiple worker servers and brokers, giving way to high
- Scheduling of tasks
- Durability since tasks are written to Redis
- [Retries](https://github.com/hibiken/asynq/wiki/Task-Retry) of failed tasks
- Automatic recovery of tasks in the event of a worker crash
- [Weighted priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#weighted-priority-queues)
- [Strict priority queues](https://github.com/hibiken/asynq/wiki/Priority-Queues#strict-priority-queues)
- Low latency to add a task since writes are fast in Redis

View File

@@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) {
opts []Option
wantRes *Result
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []base.Z
wantScheduled []h.ZSetEntry
}{
{
desc: "Process task immediately",
@@ -75,9 +75,9 @@ func TestClientEnqueueAt(t *testing.T) {
Deadline: noDeadline,
},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []base.Z{
wantScheduled: []h.ZSetEntry{
{
Message: &base.TaskMessage{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
@@ -85,7 +85,7 @@ func TestClientEnqueueAt(t *testing.T) {
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: oneHourLater.Unix(),
Score: float64(oneHourLater.Unix()),
},
},
},
@@ -376,7 +376,7 @@ func TestClientEnqueueIn(t *testing.T) {
opts []Option
wantRes *Result
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []base.Z
wantScheduled []h.ZSetEntry
}{
{
desc: "schedule a task to be enqueued in one hour",
@@ -390,9 +390,9 @@ func TestClientEnqueueIn(t *testing.T) {
Deadline: noDeadline,
},
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []base.Z{
wantScheduled: []h.ZSetEntry{
{
Message: &base.TaskMessage{
Msg: &base.TaskMessage{
Type: task.Type,
Payload: task.Payload.data,
Retry: defaultMaxRetry,
@@ -400,7 +400,7 @@ func TestClientEnqueueIn(t *testing.T) {
Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(),
},
Score: time.Now().Add(time.Hour).Unix(),
Score: float64(time.Now().Add(time.Hour).Unix()),
},
},
},

View File

@@ -1,80 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
// healthchecker is responsible for pinging broker periodically
// and call user provided HeathCheckFunc with the ping result.
type healthchecker struct {
logger *log.Logger
broker base.Broker
// channel to communicate back to the long running "healthchecker" goroutine.
done chan struct{}
// interval between healthchecks.
interval time.Duration
// function to call periodically.
healthcheckFunc func(error)
}
type healthcheckerParams struct {
logger *log.Logger
broker base.Broker
interval time.Duration
healthcheckFunc func(error)
}
func newHealthChecker(params healthcheckerParams) *healthchecker {
return &healthchecker{
logger: params.logger,
broker: params.broker,
done: make(chan struct{}),
interval: params.interval,
healthcheckFunc: params.healthcheckFunc,
}
}
func (hc *healthchecker) terminate() {
if hc.healthcheckFunc == nil {
return
}
hc.logger.Debug("Healthchecker shutting down...")
// Signal the healthchecker goroutine to stop.
hc.done <- struct{}{}
}
func (hc *healthchecker) start(wg *sync.WaitGroup) {
if hc.healthcheckFunc == nil {
return
}
wg.Add(1)
go func() {
defer wg.Done()
timer := time.NewTimer(hc.interval)
for {
select {
case <-hc.done:
hc.logger.Debug("Healthchecker done")
timer.Stop()
return
case <-timer.C:
err := hc.broker.Ping()
hc.healthcheckFunc(err)
timer.Reset(hc.interval)
}
}
}()
}

View File

@@ -1,101 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"testing"
"time"
"github.com/hibiken/asynq/internal/rdb"
"github.com/hibiken/asynq/internal/testbroker"
)
func TestHealthChecker(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
var (
// mu guards called and e variables.
mu sync.Mutex
called int
e error
)
checkFn := func(err error) {
mu.Lock()
defer mu.Unlock()
called++
e = err
}
hc := newHealthChecker(healthcheckerParams{
logger: testLogger,
broker: rdbClient,
interval: 1 * time.Second,
healthcheckFunc: checkFn,
})
hc.start(&sync.WaitGroup{})
time.Sleep(2 * time.Second)
mu.Lock()
if called == 0 {
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
}
if e != nil {
t.Errorf("HealthCheckFunc was called with non-nil error: %v", e)
}
mu.Unlock()
hc.terminate()
}
func TestHealthCheckerWhenRedisDown(t *testing.T) {
// Make sure that healthchecker goroutine doesn't panic
// if it cannot connect to redis.
defer func() {
if r := recover(); r != nil {
t.Errorf("panic occurred: %v", r)
}
}()
r := rdb.NewRDB(setup(t))
testBroker := testbroker.NewTestBroker(r)
var (
// mu guards called and e variables.
mu sync.Mutex
called int
e error
)
checkFn := func(err error) {
mu.Lock()
defer mu.Unlock()
called++
e = err
}
hc := newHealthChecker(healthcheckerParams{
logger: testLogger,
broker: testBroker,
interval: 1 * time.Second,
healthcheckFunc: checkFn,
})
testBroker.Sleep()
hc.start(&sync.WaitGroup{})
time.Sleep(2 * time.Second)
mu.Lock()
if called == 0 {
t.Errorf("Healthchecker did not call the provided HealthCheckFunc")
}
if e == nil {
t.Errorf("HealthCheckFunc was called with nil; want non-nil error")
}
mu.Unlock()
hc.terminate()
}

View File

@@ -1,500 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
)
// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
}
// New returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector {
return &Inspector{
rdb: rdb.NewRDB(createRedisClient(r)),
}
}
// Stats represents a state of queues at a certain time.
type Stats struct {
Enqueued int
InProgress int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Queues []*QueueInfo
Timestamp time.Time
}
// QueueInfo holds information about a queue.
type QueueInfo struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
}
// CurrentStats returns a current stats of the queues.
func (i *Inspector) CurrentStats() (*Stats, error) {
stats, err := i.rdb.CurrentStats()
if err != nil {
return nil, err
}
var qs []*QueueInfo
for _, q := range stats.Queues {
qs = append(qs, (*QueueInfo)(q))
}
return &Stats{
Enqueued: stats.Enqueued,
InProgress: stats.InProgress,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Queues: qs,
Timestamp: stats.Timestamp,
}, nil
}
// DailyStats holds aggregate data for a given day.
type DailyStats struct {
Processed int
Failed int
Date time.Time
}
// History returns a list of stats from the last n days.
func (i *Inspector) History(n int) ([]*DailyStats, error) {
stats, err := i.rdb.HistoricalStats(n)
if err != nil {
return nil, err
}
var res []*DailyStats
for _, s := range stats {
res = append(res, &DailyStats{
Processed: s.Processed,
Failed: s.Failed,
Date: s.Time,
})
}
return res, nil
}
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
*Task
ID string
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
*Task
ID string
}
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
score int64
}
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
MaxRetry int
Retried int
ErrorMsg string
// TODO: LastFailedAt time.Time
score int64
}
// DeadTask is a task exhausted its retries.
// DeadTask won't be retried automatically.
type DeadTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
ErrorMsg string
score int64
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *DeadTask) Key() string {
return fmt.Sprintf("d:%v:%v", t.ID, t.score)
}
// parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(key, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[1])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
// ListOption specifies behavior of list operation.
type ListOption interface{}
// Internal list option representations.
type (
pageSizeOpt int
pageNumOpt int
)
type listOption struct {
pageSize int
pageNum int
}
const (
// Page size used by default in list operation.
defaultPageSize = 30
// Page number used by default in list operation.
defaultPageNum = 1
)
func composeListOptions(opts ...ListOption) listOption {
res := listOption{
pageSize: defaultPageSize,
pageNum: defaultPageNum,
}
for _, opt := range opts {
switch opt := opt.(type) {
case pageSizeOpt:
res.pageSize = int(opt)
case pageNumOpt:
res.pageNum = int(opt)
default:
// ignore unexpected option
}
}
return res
}
// PageSize returns an option to specify the page size for list operation.
//
// Negative page size is treated as zero.
func PageSize(n int) ListOption {
if n < 0 {
n = 0
}
return pageSizeOpt(n)
}
// Page returns an option to specify the page number for list operation.
// The value 1 fetches the first page.
//
// Negative page number is treated as one.
func Page(n int) ListOption {
if n < 0 {
n = 1
}
return pageNumOpt(n)
}
// ListScheduledTasks retrieves tasks in the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListEnqueued(qname, pgn)
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, m := range msgs {
tasks = append(tasks, &EnqueuedTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
})
}
return tasks, err
}
// ListScheduledTasks retrieves tasks currently being processed.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(pgn)
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, m := range msgs {
tasks = append(tasks, &InProgressTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
})
}
return tasks, err
}
// ListScheduledTasks retrieves tasks in scheduled state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(pgn)
if err != nil {
return nil, err
}
var tasks []*ScheduledTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
score: z.Score,
})
}
return tasks, nil
}
// ListScheduledTasks retrieves tasks in retry state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(pgn)
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
}
// ListScheduledTasks retrieves tasks in retry state.
// Tasks are sorted by LastFailedAt field in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListDead(pgn)
if err != nil {
return nil, err
}
var tasks []*DeadTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &DeadTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
return nil, nil
}
// DeleteAllScheduledTasks deletes all tasks in scheduled state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks() (int, error) {
n, err := i.rdb.DeleteAllScheduledTasks()
return int(n), err
}
// DeleteAllRetryTasks deletes all tasks in retry state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks() (int, error) {
n, err := i.rdb.DeleteAllRetryTasks()
return int(n), err
}
// DeleteAllDeadTasks deletes all tasks in dead state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllDeadTasks() (int, error) {
n, err := i.rdb.DeleteAllDeadTasks()
return int(n), err
}
// DeleteTaskByKey deletes a task with the given key.
func (i *Inspector) DeleteTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.DeleteScheduledTask(id, score)
case "r":
return i.rdb.DeleteRetryTask(id, score)
case "d":
return i.rdb.DeleteDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllScheduledTasks() (int, error) {
n, err := i.rdb.EnqueueAllScheduledTasks()
return int(n), err
}
// EnqueueAllRetryTasks enqueues all tasks in the retry state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllRetryTasks() (int, error) {
n, err := i.rdb.EnqueueAllRetryTasks()
return int(n), err
}
// EnqueueAllDeadTasks enqueues all tasks in the dead state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllDeadTasks() (int, error) {
n, err := i.rdb.EnqueueAllDeadTasks()
return int(n), err
}
// EnqueueTaskByKey enqueues a task with the given key.
func (i *Inspector) EnqueueTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.EnqueueScheduledTask(id, score)
case "r":
return i.rdb.EnqueueRetryTask(id, score)
case "d":
return i.rdb.EnqueueDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// KillAllScheduledTasks kills all tasks in scheduled state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllScheduledTasks() (int, error) {
n, err := i.rdb.KillAllScheduledTasks()
return int(n), err
}
// KillAllRetryTasks kills all tasks in retry state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllRetryTasks() (int, error) {
n, err := i.rdb.KillAllRetryTasks()
return int(n), err
}
// KillTaskByKey kills a task with the given key.
func (i *Inspector) KillTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.KillScheduledTask(id, score)
case "r":
return i.rdb.KillRetryTask(id, score)
case "d":
return fmt.Errorf("task already dead")
default:
return fmt.Errorf("invalid key")
}
}
// PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error {
return i.rdb.Pause(qname)
}
// UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error {
return i.rdb.Unpause(qname)
}

File diff suppressed because it is too large Load Diff

View File

@@ -17,6 +17,12 @@ import (
"github.com/hibiken/asynq/internal/base"
)
// ZSetEntry is an entry in redis sorted set.
type ZSetEntry struct {
Msg *base.TaskMessage
Score float64
}
// SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages.
var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage {
out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it
@@ -27,10 +33,10 @@ var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage
})
// SortZSetEntryOpt is an cmp.Option to sort ZSetEntry for comparing slice of zset entries.
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []base.Z) []base.Z {
out := append([]base.Z(nil), in...) // Copy input to avoid mutating it
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) []ZSetEntry {
out := append([]ZSetEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Message.ID.String() < out[j].Message.ID.String()
return out[i].Msg.ID.String() < out[j].Msg.ID.String()
})
return out
})
@@ -171,15 +177,6 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage,
seedRedisList(tb, r, queue, msgs)
}
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
//
// enqueued maps a queue name to a list of messages.
func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) {
for q, msgs := range enqueued {
SeedEnqueuedQueue(tb, r, msgs, q)
}
}
// SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
tb.Helper()
@@ -187,25 +184,25 @@ func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessag
}
// SeedScheduledQueue initializes the scheduled queue with the given messages.
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
tb.Helper()
seedRedisZSet(tb, r, base.ScheduledQueue, entries)
}
// SeedRetryQueue initializes the retry queue with the given messages.
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
tb.Helper()
seedRedisZSet(tb, r, base.RetryQueue, entries)
}
// SeedDeadQueue initializes the dead queue with the given messages.
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
tb.Helper()
seedRedisZSet(tb, r, base.DeadQueue, entries)
}
// SeedDeadlines initializes the deadlines set with the given entries.
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z) {
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []ZSetEntry) {
tb.Helper()
seedRedisZSet(tb, r, base.KeyDeadlines, entries)
}
@@ -219,9 +216,9 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
}
}
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry) {
for _, item := range items {
z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
z := &redis.Z{Member: MustMarshal(tb, item.Msg), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil {
tb.Fatal(err)
}
@@ -265,25 +262,25 @@ func GetDeadMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
}
// GetScheduledEntries returns all task messages and its score in the scheduled queue.
func GetScheduledEntries(tb testing.TB, r *redis.Client) []base.Z {
func GetScheduledEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
tb.Helper()
return getZSetEntries(tb, r, base.ScheduledQueue)
}
// GetRetryEntries returns all task messages and its score in the retry queue.
func GetRetryEntries(tb testing.TB, r *redis.Client) []base.Z {
func GetRetryEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
tb.Helper()
return getZSetEntries(tb, r, base.RetryQueue)
}
// GetDeadEntries returns all task messages and its score in the dead queue.
func GetDeadEntries(tb testing.TB, r *redis.Client) []base.Z {
func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
tb.Helper()
return getZSetEntries(tb, r, base.DeadQueue)
}
// GetDeadlinesEntries returns all task messages and its score in the deadlines set.
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []base.Z {
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry {
tb.Helper()
return getZSetEntries(tb, r, base.KeyDeadlines)
}
@@ -298,13 +295,13 @@ func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMe
return MustUnmarshalSlice(tb, data)
}
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z {
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry {
data := r.ZRangeWithScores(zset, 0, -1).Val()
var entries []base.Z
var entries []ZSetEntry
for _, z := range data {
entries = append(entries, base.Z{
Message: MustUnmarshal(tb, z.Member.(string)),
Score: int64(z.Score),
entries = append(entries, ZSetEntry{
Msg: MustUnmarshal(tb, z.Member.(string)),
Score: z.Score,
})
}
return entries

View File

@@ -133,12 +133,6 @@ func DecodeMessage(s string) (*TaskMessage, error) {
return &msg, nil
}
// Z represents sorted set member.
type Z struct {
Message *TaskMessage
Score int64
}
// ServerStatus represents status of a server.
// ServerStatus methods are concurrency safe.
type ServerStatus struct {
@@ -263,7 +257,6 @@ func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
//
// See rdb.RDB as a reference implementation.
type Broker interface {
Ping() error
Enqueue(msg *TaskMessage) error
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)

View File

@@ -51,6 +51,56 @@ type DailyStats struct {
Time time.Time
}
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// ScheduledTask is a task that's scheduled to be processed in the future.
type ScheduledTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
ProcessAt time.Time
Score int64
Queue string
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
type RetryTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
// TODO(hibiken): add LastFailedAt time.Time
ProcessAt time.Time
ErrorMsg string
Retried int
Retry int
Score int64
Queue string
}
// DeadTask is a task in that has exhausted all retries.
type DeadTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
Score int64
Queue string
}
// KEYS[1] -> asynq:queues
// KEYS[2] -> asynq:in_progress
// KEYS[3] -> asynq:scheduled
@@ -239,79 +289,158 @@ func (p Pagination) stop() int64 {
}
// ListEnqueued returns enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) {
qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(qkey, pgn)
}
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) {
return r.listMessages(base.InProgressQueue, pgn)
}
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(key, start, stop).Result()
data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var msgs []*base.TaskMessage
var tasks []*EnqueuedTask
for _, s := range data {
m, err := base.DecodeMessage(s)
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
msgs = append(msgs, m)
tasks = append(tasks, &EnqueuedTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
})
}
return tasks, nil
}
return msgs, nil
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*InProgressTask
for _, s := range data {
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
tasks = append(tasks, &InProgressTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
})
}
return tasks, nil
}
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.ScheduledQueue, pgn)
}
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.RetryQueue, pgn)
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) {
return r.listZSetEntries(base.DeadQueue, pgn)
}
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
var res []base.Z
var tasks []*ScheduledTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
msg, err := base.DecodeMessage(s)
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
res = append(res, base.Z{msg, int64(z.Score)})
processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &ScheduledTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return res, nil
return tasks, nil
}
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &RetryTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Retry: msg.Retry,
Retried: msg.Retried,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result()
if err != nil {
return nil, err
}
var tasks []*DeadTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
lastFailedAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &DeadTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Queue: msg.Queue,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
}
return tasks, nil
}
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
@@ -575,40 +704,19 @@ func (r *RDB) deleteTask(zset, id string, score float64) error {
return nil
}
// KEYS[1] -> queue to delete
var deleteAllCmd = redis.NewScript(`
local n = redis.call("ZCARD", KEYS[1])
redis.call("DEL", KEYS[1])
return n`)
// DeleteAllDeadTasks deletes all tasks from the dead queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllDeadTasks() (int64, error) {
return r.deleteAll(base.DeadQueue)
// DeleteAllDeadTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllDeadTasks() error {
return r.client.Del(base.DeadQueue).Err()
}
// DeleteAllRetryTasks deletes all tasks from the dead queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllRetryTasks() (int64, error) {
return r.deleteAll(base.RetryQueue)
// DeleteAllRetryTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllRetryTasks() error {
return r.client.Del(base.RetryQueue).Err()
}
// DeleteAllScheduledTasks deletes all tasks from the dead queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllScheduledTasks() (int64, error) {
return r.deleteAll(base.ScheduledQueue)
}
func (r *RDB) deleteAll(key string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
// DeleteAllScheduledTasks deletes all tasks from the dead queue.
func (r *RDB) DeleteAllScheduledTasks() error {
return r.client.Del(base.ScheduledQueue).Err()
}
// ErrQueueNotFound indicates specified queue does not exist.

File diff suppressed because it is too large Load Diff

View File

@@ -45,11 +45,6 @@ func (r *RDB) Close() error {
return r.client.Close()
}
// Ping checks the connection with redis server.
func (r *RDB) Ping() error {
return r.client.Ping().Err()
}
// KEYS[1] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// ARGV[1] -> task message data

View File

@@ -148,7 +148,7 @@ func TestDequeue(t *testing.T) {
err error
wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage
wantDeadlines []base.Z
wantDeadlines []h.ZSetEntry
}{
{
enqueued: map[string][]*base.TaskMessage{
@@ -162,10 +162,10 @@ func TestDequeue(t *testing.T) {
"default": {},
},
wantInProgress: []*base.TaskMessage{t1},
wantDeadlines: []base.Z{
wantDeadlines: []h.ZSetEntry{
{
Message: t1,
Score: t1Deadline,
Msg: t1,
Score: float64(t1Deadline),
},
},
},
@@ -181,7 +181,7 @@ func TestDequeue(t *testing.T) {
"default": {},
},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -199,10 +199,10 @@ func TestDequeue(t *testing.T) {
"low": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
wantDeadlines: []h.ZSetEntry{
{
Message: t2,
Score: t2Deadline,
Msg: t2,
Score: float64(t2Deadline),
},
},
},
@@ -222,10 +222,10 @@ func TestDequeue(t *testing.T) {
"low": {t2, t1},
},
wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []base.Z{
wantDeadlines: []h.ZSetEntry{
{
Message: t3,
Score: t3Deadline,
Msg: t3,
Score: float64(t3Deadline),
},
},
},
@@ -245,7 +245,7 @@ func TestDequeue(t *testing.T) {
"low": {},
},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
},
}
@@ -412,70 +412,70 @@ func TestDone(t *testing.T) {
tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []base.Z // initial state of deadlines set
deadlines []h.ZSetEntry // initial state of deadlines set
target *base.TaskMessage // task to remove
wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []base.Z // final state of the deadline set
wantDeadlines []h.ZSetEntry // final state of the deadline set
}{
{
inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{
deadlines: []h.ZSetEntry{
{
Message: t1,
Score: t1Deadline,
Msg: t1,
Score: float64(t1Deadline),
},
{
Message: t2,
Score: t2Deadline,
Msg: t2,
Score: float64(t2Deadline),
},
},
target: t1,
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
wantDeadlines: []h.ZSetEntry{
{
Message: t2,
Score: t2Deadline,
Msg: t2,
Score: float64(t2Deadline),
},
},
},
{
inProgress: []*base.TaskMessage{t1},
deadlines: []base.Z{
deadlines: []h.ZSetEntry{
{
Message: t1,
Score: t1Deadline,
Msg: t1,
Score: float64(t1Deadline),
},
},
target: t1,
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
},
{
inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{
deadlines: []h.ZSetEntry{
{
Message: t1,
Score: t1Deadline,
Msg: t1,
Score: float64(t1Deadline),
},
{
Message: t2,
Score: t2Deadline,
Msg: t2,
Score: float64(t2Deadline),
},
{
Message: t3,
Score: t3Deadline,
Msg: t3,
Score: float64(t3Deadline),
},
},
target: t3,
wantInProgress: []*base.TaskMessage{t1, t2},
wantDeadlines: []base.Z{
wantDeadlines: []h.ZSetEntry{
{
Message: t1,
Score: t1Deadline,
Msg: t1,
Score: float64(t1Deadline),
},
{
Message: t2,
Score: t2Deadline,
Msg: t2,
Score: float64(t2Deadline),
},
},
},
@@ -560,28 +560,28 @@ func TestRequeue(t *testing.T) {
tests := []struct {
enqueued map[string][]*base.TaskMessage // initial state of queues
inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []base.Z // initial state of the deadlines set
deadlines []h.ZSetEntry // initial state of the deadlines set
target *base.TaskMessage // task to requeue
wantEnqueued map[string][]*base.TaskMessage // final state of queues
wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []base.Z // final state of the deadlines set
wantDeadlines []h.ZSetEntry // final state of the deadlines set
}{
{
enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {},
},
inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(t1Deadline)},
{Msg: t2, Score: float64(t2Deadline)},
},
target: t1,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1},
},
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
{Message: t2, Score: t2Deadline},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
},
},
{
@@ -589,15 +589,15 @@ func TestRequeue(t *testing.T) {
base.DefaultQueueName: {t1},
},
inProgress: []*base.TaskMessage{t2},
deadlines: []base.Z{
{Message: t2, Score: t2Deadline},
deadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
},
target: t2,
wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2},
},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
},
{
enqueued: map[string][]*base.TaskMessage{
@@ -605,9 +605,9 @@ func TestRequeue(t *testing.T) {
"critical": {},
},
inProgress: []*base.TaskMessage{t2, t3},
deadlines: []base.Z{
{Message: t2, Score: t2Deadline},
{Message: t3, Score: t3Deadline},
deadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
{Msg: t3, Score: float64(t3Deadline)},
},
target: t3,
wantEnqueued: map[string][]*base.TaskMessage{
@@ -615,8 +615,8 @@ func TestRequeue(t *testing.T) {
"critical": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
{Message: t2, Score: t2Deadline},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
},
},
}
@@ -765,42 +765,42 @@ func TestRetry(t *testing.T) {
tests := []struct {
inProgress []*base.TaskMessage
deadlines []base.Z
retry []base.Z
deadlines []h.ZSetEntry
retry []h.ZSetEntry
msg *base.TaskMessage
processAt time.Time
errMsg string
wantInProgress []*base.TaskMessage
wantDeadlines []base.Z
wantRetry []base.Z
wantDeadlines []h.ZSetEntry
wantRetry []h.ZSetEntry
}{
{
inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(t1Deadline)},
{Msg: t2, Score: float64(t2Deadline)},
},
retry: []base.Z{
retry: []h.ZSetEntry{
{
Message: t3,
Score: now.Add(time.Minute).Unix(),
Msg: t3,
Score: float64(now.Add(time.Minute).Unix()),
},
},
msg: t1,
processAt: now.Add(5 * time.Minute),
errMsg: errMsg,
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
{Message: t2, Score: t2Deadline},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
},
wantRetry: []base.Z{
wantRetry: []h.ZSetEntry{
{
Message: h.TaskMessageAfterRetry(*t1, errMsg),
Score: now.Add(5 * time.Minute).Unix(),
Msg: h.TaskMessageAfterRetry(*t1, errMsg),
Score: float64(now.Add(5 * time.Minute).Unix()),
},
{
Message: t3,
Score: now.Add(time.Minute).Unix(),
Msg: t3,
Score: float64(now.Add(time.Minute).Unix()),
},
},
},
@@ -891,59 +891,59 @@ func TestKill(t *testing.T) {
// TODO(hibiken): add test cases for trimming
tests := []struct {
inProgress []*base.TaskMessage
deadlines []base.Z
dead []base.Z
deadlines []h.ZSetEntry
dead []h.ZSetEntry
target *base.TaskMessage // task to kill
wantInProgress []*base.TaskMessage
wantDeadlines []base.Z
wantDead []base.Z
wantDeadlines []h.ZSetEntry
wantDead []h.ZSetEntry
}{
{
inProgress: []*base.TaskMessage{t1, t2},
deadlines: []base.Z{
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(t1Deadline)},
{Msg: t2, Score: float64(t2Deadline)},
},
dead: []base.Z{
dead: []h.ZSetEntry{
{
Message: t3,
Score: now.Add(-time.Hour).Unix(),
Msg: t3,
Score: float64(now.Add(-time.Hour).Unix()),
},
},
target: t1,
wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []base.Z{
{Message: t2, Score: t2Deadline},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
},
wantDead: []base.Z{
wantDead: []h.ZSetEntry{
{
Message: h.TaskMessageWithError(*t1, errMsg),
Score: now.Unix(),
Msg: h.TaskMessageWithError(*t1, errMsg),
Score: float64(now.Unix()),
},
{
Message: t3,
Score: now.Add(-time.Hour).Unix(),
Msg: t3,
Score: float64(now.Add(-time.Hour).Unix()),
},
},
},
{
inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{
{Message: t1, Score: t1Deadline},
{Message: t2, Score: t2Deadline},
{Message: t3, Score: t3Deadline},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(t1Deadline)},
{Msg: t2, Score: float64(t2Deadline)},
{Msg: t3, Score: float64(t3Deadline)},
},
dead: []base.Z{},
dead: []h.ZSetEntry{},
target: t1,
wantInProgress: []*base.TaskMessage{t2, t3},
wantDeadlines: []base.Z{
{Message: t2, Score: t2Deadline},
{Message: t3, Score: t3Deadline},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(t2Deadline)},
{Msg: t3, Score: float64(t3Deadline)},
},
wantDead: []base.Z{
wantDead: []h.ZSetEntry{
{
Message: h.TaskMessageWithError(*t1, errMsg),
Score: now.Unix(),
Msg: h.TaskMessageWithError(*t1, errMsg),
Score: float64(now.Unix()),
},
},
},
@@ -1009,19 +1009,19 @@ func TestCheckAndEnqueue(t *testing.T) {
hourFromNow := time.Now().Add(time.Hour)
tests := []struct {
scheduled []base.Z
retry []base.Z
scheduled []h.ZSetEntry
retry []h.ZSetEntry
wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage
}{
{
scheduled: []base.Z{
{Message: t1, Score: secondAgo.Unix()},
{Message: t2, Score: secondAgo.Unix()},
scheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(secondAgo.Unix())},
{Msg: t2, Score: float64(secondAgo.Unix())},
},
retry: []base.Z{
{Message: t3, Score: secondAgo.Unix()}},
retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
@@ -1029,11 +1029,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{},
},
{
scheduled: []base.Z{
{Message: t1, Score: hourFromNow.Unix()},
{Message: t2, Score: secondAgo.Unix()}},
retry: []base.Z{
{Message: t3, Score: secondAgo.Unix()}},
scheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(hourFromNow.Unix())},
{Msg: t2, Score: float64(secondAgo.Unix())}},
retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3},
},
@@ -1041,11 +1041,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{},
},
{
scheduled: []base.Z{
{Message: t1, Score: hourFromNow.Unix()},
{Message: t2, Score: hourFromNow.Unix()}},
retry: []base.Z{
{Message: t3, Score: hourFromNow.Unix()}},
scheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(hourFromNow.Unix())},
{Msg: t2, Score: float64(hourFromNow.Unix())}},
retry: []h.ZSetEntry{
{Msg: t3, Score: float64(hourFromNow.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
@@ -1053,12 +1053,12 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{t3},
},
{
scheduled: []base.Z{
{Message: t1, Score: secondAgo.Unix()},
{Message: t4, Score: secondAgo.Unix()},
scheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(secondAgo.Unix())},
{Msg: t4, Score: float64(secondAgo.Unix())},
},
retry: []base.Z{
{Message: t5, Score: secondAgo.Unix()}},
retry: []h.ZSetEntry{
{Msg: t5, Score: float64(secondAgo.Unix())}},
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {t4},
@@ -1112,41 +1112,41 @@ func TestListDeadlineExceeded(t *testing.T) {
tests := []struct {
desc string
deadlines []base.Z
deadlines []h.ZSetEntry
t time.Time
want []*base.TaskMessage
}{
{
desc: "with one task in-progress",
deadlines: []base.Z{
{Message: t1, Score: fiveMinutesAgo.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple tasks in-progress, and one expired",
deadlines: []base.Z{
{Message: t1, Score: oneHourAgo.Unix()},
{Message: t2, Score: fiveMinutesFromNow.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(oneHourAgo.Unix())},
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1},
},
{
desc: "with multiple expired tasks in-progress",
deadlines: []base.Z{
{Message: t1, Score: oneHourAgo.Unix()},
{Message: t2, Score: fiveMinutesAgo.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(oneHourAgo.Unix())},
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
t: time.Now(),
want: []*base.TaskMessage{t1, t2},
},
{
desc: "with empty in-progress queue",
deadlines: []base.Z{},
deadlines: []h.ZSetEntry{},
t: time.Now(),
want: []*base.TaskMessage{},
},

View File

@@ -180,15 +180,6 @@ func (tb *TestBroker) PublishCancelation(id string) error {
return tb.real.PublishCancelation(id)
}
func (tb *TestBroker) Ping() error {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return errRedisDown
}
return tb.real.Ping()
}
func (tb *TestBroker) Close() error {
tb.mu.Lock()
defer tb.mu.Unlock()

View File

@@ -191,19 +191,9 @@ func (p *processor) exec() {
p.cancelations.Delete(msg.ID.String())
}()
// check context before starting a worker goroutine.
select {
case <-ctx.Done():
// already canceled (e.g. deadline exceeded).
p.retryOrKill(ctx, msg, ctx.Err())
return
default:
}
resCh := make(chan error, 1)
go func() {
resCh <- perform(ctx, NewTask(msg.Type, msg.Payload), p.handler)
}()
task := NewTask(msg.Type, msg.Payload)
go func() { resCh <- perform(ctx, task, p.handler) }()
select {
case <-p.abort:
@@ -212,6 +202,7 @@ func (p *processor) exec() {
p.requeue(msg)
return
case <-ctx.Done():
p.logger.Debugf("Retrying task. task id=%s", msg.ID) // TODO: Improve this log message and above
p.retryOrKill(ctx, msg, ctx.Err())
return
case resErr := <-resCh:
@@ -220,6 +211,9 @@ func (p *processor) exec() {
// 2) Retry -> Removes the message from InProgress & Adds the message to Retry
// 3) Kill -> Removes the message from InProgress & Adds the message to Dead
if resErr != nil {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, task, resErr)
}
p.retryOrKill(ctx, msg, resErr)
return
}
@@ -258,11 +252,7 @@ func (p *processor) markAsDone(ctx context.Context, msg *base.TaskMessage) {
}
func (p *processor) retryOrKill(ctx context.Context, msg *base.TaskMessage, err error) {
if p.errHandler != nil {
p.errHandler.HandleError(ctx, NewTask(msg.Type, msg.Payload), err)
}
if msg.Retried >= msg.Retry {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
p.kill(ctx, msg, err)
} else {
p.retry(ctx, msg, err)
@@ -291,6 +281,7 @@ func (p *processor) retry(ctx context.Context, msg *base.TaskMessage, e error) {
}
func (p *processor) kill(ctx context.Context, msg *base.TaskMessage, e error) {
p.logger.Warnf("Retry exhausted for task id=%s", msg.ID)
err := p.broker.Kill(msg, e.Error())
if err != nil {
errMsg := fmt.Sprintf("Could not move task id=%s from %q to %q", msg.ID, base.InProgressQueue, base.DeadQueue)

View File

@@ -223,7 +223,7 @@ func TestProcessorRetry(t *testing.T) {
delay time.Duration // retry delay duration
handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []base.Z // tasks in retry queue at the end
wantRetry []h.ZSetEntry // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end
wantErrCount int // number of times error handler should be called
}{
@@ -235,10 +235,10 @@ func TestProcessorRetry(t *testing.T) {
return fmt.Errorf(errMsg)
}),
wait: 2 * time.Second,
wantRetry: []base.Z{
{Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()},
{Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
{Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
wantRetry: []h.ZSetEntry{
{Msg: h.TaskMessageAfterRetry(*m2, errMsg), Score: float64(now.Add(time.Minute).Unix())},
{Msg: h.TaskMessageAfterRetry(*m3, errMsg), Score: float64(now.Add(time.Minute).Unix())},
{Msg: h.TaskMessageAfterRetry(*m4, errMsg), Score: float64(now.Add(time.Minute).Unix())},
},
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
wantErrCount: 4,

View File

@@ -34,24 +34,24 @@ func TestRecoverer(t *testing.T) {
tests := []struct {
desc string
inProgress []*base.TaskMessage
deadlines []base.Z
retry []base.Z
dead []base.Z
deadlines []h.ZSetEntry
retry []h.ZSetEntry
dead []h.ZSetEntry
wantInProgress []*base.TaskMessage
wantDeadlines []base.Z
wantDeadlines []h.ZSetEntry
wantRetry []*base.TaskMessage
wantDead []*base.TaskMessage
}{
{
desc: "with one task in-progress",
inProgress: []*base.TaskMessage{t1},
deadlines: []base.Z{
{Message: t1, Score: fiveMinutesAgo.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())},
},
retry: []base.Z{},
dead: []base.Z{},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
},
@@ -60,30 +60,30 @@ func TestRecoverer(t *testing.T) {
{
desc: "with a task with max-retry reached",
inProgress: []*base.TaskMessage{t4},
deadlines: []base.Z{
{Message: t4, Score: fiveMinutesAgo.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t4, Score: float64(fiveMinutesAgo.Unix())},
},
retry: []base.Z{},
dead: []base.Z{},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")},
},
{
desc: "with multiple tasks in-progress, and one expired",
inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{
{Message: t1, Score: oneHourAgo.Unix()},
{Message: t2, Score: fiveMinutesFromNow.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(oneHourAgo.Unix())},
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
retry: []base.Z{},
dead: []base.Z{},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{t2, t3},
wantDeadlines: []base.Z{
{Message: t2, Score: fiveMinutesFromNow.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()},
wantDeadlines: []h.ZSetEntry{
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())},
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@@ -93,16 +93,16 @@ func TestRecoverer(t *testing.T) {
{
desc: "with multiple expired tasks in-progress",
inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []base.Z{
{Message: t1, Score: oneHourAgo.Unix()},
{Message: t2, Score: fiveMinutesAgo.Unix()},
{Message: t3, Score: oneHourFromNow.Unix()},
deadlines: []h.ZSetEntry{
{Msg: t1, Score: float64(oneHourAgo.Unix())},
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())},
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
retry: []base.Z{},
dead: []base.Z{},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []base.Z{
{Message: t3, Score: oneHourFromNow.Unix()},
wantDeadlines: []h.ZSetEntry{
{Msg: t3, Score: float64(oneHourFromNow.Unix())},
},
wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@@ -113,11 +113,11 @@ func TestRecoverer(t *testing.T) {
{
desc: "with empty in-progress queue",
inProgress: []*base.TaskMessage{},
deadlines: []base.Z{},
retry: []base.Z{},
dead: []base.Z{},
deadlines: []h.ZSetEntry{},
retry: []h.ZSetEntry{},
dead: []h.ZSetEntry{},
wantInProgress: []*base.TaskMessage{},
wantDeadlines: []base.Z{},
wantDeadlines: []h.ZSetEntry{},
wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{},
},

View File

@@ -31,8 +31,8 @@ func TestScheduler(t *testing.T) {
now := time.Now()
tests := []struct {
initScheduled []base.Z // scheduled queue initial state
initRetry []base.Z // retry queue initial state
initScheduled []h.ZSetEntry // scheduled queue initial state
initRetry []h.ZSetEntry // retry queue initial state
initQueue []*base.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state
wantScheduled []*base.TaskMessage // schedule queue final state
@@ -40,12 +40,12 @@ func TestScheduler(t *testing.T) {
wantQueue []*base.TaskMessage // default queue final state
}{
{
initScheduled: []base.Z{
{Message: t1, Score: now.Add(time.Hour).Unix()},
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
initScheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(now.Add(time.Hour).Unix())},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
},
initRetry: []base.Z{
{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()},
initRetry: []h.ZSetEntry{
{Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())},
},
initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2,
@@ -54,12 +54,12 @@ func TestScheduler(t *testing.T) {
wantQueue: []*base.TaskMessage{t2, t3, t4},
},
{
initScheduled: []base.Z{
{Message: t1, Score: now.Unix()},
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
{Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
initScheduled: []h.ZSetEntry{
{Msg: t1, Score: float64(now.Unix())},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())},
{Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())},
},
initRetry: []base.Z{},
initRetry: []h.ZSetEntry{},
initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2,
wantScheduled: []*base.TaskMessage{},

View File

@@ -47,7 +47,6 @@ type Server struct {
heartbeater *heartbeater
subscriber *subscriber
recoverer *recoverer
healthchecker *healthchecker
}
// Config specifies the server's background-task processing behavior.
@@ -124,18 +123,9 @@ type Config struct {
//
// If unset or zero, default timeout of 8 seconds is used.
ShutdownTimeout time.Duration
// HealthCheckFunc is called periodically with any errors encountered during ping to the
// connected redis server.
HealthCheckFunc func(error)
// HealthCheckInterval specifies the interval between healthchecks.
//
// If unset or zero, the interval is set to 15 seconds.
HealthCheckInterval time.Duration
}
// An ErrorHandler handles an error occured during task processing.
// An ErrorHandler handles errors returned by the task handler.
type ErrorHandler interface {
HandleError(ctx context.Context, task *Task, err error)
}
@@ -260,11 +250,7 @@ var defaultQueueConfig = map[string]int{
base.DefaultQueueName: 1,
}
const (
defaultShutdownTimeout = 8 * time.Second
defaultHealthCheckInterval = 15 * time.Second
)
const defaultShutdownTimeout = 8 * time.Second
// NewServer returns a new Server given a redis connection option
// and background processing configuration.
@@ -290,10 +276,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
if shutdownTimeout == 0 {
shutdownTimeout = defaultShutdownTimeout
}
healthcheckInterval := cfg.HealthCheckInterval
if healthcheckInterval == 0 {
healthcheckInterval = defaultHealthCheckInterval
}
logger := log.NewLogger(cfg.Logger)
loglevel := cfg.LogLevel
if loglevel == level_unspecified {
@@ -354,12 +336,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
retryDelayFunc: delayFunc,
interval: 1 * time.Minute,
})
healthchecker := newHealthChecker(healthcheckerParams{
logger: logger,
broker: rdb,
interval: healthcheckInterval,
healthcheckFunc: cfg.HealthCheckFunc,
})
return &Server{
logger: logger,
broker: rdb,
@@ -370,7 +346,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
heartbeater: heartbeater,
subscriber: subscriber,
recoverer: recoverer,
healthchecker: healthchecker,
}
}
@@ -438,7 +413,6 @@ func (srv *Server) Start(handler Handler) error {
srv.logger.Info("Starting processing")
srv.heartbeater.start(&srv.wg)
srv.healthchecker.start(&srv.wg)
srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg)
srv.recoverer.start(&srv.wg)
@@ -468,7 +442,6 @@ func (srv *Server) Stop() {
srv.recoverer.terminate()
srv.syncer.terminate()
srv.subscriber.terminate()
srv.healthchecker.terminate()
srv.heartbeater.terminate()
srv.wg.Wait()

View File

@@ -8,14 +8,15 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// delCmd represents the del command
var delCmd = &cobra.Command{
Use: "del [task key]",
Use: "del [task id]",
Short: "Deletes a task given an identifier",
Long: `Del (asynq del) will delete a task given an identifier.
@@ -43,12 +44,27 @@ func init() {
}
func del(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
id, score, qtype, err := parseQueryID(args[0])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err := i.DeleteTaskByKey(args[0])
}))
switch qtype {
case "s":
err = r.DeleteScheduledTask(id, score)
case "r":
err = r.DeleteRetryTask(id, score)
case "d":
err = r.DeleteDeadTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -8,7 +8,8 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -44,22 +45,20 @@ func init() {
}
func delall(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var (
n int
err error
)
r := rdb.NewRDB(c)
var err error
switch args[0] {
case "scheduled":
n, err = i.DeleteAllScheduledTasks()
err = r.DeleteAllScheduledTasks()
case "retry":
n, err = i.DeleteAllRetryTasks()
err = r.DeleteAllRetryTasks()
case "dead":
n, err = i.DeleteAllDeadTasks()
err = r.DeleteAllDeadTasks()
default:
fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs)
os.Exit(1)
@@ -68,5 +67,5 @@ func delall(cmd *cobra.Command, args []string) {
fmt.Println(err)
os.Exit(1)
}
fmt.Printf("Deleted all %d tasks in %q state\n", n, args[0])
fmt.Printf("Deleted all tasks in %q state\n", args[0])
}

View File

@@ -8,14 +8,15 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// enqCmd represents the enq command
var enqCmd = &cobra.Command{
Use: "enq [task key]",
Use: "enq [task id]",
Short: "Enqueues a task given an identifier",
Long: `Enq (asynq enq) will enqueue a task given an identifier.
@@ -46,12 +47,27 @@ func init() {
}
func enq(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
id, score, qtype, err := parseQueryID(args[0])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err := i.EnqueueTaskByKey(args[0])
}))
switch qtype {
case "s":
err = r.EnqueueScheduledTask(id, score)
case "r":
err = r.EnqueueRetryTask(id, score)
case "d":
err = r.EnqueueDeadTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -8,7 +8,8 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -47,22 +48,21 @@ func init() {
}
func enqall(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var (
n int
err error
)
r := rdb.NewRDB(c)
var n int64
var err error
switch args[0] {
case "scheduled":
n, err = i.EnqueueAllScheduledTasks()
n, err = r.EnqueueAllScheduledTasks()
case "retry":
n, err = i.EnqueueAllRetryTasks()
n, err = r.EnqueueAllRetryTasks()
case "dead":
n, err = i.EnqueueAllDeadTasks()
n, err = r.EnqueueAllDeadTasks()
default:
fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs)
os.Exit(1)

View File

@@ -10,7 +10,8 @@ import (
"strings"
"text/tabwriter"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -37,13 +38,14 @@ func init() {
}
func history(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
stats, err := i.History(days)
stats, err := r.HistoricalStats(days)
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -51,7 +53,7 @@ func history(cmd *cobra.Command, args []string) {
printDailyStats(stats)
}
func printDailyStats(stats []*asynq.DailyStats) {
func printDailyStats(stats []*rdb.DailyStats) {
format := strings.Repeat("%v\t", 4) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate")
@@ -63,7 +65,7 @@ func printDailyStats(stats []*asynq.DailyStats) {
} else {
errrate = fmt.Sprintf("%.2f%%", float64(s.Failed)/float64(s.Processed)*100)
}
fmt.Fprintf(tw, format, s.Date.Format("2006-01-02"), s.Processed, s.Failed, errrate)
fmt.Fprintf(tw, format, s.Time.Format("2006-01-02"), s.Processed, s.Failed, errrate)
}
tw.Flush()
}

View File

@@ -8,14 +8,15 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// killCmd represents the kill command
var killCmd = &cobra.Command{
Use: "kill [task key]",
Use: "kill [task id]",
Short: "Kills a task given an identifier",
Long: `Kill (asynq kill) will put a task in dead state given an identifier.
@@ -43,12 +44,25 @@ func init() {
}
func kill(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
id, score, qtype, err := parseQueryID(args[0])
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
err := i.KillTaskByKey(args[0])
}))
switch qtype {
case "s":
err = r.KillScheduledTask(id, score)
case "r":
err = r.KillRetryTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil {
fmt.Println(err)
os.Exit(1)

View File

@@ -8,7 +8,8 @@ import (
"fmt"
"os"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -44,20 +45,19 @@ func init() {
}
func killall(cmd *cobra.Command, args []string) {
i := asynq.NewInspector(asynq.RedisClientOpt{
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
var (
n int
err error
)
r := rdb.NewRDB(c)
var n int64
var err error
switch args[0] {
case "scheduled":
n, err = i.KillAllScheduledTasks()
n, err = r.KillAllScheduledTasks()
case "retry":
n, err = i.KillAllRetryTasks()
n, err = r.KillAllRetryTasks()
default:
fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs)
os.Exit(1)

View File

@@ -8,10 +8,13 @@ import (
"fmt"
"io"
"os"
"strconv"
"strings"
"time"
"github.com/hibiken/asynq"
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
@@ -59,11 +62,12 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Println("page number cannot be negative.")
os.Exit(1)
}
i := asynq.NewInspector(asynq.RedisClientOpt{
c := redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"),
DB: viper.GetInt("db"),
Password: viper.GetString("password"),
})
r := rdb.NewRDB(c)
parts := strings.Split(args[0], ":")
switch parts[0] {
case "enqueued":
@@ -71,23 +75,54 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n")
os.Exit(1)
}
listEnqueued(i, parts[1])
listEnqueued(r, parts[1])
case "inprogress":
listInProgress(i)
listInProgress(r)
case "scheduled":
listScheduled(i)
listScheduled(r)
case "retry":
listRetry(i)
listRetry(r)
case "dead":
listDead(i)
listDead(r)
default:
fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
os.Exit(1)
}
}
func listEnqueued(i *asynq.Inspector, qname string) {
tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
// queryID returns an identifier used for "enq" command.
// score is the zset score and queryType should be one
// of "s", "r" or "d" (scheduled, retry, dead respectively).
func queryID(id uuid.UUID, score int64, qtype string) string {
const format = "%v:%v:%v"
return fmt.Sprintf(format, qtype, score, id)
}
// parseQueryID is a reverse operation of queryID function.
// It takes a queryID and return each part of id with proper
// type if valid, otherwise it reports an error.
func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(queryID, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[2])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB, qname string) {
tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -97,16 +132,17 @@ func listEnqueued(i *asynq.Inspector, qname string) {
return
}
cols := []string{"ID", "Type", "Payload", "Queue"}
printTable(cols, func(w io.Writer, tmpl string) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
}
})
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listInProgress(i *asynq.Inspector) {
tasks, err := i.ListInProgressTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
func listInProgress(r *rdb.RDB) {
tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -116,16 +152,17 @@ func listInProgress(i *asynq.Inspector) {
return
}
cols := []string{"ID", "Type", "Payload"}
printTable(cols, func(w io.Writer, tmpl string) {
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
}
})
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listScheduled(i *asynq.Inspector) {
tasks, err := i.ListScheduledTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
func listScheduled(r *rdb.RDB) {
tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -134,19 +171,19 @@ func listScheduled(i *asynq.Inspector) {
fmt.Println("No scheduled tasks")
return
}
cols := []string{"Key", "Type", "Payload", "Process In", "Queue"}
printTable(cols, func(w io.Writer, tmpl string) {
cols := []string{"ID", "Type", "Payload", "Process In", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
processIn := fmt.Sprintf("%.0f seconds",
t.NextEnqueueAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn, t.Queue)
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue)
}
})
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listRetry(i *asynq.Inspector) {
tasks, err := i.ListRetryTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
func listRetry(r *rdb.RDB) {
tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -155,23 +192,24 @@ func listRetry(i *asynq.Inspector) {
fmt.Println("No retry tasks")
return
}
cols := []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
printTable(cols, func(w io.Writer, tmpl string) {
cols := []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
var nextRetry string
if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 {
if d := t.ProcessAt.Sub(time.Now()); d > 0 {
nextRetry = fmt.Sprintf("in %v", d.Round(time.Second))
} else {
nextRetry = "right now"
}
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry, t.Queue)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.Retry, t.Queue)
}
})
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}
func listDead(i *asynq.Inspector) {
tasks, err := i.ListDeadTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
func listDead(r *rdb.RDB) {
tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil {
fmt.Println(err)
os.Exit(1)
@@ -180,11 +218,12 @@ func listDead(i *asynq.Inspector) {
fmt.Println("No dead tasks")
return
}
cols := []string{"Key", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
printTable(cols, func(w io.Writer, tmpl string) {
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
printRows := func(w io.Writer, tmpl string) {
for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
}
})
}
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
}

View File

@@ -166,7 +166,6 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=