2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-04-21 16:20:18 +08:00

Change (*RDB).Dequeue to query multiple queues in order

This commit is contained in:
Ken Hibino 2020-01-01 13:42:24 -08:00
parent eb191c07d1
commit 5c42bdc4c4
5 changed files with 177 additions and 31 deletions

View File

@ -110,6 +110,18 @@ func SeedDefaultQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage)
seedRedisList(tb, r, base.DefaultQueue, msgs) seedRedisList(tb, r, base.DefaultQueue, msgs)
} }
// SeedHighPriorityQueue initializes the high-priority queue with the given messages.
func SeedHighPriorityQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
tb.Helper()
seedRedisList(tb, r, base.HighPriorityQueue, msgs)
}
// SeedLowPriorityQueue initializes the low-priority queue with the given messages.
func SeedLowPriorityQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
tb.Helper()
seedRedisList(tb, r, base.LowPriorityQueue, msgs)
}
// SeedInProgressQueue initializes the in-progress queue with the given messages. // SeedInProgressQueue initializes the in-progress queue with the given messages.
func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { func SeedInProgressQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) {
tb.Helper() tb.Helper()
@ -158,6 +170,18 @@ func GetEnqueuedMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
return getListMessages(tb, r, base.DefaultQueue) return getListMessages(tb, r, base.DefaultQueue)
} }
// GetHighPriorityMessages returns all task messages in the high-priority queue.
func GetHighPriorityMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.HighPriorityQueue)
}
// GetLowPriorityMessages returns all task messages in the low-priority queue.
func GetLowPriorityMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
tb.Helper()
return getListMessages(tb, r, base.LowPriorityQueue)
}
// GetInProgressMessages returns all task messages in the in-progress queue. // GetInProgressMessages returns all task messages in the in-progress queue.
func GetInProgressMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage { func GetInProgressMessages(tb testing.TB, r *redis.Client) []*base.TaskMessage {
tb.Helper() tb.Helper()

View File

@ -328,7 +328,7 @@ func TestListRetry(t *testing.T) {
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: "send_email", Type: "send_email",
Queue: "default", Priority: base.PriorityDefault,
Payload: map[string]interface{}{"subject": "hello"}, Payload: map[string]interface{}{"subject": "hello"},
ErrorMsg: "email server not responding", ErrorMsg: "email server not responding",
Retry: 25, Retry: 25,
@ -337,7 +337,7 @@ func TestListRetry(t *testing.T) {
m2 := &base.TaskMessage{ m2 := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: "reindex", Type: "reindex",
Queue: "default", Priority: base.PriorityDefault,
Payload: nil, Payload: nil,
ErrorMsg: "search engine not responding", ErrorMsg: "search engine not responding",
Retry: 25, Retry: 25,
@ -412,14 +412,14 @@ func TestListDead(t *testing.T) {
m1 := &base.TaskMessage{ m1 := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: "send_email", Type: "send_email",
Queue: "default", Priority: base.PriorityDefault,
Payload: map[string]interface{}{"subject": "hello"}, Payload: map[string]interface{}{"subject": "hello"},
ErrorMsg: "email server not responding", ErrorMsg: "email server not responding",
} }
m2 := &base.TaskMessage{ m2 := &base.TaskMessage{
ID: xid.New(), ID: xid.New(),
Type: "reindex", Type: "reindex",
Queue: "default", Priority: base.PriorityDefault,
Payload: nil, Payload: nil,
ErrorMsg: "search engine not responding", ErrorMsg: "search engine not responding",
} }

View File

@ -12,8 +12,8 @@ import (
) )
var ( var (
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out. // ErrNoProcessableTask indicates that the dequeue operation returns no task because all queues are empty.
ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") ErrNoProcessableTask = errors.New("all queues are empty; no task to process")
// ErrTaskNotFound indicates that a task that matches the given identifier was not found. // ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task") ErrTaskNotFound = errors.New("could not find a task")
@ -50,14 +50,16 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
// once a task is available, it adds the task to "in progress" queue // once a task is available, it adds the task to "in progress" queue
// and returns the task. If there are no tasks for the entire timeout // and returns the task. If there are no tasks for the entire timeout
// duration, it returns ErrDequeueTimeout. // duration, it returns ErrDequeueTimeout.
func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { func (r *RDB) Dequeue(queues ...string) (*base.TaskMessage, error) {
data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result() data, err := r.dequeue(queues...)
if err == redis.Nil { if err == redis.Nil {
return nil, ErrDequeueTimeout // all queues are empty // TODO(hibiken): Rename this sentinel error
return nil, ErrNoProcessableTask
} }
if err != nil { if err != nil {
return nil, err return nil, err
} }
var msg base.TaskMessage var msg base.TaskMessage
err = json.Unmarshal([]byte(data), &msg) err = json.Unmarshal([]byte(data), &msg)
if err != nil { if err != nil {
@ -66,6 +68,19 @@ func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) {
return &msg, nil return &msg, nil
} }
func (r *RDB) dequeue(queues ...string) (data string, err error) {
for _, qname := range queues {
data, err = r.client.RPopLPush(qname, base.InProgressQueue).Result()
if err == nil {
return data, nil
}
if err != redis.Nil {
return "", err
}
}
return data, err
}
// Done removes the task from in-progress queue to mark the task as done. // Done removes the task from in-progress queue to mark the task as done.
func (r *RDB) Done(msg *base.TaskMessage) error { func (r *RDB) Done(msg *base.TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)

View File

@ -55,37 +55,143 @@ func TestEnqueue(t *testing.T) {
func TestDequeue(t *testing.T) { func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
t2 := h.NewTaskMessage("reindex", map[string]interface{}{})
t3 := h.NewTaskMessage("gen_thumbnail", map[string]interface{}{})
t4 := h.NewTaskMessage("send_notification", nil)
tests := []struct { tests := []struct {
enqueued []*base.TaskMessage desc string
enqueuedHigh []*base.TaskMessage
enqueuedDefault []*base.TaskMessage
enqueuedLow []*base.TaskMessage
args []string
want *base.TaskMessage want *base.TaskMessage
err error err error
wantHigh []*base.TaskMessage
wantDefault []*base.TaskMessage
wantLow []*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
}{ }{
{ {
enqueued: []*base.TaskMessage{t1}, desc: "default only",
enqueuedHigh: []*base.TaskMessage{},
enqueuedDefault: []*base.TaskMessage{t1, t2, t3, t4},
enqueuedLow: []*base.TaskMessage{},
args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue},
want: t1, want: t1,
err: nil, err: nil,
wantHigh: []*base.TaskMessage{},
wantDefault: []*base.TaskMessage{t2, t3, t4},
wantLow: []*base.TaskMessage{},
wantInProgress: []*base.TaskMessage{t1}, wantInProgress: []*base.TaskMessage{t1},
}, },
{ {
enqueued: []*base.TaskMessage{}, desc: "all queues empty",
enqueuedHigh: []*base.TaskMessage{},
enqueuedDefault: []*base.TaskMessage{},
enqueuedLow: []*base.TaskMessage{},
args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue},
want: nil, want: nil,
err: ErrDequeueTimeout, err: ErrNoProcessableTask,
wantHigh: []*base.TaskMessage{},
wantDefault: []*base.TaskMessage{},
wantLow: []*base.TaskMessage{},
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
}, },
{
desc: "all queues full",
enqueuedHigh: []*base.TaskMessage{t2},
enqueuedDefault: []*base.TaskMessage{t1, t3},
enqueuedLow: []*base.TaskMessage{t4},
args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue},
want: t2,
err: nil,
wantHigh: []*base.TaskMessage{},
wantDefault: []*base.TaskMessage{t1, t3},
wantLow: []*base.TaskMessage{t4},
wantInProgress: []*base.TaskMessage{t2},
},
{
desc: "low queue only",
enqueuedHigh: []*base.TaskMessage{},
enqueuedDefault: []*base.TaskMessage{},
enqueuedLow: []*base.TaskMessage{t3, t4},
args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue},
want: t3,
err: nil,
wantHigh: []*base.TaskMessage{},
wantDefault: []*base.TaskMessage{},
wantLow: []*base.TaskMessage{t4},
wantInProgress: []*base.TaskMessage{t3},
},
{
desc: "all queues full with reverse priority args",
enqueuedHigh: []*base.TaskMessage{t2},
enqueuedDefault: []*base.TaskMessage{t1},
enqueuedLow: []*base.TaskMessage{t3, t4},
args: []string{base.LowPriorityQueue, base.DefaultQueue, base.HighPriorityQueue},
want: t3,
err: nil,
wantHigh: []*base.TaskMessage{t2},
wantDefault: []*base.TaskMessage{t1},
wantLow: []*base.TaskMessage{t4},
wantInProgress: []*base.TaskMessage{t3},
},
{
desc: "all queues full with defaualt queue first in args",
enqueuedHigh: []*base.TaskMessage{t2},
enqueuedDefault: []*base.TaskMessage{t1},
enqueuedLow: []*base.TaskMessage{t3, t4},
args: []string{base.DefaultQueue, base.LowPriorityQueue, base.HighPriorityQueue},
want: t1,
err: nil,
wantHigh: []*base.TaskMessage{t2},
wantDefault: []*base.TaskMessage{},
wantLow: []*base.TaskMessage{t3, t4},
wantInProgress: []*base.TaskMessage{t1},
},
{
desc: "first queue in args empty",
enqueuedHigh: []*base.TaskMessage{},
enqueuedDefault: []*base.TaskMessage{t1, t2, t3},
enqueuedLow: []*base.TaskMessage{t4},
args: []string{base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue},
want: t1,
err: nil,
wantHigh: []*base.TaskMessage{},
wantDefault: []*base.TaskMessage{t2, t3},
wantLow: []*base.TaskMessage{t4},
wantInProgress: []*base.TaskMessage{t1},
},
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued) h.SeedHighPriorityQueue(t, r.client, tc.enqueuedHigh)
h.SeedDefaultQueue(t, r.client, tc.enqueuedDefault)
h.SeedLowPriorityQueue(t, r.client, tc.enqueuedLow)
got, err := r.Dequeue(time.Second) got, err := r.Dequeue(tc.args...)
if !cmp.Equal(got, tc.want) || err != tc.err { if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", t.Errorf("(*RDB).Dequeue() = %v, %v; want %v, %v",
got, err, tc.want, tc.err) got, err, tc.want, tc.err)
continue continue
} }
gotHigh := h.GetHighPriorityMessages(t, r.client)
if diff := cmp.Diff(tc.wantHigh, gotHigh, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.HighPriorityQueue, diff)
}
gotDefault := h.GetEnqueuedMessages(t, r.client)
if diff := cmp.Diff(tc.wantDefault, gotDefault, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.DefaultQueue, diff)
}
gotLow := h.GetLowPriorityMessages(t, r.client)
if diff := cmp.Diff(tc.wantLow, gotLow, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.LowPriorityQueue, diff)
}
gotInProgress := h.GetInProgressMessages(t, r.client) gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff)
@ -296,7 +402,7 @@ func TestRetry(t *testing.T) {
ID: t1.ID, ID: t1.ID,
Type: t1.Type, Type: t1.Type,
Payload: t1.Payload, Payload: t1.Payload,
Queue: t1.Queue, Priority: t1.Priority,
Retry: t1.Retry, Retry: t1.Retry,
Retried: t1.Retried + 1, Retried: t1.Retried + 1,
ErrorMsg: errMsg, ErrorMsg: errMsg,
@ -391,7 +497,7 @@ func TestRetryWithMutatedTask(t *testing.T) {
ID: t1.ID, ID: t1.ID,
Type: t1.Type, Type: t1.Type,
Payload: t1.Payload, Payload: t1.Payload,
Queue: t1.Queue, Priority: t1.Priority,
Retry: t1.Retry, Retry: t1.Retry,
Retried: t1.Retried + 1, Retried: t1.Retried + 1,
ErrorMsg: errMsg, ErrorMsg: errMsg,
@ -488,7 +594,7 @@ func TestKill(t *testing.T) {
ID: t1.ID, ID: t1.ID,
Type: t1.Type, Type: t1.Type,
Payload: t1.Payload, Payload: t1.Payload,
Queue: t1.Queue, Priority: t1.Priority,
Retry: t1.Retry, Retry: t1.Retry,
Retried: t1.Retried, Retried: t1.Retried,
ErrorMsg: errMsg, ErrorMsg: errMsg,
@ -591,7 +697,7 @@ func TestKillWithMutatedTask(t *testing.T) {
ID: t1.ID, ID: t1.ID,
Type: t1.Type, Type: t1.Type,
Payload: t1.Payload, Payload: t1.Payload,
Queue: t1.Queue, Priority: t1.Priority,
Retry: t1.Retry, Retry: t1.Retry,
Retried: t1.Retried, Retried: t1.Retried,
ErrorMsg: errMsg, ErrorMsg: errMsg,

View File

@ -102,9 +102,10 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to // exec pulls a task out of the queue and starts a worker goroutine to
// process the task. // process the task.
func (p *processor) exec() { func (p *processor) exec() {
msg, err := p.rdb.Dequeue(p.dequeueTimeout) // TODO(hibiken): sort the queues based on weight, but prevent starvation
if err == rdb.ErrDequeueTimeout { msg, err := p.rdb.Dequeue(base.HighPriorityQueue, base.DefaultQueue, base.LowPriorityQueue)
// timed out, this is a normal behavior. if err == rdb.ErrNoProcessableTask {
// queues are empty, this is a normal behavior.
return return
} }
if err != nil { if err != nil {