2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Update RDB.Dequeue to return deadline as time.Time

This commit is contained in:
Ken Hibino 2020-06-18 11:25:33 -07:00
parent 113451ce6a
commit 92af00f9fd
2 changed files with 15 additions and 15 deletions

View File

@ -103,25 +103,25 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
} }
// Dequeue queries given queues in order and pops a task message // Dequeue queries given queues in order and pops a task message
// off a queue if one exists and returns the message and deadline in Unix time in seconds. // off a queue if one exists and returns the message and deadline.
// Dequeue skips a queue if the queue is paused. // Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned. // If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline int, err error) { func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
var qkeys []interface{} var qkeys []interface{}
for _, q := range qnames { for _, q := range qnames {
qkeys = append(qkeys, base.QueueKey(q)) qkeys = append(qkeys, base.QueueKey(q))
} }
data, deadline, err := r.dequeue(qkeys...) data, d, err := r.dequeue(qkeys...)
if err == redis.Nil { if err == redis.Nil {
return nil, 0, ErrNoProcessableTask return nil, time.Time{}, ErrNoProcessableTask
} }
if err != nil { if err != nil {
return nil, 0, err return nil, time.Time{}, err
} }
if msg, err = base.DecodeMessage(data); err != nil { if msg, err = base.DecodeMessage(data); err != nil {
return nil, 0, err return nil, time.Time{}, err
} }
return msg, deadline, nil return msg, time.Unix(d, 0), nil
} }
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
@ -160,7 +160,7 @@ for i = 2, table.getn(ARGV) do
end end
return nil`) return nil`)
func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err error) { func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err error) {
var args []interface{} var args []interface{}
args = append(args, time.Now().Unix()) args = append(args, time.Now().Unix())
args = append(args, qkeys...) args = append(args, qkeys...)
@ -179,7 +179,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err e
if msgjson, err = cast.ToStringE(data[0]); err != nil { if msgjson, err = cast.ToStringE(data[0]); err != nil {
return "", 0, err return "", 0, err
} }
if deadline, err = cast.ToIntE(data[1]); err != nil { if deadline, err = cast.ToInt64E(data[1]); err != nil {
return "", 0, err return "", 0, err
} }
return msgjson, deadline, nil return msgjson, deadline, nil

View File

@ -144,7 +144,7 @@ func TestDequeue(t *testing.T) {
enqueued map[string][]*base.TaskMessage enqueued map[string][]*base.TaskMessage
args []string // list of queues to query args []string // list of queues to query
wantMsg *base.TaskMessage wantMsg *base.TaskMessage
wantDeadline int wantDeadline time.Time
err error err error
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantInProgress []*base.TaskMessage wantInProgress []*base.TaskMessage
@ -156,7 +156,7 @@ func TestDequeue(t *testing.T) {
}, },
args: []string{"default"}, args: []string{"default"},
wantMsg: t1, wantMsg: t1,
wantDeadline: t1Deadline, wantDeadline: time.Unix(int64(t1Deadline), 0),
err: nil, err: nil,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -175,7 +175,7 @@ func TestDequeue(t *testing.T) {
}, },
args: []string{"default"}, args: []string{"default"},
wantMsg: nil, wantMsg: nil,
wantDeadline: 0, wantDeadline: time.Time{},
err: ErrNoProcessableTask, err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -191,7 +191,7 @@ func TestDequeue(t *testing.T) {
}, },
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: t2, wantMsg: t2,
wantDeadline: t2Deadline, wantDeadline: time.Unix(int64(t2Deadline), 0),
err: nil, err: nil,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
@ -214,7 +214,7 @@ func TestDequeue(t *testing.T) {
}, },
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: t3, wantMsg: t3,
wantDeadline: t3Deadline, wantDeadline: time.Unix(int64(t3Deadline), 0),
err: nil, err: nil,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -237,7 +237,7 @@ func TestDequeue(t *testing.T) {
}, },
args: []string{"critical", "default", "low"}, args: []string{"critical", "default", "low"},
wantMsg: nil, wantMsg: nil,
wantDeadline: 0, wantDeadline: time.Time{},
err: ErrNoProcessableTask, err: ErrNoProcessableTask,
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},