diff --git a/internal/errors/errors.go b/internal/errors/errors.go index 4736095..ebcecfc 100644 --- a/internal/errors/errors.go +++ b/internal/errors/errors.go @@ -147,9 +147,17 @@ func CanonicalCode(err error) Code { } /****************************************** - Domin Specific Error Types + Domin Specific Error Types & Values *******************************************/ +var ( + // ErrNoProcessableTask indicates that there are no tasks ready to be processed. + ErrNoProcessableTask = errors.New("no tasks are ready for processing") + + // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. + ErrDuplicateTask = errors.New("task already exists") +) + // TaskNotFoundError indicates that a task with the given ID does not exist // in the given queue. type TaskNotFoundError struct { diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 4b8f101..1969664 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -6,28 +6,18 @@ package rdb import ( - "errors" "fmt" "time" "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" "github.com/spf13/cast" ) -// TODO: remove this & use internal/errors package instead. var ( // ErrNoProcessableTask indicates that there are no tasks ready to be processed. ErrNoProcessableTask = errors.New("no tasks are ready for processing") - - // ErrTaskNotFound indicates that a task that matches the given identifier was not found. - ErrTaskNotFound = fmt.Errorf("%w: could not find a task in the queue", base.ErrNotFound) - - // ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock. - ErrDuplicateTask = errors.New("task already exists") - - // ErrQueueNotFound indicates that a queue with the given name does not exist. - ErrQueueNotFound = fmt.Errorf("%w: queue does not exist", base.ErrNotFound) ) const statsTTL = 90 * 24 * time.Hour // 90 days @@ -115,12 +105,13 @@ return 1 // EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired. // It returns ErrDuplicateTask if the lock cannot be acquired. func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { + var op errors.Op = "rdb.EnqueueUnique" encoded, err := base.EncodeMessage(msg) if err != nil { - return err + return errors.E(op, errors.Internal, "cannot encode task message: %v", err) } if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ msg.UniqueKey, @@ -136,14 +127,14 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { } res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { - return err + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } n, ok := res.(int64) if !ok { - return fmt.Errorf("could not cast %v to int64", res) + return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res)) } if n == 0 { - return ErrDuplicateTask + return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask) } return nil } @@ -402,12 +393,13 @@ return 1 // ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired. // It returns ErrDuplicateTask if the lock cannot be acquired. func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error { + var op errors.Op = "rdb.ScheduleUnique" encoded, err := base.EncodeMessage(msg) if err != nil { - return err + return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err)) } if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ msg.UniqueKey, @@ -424,14 +416,14 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim } res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result() if err != nil { - return err + return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } n, ok := res.(int64) if !ok { - return fmt.Errorf("could not cast %v to int64", res) + return errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) } if n == 0 { - return ErrDuplicateTask + return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask) } return nil } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 0f44828..234cc14 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -7,7 +7,6 @@ package rdb import ( "encoding/json" "flag" - "fmt" "strconv" "strings" "sync" @@ -20,6 +19,7 @@ import ( "github.com/google/uuid" h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/errors" ) // variables used for package testing. @@ -190,9 +190,8 @@ func TestEnqueueUnique(t *testing.T) { // Enqueue the second message, should fail. got := r.EnqueueUnique(tc.msg, tc.ttl) - if got != ErrDuplicateTask { - t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v", - tc.msg, tc.ttl, got, ErrDuplicateTask) + if !errors.Is(got, errors.ErrDuplicateTask) { + t.Errorf("Second message: (*RDB).EnqueueUnique(msg, ttl) = %v, want %v", got, errors.ErrDuplicateTask) continue } gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() @@ -908,7 +907,7 @@ func TestScheduleUnique(t *testing.T) { for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - desc := fmt.Sprintf("(*RDB).ScheduleUnique(%v, %v, %v)", tc.msg, tc.processAt, tc.ttl) + desc := "(*RDB).ScheduleUnique(msg, processAt, ttl)" err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl) if err != nil { t.Errorf("Frist task: %s = %v, want nil", desc, err) @@ -963,8 +962,8 @@ func TestScheduleUnique(t *testing.T) { // Enqueue the second message, should fail. got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl) - if got != ErrDuplicateTask { - t.Errorf("Second task: %s = %v, want %v", desc, got, ErrDuplicateTask) + if !errors.Is(got, errors.ErrDuplicateTask) { + t.Errorf("Second task: %s = %v, want %v", desc, got, errors.ErrDuplicateTask) continue }