From 5c42bdc4c4f2a8d95a2e8910f639b28f186bfa6f Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 1 Jan 2020 13:42:24 -0800 Subject: [PATCH] Change (*RDB).Dequeue to query multiple queues in order --- internal/asynqtest/asynqtest.go | 24 ++++++ internal/rdb/inspect_test.go | 8 +- internal/rdb/rdb.go | 25 ++++-- internal/rdb/rdb_test.go | 144 +++++++++++++++++++++++++++----- processor.go | 7 +- 5 files changed, 177 insertions(+), 31 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 4ddb19b..9c88c93 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -110,6 +110,18 @@ func SeedDefaultQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) seedRedisList(tb, r, base.DefaultQueue, msgs) } +// SeedHighPriorityQueue initializes the high-priority queue with the given messages. +func SeedHighPriorityQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { + tb.Helper() + seedRedisList(tb, r, base.HighPriorityQueue, msgs) +} + +// SeedLowPriorityQueue initializes the low-priority queue with the given messages. +func SeedLowPriorityQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { + tb.Helper() + seedRedisList(tb, r, base.LowPriorityQueue, msgs) +} + // SeedInProgressQueue initializes the in-progress queue with the given messages. func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { tb.Helper() @@ -158,6 +170,18 @@ func GetEnqueuedMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { return getListMessages(tb, r, base.DefaultQueue) } +// GetHighPriorityMessages returns all task messages in the high-priority queue. +func GetHighPriorityMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { + tb.Helper() + return getListMessages(tb, r, base.HighPriorityQueue) +} + +// GetLowPriorityMessages returns all task messages in the low-priority queue. +func GetLowPriorityMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { + tb.Helper() + return getListMessages(tb, r, base.LowPriorityQueue) +} + // GetInProgressMessages returns all task messages in the in-progress queue. func GetInProgressMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { tb.Helper() diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 2111244..89bcc17 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -328,7 +328,7 @@ func TestListRetry(t *testing.T) { m1 := &base.TaskMessage{ ID: xid.New(), Type: "send_email", - Queue: "default", + Priority: base.PriorityDefault, Payload: map[string]interface{}{"subject": "hello"}, ErrorMsg: "email server not responding", Retry: 25, @@ -337,7 +337,7 @@ func TestListRetry(t *testing.T) { m2 := &base.TaskMessage{ ID: xid.New(), Type: "reindex", - Queue: "default", + Priority: base.PriorityDefault, Payload: nil, ErrorMsg: "search engine not responding", Retry: 25, @@ -412,14 +412,14 @@ func TestListDead(t *testing.T) { m1 := &base.TaskMessage{ ID: xid.New(), Type: "send_email", - Queue: "default", + Priority: base.PriorityDefault, Payload: map[string]interface{}{"subject": "hello"}, ErrorMsg: "email server not responding", } m2 := &base.TaskMessage{ ID: xid.New(), Type: "reindex", - Queue: "default", + Priority: base.PriorityDefault, Payload: nil, ErrorMsg: "search engine not responding", } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index ac9adcf..3de4f0a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -12,8 +12,8 @@ import ( ) var ( - // ErrDequeueTimeout indicates that the blocking dequeue operation timed out. - ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") + // ErrNoProcessableTask indicates that the dequeue operation returns no task because all queues are empty. + ErrNoProcessableTask = errors.New("all queues are empty; no task to process") // ErrTaskNotFound indicates that a task that matches the given identifier was not found. ErrTaskNotFound = errors.New("could not find a task") @@ -50,14 +50,16 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { // once a task is available, it adds the task to "in progress" queue // and returns the task. If there are no tasks for the entire timeout // duration, it returns ErrDequeueTimeout. -func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { - data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result() +func (r *RDB) Dequeue(queues ...string) (*base.TaskMessage, error) { + data, err := r.dequeue(queues...) if err == redis.Nil { - return nil, ErrDequeueTimeout + // all queues are empty // TODO(hibiken): Rename this sentinel error + return nil, ErrNoProcessableTask } if err != nil { return nil, err } + var msg base.TaskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { @@ -66,6 +68,19 @@ func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { return &msg, nil } +func (r *RDB) dequeue(queues ...string) (data string, err error) { + for _, qname := range queues { + data, err = r.client.RPopLPush(qname, base.InProgressQueue).Result() + if err == nil { + return data, nil + } + if err != redis.Nil { + return "", err + } + } + return data, err +} + // Done removes the task from in-progress queue to mark the task as done. func (r *RDB) Done(msg *base.TaskMessage) error { bytes, err := json.Marshal(msg) diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index c4d8c31..7085c59 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -55,37 +55,143 @@ func TestEnqueue(t *testing.T) { func TestDequeue(t *testing.T) { r := setup(t) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) + t2 := h.NewTaskMessage("reindex", map[string]interface{}{}) + t3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{}) + t4 := h.NewTaskMessage("send_notification", nil) tests := []struct { - enqueued []*base.TaskMessage - want *base.TaskMessage - err error - wantInProgress []*base.TaskMessage + desc string + enqueuedHigh []*base.TaskMessage + enqueuedDefault []*base.TaskMessage + enqueuedLow []*base.TaskMessage + args []string + want *base.TaskMessage + err error + wantHigh []*base.TaskMessage + wantDefault []*base.TaskMessage + wantLow []*base.TaskMessage + wantInProgress []*base.TaskMessage }{ { - enqueued: []*base.TaskMessage{t1}, - want: t1, - err: nil, - wantInProgress: []*base.TaskMessage{t1}, + desc: "default only", + enqueuedHigh: []*base.TaskMessage{}, + enqueuedDefault: []*base.TaskMessage{t1, t2, t3, t4}, + enqueuedLow: []*base.TaskMessage{}, + args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}, + want: t1, + err: nil, + wantHigh: []*base.TaskMessage{}, + wantDefault: []*base.TaskMessage{t2, t3, t4}, + wantLow: []*base.TaskMessage{}, + wantInProgress: []*base.TaskMessage{t1}, }, { - enqueued: []*base.TaskMessage{}, - want: nil, - err: ErrDequeueTimeout, - wantInProgress: []*base.TaskMessage{}, + desc: "all queues empty", + enqueuedHigh: []*base.TaskMessage{}, + enqueuedDefault: []*base.TaskMessage{}, + enqueuedLow: []*base.TaskMessage{}, + args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}, + want: nil, + err: ErrNoProcessableTask, + wantHigh: []*base.TaskMessage{}, + wantDefault: []*base.TaskMessage{}, + wantLow: []*base.TaskMessage{}, + wantInProgress: []*base.TaskMessage{}, + }, + { + desc: "all queues full", + enqueuedHigh: []*base.TaskMessage{t2}, + enqueuedDefault: []*base.TaskMessage{t1, t3}, + enqueuedLow: []*base.TaskMessage{t4}, + args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}, + want: t2, + err: nil, + wantHigh: []*base.TaskMessage{}, + wantDefault: []*base.TaskMessage{t1, t3}, + wantLow: []*base.TaskMessage{t4}, + wantInProgress: []*base.TaskMessage{t2}, + }, + { + desc: "low queue only", + enqueuedHigh: []*base.TaskMessage{}, + enqueuedDefault: []*base.TaskMessage{}, + enqueuedLow: []*base.TaskMessage{t3, t4}, + args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}, + want: t3, + err: nil, + wantHigh: []*base.TaskMessage{}, + wantDefault: []*base.TaskMessage{}, + wantLow: []*base.TaskMessage{t4}, + wantInProgress: []*base.TaskMessage{t3}, + }, + { + desc: "all queues full with reverse priority args", + enqueuedHigh: []*base.TaskMessage{t2}, + enqueuedDefault: []*base.TaskMessage{t1}, + enqueuedLow: []*base.TaskMessage{t3, t4}, + args: []string{base.LowPriorityQueue, base.DefaultQueue, base.HighPriorityQueue}, + want: t3, + err: nil, + wantHigh: []*base.TaskMessage{t2}, + wantDefault: []*base.TaskMessage{t1}, + wantLow: []*base.TaskMessage{t4}, + wantInProgress: []*base.TaskMessage{t3}, + }, + { + desc: "all queues full with defaualt queue first in args", + enqueuedHigh: []*base.TaskMessage{t2}, + enqueuedDefault: []*base.TaskMessage{t1}, + enqueuedLow: []*base.TaskMessage{t3, t4}, + args: []string{base.DefaultQueue, base.LowPriorityQueue, base.HighPriorityQueue}, + want: t1, + err: nil, + wantHigh: []*base.TaskMessage{t2}, + wantDefault: []*base.TaskMessage{}, + wantLow: []*base.TaskMessage{t3, t4}, + wantInProgress: []*base.TaskMessage{t1}, + }, + { + desc: "first queue in args empty", + enqueuedHigh: []*base.TaskMessage{}, + enqueuedDefault: []*base.TaskMessage{t1, t2, t3}, + enqueuedLow: []*base.TaskMessage{t4}, + args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue}, + want: t1, + err: nil, + wantHigh: []*base.TaskMessage{}, + wantDefault: []*base.TaskMessage{t2, t3}, + wantLow: []*base.TaskMessage{t4}, + wantInProgress: []*base.TaskMessage{t1}, }, } for _, tc := range tests { h.FlushDB(t, r.client) // clean up db before each test case - h.SeedDefaultQueue(t, r.client, tc.enqueued) + h.SeedHighPriorityQueue(t, r.client, tc.enqueuedHigh) + h.SeedDefaultQueue(t, r.client, tc.enqueuedDefault) + h.SeedLowPriorityQueue(t, r.client, tc.enqueuedLow) - got, err := r.Dequeue(time.Second) + got, err := r.Dequeue(tc.args...) if !cmp.Equal(got, tc.want) || err != tc.err { - t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", + t.Errorf("(*RDB).Dequeue() = %v, %v; want %v, %v", got, err, tc.want, tc.err) continue } + gotHigh := h.GetHighPriorityMessages(t, r.client) + if diff := cmp.Diff(tc.wantHigh, gotHigh, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.HighPriorityQueue, diff) + } + + gotDefault := h.GetEnqueuedMessages(t, r.client) + if diff := cmp.Diff(tc.wantDefault, gotDefault, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DefaultQueue, diff) + } + + gotLow := h.GetLowPriorityMessages(t, r.client) + if diff := cmp.Diff(tc.wantLow, gotLow, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LowPriorityQueue, diff) + } + gotInProgress := h.GetInProgressMessages(t, r.client) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) @@ -296,7 +402,7 @@ func TestRetry(t *testing.T) { ID: t1.ID, Type: t1.Type, Payload: t1.Payload, - Queue: t1.Queue, + Priority: t1.Priority, Retry: t1.Retry, Retried: t1.Retried + 1, ErrorMsg: errMsg, @@ -391,7 +497,7 @@ func TestRetryWithMutatedTask(t *testing.T) { ID: t1.ID, Type: t1.Type, Payload: t1.Payload, - Queue: t1.Queue, + Priority: t1.Priority, Retry: t1.Retry, Retried: t1.Retried + 1, ErrorMsg: errMsg, @@ -488,7 +594,7 @@ func TestKill(t *testing.T) { ID: t1.ID, Type: t1.Type, Payload: t1.Payload, - Queue: t1.Queue, + Priority: t1.Priority, Retry: t1.Retry, Retried: t1.Retried, ErrorMsg: errMsg, @@ -591,7 +697,7 @@ func TestKillWithMutatedTask(t *testing.T) { ID: t1.ID, Type: t1.Type, Payload: t1.Payload, - Queue: t1.Queue, + Priority: t1.Priority, Retry: t1.Retry, Retried: t1.Retried, ErrorMsg: errMsg, diff --git a/processor.go b/processor.go index 39d82b3..36bc097 100644 --- a/processor.go +++ b/processor.go @@ -102,9 +102,10 @@ func (p *processor) start() { // exec pulls a task out of the queue and starts a worker goroutine to // process the task. func (p *processor) exec() { - msg, err := p.rdb.Dequeue(p.dequeueTimeout) - if err == rdb.ErrDequeueTimeout { - // timed out, this is a normal behavior. + // TODO(hibiken): sort the queues based on weight, but prevent starvation + msg, err := p.rdb.Dequeue(base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue) + if err == rdb.ErrNoProcessableTask { + // queues are empty, this is a normal behavior. return } if err != nil {