mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 08:12:19 +08:00
Update RDB.EnqueueUnique and RDB.ScheduleUnique with specific errors
This commit is contained in:
parent
ffe9aa74b3
commit
d98ecdebb4
@ -147,9 +147,17 @@ func CanonicalCode(err error) Code {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/******************************************
|
/******************************************
|
||||||
Domin Specific Error Types
|
Domin Specific Error Types & Values
|
||||||
*******************************************/
|
*******************************************/
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
|
||||||
|
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
|
||||||
|
|
||||||
|
// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
|
||||||
|
ErrDuplicateTask = errors.New("task already exists")
|
||||||
|
)
|
||||||
|
|
||||||
// TaskNotFoundError indicates that a task with the given ID does not exist
|
// TaskNotFoundError indicates that a task with the given ID does not exist
|
||||||
// in the given queue.
|
// in the given queue.
|
||||||
type TaskNotFoundError struct {
|
type TaskNotFoundError struct {
|
||||||
|
@ -6,28 +6,18 @@
|
|||||||
package rdb
|
package rdb
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
"github.com/spf13/cast"
|
"github.com/spf13/cast"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TODO: remove this & use internal/errors package instead.
|
|
||||||
var (
|
var (
|
||||||
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
|
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
|
||||||
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
|
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
|
||||||
|
|
||||||
// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
|
|
||||||
ErrTaskNotFound = fmt.Errorf("%w: could not find a task in the queue", base.ErrNotFound)
|
|
||||||
|
|
||||||
// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
|
|
||||||
ErrDuplicateTask = errors.New("task already exists")
|
|
||||||
|
|
||||||
// ErrQueueNotFound indicates that a queue with the given name does not exist.
|
|
||||||
ErrQueueNotFound = fmt.Errorf("%w: queue does not exist", base.ErrNotFound)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const statsTTL = 90 * 24 * time.Hour // 90 days
|
const statsTTL = 90 * 24 * time.Hour // 90 days
|
||||||
@ -115,12 +105,13 @@ return 1
|
|||||||
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
|
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
|
||||||
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
||||||
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
||||||
|
var op errors.Op = "rdb.EnqueueUnique"
|
||||||
encoded, err := base.EncodeMessage(msg)
|
encoded, err := base.EncodeMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, "cannot encode task message: %v", err)
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
msg.UniqueKey,
|
msg.UniqueKey,
|
||||||
@ -136,14 +127,14 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
|||||||
}
|
}
|
||||||
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
|
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
n, ok := res.(int64)
|
n, ok := res.(int64)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("could not cast %v to int64", res)
|
return errors.E(op, errors.Internal, fmt.Sprintf("unexpected return value from Lua script: %v", res))
|
||||||
}
|
}
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
return ErrDuplicateTask
|
return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -402,12 +393,13 @@ return 1
|
|||||||
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
|
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
|
||||||
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
||||||
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
|
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
|
||||||
|
var op errors.Op = "rdb.ScheduleUnique"
|
||||||
encoded, err := base.EncodeMessage(msg)
|
encoded, err := base.EncodeMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode task message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
msg.UniqueKey,
|
msg.UniqueKey,
|
||||||
@ -424,14 +416,14 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
|
|||||||
}
|
}
|
||||||
res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result()
|
res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
n, ok := res.(int64)
|
n, ok := res.(int64)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("could not cast %v to int64", res)
|
return errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
|
||||||
}
|
}
|
||||||
if n == 0 {
|
if n == 0 {
|
||||||
return ErrDuplicateTask
|
return errors.E(op, errors.AlreadyExists, errors.ErrDuplicateTask)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -7,7 +7,6 @@ package rdb
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -20,6 +19,7 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||||
"github.com/hibiken/asynq/internal/base"
|
"github.com/hibiken/asynq/internal/base"
|
||||||
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// variables used for package testing.
|
// variables used for package testing.
|
||||||
@ -190,9 +190,8 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
|
|
||||||
// Enqueue the second message, should fail.
|
// Enqueue the second message, should fail.
|
||||||
got := r.EnqueueUnique(tc.msg, tc.ttl)
|
got := r.EnqueueUnique(tc.msg, tc.ttl)
|
||||||
if got != ErrDuplicateTask {
|
if !errors.Is(got, errors.ErrDuplicateTask) {
|
||||||
t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v",
|
t.Errorf("Second message: (*RDB).EnqueueUnique(msg, ttl) = %v, want %v", got, errors.ErrDuplicateTask)
|
||||||
tc.msg, tc.ttl, got, ErrDuplicateTask)
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
|
gotTTL := r.client.TTL(tc.msg.UniqueKey).Val()
|
||||||
@ -908,7 +907,7 @@ func TestScheduleUnique(t *testing.T) {
|
|||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
h.FlushDB(t, r.client) // clean up db before each test case
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
|
|
||||||
desc := fmt.Sprintf("(*RDB).ScheduleUnique(%v, %v, %v)", tc.msg, tc.processAt, tc.ttl)
|
desc := "(*RDB).ScheduleUnique(msg, processAt, ttl)"
|
||||||
err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
|
err := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Frist task: %s = %v, want nil", desc, err)
|
t.Errorf("Frist task: %s = %v, want nil", desc, err)
|
||||||
@ -963,8 +962,8 @@ func TestScheduleUnique(t *testing.T) {
|
|||||||
|
|
||||||
// Enqueue the second message, should fail.
|
// Enqueue the second message, should fail.
|
||||||
got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
|
got := r.ScheduleUnique(tc.msg, tc.processAt, tc.ttl)
|
||||||
if got != ErrDuplicateTask {
|
if !errors.Is(got, errors.ErrDuplicateTask) {
|
||||||
t.Errorf("Second task: %s = %v, want %v", desc, got, ErrDuplicateTask)
|
t.Errorf("Second task: %s = %v, want %v", desc, got, errors.ErrDuplicateTask)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user