diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index f6c63ff..6848da9 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -103,25 +103,25 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { } // Dequeue queries given queues in order and pops a task message -// off a queue if one exists and returns the message and deadline in Unix time in seconds. +// off a queue if one exists and returns the message and deadline. // Dequeue skips a queue if the queue is paused. // If all queues are empty, ErrNoProcessableTask error is returned. -func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline int, err error) { +func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) { var qkeys []interface{} for _, q := range qnames { qkeys = append(qkeys, base.QueueKey(q)) } - data, deadline, err := r.dequeue(qkeys...) + data, d, err := r.dequeue(qkeys...) if err == redis.Nil { - return nil, 0, ErrNoProcessableTask + return nil, time.Time{}, ErrNoProcessableTask } if err != nil { - return nil, 0, err + return nil, time.Time{}, err } if msg, err = base.DecodeMessage(data); err != nil { - return nil, 0, err + return nil, time.Time{}, err } - return msg, deadline, nil + return msg, time.Unix(d, 0), nil } // KEYS[1] -> asynq:in_progress @@ -160,7 +160,7 @@ for i = 2, table.getn(ARGV) do end return nil`) -func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err error) { +func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err error) { var args []interface{} args = append(args, time.Now().Unix()) args = append(args, qkeys...) @@ -179,7 +179,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int, err e if msgjson, err = cast.ToStringE(data[0]); err != nil { return "", 0, err } - if deadline, err = cast.ToIntE(data[1]); err != nil { + if deadline, err = cast.ToInt64E(data[1]); err != nil { return "", 0, err } return msgjson, deadline, nil diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index e298785..34930e8 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -144,7 +144,7 @@ func TestDequeue(t *testing.T) { enqueued map[string][]*base.TaskMessage args []string // list of queues to query wantMsg *base.TaskMessage - wantDeadline int + wantDeadline time.Time err error wantEnqueued map[string][]*base.TaskMessage wantInProgress []*base.TaskMessage @@ -156,7 +156,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"default"}, wantMsg: t1, - wantDeadline: t1Deadline, + wantDeadline: time.Unix(int64(t1Deadline), 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -175,7 +175,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"default"}, wantMsg: nil, - wantDeadline: 0, + wantDeadline: time.Time{}, err: ErrNoProcessableTask, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -191,7 +191,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"critical", "default", "low"}, wantMsg: t2, - wantDeadline: t2Deadline, + wantDeadline: time.Unix(int64(t2Deadline), 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, @@ -214,7 +214,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"critical", "default", "low"}, wantMsg: t3, - wantDeadline: t3Deadline, + wantDeadline: time.Unix(int64(t3Deadline), 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -237,7 +237,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"critical", "default", "low"}, wantMsg: nil, - wantDeadline: 0, + wantDeadline: time.Time{}, err: ErrNoProcessableTask, wantEnqueued: map[string][]*base.TaskMessage{ "default": {},