2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Move inspeq package content to asynq package

This commit is contained in:
Ken Hibino 2021-05-18 18:45:15 -07:00
parent 0ec3b55e6b
commit 12f4c7cf6e
4 changed files with 90 additions and 195 deletions

View File

@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Inspector.RunTaskByKey` is replaced with `Inspector.RunTask`
- `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask`
- `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask`
- `inspeq` package is removed. All types and functions from the package is moved to `asynq` package.
## [0.17.2] - 2021-06-06

View File

@ -2,7 +2,7 @@
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package inspeq
package asynq
import (
"fmt"
@ -12,7 +12,6 @@ import (
"github.com/go-redis/redis/v7"
"github.com/google/uuid"
"github.com/hibiken/asynq"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
@ -25,7 +24,7 @@ type Inspector struct {
}
// New returns a new instance of Inspector.
func New(r asynq.RedisConnOpt) *Inspector {
func NewInspector(r RedisConnOpt) *Inspector {
c, ok := r.MakeRedisClient().(redis.UniversalClient)
if !ok {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
@ -170,7 +169,7 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
@ -180,7 +179,7 @@ type PendingTask struct {
// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
@ -190,7 +189,7 @@ type ActiveTask struct {
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
@ -203,7 +202,7 @@ type ScheduledTask struct {
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*asynq.Task
*Task
ID string
Queue string
NextProcessAt time.Time
@ -220,7 +219,7 @@ type RetryTask struct {
// A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector.
type ArchivedTask struct {
*asynq.Task
*Task
ID string
Queue string
MaxRetry int
@ -366,7 +365,7 @@ func (i *Inspector) ListPendingTasks(qname string, opts ...ListOption) ([]*Pendi
var tasks []*PendingTask
for _, m := range msgs {
tasks = append(tasks, &PendingTask{
Task: asynq.NewTask(m.Type, m.Payload),
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
@ -392,9 +391,8 @@ func (i *Inspector) ListActiveTasks(qname string, opts ...ListOption) ([]*Active
}
var tasks []*ActiveTask
for _, m := range msgs {
tasks = append(tasks, &ActiveTask{
Task: asynq.NewTask(m.Type, m.Payload),
Task: NewTask(m.Type, m.Payload),
ID: m.ID.String(),
Queue: m.Queue,
MaxRetry: m.Retry,
@ -422,7 +420,7 @@ func (i *Inspector) ListScheduledTasks(qname string, opts ...ListOption) ([]*Sch
var tasks []*ScheduledTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ScheduledTask{
Task: t,
ID: z.Message.ID.String(),
@ -454,7 +452,7 @@ func (i *Inspector) ListRetryTasks(qname string, opts ...ListOption) ([]*RetryTa
var tasks []*RetryTask
for _, z := range zs {
processAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &RetryTask{
Task: t,
ID: z.Message.ID.String(),
@ -487,7 +485,7 @@ func (i *Inspector) ListArchivedTasks(qname string, opts ...ListOption) ([]*Arch
var tasks []*ArchivedTask
for _, z := range zs {
failedAt := time.Unix(z.Score, 0)
t := asynq.NewTask(z.Message.Type, z.Message.Payload)
t := NewTask(z.Message.Type, z.Message.Payload)
tasks = append(tasks, &ArchivedTask{
Task: t,
ID: z.Message.ID.String(),
@ -743,7 +741,7 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
Started: w.Started,
Deadline: w.Deadline,
Task: &ActiveTask{
Task: asynq.NewTask(w.Type, w.Payload),
Task: NewTask(w.Type, w.Payload),
ID: w.ID,
Queue: w.Queue,
},
@ -827,10 +825,10 @@ type SchedulerEntry struct {
Spec string
// Periodic Task registered for this entry.
Task *asynq.Task
Task *Task
// Opts is the options for the periodic task.
Opts []asynq.Option
Opts []Option
// Next shows the next time the task will be enqueued.
Next time.Time
@ -849,8 +847,8 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
return nil, err
}
for _, e := range res {
task := asynq.NewTask(e.Type, e.Payload)
var opts []asynq.Option
task := NewTask(e.Type, e.Payload)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
@ -871,7 +869,7 @@ func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
// parseOption interprets a string s as an Option and returns the Option if parsing is successful,
// otherwise returns non-nil error.
func parseOption(s string) (asynq.Option, error) {
func parseOption(s string) (Option, error) {
fn, arg := parseOptionFunc(s), parseOptionArg(s)
switch fn {
case "Queue":
@ -879,43 +877,43 @@ func parseOption(s string) (asynq.Option, error) {
if err != nil {
return nil, err
}
return asynq.Queue(qname), nil
return Queue(qname), nil
case "MaxRetry":
n, err := strconv.Atoi(arg)
if err != nil {
return nil, err
}
return asynq.MaxRetry(n), nil
return MaxRetry(n), nil
case "Timeout":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Timeout(d), nil
return Timeout(d), nil
case "Deadline":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.Deadline(t), nil
return Deadline(t), nil
case "Unique":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.Unique(d), nil
return Unique(d), nil
case "ProcessAt":
t, err := time.Parse(time.UnixDate, arg)
if err != nil {
return nil, err
}
return asynq.ProcessAt(t), nil
return ProcessAt(t), nil
case "ProcessIn":
d, err := time.ParseDuration(arg)
if err != nil {
return nil, err
}
return asynq.ProcessIn(d), nil
return ProcessIn(d), nil
default:
return nil, fmt.Errorf("cannot not parse option string %q", s)
}

View File

@ -2,110 +2,28 @@
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package inspeq
package asynq
import (
"errors"
"flag"
"fmt"
"math"
"sort"
"strings"
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/hibiken/asynq"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
)
// variables used for package testing.
var (
redisAddr string
redisDB int
useRedisCluster bool
redisClusterAddrs string // comma-separated list of host:port
testLogLevel = asynq.FatalLevel
)
var testLogger *log.Logger
func init() {
flag.StringVar(&redisAddr, "redis_addr", "localhost:6379", "redis address to use in testing")
flag.IntVar(&redisDB, "redis_db", 13, "redis db number to use in testing")
flag.BoolVar(&useRedisCluster, "redis_cluster", false, "use redis cluster as a broker in testing")
flag.StringVar(&redisClusterAddrs, "redis_cluster_addrs", "localhost:7000,localhost:7001,localhost:7002", "comma separated list of redis server addresses")
flag.Var(&testLogLevel, "loglevel", "log level to use in testing")
testLogger = log.NewLogger(nil)
testLogger.SetLevel(toInternalLogLevel(testLogLevel))
}
func toInternalLogLevel(l asynq.LogLevel) log.Level {
switch l {
case asynq.DebugLevel:
return log.DebugLevel
case asynq.InfoLevel:
return log.InfoLevel
case asynq.WarnLevel:
return log.WarnLevel
case asynq.ErrorLevel:
return log.ErrorLevel
case asynq.FatalLevel:
return log.FatalLevel
}
panic(fmt.Sprintf("inspeq: unexpected log level: %v", l))
}
func setup(tb testing.TB) (r redis.UniversalClient) {
tb.Helper()
if useRedisCluster {
addrs := strings.Split(redisClusterAddrs, ",")
if len(addrs) == 0 {
tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
r = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: addrs,
})
} else {
r = redis.NewClient(&redis.Options{
Addr: redisAddr,
DB: redisDB,
})
}
// Start each test with a clean slate.
h.FlushDB(tb, r)
return r
}
func getRedisConnOpt(tb testing.TB) asynq.RedisConnOpt {
tb.Helper()
if useRedisCluster {
addrs := strings.Split(redisClusterAddrs, ",")
if len(addrs) == 0 {
tb.Fatal("No redis cluster addresses provided. Please set addresses using --redis_cluster_addrs flag.")
}
return asynq.RedisClusterClientOpt{
Addrs: addrs,
}
}
return asynq.RedisClientOpt{
Addr: redisAddr,
DB: redisDB,
}
}
func TestInspectorQueues(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
queues []string
@ -138,7 +56,7 @@ func TestInspectorQueues(t *testing.T) {
func TestInspectorDeleteQueue(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
@ -227,7 +145,7 @@ func TestInspectorDeleteQueue(t *testing.T) {
func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
@ -283,7 +201,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) {
func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) {
r := setup(t)
defer r.Close()
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
defer inspector.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
@ -349,7 +267,7 @@ func TestInspectorCurrentStats(t *testing.T) {
timeCmpOpt := cmpopts.EquateApproxTime(time.Second)
ignoreMemUsg := cmpopts.IgnoreFields(QueueStats{}, "MemoryUsage")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
@ -453,7 +371,7 @@ func TestInspectorHistory(t *testing.T) {
r := setup(t)
defer r.Close()
now := time.Now().UTC()
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
qname string // queue of interest
@ -506,7 +424,7 @@ func TestInspectorHistory(t *testing.T) {
func createPendingTask(msg *base.TaskMessage) *PendingTask {
return &PendingTask{
Task: asynq.NewTask(msg.Type, msg.Payload),
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
@ -523,7 +441,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
m3 := h.NewTaskMessageWithQueue("task3", nil, "critical")
m4 := h.NewTaskMessageWithQueue("task4", nil, "low")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
desc string
@ -576,7 +494,7 @@ func TestInspectorListPendingTasks(t *testing.T) {
tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListPendingTasks(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
@ -592,11 +510,11 @@ func TestInspectorListActiveTasks(t *testing.T) {
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
createActiveTask := func(msg *base.TaskMessage) *ActiveTask {
return &ActiveTask{
Task: asynq.NewTask(msg.Type, msg.Payload),
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
@ -634,7 +552,7 @@ func TestInspectorListActiveTasks(t *testing.T) {
t.Errorf("%s; ListActiveTasks(%q) returned error: %v", tc.qname, tc.desc, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListActiveTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
@ -645,7 +563,7 @@ func TestInspectorListActiveTasks(t *testing.T) {
func createScheduledTask(z base.Z) *ScheduledTask {
msg := z.Message
return &ScheduledTask{
Task: asynq.NewTask(msg.Type, msg.Payload),
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
@ -669,7 +587,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
desc string
@ -710,7 +628,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
t.Errorf("%s; ListScheduledTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ScheduledTask{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ScheduledTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListScheduledTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
@ -721,7 +639,7 @@ func TestInspectorListScheduledTasks(t *testing.T) {
func createRetryTask(z base.Z) *RetryTask {
msg := z.Message
return &RetryTask{
Task: asynq.NewTask(msg.Type, msg.Payload),
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
NextProcessAt: time.Unix(z.Score, 0),
@ -745,7 +663,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
desc string
@ -787,7 +705,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
t.Errorf("%s; ListRetryTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, RetryTask{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{}, RetryTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListRetryTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
@ -798,7 +716,7 @@ func TestInspectorListRetryTasks(t *testing.T) {
func createArchivedTask(z base.Z) *ArchivedTask {
msg := z.Message
return &ArchivedTask{
Task: asynq.NewTask(msg.Type, msg.Payload),
Task: NewTask(msg.Type, msg.Payload),
ID: msg.ID.String(),
Queue: msg.Queue,
MaxRetry: msg.Retry,
@ -822,7 +740,7 @@ func TestInspectorListArchivedTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
desc string
@ -863,7 +781,7 @@ func TestInspectorListArchivedTasks(t *testing.T) {
t.Errorf("%s; ListArchivedTasks(%q) returned error: %v", tc.desc, tc.qname, err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{}, ArchivedTask{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{}, ArchivedTask{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("%s; ListArchivedTask(%q) = %v, want %v; (-want,+got)\n%s",
tc.desc, tc.qname, got, tc.want, diff)
@ -882,7 +800,7 @@ func TestInspectorListPagination(t *testing.T) {
defer r.Close()
h.SeedPendingQueue(t, r, msgs, base.DefaultQueueName)
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
page int
@ -924,7 +842,7 @@ func TestInspectorListPagination(t *testing.T) {
t.Errorf("ListPendingTask('default') returned error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, ignoreOpt); diff != "" {
t.Errorf("ListPendingTask('default') = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff)
@ -940,7 +858,7 @@ func TestInspectorDeleteAllPendingTasks(t *testing.T) {
m3 := h.NewTaskMessage("task3", nil)
m4 := h.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
@ -1009,7 +927,7 @@ func TestInspectorDeleteAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -1075,7 +993,7 @@ func TestInspectorDeleteAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -1141,7 +1059,7 @@ func TestInspectorDeleteAllArchivedTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -1204,7 +1122,7 @@ func TestInspectorArchiveAllPendingTasks(t *testing.T) {
now := time.Now()
z1 := base.Z{Message: m1, Score: now.Add(5 * time.Minute).Unix()}
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
@ -1321,7 +1239,7 @@ func TestInspectorArchiveAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -1454,7 +1372,7 @@ func TestInspectorArchiveAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -1568,7 +1486,7 @@ func TestInspectorRunAllScheduledTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -1685,7 +1603,7 @@ func TestInspectorRunAllRetryTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -1802,7 +1720,7 @@ func TestInspectorRunAllArchivedTasks(t *testing.T) {
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
z4 := base.Z{Message: m4, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -1909,7 +1827,7 @@ func TestInspectorDeleteTaskDeletesPendingTask(t *testing.T) {
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
pending map[string][]*base.TaskMessage
@ -1974,7 +1892,7 @@ func TestInspectorDeleteTaskDeletesScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -2024,7 +1942,7 @@ func TestInspectorDeleteTaskDeletesRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -2074,7 +1992,7 @@ func TestInspectorDeleteTaskDeletesArchivedTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -2124,7 +2042,7 @@ func TestInspectorDeleteTaskError(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -2189,7 +2107,7 @@ func TestInspectorRunTaskRunsScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -2259,7 +2177,7 @@ func TestInspectorRunTaskRunsRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -2328,7 +2246,7 @@ func TestInspectorRunTaskRunsArchivedTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -2401,7 +2319,7 @@ func TestInspectorRunTaskError(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(-15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(-2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
archived map[string][]base.Z
@ -2496,7 +2414,7 @@ func TestInspectorArchiveTaskArchivesPendingTask(t *testing.T) {
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessageWithQueue("task2", nil, "custom")
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
now := time.Now()
tests := []struct {
@ -2591,7 +2509,7 @@ func TestInspectorArchiveTaskArchivesScheduledTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
scheduled map[string][]base.Z
@ -2667,7 +2585,7 @@ func TestInspectorArchiveTaskArchivesRetryTask(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -2741,7 +2659,7 @@ func TestInspectorArchiveTaskError(t *testing.T) {
z2 := base.Z{Message: m2, Score: now.Add(15 * time.Minute).Unix()}
z3 := base.Z{Message: m3, Score: now.Add(2 * time.Minute).Unix()}
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
tests := []struct {
retry map[string][]base.Z
@ -2833,7 +2751,7 @@ var sortSchedulerEntry = cmp.Transformer("SortSchedulerEntry", func(in []*Schedu
func TestInspectorSchedulerEntries(t *testing.T) {
r := setup(t)
rdbClient := rdb.NewRDB(r)
inspector := New(getRedisConnOpt(t))
inspector := NewInspector(getRedisConnOpt(t))
now := time.Now().UTC()
schedulerID := "127.0.0.1:9876:abc123"
@ -2864,15 +2782,15 @@ func TestInspectorSchedulerEntries(t *testing.T) {
want: []*SchedulerEntry{
{
Spec: "* * * * *",
Task: asynq.NewTask("foo", nil),
Task: NewTask("foo", nil),
Opts: nil,
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
{
Spec: "@every 20m",
Task: asynq.NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})),
Opts: []asynq.Option{asynq.Queue("bar"), asynq.MaxRetry(20)},
Task: NewTask("bar", h.JSON(map[string]interface{}{"fiz": "baz"})),
Opts: []Option{Queue("bar"), MaxRetry(20)},
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},
@ -2891,7 +2809,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
t.Errorf("SchedulerEntries() returned error: %v", err)
continue
}
ignoreOpt := cmpopts.IgnoreUnexported(asynq.Task{})
ignoreOpt := cmpopts.IgnoreUnexported(Task{})
if diff := cmp.Diff(tc.want, got, sortSchedulerEntry, ignoreOpt); diff != "" {
t.Errorf("SchedulerEntries() = %v, want %v; (-want,+got)\n%s",
got, tc.want, diff)
@ -2903,16 +2821,16 @@ func TestParseOption(t *testing.T) {
oneHourFromNow := time.Now().Add(1 * time.Hour)
tests := []struct {
s string
wantType asynq.OptionType
wantType OptionType
wantVal interface{}
}{
{`MaxRetry(10)`, asynq.MaxRetryOpt, 10},
{`Queue("email")`, asynq.QueueOpt, "email"},
{`Timeout(3m)`, asynq.TimeoutOpt, 3 * time.Minute},
{asynq.Deadline(oneHourFromNow).String(), asynq.DeadlineOpt, oneHourFromNow},
{`Unique(1h)`, asynq.UniqueOpt, 1 * time.Hour},
{asynq.ProcessAt(oneHourFromNow).String(), asynq.ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, asynq.ProcessInOpt, 10 * time.Minute},
{`MaxRetry(10)`, MaxRetryOpt, 10},
{`Queue("email")`, QueueOpt, "email"},
{`Timeout(3m)`, TimeoutOpt, 3 * time.Minute},
{Deadline(oneHourFromNow).String(), DeadlineOpt, oneHourFromNow},
{`Unique(1h)`, UniqueOpt, 1 * time.Hour},
{ProcessAt(oneHourFromNow).String(), ProcessAtOpt, oneHourFromNow},
{`ProcessIn(10m)`, ProcessInOpt, 10 * time.Minute},
}
for _, tc := range tests {
@ -2928,7 +2846,7 @@ func TestParseOption(t *testing.T) {
t.Fatalf("got type %v, want type %v ", got.Type(), tc.wantType)
}
switch tc.wantType {
case asynq.QueueOpt:
case QueueOpt:
gotVal, ok := got.Value().(string)
if !ok {
t.Fatal("returned Option with non-string value")
@ -2936,7 +2854,7 @@ func TestParseOption(t *testing.T) {
if gotVal != tc.wantVal.(string) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case asynq.MaxRetryOpt:
case MaxRetryOpt:
gotVal, ok := got.Value().(int)
if !ok {
t.Fatal("returned Option with non-int value")
@ -2944,7 +2862,7 @@ func TestParseOption(t *testing.T) {
if gotVal != tc.wantVal.(int) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case asynq.TimeoutOpt, asynq.UniqueOpt, asynq.ProcessInOpt:
case TimeoutOpt, UniqueOpt, ProcessInOpt:
gotVal, ok := got.Value().(time.Duration)
if !ok {
t.Fatal("returned Option with non duration value")
@ -2952,7 +2870,7 @@ func TestParseOption(t *testing.T) {
if gotVal != tc.wantVal.(time.Duration) {
t.Fatalf("got value %v, want %v", gotVal, tc.wantVal)
}
case asynq.DeadlineOpt, asynq.ProcessAtOpt:
case DeadlineOpt, ProcessAtOpt:
gotVal, ok := got.Value().(time.Time)
if !ok {
t.Fatal("returned Option with non time value")

View File

@ -1,22 +0,0 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
/*
Package inspeq provides helper types and functions to inspect queues and tasks managed by Asynq.
Inspector is used to query and mutate the state of queues and tasks.
Example:
inspector := inspeq.New(asynq.RedisClientOpt{Addr: "localhost:6379"})
tasks, err := inspector.ListArchivedTasks("my-queue")
for _, t := range tasks {
if err := inspector.DeleteTaskByKey(t.Key()); err != nil {
// handle error
}
}
*/
package inspeq