mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-15 20:08:46 +08:00
Update RDB.Dequeue to return message and deadline
This commit is contained in:
parent
5afb4861a5
commit
4ea58052f8
@ -267,7 +267,7 @@ func (c *Cancelations) GetAll() []context.CancelFunc {
|
|||||||
type Broker interface {
|
type Broker interface {
|
||||||
Enqueue(msg *TaskMessage) error
|
Enqueue(msg *TaskMessage) error
|
||||||
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
||||||
Dequeue(qnames ...string) (*TaskMessage, error)
|
Dequeue(qnames ...string) (*TaskMessage, int, error)
|
||||||
Done(msg *TaskMessage) error
|
Done(msg *TaskMessage) error
|
||||||
Requeue(msg *TaskMessage) error
|
Requeue(msg *TaskMessage) error
|
||||||
Schedule(msg *TaskMessage, processAt time.Time) error
|
Schedule(msg *TaskMessage, processAt time.Time) error
|
||||||
|
@ -102,22 +102,26 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dequeue queries given queues in order and pops a task message if there is one and returns it.
|
// Dequeue queries given queues in order and pops a task message
|
||||||
|
// off a queue if one exists and returns the message and deadline in Unix time in seconds.
|
||||||
// Dequeue skips a queue if the queue is paused.
|
// Dequeue skips a queue if the queue is paused.
|
||||||
// If all queues are empty, ErrNoProcessableTask error is returned.
|
// If all queues are empty, ErrNoProcessableTask error is returned.
|
||||||
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline int, err error) {
|
||||||
var qkeys []interface{}
|
var qkeys []interface{}
|
||||||
for _, q := range qnames {
|
for _, q := range qnames {
|
||||||
qkeys = append(qkeys, base.QueueKey(q))
|
qkeys = append(qkeys, base.QueueKey(q))
|
||||||
}
|
}
|
||||||
data, err := r.dequeue(qkeys...)
|
data, deadline, err := r.dequeue(qkeys...)
|
||||||
if err == redis.Nil {
|
if err == redis.Nil {
|
||||||
return nil, ErrNoProcessableTask
|
return nil, 0, ErrNoProcessableTask
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
return base.DecodeMessage(data)
|
if msg, err = base.DecodeMessage(data); err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
return msg, deadline, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
@ -134,9 +138,9 @@ var dequeueCmd = redis.NewScript(`
|
|||||||
for i = 2, table.getn(ARGV) do
|
for i = 2, table.getn(ARGV) do
|
||||||
local qkey = ARGV[i]
|
local qkey = ARGV[i]
|
||||||
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
|
if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
|
||||||
local res = redis.call("RPOPLPUSH", qkey, KEYS[1])
|
local msg = redis.call("RPOPLPUSH", qkey, KEYS[1])
|
||||||
if res then
|
if msg then
|
||||||
local decoded = cjson.decode(res)
|
local decoded = cjson.decode(msg)
|
||||||
local timeout = decoded["Timeout"]
|
local timeout = decoded["Timeout"]
|
||||||
local deadline = decoded["Deadline"]
|
local deadline = decoded["Deadline"]
|
||||||
local score
|
local score
|
||||||
@ -149,23 +153,36 @@ for i = 2, table.getn(ARGV) do
|
|||||||
else
|
else
|
||||||
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
|
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
|
||||||
end
|
end
|
||||||
redis.call("ZADD", KEYS[3], score, res)
|
redis.call("ZADD", KEYS[3], score, msg)
|
||||||
return res
|
return {msg, score}
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
return nil`)
|
return nil`)
|
||||||
|
|
||||||
func (r *RDB) dequeue(qkeys ...interface{}) (data string, err error) {
|
func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err error) {
|
||||||
var args []interface{}
|
var args []interface{}
|
||||||
args = append(args, time.Now().Unix())
|
args = append(args, time.Now().Unix())
|
||||||
args = append(args, qkeys...)
|
args = append(args, qkeys...)
|
||||||
res, err := dequeueCmd.Run(r.client,
|
res, err := dequeueCmd.Run(r.client,
|
||||||
[]string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result()
|
[]string{base.InProgressQueue, base.PausedQueues, base.KeyDeadlines}, args...).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return "", 0, err
|
||||||
}
|
}
|
||||||
return cast.ToStringE(res)
|
data, err := cast.ToSliceE(res)
|
||||||
|
if err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
if len(data) != 2 {
|
||||||
|
return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %v values", len(data))
|
||||||
|
}
|
||||||
|
if msgjson, err = cast.ToStringE(data[0]); err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
if deadline, err = cast.ToIntE(data[1]); err != nil {
|
||||||
|
return "", 0, err
|
||||||
|
}
|
||||||
|
return msgjson, deadline, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
|
@ -143,7 +143,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
tests := []struct {
|
tests := []struct {
|
||||||
enqueued map[string][]*base.TaskMessage
|
enqueued map[string][]*base.TaskMessage
|
||||||
args []string // list of queues to query
|
args []string // list of queues to query
|
||||||
want *base.TaskMessage
|
wantMsg *base.TaskMessage
|
||||||
|
wantDeadline int
|
||||||
err error
|
err error
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantInProgress []*base.TaskMessage
|
wantInProgress []*base.TaskMessage
|
||||||
@ -154,7 +155,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
args: []string{"default"},
|
||||||
want: t1,
|
wantMsg: t1,
|
||||||
|
wantDeadline: t1Deadline,
|
||||||
err: nil,
|
err: nil,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -172,7 +174,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"default": {},
|
"default": {},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
args: []string{"default"},
|
||||||
want: nil,
|
wantMsg: nil,
|
||||||
|
wantDeadline: 0,
|
||||||
err: ErrNoProcessableTask,
|
err: ErrNoProcessableTask,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -187,7 +190,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"low": {t3},
|
"low": {t3},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
args: []string{"critical", "default", "low"},
|
||||||
want: t2,
|
wantMsg: t2,
|
||||||
|
wantDeadline: t2Deadline,
|
||||||
err: nil,
|
err: nil,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
@ -209,7 +213,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"low": {t2, t1},
|
"low": {t2, t1},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
args: []string{"critical", "default", "low"},
|
||||||
want: t3,
|
wantMsg: t3,
|
||||||
|
wantDeadline: t3Deadline,
|
||||||
err: nil,
|
err: nil,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -231,7 +236,8 @@ func TestDequeue(t *testing.T) {
|
|||||||
"low": {},
|
"low": {},
|
||||||
},
|
},
|
||||||
args: []string{"critical", "default", "low"},
|
args: []string{"critical", "default", "low"},
|
||||||
want: nil,
|
wantMsg: nil,
|
||||||
|
wantDeadline: 0,
|
||||||
err: ErrNoProcessableTask,
|
err: ErrNoProcessableTask,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {},
|
"default": {},
|
||||||
@ -249,10 +255,20 @@ func TestDequeue(t *testing.T) {
|
|||||||
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
|
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := r.Dequeue(tc.args...)
|
gotMsg, gotDeadline, err := r.Dequeue(tc.args...)
|
||||||
if !cmp.Equal(got, tc.want) || err != tc.err {
|
if err != tc.err {
|
||||||
t.Errorf("(*RDB).Dequeue(%v) = %v, %v; want %v, %v",
|
t.Errorf("(*RDB).Dequeue(%v) returned error %v; want %v",
|
||||||
tc.args, got, err, tc.want, tc.err)
|
tc.args, err, tc.err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !cmp.Equal(gotMsg, tc.wantMsg) || err != tc.err {
|
||||||
|
t.Errorf("(*RDB).Dequeue(%v) returned message %v; want %v",
|
||||||
|
tc.args, gotMsg, tc.wantMsg)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if gotDeadline != tc.wantDeadline {
|
||||||
|
t.Errorf("(*RDB).Dequeue(%v) returned deadline %v; want %v",
|
||||||
|
tc.args, gotDeadline, tc.wantDeadline)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,7 +300,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
paused []string // list of paused queues
|
paused []string // list of paused queues
|
||||||
enqueued map[string][]*base.TaskMessage
|
enqueued map[string][]*base.TaskMessage
|
||||||
args []string // list of queues to query
|
args []string // list of queues to query
|
||||||
want *base.TaskMessage
|
wantMsg *base.TaskMessage
|
||||||
err error
|
err error
|
||||||
wantEnqueued map[string][]*base.TaskMessage
|
wantEnqueued map[string][]*base.TaskMessage
|
||||||
wantInProgress []*base.TaskMessage
|
wantInProgress []*base.TaskMessage
|
||||||
@ -296,7 +312,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
},
|
},
|
||||||
args: []string{"default", "critical"},
|
args: []string{"default", "critical"},
|
||||||
want: t2,
|
wantMsg: t2,
|
||||||
err: nil,
|
err: nil,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
@ -310,7 +326,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
"default": {t1},
|
"default": {t1},
|
||||||
},
|
},
|
||||||
args: []string{"default"},
|
args: []string{"default"},
|
||||||
want: nil,
|
wantMsg: nil,
|
||||||
err: ErrNoProcessableTask,
|
err: ErrNoProcessableTask,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
@ -324,7 +340,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
"critical": {t2},
|
"critical": {t2},
|
||||||
},
|
},
|
||||||
args: []string{"default", "critical"},
|
args: []string{"default", "critical"},
|
||||||
want: nil,
|
wantMsg: nil,
|
||||||
err: ErrNoProcessableTask,
|
err: ErrNoProcessableTask,
|
||||||
wantEnqueued: map[string][]*base.TaskMessage{
|
wantEnqueued: map[string][]*base.TaskMessage{
|
||||||
"default": {t1},
|
"default": {t1},
|
||||||
@ -345,10 +361,10 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
|
|||||||
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
|
h.SeedEnqueuedQueue(t, r.client, msgs, queue)
|
||||||
}
|
}
|
||||||
|
|
||||||
got, err := r.Dequeue(tc.args...)
|
got, _, err := r.Dequeue(tc.args...)
|
||||||
if !cmp.Equal(got, tc.want) || err != tc.err {
|
if !cmp.Equal(got, tc.wantMsg) || err != tc.err {
|
||||||
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
|
t.Errorf("Dequeue(%v) = %v, %v; want %v, %v",
|
||||||
tc.args, got, err, tc.want, tc.err)
|
tc.args, got, err, tc.wantMsg, tc.err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,11 +60,11 @@ func (tb *TestBroker) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) er
|
|||||||
return tb.real.EnqueueUnique(msg, ttl)
|
return tb.real.EnqueueUnique(msg, ttl)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
func (tb *TestBroker) Dequeue(qnames ...string) (*base.TaskMessage, int, error) {
|
||||||
tb.mu.Lock()
|
tb.mu.Lock()
|
||||||
defer tb.mu.Unlock()
|
defer tb.mu.Unlock()
|
||||||
if tb.sleeping {
|
if tb.sleeping {
|
||||||
return nil, errRedisDown
|
return nil, 0, errRedisDown
|
||||||
}
|
}
|
||||||
return tb.real.Dequeue(qnames...)
|
return tb.real.Dequeue(qnames...)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user