2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 07:12:17 +08:00

Implement all "list tasks" methods in rdb

This commit is contained in:
Ken Hibino 2019-12-04 17:10:39 -08:00
parent afacc31990
commit 918f33d37d

View File

@ -281,14 +281,52 @@ func (r *RDB) CurrentStats() (*Stats, error) {
}, nil
}
func (r *RDB) ListEnqueued() ([]*TaskMessage, error) {
return r.rangeList(DefaultQueue)
// ListEnqueued returns all enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
data, err := r.client.LRange(DefaultQueue, 0, -1).Result()
if err != nil {
return nil, err
}
var tasks []*EnqueuedTask
for _, s := range data {
var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
tasks = append(tasks, &EnqueuedTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
})
}
return tasks, nil
}
func (r *RDB) ListInProgress() ([]*TaskMessage, error) {
return r.rangeList(InProgress)
// ListInProgress returns all tasks that are currently being processed.
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
data, err := r.client.LRange(DefaultQueue, 0, -1).Result()
if err != nil {
return nil, err
}
var tasks []*InProgressTask
for _, s := range data {
var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
continue // bad data, ignore and continue
}
tasks = append(tasks, &InProgressTask{
ID: msg.ID,
Type: msg.Type,
Payload: msg.Payload,
})
}
return tasks, nil
}
// ListScheduled returns all tasks that are scheduled to be processed
// in the future.
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result()
if err != nil {
@ -316,6 +354,8 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
return tasks, nil
}
// ListRetry returns all tasks that have failed before and willl be retried
// in the future.
func (r *RDB) ListRetry() ([]*RetryTask, error) {
data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result()
if err != nil {
@ -346,6 +386,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) {
return tasks, nil
}
// ListDead returns all tasks that have exhausted its retry limit.
func (r *RDB) ListDead() ([]*DeadTask, error) {
data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result()
if err != nil {
@ -373,34 +414,3 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
}
return tasks, nil
}
func (r *RDB) rangeList(key string) ([]*TaskMessage, error) {
data, err := r.client.LRange(key, 0, -1).Result()
if err != nil {
return nil, err
}
return r.toMessageSlice(data), nil
}
func (r *RDB) rangeZSet(key string) ([]*TaskMessage, error) {
data, err := r.client.ZRange(key, 0, -1).Result()
if err != nil {
return nil, err
}
return r.toMessageSlice(data), nil
}
// toMessageSlice convers json strings to a slice of task messages.
func (r *RDB) toMessageSlice(data []string) []*TaskMessage {
var msgs []*TaskMessage
for _, s := range data {
var msg TaskMessage
err := json.Unmarshal([]byte(s), &msg)
if err != nil {
// bad data; ignore and continue
continue
}
msgs = append(msgs, &msg)
}
return msgs
}