2019-12-05 12:30:37 +08:00
|
|
|
package rdb
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
2019-12-08 22:46:04 +08:00
|
|
|
"fmt"
|
2019-12-26 12:17:00 +08:00
|
|
|
"strconv"
|
2019-12-23 01:09:57 +08:00
|
|
|
"strings"
|
2019-12-05 12:30:37 +08:00
|
|
|
"time"
|
|
|
|
|
2019-12-08 22:46:04 +08:00
|
|
|
"github.com/go-redis/redis/v7"
|
2019-12-22 23:15:45 +08:00
|
|
|
"github.com/hibiken/asynq/internal/base"
|
2019-12-11 23:38:24 +08:00
|
|
|
"github.com/rs/xid"
|
2019-12-05 12:30:37 +08:00
|
|
|
)
|
|
|
|
|
|
|
|
// Stats represents a state of queues at a certain time.
|
|
|
|
type Stats struct {
|
|
|
|
Enqueued int
|
|
|
|
InProgress int
|
|
|
|
Scheduled int
|
|
|
|
Retry int
|
|
|
|
Dead int
|
2019-12-26 12:17:00 +08:00
|
|
|
Processed int
|
|
|
|
Failed int
|
2019-12-05 12:30:37 +08:00
|
|
|
Timestamp time.Time
|
|
|
|
}
|
|
|
|
|
|
|
|
// EnqueuedTask is a task in a queue and is ready to be processed.
|
|
|
|
// Note: This is read only and used for monitoring purpose.
|
|
|
|
type EnqueuedTask struct {
|
2019-12-11 23:38:24 +08:00
|
|
|
ID xid.ID
|
2019-12-05 12:30:37 +08:00
|
|
|
Type string
|
|
|
|
Payload map[string]interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// InProgressTask is a task that's currently being processed.
|
|
|
|
// Note: This is read only and used for monitoring purpose.
|
|
|
|
type InProgressTask struct {
|
2019-12-11 23:38:24 +08:00
|
|
|
ID xid.ID
|
2019-12-05 12:30:37 +08:00
|
|
|
Type string
|
|
|
|
Payload map[string]interface{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ScheduledTask is a task that's scheduled to be processed in the future.
|
|
|
|
// Note: This is read only and used for monitoring purpose.
|
|
|
|
type ScheduledTask struct {
|
2019-12-11 23:38:24 +08:00
|
|
|
ID xid.ID
|
2019-12-05 12:30:37 +08:00
|
|
|
Type string
|
|
|
|
Payload map[string]interface{}
|
|
|
|
ProcessAt time.Time
|
2019-12-09 08:36:08 +08:00
|
|
|
Score int64
|
2019-12-05 12:30:37 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// RetryTask is a task that's in retry queue because worker failed to process the task.
|
|
|
|
// Note: This is read only and used for monitoring purpose.
|
|
|
|
type RetryTask struct {
|
2019-12-11 23:38:24 +08:00
|
|
|
ID xid.ID
|
2019-12-05 12:30:37 +08:00
|
|
|
Type string
|
|
|
|
Payload map[string]interface{}
|
|
|
|
// TODO(hibiken): add LastFailedAt time.Time
|
|
|
|
ProcessAt time.Time
|
|
|
|
ErrorMsg string
|
|
|
|
Retried int
|
|
|
|
Retry int
|
2019-12-09 08:36:08 +08:00
|
|
|
Score int64
|
2019-12-05 12:30:37 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeadTask is a task in that has exhausted all retries.
|
|
|
|
// Note: This is read only and used for monitoring purpose.
|
|
|
|
type DeadTask struct {
|
2019-12-11 23:38:24 +08:00
|
|
|
ID xid.ID
|
2019-12-05 12:30:37 +08:00
|
|
|
Type string
|
|
|
|
Payload map[string]interface{}
|
|
|
|
LastFailedAt time.Time
|
|
|
|
ErrorMsg string
|
2019-12-09 08:36:08 +08:00
|
|
|
Score int64
|
2019-12-05 12:30:37 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// CurrentStats returns a current state of the queues.
|
|
|
|
func (r *RDB) CurrentStats() (*Stats, error) {
|
2019-12-26 12:17:00 +08:00
|
|
|
now := time.Now()
|
2019-12-05 12:30:37 +08:00
|
|
|
pipe := r.client.Pipeline()
|
2019-12-22 23:15:45 +08:00
|
|
|
qlen := pipe.LLen(base.DefaultQueue)
|
|
|
|
plen := pipe.LLen(base.InProgressQueue)
|
|
|
|
slen := pipe.ZCard(base.ScheduledQueue)
|
|
|
|
rlen := pipe.ZCard(base.RetryQueue)
|
|
|
|
dlen := pipe.ZCard(base.DeadQueue)
|
2019-12-26 12:17:00 +08:00
|
|
|
pcount := pipe.Get(base.ProcessedKey(now))
|
|
|
|
fcount := pipe.Get(base.FailureKey(now))
|
2019-12-05 12:30:37 +08:00
|
|
|
_, err := pipe.Exec()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-12-26 12:17:00 +08:00
|
|
|
p, err := strconv.Atoi(pcount.Val())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
f, err := strconv.Atoi(fcount.Val())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-12-05 12:30:37 +08:00
|
|
|
return &Stats{
|
|
|
|
Enqueued: int(qlen.Val()),
|
|
|
|
InProgress: int(plen.Val()),
|
|
|
|
Scheduled: int(slen.Val()),
|
|
|
|
Retry: int(rlen.Val()),
|
|
|
|
Dead: int(dlen.Val()),
|
2019-12-26 12:17:00 +08:00
|
|
|
Processed: p,
|
|
|
|
Failed: f,
|
|
|
|
Timestamp: now,
|
2019-12-05 12:30:37 +08:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
2019-12-23 01:09:57 +08:00
|
|
|
// RedisInfo returns a map of redis info.
|
|
|
|
func (r *RDB) RedisInfo() (map[string]string, error) {
|
|
|
|
res, err := r.client.Info().Result()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
info := make(map[string]string)
|
|
|
|
lines := strings.Split(res, "\r\n")
|
|
|
|
for _, l := range lines {
|
|
|
|
kv := strings.Split(l, ":")
|
|
|
|
if len(kv) == 2 {
|
|
|
|
info[kv[0]] = kv[1]
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return info, nil
|
|
|
|
}
|
|
|
|
|
2019-12-05 12:30:37 +08:00
|
|
|
// ListEnqueued returns all enqueued tasks that are ready to be processed.
|
|
|
|
func (r *RDB) ListEnqueued() ([]*EnqueuedTask, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
data, err := r.client.LRange(base.DefaultQueue, 0, -1).Result()
|
2019-12-05 12:30:37 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var tasks []*EnqueuedTask
|
|
|
|
for _, s := range data {
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-12-05 12:30:37 +08:00
|
|
|
err := json.Unmarshal([]byte(s), &msg)
|
|
|
|
if err != nil {
|
|
|
|
// continue // bad data, ignore and continue
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
tasks = append(tasks, &EnqueuedTask{
|
|
|
|
ID: msg.ID,
|
|
|
|
Type: msg.Type,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListInProgress returns all tasks that are currently being processed.
|
|
|
|
func (r *RDB) ListInProgress() ([]*InProgressTask, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
data, err := r.client.LRange(base.InProgressQueue, 0, -1).Result()
|
2019-12-05 12:30:37 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var tasks []*InProgressTask
|
|
|
|
for _, s := range data {
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-12-05 12:30:37 +08:00
|
|
|
err := json.Unmarshal([]byte(s), &msg)
|
|
|
|
if err != nil {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
|
|
|
tasks = append(tasks, &InProgressTask{
|
|
|
|
ID: msg.ID,
|
|
|
|
Type: msg.Type,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListScheduled returns all tasks that are scheduled to be processed
|
|
|
|
// in the future.
|
|
|
|
func (r *RDB) ListScheduled() ([]*ScheduledTask, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
data, err := r.client.ZRangeWithScores(base.ScheduledQueue, 0, -1).Result()
|
2019-12-05 12:30:37 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var tasks []*ScheduledTask
|
|
|
|
for _, z := range data {
|
|
|
|
s, ok := z.Member.(string)
|
|
|
|
if !ok {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-12-05 12:30:37 +08:00
|
|
|
err := json.Unmarshal([]byte(s), &msg)
|
|
|
|
if err != nil {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
|
|
|
processAt := time.Unix(int64(z.Score), 0)
|
|
|
|
tasks = append(tasks, &ScheduledTask{
|
|
|
|
ID: msg.ID,
|
|
|
|
Type: msg.Type,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
ProcessAt: processAt,
|
2019-12-09 08:36:08 +08:00
|
|
|
Score: int64(z.Score),
|
2019-12-05 12:30:37 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListRetry returns all tasks that have failed before and willl be retried
|
|
|
|
// in the future.
|
|
|
|
func (r *RDB) ListRetry() ([]*RetryTask, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
data, err := r.client.ZRangeWithScores(base.RetryQueue, 0, -1).Result()
|
2019-12-05 12:30:37 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var tasks []*RetryTask
|
|
|
|
for _, z := range data {
|
|
|
|
s, ok := z.Member.(string)
|
|
|
|
if !ok {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-12-05 12:30:37 +08:00
|
|
|
err := json.Unmarshal([]byte(s), &msg)
|
|
|
|
if err != nil {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
|
|
|
processAt := time.Unix(int64(z.Score), 0)
|
|
|
|
tasks = append(tasks, &RetryTask{
|
|
|
|
ID: msg.ID,
|
|
|
|
Type: msg.Type,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
ErrorMsg: msg.ErrorMsg,
|
|
|
|
Retry: msg.Retry,
|
|
|
|
Retried: msg.Retried,
|
|
|
|
ProcessAt: processAt,
|
2019-12-09 08:36:08 +08:00
|
|
|
Score: int64(z.Score),
|
2019-12-05 12:30:37 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// ListDead returns all tasks that have exhausted its retry limit.
|
|
|
|
func (r *RDB) ListDead() ([]*DeadTask, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
data, err := r.client.ZRangeWithScores(base.DeadQueue, 0, -1).Result()
|
2019-12-05 12:30:37 +08:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
var tasks []*DeadTask
|
|
|
|
for _, z := range data {
|
|
|
|
s, ok := z.Member.(string)
|
|
|
|
if !ok {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-12-05 12:30:37 +08:00
|
|
|
err := json.Unmarshal([]byte(s), &msg)
|
|
|
|
if err != nil {
|
|
|
|
continue // bad data, ignore and continue
|
|
|
|
}
|
|
|
|
lastFailedAt := time.Unix(int64(z.Score), 0)
|
|
|
|
tasks = append(tasks, &DeadTask{
|
|
|
|
ID: msg.ID,
|
|
|
|
Type: msg.Type,
|
|
|
|
Payload: msg.Payload,
|
|
|
|
ErrorMsg: msg.ErrorMsg,
|
|
|
|
LastFailedAt: lastFailedAt,
|
2019-12-09 08:36:08 +08:00
|
|
|
Score: int64(z.Score),
|
2019-12-05 12:30:37 +08:00
|
|
|
})
|
|
|
|
}
|
|
|
|
return tasks, nil
|
|
|
|
}
|
2019-12-08 22:46:04 +08:00
|
|
|
|
2019-12-10 11:33:07 +08:00
|
|
|
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
|
2019-12-09 06:17:57 +08:00
|
|
|
// and enqueues it for processing. If a task that matches the id and score
|
|
|
|
// does not exist, it returns ErrTaskNotFound.
|
2019-12-11 23:38:24 +08:00
|
|
|
func (r *RDB) EnqueueDeadTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
n, err := r.removeAndEnqueue(base.DeadQueue, id.String(), float64(score))
|
2019-12-08 22:46:04 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrTaskNotFound
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-12-10 11:33:07 +08:00
|
|
|
// EnqueueRetryTask finds a task that matches the given id and score from retry queue
|
2019-12-09 06:17:57 +08:00
|
|
|
// and enqueues it for processing. If a task that matches the id and score
|
|
|
|
// does not exist, it returns ErrTaskNotFound.
|
2019-12-11 23:38:24 +08:00
|
|
|
func (r *RDB) EnqueueRetryTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
n, err := r.removeAndEnqueue(base.RetryQueue, id.String(), float64(score))
|
2019-12-08 22:46:04 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrTaskNotFound
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-12-10 11:33:07 +08:00
|
|
|
// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue
|
2019-12-09 06:17:57 +08:00
|
|
|
// and enqueues it for processing. If a task that matches the id and score does not
|
2019-12-08 22:46:04 +08:00
|
|
|
// exist, it returns ErrTaskNotFound.
|
2019-12-11 23:38:24 +08:00
|
|
|
func (r *RDB) EnqueueScheduledTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
n, err := r.removeAndEnqueue(base.ScheduledQueue, id.String(), float64(score))
|
2019-12-08 22:46:04 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrTaskNotFound
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-12-11 13:48:19 +08:00
|
|
|
// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue
|
2019-12-11 13:38:25 +08:00
|
|
|
// and returns the number of tasks enqueued.
|
|
|
|
func (r *RDB) EnqueueAllScheduledTasks() (int64, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.removeAndEnqueueAll(base.ScheduledQueue)
|
2019-12-11 12:28:31 +08:00
|
|
|
}
|
|
|
|
|
2019-12-11 13:48:19 +08:00
|
|
|
// EnqueueAllRetryTasks enqueues all tasks from retry queue
|
2019-12-11 13:38:25 +08:00
|
|
|
// and returns the number of tasks enqueued.
|
|
|
|
func (r *RDB) EnqueueAllRetryTasks() (int64, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.removeAndEnqueueAll(base.RetryQueue)
|
2019-12-11 12:28:31 +08:00
|
|
|
}
|
|
|
|
|
2019-12-11 13:38:25 +08:00
|
|
|
// EnqueueAllDeadTasks enqueues all tasks from dead queue
|
|
|
|
// and returns the number of tasks enqueued.
|
|
|
|
func (r *RDB) EnqueueAllDeadTasks() (int64, error) {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.removeAndEnqueueAll(base.DeadQueue)
|
2019-12-11 12:28:31 +08:00
|
|
|
}
|
|
|
|
|
2019-12-08 22:46:04 +08:00
|
|
|
func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
|
|
|
|
script := redis.NewScript(`
|
|
|
|
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
|
|
|
for _, msg in ipairs(msgs) do
|
|
|
|
local decoded = cjson.decode(msg)
|
|
|
|
if decoded["ID"] == ARGV[2] then
|
|
|
|
redis.call("ZREM", KEYS[1], msg)
|
|
|
|
redis.call("LPUSH", KEYS[2], msg)
|
|
|
|
return 1
|
|
|
|
end
|
|
|
|
end
|
|
|
|
return 0
|
|
|
|
`)
|
2019-12-22 23:15:45 +08:00
|
|
|
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}, score, id).Result()
|
2019-12-08 22:46:04 +08:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return 0, fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
return n, nil
|
|
|
|
}
|
2019-12-11 12:28:31 +08:00
|
|
|
|
2019-12-11 13:38:25 +08:00
|
|
|
func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
|
2019-12-11 12:28:31 +08:00
|
|
|
script := redis.NewScript(`
|
|
|
|
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
|
|
|
|
for _, msg in ipairs(msgs) do
|
|
|
|
redis.call("ZREM", KEYS[1], msg)
|
|
|
|
redis.call("LPUSH", KEYS[2], msg)
|
|
|
|
end
|
|
|
|
return table.getn(msgs)
|
|
|
|
`)
|
2019-12-22 23:15:45 +08:00
|
|
|
res, err := script.Run(r.client, []string{zset, base.DefaultQueue}).Result()
|
2019-12-11 12:28:31 +08:00
|
|
|
if err != nil {
|
2019-12-11 13:38:25 +08:00
|
|
|
return 0, err
|
2019-12-11 12:28:31 +08:00
|
|
|
}
|
2019-12-11 13:38:25 +08:00
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return 0, fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
return n, nil
|
2019-12-11 12:28:31 +08:00
|
|
|
}
|
2019-12-12 11:56:19 +08:00
|
|
|
|
|
|
|
// DeleteDeadTask finds a task that matches the given id and score from dead queue
|
|
|
|
// and deletes it. If a task that matches the id and score does not exist,
|
|
|
|
// it returns ErrTaskNotFound.
|
|
|
|
func (r *RDB) DeleteDeadTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.deleteTask(base.DeadQueue, id.String(), float64(score))
|
2019-12-12 11:56:19 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteRetryTask finds a task that matches the given id and score from retry queue
|
|
|
|
// and deletes it. If a task that matches the id and score does not exist,
|
|
|
|
// it returns ErrTaskNotFound.
|
|
|
|
func (r *RDB) DeleteRetryTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.deleteTask(base.RetryQueue, id.String(), float64(score))
|
2019-12-12 11:56:19 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteScheduledTask finds a task that matches the given id and score from
|
|
|
|
// scheduled queue and deletes it. If a task that matches the id and score
|
|
|
|
//does not exist, it returns ErrTaskNotFound.
|
|
|
|
func (r *RDB) DeleteScheduledTask(id xid.ID, score int64) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.deleteTask(base.ScheduledQueue, id.String(), float64(score))
|
2019-12-12 11:56:19 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
func (r *RDB) deleteTask(zset, id string, score float64) error {
|
|
|
|
script := redis.NewScript(`
|
|
|
|
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
|
|
|
|
for _, msg in ipairs(msgs) do
|
|
|
|
local decoded = cjson.decode(msg)
|
|
|
|
if decoded["ID"] == ARGV[2] then
|
|
|
|
redis.call("ZREM", KEYS[1], msg)
|
|
|
|
return 1
|
|
|
|
end
|
|
|
|
end
|
|
|
|
return 0
|
|
|
|
`)
|
|
|
|
res, err := script.Run(r.client, []string{zset}, score, id).Result()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrTaskNotFound
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2019-12-12 22:38:01 +08:00
|
|
|
|
|
|
|
// DeleteAllDeadTasks deletes all tasks from the dead queue.
|
|
|
|
func (r *RDB) DeleteAllDeadTasks() error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.client.Del(base.DeadQueue).Err()
|
2019-12-12 22:38:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteAllRetryTasks deletes all tasks from the dead queue.
|
|
|
|
func (r *RDB) DeleteAllRetryTasks() error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.client.Del(base.RetryQueue).Err()
|
2019-12-12 22:38:01 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
// DeleteAllScheduledTasks deletes all tasks from the dead queue.
|
|
|
|
func (r *RDB) DeleteAllScheduledTasks() error {
|
2019-12-22 23:15:45 +08:00
|
|
|
return r.client.Del(base.ScheduledQueue).Err()
|
2019-12-12 22:38:01 +08:00
|
|
|
}
|