2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Add Inspector type

This commit is contained in:
Ken Hibino 2020-07-13 06:29:41 -07:00
parent 9bd3d8e19e
commit a28f61f313
20 changed files with 2830 additions and 931 deletions

View File

@ -36,7 +36,7 @@ func TestClientEnqueueAt(t *testing.T) {
opts []Option opts []Option
wantRes *Result wantRes *Result
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry wantScheduled []base.Z
}{ }{
{ {
desc: "Process task immediately", desc: "Process task immediately",
@ -75,9 +75,9 @@ func TestClientEnqueueAt(t *testing.T) {
Deadline: noDeadline, Deadline: noDeadline,
}, },
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{ wantScheduled: []base.Z{
{ {
Msg: &base.TaskMessage{ Message: &base.TaskMessage{
Type: task.Type, Type: task.Type,
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
@ -85,7 +85,7 @@ func TestClientEnqueueAt(t *testing.T) {
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(), Deadline: noDeadline.Unix(),
}, },
Score: float64(oneHourLater.Unix()), Score: oneHourLater.Unix(),
}, },
}, },
}, },
@ -376,7 +376,7 @@ func TestClientEnqueueIn(t *testing.T) {
opts []Option opts []Option
wantRes *Result wantRes *Result
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []h.ZSetEntry wantScheduled []base.Z
}{ }{
{ {
desc: "schedule a task to be enqueued in one hour", desc: "schedule a task to be enqueued in one hour",
@ -390,9 +390,9 @@ func TestClientEnqueueIn(t *testing.T) {
Deadline: noDeadline, Deadline: noDeadline,
}, },
wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil wantEnqueued: nil, // db is flushed in setup so list does not exist hence nil
wantScheduled: []h.ZSetEntry{ wantScheduled: []base.Z{
{ {
Msg: &base.TaskMessage{ Message: &base.TaskMessage{
Type: task.Type, Type: task.Type,
Payload: task.Payload.data, Payload: task.Payload.data,
Retry: defaultMaxRetry, Retry: defaultMaxRetry,
@ -400,7 +400,7 @@ func TestClientEnqueueIn(t *testing.T) {
Timeout: int64(defaultTimeout.Seconds()), Timeout: int64(defaultTimeout.Seconds()),
Deadline: noDeadline.Unix(), Deadline: noDeadline.Unix(),
}, },
Score: float64(time.Now().Add(time.Hour).Unix()), Score: time.Now().Add(time.Hour).Unix(),
}, },
}, },
}, },

488
inspector.go Normal file
View File

@ -0,0 +1,488 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
)
// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
}
// New returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector {
return &Inspector{
rdb: rdb.NewRDB(createRedisClient(r)),
}
}
// Stats represents a state of queues at a certain time.
type Stats struct {
Enqueued int
InProgress int
Scheduled int
Retry int
Dead int
Processed int
Failed int
Queues []*QueueInfo
Timestamp time.Time
}
// QueueInfo holds information about a queue.
type QueueInfo struct {
// Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool
// Size is the number of tasks in the queue.
Size int
}
// CurrentStats returns a current stats of the queues.
func (i *Inspector) CurrentStats() (*Stats, error) {
stats, err := i.rdb.CurrentStats()
if err != nil {
return nil, err
}
var qs []*QueueInfo
for _, q := range stats.Queues {
qs = append(qs, (*QueueInfo)(q))
}
return &Stats{
Enqueued: stats.Enqueued,
InProgress: stats.InProgress,
Scheduled: stats.Scheduled,
Retry: stats.Retry,
Dead: stats.Dead,
Processed: stats.Processed,
Failed: stats.Failed,
Queues: qs,
Timestamp: stats.Timestamp,
}, nil
}
// DailyStats holds aggregate data for a given day.
type DailyStats struct {
Processed int
Failed int
Date time.Time
}
// History returns a list of stats from the last n days.
func (i *Inspector) History(n int) ([]*DailyStats, error) {
stats, err := i.rdb.HistoricalStats(n)
if err != nil {
return nil, err
}
var res []*DailyStats
for _, s := range stats {
res = append(res, &DailyStats{
Processed: s.Processed,
Failed: s.Failed,
Date: s.Time,
})
}
return res, nil
}
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
*Task
ID string
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
*Task
ID string
}
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
score int64
}
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*Task
ID string
Queue string
NextEnqueueAt time.Time
MaxRetry int
Retried int
ErrorMsg string
// TODO: LastFailedAt time.Time
score int64
}
// DeadTask is a task exhausted its retries.
// DeadTask won't be retried automatically.
type DeadTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
ErrorMsg string
score int64
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf("s:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *RetryTask) Key() string {
return fmt.Sprintf("r:%v:%v", t.ID, t.score)
}
// Key returns a key used to delete, enqueue, and kill the task.
func (t *DeadTask) Key() string {
return fmt.Sprintf("d:%v:%v", t.ID, t.score)
}
// parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error.
func parseTaskKey(key string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(key, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[1])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
// ListOption specifies behavior of list operation.
type ListOption interface{}
// Internal list option representations.
type (
pageSizeOpt int
pageNumOpt int
)
type listOption struct {
pageSize int
pageNum int
}
const (
// Page size used by default in list operation.
defaultPageSize = 30
// Page number used by default in list operation.
defaultPageNum = 1
)
func composeListOptions(opts ...ListOption) listOption {
res := listOption{
pageSize: defaultPageSize,
pageNum: defaultPageNum,
}
for _, opt := range opts {
switch opt := opt.(type) {
case pageSizeOpt:
res.pageSize = int(opt)
case pageNumOpt:
res.pageNum = int(opt)
default:
// ignore unexpected option
}
}
return res
}
// PageSize returns an option to specify the page size for list operation.
//
// Negative page size is treated as zero.
func PageSize(n int) ListOption {
if n < 0 {
n = 0
}
return pageSizeOpt(n)
}
// Page returns an option to specify the page number for list operation.
// The value 1 fetches the first page.
//
// Negative page number is treated as one.
func Page(n int) ListOption {
if n < 0 {
n = 1
}
return pageNumOpt(n)
}
// ListScheduledTasks retrieves tasks in the specified queue.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListEnqueuedTasks(qname string, opts ...ListOption) ([]*EnqueuedTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListEnqueued(qname, pgn)
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, m := range msgs {
tasks = append(tasks, &EnqueuedTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
})
}
return tasks, err
}
// ListScheduledTasks retrieves tasks currently being processed.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListInProgressTasks(opts ...ListOption) ([]*InProgressTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
msgs, err := i.rdb.ListInProgress(pgn)
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, m := range msgs {
tasks = append(tasks, &InProgressTask{
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
})
}
return tasks, err
}
// ListScheduledTasks retrieves tasks in scheduled state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(opts ...ListOption) ([]*ScheduledTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListScheduled(pgn)
if err != nil {
return nil, err
}
var tasks []*ScheduledTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
score: z.Score,
})
}
return tasks, nil
}
// ListScheduledTasks retrieves tasks in retry state.
// Tasks are sorted by NextEnqueueAt field in ascending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(opts ...ListOption) ([]*RetryTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListRetry(pgn)
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range zs {
enqueueAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
NextEnqueueAt: enqueueAt,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
// TODO: LastFailedAt: z.Message.LastFailedAt
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
}
// ListScheduledTasks retrieves tasks in retry state.
// Tasks are sorted by LastFailedAt field in descending order.
//
// By default, it retrieves the first 30 tasks.
func (i *Inspector) ListDeadTasks(opts ...ListOption) ([]*DeadTask, error) {
opt := composeListOptions(opts...)
pgn := rdb.Pagination{Size: opt.pageSize, Page: opt.pageNum - 1}
zs, err := i.rdb.ListDead(pgn)
if err != nil {
return nil, err
}
var tasks []*DeadTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &DeadTask{
Task: t,
ID: z.Message.ID.String(),
Queue: z.Message.Queue,
MaxRetry: z.Message.Retry,
Retried: z.Message.Retried,
LastFailedAt: failedAt,
ErrorMsg: z.Message.ErrorMsg,
score: z.Score,
})
}
return tasks, nil
return nil, nil
}
// DeleteAllScheduledTasks deletes all tasks in scheduled state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks() (int, error) {
n, err := i.rdb.DeleteAllScheduledTasks()
return int(n), err
}
// DeleteAllRetryTasks deletes all tasks in retry state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks() (int, error) {
n, err := i.rdb.DeleteAllRetryTasks()
return int(n), err
}
// DeleteAllDeadTasks deletes all tasks in dead state,
// and reports the number tasks deleted.
func (i *Inspector) DeleteAllDeadTasks() (int, error) {
n, err := i.rdb.DeleteAllDeadTasks()
return int(n), err
}
// DeleteTaskByKey deletes a task with the given key.
func (i *Inspector) DeleteTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.DeleteScheduledTask(id, score)
case "r":
return i.rdb.DeleteRetryTask(id, score)
case "d":
return i.rdb.DeleteDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// EnqueueAllScheduledTasks enqueues all tasks in the scheduled state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllScheduledTasks() (int, error) {
n, err := i.rdb.EnqueueAllScheduledTasks()
return int(n), err
}
// EnqueueAllRetryTasks enqueues all tasks in the retry state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllRetryTasks() (int, error) {
n, err := i.rdb.EnqueueAllRetryTasks()
return int(n), err
}
// EnqueueAllDeadTasks enqueues all tasks in the dead state,
// and reports the number of tasks enqueued.
func (i *Inspector) EnqueueAllDeadTasks() (int, error) {
n, err := i.rdb.EnqueueAllDeadTasks()
return int(n), err
}
// EnqueueTaskByKey enqueues a task with the given key.
func (i *Inspector) EnqueueTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.EnqueueScheduledTask(id, score)
case "r":
return i.rdb.EnqueueRetryTask(id, score)
case "d":
return i.rdb.EnqueueDeadTask(id, score)
default:
return fmt.Errorf("invalid key")
}
}
// KillAllScheduledTasks kills all tasks in scheduled state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllScheduledTasks() (int, error) {
n, err := i.rdb.KillAllScheduledTasks()
return int(n), err
}
// KillAllRetryTasks kills all tasks in retry state,
// and reports the number of tasks killed.
func (i *Inspector) KillAllRetryTasks() (int, error) {
n, err := i.rdb.KillAllRetryTasks()
return int(n), err
}
// KillTaskByKey kills a task with the given key.
func (i *Inspector) KillTaskByKey(key string) error {
id, score, qtype, err := parseTaskKey(key)
if err != nil {
return err
}
switch qtype {
case "s":
return i.rdb.KillScheduledTask(id, score)
case "r":
return i.rdb.KillRetryTask(id, score)
case "d":
return fmt.Errorf("task already dead")
default:
return fmt.Errorf("invalid key")
}
}

1636
inspector_test.go Normal file

File diff suppressed because it is too large Load Diff

View File

@ -17,12 +17,6 @@ import (
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
) )
// ZSetEntry is an entry in redis sorted set.
type ZSetEntry struct {
Msg *base.TaskMessage
Score float64
}
// SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages. // SortMsgOpt is a cmp.Option to sort base.TaskMessage for comparing slice of task messages.
var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage { var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage {
out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it
@ -33,10 +27,10 @@ var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage
}) })
// SortZSetEntryOpt is an cmp.Option to sort ZSetEntry for comparing slice of zset entries. // SortZSetEntryOpt is an cmp.Option to sort ZSetEntry for comparing slice of zset entries.
var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []ZSetEntry) []ZSetEntry { var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []base.Z) []base.Z {
out := append([]ZSetEntry(nil), in...) // Copy input to avoid mutating it out := append([]base.Z(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool { sort.Slice(out, func(i, j int) bool {
return out[i].Msg.ID.String() < out[j].Msg.ID.String() return out[i].Message.ID.String() < out[j].Message.ID.String()
}) })
return out return out
}) })
@ -177,6 +171,15 @@ func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage,
seedRedisList(tb, r, queue, msgs) seedRedisList(tb, r, queue, msgs)
} }
// SeedAllEnqueuedQueues initializes all of the specified queues with the given messages.
//
// enqueued map maps a queue name a list of messages.
func SeedAllEnqueuedQueues(tb testing.TB, r *redis.Client, enqueued map[string][]*base.TaskMessage) {
for q, msgs := range enqueued {
SeedEnqueuedQueue(tb, r, msgs, q)
}
}
// SeedInProgressQueue initializes the in-progress queue with the given messages. // SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
tb.Helper() tb.Helper()
@ -184,25 +187,25 @@ func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessag
} }
// SeedScheduledQueue initializes the scheduled queue with the given messages. // SeedScheduledQueue initializes the scheduled queue with the given messages.
func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) { func SeedScheduledQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
tb.Helper() tb.Helper()
seedRedisZSet(tb, r, base.ScheduledQueue, entries) seedRedisZSet(tb, r, base.ScheduledQueue, entries)
} }
// SeedRetryQueue initializes the retry queue with the given messages. // SeedRetryQueue initializes the retry queue with the given messages.
func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) { func SeedRetryQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
tb.Helper() tb.Helper()
seedRedisZSet(tb, r, base.RetryQueue, entries) seedRedisZSet(tb, r, base.RetryQueue, entries)
} }
// SeedDeadQueue initializes the dead queue with the given messages. // SeedDeadQueue initializes the dead queue with the given messages.
func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []ZSetEntry) { func SeedDeadQueue(tb testing.TB, r *redis.Client, entries []base.Z) {
tb.Helper() tb.Helper()
seedRedisZSet(tb, r, base.DeadQueue, entries) seedRedisZSet(tb, r, base.DeadQueue, entries)
} }
// SeedDeadlines initializes the deadlines set with the given entries. // SeedDeadlines initializes the deadlines set with the given entries.
func SeedDeadlines(tb testing.TB, r *redis.Client, entries []ZSetEntry) { func SeedDeadlines(tb testing.TB, r *redis.Client, entries []base.Z) {
tb.Helper() tb.Helper()
seedRedisZSet(tb, r, base.KeyDeadlines, entries) seedRedisZSet(tb, r, base.KeyDeadlines, entries)
} }
@ -216,9 +219,9 @@ func seedRedisList(tb testing.TB, c *redis.Client, key string, msgs []*base.Task
} }
} }
func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []ZSetEntry) { func seedRedisZSet(tb testing.TB, c *redis.Client, key string, items []base.Z) {
for _, item := range items { for _, item := range items {
z := &redis.Z{Member: MustMarshal(tb, item.Msg), Score: float64(item.Score)} z := &redis.Z{Member: MustMarshal(tb, item.Message), Score: float64(item.Score)}
if err := c.ZAdd(key, z).Err(); err != nil { if err := c.ZAdd(key, z).Err(); err != nil {
tb.Fatal(err) tb.Fatal(err)
} }
@ -262,25 +265,25 @@ func GetDeadMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
} }
// GetScheduledEntries returns all task messages and its score in the scheduled queue. // GetScheduledEntries returns all task messages and its score in the scheduled queue.
func GetScheduledEntries(tb testing.TB, r *redis.Client) []ZSetEntry { func GetScheduledEntries(tb testing.TB, r *redis.Client) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.ScheduledQueue) return getZSetEntries(tb, r, base.ScheduledQueue)
} }
// GetRetryEntries returns all task messages and its score in the retry queue. // GetRetryEntries returns all task messages and its score in the retry queue.
func GetRetryEntries(tb testing.TB, r *redis.Client) []ZSetEntry { func GetRetryEntries(tb testing.TB, r *redis.Client) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.RetryQueue) return getZSetEntries(tb, r, base.RetryQueue)
} }
// GetDeadEntries returns all task messages and its score in the dead queue. // GetDeadEntries returns all task messages and its score in the dead queue.
func GetDeadEntries(tb testing.TB, r *redis.Client) []ZSetEntry { func GetDeadEntries(tb testing.TB, r *redis.Client) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.DeadQueue) return getZSetEntries(tb, r, base.DeadQueue)
} }
// GetDeadlinesEntries returns all task messages and its score in the deadlines set. // GetDeadlinesEntries returns all task messages and its score in the deadlines set.
func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []ZSetEntry { func GetDeadlinesEntries(tb testing.TB, r *redis.Client) []base.Z {
tb.Helper() tb.Helper()
return getZSetEntries(tb, r, base.KeyDeadlines) return getZSetEntries(tb, r, base.KeyDeadlines)
} }
@ -295,13 +298,13 @@ func getZSetMessages(tb testing.TB, r *redis.Client, zset string) []*base.TaskMe
return MustUnmarshalSlice(tb, data) return MustUnmarshalSlice(tb, data)
} }
func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []ZSetEntry { func getZSetEntries(tb testing.TB, r *redis.Client, zset string) []base.Z {
data := r.ZRangeWithScores(zset, 0, -1).Val() data := r.ZRangeWithScores(zset, 0, -1).Val()
var entries []ZSetEntry var entries []base.Z
for _, z := range data { for _, z := range data {
entries = append(entries, ZSetEntry{ entries = append(entries, base.Z{
Msg: MustUnmarshal(tb, z.Member.(string)), Message: MustUnmarshal(tb, z.Member.(string)),
Score: z.Score, Score: int64(z.Score),
}) })
} }
return entries return entries

View File

@ -133,6 +133,12 @@ func DecodeMessage(s string) (*TaskMessage, error) {
return &msg, nil return &msg, nil
} }
// Z represents sorted set member.
type Z struct {
Message *TaskMessage
Score int64
}
// ServerStatus represents status of a server. // ServerStatus represents status of a server.
// ServerStatus methods are concurrency safe. // ServerStatus methods are concurrency safe.
type ServerStatus struct { type ServerStatus struct {

View File

@ -51,56 +51,6 @@ type DailyStats struct {
Time time.Time Time time.Time
} }
// EnqueuedTask is a task in a queue and is ready to be processed.
type EnqueuedTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
Queue string
}
// InProgressTask is a task that's currently being processed.
type InProgressTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
}
// ScheduledTask is a task that's scheduled to be processed in the future.
type ScheduledTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
ProcessAt time.Time
Score int64
Queue string
}
// RetryTask is a task that's in retry queue because worker failed to process the task.
type RetryTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
// TODO(hibiken): add LastFailedAt time.Time
ProcessAt time.Time
ErrorMsg string
Retried int
Retry int
Score int64
Queue string
}
// DeadTask is a task in that has exhausted all retries.
type DeadTask struct {
ID uuid.UUID
Type string
Payload map[string]interface{}
LastFailedAt time.Time
ErrorMsg string
Score int64
Queue string
}
// KEYS[1] -> asynq:queues // KEYS[1] -> asynq:queues
// KEYS[2] -> asynq:in_progress // KEYS[2] -> asynq:in_progress
// KEYS[3] -> asynq:scheduled // KEYS[3] -> asynq:scheduled
@ -289,158 +239,79 @@ func (p Pagination) stop() int64 {
} }
// ListEnqueued returns enqueued tasks that are ready to be processed. // ListEnqueued returns enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*EnqueuedTask, error) { func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
qkey := base.QueueKey(qname) qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() { if !r.client.SIsMember(base.AllQueues, qkey).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname) return nil, fmt.Errorf("queue %q does not exist", qname)
} }
// Note: Because we use LPUSH to redis list, we need to calculate the return r.listMessages(qkey, pgn)
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(qkey, start, stop).Result()
if err != nil {
return nil, err
}
reverse(data)
var tasks []*EnqueuedTask
for _, s := range data {
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
tasks = append(tasks, &EnqueuedTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
})
}
return tasks, nil
} }
// ListInProgress returns all tasks that are currently being processed. // ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress(pgn Pagination) ([]*InProgressTask, error) { func (r *RDB) ListInProgress(pgn Pagination) ([]*base.TaskMessage, error) {
return r.listMessages(base.InProgressQueue, pgn)
}
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the // Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination. // correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1 stop := -pgn.start() - 1
start := -pgn.stop() - 1 start := -pgn.stop() - 1
data, err := r.client.LRange(base.InProgressQueue, start, stop).Result() data, err := r.client.LRange(key, start, stop).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
reverse(data) reverse(data)
var tasks []*InProgressTask var msgs []*base.TaskMessage
for _, s := range data { for _, s := range data {
var msg base.TaskMessage m, err := base.DecodeMessage(s)
err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
tasks = append(tasks, &InProgressTask{ msgs = append(msgs, m)
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
})
} }
return tasks, nil return msgs, nil
} }
// ListScheduled returns all tasks that are scheduled to be processed // ListScheduled returns all tasks that are scheduled to be processed
// in the future. // in the future.
func (r *RDB) ListScheduled(pgn Pagination) ([]*ScheduledTask, error) { func (r *RDB) ListScheduled(pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, pgn.start(), pgn.stop()).Result() return r.listZSetEntries(base.ScheduledQueue, pgn)
if err != nil {
return nil, err
}
var tasks []*ScheduledTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &ScheduledTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
} }
// ListRetry returns all tasks that have failed before and willl be retried // ListRetry returns all tasks that have failed before and willl be retried
// in the future. // in the future.
func (r *RDB) ListRetry(pgn Pagination) ([]*RetryTask, error) { func (r *RDB) ListRetry(pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(base.RetryQueue, pgn.start(), pgn.stop()).Result() return r.listZSetEntries(base.RetryQueue, pgn)
if err != nil {
return nil, err
}
var tasks []*RetryTask
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
}
var msg base.TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
processAt := time.Unix(int64(z.Score), 0)
tasks = append(tasks, &RetryTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Retry: msg.Retry,
Retried: msg.Retried,
Queue: msg.Queue,
ProcessAt: processAt,
Score: int64(z.Score),
})
}
return tasks, nil
} }
// ListDead returns all tasks that have exhausted its retry limit. // ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead(pgn Pagination) ([]*DeadTask, error) { func (r *RDB) ListDead(pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(base.DeadQueue, pgn.start(), pgn.stop()).Result() return r.listZSetEntries(base.DeadQueue, pgn)
}
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
} }
var tasks []*DeadTask var res []base.Z
for _, z := range data { for _, z := range data {
s, ok := z.Member.(string) s, ok := z.Member.(string)
if !ok { if !ok {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
var msg base.TaskMessage msg, err := base.DecodeMessage(s)
err := json.Unmarshal([]byte(s), &msg)
if err != nil { if err != nil {
continue // bad data, ignore and continue continue // bad data, ignore and continue
} }
lastFailedAt := time.Unix(int64(z.Score), 0) res = append(res, base.Z{msg, int64(z.Score)})
tasks = append(tasks, &DeadTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
ErrorMsg: msg.ErrorMsg,
Queue: msg.Queue,
LastFailedAt: lastFailedAt,
Score: int64(z.Score),
})
} }
return tasks, nil return res, nil
} }
// EnqueueDeadTask finds a task that matches the given id and score from dead queue // EnqueueDeadTask finds a task that matches the given id and score from dead queue
@ -704,19 +575,40 @@ func (r *RDB) deleteTask(zset, id string, score float64) error {
return nil return nil
} }
// DeleteAllDeadTasks deletes all tasks from the dead queue. // KEYS[1] -> queue to delete
func (r *RDB) DeleteAllDeadTasks() error { var deleteAllCmd = redis.NewScript(`
return r.client.Del(base.DeadQueue).Err() local n = redis.call("ZCARD", KEYS[1])
redis.call("DEL", KEYS[1])
return n`)
// DeleteAllDeadTasks deletes all tasks from the dead queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllDeadTasks() (int64, error) {
return r.deleteAll(base.DeadQueue)
} }
// DeleteAllRetryTasks deletes all tasks from the dead queue. // DeleteAllRetryTasks deletes all tasks from the dead queue
func (r *RDB) DeleteAllRetryTasks() error { // and returns the number of tasks deleted.
return r.client.Del(base.RetryQueue).Err() func (r *RDB) DeleteAllRetryTasks() (int64, error) {
return r.deleteAll(base.RetryQueue)
} }
// DeleteAllScheduledTasks deletes all tasks from the dead queue. // DeleteAllScheduledTasks deletes all tasks from the dead queue
func (r *RDB) DeleteAllScheduledTasks() error { // and returns the number of tasks deleted.
return r.client.Del(base.ScheduledQueue).Err() func (r *RDB) DeleteAllScheduledTasks() (int64, error) {
return r.deleteAll(base.ScheduledQueue)
}
func (r *RDB) deleteAll(key string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
} }
// ErrQueueNotFound indicates specified queue does not exist. // ErrQueueNotFound indicates specified queue does not exist.

File diff suppressed because it is too large Load Diff

View File

@ -148,7 +148,7 @@ func TestDequeue(t *testing.T) {
err error err error
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []h.ZSetEntry wantDeadlines []base.Z
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@ -162,10 +162,10 @@ func TestDequeue(t *testing.T) {
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{t1}, wantInProgress: []*base.TaskMessage{t1},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{ {
Msg: t1, Message: t1,
Score: float64(t1Deadline), Score: t1Deadline,
}, },
}, },
}, },
@ -181,7 +181,7 @@ func TestDequeue(t *testing.T) {
"default": {}, "default": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@ -199,10 +199,10 @@ func TestDequeue(t *testing.T) {
"low": {t3}, "low": {t3},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{ {
Msg: t2, Message: t2,
Score: float64(t2Deadline), Score: t2Deadline,
}, },
}, },
}, },
@ -222,10 +222,10 @@ func TestDequeue(t *testing.T) {
"low": {t2, t1}, "low": {t2, t1},
}, },
wantInProgress: []*base.TaskMessage{t3}, wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{ {
Msg: t3, Message: t3,
Score: float64(t3Deadline), Score: t3Deadline,
}, },
}, },
}, },
@ -245,7 +245,7 @@ func TestDequeue(t *testing.T) {
"low": {}, "low": {},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
}, },
} }
@ -412,70 +412,70 @@ func TestDone(t *testing.T) {
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []h.ZSetEntry // initial state of deadlines set deadlines []base.Z // initial state of deadlines set
target *base.TaskMessage // task to remove target *base.TaskMessage // task to remove
wantInProgress []*base.TaskMessage // final state of the in-progress list wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []h.ZSetEntry // final state of the deadline set wantDeadlines []base.Z // final state of the deadline set
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{ {
Msg: t1, Message: t1,
Score: float64(t1Deadline), Score: t1Deadline,
}, },
{ {
Msg: t2, Message: t2,
Score: float64(t2Deadline), Score: t2Deadline,
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{ {
Msg: t2, Message: t2,
Score: float64(t2Deadline), Score: t2Deadline,
}, },
}, },
}, },
{ {
inProgress: []*base.TaskMessage{t1}, inProgress: []*base.TaskMessage{t1},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{ {
Msg: t1, Message: t1,
Score: float64(t1Deadline), Score: t1Deadline,
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
}, },
{ {
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{ {
Msg: t1, Message: t1,
Score: float64(t1Deadline), Score: t1Deadline,
}, },
{ {
Msg: t2, Message: t2,
Score: float64(t2Deadline), Score: t2Deadline,
}, },
{ {
Msg: t3, Message: t3,
Score: float64(t3Deadline), Score: t3Deadline,
}, },
}, },
target: t3, target: t3,
wantInProgress: []*base.TaskMessage{t1, t2}, wantInProgress: []*base.TaskMessage{t1, t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{ {
Msg: t1, Message: t1,
Score: float64(t1Deadline), Score: t1Deadline,
}, },
{ {
Msg: t2, Message: t2,
Score: float64(t2Deadline), Score: t2Deadline,
}, },
}, },
}, },
@ -560,28 +560,28 @@ func TestRequeue(t *testing.T) {
tests := []struct { tests := []struct {
enqueued map[string][]*base.TaskMessage // initial state of queues enqueued map[string][]*base.TaskMessage // initial state of queues
inProgress []*base.TaskMessage // initial state of the in-progress list inProgress []*base.TaskMessage // initial state of the in-progress list
deadlines []h.ZSetEntry // initial state of the deadlines set deadlines []base.Z // initial state of the deadlines set
target *base.TaskMessage // task to requeue target *base.TaskMessage // task to requeue
wantEnqueued map[string][]*base.TaskMessage // final state of queues wantEnqueued map[string][]*base.TaskMessage // final state of queues
wantInProgress []*base.TaskMessage // final state of the in-progress list wantInProgress []*base.TaskMessage // final state of the in-progress list
wantDeadlines []h.ZSetEntry // final state of the deadlines set wantDeadlines []base.Z // final state of the deadlines set
}{ }{
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {}, base.DefaultQueueName: {},
}, },
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(t1Deadline)}, {Message: t1, Score: t1Deadline},
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
target: t1, target: t1,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1}, base.DefaultQueueName: {t1},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
}, },
{ {
@ -589,15 +589,15 @@ func TestRequeue(t *testing.T) {
base.DefaultQueueName: {t1}, base.DefaultQueueName: {t1},
}, },
inProgress: []*base.TaskMessage{t2}, inProgress: []*base.TaskMessage{t2},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
target: t2, target: t2,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
base.DefaultQueueName: {t1, t2}, base.DefaultQueueName: {t1, t2},
}, },
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
}, },
{ {
enqueued: map[string][]*base.TaskMessage{ enqueued: map[string][]*base.TaskMessage{
@ -605,9 +605,9 @@ func TestRequeue(t *testing.T) {
"critical": {}, "critical": {},
}, },
inProgress: []*base.TaskMessage{t2, t3}, inProgress: []*base.TaskMessage{t2, t3},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
{Msg: t3, Score: float64(t3Deadline)}, {Message: t3, Score: t3Deadline},
}, },
target: t3, target: t3,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
@ -615,8 +615,8 @@ func TestRequeue(t *testing.T) {
"critical": {t3}, "critical": {t3},
}, },
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
}, },
} }
@ -765,42 +765,42 @@ func TestRetry(t *testing.T) {
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
deadlines []h.ZSetEntry deadlines []base.Z
retry []h.ZSetEntry retry []base.Z
msg *base.TaskMessage msg *base.TaskMessage
processAt time.Time processAt time.Time
errMsg string errMsg string
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []h.ZSetEntry wantDeadlines []base.Z
wantRetry []h.ZSetEntry wantRetry []base.Z
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(t1Deadline)}, {Message: t1, Score: t1Deadline},
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
retry: []h.ZSetEntry{ retry: []base.Z{
{ {
Msg: t3, Message: t3,
Score: float64(now.Add(time.Minute).Unix()), Score: now.Add(time.Minute).Unix(),
}, },
}, },
msg: t1, msg: t1,
processAt: now.Add(5 * time.Minute), processAt: now.Add(5 * time.Minute),
errMsg: errMsg, errMsg: errMsg,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
wantRetry: []h.ZSetEntry{ wantRetry: []base.Z{
{ {
Msg: h.TaskMessageAfterRetry(*t1, errMsg), Message: h.TaskMessageAfterRetry(*t1, errMsg),
Score: float64(now.Add(5 * time.Minute).Unix()), Score: now.Add(5 * time.Minute).Unix(),
}, },
{ {
Msg: t3, Message: t3,
Score: float64(now.Add(time.Minute).Unix()), Score: now.Add(time.Minute).Unix(),
}, },
}, },
}, },
@ -891,59 +891,59 @@ func TestKill(t *testing.T) {
// TODO(hibiken): add test cases for trimming // TODO(hibiken): add test cases for trimming
tests := []struct { tests := []struct {
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
deadlines []h.ZSetEntry deadlines []base.Z
dead []h.ZSetEntry dead []base.Z
target *base.TaskMessage // task to kill target *base.TaskMessage // task to kill
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []h.ZSetEntry wantDeadlines []base.Z
wantDead []h.ZSetEntry wantDead []base.Z
}{ }{
{ {
inProgress: []*base.TaskMessage{t1, t2}, inProgress: []*base.TaskMessage{t1, t2},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(t1Deadline)}, {Message: t1, Score: t1Deadline},
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
dead: []h.ZSetEntry{ dead: []base.Z{
{ {
Msg: t3, Message: t3,
Score: float64(now.Add(-time.Hour).Unix()), Score: now.Add(-time.Hour).Unix(),
}, },
}, },
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2}, wantInProgress: []*base.TaskMessage{t2},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
}, },
wantDead: []h.ZSetEntry{ wantDead: []base.Z{
{ {
Msg: h.TaskMessageWithError(*t1, errMsg), Message: h.TaskMessageWithError(*t1, errMsg),
Score: float64(now.Unix()), Score: now.Unix(),
}, },
{ {
Msg: t3, Message: t3,
Score: float64(now.Add(-time.Hour).Unix()), Score: now.Add(-time.Hour).Unix(),
}, },
}, },
}, },
{ {
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(t1Deadline)}, {Message: t1, Score: t1Deadline},
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
{Msg: t3, Score: float64(t3Deadline)}, {Message: t3, Score: t3Deadline},
}, },
dead: []h.ZSetEntry{}, dead: []base.Z{},
target: t1, target: t1,
wantInProgress: []*base.TaskMessage{t2, t3}, wantInProgress: []*base.TaskMessage{t2, t3},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(t2Deadline)}, {Message: t2, Score: t2Deadline},
{Msg: t3, Score: float64(t3Deadline)}, {Message: t3, Score: t3Deadline},
}, },
wantDead: []h.ZSetEntry{ wantDead: []base.Z{
{ {
Msg: h.TaskMessageWithError(*t1, errMsg), Message: h.TaskMessageWithError(*t1, errMsg),
Score: float64(now.Unix()), Score: now.Unix(),
}, },
}, },
}, },
@ -1009,19 +1009,19 @@ func TestCheckAndEnqueue(t *testing.T) {
hourFromNow := time.Now().Add(time.Hour) hourFromNow := time.Now().Add(time.Hour)
tests := []struct { tests := []struct {
scheduled []h.ZSetEntry scheduled []base.Z
retry []h.ZSetEntry retry []base.Z
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage wantRetry []*base.TaskMessage
}{ }{
{ {
scheduled: []h.ZSetEntry{ scheduled: []base.Z{
{Msg: t1, Score: float64(secondAgo.Unix())}, {Message: t1, Score: secondAgo.Unix()},
{Msg: t2, Score: float64(secondAgo.Unix())}, {Message: t2, Score: secondAgo.Unix()},
}, },
retry: []h.ZSetEntry{ retry: []base.Z{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Message: t3, Score: secondAgo.Unix()}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
@ -1029,11 +1029,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
}, },
{ {
scheduled: []h.ZSetEntry{ scheduled: []base.Z{
{Msg: t1, Score: float64(hourFromNow.Unix())}, {Message: t1, Score: hourFromNow.Unix()},
{Msg: t2, Score: float64(secondAgo.Unix())}}, {Message: t2, Score: secondAgo.Unix()}},
retry: []h.ZSetEntry{ retry: []base.Z{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Message: t3, Score: secondAgo.Unix()}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3}, "default": {t2, t3},
}, },
@ -1041,11 +1041,11 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
}, },
{ {
scheduled: []h.ZSetEntry{ scheduled: []base.Z{
{Msg: t1, Score: float64(hourFromNow.Unix())}, {Message: t1, Score: hourFromNow.Unix()},
{Msg: t2, Score: float64(hourFromNow.Unix())}}, {Message: t2, Score: hourFromNow.Unix()}},
retry: []h.ZSetEntry{ retry: []base.Z{
{Msg: t3, Score: float64(hourFromNow.Unix())}}, {Message: t3, Score: hourFromNow.Unix()}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
@ -1053,12 +1053,12 @@ func TestCheckAndEnqueue(t *testing.T) {
wantRetry: []*base.TaskMessage{t3}, wantRetry: []*base.TaskMessage{t3},
}, },
{ {
scheduled: []h.ZSetEntry{ scheduled: []base.Z{
{Msg: t1, Score: float64(secondAgo.Unix())}, {Message: t1, Score: secondAgo.Unix()},
{Msg: t4, Score: float64(secondAgo.Unix())}, {Message: t4, Score: secondAgo.Unix()},
}, },
retry: []h.ZSetEntry{ retry: []base.Z{
{Msg: t5, Score: float64(secondAgo.Unix())}}, {Message: t5, Score: secondAgo.Unix()}},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t4}, "critical": {t4},
@ -1112,41 +1112,41 @@ func TestListDeadlineExceeded(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
deadlines []h.ZSetEntry deadlines []base.Z
t time.Time t time.Time
want []*base.TaskMessage want []*base.TaskMessage
}{ }{
{ {
desc: "with one task in-progress", desc: "with one task in-progress",
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())}, {Message: t1, Score: fiveMinutesAgo.Unix()},
}, },
t: time.Now(), t: time.Now(),
want: []*base.TaskMessage{t1}, want: []*base.TaskMessage{t1},
}, },
{ {
desc: "with multiple tasks in-progress, and one expired", desc: "with multiple tasks in-progress, and one expired",
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(oneHourAgo.Unix())}, {Message: t1, Score: oneHourAgo.Unix()},
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, {Message: t2, Score: fiveMinutesFromNow.Unix()},
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
t: time.Now(), t: time.Now(),
want: []*base.TaskMessage{t1}, want: []*base.TaskMessage{t1},
}, },
{ {
desc: "with multiple expired tasks in-progress", desc: "with multiple expired tasks in-progress",
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(oneHourAgo.Unix())}, {Message: t1, Score: oneHourAgo.Unix()},
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())}, {Message: t2, Score: fiveMinutesAgo.Unix()},
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
t: time.Now(), t: time.Now(),
want: []*base.TaskMessage{t1, t2}, want: []*base.TaskMessage{t1, t2},
}, },
{ {
desc: "with empty in-progress queue", desc: "with empty in-progress queue",
deadlines: []h.ZSetEntry{}, deadlines: []base.Z{},
t: time.Now(), t: time.Now(),
want: []*base.TaskMessage{}, want: []*base.TaskMessage{},
}, },

View File

@ -223,7 +223,7 @@ func TestProcessorRetry(t *testing.T) {
delay time.Duration // retry delay duration delay time.Duration // retry delay duration
handler Handler // task handler handler Handler // task handler
wait time.Duration // wait duration between starting and stopping processor for this test case wait time.Duration // wait duration between starting and stopping processor for this test case
wantRetry []h.ZSetEntry // tasks in retry queue at the end wantRetry []base.Z // tasks in retry queue at the end
wantDead []*base.TaskMessage // tasks in dead queue at the end wantDead []*base.TaskMessage // tasks in dead queue at the end
wantErrCount int // number of times error handler should be called wantErrCount int // number of times error handler should be called
}{ }{
@ -235,10 +235,10 @@ func TestProcessorRetry(t *testing.T) {
return fmt.Errorf(errMsg) return fmt.Errorf(errMsg)
}), }),
wait: 2 * time.Second, wait: 2 * time.Second,
wantRetry: []h.ZSetEntry{ wantRetry: []base.Z{
{Msg: h.TaskMessageAfterRetry(*m2, errMsg), Score: float64(now.Add(time.Minute).Unix())}, {Message: h.TaskMessageAfterRetry(*m2, errMsg), Score: now.Add(time.Minute).Unix()},
{Msg: h.TaskMessageAfterRetry(*m3, errMsg), Score: float64(now.Add(time.Minute).Unix())}, {Message: h.TaskMessageAfterRetry(*m3, errMsg), Score: now.Add(time.Minute).Unix()},
{Msg: h.TaskMessageAfterRetry(*m4, errMsg), Score: float64(now.Add(time.Minute).Unix())}, {Message: h.TaskMessageAfterRetry(*m4, errMsg), Score: now.Add(time.Minute).Unix()},
}, },
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)}, wantDead: []*base.TaskMessage{h.TaskMessageWithError(*m1, errMsg)},
wantErrCount: 4, wantErrCount: 4,

View File

@ -34,24 +34,24 @@ func TestRecoverer(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string
inProgress []*base.TaskMessage inProgress []*base.TaskMessage
deadlines []h.ZSetEntry deadlines []base.Z
retry []h.ZSetEntry retry []base.Z
dead []h.ZSetEntry dead []base.Z
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
wantDeadlines []h.ZSetEntry wantDeadlines []base.Z
wantRetry []*base.TaskMessage wantRetry []*base.TaskMessage
wantDead []*base.TaskMessage wantDead []*base.TaskMessage
}{ }{
{ {
desc: "with one task in-progress", desc: "with one task in-progress",
inProgress: []*base.TaskMessage{t1}, inProgress: []*base.TaskMessage{t1},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(fiveMinutesAgo.Unix())}, {Message: t1, Score: fiveMinutesAgo.Unix()},
}, },
retry: []h.ZSetEntry{}, retry: []base.Z{},
dead: []h.ZSetEntry{}, dead: []base.Z{},
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
wantRetry: []*base.TaskMessage{ wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"), h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
}, },
@ -60,30 +60,30 @@ func TestRecoverer(t *testing.T) {
{ {
desc: "with a task with max-retry reached", desc: "with a task with max-retry reached",
inProgress: []*base.TaskMessage{t4}, inProgress: []*base.TaskMessage{t4},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t4, Score: float64(fiveMinutesAgo.Unix())}, {Message: t4, Score: fiveMinutesAgo.Unix()},
}, },
retry: []h.ZSetEntry{}, retry: []base.Z{},
dead: []h.ZSetEntry{}, dead: []base.Z{},
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")}, wantDead: []*base.TaskMessage{h.TaskMessageWithError(*t4, "deadline exceeded")},
}, },
{ {
desc: "with multiple tasks in-progress, and one expired", desc: "with multiple tasks in-progress, and one expired",
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(oneHourAgo.Unix())}, {Message: t1, Score: oneHourAgo.Unix()},
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, {Message: t2, Score: fiveMinutesFromNow.Unix()},
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
retry: []h.ZSetEntry{}, retry: []base.Z{},
dead: []h.ZSetEntry{}, dead: []base.Z{},
wantInProgress: []*base.TaskMessage{t2, t3}, wantInProgress: []*base.TaskMessage{t2, t3},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t2, Score: float64(fiveMinutesFromNow.Unix())}, {Message: t2, Score: fiveMinutesFromNow.Unix()},
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
wantRetry: []*base.TaskMessage{ wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"), h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@ -93,16 +93,16 @@ func TestRecoverer(t *testing.T) {
{ {
desc: "with multiple expired tasks in-progress", desc: "with multiple expired tasks in-progress",
inProgress: []*base.TaskMessage{t1, t2, t3}, inProgress: []*base.TaskMessage{t1, t2, t3},
deadlines: []h.ZSetEntry{ deadlines: []base.Z{
{Msg: t1, Score: float64(oneHourAgo.Unix())}, {Message: t1, Score: oneHourAgo.Unix()},
{Msg: t2, Score: float64(fiveMinutesAgo.Unix())}, {Message: t2, Score: fiveMinutesAgo.Unix()},
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
retry: []h.ZSetEntry{}, retry: []base.Z{},
dead: []h.ZSetEntry{}, dead: []base.Z{},
wantInProgress: []*base.TaskMessage{t3}, wantInProgress: []*base.TaskMessage{t3},
wantDeadlines: []h.ZSetEntry{ wantDeadlines: []base.Z{
{Msg: t3, Score: float64(oneHourFromNow.Unix())}, {Message: t3, Score: oneHourFromNow.Unix()},
}, },
wantRetry: []*base.TaskMessage{ wantRetry: []*base.TaskMessage{
h.TaskMessageAfterRetry(*t1, "deadline exceeded"), h.TaskMessageAfterRetry(*t1, "deadline exceeded"),
@ -113,11 +113,11 @@ func TestRecoverer(t *testing.T) {
{ {
desc: "with empty in-progress queue", desc: "with empty in-progress queue",
inProgress: []*base.TaskMessage{}, inProgress: []*base.TaskMessage{},
deadlines: []h.ZSetEntry{}, deadlines: []base.Z{},
retry: []h.ZSetEntry{}, retry: []base.Z{},
dead: []h.ZSetEntry{}, dead: []base.Z{},
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
wantDeadlines: []h.ZSetEntry{}, wantDeadlines: []base.Z{},
wantRetry: []*base.TaskMessage{}, wantRetry: []*base.TaskMessage{},
wantDead: []*base.TaskMessage{}, wantDead: []*base.TaskMessage{},
}, },

View File

@ -31,8 +31,8 @@ func TestScheduler(t *testing.T) {
now := time.Now() now := time.Now()
tests := []struct { tests := []struct {
initScheduled []h.ZSetEntry // scheduled queue initial state initScheduled []base.Z // scheduled queue initial state
initRetry []h.ZSetEntry // retry queue initial state initRetry []base.Z // retry queue initial state
initQueue []*base.TaskMessage // default queue initial state initQueue []*base.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state wait time.Duration // wait duration before checking for final state
wantScheduled []*base.TaskMessage // schedule queue final state wantScheduled []*base.TaskMessage // schedule queue final state
@ -40,12 +40,12 @@ func TestScheduler(t *testing.T) {
wantQueue []*base.TaskMessage // default queue final state wantQueue []*base.TaskMessage // default queue final state
}{ }{
{ {
initScheduled: []h.ZSetEntry{ initScheduled: []base.Z{
{Msg: t1, Score: float64(now.Add(time.Hour).Unix())}, {Message: t1, Score: now.Add(time.Hour).Unix()},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())}, {Message: t2, Score: now.Add(-2 * time.Second).Unix()},
}, },
initRetry: []h.ZSetEntry{ initRetry: []base.Z{
{Msg: t3, Score: float64(time.Now().Add(-500 * time.Millisecond).Unix())}, {Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()},
}, },
initQueue: []*base.TaskMessage{t4}, initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2, wait: pollInterval * 2,
@ -54,12 +54,12 @@ func TestScheduler(t *testing.T) {
wantQueue: []*base.TaskMessage{t2, t3, t4}, wantQueue: []*base.TaskMessage{t2, t3, t4},
}, },
{ {
initScheduled: []h.ZSetEntry{ initScheduled: []base.Z{
{Msg: t1, Score: float64(now.Unix())}, {Message: t1, Score: now.Unix()},
{Msg: t2, Score: float64(now.Add(-2 * time.Second).Unix())}, {Message: t2, Score: now.Add(-2 * time.Second).Unix()},
{Msg: t3, Score: float64(now.Add(-500 * time.Millisecond).Unix())}, {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
}, },
initRetry: []h.ZSetEntry{}, initRetry: []base.Z{},
initQueue: []*base.TaskMessage{t4}, initQueue: []*base.TaskMessage{t4},
wait: pollInterval * 2, wait: pollInterval * 2,
wantScheduled: []*base.TaskMessage{}, wantScheduled: []*base.TaskMessage{},

View File

@ -8,15 +8,14 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// delCmd represents the del command // delCmd represents the del command
var delCmd = &cobra.Command{ var delCmd = &cobra.Command{
Use: "del [task id]", Use: "del [task key]",
Short: "Deletes a task given an identifier", Short: "Deletes a task given an identifier",
Long: `Del (asynq del) will delete a task given an identifier. Long: `Del (asynq del) will delete a task given an identifier.
@ -44,27 +43,12 @@ func init() {
} }
func del(cmd *cobra.Command, args []string) { func del(cmd *cobra.Command, args []string) {
id, score, qtype, err := parseQueryID(args[0]) i := asynq.NewInspector(asynq.RedisClientOpt{
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
})) })
switch qtype { err := i.DeleteTaskByKey(args[0])
case "s":
err = r.DeleteScheduledTask(id, score)
case "r":
err = r.DeleteRetryTask(id, score)
case "d":
err = r.DeleteDeadTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)

View File

@ -8,8 +8,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -45,20 +44,22 @@ func init() {
} }
func delall(cmd *cobra.Command, args []string) { func delall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{ i := asynq.NewInspector(asynq.RedisClientOpt{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
}) })
r := rdb.NewRDB(c) var (
var err error n int
err error
)
switch args[0] { switch args[0] {
case "scheduled": case "scheduled":
err = r.DeleteAllScheduledTasks() n, err = i.DeleteAllScheduledTasks()
case "retry": case "retry":
err = r.DeleteAllRetryTasks() n, err = i.DeleteAllRetryTasks()
case "dead": case "dead":
err = r.DeleteAllDeadTasks() n, err = i.DeleteAllDeadTasks()
default: default:
fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs) fmt.Printf("error: `asynq delall [state]` only accepts %v as the argument.\n", delallValidArgs)
os.Exit(1) os.Exit(1)
@ -67,5 +68,5 @@ func delall(cmd *cobra.Command, args []string) {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
} }
fmt.Printf("Deleted all tasks in %q state\n", args[0]) fmt.Printf("Deleted all %d tasks in %q state\n", n, args[0])
} }

View File

@ -8,15 +8,14 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// enqCmd represents the enq command // enqCmd represents the enq command
var enqCmd = &cobra.Command{ var enqCmd = &cobra.Command{
Use: "enq [task id]", Use: "enq [task key]",
Short: "Enqueues a task given an identifier", Short: "Enqueues a task given an identifier",
Long: `Enq (asynq enq) will enqueue a task given an identifier. Long: `Enq (asynq enq) will enqueue a task given an identifier.
@ -47,27 +46,12 @@ func init() {
} }
func enq(cmd *cobra.Command, args []string) { func enq(cmd *cobra.Command, args []string) {
id, score, qtype, err := parseQueryID(args[0]) i := asynq.NewInspector(asynq.RedisClientOpt{
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
})) })
switch qtype { err := i.EnqueueTaskByKey(args[0])
case "s":
err = r.EnqueueScheduledTask(id, score)
case "r":
err = r.EnqueueRetryTask(id, score)
case "d":
err = r.EnqueueDeadTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)

View File

@ -8,8 +8,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -48,21 +47,22 @@ func init() {
} }
func enqall(cmd *cobra.Command, args []string) { func enqall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{ i := asynq.NewInspector(asynq.RedisClientOpt{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
}) })
r := rdb.NewRDB(c) var (
var n int64 n int
var err error err error
)
switch args[0] { switch args[0] {
case "scheduled": case "scheduled":
n, err = r.EnqueueAllScheduledTasks() n, err = i.EnqueueAllScheduledTasks()
case "retry": case "retry":
n, err = r.EnqueueAllRetryTasks() n, err = i.EnqueueAllRetryTasks()
case "dead": case "dead":
n, err = r.EnqueueAllDeadTasks() n, err = i.EnqueueAllDeadTasks()
default: default:
fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs) fmt.Printf("error: `asynq enqall [state]` only accepts %v as the argument.\n", enqallValidArgs)
os.Exit(1) os.Exit(1)

View File

@ -10,8 +10,7 @@ import (
"strings" "strings"
"text/tabwriter" "text/tabwriter"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -38,14 +37,13 @@ func init() {
} }
func history(cmd *cobra.Command, args []string) { func history(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{ i := asynq.NewInspector(asynq.RedisClientOpt{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
}) })
r := rdb.NewRDB(c)
stats, err := r.HistoricalStats(days) stats, err := i.History(days)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -53,7 +51,7 @@ func history(cmd *cobra.Command, args []string) {
printDailyStats(stats) printDailyStats(stats)
} }
func printDailyStats(stats []*rdb.DailyStats) { func printDailyStats(stats []*asynq.DailyStats) {
format := strings.Repeat("%v\t", 4) + "\n" format := strings.Repeat("%v\t", 4) + "\n"
tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0) tw := new(tabwriter.Writer).Init(os.Stdout, 0, 8, 2, ' ', 0)
fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate") fmt.Fprintf(tw, format, "Date (UTC)", "Processed", "Failed", "Error Rate")

View File

@ -8,15 +8,14 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
// killCmd represents the kill command // killCmd represents the kill command
var killCmd = &cobra.Command{ var killCmd = &cobra.Command{
Use: "kill [task id]", Use: "kill [task key]",
Short: "Kills a task given an identifier", Short: "Kills a task given an identifier",
Long: `Kill (asynq kill) will put a task in dead state given an identifier. Long: `Kill (asynq kill) will put a task in dead state given an identifier.
@ -44,25 +43,12 @@ func init() {
} }
func kill(cmd *cobra.Command, args []string) { func kill(cmd *cobra.Command, args []string) {
id, score, qtype, err := parseQueryID(args[0]) i := asynq.NewInspector(asynq.RedisClientOpt{
if err != nil {
fmt.Println(err)
os.Exit(1)
}
r := rdb.NewRDB(redis.NewClient(&redis.Options{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
})) })
switch qtype { err := i.KillTaskByKey(args[0])
case "s":
err = r.KillScheduledTask(id, score)
case "r":
err = r.KillRetryTask(id, score)
default:
fmt.Println("invalid argument")
os.Exit(1)
}
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)

View File

@ -8,8 +8,7 @@ import (
"fmt" "fmt"
"os" "os"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -45,19 +44,20 @@ func init() {
} }
func killall(cmd *cobra.Command, args []string) { func killall(cmd *cobra.Command, args []string) {
c := redis.NewClient(&redis.Options{ i := asynq.NewInspector(asynq.RedisClientOpt{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
}) })
r := rdb.NewRDB(c) var (
var n int64 n int
var err error err error
)
switch args[0] { switch args[0] {
case "scheduled": case "scheduled":
n, err = r.KillAllScheduledTasks() n, err = i.KillAllScheduledTasks()
case "retry": case "retry":
n, err = r.KillAllRetryTasks() n, err = i.KillAllRetryTasks()
default: default:
fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs) fmt.Printf("error: `asynq killall [state]` only accepts %v as the argument.\n", killallValidArgs)
os.Exit(1) os.Exit(1)

View File

@ -8,13 +8,10 @@ import (
"fmt" "fmt"
"io" "io"
"os" "os"
"strconv"
"strings" "strings"
"time" "time"
"github.com/go-redis/redis/v7" "github.com/hibiken/asynq"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/rdb"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/viper" "github.com/spf13/viper"
) )
@ -62,12 +59,11 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Println("page number cannot be negative.") fmt.Println("page number cannot be negative.")
os.Exit(1) os.Exit(1)
} }
c := redis.NewClient(&redis.Options{ i := asynq.NewInspector(asynq.RedisClientOpt{
Addr: viper.GetString("uri"), Addr: viper.GetString("uri"),
DB: viper.GetInt("db"), DB: viper.GetInt("db"),
Password: viper.GetString("password"), Password: viper.GetString("password"),
}) })
r := rdb.NewRDB(c)
parts := strings.Split(args[0], ":") parts := strings.Split(args[0], ":")
switch parts[0] { switch parts[0] {
case "enqueued": case "enqueued":
@ -75,54 +71,23 @@ func ls(cmd *cobra.Command, args []string) {
fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n") fmt.Printf("error: Missing queue name\n`asynq ls enqueued:[queue name]`\n")
os.Exit(1) os.Exit(1)
} }
listEnqueued(r, parts[1]) listEnqueued(i, parts[1])
case "inprogress": case "inprogress":
listInProgress(r) listInProgress(i)
case "scheduled": case "scheduled":
listScheduled(r) listScheduled(i)
case "retry": case "retry":
listRetry(r) listRetry(i)
case "dead": case "dead":
listDead(r) listDead(i)
default: default:
fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs) fmt.Printf("error: `asynq ls [state]`\nonly accepts %v as the argument.\n", lsValidArgs)
os.Exit(1) os.Exit(1)
} }
} }
// queryID returns an identifier used for "enq" command. func listEnqueued(i *asynq.Inspector, qname string) {
// score is the zset score and queryType should be one tasks, err := i.ListEnqueuedTasks(qname, asynq.PageSize(pageSize), asynq.Page(pageNum))
// of "s", "r" or "d" (scheduled, retry, dead respectively).
func queryID(id uuid.UUID, score int64, qtype string) string {
const format = "%v:%v:%v"
return fmt.Sprintf(format, qtype, score, id)
}
// parseQueryID is a reverse operation of queryID function.
// It takes a queryID and return each part of id with proper
// type if valid, otherwise it reports an error.
func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
parts := strings.Split(queryID, ":")
if len(parts) != 3 {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[2])
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
qtype = parts[0]
if len(qtype) != 1 || !strings.Contains("srd", qtype) {
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
}
return id, score, qtype, nil
}
func listEnqueued(r *rdb.RDB, qname string) {
tasks, err := r.ListEnqueued(qname, rdb.Pagination{Size: pageSize, Page: pageNum})
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -132,17 +97,16 @@ func listEnqueued(r *rdb.RDB, qname string) {
return return
} }
cols := []string{"ID", "Type", "Payload", "Queue"} cols := []string{"ID", "Type", "Payload", "Queue"}
printRows := func(w io.Writer, tmpl string) { printTable(cols, func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue) fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload, t.Queue)
} }
} })
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
} }
func listInProgress(r *rdb.RDB) { func listInProgress(i *asynq.Inspector) {
tasks, err := r.ListInProgress(rdb.Pagination{Size: pageSize, Page: pageNum}) tasks, err := i.ListInProgressTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -152,17 +116,16 @@ func listInProgress(r *rdb.RDB) {
return return
} }
cols := []string{"ID", "Type", "Payload"} cols := []string{"ID", "Type", "Payload"}
printRows := func(w io.Writer, tmpl string) { printTable(cols, func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload) fmt.Fprintf(w, tmpl, t.ID, t.Type, t.Payload)
} }
} })
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
} }
func listScheduled(r *rdb.RDB) { func listScheduled(i *asynq.Inspector) {
tasks, err := r.ListScheduled(rdb.Pagination{Size: pageSize, Page: pageNum}) tasks, err := i.ListScheduledTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -171,19 +134,19 @@ func listScheduled(r *rdb.RDB) {
fmt.Println("No scheduled tasks") fmt.Println("No scheduled tasks")
return return
} }
cols := []string{"ID", "Type", "Payload", "Process In", "Queue"} cols := []string{"Key", "Type", "Payload", "Process In", "Queue"}
printRows := func(w io.Writer, tmpl string) { printTable(cols, func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
processIn := fmt.Sprintf("%.0f seconds", t.ProcessAt.Sub(time.Now()).Seconds()) processIn := fmt.Sprintf("%.0f seconds",
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "s"), t.Type, t.Payload, processIn, t.Queue) t.NextEnqueueAt.Sub(time.Now()).Seconds())
fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, processIn, t.Queue)
} }
} })
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
} }
func listRetry(r *rdb.RDB) { func listRetry(i *asynq.Inspector) {
tasks, err := r.ListRetry(rdb.Pagination{Size: pageSize, Page: pageNum}) tasks, err := i.ListRetryTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -192,24 +155,23 @@ func listRetry(r *rdb.RDB) {
fmt.Println("No retry tasks") fmt.Println("No retry tasks")
return return
} }
cols := []string{"ID", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"} cols := []string{"Key", "Type", "Payload", "Next Retry", "Last Error", "Retried", "Max Retry", "Queue"}
printRows := func(w io.Writer, tmpl string) { printTable(cols, func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
var nextRetry string var nextRetry string
if d := t.ProcessAt.Sub(time.Now()); d > 0 { if d := t.NextEnqueueAt.Sub(time.Now()); d > 0 {
nextRetry = fmt.Sprintf("in %v", d.Round(time.Second)) nextRetry = fmt.Sprintf("in %v", d.Round(time.Second))
} else { } else {
nextRetry = "right now" nextRetry = "right now"
} }
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "r"), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.Retry, t.Queue) fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, nextRetry, t.ErrorMsg, t.Retried, t.MaxRetry, t.Queue)
} }
} })
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
} }
func listDead(r *rdb.RDB) { func listDead(i *asynq.Inspector) {
tasks, err := r.ListDead(rdb.Pagination{Size: pageSize, Page: pageNum}) tasks, err := i.ListDeadTasks(asynq.PageSize(pageSize), asynq.Page(pageNum))
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
os.Exit(1) os.Exit(1)
@ -218,12 +180,11 @@ func listDead(r *rdb.RDB) {
fmt.Println("No dead tasks") fmt.Println("No dead tasks")
return return
} }
cols := []string{"ID", "Type", "Payload", "Last Failed", "Last Error", "Queue"} cols := []string{"Key", "Type", "Payload", "Last Failed", "Last Error", "Queue"}
printRows := func(w io.Writer, tmpl string) { printTable(cols, func(w io.Writer, tmpl string) {
for _, t := range tasks { for _, t := range tasks {
fmt.Fprintf(w, tmpl, queryID(t.ID, t.Score, "d"), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue) fmt.Fprintf(w, tmpl, t.Key(), t.Type, t.Payload, t.LastFailedAt, t.ErrorMsg, t.Queue)
} }
} })
printTable(cols, printRows)
fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum) fmt.Printf("\nShowing %d tasks from page %d\n", len(tasks), pageNum)
} }

View File

@ -166,6 +166,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4 h1:SvFZT6jyqRaOeXpc5h/JSfZenJ2O330aBsf7JfSUXmQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=