mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Update Dequeue operation to skip paused queues
This commit is contained in:
parent
4595bd41c3
commit
363cfedb49
@ -103,19 +103,14 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Dequeue queries given queues in order and pops a task message if there is one and returns it.
|
// Dequeue queries given queues in order and pops a task message if there is one and returns it.
|
||||||
|
// Dequeue skips a queue if the queue is paused.
|
||||||
// If all queues are empty, ErrNoProcessableTask error is returned.
|
// If all queues are empty, ErrNoProcessableTask error is returned.
|
||||||
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
||||||
var data string
|
|
||||||
var err error
|
|
||||||
if len(qnames) == 1 {
|
|
||||||
data, err = r.dequeueSingle(base.QueueKey(qnames[0]))
|
|
||||||
} else {
|
|
||||||
var keys []string
|
var keys []string
|
||||||
for _, q := range qnames {
|
for _, q := range qnames {
|
||||||
keys = append(keys, base.QueueKey(q))
|
keys = append(keys, base.QueueKey(q))
|
||||||
}
|
}
|
||||||
data, err = r.dequeue(keys...)
|
data, err := r.dequeue(keys...)
|
||||||
}
|
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return nil, ErrNoProcessableTask
|
return nil, ErrNoProcessableTask
|
||||||
}
|
}
|
||||||
@ -130,29 +125,30 @@ func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
|||||||
return &msg, nil
|
return &msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RDB) dequeueSingle(queue string) (data string, err error) {
|
|
||||||
// timeout needed to avoid blocking forever
|
|
||||||
return r.client.BRPopLPush(queue, base.InProgressQueue, time.Second).Result()
|
|
||||||
}
|
|
||||||
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
|
// KEYS[2] -> asynq:paused
|
||||||
// ARGV -> List of queues to query in order
|
// ARGV -> List of queues to query in order
|
||||||
|
//
|
||||||
|
// dequeueCmd checks whether a queue is paused first, before
|
||||||
|
// calling RPOPLPUSH to pop a task from the queue.
|
||||||
var dequeueCmd = redis.NewScript(`
|
var dequeueCmd = redis.NewScript(`
|
||||||
local res
|
|
||||||
for _, qkey in ipairs(ARGV) do
|
for _, qkey in ipairs(ARGV) do
|
||||||
res = redis.call("RPOPLPUSH", qkey, KEYS[1])
|
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
|
||||||
|
local res = redis.call("RPOPLPUSH", qkey, KEYS[1])
|
||||||
if res then
|
if res then
|
||||||
return res
|
return res
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
return res`)
|
end
|
||||||
|
return nil`)
|
||||||
|
|
||||||
func (r *RDB) dequeue(queues ...string) (data string, err error) {
|
func (r *RDB) dequeue(queues ...string) (data string, err error) {
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
for _, qkey := range queues {
|
for _, qkey := range queues {
|
||||||
args = append(args, qkey)
|
args = append(args, qkey)
|
||||||
}
|
}
|
||||||
res, err := dequeueCmd.Run(r.client, []string{base.InProgressQueue}, args...).Result()
|
res, err := dequeueCmd.Run(r.client,
|
||||||
|
[]string{base.InProgressQueue, base.PausedQueues}, args...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
@ -227,6 +227,97 @@ func TestDequeue(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello!"})
|
||||||
|
t2 := h.NewTaskMessage("export_csv", nil)
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
paused []string // list of paused queues
|
||||||
|
enqueued map[string][]*base.TaskMessage
|
||||||
|
args []string // list of queues to query
|
||||||
|
want *base.TaskMessage
|
||||||
|
err error
|
||||||
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
|
wantInProgress []*base.TaskMessage
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
paused: []string{"default"},
|
||||||
|
enqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"critical": {t2},
|
||||||
|
},
|
||||||
|
args: []string{"default", "critical"},
|
||||||
|
want: t2,
|
||||||
|
err: nil,
|
||||||
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"critical": {},
|
||||||
|
},
|
||||||
|
wantInProgress: []*base.TaskMessage{t2},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
paused: []string{"default"},
|
||||||
|
enqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
},
|
||||||
|
args: []string{"default"},
|
||||||
|
want: nil,
|
||||||
|
err: ErrNoProcessableTask,
|
||||||
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
},
|
||||||
|
wantInProgress: []*base.TaskMessage{},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
paused: []string{"critical", "default"},
|
||||||
|
enqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"critical": {t2},
|
||||||
|
},
|
||||||
|
args: []string{"default", "critical"},
|
||||||
|
want: nil,
|
||||||
|
err: ErrNoProcessableTask,
|
||||||
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
|
"default": {t1},
|
||||||
|
"critical": {t2},
|
||||||
|
},
|
||||||
|
wantInProgress: []*base.TaskMessage{},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
h.FlushDB(t, r.client) // clean up db before each test case
|
||||||
|
for _, qname := range tc.paused {
|
||||||
|
if err := r.Pause(qname); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for queue, msgs := range tc.enqueued {
|
||||||
|
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
|
||||||
|
}
|
||||||
|
|
||||||
|
got, err := r.Dequeue(tc.args...)
|
||||||
|
if !cmp.Equal(got, tc.want) || err != tc.err {
|
||||||
|
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
|
||||||
|
tc.args, got, err, tc.want, tc.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
for queue, want := range tc.wantEnqueued {
|
||||||
|
gotEnqueued := h.GetEnqueuedMessages(t, r.client, queue)
|
||||||
|
if diff := cmp.Diff(want, gotEnqueued, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gotInProgress := h.GetInProgressMessages(t, r.client)
|
||||||
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" {
|
||||||
|
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestDone(t *testing.T) {
|
func TestDone(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := h.NewTaskMessage("send_email", nil)
|
t1 := h.NewTaskMessage("send_email", nil)
|
||||||
|
12
processor.go
12
processor.go
@ -166,14 +166,12 @@ func (p *processor) exec() {
|
|||||||
msg, err := p.broker.Dequeue(qnames...)
|
msg, err := p.broker.Dequeue(qnames...)
|
||||||
switch {
|
switch {
|
||||||
case err == rdb.ErrNoProcessableTask:
|
case err == rdb.ErrNoProcessableTask:
|
||||||
// queues are empty, this is a normal behavior.
|
|
||||||
if len(qnames) > 1 {
|
|
||||||
// sleep to avoid slamming redis and let scheduler move tasks into queues.
|
|
||||||
// Note: With multiple queues, we are not using blocking pop operation and
|
|
||||||
// polling queues instead. This adds significant load to redis.
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
p.logger.Debug("All queues are empty")
|
p.logger.Debug("All queues are empty")
|
||||||
|
// Queues are empty, this is a normal behavior.
|
||||||
|
// Sleep to avoid slamming redis and let scheduler move tasks into queues.
|
||||||
|
// Note: We are not using blocking pop operation and polling queues instead.
|
||||||
|
// This adds significant load to redis.
|
||||||
|
time.Sleep(time.Second)
|
||||||
return
|
return
|
||||||
case err != nil:
|
case err != nil:
|
||||||
if p.errLogLimiter.Allow() {
|
if p.errLogLimiter.Allow() {
|
||||||
|
Loading…
Reference in New Issue
Block a user