diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 91397b0..fb99198 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -281,14 +281,52 @@ func (r *RDB) CurrentStats() (*Stats, error) { }, nil } -func (r *RDB) ListEnqueued() ([]*TaskMessage, error) { - return r.rangeList(DefaultQueue) +// ListEnqueued returns all enqueued tasks that are ready to be processed. +func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) { + data, err := r.client.LRange(DefaultQueue, 0, -1).Result() + if err != nil { + return nil, err + } + var tasks []*EnqueuedTask + for _, s := range data { + var msg TaskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + continue // bad data, ignore and continue + } + tasks = append(tasks, &EnqueuedTask{ + ID: msg.ID, + Type: msg.Type, + Payload: msg.Payload, + }) + } + return tasks, nil } -func (r *RDB) ListInProgress() ([]*TaskMessage, error) { - return r.rangeList(InProgress) +// ListInProgress returns all tasks that are currently being processed. +func (r *RDB) ListInProgress() ([]*InProgressTask, error) { + data, err := r.client.LRange(DefaultQueue, 0, -1).Result() + if err != nil { + return nil, err + } + var tasks []*InProgressTask + for _, s := range data { + var msg TaskMessage + err := json.Unmarshal([]byte(s), &msg) + if err != nil { + continue // bad data, ignore and continue + } + tasks = append(tasks, &InProgressTask{ + ID: msg.ID, + Type: msg.Type, + Payload: msg.Payload, + }) + } + return tasks, nil } +// ListScheduled returns all tasks that are scheduled to be processed +// in the future. func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { data, err := r.client.ZRangeWithScores(Scheduled, 0, -1).Result() if err != nil { @@ -316,6 +354,8 @@ func (r *RDB) ListScheduled() ([]*ScheduledTask, error) { return tasks, nil } +// ListRetry returns all tasks that have failed before and willl be retried +// in the future. func (r *RDB) ListRetry() ([]*RetryTask, error) { data, err := r.client.ZRangeWithScores(Retry, 0, -1).Result() if err != nil { @@ -346,6 +386,7 @@ func (r *RDB) ListRetry() ([]*RetryTask, error) { return tasks, nil } +// ListDead returns all tasks that have exhausted its retry limit. func (r *RDB) ListDead() ([]*DeadTask, error) { data, err := r.client.ZRangeWithScores(Dead, 0, -1).Result() if err != nil { @@ -373,34 +414,3 @@ func (r *RDB) ListDead() ([]*DeadTask, error) { } return tasks, nil } - -func (r *RDB) rangeList(key string) ([]*TaskMessage, error) { - data, err := r.client.LRange(key, 0, -1).Result() - if err != nil { - return nil, err - } - return r.toMessageSlice(data), nil -} - -func (r *RDB) rangeZSet(key string) ([]*TaskMessage, error) { - data, err := r.client.ZRange(key, 0, -1).Result() - if err != nil { - return nil, err - } - return r.toMessageSlice(data), nil -} - -// toMessageSlice convers json strings to a slice of task messages. -func (r *RDB) toMessageSlice(data []string) []*TaskMessage { - var msgs []*TaskMessage - for _, s := range data { - var msg TaskMessage - err := json.Unmarshal([]byte(s), &msg) - if err != nil { - // bad data; ignore and continue - continue - } - msgs = append(msgs, &msg) - } - return msgs -}