mirror of
https://github.com/hibiken/asynq.git
synced 2025-10-25 23:06:12 +08:00
Update ListDeadlineExceeded in RDB
This commit is contained in:
@@ -469,23 +469,25 @@ func (r *RDB) forwardAll(src, dst string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline.
|
||||
func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) {
|
||||
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
|
||||
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
||||
var msgs []*base.TaskMessage
|
||||
opt := &redis.ZRangeBy{
|
||||
Min: "-inf",
|
||||
Max: strconv.FormatInt(deadline.Unix(), 10),
|
||||
}
|
||||
res, err := r.client.ZRangeByScore(base.KeyDeadlines, opt).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, s := range res {
|
||||
msg, err := base.DecodeMessage(s)
|
||||
for _, qname := range qnames {
|
||||
res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
for _, s := range res {
|
||||
msg, err := base.DecodeMessage(s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
msgs = append(msgs, msg)
|
||||
}
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user