From 12f4c7cf6efc5ae55d707b83f23828b59b620345 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 18 May 2021 18:45:15 -0700 Subject: [PATCH] Move inspeq package content to asynq package --- CHANGELOG.md | 1 + inspeq/inspector.go => inspector.go | 52 +++-- inspeq/inspector_test.go => inspector_test.go | 210 ++++++------------ inspeq/doc.go | 22 -- 4 files changed, 90 insertions(+), 195 deletions(-) rename inspeq/inspector.go => inspector.go (96%) rename inspeq/inspector_test.go => inspector_test.go (93%) delete mode 100644 inspeq/doc.go diff --git a/CHANGELOG.md b/CHANGELOG.md index e8ac415..aac4782 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask` - `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask` - `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask` +- `inspeq` package is removed. All types and functions from the package is moved to `asynq` package. ## [0.17.2] - 2021-06-06 diff --git a/inspeq/inspector.go b/inspector.go similarity index 96% rename from inspeq/inspector.go rename to inspector.go index bb2c8ee..797c481 100644 --- a/inspeq/inspector.go +++ b/inspector.go @@ -2,7 +2,7 @@ // Use of this source code is governed by a MIT license // that can be found in the LICENSE file. -package inspeq +package asynq import ( "fmt" @@ -12,7 +12,6 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/uuid" - "github.com/hibiken/asynq" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" @@ -25,7 +24,7 @@ type Inspector struct { } // New returns a new instance of Inspector. -func New(r asynq.RedisConnOpt) *Inspector { +func NewInspector(r RedisConnOpt) *Inspector { c, ok := r.MakeRedisClient().(redis.UniversalClient) if !ok { panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r)) @@ -170,7 +169,7 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { // PendingTask is a task in a queue and is ready to be processed. type PendingTask struct { - *asynq.Task + *Task ID string Queue string MaxRetry int @@ -180,7 +179,7 @@ type PendingTask struct { // ActiveTask is a task that's currently being processed. type ActiveTask struct { - *asynq.Task + *Task ID string Queue string MaxRetry int @@ -190,7 +189,7 @@ type ActiveTask struct { // ScheduledTask is a task scheduled to be processed in the future. type ScheduledTask struct { - *asynq.Task + *Task ID string Queue string MaxRetry int @@ -203,7 +202,7 @@ type ScheduledTask struct { // RetryTask is a task scheduled to be retried in the future. type RetryTask struct { - *asynq.Task + *Task ID string Queue string NextProcessAt time.Time @@ -220,7 +219,7 @@ type RetryTask struct { // A task can be archived when the task exhausts its retry counts or manually // archived by a user via the CLI or Inspector. type ArchivedTask struct { - *asynq.Task + *Task ID string Queue string MaxRetry int @@ -366,7 +365,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi var tasks []*PendingTask for _, m := range msgs { tasks = append(tasks, &PendingTask{ - Task: asynq.NewTask(m.Type, m.Payload), + Task: NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, MaxRetry: m.Retry, @@ -392,9 +391,8 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active } var tasks []*ActiveTask for _, m := range msgs { - tasks = append(tasks, &ActiveTask{ - Task: asynq.NewTask(m.Type, m.Payload), + Task: NewTask(m.Type, m.Payload), ID: m.ID.String(), Queue: m.Queue, MaxRetry: m.Retry, @@ -422,7 +420,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch var tasks []*ScheduledTask for _, z := range zs { processAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) + t := NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &ScheduledTask{ Task: t, ID: z.Message.ID.String(), @@ -454,7 +452,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa var tasks []*RetryTask for _, z := range zs { processAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) + t := NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &RetryTask{ Task: t, ID: z.Message.ID.String(), @@ -487,7 +485,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch var tasks []*ArchivedTask for _, z := range zs { failedAt := time.Unix(z.Score, 0) - t := asynq.NewTask(z.Message.Type, z.Message.Payload) + t := NewTask(z.Message.Type, z.Message.Payload) tasks = append(tasks, &ArchivedTask{ Task: t, ID: z.Message.ID.String(), @@ -743,7 +741,7 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) { Started: w.Started, Deadline: w.Deadline, Task: &ActiveTask{ - Task: asynq.NewTask(w.Type, w.Payload), + Task: NewTask(w.Type, w.Payload), ID: w.ID, Queue: w.Queue, }, @@ -827,10 +825,10 @@ type SchedulerEntry struct { Spec string // Periodic Task registered for this entry. - Task *asynq.Task + Task *Task // Opts is the options for the periodic task. - Opts []asynq.Option + Opts []Option // Next shows the next time the task will be enqueued. Next time.Time @@ -849,8 +847,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { return nil, err } for _, e := range res { - task := asynq.NewTask(e.Type, e.Payload) - var opts []asynq.Option + task := NewTask(e.Type, e.Payload) + var opts []Option for _, s := range e.Opts { if o, err := parseOption(s); err == nil { // ignore bad data @@ -871,7 +869,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) { // parseOption interprets a string s as an Option and returns the Option if parsing is successful, // otherwise returns non-nil error. -func parseOption(s string) (asynq.Option, error) { +func parseOption(s string) (Option, error) { fn, arg := parseOptionFunc(s), parseOptionArg(s) switch fn { case "Queue": @@ -879,43 +877,43 @@ func parseOption(s string) (asynq.Option, error) { if err != nil { return nil, err } - return asynq.Queue(qname), nil + return Queue(qname), nil case "MaxRetry": n, err := strconv.Atoi(arg) if err != nil { return nil, err } - return asynq.MaxRetry(n), nil + return MaxRetry(n), nil case "Timeout": d, err := time.ParseDuration(arg) if err != nil { return nil, err } - return asynq.Timeout(d), nil + return Timeout(d), nil case "Deadline": t, err := time.Parse(time.UnixDate, arg) if err != nil { return nil, err } - return asynq.Deadline(t), nil + return Deadline(t), nil case "Unique": d, err := time.ParseDuration(arg) if err != nil { return nil, err } - return asynq.Unique(d), nil + return Unique(d), nil case "ProcessAt": t, err := time.Parse(time.UnixDate, arg) if err != nil { return nil, err } - return asynq.ProcessAt(t), nil + return ProcessAt(t), nil case "ProcessIn": d, err := time.ParseDuration(arg) if err != nil { return nil, err } - return asynq.ProcessIn(d), nil + return ProcessIn(d), nil default: return nil, fmt.Errorf("cannot not parse option string %q", s) } diff --git a/inspeq/inspector_test.go b/inspector_test.go similarity index 93% rename from inspeq/inspector_test.go rename to inspector_test.go index 06a364b..c9a3fdb 100644 --- a/inspeq/inspector_test.go +++ b/inspector_test.go @@ -2,110 +2,28 @@ // Use of this source code is governed by a MIT license // that can be found in the LICENSE file. -package inspeq +package asynq import ( "errors" - "flag" "fmt" "math" "sort" - "strings" "testing" "time" - "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/google/uuid" - "github.com/hibiken/asynq" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/rdb" ) -// variables used for package testing. -var ( - redisAddr string - redisDB int - - useRedisCluster bool - redisClusterAddrs string // comma-separated list of host:port - - testLogLevel = asynq.FatalLevel -) - -var testLogger *log.Logger - -func init() { - flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing") - flag.IntVar(&redisDB, "redis_db", 13, "redis db number to use in testing") - flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing") - flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses") - flag.Var(&testLogLevel, "loglevel", "log level to use in testing") - - testLogger = log.NewLogger(nil) - testLogger.SetLevel(toInternalLogLevel(testLogLevel)) -} - -func toInternalLogLevel(l asynq.LogLevel) log.Level { - switch l { - case asynq.DebugLevel: - return log.DebugLevel - case asynq.InfoLevel: - return log.InfoLevel - case asynq.WarnLevel: - return log.WarnLevel - case asynq.ErrorLevel: - return log.ErrorLevel - case asynq.FatalLevel: - return log.FatalLevel - } - panic(fmt.Sprintf("inspeq: unexpected log level: %v", l)) -} - -func setup(tb testing.TB) (r redis.UniversalClient) { - tb.Helper() - if useRedisCluster { - addrs := strings.Split(redisClusterAddrs, ",") - if len(addrs) == 0 { - tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") - } - r = redis.NewClusterClient(&redis.ClusterOptions{ - Addrs: addrs, - }) - } else { - r = redis.NewClient(&redis.Options{ - Addr: redisAddr, - DB: redisDB, - }) - } - // Start each test with a clean slate. - h.FlushDB(tb, r) - return r -} - -func getRedisConnOpt(tb testing.TB) asynq.RedisConnOpt { - tb.Helper() - if useRedisCluster { - addrs := strings.Split(redisClusterAddrs, ",") - if len(addrs) == 0 { - tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.") - } - return asynq.RedisClusterClientOpt{ - Addrs: addrs, - } - } - return asynq.RedisClientOpt{ - Addr: redisAddr, - DB: redisDB, - } -} func TestInspectorQueues(t *testing.T) { r := setup(t) defer r.Close() - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { queues []string @@ -138,7 +56,7 @@ func TestInspectorQueues(t *testing.T) { func TestInspectorDeleteQueue(t *testing.T) { r := setup(t) defer r.Close() - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -227,7 +145,7 @@ func TestInspectorDeleteQueue(t *testing.T) { func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { r := setup(t) defer r.Close() - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -283,7 +201,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { r := setup(t) defer r.Close() - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) defer inspector.Close() m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) @@ -349,7 +267,7 @@ func TestInspectorCurrentStats(t *testing.T) { timeCmpOpt := cmpopts.EquateApproxTime(time.Second) ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -453,7 +371,7 @@ func TestInspectorHistory(t *testing.T) { r := setup(t) defer r.Close() now := time.Now().UTC() - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { qname string // queue of interest @@ -506,7 +424,7 @@ func TestInspectorHistory(t *testing.T) { func createPendingTask(msg *base.TaskMessage) *PendingTask { return &PendingTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), + Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -523,7 +441,7 @@ func TestInspectorListPendingTasks(t *testing.T) { m3 := h.NewTaskMessageWithQueue("task3", nil, "critical") m4 := h.NewTaskMessageWithQueue("task4", nil, "low") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -576,7 +494,7 @@ func TestInspectorListPendingTasks(t *testing.T) { tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -592,11 +510,11 @@ func TestInspectorListActiveTasks(t *testing.T) { m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") m4 := h.NewTaskMessageWithQueue("task4", nil, "custom") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) createActiveTask := func(msg *base.TaskMessage) *ActiveTask { return &ActiveTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), + Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -634,7 +552,7 @@ func TestInspectorListActiveTasks(t *testing.T) { t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -645,7 +563,7 @@ func TestInspectorListActiveTasks(t *testing.T) { func createScheduledTask(z base.Z) *ScheduledTask { msg := z.Message return &ScheduledTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), + Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -669,7 +587,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -710,7 +628,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ScheduledTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -721,7 +639,7 @@ func TestInspectorListScheduledTasks(t *testing.T) { func createRetryTask(z base.Z) *RetryTask { msg := z.Message return &RetryTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), + Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, NextProcessAt: time.Unix(z.Score, 0), @@ -745,7 +663,7 @@ func TestInspectorListRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -787,7 +705,7 @@ func TestInspectorListRetryTasks(t *testing.T) { t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}, RetryTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -798,7 +716,7 @@ func TestInspectorListRetryTasks(t *testing.T) { func createArchivedTask(z base.Z) *ArchivedTask { msg := z.Message return &ArchivedTask{ - Task: asynq.NewTask(msg.Type, msg.Payload), + Task: NewTask(msg.Type, msg.Payload), ID: msg.ID.String(), Queue: msg.Queue, MaxRetry: msg.Retry, @@ -822,7 +740,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { desc string @@ -863,7 +781,7 @@ func TestInspectorListArchivedTasks(t *testing.T) { t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ArchivedTask{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s", tc.desc, tc.qname, got, tc.want, diff) @@ -882,7 +800,7 @@ func TestInspectorListPagination(t *testing.T) { defer r.Close() h.SeedPendingQueue(t, r, msgs, base.DefaultQueueName) - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { page int @@ -924,7 +842,7 @@ func TestInspectorListPagination(t *testing.T) { t.Errorf("ListPendingTask('default') returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}) if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" { t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) @@ -940,7 +858,7 @@ func TestInspectorDeleteAllPendingTasks(t *testing.T) { m3 := h.NewTaskMessage("task3", nil) m4 := h.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1009,7 +927,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1075,7 +993,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1141,7 +1059,7 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -1204,7 +1122,7 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) { now := time.Now() z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()} z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1321,7 +1239,7 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1454,7 +1372,7 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1568,7 +1486,7 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -1685,7 +1603,7 @@ func TestInspectorRunAllRetryTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -1802,7 +1720,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) { z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -1909,7 +1827,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) { m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessage("task2", nil) m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { pending map[string][]*base.TaskMessage @@ -1974,7 +1892,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -2024,7 +1942,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2074,7 +1992,7 @@ func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2124,7 +2042,7 @@ func TestInspectorDeleteTaskError(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2189,7 +2107,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -2259,7 +2177,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2328,7 +2246,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2401,7 +2319,7 @@ func TestInspectorRunTaskError(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { archived map[string][]base.Z @@ -2496,7 +2414,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) { m1 := h.NewTaskMessage("task1", nil) m2 := h.NewTaskMessageWithQueue("task2", nil, "custom") m3 := h.NewTaskMessageWithQueue("task3", nil, "custom") - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) now := time.Now() tests := []struct { @@ -2591,7 +2509,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { scheduled map[string][]base.Z @@ -2667,7 +2585,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2741,7 +2659,7 @@ func TestInspectorArchiveTaskError(t *testing.T) { z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()} z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()} - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) tests := []struct { retry map[string][]base.Z @@ -2833,7 +2751,7 @@ var sortSchedulerEntry = cmp.Transformer("SortSchedulerEntry", func(in []*Schedu func TestInspectorSchedulerEntries(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) - inspector := New(getRedisConnOpt(t)) + inspector := NewInspector(getRedisConnOpt(t)) now := time.Now().UTC() schedulerID := "127.0.0.1:9876:abc123" @@ -2864,15 +2782,15 @@ func TestInspectorSchedulerEntries(t *testing.T) { want: []*SchedulerEntry{ { Spec: "* * * * *", - Task: asynq.NewTask("foo", nil), + Task: NewTask("foo", nil), Opts: nil, Next: now.Add(5 * time.Hour), Prev: now.Add(-2 * time.Hour), }, { Spec: "@every 20m", - Task: asynq.NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), - Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)}, + Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})), + Opts: []Option{Queue("bar"), MaxRetry(20)}, Next: now.Add(1 * time.Minute), Prev: now.Add(-19 * time.Minute), }, @@ -2891,7 +2809,7 @@ func TestInspectorSchedulerEntries(t *testing.T) { t.Errorf("SchedulerEntries() returned error: %v", err) continue } - ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}) + ignoreOpt := cmpopts.IgnoreUnexported(Task{}) if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" { t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s", got, tc.want, diff) @@ -2903,16 +2821,16 @@ func TestParseOption(t *testing.T) { oneHourFromNow := time.Now().Add(1 * time.Hour) tests := []struct { s string - wantType asynq.OptionType + wantType OptionType wantVal interface{} }{ - {`MaxRetry(10)`, asynq.MaxRetryOpt, 10}, - {`Queue("email")`, asynq.QueueOpt, "email"}, - {`Timeout(3m)`, asynq.TimeoutOpt, 3 * time.Minute}, - {asynq.Deadline(oneHourFromNow).String(), asynq.DeadlineOpt, oneHourFromNow}, - {`Unique(1h)`, asynq.UniqueOpt, 1 * time.Hour}, - {asynq.ProcessAt(oneHourFromNow).String(), asynq.ProcessAtOpt, oneHourFromNow}, - {`ProcessIn(10m)`, asynq.ProcessInOpt, 10 * time.Minute}, + {`MaxRetry(10)`, MaxRetryOpt, 10}, + {`Queue("email")`, QueueOpt, "email"}, + {`Timeout(3m)`, TimeoutOpt, 3 * time.Minute}, + {Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow}, + {`Unique(1h)`, UniqueOpt, 1 * time.Hour}, + {ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow}, + {`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute}, } for _, tc := range tests { @@ -2928,7 +2846,7 @@ func TestParseOption(t *testing.T) { t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType) } switch tc.wantType { - case asynq.QueueOpt: + case QueueOpt: gotVal, ok := got.Value().(string) if !ok { t.Fatal("returned Option with non-string value") @@ -2936,7 +2854,7 @@ func TestParseOption(t *testing.T) { if gotVal != tc.wantVal.(string) { t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) } - case asynq.MaxRetryOpt: + case MaxRetryOpt: gotVal, ok := got.Value().(int) if !ok { t.Fatal("returned Option with non-int value") @@ -2944,7 +2862,7 @@ func TestParseOption(t *testing.T) { if gotVal != tc.wantVal.(int) { t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) } - case asynq.TimeoutOpt, asynq.UniqueOpt, asynq.ProcessInOpt: + case TimeoutOpt, UniqueOpt, ProcessInOpt: gotVal, ok := got.Value().(time.Duration) if !ok { t.Fatal("returned Option with non duration value") @@ -2952,7 +2870,7 @@ func TestParseOption(t *testing.T) { if gotVal != tc.wantVal.(time.Duration) { t.Fatalf("got value %v, want %v", gotVal, tc.wantVal) } - case asynq.DeadlineOpt, asynq.ProcessAtOpt: + case DeadlineOpt, ProcessAtOpt: gotVal, ok := got.Value().(time.Time) if !ok { t.Fatal("returned Option with non time value") diff --git a/inspeq/doc.go b/inspeq/doc.go deleted file mode 100644 index 24fb5d0..0000000 --- a/inspeq/doc.go +++ /dev/null @@ -1,22 +0,0 @@ -// Copyright 2020 Kentaro Hibino. All rights reserved. -// Use of this source code is governed by a MIT license -// that can be found in the LICENSE file. - -/* -Package inspeq provides helper types and functions to inspect queues and tasks managed by Asynq. - -Inspector is used to query and mutate the state of queues and tasks. - -Example: - - inspector := inspeq.New(asynq.RedisClientOpt{Addr: "localhost:6379"}) - - tasks, err := inspector.ListArchivedTasks("my-queue") - - for _, t := range tasks { - if err := inspector.DeleteTaskByKey(t.Key()); err != nil { - // handle error - } - } -*/ -package inspeq