2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-02-23 20:30:19 +08:00

Change RDB.Dequeue to query multiple queues

This commit is contained in:
Ken Hibino 2020-01-07 06:28:34 -08:00
parent 2af9eb2c88
commit 53d0902808
7 changed files with 147 additions and 33 deletions

View File

@ -108,10 +108,16 @@ func FlushDB(tb testing.TB, r *redis.Client) {
} }
} }
// SeedDefaultQueue initializes the default queue with the given messages. // SeedEnqueuedQueue initializes the specified queue with the given messages.
func SeedDefaultQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage) { //
// If queue name option is not passed, it defaults to the default queue.
func SeedEnqueuedQueue(tb testing.TB, r *redis.Client, msgs []*base.TaskMessage, queueOpt ...string) {
tb.Helper() tb.Helper()
seedRedisList(tb, r, base.DefaultQueue, msgs) queue := base.DefaultQueue
if len(queueOpt) > 0 {
queue = base.QueueKey(queueOpt[0])
}
seedRedisList(tb, r, queue, msgs)
} }
// SeedInProgressQueue initializes the in-progress queue with the given messages. // SeedInProgressQueue initializes the in-progress queue with the given messages.

View File

@ -119,7 +119,7 @@ func TestCurrentStats(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued) h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedScheduledQueue(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry) h.SeedRetryQueue(t, r.client, tc.retry)
@ -262,7 +262,7 @@ func TestListEnqueued(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued) h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
got, err := r.ListEnqueued() got, err := r.ListEnqueued()
if err != nil { if err != nil {

View File

@ -13,11 +13,12 @@ import (
"github.com/go-redis/redis/v7" "github.com/go-redis/redis/v7"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/spf13/cast"
) )
var ( var (
// ErrDequeueTimeout indicates that the blocking dequeue operation timed out. // ErrNoProcessableTask indicates that there are no tasks ready to be processed.
ErrDequeueTimeout = errors.New("blocking dequeue operation timed out") ErrNoProcessableTask = errors.New("no tasks are ready for processing")
// ErrTaskNotFound indicates that a task that matches the given identifier was not found. // ErrTaskNotFound indicates that a task that matches the given identifier was not found.
ErrTaskNotFound = errors.New("could not find a task") ErrTaskNotFound = errors.New("could not find a task")
@ -50,14 +51,17 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
return r.client.LPush(key, string(bytes)).Err() return r.client.LPush(key, string(bytes)).Err()
} }
// Dequeue blocks until there is a task available to be processed, // Dequeue queries given queues in order and pops a task message if there
// once a task is available, it adds the task to "in progress" queue // is one and returns it. If all queues are empty, ErrNoProcessableTask
// and returns the task. If there are no tasks for the entire timeout // error is returned.
// duration, it returns ErrDequeueTimeout. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) { var keys []string
data, err := r.client.BRPopLPush(base.DefaultQueue, base.InProgressQueue, timeout).Result() for _, q := range qnames {
keys = append(keys, base.QueueKey(q))
}
data, err := r.dequeue(keys...)
if err == redis.Nil { if err == redis.Nil {
return nil, ErrDequeueTimeout return nil, ErrNoProcessableTask
} }
if err != nil { if err != nil {
return nil, err return nil, err
@ -70,6 +74,28 @@ func (r *RDB) Dequeue(timeout time.Duration) (*base.TaskMessage, error) {
return &msg, nil return &msg, nil
} }
func (r *RDB) dequeue(queues ...string) (data string, err error) {
var args []interface{}
for _, qkey := range queues {
args = append(args, qkey)
}
script := redis.NewScript(`
local res
for _, qkey in ipairs(ARGV) do
res = redis.call("RPOPLPUSH", qkey, KEYS[1])
if res then
return res
end
end
return res
`)
res, err := script.Run(r.client, []string{base.InProgressQueue}, args...).Result()
if err != nil {
return "", err
}
return cast.ToStringE(res)
}
// Done removes the task from in-progress queue to mark the task as done. // Done removes the task from in-progress queue to mark the task as done.
func (r *RDB) Done(msg *base.TaskMessage) error { func (r *RDB) Done(msg *base.TaskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)

View File

@ -59,37 +59,111 @@ func TestEnqueue(t *testing.T) {
func TestDequeue(t *testing.T) { func TestDequeue(t *testing.T) {
r := setup(t) r := setup(t)
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"}) t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
t2 := h.NewTaskMessage("export_csv", nil)
t3 := h.NewTaskMessage("reindex", nil)
tests := []struct { tests := []struct {
enqueued []*base.TaskMessage enqueued map[string][]*base.TaskMessage
args []string // list of queues to query
want *base.TaskMessage want *base.TaskMessage
err error err error
wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
}{ }{
{ {
enqueued: []*base.TaskMessage{t1}, enqueued: map[string][]*base.TaskMessage{
"default": {t1},
},
args: []string{"default"},
want: t1, want: t1,
err: nil, err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: []*base.TaskMessage{t1}, wantInProgress: []*base.TaskMessage{t1},
}, },
{ {
enqueued: []*base.TaskMessage{}, enqueued: map[string][]*base.TaskMessage{
"default": {},
},
args: []string{"default"},
want: nil, want: nil,
err: ErrDequeueTimeout, err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
},
wantInProgress: []*base.TaskMessage{},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {t2},
"low": {t3},
},
args: []string{"critical", "default", "low"},
want: t2,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
"low": {t3},
},
wantInProgress: []*base.TaskMessage{t2},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
"low": {t2, t3},
},
args: []string{"critical", "default", "low"},
want: t1,
err: nil,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {t2, t3},
},
wantInProgress: []*base.TaskMessage{t1},
},
{
enqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
args: []string{"critical", "default", "low"},
want: nil,
err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {},
},
wantInProgress: []*base.TaskMessage{}, wantInProgress: []*base.TaskMessage{},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued) for queue, msgs := range tc.enqueued {
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
}
got, err := r.Dequeue(time.Second) got, err := r.Dequeue(tc.args...)
if !cmp.Equal(got, tc.want) || err != tc.err { if !cmp.Equal(got, tc.want) || err != tc.err {
t.Errorf("(*RDB).Dequeue(time.Second) = %v, %v; want %v, %v", t.Errorf("(*RDB).Dequeue(%v) = %v, %v; want %v, %v",
got, err, tc.want, tc.err) tc.args, got, err, tc.want, tc.err)
continue continue
} }
for queue, want := range tc.wantEnqueued {
gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
}
}
gotInProgress := h.GetInProgressMessages(t, r.client) gotInProgress := h.GetInProgressMessages(t, r.client)
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff)
@ -178,7 +252,7 @@ func TestRequeue(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedDefaultQueue(t, r.client, tc.enqueued) h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedInProgressQueue(t, r.client, tc.inProgress)
err := r.Requeue(tc.target) err := r.Requeue(tc.target)
@ -468,7 +542,7 @@ func TestRestoreUnfinished(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case h.FlushDB(t, r.client) // clean up db before each test case
h.SeedInProgressQueue(t, r.client, tc.inProgress) h.SeedInProgressQueue(t, r.client, tc.inProgress)
h.SeedDefaultQueue(t, r.client, tc.enqueued) h.SeedEnqueuedQueue(t, r.client, tc.enqueued)
got, err := r.RestoreUnfinished() got, err := r.RestoreUnfinished()
if got != tc.want || err != nil { if got != tc.want || err != nil {

View File

@ -109,9 +109,17 @@ func (p *processor) start() {
// exec pulls a task out of the queue and starts a worker goroutine to // exec pulls a task out of the queue and starts a worker goroutine to
// process the task. // process the task.
func (p *processor) exec() { func (p *processor) exec() {
msg, err := p.rdb.Dequeue(p.dequeueTimeout) // TODO(hibiken): Randomize the order to avoid starving low priority queues
if err == rdb.ErrDequeueTimeout { var qnames []string
// timed out, this is a normal behavior. for q := range p.queueConfig {
qnames = append(qnames, q)
}
msg, err := p.rdb.Dequeue(qnames...)
if err == rdb.ErrNoProcessableTask {
// queues are empty, this is a normal behavior.
// sleep to avoid slamming redis and let scheduler move tasks into queues.
time.Sleep(time.Second)
return return
} }
if err != nil { if err != nil {

View File

@ -52,7 +52,7 @@ func TestProcessorSuccess(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case. h.FlushDB(t, r) // clean up db before each test case.
h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
// instantiate a new processor // instantiate a new processor
var mu sync.Mutex var mu sync.Mutex
@ -138,7 +138,7 @@ func TestProcessorRetry(t *testing.T) {
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case. h.FlushDB(t, r) // clean up db before each test case.
h.SeedDefaultQueue(t, r, tc.enqueued) // initialize default queue. h.SeedEnqueuedQueue(t, r, tc.enqueued) // initialize default queue.
// instantiate a new processor // instantiate a new processor
delayFunc := func(n int, e error, t *Task) time.Duration { delayFunc := func(n int, e error, t *Task) time.Duration {

View File

@ -67,7 +67,7 @@ func TestScheduler(t *testing.T) {
h.FlushDB(t, r) // clean up db before each test case. h.FlushDB(t, r) // clean up db before each test case.
h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue h.SeedScheduledQueue(t, r, tc.initScheduled) // initialize scheduled queue
h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue h.SeedRetryQueue(t, r, tc.initRetry) // initialize retry queue
h.SeedDefaultQueue(t, r, tc.initQueue) // initialize default queue h.SeedEnqueuedQueue(t, r, tc.initQueue) // initialize default queue
s.start() s.start()
time.Sleep(tc.wait) time.Sleep(tc.wait)