diff --git a/CHANGELOG.md b/CHANGELOG.md index 5250c75..a488762 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,8 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +**IMPORTATNT**: All `Inspector` related code are moved to subpackage "github.com/hibiken/asynq/inspeq" + ### Changed +- `Inspector` related code are moved to subpackage "github.com/hibken/asynq/inspeq". - `ErrorMsg` field in `RetryTask` and `ArchivedTask` was renamed to `LastError`. ### Added diff --git a/client.go b/client.go index bc53876..010d4b7 100644 --- a/client.go +++ b/client.go @@ -7,7 +7,6 @@ package asynq import ( "errors" "fmt" - "strconv" "strings" "sync" "time" @@ -172,73 +171,6 @@ func (d processInOption) String() string { return fmt.Sprintf("ProcessIn(%v) func (d processInOption) Type() OptionType { return ProcessInOpt } func (d processInOption) Value() interface{} { return time.Duration(d) } -// parseOption interprets a string s as an Option and returns the Option if parsing is successful, -// otherwise returns non-nil error. -func parseOption(s string) (Option, error) { - fn, arg := parseOptionFunc(s), parseOptionArg(s) - switch fn { - case "Queue": - qname, err := strconv.Unquote(arg) - if err != nil { - return nil, err - } - return Queue(qname), nil - case "MaxRetry": - n, err := strconv.Atoi(arg) - if err != nil { - return nil, err - } - return MaxRetry(n), nil - case "Timeout": - d, err := time.ParseDuration(arg) - if err != nil { - return nil, err - } - return Timeout(d), nil - case "Deadline": - t, err := time.Parse(time.UnixDate, arg) - if err != nil { - return nil, err - } - return Deadline(t), nil - case "Unique": - d, err := time.ParseDuration(arg) - if err != nil { - return nil, err - } - return Unique(d), nil - case "ProcessAt": - t, err := time.Parse(time.UnixDate, arg) - if err != nil { - return nil, err - } - return ProcessAt(t), nil - case "ProcessIn": - d, err := time.ParseDuration(arg) - if err != nil { - return nil, err - } - return ProcessIn(d), nil - default: - return nil, fmt.Errorf("cannot not parse option string %q", s) - } -} - -func parseOptionFunc(s string) string { - i := strings.Index(s, "(") - return s[:i] -} - -func parseOptionArg(s string) string { - i := strings.Index(s, "(") - if i >= 0 { - j := strings.Index(s, ")") - if j > i { - return s[i+1 : j] - } - } - return "" -} // ErrDuplicateTask indicates that the given task could not be enqueued since it's a duplicate of another task. // @@ -272,7 +204,7 @@ func composeOptions(opts ...Option) (option, error) { res.retry = int(opt) case queueOption: trimmed := strings.TrimSpace(string(opt)) - if err := validateQueueName(trimmed); err != nil { + if err := base.ValidateQueueName(trimmed); err != nil { return option{}, err } res.queue = trimmed @@ -293,13 +225,6 @@ func composeOptions(opts ...Option) (option, error) { return res, nil } -func validateQueueName(qname string) error { - if len(qname) == 0 { - return fmt.Errorf("queue name must contain one or more characters") - } - return nil -} - const ( // Default max retry count used if nothing is specified. defaultMaxRetry = 25 diff --git a/client_test.go b/client_test.go index de59b99..698efa6 100644 --- a/client_test.go +++ b/client_test.go @@ -775,70 +775,3 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } } -func TestParseOption(t *testing.T) { - oneHourFromNow := time.Now().Add(1 * time.Hour) - tests := []struct { - s string - wantType OptionType - wantVal interface{} - }{ - {`MaxRetry(10)`, MaxRetryOpt, 10}, - {`Queue("email")`, QueueOpt, "email"}, - {`Timeout(3m)`, TimeoutOpt, 3 * time.Minute}, - {Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow}, - {`Unique(1h)`, UniqueOpt, 1 * time.Hour}, - {ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow}, - {`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute}, - } - - for _, tc := range tests { - t.Run(tc.s, func(t *testing.T) { - got, err := parseOption(tc.s) - if err != nil { - t.Fatalf("returned error: %v", err) - } - if got == nil { - t.Fatal("returned nil") - } - if got.Type() != tc.wantType { - t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType) - } - switch tc.wantType { - case QueueOpt: - gotVal, ok := got.Value().(string) - if !ok { - t.Fatal("returned Option with non-string value") - } - if gotVal != tc.wantVal.(string) { - t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) - } - case MaxRetryOpt: - gotVal, ok := got.Value().(int) - if !ok { - t.Fatal("returned Option with non-int value") - } - if gotVal != tc.wantVal.(int) { - t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) - } - case TimeoutOpt, UniqueOpt, ProcessInOpt: - gotVal, ok := got.Value().(time.Duration) - if !ok { - t.Fatal("returned Option with non duration value") - } - if gotVal != tc.wantVal.(time.Duration) { - t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) - } - case DeadlineOpt, ProcessAtOpt: - gotVal, ok := got.Value().(time.Time) - if !ok { - t.Fatal("returned Option with non time value") - } - if cmp.Equal(gotVal, tc.wantVal.(time.Time)) { - t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) - } - default: - t.Fatalf("returned Option with unexpected type: %v", got.Type()) - } - }) - } -} diff --git a/inspector.go b/inspeq/inspector.go similarity index 80% rename from inspector.go rename to inspeq/inspector.go index 3cef876..348e8f7 100644 --- a/inspector.go +++ b/inspeq/inspector.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a MIT license // that can be found in the LICENSE file. -package asynq +package inspeq import ( "fmt" @@ -10,7 +10,10 @@ import ( "strings" "time" + "github.com/go-redis/redis/v7" "github.com/google/uuid" + "github.com/hibiken/asynq" + "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -20,13 +23,81 @@ type Inspector struct { rdb *rdb.RDB } -// NewInspector returns a new instance of Inspector. -func NewInspector(r RedisConnOpt) *Inspector { +// New returns a new instance of Inspector. +func New(r asynq.RedisConnOpt) *Inspector { return &Inspector{ rdb: rdb.NewRDB(createRedisClient(r)), } } +// createRedisClient returns a redis client given a redis connection configuration. +// +// Passing an unexpected type as a RedisConnOpt argument will cause panic. +func createRedisClient(r asynq.RedisConnOpt) redis.UniversalClient { + switch r := r.(type) { + case *asynq.RedisClientOpt: + return redis.NewClient(&redis.Options{ + Network: r.Network, + Addr: r.Addr, + Username: r.Username, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case asynq.RedisClientOpt: + return redis.NewClient(&redis.Options{ + Network: r.Network, + Addr: r.Addr, + Username: r.Username, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case *asynq.RedisFailoverClientOpt: + return redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: r.MasterName, + SentinelAddrs: r.SentinelAddrs, + SentinelPassword: r.SentinelPassword, + Username: r.Username, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case asynq.RedisFailoverClientOpt: + return redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: r.MasterName, + SentinelAddrs: r.SentinelAddrs, + SentinelPassword: r.SentinelPassword, + Username: r.Username, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case asynq.RedisClusterClientOpt: + return redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: r.Addrs, + MaxRedirects: r.MaxRedirects, + Username: r.Username, + Password: r.Password, + TLSConfig: r.TLSConfig, + }) + case *asynq.RedisClusterClientOpt: + return redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: r.Addrs, + MaxRedirects: r.MaxRedirects, + Username: r.Username, + Password: r.Password, + TLSConfig: r.TLSConfig, + }) + default: + panic(fmt.Sprintf("inspeq: unexpected type %T for RedisConnOpt", r)) + } +} + // Close closes the connection with redis. func (i *Inspector) Close() error { return i.rdb.Close() @@ -70,7 +141,7 @@ type QueueStats struct { // CurrentStats returns a current stats of the given queue. func (i *Inspector) CurrentStats(qname string) (*QueueStats, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } stats, err := i.rdb.CurrentStats(qname) @@ -108,7 +179,7 @@ type DailyStats struct { // History returns a list of stats from the last n days. func (i *Inspector) History(qname string, n int) ([]*DailyStats, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } stats, err := i.rdb.HistoricalStats(qname, n) @@ -168,7 +239,7 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { // PendingTask is a task in a queue and is ready to be processed. type PendingTask struct { - *Task + *asynq.Task ID string Queue string MaxRetry int @@ -178,7 +249,7 @@ type PendingTask struct { // ActiveTask is a task that's currently being processed. type ActiveTask struct { - *Task + *asynq.Task ID string Queue string MaxRetry int @@ -188,7 +259,7 @@ type ActiveTask struct { // ScheduledTask is a task scheduled to be processed in the future. type ScheduledTask struct { - *Task + *asynq.Task ID string Queue string MaxRetry int @@ -201,7 +272,7 @@ type ScheduledTask struct { // RetryTask is a task scheduled to be retried in the future. type RetryTask struct { - *Task + *asynq.Task ID string Queue string NextProcessAt time.Time @@ -218,7 +289,7 @@ type RetryTask struct { // A task can be archived when the task exhausts its retry counts or manually // archived by a user via the CLI or Inspector. type ArchivedTask struct { - *Task + *asynq.Task ID string Queue string MaxRetry int @@ -352,7 +423,7 @@ func Page(n int) ListOption { // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*PendingTask, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) @@ -364,7 +435,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi var tasks []*PendingTask for _, m := range msgs { tasks = append(tasks, &PendingTask{ - Task: NewTask(m.Type, m.Payload), + Task: asynq.NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, MaxRetry: m.Retry, @@ -379,7 +450,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*ActiveTask, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) @@ -392,7 +463,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active for _, m := range msgs { tasks = append(tasks, &ActiveTask{ - Task: NewTask(m.Type, m.Payload), + Task: asynq.NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, MaxRetry: m.Retry, @@ -408,7 +479,7 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*ScheduledTask, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) @@ -420,7 +491,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch var tasks []*ScheduledTask for _, z := range zs { processAt := time.Unix(z.Score, 0) - t := NewTask(z.Message.Type, z.Message.Payload) + t := asynq.NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &ScheduledTask{ Task: t, ID: z.Message.ID.String(), @@ -440,7 +511,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTask, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) @@ -452,7 +523,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa var tasks []*RetryTask for _, z := range zs { processAt := time.Unix(z.Score, 0) - t := NewTask(z.Message.Type, z.Message.Payload) + t := asynq.NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &RetryTask{ Task: t, ID: z.Message.ID.String(), @@ -473,7 +544,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa // // By default, it retrieves the first 30 tasks. func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*ArchivedTask, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return nil, err } opt := composeListOptions(opts...) @@ -485,7 +556,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch var tasks []*ArchivedTask for _, z := range zs { failedAt := time.Unix(z.Score, 0) - t := NewTask(z.Message.Type, z.Message.Payload) + t := asynq.NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &ArchivedTask{ Task: t, ID: z.Message.ID.String(), @@ -503,7 +574,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch // DeleteAllPendingTasks deletes all pending tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.DeleteAllPendingTasks(qname) @@ -513,7 +584,7 @@ func (i *Inspector) DeleteAllPendingTasks(qname string) (int, error) { // DeleteAllScheduledTasks deletes all scheduled tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.DeleteAllScheduledTasks(qname) @@ -523,7 +594,7 @@ func (i *Inspector) DeleteAllScheduledTasks(qname string) (int, error) { // DeleteAllRetryTasks deletes all retry tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.DeleteAllRetryTasks(qname) @@ -533,7 +604,7 @@ func (i *Inspector) DeleteAllRetryTasks(qname string) (int, error) { // DeleteAllArchivedTasks deletes all archived tasks from the specified queue, // and reports the number tasks deleted. func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.DeleteAllArchivedTasks(qname) @@ -542,7 +613,7 @@ func (i *Inspector) DeleteAllArchivedTasks(qname string) (int, error) { // DeleteTaskByKey deletes a task with the given key from the given queue. func (i *Inspector) DeleteTaskByKey(qname, key string) error { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return err } prefix, id, score, err := parseTaskKey(key) @@ -566,7 +637,7 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { // RunAllScheduledTasks transition all scheduled tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.RunAllScheduledTasks(qname) @@ -576,7 +647,7 @@ func (i *Inspector) RunAllScheduledTasks(qname string) (int, error) { // RunAllRetryTasks transition all retry tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.RunAllRetryTasks(qname) @@ -586,7 +657,7 @@ func (i *Inspector) RunAllRetryTasks(qname string) (int, error) { // RunAllArchivedTasks transition all archived tasks to pending state from the given queue, // and reports the number of tasks transitioned. func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.RunAllArchivedTasks(qname) @@ -595,7 +666,7 @@ func (i *Inspector) RunAllArchivedTasks(qname string) (int, error) { // RunTaskByKey transition a task to pending state given task key and queue name. func (i *Inspector) RunTaskByKey(qname, key string) error { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return err } prefix, id, score, err := parseTaskKey(key) @@ -619,7 +690,7 @@ func (i *Inspector) RunTaskByKey(qname, key string) error { // ArchiveAllPendingTasks archives all pending tasks from the given queue, // and reports the number of tasks archived. func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.ArchiveAllPendingTasks(qname) @@ -629,7 +700,7 @@ func (i *Inspector) ArchiveAllPendingTasks(qname string) (int, error) { // ArchiveAllScheduledTasks archives all scheduled tasks from the given queue, // and reports the number of tasks archiveed. func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.ArchiveAllScheduledTasks(qname) @@ -639,7 +710,7 @@ func (i *Inspector) ArchiveAllScheduledTasks(qname string) (int, error) { // ArchiveAllRetryTasks archives all retry tasks from the given queue, // and reports the number of tasks archiveed. func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return 0, err } n, err := i.rdb.ArchiveAllRetryTasks(qname) @@ -648,7 +719,7 @@ func (i *Inspector) ArchiveAllRetryTasks(qname string) (int, error) { // ArchiveTaskByKey archives a task with the given key in the given queue. func (i *Inspector) ArchiveTaskByKey(qname, key string) error { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return err } prefix, id, score, err := parseTaskKey(key) @@ -680,7 +751,7 @@ func (i *Inspector) CancelActiveTask(id string) error { // PauseQueue pauses task processing on the specified queue. // If the queue is already paused, it will return a non-nil error. func (i *Inspector) PauseQueue(qname string) error { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return err } return i.rdb.Pause(qname) @@ -689,7 +760,7 @@ func (i *Inspector) PauseQueue(qname string) error { // UnpauseQueue resumes task processing on the specified queue. // If the queue is not paused, it will return a non-nil error. func (i *Inspector) UnpauseQueue(qname string) error { - if err := validateQueueName(qname); err != nil { + if err := base.ValidateQueueName(qname); err != nil { return err } return i.rdb.Unpause(qname) @@ -728,7 +799,7 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) { Started: w.Started, Deadline: w.Deadline, Task: &ActiveTask{ - Task: NewTask(w.Type, w.Payload), + Task: asynq.NewTask(w.Type, w.Payload), ID: w.ID, Queue: w.Queue, }, @@ -812,10 +883,10 @@ type SchedulerEntry struct { Spec string // Periodic Task registered for this entry. - Task *Task + Task *asynq.Task // Opts is the options for the periodic task. - Opts []Option + Opts []asynq.Option // Next shows the next time the task will be enqueued. Next time.Time @@ -834,8 +905,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { return nil, err } for _, e := range res { - task := NewTask(e.Type, e.Payload) - var opts []Option + task := asynq.NewTask(e.Type, e.Payload) + var opts []asynq.Option for _, s := range e.Opts { if o, err := parseOption(s); err == nil { // ignore bad data @@ -854,6 +925,74 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { return entries, nil } +// parseOption interprets a string s as an Option and returns the Option if parsing is successful, +// otherwise returns non-nil error. +func parseOption(s string) (asynq.Option, error) { + fn, arg := parseOptionFunc(s), parseOptionArg(s) + switch fn { + case "Queue": + qname, err := strconv.Unquote(arg) + if err != nil { + return nil, err + } + return asynq.Queue(qname), nil + case "MaxRetry": + n, err := strconv.Atoi(arg) + if err != nil { + return nil, err + } + return asynq.MaxRetry(n), nil + case "Timeout": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return asynq.Timeout(d), nil + case "Deadline": + t, err := time.Parse(time.UnixDate, arg) + if err != nil { + return nil, err + } + return asynq.Deadline(t), nil + case "Unique": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return asynq.Unique(d), nil + case "ProcessAt": + t, err := time.Parse(time.UnixDate, arg) + if err != nil { + return nil, err + } + return asynq.ProcessAt(t), nil + case "ProcessIn": + d, err := time.ParseDuration(arg) + if err != nil { + return nil, err + } + return asynq.ProcessIn(d), nil + default: + return nil, fmt.Errorf("cannot not parse option string %q", s) + } +} + +func parseOptionFunc(s string) string { + i := strings.Index(s, "(") + return s[:i] +} + +func parseOptionArg(s string) string { + i := strings.Index(s, "(") + if i >= 0 { + j := strings.Index(s, ")") + if j > i { + return s[i+1 : j] + } + } + return "" +} + // SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. type SchedulerEnqueueEvent struct { // ID of the task that was enqueued. diff --git a/inspector_test.go b/inspeq/inspector_test.go similarity index 76% rename from inspector_test.go rename to inspeq/inspector_test.go index fce1212..8f27e46 100644 --- a/inspector_test.go +++ b/inspeq/inspector_test.go @@ -2,27 +2,108 @@ // Use of this source code is governed by a MIT license // that can be found in the LICENSE file. -package asynq +package inspeq import ( + "flag" "fmt" "math" "sort" + "strings" "testing" "time" + "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" - "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) +// variables used for package testing. +var ( + redisAddr string + redisDB int + + useRedisCluster bool + redisClusterAddrs string // comma-separated list of host:port + + testLogLevel = asynq.FatalLevel +) + +var testLogger *log.Logger + +func init() { + flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") + flag.IntVar(&redisDB, "redis_db", 14, "redis db number to use in testing") + flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing") + flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses") + flag.Var(&testLogLevel, "loglevel", "log level to use in testing") + + testLogger = log.NewLogger(nil) + testLogger.SetLevel(toInternalLogLevel(testLogLevel)) +} + +func toInternalLogLevel(l asynq.LogLevel) log.Level { + switch l { + case asynq.DebugLevel: + return log.DebugLevel + case asynq.InfoLevel: + return log.InfoLevel + case asynq.WarnLevel: + return log.WarnLevel + case asynq.ErrorLevel: + return log.ErrorLevel + case asynq.FatalLevel: + return log.FatalLevel + } + panic(fmt.Sprintf("inspeq: unexpected log level: %v", l)) +} + +func setup(tb testing.TB) (r redis.UniversalClient) { + tb.Helper() + if useRedisCluster { + addrs := strings.Split(redisClusterAddrs, ",") + if len(addrs) == 0 { + tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") + } + r = redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: addrs, + }) + } else { + r = redis.NewClient(&redis.Options{ + Addr: redisAddr, + DB: redisDB, + }) + } + // Start each test with a clean slate. + h.FlushDB(tb, r) + return r +} + +func getRedisConnOpt(tb testing.TB) asynq.RedisConnOpt { + tb.Helper() + if useRedisCluster { + addrs := strings.Split(redisClusterAddrs, ",") + if len(addrs) == 0 { + tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") + } + return asynq.RedisClusterClientOpt{ + Addrs: addrs, + } + } + return asynq.RedisClientOpt{ + Addr: redisAddr, + DB: redisDB, + } +} func TestInspectorQueues(t *testing.T) { r := setup(t) defer r.Close() - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { queues []string @@ -55,7 +136,7 @@ func TestInspectorQueues(t *testing.T) { func TestInspectorDeleteQueue(t *testing.T) { r := setup(t) defer r.Close() - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -144,7 +225,7 @@ func TestInspectorDeleteQueue(t *testing.T) { func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { r := setup(t) defer r.Close() - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -200,7 +281,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { r := setup(t) defer r.Close() - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -256,17 +337,17 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { func TestInspectorCurrentStats(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessage("task4", nil) - m5 := asynqtest.NewTaskMessageWithQueue("task5", nil, "critical") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) + m5 := h.NewTaskMessageWithQueue("task5", nil, "critical") m6 := h.NewTaskMessageWithQueue("task6", nil, "low") now := time.Now() timeCmpOpt := cmpopts.EquateApproxTime(time.Second) ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage") - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -336,12 +417,12 @@ func TestInspectorCurrentStats(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) - asynqtest.SeedAllActiveQueues(t, r, tc.active) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllActiveQueues(t, r, tc.active) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllArchivedQueues(t, r, tc.archived) for qname, n := range tc.processed { processedKey := base.ProcessedKey(qname, now) r.Set(processedKey, n, 0) @@ -370,7 +451,7 @@ func TestInspectorHistory(t *testing.T) { r := setup(t) defer r.Close() now := time.Now().UTC() - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { qname string // queue of interest @@ -382,7 +463,7 @@ func TestInspectorHistory(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) + h.FlushDB(t, r) r.SAdd(base.AllQueues, tc.qname) // populate last n days data @@ -423,7 +504,7 @@ func TestInspectorHistory(t *testing.T) { func createPendingTask(msg *base.TaskMessage) *PendingTask { return &PendingTask{ - Task: NewTask(msg.Type, msg.Payload), + Task: asynq.NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -435,12 +516,12 @@ func createPendingTask(msg *base.TaskMessage) *PendingTask { func TestInspectorListPendingTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessage("task4", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { desc string @@ -482,9 +563,9 @@ func TestInspectorListPendingTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) + h.FlushDB(t, r) for q, msgs := range tc.pending { - asynqtest.SeedPendingQueue(t, r, msgs, q) + h.SeedPendingQueue(t, r, msgs, q) } got, err := inspector.ListPendingTasks(tc.qname) @@ -493,7 +574,7 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -504,16 +585,16 @@ func TestInspectorListPendingTasks(t *testing.T) { func TestInspectorListActiveTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessage("task4", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessage("task4", nil) - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) createActiveTask := func(msg *base.TaskMessage) *ActiveTask { return &ActiveTask{ - Task: NewTask(msg.Type, msg.Payload), + Task: asynq.NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -543,15 +624,15 @@ func TestInspectorListActiveTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllActiveQueues(t, r, tc.active) + h.FlushDB(t, r) + h.SeedAllActiveQueues(t, r, tc.active) got, err := inspector.ListActiveTasks(tc.qname) if err != nil { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -562,7 +643,7 @@ func TestInspectorListActiveTasks(t *testing.T) { func createScheduledTask(z base.Z) *ScheduledTask { msg := z.Message return &ScheduledTask{ - Task: NewTask(msg.Type, msg.Payload), + Task: asynq.NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -576,17 +657,17 @@ func createScheduledTask(z base.Z) *ScheduledTask { func TestInspectorListScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { desc string @@ -619,15 +700,15 @@ func TestInspectorListScheduledTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) got, err := inspector.ListScheduledTasks(tc.qname) if err != nil { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, ScheduledTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -638,7 +719,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { func createRetryTask(z base.Z) *RetryTask { msg := z.Message return &RetryTask{ - Task: NewTask(msg.Type, msg.Payload), + Task: asynq.NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, NextProcessAt: time.Unix(z.Score, 0), @@ -652,17 +733,17 @@ func createRetryTask(z base.Z) *RetryTask { func TestInspectorListRetryTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { desc string @@ -696,15 +777,15 @@ func TestInspectorListRetryTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) got, err := inspector.ListRetryTasks(tc.qname) if err != nil { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, RetryTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -715,7 +796,7 @@ func TestInspectorListRetryTasks(t *testing.T) { func createArchivedTask(z base.Z) *ArchivedTask { msg := z.Message return &ArchivedTask{ - Task: NewTask(msg.Type, msg.Payload), + Task: asynq.NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -729,17 +810,17 @@ func createArchivedTask(z base.Z) *ArchivedTask { func TestInspectorListArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { desc string @@ -772,15 +853,15 @@ func TestInspectorListArchivedTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) got, err := inspector.ListArchivedTasks(tc.qname) if err != nil { t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}, ArchivedTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -793,13 +874,13 @@ func TestInspectorListPagination(t *testing.T) { var msgs []*base.TaskMessage for i := 0; i <= 99; i++ { msgs = append(msgs, - asynqtest.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) + h.NewTaskMessage(fmt.Sprintf("task%d", i), nil)) } r := setup(t) defer r.Close() - asynqtest.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) + h.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { page int @@ -841,7 +922,7 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) @@ -852,12 +933,12 @@ func TestInspectorListPagination(t *testing.T) { func TestInspectorDeleteAllPendingTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -892,8 +973,8 @@ func TestInspectorDeleteAllPendingTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.DeleteAllPendingTasks(tc.qname) if err != nil { @@ -905,8 +986,8 @@ func TestInspectorDeleteAllPendingTasks(t *testing.T) { } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -916,17 +997,17 @@ func TestInspectorDeleteAllPendingTasks(t *testing.T) { func TestInspectorDeleteAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -959,8 +1040,8 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) got, err := inspector.DeleteAllScheduledTasks(tc.qname) if err != nil { @@ -971,8 +1052,8 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { t.Errorf("DeleteAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -982,17 +1063,17 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { func TestInspectorDeleteAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1025,8 +1106,8 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) got, err := inspector.DeleteAllRetryTasks(tc.qname) if err != nil { @@ -1037,8 +1118,8 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { t.Errorf("DeleteAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1048,17 +1129,17 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { func TestInspectorDeleteAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -1091,8 +1172,8 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) got, err := inspector.DeleteAllArchivedTasks(tc.qname) if err != nil { @@ -1103,8 +1184,8 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { t.Errorf("DeleteAllArchivedTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantArchived { - gotArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1114,14 +1195,14 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { func TestInspectorArchiveAllPendingTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1195,9 +1276,9 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllArchivedQueues(t, r, tc.archived) got, err := inspector.ArchiveAllPendingTasks(tc.qname) if err != nil { @@ -1208,8 +1289,8 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { t.Errorf("ArchiveAllPendingTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1218,8 +1299,8 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { approxOpt := cmp.Comparer(func(a, b int64) bool { return math.Abs(float64(a-b)) < 2 }) - gotArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1229,17 +1310,17 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { func TestInspectorArchiveAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1328,9 +1409,9 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllArchivedQueues(t, r, tc.archived) got, err := inspector.ArchiveAllScheduledTasks(tc.qname) if err != nil { @@ -1341,8 +1422,8 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { t.Errorf("ArchiveAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1351,8 +1432,8 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { approxOpt := cmp.Comparer(func(a, b int64) bool { return math.Abs(float64(a-b)) < 2 }) - gotArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, gotArchived, asynqtest.SortZSetEntryOpt, approxOpt); diff != "" { + gotArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, approxOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1362,17 +1443,17 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { func TestInspectorArchiveAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessage("task3", nil) - m4 := asynqtest.NewTaskMessageWithQueue("task4", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessage("task3", nil) + m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1445,9 +1526,9 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllArchivedQueues(t, r, tc.archived) got, err := inspector.ArchiveAllRetryTasks(tc.qname) if err != nil { @@ -1458,15 +1539,15 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { t.Errorf("ArchiveAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } - cmpOpt := asynqtest.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score + cmpOpt := h.EquateInt64Approx(2) // allow for 2 seconds difference in Z.Score for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt, cmpOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt, cmpOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1476,17 +1557,17 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { func TestInspectorRunAllScheduledTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") - m4 := asynqtest.NewTaskMessage("task4", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 := h.NewTaskMessageWithQueue("task3", nil, "low") + m4 := h.NewTaskMessage("task4", nil) now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1563,9 +1644,9 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.RunAllScheduledTasks(tc.qname) if err != nil { @@ -1576,14 +1657,14 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) { t.Errorf("RunAllScheduledTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1593,17 +1674,17 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) { func TestInspectorRunAllRetryTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") - m4 := asynqtest.NewTaskMessage("task2", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 := h.NewTaskMessageWithQueue("task3", nil, "low") + m4 := h.NewTaskMessage("task2", nil) now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1680,9 +1761,9 @@ func TestInspectorRunAllRetryTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.RunAllRetryTasks(tc.qname) if err != nil { @@ -1693,14 +1774,14 @@ func TestInspectorRunAllRetryTasks(t *testing.T) { t.Errorf("RunAllRetryTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1710,17 +1791,17 @@ func TestInspectorRunAllRetryTasks(t *testing.T) { func TestInspectorRunAllArchivedTasks(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") - m4 := asynqtest.NewTaskMessage("task2", nil) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 := h.NewTaskMessageWithQueue("task3", nil, "low") + m4 := h.NewTaskMessage("task2", nil) now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -1793,9 +1874,9 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) + h.SeedAllPendingQueues(t, r, tc.pending) got, err := inspector.RunAllArchivedTasks(tc.qname) if err != nil { @@ -1806,15 +1887,15 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { t.Errorf("RunAllArchivedTasks(%q) = %d, want %d", tc.qname, got, tc.want) } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1824,10 +1905,10 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := NewInspector(getRedisConnOpt(t)) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + inspector := New(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1862,8 +1943,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", @@ -1872,8 +1953,8 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { } for qname, want := range tc.wantPending { - got := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, got, asynqtest.SortMsgOpt); diff != "" { + got := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, got, h.SortMsgOpt); diff != "" { t.Errorf("unspected pending tasks in queue %q: (-want,+got):\n%s", qname, diff) continue @@ -1885,15 +1966,15 @@ func TestInspectorDeleteTaskByKeyDeletesPendingTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1916,15 +1997,15 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -1935,15 +2016,15 @@ func TestInspectorDeleteTaskByKeyDeletesScheduledTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1966,16 +2047,16 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -1985,15 +2066,15 @@ func TestInspectorDeleteTaskByKeyDeletesRetryTask(t *testing.T) { func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2016,16 +2097,16 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.DeleteTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("DeleteTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } @@ -2035,15 +2116,15 @@ func TestInspectorDeleteTaskByKeyDeletesArchivedTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessage("task2", nil) - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessage("task2", nil) + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -2076,25 +2157,25 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -2105,15 +2186,15 @@ func TestInspectorRunTaskByKeyRunsScheduledTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2146,24 +2227,24 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -2174,15 +2255,15 @@ func TestInspectorRunTaskByKeyRunsRetryTask(t *testing.T) { func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "critical") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "low") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "critical") + m3 := h.NewTaskMessageWithQueue("task3", nil, "low") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(-5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2219,24 +2300,24 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) + h.FlushDB(t, r) + h.SeedAllArchivedQueues(t, r, tc.archived) + h.SeedAllPendingQueues(t, r, tc.pending) if err := inspector.RunTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("RunTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -2247,10 +2328,10 @@ func TestInspectorRunTaskByKeyRunsArchivedTask(t *testing.T) { func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := NewInspector(getRedisConnOpt(t)) + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") + inspector := New(getRedisConnOpt(t)) now := time.Now() tests := []struct { @@ -2308,9 +2389,9 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllPendingQueues(t, r, tc.pending) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllPendingQueues(t, r, tc.pending) + h.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", @@ -2318,16 +2399,16 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { continue } for qname, want := range tc.wantPending { - gotPending := asynqtest.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, asynqtest.SortMsgOpt); diff != "" { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { t.Errorf("unexpected pending tasks in queue %q: (-want,+got)\n%s", qname, diff) } } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want,+got)\n%s", qname, diff) } @@ -2338,15 +2419,15 @@ func TestInspectorArchiveTaskByKeyArchivesPendingTask(t *testing.T) { func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -2385,25 +2466,25 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllScheduledQueues(t, r, tc.scheduled) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllScheduledQueues(t, r, tc.scheduled) + h.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantScheduled { - gotScheduled := asynqtest.GetScheduledEntries(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, asynqtest.SortZSetEntryOpt); diff != "" { + gotScheduled := h.GetScheduledEntries(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected scheduled tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -2414,15 +2495,15 @@ func TestInspectorArchiveTaskByKeyArchivesScheduledTask(t *testing.T) { func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { r := setup(t) defer r.Close() - m1 := asynqtest.NewTaskMessage("task1", nil) - m2 := asynqtest.NewTaskMessageWithQueue("task2", nil, "custom") - m3 := asynqtest.NewTaskMessageWithQueue("task3", nil, "custom") + m1 := h.NewTaskMessage("task1", nil) + m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") + m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2460,24 +2541,24 @@ func TestInspectorArchiveTaskByKeyArchivesRetryTask(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) - asynqtest.SeedAllRetryQueues(t, r, tc.retry) - asynqtest.SeedAllArchivedQueues(t, r, tc.archived) + h.FlushDB(t, r) + h.SeedAllRetryQueues(t, r, tc.retry) + h.SeedAllArchivedQueues(t, r, tc.archived) if err := inspector.ArchiveTaskByKey(tc.qname, tc.key); err != nil { t.Errorf("ArchiveTaskByKey(%q, %q) returned error: %v", tc.qname, tc.key, err) continue } for qname, want := range tc.wantRetry { - gotRetry := asynqtest.GetRetryEntries(t, r, qname) - if diff := cmp.Diff(want, gotRetry, asynqtest.SortZSetEntryOpt); diff != "" { + gotRetry := h.GetRetryEntries(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected retry tasks in queue %q: (-want, +got)\n%s", qname, diff) } } for qname, want := range tc.wantArchived { - wantArchived := asynqtest.GetArchivedEntries(t, r, qname) - if diff := cmp.Diff(want, wantArchived, asynqtest.SortZSetEntryOpt); diff != "" { + wantArchived := h.GetArchivedEntries(t, r, qname) + if diff := cmp.Diff(want, wantArchived, h.SortZSetEntryOpt); diff != "" { t.Errorf("unexpected archived tasks in queue %q: (-want, +got)\n%s", qname, diff) } @@ -2496,7 +2577,7 @@ var sortSchedulerEntry = cmp.Transformer("SortSchedulerEntry", func(in []*Schedu func TestInspectorSchedulerEntries(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - inspector := NewInspector(getRedisConnOpt(t)) + inspector := New(getRedisConnOpt(t)) now := time.Now().UTC() schedulerID := "127.0.0.1:9876:abc123" @@ -2507,7 +2588,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { }{ { data: []*base.SchedulerEntry{ - &base.SchedulerEntry{ + { Spec: "* * * * *", Type: "foo", Payload: nil, @@ -2515,7 +2596,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, - &base.SchedulerEntry{ + { Spec: "@every 20m", Type: "bar", Payload: map[string]interface{}{"fiz": "baz"}, @@ -2525,17 +2606,17 @@ func TestInspectorSchedulerEntries(t *testing.T) { }, }, want: []*SchedulerEntry{ - &SchedulerEntry{ + { Spec: "* * * * *", - Task: NewTask("foo", nil), + Task: asynq.NewTask("foo", nil), Opts: nil, Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, - &SchedulerEntry{ + { Spec: "@every 20m", - Task: NewTask("bar", map[string]interface{}{"fiz": "baz"}), - Opts: []Option{Queue("bar"), MaxRetry(20)}, + Task: asynq.NewTask("bar", map[string]interface{}{"fiz": "baz"}), + Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), }, @@ -2544,7 +2625,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { } for _, tc := range tests { - asynqtest.FlushDB(t, r) + h.FlushDB(t, r) err := rdbClient.WriteSchedulerEntries(schedulerID, tc.data, time.Minute) if err != nil { t.Fatalf("could not write data: %v", err) @@ -2554,10 +2635,78 @@ func TestInspectorSchedulerEntries(t *testing.T) { t.Errorf("SchedulerEntries() returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(Payload{}) + ignoreOpt := cmpopts.IgnoreUnexported(asynq.Payload{}) if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) } } } + +func TestParseOption(t *testing.T) { + oneHourFromNow := time.Now().Add(1 * time.Hour) + tests := []struct { + s string + wantType asynq.OptionType + wantVal interface{} + }{ + {`MaxRetry(10)`, asynq.MaxRetryOpt, 10}, + {`Queue("email")`, asynq.QueueOpt, "email"}, + {`Timeout(3m)`, asynq.TimeoutOpt, 3 * time.Minute}, + {asynq.Deadline(oneHourFromNow).String(), asynq.DeadlineOpt, oneHourFromNow}, + {`Unique(1h)`, asynq.UniqueOpt, 1 * time.Hour}, + {asynq.ProcessAt(oneHourFromNow).String(), asynq.ProcessAtOpt, oneHourFromNow}, + {`ProcessIn(10m)`, asynq.ProcessInOpt, 10 * time.Minute}, + } + + for _, tc := range tests { + t.Run(tc.s, func(t *testing.T) { + got, err := parseOption(tc.s) + if err != nil { + t.Fatalf("returned error: %v", err) + } + if got == nil { + t.Fatal("returned nil") + } + if got.Type() != tc.wantType { + t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType) + } + switch tc.wantType { + case asynq.QueueOpt: + gotVal, ok := got.Value().(string) + if !ok { + t.Fatal("returned Option with non-string value") + } + if gotVal != tc.wantVal.(string) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case asynq.MaxRetryOpt: + gotVal, ok := got.Value().(int) + if !ok { + t.Fatal("returned Option with non-int value") + } + if gotVal != tc.wantVal.(int) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case asynq.TimeoutOpt, asynq.UniqueOpt, asynq.ProcessInOpt: + gotVal, ok := got.Value().(time.Duration) + if !ok { + t.Fatal("returned Option with non duration value") + } + if gotVal != tc.wantVal.(time.Duration) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + case asynq.DeadlineOpt, asynq.ProcessAtOpt: + gotVal, ok := got.Value().(time.Time) + if !ok { + t.Fatal("returned Option with non time value") + } + if cmp.Equal(gotVal, tc.wantVal.(time.Time)) { + t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) + } + default: + t.Fatalf("returned Option with unexpected type: %v", got.Type()) + } + }) + } +} \ No newline at end of file diff --git a/internal/base/base.go b/internal/base/base.go index 2d1bca0..8023a0d 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -36,6 +36,15 @@ const ( CancelChannel = "asynq:cancel" // PubSub channel ) +// ValidateQueueName validates a given qname to be used as a queue name. +// Returns nil if valid, otherwise returns non-nil error. +func ValidateQueueName(qname string) error { + if len(qname) == 0 { + return fmt.Errorf("queue name must contain one or more characters") + } + return nil +} + // QueueKey returns a redis key for the given queue name. func QueueKey(qname string) string { return fmt.Sprintf("asynq:{%s}", qname)