diff --git a/client.go b/client.go index dc1841c..81ec5a5 100644 --- a/client.go +++ b/client.go @@ -5,7 +5,6 @@ package asynq import ( - "errors" "fmt" "strings" "sync" @@ -14,6 +13,7 @@ import ( "github.com/go-redis/redis/v7" "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" ) @@ -346,7 +346,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { err = c.schedule(msg, opt.processAt, opt.uniqueTTL) } switch { - case err == rdb.ErrDuplicateTask: + case errors.Is(err, errors.ErrDuplicateTask): return nil, fmt.Errorf("%w", ErrDuplicateTask) case err != nil: return nil, err diff --git a/inspeq/inspector.go b/inspeq/inspector.go index e300b09..2aa2e7b 100644 --- a/inspeq/inspector.go +++ b/inspeq/inspector.go @@ -5,7 +5,6 @@ package inspeq import ( - "errors" "fmt" "strconv" "strings" @@ -15,6 +14,7 @@ import ( "github.com/google/uuid" "github.com/hibiken/asynq" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/rdb" ) @@ -154,10 +154,10 @@ var ( // returns ErrQueueNotEmpty. func (i *Inspector) DeleteQueue(qname string, force bool) error { err := i.rdb.RemoveQueue(qname, force) - if _, ok := err.(*rdb.QueueNotFoundError); ok { + if errors.IsQueueNotFound(err) { return fmt.Errorf("%w: queue=%q", ErrQueueNotFound, qname) } - if _, ok := err.(*rdb.QueueNotEmptyError); ok { + if errors.IsQueueNotEmpty(err) { return fmt.Errorf("%w: queue=%q", ErrQueueNotEmpty, qname) } return err @@ -549,13 +549,13 @@ func (i *Inspector) DeleteTaskByKey(qname, key string) error { } switch prefix { case keyPrefixPending: - return i.rdb.DeletePendingTask(qname, id) + return i.rdb.DeleteTask(qname, id) case keyPrefixScheduled: - return i.rdb.DeleteScheduledTask(qname, id) + return i.rdb.DeleteTask(qname, id) case keyPrefixRetry: - return i.rdb.DeleteRetryTask(qname, id) + return i.rdb.DeleteTask(qname, id) case keyPrefixArchived: - return i.rdb.DeleteArchivedTask(qname, id) + return i.rdb.DeleteTask(qname, id) default: return fmt.Errorf("invalid key") } @@ -657,11 +657,11 @@ func (i *Inspector) ArchiveTaskByKey(qname, key string) error { } switch prefix { case keyPrefixPending: - return i.rdb.ArchivePendingTask(qname, id) + return i.rdb.ArchiveTask(qname, id) case keyPrefixScheduled: - return i.rdb.ArchiveScheduledTask(qname, id) + return i.rdb.ArchiveTask(qname, id) case keyPrefixRetry: - return i.rdb.ArchiveRetryTask(qname, id) + return i.rdb.ArchiveTask(qname, id) case keyPrefixArchived: return fmt.Errorf("task is already archived") default: diff --git a/inspeq/inspector_test.go b/inspeq/inspector_test.go index 0fedc9f..e66d70a 100644 --- a/inspeq/inspector_test.go +++ b/inspeq/inspector_test.go @@ -5,6 +5,7 @@ package inspeq import ( + "errors" "flag" "fmt" "math" @@ -271,7 +272,7 @@ func TestInspectorDeleteQueueErrorQueueNotEmpty(t *testing.T) { h.SeedAllArchivedQueues(t, r, tc.archived) err := inspector.DeleteQueue(tc.qname, tc.force) - if _, ok := err.(*ErrQueueNotEmpty); !ok { + if !errors.Is(err, ErrQueueNotEmpty) { t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotEmpty", tc.qname, tc.force) } @@ -327,7 +328,7 @@ func TestInspectorDeleteQueueErrorQueueNotFound(t *testing.T) { h.SeedAllArchivedQueues(t, r, tc.archived) err := inspector.DeleteQueue(tc.qname, tc.force) - if _, ok := err.(*ErrQueueNotFound); !ok { + if !errors.Is(err, ErrQueueNotFound) { t.Errorf("DeleteQueue(%v, %t) did not return ErrQueueNotFound", tc.qname, tc.force) } diff --git a/processor.go b/processor.go index 605d077..c1857bf 100644 --- a/processor.go +++ b/processor.go @@ -6,7 +6,6 @@ package asynq import ( "context" - "errors" "fmt" "math/rand" "runtime" @@ -17,8 +16,8 @@ import ( "time" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" "github.com/hibiken/asynq/internal/log" - "github.com/hibiken/asynq/internal/rdb" "golang.org/x/time/rate" ) @@ -163,7 +162,7 @@ func (p *processor) exec() { qnames := p.queues() msg, deadline, err := p.broker.Dequeue(qnames...) switch { - case err == rdb.ErrNoProcessableTask: + case errors.Is(err, errors.ErrNoProcessableTask): p.logger.Debug("All queues are empty") // Queues are empty, this is a normal behavior. // Sleep to avoid slamming redis and let scheduler move tasks into queues.