2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 00:02:19 +08:00

Move all inspector related code to subpackage inspeq

This commit is contained in:
Ken Hibino 2021-01-28 08:59:13 -08:00
parent eba7c4e085
commit e13122723a
6 changed files with 627 additions and 469 deletions

View File

@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq"
### Changed ### Changed
- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq".
- `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`.
### Added ### Added

View File

@ -7,7 +7,6 @@ package asynq
import ( import (
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@ -172,73 +171,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v)
func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Type() OptionType { return ProcessInOpt }
func (d processInOption) Value() interface{} { return time.Duration(d) } func (d processInOption) Value() interface{} { return time.Duration(d) }
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}
}
func parseOptionFunc(s string) string {
i := strings.Index(s, "(")
return s[:i]
}
func parseOptionArg(s string) string {
i := strings.Index(s, "(")
if i >= 0 {
j := strings.Index(s, ")")
if j > i {
return s[i+1 : j]
}
}
return ""
}
// ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task.
// //
@ -272,7 +204,7 @@ func composeOptions(opts ...Option) (option, error) {
res.retry = int(opt) res.retry = int(opt)
case queueOption: case queueOption:
trimmed := strings.TrimSpace(string(opt)) trimmed := strings.TrimSpace(string(opt))
if err := validateQueueName(trimmed); err != nil { if err := base.ValidateQueueName(trimmed); err != nil {
return option{}, err return option{}, err
} }
res.queue = trimmed res.queue = trimmed
@ -293,13 +225,6 @@ func composeOptions(opts ...Option) (option, error) {
return res, nil return res, nil
} }
func validateQueueName(qname string) error {
if len(qname) == 0 {
return fmt.Errorf("queue name must contain one or more characters")
}
return nil
}
const ( const (
// Default max retry count used if nothing is specified. // Default max retry count used if nothing is specified.
defaultMaxRetry = 25 defaultMaxRetry = 25

View File

@ -775,70 +775,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
} }
} }
func TestParseOption(t *testing.T) {
oneHourFromNow := time.Now().Add(1 * time.Hour)
tests := []struct {
s string
wantType OptionType
wantVal interface{}
}{
{`MaxRetry(10)`, MaxRetryOpt, 10},
{`Queue("email")`, QueueOpt, "email"},
{`Timeout(3m)`, TimeoutOpt, 3 * time.Minute},
{Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow},
{`Unique(1h)`, UniqueOpt, 1 * time.Hour},
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
}
for _, tc := range tests {
t.Run(tc.s, func(t *testing.T) {
got, err := parseOption(tc.s)
if err != nil {
t.Fatalf("returned error: %v", err)
}
if got == nil {
t.Fatal("returned nil")
}
if got.Type() != tc.wantType {
t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType)
}
switch tc.wantType {
case QueueOpt:
gotVal, ok := got.Value().(string)
if !ok {
t.Fatal("returned Option with non-string value")
}
if gotVal != tc.wantVal.(string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case MaxRetryOpt:
gotVal, ok := got.Value().(int)
if !ok {
t.Fatal("returned Option with non-int value")
}
if gotVal != tc.wantVal.(int) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case TimeoutOpt, UniqueOpt, ProcessInOpt:
gotVal, ok := got.Value().(time.Duration)
if !ok {
t.Fatal("returned Option with non duration value")
}
if gotVal != tc.wantVal.(time.Duration) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case DeadlineOpt, ProcessAtOpt:
gotVal, ok := got.Value().(time.Time)
if !ok {
t.Fatal("returned Option with non time value")
}
if cmp.Equal(gotVal, tc.wantVal.(time.Time)) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
default:
t.Fatalf("returned Option with unexpected type: %v", got.Type())
}
})
}
}

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT license // Use of this source code is governed by a MIT license
// that can be found in the LICENSE file. // that can be found in the LICENSE file.
package asynq package inspeq
import ( import (
"fmt" "fmt"
@ -10,7 +10,10 @@ import (
"strings" "strings"
"time" "time"
"github.com/go-redis/redis/v7"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb" "github.com/hibiken/asynq/internal/rdb"
) )
@ -20,13 +23,81 @@ type Inspector struct {
rdb *rdb.RDB rdb *rdb.RDB
} }
// NewInspector returns a new instance of Inspector. // New returns a new instance of Inspector.
func NewInspector(r RedisConnOpt) *Inspector { func New(r asynq.RedisConnOpt) *Inspector {
return &Inspector{ return &Inspector{
rdb: rdb.NewRDB(createRedisClient(r)), rdb: rdb.NewRDB(createRedisClient(r)),
} }
} }
// createRedisClient returns a redis client given a redis connection configuration.
//
// Passing an unexpected type as a RedisConnOpt argument will cause panic.
func createRedisClient(r asynq.RedisConnOpt) redis.UniversalClient {
switch r := r.(type) {
case *asynq.RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisClientOpt:
return redis.NewClient(&redis.Options{
Network: r.Network,
Addr: r.Addr,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case *asynq.RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisFailoverClientOpt:
return redis.NewFailoverClient(&redis.FailoverOptions{
MasterName: r.MasterName,
SentinelAddrs: r.SentinelAddrs,
SentinelPassword: r.SentinelPassword,
Username: r.Username,
Password: r.Password,
DB: r.DB,
PoolSize: r.PoolSize,
TLSConfig: r.TLSConfig,
})
case asynq.RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
case *asynq.RedisClusterClientOpt:
return redis.NewClusterClient(&redis.ClusterOptions{
Addrs: r.Addrs,
MaxRedirects: r.MaxRedirects,
Username: r.Username,
Password: r.Password,
TLSConfig: r.TLSConfig,
})
default:
panic(fmt.Sprintf("inspeq: unexpected type %T for RedisConnOpt", r))
}
}
// Close closes the connection with redis. // Close closes the connection with redis.
func (i *Inspector) Close() error { func (i *Inspector) Close() error {
return i.rdb.Close() return i.rdb.Close()
@ -70,7 +141,7 @@ type QueueStats struct {
// CurrentStats returns a current stats of the given queue. // CurrentStats returns a current stats of the given queue.
func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
stats, err := i.rdb.CurrentStats(qname) stats, err := i.rdb.CurrentStats(qname)
@ -108,7 +179,7 @@ type DailyStats struct {
// History returns a list of stats from the last n days. // History returns a list of stats from the last n days.
func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
stats, err := i.rdb.HistoricalStats(qname, n) stats, err := i.rdb.HistoricalStats(qname, n)
@ -168,7 +239,7 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// PendingTask is a task in a queue and is ready to be processed. // PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct { type PendingTask struct {
*Task *asynq.Task
ID string ID string
Queue string Queue string
MaxRetry int MaxRetry int
@ -178,7 +249,7 @@ type PendingTask struct {
// ActiveTask is a task that's currently being processed. // ActiveTask is a task that's currently being processed.
type ActiveTask struct { type ActiveTask struct {
*Task *asynq.Task
ID string ID string
Queue string Queue string
MaxRetry int MaxRetry int
@ -188,7 +259,7 @@ type ActiveTask struct {
// ScheduledTask is a task scheduled to be processed in the future. // ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct { type ScheduledTask struct {
*Task *asynq.Task
ID string ID string
Queue string Queue string
MaxRetry int MaxRetry int
@ -201,7 +272,7 @@ type ScheduledTask struct {
// RetryTask is a task scheduled to be retried in the future. // RetryTask is a task scheduled to be retried in the future.
type RetryTask struct { type RetryTask struct {
*Task *asynq.Task
ID string ID string
Queue string Queue string
NextProcessAt time.Time NextProcessAt time.Time
@ -218,7 +289,7 @@ type RetryTask struct {
// A task can be archived when the task exhausts its retry counts or manually // A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector. // archived by a user via the CLI or Inspector.
type ArchivedTask struct { type ArchivedTask struct {
*Task *asynq.Task
ID string ID string
Queue string Queue string
MaxRetry int MaxRetry int
@ -352,7 +423,7 @@ func Page(n int) ListOption {
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
@ -364,7 +435,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask var tasks []*PendingTask
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &PendingTask{ tasks = append(tasks, &PendingTask{
Task: NewTask(m.Type, m.Payload), Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(), ID: m.ID.String(),
Queue: m.Queue, Queue: m.Queue,
MaxRetry: m.Retry, MaxRetry: m.Retry,
@ -379,7 +450,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
@ -392,7 +463,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
for _, m := range msgs { for _, m := range msgs {
tasks = append(tasks, &ActiveTask{ tasks = append(tasks, &ActiveTask{
Task: NewTask(m.Type, m.Payload), Task: asynq.NewTask(m.Type, m.Payload),
ID: m.ID.String(), ID: m.ID.String(),
Queue: m.Queue, Queue: m.Queue,
MaxRetry: m.Retry, MaxRetry: m.Retry,
@ -408,7 +479,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
@ -420,7 +491,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
var tasks []*ScheduledTask var tasks []*ScheduledTask
for _, z := range zs { for _, z := range zs {
processAt := time.Unix(z.Score, 0) processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload) t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{ tasks = append(tasks, &ScheduledTask{
Task: t, Task: t,
ID: z.Message.ID.String(), ID: z.Message.ID.String(),
@ -440,7 +511,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
@ -452,7 +523,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
var tasks []*RetryTask var tasks []*RetryTask
for _, z := range zs { for _, z := range zs {
processAt := time.Unix(z.Score, 0) processAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload) t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{ tasks = append(tasks, &RetryTask{
Task: t, Task: t,
ID: z.Message.ID.String(), ID: z.Message.ID.String(),
@ -473,7 +544,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
// //
// By default, it retrieves the first 30 tasks. // By default, it retrieves the first 30 tasks.
func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return nil, err return nil, err
} }
opt := composeListOptions(opts...) opt := composeListOptions(opts...)
@ -485,7 +556,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
var tasks []*ArchivedTask var tasks []*ArchivedTask
for _, z := range zs { for _, z := range zs {
failedAt := time.Unix(z.Score, 0) failedAt := time.Unix(z.Score, 0)
t := NewTask(z.Message.Type, z.Message.Payload) t := asynq.NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{ tasks = append(tasks, &ArchivedTask{
Task: t, Task: t,
ID: z.Message.ID.String(), ID: z.Message.ID.String(),
@ -503,7 +574,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
// DeleteAllPendingTasks deletes all pending tasks from the specified queue, // DeleteAllPendingTasks deletes all pending tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllPendingTasks(qname) n, err := i.rdb.DeleteAllPendingTasks(qname)
@ -513,7 +584,7 @@ func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) {
// DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllScheduledTasks(qname) n, err := i.rdb.DeleteAllScheduledTasks(qname)
@ -523,7 +594,7 @@ func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) {
// DeleteAllRetryTasks deletes all retry tasks from the specified queue, // DeleteAllRetryTasks deletes all retry tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllRetryTasks(qname) n, err := i.rdb.DeleteAllRetryTasks(qname)
@ -533,7 +604,7 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) {
// DeleteAllArchivedTasks deletes all archived tasks from the specified queue, // DeleteAllArchivedTasks deletes all archived tasks from the specified queue,
// and reports the number tasks deleted. // and reports the number tasks deleted.
func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.DeleteAllArchivedTasks(qname) n, err := i.rdb.DeleteAllArchivedTasks(qname)
@ -542,7 +613,7 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) {
// DeleteTaskByKey deletes a task with the given key from the given queue. // DeleteTaskByKey deletes a task with the given key from the given queue.
func (i *Inspector) DeleteTaskByKey(qname, key string) error { func (i *Inspector) DeleteTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return err return err
} }
prefix, id, score, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
@ -566,7 +637,7 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error {
// RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, // RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue,
// and reports the number of tasks transitioned. // and reports the number of tasks transitioned.
func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllScheduledTasks(qname) n, err := i.rdb.RunAllScheduledTasks(qname)
@ -576,7 +647,7 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) {
// RunAllRetryTasks transition all retry tasks to pending state from the given queue, // RunAllRetryTasks transition all retry tasks to pending state from the given queue,
// and reports the number of tasks transitioned. // and reports the number of tasks transitioned.
func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllRetryTasks(qname) n, err := i.rdb.RunAllRetryTasks(qname)
@ -586,7 +657,7 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) {
// RunAllArchivedTasks transition all archived tasks to pending state from the given queue, // RunAllArchivedTasks transition all archived tasks to pending state from the given queue,
// and reports the number of tasks transitioned. // and reports the number of tasks transitioned.
func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.RunAllArchivedTasks(qname) n, err := i.rdb.RunAllArchivedTasks(qname)
@ -595,7 +666,7 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) {
// RunTaskByKey transition a task to pending state given task key and queue name. // RunTaskByKey transition a task to pending state given task key and queue name.
func (i *Inspector) RunTaskByKey(qname, key string) error { func (i *Inspector) RunTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return err return err
} }
prefix, id, score, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
@ -619,7 +690,7 @@ func (i *Inspector) RunTaskByKey(qname, key string) error {
// ArchiveAllPendingTasks archives all pending tasks from the given queue, // ArchiveAllPendingTasks archives all pending tasks from the given queue,
// and reports the number of tasks archived. // and reports the number of tasks archived.
func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllPendingTasks(qname) n, err := i.rdb.ArchiveAllPendingTasks(qname)
@ -629,7 +700,7 @@ func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) {
// ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, // ArchiveAllScheduledTasks archives all scheduled tasks from the given queue,
// and reports the number of tasks archiveed. // and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllScheduledTasks(qname) n, err := i.rdb.ArchiveAllScheduledTasks(qname)
@ -639,7 +710,7 @@ func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) {
// ArchiveAllRetryTasks archives all retry tasks from the given queue, // ArchiveAllRetryTasks archives all retry tasks from the given queue,
// and reports the number of tasks archiveed. // and reports the number of tasks archiveed.
func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return 0, err return 0, err
} }
n, err := i.rdb.ArchiveAllRetryTasks(qname) n, err := i.rdb.ArchiveAllRetryTasks(qname)
@ -648,7 +719,7 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) {
// ArchiveTaskByKey archives a task with the given key in the given queue. // ArchiveTaskByKey archives a task with the given key in the given queue.
func (i *Inspector) ArchiveTaskByKey(qname, key string) error { func (i *Inspector) ArchiveTaskByKey(qname, key string) error {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return err return err
} }
prefix, id, score, err := parseTaskKey(key) prefix, id, score, err := parseTaskKey(key)
@ -680,7 +751,7 @@ func (i *Inspector) CancelActiveTask(id string) error {
// PauseQueue pauses task processing on the specified queue. // PauseQueue pauses task processing on the specified queue.
// If the queue is already paused, it will return a non-nil error. // If the queue is already paused, it will return a non-nil error.
func (i *Inspector) PauseQueue(qname string) error { func (i *Inspector) PauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return err return err
} }
return i.rdb.Pause(qname) return i.rdb.Pause(qname)
@ -689,7 +760,7 @@ func (i *Inspector) PauseQueue(qname string) error {
// UnpauseQueue resumes task processing on the specified queue. // UnpauseQueue resumes task processing on the specified queue.
// If the queue is not paused, it will return a non-nil error. // If the queue is not paused, it will return a non-nil error.
func (i *Inspector) UnpauseQueue(qname string) error { func (i *Inspector) UnpauseQueue(qname string) error {
if err := validateQueueName(qname); err != nil { if err := base.ValidateQueueName(qname); err != nil {
return err return err
} }
return i.rdb.Unpause(qname) return i.rdb.Unpause(qname)
@ -728,7 +799,7 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
Started: w.Started, Started: w.Started,
Deadline: w.Deadline, Deadline: w.Deadline,
Task: &ActiveTask{ Task: &ActiveTask{
Task: NewTask(w.Type, w.Payload), Task: asynq.NewTask(w.Type, w.Payload),
ID: w.ID, ID: w.ID,
Queue: w.Queue, Queue: w.Queue,
}, },
@ -812,10 +883,10 @@ type SchedulerEntry struct {
Spec string Spec string
// Periodic Task registered for this entry. // Periodic Task registered for this entry.
Task *Task Task *asynq.Task
// Opts is the options for the periodic task. // Opts is the options for the periodic task.
Opts []Option Opts []asynq.Option
// Next shows the next time the task will be enqueued. // Next shows the next time the task will be enqueued.
Next time.Time Next time.Time
@ -834,8 +905,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err return nil, err
} }
for _, e := range res { for _, e := range res {
task := NewTask(e.Type, e.Payload) task := asynq.NewTask(e.Type, e.Payload)
var opts []Option var opts []asynq.Option
for _, s := range e.Opts { for _, s := range e.Opts {
if o, err := parseOption(s); err == nil { if o, err := parseOption(s); err == nil {
// ignore bad data // ignore bad data
@ -854,6 +925,74 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return entries, nil return entries, nil
} }
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (asynq.Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
qname, err := strconv.Unquote(arg)
if err != nil {
return nil, err
}
return asynq.Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return asynq.MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}
}
func parseOptionFunc(s string) string {
i := strings.Index(s, "(")
return s[:i]
}
func parseOptionArg(s string) string {
i := strings.Index(s, "(")
if i >= 0 {
j := strings.Index(s, ")")
if j > i {
return s[i+1 : j]
}
}
return ""
}
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. // SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct { type SchedulerEnqueueEvent struct {
// ID of the task that was enqueued. // ID of the task that was enqueued.

File diff suppressed because it is too large Load Diff

View File

@ -36,6 +36,15 @@ const (
CancelChannel = "asynq:cancel" // PubSub channel CancelChannel = "asynq:cancel" // PubSub channel
) )
// ValidateQueueName validates a given qname to be used as a queue name.
// Returns nil if valid, otherwise returns non-nil error.
func ValidateQueueName(qname string) error {
if len(qname) == 0 {
return fmt.Errorf("queue name must contain one or more characters")
}
return nil
}
// QueueKey returns a redis key for the given queue name. // QueueKey returns a redis key for the given queue name.
func QueueKey(qname string) string { func QueueKey(qname string) string {
return fmt.Sprintf("asynq:{%s}", qname) return fmt.Sprintf("asynq:{%s}", qname)