Update enqueue methods in RDB

This commit is contained in:
Ken Hibino
2020-08-13 06:54:32 -07:00
parent dbf140a767
commit 47e9ba4eba
3 changed files with 422 additions and 272 deletions

View File

@@ -5,8 +5,6 @@
package rdb
import (
"encoding/json"
"errors"
"fmt"
"strings"
"time"
@@ -57,8 +55,6 @@ type DailyStats struct {
Time time.Time
}
var ErrQueueNotFound = errors.New("rdb: queue does not exist")
// KEYS[1] -> asynq:<qname>
// KEYS[2] -> asynq:<qname>:in_progress
// KEYS[3] -> asynq:<qname>:scheduled
@@ -104,7 +100,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
return nil, err
}
if !exists {
return nil, ErrQueueNotFound
return nil, &ErrQueueNotFound{qname}
}
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
@@ -138,7 +134,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
stats.InProgress = val
case base.ScheduledKey(qname):
stats.Scheduled = val
case ase.RetryKey(qname):
case base.RetryKey(qname):
stats.Retry = val
case base.DeadKey(qname):
stats.Dead = val
@@ -181,7 +177,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
ts := now.Add(-time.Duration(i) * day)
days = append(days, ts)
keys = append(keys, base.ProcessedKey(qname, ts))
keys = append(keys, base.FailureKey(qname, ts))
keys = append(keys, base.FailedKey(qname, ts))
}
res, err := historicalStatsCmd.Run(r.client, keys).Result()
if err != nil {
@@ -247,11 +243,10 @@ func (p Pagination) stop() int64 {
// ListEnqueued returns enqueued tasks that are ready to be processed.
func (r *RDB) ListEnqueued(qname string, pgn Pagination) ([]*base.TaskMessage, error) {
qkey := base.QueueKey(qname)
if !r.client.SIsMember(base.AllQueues, qkey).Val() {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(qkey, pgn)
return r.listMessages(base.QueueKey(qname), pgn)
}
// ListInProgress returns all tasks that are currently being processed for the given queue.
@@ -321,11 +316,11 @@ func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
return res, nil
}
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.DeadQueue, id.String(), float64(score))
// EnqueueDeadTask finds a dead task that matches the given id and score from
// the given queue and enqueues it for processing.
//If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueDeadTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.DeadKey(qname), base.QueueKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -335,11 +330,11 @@ func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error {
return nil
}
// EnqueueRetryTask finds a task that matches the given id and score from retry queue
// and enqueues it for processing. If a task that matches the id and score
// does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.RetryQueue, id.String(), float64(score))
// EnqueueRetryTask finds a retry task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -349,11 +344,11 @@ func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error {
return nil
}
// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue
// and enqueues it for processing. If a task that matches the id and score does not
// exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.ScheduledQueue, id.String(), float64(score))
// EnqueueScheduledTask finds a scheduled task that matches the given id and score from
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) EnqueueScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndEnqueue(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score))
if err != nil {
return err
}
@@ -363,22 +358,22 @@ func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
return nil
}
// EnqueueAllScheduledTasks enqueues all tasks from scheduled queue
// EnqueueAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllScheduledTasks() (int64, error) {
return r.removeAndEnqueueAll(base.ScheduledQueue)
func (r *RDB) EnqueueAllScheduledTasks(qname string) (int64, error) {
return r.removeAndEnqueueAll(base.ScheduledKey(qname), base.QueueKey(qname))
}
// EnqueueAllRetryTasks enqueues all tasks from retry queue
// EnqueueAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllRetryTasks() (int64, error) {
return r.removeAndEnqueueAll(base.RetryQueue)
func (r *RDB) EnqueueAllRetryTasks(qname string) (int64, error) {
return r.removeAndEnqueueAll(base.RetryKey(qname), base.QueueKey(qname))
}
// EnqueueAllDeadTasks enqueues all tasks from dead queue
// and returns the number of tasks enqueued.
func (r *RDB) EnqueueAllDeadTasks() (int64, error) {
return r.removeAndEnqueueAll(base.DeadQueue)
func (r *RDB) EnqueueAllDeadTasks(qname string) (int64, error) {
return r.removeAndEnqueueAll(base.DeadKey(qname), base.QueueKey(qname))
}
var removeAndEnqueueCmd = redis.NewScript(`
@@ -386,16 +381,15 @@ local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
local qkey = ARGV[3] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
return 1
end
end
return 0`)
func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
res, err := removeAndEnqueueCmd.Run(r.client, []string{zset}, score, id, base.QueuePrefix).Result()
func (r *RDB) removeAndEnqueue(zset, qkey, id string, score float64) (int64, error) {
res, err := removeAndEnqueueCmd.Run(r.client, []string{zset, qkey}, score, id).Result()
if err != nil {
return 0, err
}
@@ -409,15 +403,13 @@ func (r *RDB) removeAndEnqueue(zset, id string, score float64) (int64, error) {
var removeAndEnqueueAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
local qkey = ARGV[1] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg)
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
end
return table.getn(msgs)`)
func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
res, err := removeAndEnqueueAllCmd.Run(r.client, []string{zset}, base.QueuePrefix).Result()
func (r *RDB) removeAndEnqueueAll(zset, qkey string) (int64, error) {
res, err := removeAndEnqueueAllCmd.Run(r.client, []string{zset, qkey}).Result()
if err != nil {
return 0, err
}
@@ -428,6 +420,7 @@ func (r *RDB) removeAndEnqueueAll(zset string) (int64, error) {
return n, nil
}
/*
// KillRetryTask finds a task that matches the given id and score from retry queue
// and moves it to dead queue. If a task that maches the id and score does not exist,
// it returns ErrTaskNotFound.
@@ -617,6 +610,7 @@ func (r *RDB) deleteAll(key string) (int64, error) {
}
return n, nil
}
*/
// ErrQueueNotFound indicates specified queue does not exist.
type ErrQueueNotFound struct {
@@ -636,6 +630,7 @@ func (e *ErrQueueNotEmpty) Error() string {
return fmt.Sprintf("queue %q is not empty", e.qname)
}
/*
// Skip checking whether queue is empty before removing.
var removeQueueForceCmd = redis.NewScript(`
local n = redis.call("SREM", KEYS[1], KEYS[2])
@@ -695,7 +690,7 @@ for _, key in ipairs(keys) do
local s = redis.call("GET", key)
if s then
table.insert(res, s)
end
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res`)
@@ -732,7 +727,7 @@ for _, key in ipairs(keys) do
local workers = redis.call("HVALS", key)
for _, w in ipairs(workers) do
table.insert(res, w)
end
end
end
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return res`)
@@ -758,15 +753,16 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
}
return workers, nil
}
*/
// Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error {
key := base.PauseKey(qname)
exists, err := r.client.SetNX(key, time.Now().Unix(), 0).Result()
key := base.PausedKey(qname)
ok, err := r.client.SetNX(key, time.Now().Unix(), 0).Result()
if err != nil {
return err
}
if exists {
if !ok {
return fmt.Errorf("queue %q is already paused", qname)
}
return nil
@@ -774,7 +770,7 @@ func (r *RDB) Pause(qname string) error {
// Unpause resumes processing of tasks from the given queue.
func (r *RDB) Unpause(qname string) error {
key := base.PauseKey(qname)
key := base.PausedKey(qname)
deleted, err := r.client.Del(key).Result()
if err != nil {
return err