2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 08:12:19 +08:00

Update RDB.Dequeue with new errors package

This commit is contained in:
Ken Hibino 2021-05-10 06:04:42 -07:00
parent 8117ce8972
commit a19ad19382
2 changed files with 115 additions and 73 deletions

View File

@ -15,11 +15,6 @@ import (
"github.com/spf13/cast" "github.com/spf13/cast"
) )
var (
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
)
const statsTTL = 90 * 24 * time.Hour // 90 days const statsTTL = 90 * 24 * time.Hour // 90 days
// RDB is a client interface to query and mutate task queues. // RDB is a client interface to query and mutate task queues.
@ -139,32 +134,24 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
return nil return nil
} }
// Dequeue queries given queues in order and pops a task message // Input:
// off a queue if one exists and returns the message and deadline.
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
encoded, d, err := r.dequeue(qnames...)
if err != nil {
return nil, time.Time{}, err
}
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
return nil, time.Time{}, err
}
return msg, time.Unix(d, 0), nil
}
// KEYS[1] -> asynq:{<qname>}:pending // KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused // KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active // KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:deadlines // KEYS[4] -> asynq:{<qname>}:deadlines
// --
// ARGV[1] -> current time in Unix time // ARGV[1] -> current time in Unix time
// ARGV[2] -> task key prefix // ARGV[2] -> task key prefix
// //
// dequeueCmd checks whether a queue is paused first, before // Output:
// Returns nil if no processable task is found in the given queue.
// Returns tuple {msg , deadline} if task is found, where `msg` is the encoded
// TaskMessage, and `deadline` is Unix time in seconds.
//
// Note: dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue. // calling RPOPLPUSH to pop a task from the queue.
// It computes the task deadline by inspecting Timout and Deadline fields, // It computes the task deadline by inspecting Timout and Deadline fields,
// and inserts the task with deadlines set. // and inserts the task to the deadlines zset with the computed deadline.
var dequeueCmd = redis.NewScript(` var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then if redis.call("EXISTS", KEYS[2]) == 0 then
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
@ -191,7 +178,12 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
end end
return nil`) return nil`)
func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err error) { // Dequeue queries given queues in order and pops a task message
// off a queue if one exists and returns the message and deadline.
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
var op errors.Op = "rdb.Dequeue"
for _, qname := range qnames { for _, qname := range qnames {
keys := []string{ keys := []string{
base.PendingKey(qname), base.PendingKey(qname),
@ -207,24 +199,29 @@ func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err err
if err == redis.Nil { if err == redis.Nil {
continue continue
} else if err != nil { } else if err != nil {
return "", 0, err return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err))
} }
data, err := cast.ToSliceE(res) data, err := cast.ToSliceE(res)
if err != nil { if err != nil {
return "", 0, err return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
} }
if len(data) != 2 { if len(data) != 2 {
return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("Lua script returned %d values; expected 2", len(data)))
} }
if encoded, err = cast.ToStringE(data[0]); err != nil { encoded, err := cast.ToStringE(data[0])
return "", 0, err if err != nil {
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
} }
if deadline, err = cast.ToInt64E(data[1]); err != nil { d, err := cast.ToInt64E(data[1])
return "", 0, err if err != nil {
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res))
} }
return encoded, deadline, nil if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
} }
return "", 0, ErrNoProcessableTask return msg, time.Unix(d, 0), nil
}
return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask)
} }
// KEYS[1] -> asynq:{<qname>}:active // KEYS[1] -> asynq:{<qname>}:active

View File

@ -238,7 +238,6 @@ func TestDequeue(t *testing.T) {
args []string // list of queues to query args []string // list of queues to query
wantMsg *base.TaskMessage wantMsg *base.TaskMessage
wantDeadline time.Time wantDeadline time.Time
err error
wantPending map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z wantDeadlines map[string][]base.Z
@ -250,7 +249,6 @@ func TestDequeue(t *testing.T) {
args: []string{"default"}, args: []string{"default"},
wantMsg: t1, wantMsg: t1,
wantDeadline: time.Unix(t1Deadline, 0), wantDeadline: time.Unix(t1Deadline, 0),
err: nil,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
@ -261,24 +259,6 @@ func TestDequeue(t *testing.T) {
"default": {{Message: t1, Score: t1Deadline}}, "default": {{Message: t1, Score: t1Deadline}},
}, },
}, },
{
pending: map[string][]*base.TaskMessage{
"default": {},
},
args: []string{"default"},
wantMsg: nil,
wantDeadline: time.Time{},
err: ErrNoProcessableTask,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
"default": {},
},
},
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
@ -288,7 +268,6 @@ func TestDequeue(t *testing.T) {
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: t2, wantMsg: t2,
wantDeadline: time.Unix(t2Deadline, 0), wantDeadline: time.Unix(t2Deadline, 0),
err: nil,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {}, "critical": {},
@ -314,7 +293,6 @@ func TestDequeue(t *testing.T) {
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: t1, wantMsg: t1,
wantDeadline: time.Unix(t1Deadline, 0), wantDeadline: time.Unix(t1Deadline, 0),
err: nil,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
"critical": {}, "critical": {},
@ -331,6 +309,77 @@ func TestDequeue(t *testing.T) {
"low": {}, "low": {},
}, },
}, },
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
gotMsg, gotDeadline, err := r.Dequeue(tc.args...)
if err != nil {
t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.args, err)
continue
}
if !cmp.Equal(gotMsg, tc.wantMsg) {
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
tc.args, gotMsg, tc.wantMsg)
continue
}
if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) {
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v",
tc.args, gotDeadline, tc.wantDeadline)
continue
}
for queue, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff)
}
}
for queue, want := range tc.wantActive {
gotActive := h.GetActiveMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff)
}
}
for queue, want := range tc.wantDeadlines {
gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue)
if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff)
}
}
}
}
func TestDequeueError(t *testing.T) {
r := setup(t)
defer r.Close()
tests := []struct {
pending map[string][]*base.TaskMessage
args []string // list of queues to query
wantErr error
wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage
wantDeadlines map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {},
},
args: []string{"default"},
wantErr: errors.ErrNoProcessableTask,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantActive: map[string][]*base.TaskMessage{
"default": {},
},
wantDeadlines: map[string][]base.Z{
"default": {},
},
},
{ {
pending: map[string][]*base.TaskMessage{ pending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -338,9 +387,7 @@ func TestDequeue(t *testing.T) {
"low": {}, "low": {},
}, },
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: nil, wantErr: errors.ErrNoProcessableTask,
wantDeadline: time.Time{},
err: ErrNoProcessableTask,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
"critical": {}, "critical": {},
@ -363,20 +410,18 @@ func TestDequeue(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllPendingQueues(t, r.client, tc.pending)
gotMsg, gotDeadline, err := r.Dequeue(tc.args...) gotMsg, gotDeadline, gotErr := r.Dequeue(tc.args...)
if err != tc.err { if !errors.Is(gotErr, tc.wantErr) {
t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v", t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v",
tc.args, err, tc.err) tc.args, gotErr, tc.wantErr)
continue continue
} }
if !cmp.Equal(gotMsg, tc.wantMsg) || err != tc.err { if gotMsg != nil {
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v", t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.args, gotMsg)
tc.args, gotMsg, tc.wantMsg)
continue continue
} }
if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { if !gotDeadline.IsZero() {
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, time.Time{})
tc.args, gotDeadline, tc.wantDeadline)
continue continue
} }
@ -426,7 +471,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
pending map[string][]*base.TaskMessage pending map[string][]*base.TaskMessage
args []string // list of queues to query args []string // list of queues to query
wantMsg *base.TaskMessage wantMsg *base.TaskMessage
err error wantErr error
wantPending map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage
wantActive map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage
}{ }{
@ -438,7 +483,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
}, },
args: []string{"default", "critical"}, args: []string{"default", "critical"},
wantMsg: t2, wantMsg: t2,
err: nil, wantErr: nil,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {}, "critical": {},
@ -455,7 +500,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
}, },
args: []string{"default"}, args: []string{"default"},
wantMsg: nil, wantMsg: nil,
err: ErrNoProcessableTask, wantErr: errors.ErrNoProcessableTask,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
}, },
@ -471,7 +516,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
}, },
args: []string{"default", "critical"}, args: []string{"default", "critical"},
wantMsg: nil, wantMsg: nil,
err: ErrNoProcessableTask, wantErr: errors.ErrNoProcessableTask,
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t2}, "critical": {t2},
@ -493,9 +538,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
h.SeedAllPendingQueues(t, r.client, tc.pending) h.SeedAllPendingQueues(t, r.client, tc.pending)
got, _, err := r.Dequeue(tc.args...) got, _, err := r.Dequeue(tc.args...)
if !cmp.Equal(got, tc.wantMsg) || err != tc.err { if !cmp.Equal(got, tc.wantMsg) || !errors.Is(err, tc.wantErr) {
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v", t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
tc.args, got, err, tc.wantMsg, tc.err) tc.args, got, err, tc.wantMsg, tc.wantErr)
continue continue
} }