diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f71158c..8d59afe 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -15,11 +15,6 @@ import ( "github.com/spf13/cast" ) -var ( - // ErrNoProcessableTask indicates that there are no tasks ready to be processed. - ErrNoProcessableTask = errors.New("no tasks are ready for processing") -) - const statsTTL = 90 * 24 * time.Hour // 90 days // RDB is a client interface to query and mutate task queues. @@ -139,32 +134,24 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { return nil } -// Dequeue queries given queues in order and pops a task message -// off a queue if one exists and returns the message and deadline. -// Dequeue skips a queue if the queue is paused. -// If all queues are empty, ErrNoProcessableTask error is returned. -func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { - encoded, d, err := r.dequeue(qnames...) - if err != nil { - return nil, time.Time{}, err - } - if msg, err = base.DecodeMessage([]byte(encoded)); err != nil { - return nil, time.Time{}, err - } - return msg, time.Unix(d, 0), nil -} - +// Input: // KEYS[1] -> asynq:{}:pending // KEYS[2] -> asynq:{}:paused // KEYS[3] -> asynq:{}:active // KEYS[4] -> asynq:{}:deadlines +// -- // ARGV[1] -> current time in Unix time // ARGV[2] -> task key prefix // -// dequeueCmd checks whether a queue is paused first, before +// Output: +// Returns nil if no processable task is found in the given queue. +// Returns tuple {msg , deadline} if task is found, where `msg` is the encoded +// TaskMessage, and `deadline` is Unix time in seconds. +// +// Note: dequeueCmd checks whether a queue is paused first, before // calling RPOPLPUSH to pop a task from the queue. // It computes the task deadline by inspecting Timout and Deadline fields, -// and inserts the task with deadlines set. +// and inserts the task to the deadlines zset with the computed deadline. var dequeueCmd = redis.NewScript(` if redis.call("EXISTS", KEYS[2]) == 0 then local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3]) @@ -191,7 +178,12 @@ if redis.call("EXISTS", KEYS[2]) == 0 then end return nil`) -func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err error) { +// Dequeue queries given queues in order and pops a task message +// off a queue if one exists and returns the message and deadline. +// Dequeue skips a queue if the queue is paused. +// If all queues are empty, ErrNoProcessableTask error is returned. +func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { + var op errors.Op = "rdb.Dequeue" for _, qname := range qnames { keys := []string{ base.PendingKey(qname), @@ -207,24 +199,29 @@ func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err err if err == redis.Nil { continue } else if err != nil { - return "", 0, err + return nil, time.Time{}, errors.E(op, errors.Unknown, fmt.Sprintf("redis eval error: %v", err)) } data, err := cast.ToSliceE(res) if err != nil { - return "", 0, err + return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) } if len(data) != 2 { - return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) + return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("Lua script returned %d values; expected 2", len(data))) } - if encoded, err = cast.ToStringE(data[0]); err != nil { - return "", 0, err + encoded, err := cast.ToStringE(data[0]) + if err != nil { + return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) } - if deadline, err = cast.ToInt64E(data[1]); err != nil { - return "", 0, err + d, err := cast.ToInt64E(data[1]) + if err != nil { + return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cast error: unexpected return value from Lua script: %v", res)) } - return encoded, deadline, nil + if msg, err = base.DecodeMessage([]byte(encoded)); err != nil { + return nil, time.Time{}, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err)) + } + return msg, time.Unix(d, 0), nil } - return "", 0, ErrNoProcessableTask + return nil, time.Time{}, errors.E(op, errors.NotFound, errors.ErrNoProcessableTask) } // KEYS[1] -> asynq:{}:active diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 830d2c5..e711ae3 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -238,7 +238,6 @@ func TestDequeue(t *testing.T) { args []string // list of queues to query wantMsg *base.TaskMessage wantDeadline time.Time - err error wantPending map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage wantDeadlines map[string][]base.Z @@ -250,7 +249,6 @@ func TestDequeue(t *testing.T) { args: []string{"default"}, wantMsg: t1, wantDeadline: time.Unix(t1Deadline, 0), - err: nil, wantPending: map[string][]*base.TaskMessage{ "default": {}, }, @@ -261,24 +259,6 @@ func TestDequeue(t *testing.T) { "default": {{Message: t1, Score: t1Deadline}}, }, }, - { - pending: map[string][]*base.TaskMessage{ - "default": {}, - }, - args: []string{"default"}, - wantMsg: nil, - wantDeadline: time.Time{}, - err: ErrNoProcessableTask, - wantPending: map[string][]*base.TaskMessage{ - "default": {}, - }, - wantActive: map[string][]*base.TaskMessage{ - "default": {}, - }, - wantDeadlines: map[string][]base.Z{ - "default": {}, - }, - }, { pending: map[string][]*base.TaskMessage{ "default": {t1}, @@ -288,7 +268,6 @@ func TestDequeue(t *testing.T) { args: []string{"critical", "default", "low"}, wantMsg: t2, wantDeadline: time.Unix(t2Deadline, 0), - err: nil, wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, @@ -314,7 +293,6 @@ func TestDequeue(t *testing.T) { args: []string{"critical", "default", "low"}, wantMsg: t1, wantDeadline: time.Unix(t1Deadline, 0), - err: nil, wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, @@ -331,16 +309,85 @@ func TestDequeue(t *testing.T) { "low": {}, }, }, + } + + for _, tc := range tests { + h.FlushDB(t, r.client) // clean up db before each test case + h.SeedAllPendingQueues(t, r.client, tc.pending) + + gotMsg, gotDeadline, err := r.Dequeue(tc.args...) + if err != nil { + t.Errorf("(*RDB).Dequeue(%v) returned error %v", tc.args, err) + continue + } + if !cmp.Equal(gotMsg, tc.wantMsg) { + t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v", + tc.args, gotMsg, tc.wantMsg) + continue + } + if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { + t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", + tc.args, gotDeadline, tc.wantDeadline) + continue + } + + for queue, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff) + } + } + for queue, want := range tc.wantActive { + gotActive := h.GetActiveMessages(t, r.client, queue) + if diff := cmp.Diff(want, gotActive, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.ActiveKey(queue), diff) + } + } + for queue, want := range tc.wantDeadlines { + gotDeadlines := h.GetDeadlinesEntries(t, r.client, queue) + if diff := cmp.Diff(want, gotDeadlines, h.SortZSetEntryOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DeadlinesKey(queue), diff) + } + } + } +} + +func TestDequeueError(t *testing.T) { + r := setup(t) + defer r.Close() + + tests := []struct { + pending map[string][]*base.TaskMessage + args []string // list of queues to query + wantErr error + wantPending map[string][]*base.TaskMessage + wantActive map[string][]*base.TaskMessage + wantDeadlines map[string][]base.Z + }{ + { + pending: map[string][]*base.TaskMessage{ + "default": {}, + }, + args: []string{"default"}, + wantErr: errors.ErrNoProcessableTask, + wantPending: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantActive: map[string][]*base.TaskMessage{ + "default": {}, + }, + wantDeadlines: map[string][]base.Z{ + "default": {}, + }, + }, { pending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, "low": {}, }, - args: []string{"critical", "default", "low"}, - wantMsg: nil, - wantDeadline: time.Time{}, - err: ErrNoProcessableTask, + args: []string{"critical", "default", "low"}, + wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ "default": {}, "critical": {}, @@ -363,20 +410,18 @@ func TestDequeue(t *testing.T) { h.FlushDB(t, r.client) // clean up db before each test case h.SeedAllPendingQueues(t, r.client, tc.pending) - gotMsg, gotDeadline, err := r.Dequeue(tc.args...) - if err != tc.err { + gotMsg, gotDeadline, gotErr := r.Dequeue(tc.args...) + if !errors.Is(gotErr, tc.wantErr) { t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v", - tc.args, err, tc.err) + tc.args, gotErr, tc.wantErr) continue } - if !cmp.Equal(gotMsg, tc.wantMsg) || err != tc.err { - t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v", - tc.args, gotMsg, tc.wantMsg) + if gotMsg != nil { + t.Errorf("(*RDB).Dequeue(%v) returned message %v; want nil", tc.args, gotMsg) continue } - if !cmp.Equal(gotDeadline, tc.wantDeadline, cmpopts.EquateApproxTime(1*time.Second)) { - t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", - tc.args, gotDeadline, tc.wantDeadline) + if !gotDeadline.IsZero() { + t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v", tc.args, gotDeadline, time.Time{}) continue } @@ -426,7 +471,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { pending map[string][]*base.TaskMessage args []string // list of queues to query wantMsg *base.TaskMessage - err error + wantErr error wantPending map[string][]*base.TaskMessage wantActive map[string][]*base.TaskMessage }{ @@ -438,7 +483,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { }, args: []string{"default", "critical"}, wantMsg: t2, - err: nil, + wantErr: nil, wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {}, @@ -455,7 +500,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { }, args: []string{"default"}, wantMsg: nil, - err: ErrNoProcessableTask, + wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ "default": {t1}, }, @@ -471,7 +516,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { }, args: []string{"default", "critical"}, wantMsg: nil, - err: ErrNoProcessableTask, + wantErr: errors.ErrNoProcessableTask, wantPending: map[string][]*base.TaskMessage{ "default": {t1}, "critical": {t2}, @@ -493,9 +538,9 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { h.SeedAllPendingQueues(t, r.client, tc.pending) got, _, err := r.Dequeue(tc.args...) - if !cmp.Equal(got, tc.wantMsg) || err != tc.err { + if !cmp.Equal(got, tc.wantMsg) || !errors.Is(err, tc.wantErr) { t.Errorf("Dequeue(%v) = %v, %v; want %v, %v", - tc.args, got, err, tc.wantMsg, tc.err) + tc.args, got, err, tc.wantMsg, tc.wantErr) continue }