2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-08-19 15:08:55 +08:00

Refactor redis keys and store messages in protobuf

Changes:
- Task messages are stored under "asynq:{<qname>}:t:<task_id>" key in redis, value is a HASH type and message are stored under "msg" key in the hash. The hash also stores "deadline", "timeout".
- Redis LIST and ZSET stores task message IDs
- Task messages are serialized using protocol buffer
This commit is contained in:
Ken Hibino
2021-03-12 16:23:08 -08:00
parent 2516c4baba
commit 7af3981929
22 changed files with 2534 additions and 668 deletions

View File

@@ -259,8 +259,8 @@ func BenchmarkCheckAndEnqueue(b *testing.B) {
asynqtest.SeedScheduledQueue(b, r.client, zs, base.DefaultQueueName)
b.StartTimer()
if err := r.CheckAndEnqueue(base.DefaultQueueName); err != nil {
b.Fatalf("CheckAndEnqueue failed: %v", err)
if err := r.ForwardIfReady(base.DefaultQueueName); err != nil {
b.Fatalf("ForwardIfReady failed: %v", err)
}
}
}

View File

@@ -5,7 +5,6 @@
package rdb
import (
"encoding/json"
"fmt"
"strings"
"time"
@@ -110,7 +109,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
}
now := time.Now()
res, err := currentStatsCmd.Run(r.client, []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
@@ -135,7 +134,7 @@ func (r *RDB) CurrentStats(qname string) (*Stats, error) {
key := cast.ToString(data[i])
val := cast.ToInt(data[i+1])
switch key {
case base.QueueKey(qname):
case base.PendingKey(qname):
stats.Pending = val
size += val
case base.ActiveKey(qname):
@@ -312,7 +311,7 @@ func (r *RDB) ListPending(qname string, pgn Pagination) ([]*base.TaskMessage, er
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.QueueKey(qname), pgn)
return r.listMessages(base.PendingKey(qname), qname, pgn)
}
// ListActive returns all tasks that are currently being processed for the given queue.
@@ -320,23 +319,42 @@ func (r *RDB) ListActive(qname string, pgn Pagination) ([]*base.TaskMessage, err
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listMessages(base.ActiveKey(qname), pgn)
return r.listMessages(base.ActiveKey(qname), qname, pgn)
}
// KEYS[1] -> key for id list (e.g. asynq:{<qname>}:pending)
// ARGV[1] -> start offset
// ARGV[2] -> stop offset
// ARGV[3] -> task key prefix
var listMessagesCmd = redis.NewScript(`
local ids = redis.call("LRange", KEYS[1], ARGV[1], ARGV[2])
local res = {}
for _, id in ipairs(ids) do
local key = ARGV[3] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)
// listMessages returns a list of TaskMessage in Redis list with the given key.
func (r *RDB) listMessages(key string, pgn Pagination) ([]*base.TaskMessage, error) {
func (r *RDB) listMessages(key, qname string, pgn Pagination) ([]*base.TaskMessage, error) {
// Note: Because we use LPUSH to redis list, we need to calculate the
// correct range and reverse the list to get the tasks with pagination.
stop := -pgn.start() - 1
start := -pgn.stop() - 1
data, err := r.client.LRange(key, start, stop).Result()
res, err := listMessagesCmd.Run(r.client,
[]string{key}, start, stop, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
reverse(data)
var msgs []*base.TaskMessage
for _, s := range data {
m, err := base.DecodeMessage(s)
m, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
@@ -352,7 +370,7 @@ func (r *RDB) ListScheduled(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.ScheduledKey(qname), pgn)
return r.listZSetEntries(base.ScheduledKey(qname), qname, pgn)
}
// ListRetry returns all tasks from the given queue that have failed before
@@ -361,7 +379,7 @@ func (r *RDB) ListRetry(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.RetryKey(qname), pgn)
return r.listZSetEntries(base.RetryKey(qname), qname, pgn)
}
// ListArchived returns all tasks from the given queue that have exhausted its retry limit.
@@ -369,36 +387,63 @@ func (r *RDB) ListArchived(qname string, pgn Pagination) ([]base.Z, error) {
if !r.client.SIsMember(base.AllQueues, qname).Val() {
return nil, fmt.Errorf("queue %q does not exist", qname)
}
return r.listZSetEntries(base.ArchivedKey(qname), pgn)
return r.listZSetEntries(base.ArchivedKey(qname), qname, pgn)
}
// KEYS[1] -> key for ids set (e.g. asynq:{<qname>}:scheduled)
// ARGV[1] -> min
// ARGV[2] -> max
// ARGV[3] -> task key prefix
//
// Returns an array populated with
// [msg1, score1, msg2, score2, ..., msgN, scoreN]
var listZSetEntriesCmd = redis.NewScript(`
local res = {}
local id_score_pairs = redis.call("ZRANGE", KEYS[1], ARGV[1], ARGV[2], "WITHSCORES")
for i = 1, table.getn(id_score_pairs), 2 do
local key = ARGV[3] .. id_score_pairs[i]
table.insert(res, redis.call("HGET", key, "msg"))
table.insert(res, id_score_pairs[i+1])
end
return res
`)
// listZSetEntries returns a list of message and score pairs in Redis sorted-set
// with the given key.
func (r *RDB) listZSetEntries(key string, pgn Pagination) ([]base.Z, error) {
data, err := r.client.ZRangeWithScores(key, pgn.start(), pgn.stop()).Result()
func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]base.Z, error) {
res, err := listZSetEntriesCmd.Run(r.client, []string{key},
pgn.start(), pgn.stop(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
var res []base.Z
for _, z := range data {
s, ok := z.Member.(string)
if !ok {
continue // bad data, ignore and continue
data, err := cast.ToSliceE(res)
if err != nil {
return nil, err
}
var zs []base.Z
for i := 0; i < len(data); i += 2 {
s, err := cast.ToStringE(data[i])
if err != nil {
return nil, err
}
msg, err := base.DecodeMessage(s)
score, err := cast.ToInt64E(data[i+1])
if err != nil {
return nil, err
}
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
continue // bad data, ignore and continue
}
res = append(res, base.Z{Message: msg, Score: int64(z.Score)})
zs = append(zs, base.Z{Message: msg, Score: score})
}
return res, nil
return zs, nil
}
// RunArchivedTask finds an archived task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunArchivedTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ArchivedKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -411,8 +456,8 @@ func (r *RDB) RunArchivedTask(qname string, id uuid.UUID, score int64) error {
// RunRetryTask finds a retry task that matches the given id and score from
// the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.RetryKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -425,8 +470,8 @@ func (r *RDB) RunRetryTask(qname string, id uuid.UUID, score int64) error {
// RunScheduledTask finds a scheduled task that matches the given id and score from
// from the given queue and enqueues it for processing.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.QueueKey(qname), id.String(), float64(score))
func (r *RDB) RunScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndRun(base.ScheduledKey(qname), base.PendingKey(qname), id.String())
if err != nil {
return err
}
@@ -439,35 +484,35 @@ func (r *RDB) RunScheduledTask(qname string, id uuid.UUID, score int64) error {
// RunAllScheduledTasks enqueues all scheduled tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllScheduledTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ScheduledKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ScheduledKey(qname), base.PendingKey(qname))
}
// RunAllRetryTasks enqueues all retry tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllRetryTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.RetryKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.RetryKey(qname), base.PendingKey(qname))
}
// RunAllArchivedTasks enqueues all archived tasks from the given queue
// and returns the number of tasks enqueued.
func (r *RDB) RunAllArchivedTasks(qname string) (int64, error) {
return r.removeAndRunAll(base.ArchivedKey(qname), base.QueueKey(qname))
return r.removeAndRunAll(base.ArchivedKey(qname), base.PendingKey(qname))
}
// KEYS[1] -> sorted set to remove the id from
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
var removeAndRunCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
return 1
end
local n = redis.call("ZREM", KEYS[1], ARGV[1])
if n == 0 then
return 0
end
return 0`)
redis.call("LPUSH", KEYS[2], ARGV[1])
return 1
`)
func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, score, id).Result()
func (r *RDB) removeAndRun(zset, qkey, id string) (int64, error) {
res, err := removeAndRunCmd.Run(r.client, []string{zset, qkey}, id).Result()
if err != nil {
return 0, err
}
@@ -479,12 +524,12 @@ func (r *RDB) removeAndRun(zset, qkey, id string, score float64) (int64, error)
}
var removeAndRunAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
end
return table.getn(msgs)`)
return table.getn(ids)`)
func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
res, err := removeAndRunAllCmd.Run(r.client, []string{zset, qkey}).Result()
@@ -498,10 +543,11 @@ func (r *RDB) removeAndRunAll(zset, qkey string) (int64, error) {
return n, nil
}
// ArchiveRetryTask finds a retry task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveRetryTask finds a retry task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.RetryKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@@ -511,10 +557,11 @@ func (r *RDB) ArchiveRetryTask(qname string, id uuid.UUID, score int64) error {
return nil
}
// ArchiveScheduledTask finds a scheduled task that matches the given id and score from the given queue
// and archives it. If a task that maches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String(), float64(score))
// ArchiveScheduledTask finds a scheduled task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID) error {
n, err := r.removeAndArchive(base.ScheduledKey(qname), base.ArchivedKey(qname), id.String())
if err != nil {
return err
}
@@ -526,13 +573,12 @@ func (r *RDB) ArchiveScheduledTask(qname string, id uuid.UUID, score int64) erro
// KEYS[1] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> task message to archive
// ARGV[1] -> ID of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archive (e.g., 100)
var archivePendingCmd = redis.NewScript(`
local x = redis.call("LREM", KEYS[1], 1, ARGV[1])
if x == 0 then
if redis.call("LREM", KEYS[1], 1, ARGV[1]) == 0 then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
@@ -541,47 +587,33 @@ redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) archivePending(qname, msg string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{msg, now.Unix(), limit, maxArchiveSize}
res, err := archivePendingCmd.Run(r.client, keys, args...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
}
return n, nil
}
// ArchivePendingTask finds a pending task that matches the given id from the given queue
// and archives it. If a task that maches the id does not exist, it returns ErrTaskNotFound.
// ArchivePendingTask finds a pending task that matches the given id
// from the given queue and archives it.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) ArchivePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
keys := []string{
base.PendingKey(qname),
base.ArchivedKey(qname),
}
now := time.Now()
argv := []interface{}{
id.String(),
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archivePendingCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.archivePending(qname, s)
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("command error: unexpected return value %v", res)
}
return ErrTaskNotFound
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// ArchiveAllRetryTasks archives all retry tasks from the given queue and
@@ -596,66 +628,64 @@ func (r *RDB) ArchiveAllScheduledTasks(qname string) (int64, error) {
return r.removeAndArchiveAll(base.ScheduledKey(qname), base.ArchivedKey(qname))
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var archiveAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// ArchiveAllPendingTasks archives all pending tasks from the given queue and
// returns the number of tasks that were moved.
// returns the number of tasks moved.
func (r *RDB) ArchiveAllPendingTasks(qname string) (int64, error) {
keys := []string{base.QueueKey(qname), base.ArchivedKey(qname)}
keys := []string{base.PendingKey(qname), base.ArchivedKey(qname)}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
args := []interface{}{now.Unix(), limit, maxArchiveSize}
res, err := archiveAllPendingCmd.Run(r.client, keys, args...).Result()
argv := []interface{}{
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := archiveAllPendingCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> score of the task to archive
// ARGV[2] -> id of the task to archive
// ARGV[3] -> current timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archived state (e.g., 100)
// ARGV[1] -> id of the task to archive
// ARGV[2] -> current timestamp
// ARGV[3] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[4] -> max number of tasks in archived state (e.g., 100)
var removeAndArchiveCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
redis.call("ZADD", KEYS[2], ARGV[3], msg)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
return 1
end
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
return 0
end
return 0`)
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[3])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[4])
return 1
`)
func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error) {
func (r *RDB) removeAndArchive(src, dst, id string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveCmd.Run(r.client,
[]string{src, dst},
score, id, now.Unix(), limit, maxArchiveSize).Result()
id, now.Unix(), limit, maxArchiveSize).Result()
if err != nil {
return 0, err
}
@@ -666,108 +696,106 @@ func (r *RDB) removeAndArchive(src, dst, id string, score float64) (int64, error
return n, nil
}
// KEYS[1] -> ZSET to move task from (e.g., retry queue)
// KEYS[1] -> ZSET to move task from (e.g., asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:archived
// ARGV[1] -> current timestamp
// ARGV[2] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[3] -> max number of tasks in archive (e.g., 100)
var removeAndArchiveAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
redis.call("ZADD", KEYS[2], ARGV[1], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
redis.call("ZADD", KEYS[2], ARGV[1], id)
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[2])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[3])
end
return table.getn(msgs)`)
redis.call("DEL", KEYS[1])
return table.getn(ids)`)
func (r *RDB) removeAndArchiveAll(src, dst string) (int64, error) {
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
res, err := removeAndArchiveAllCmd.Run(r.client, []string{src, dst},
now.Unix(), limit, maxArchiveSize).Result()
argv := []interface{}{
now.Unix(),
now.AddDate(0, 0, -archivedExpirationInDays).Unix(),
maxArchiveSize,
}
res, err := removeAndArchiveAllCmd.Run(r.client,
[]string{src, dst}, argv...).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
// DeleteArchivedTask deletes an archived task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.ArchivedKey(qname), id.String(), float64(score))
func (r *RDB) DeleteArchivedTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.ArchivedKey(qname), qname, id.String())
}
// DeleteRetryTask deletes a retry task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.RetryKey(qname), id.String(), float64(score))
func (r *RDB) DeleteRetryTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.RetryKey(qname), qname, id.String())
}
// DeleteScheduledTask deletes a scheduled task that matches the given id and score from the given queue.
// If a task that matches the id and score does not exist, it returns ErrTaskNotFound.
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID, score int64) error {
return r.deleteTask(base.ScheduledKey(qname), id.String(), float64(score))
func (r *RDB) DeleteScheduledTask(qname string, id uuid.UUID) error {
return r.deleteTask(base.ScheduledKey(qname), qname, id.String())
}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
var deletePendingTaskCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return 0
end
return redis.call("DEL", KEYS[2])
`)
// DeletePendingTask deletes a pending tasks that matches the given id from the given queue.
// If a task that matches the id does not exist, it returns ErrTaskNotFound.
// If there's no match, it returns ErrTaskNotFound.
func (r *RDB) DeletePendingTask(qname string, id uuid.UUID) error {
qkey := base.QueueKey(qname)
data, err := r.client.LRange(qkey, 0, -1).Result()
if err != nil {
return err
}
for _, s := range data {
msg, err := base.DecodeMessage(s)
if err != nil {
return err
}
if msg.ID == id {
n, err := r.client.LRem(qkey, 1, s).Result()
if err != nil {
return err
}
if n == 0 {
return ErrTaskNotFound
}
if r.client.Get(msg.UniqueKey).Val() == msg.ID.String() {
if err := r.client.Del(msg.UniqueKey).Err(); err != nil {
return err
}
}
return nil
}
}
return ErrTaskNotFound
}
var deleteTaskCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], ARGV[1], ARGV[1])
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if decoded["ID"] == ARGV[2] then
redis.call("ZREM", KEYS[1], msg)
if redis.call("GET", decoded["UniqueKey"]) == ARGV[2] then
redis.call("DEL", decoded["UniqueKey"])
end
return 1
end
end
return 0`)
func (r *RDB) deleteTask(key, id string, score float64) error {
res, err := deleteTaskCmd.Run(r.client, []string{key}, score, id).Result()
keys := []string{base.PendingKey(qname), base.TaskKey(qname, id.String())}
res, err := deletePendingTaskCmd.Run(r.client, keys, id.String()).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("could not cast %v to int64", res)
return fmt.Errorf("command error: unexpected return value %v", res)
}
if n == 0 {
return ErrTaskNotFound
}
return nil
}
// KEYS[1] -> ZSET key to remove the task from (e.g. asynq:{<qname>}:retry)
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// ARGV[1] -> task ID
var deleteTaskCmd = redis.NewScript(`
if redis.call("ZREM", KEYS[1], ARGV[1]) == 0 then
return 0
end
return redis.call("DEL", KEYS[2])
`)
func (r *RDB) deleteTask(key, qname, id string) error {
keys := []string{key, base.TaskKey(qname, id)}
argv := []interface{}{id}
res, err := deleteTaskCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
n, ok := res.(int64)
if !ok {
return fmt.Errorf("command error: unexpected return value %v", res)
}
if n == 0 {
return ErrTaskNotFound
@@ -776,37 +804,36 @@ func (r *RDB) deleteTask(key, id string, score float64) error {
}
// KEYS[1] -> queue to delete
// ARGV[1] -> task key prefix
var deleteAllCmd = redis.NewScript(`
local msgs = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
local ids = redis.call("ZRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// DeleteAllArchivedTasks deletes all archived tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllArchivedTasks(qname string) (int64, error) {
return r.deleteAll(base.ArchivedKey(qname))
return r.deleteAll(base.ArchivedKey(qname), qname)
}
// DeleteAllRetryTasks deletes all retry tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllRetryTasks(qname string) (int64, error) {
return r.deleteAll(base.RetryKey(qname))
return r.deleteAll(base.RetryKey(qname), qname)
}
// DeleteAllScheduledTasks deletes all scheduled tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllScheduledTasks(qname string) (int64, error) {
return r.deleteAll(base.ScheduledKey(qname))
return r.deleteAll(base.ScheduledKey(qname), qname)
}
func (r *RDB) deleteAll(key string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}).Result()
func (r *RDB) deleteAll(key, qname string) (int64, error) {
res, err := deleteAllCmd.Run(r.client, []string{key}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return 0, err
}
@@ -817,28 +844,28 @@ func (r *RDB) deleteAll(key string) (int64, error) {
return n, nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// ARGV[1] -> task key prefix
var deleteAllPendingCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg)
if redis.call("GET", decoded["UniqueKey"]) == decoded["ID"] then
redis.call("DEL", decoded["UniqueKey"])
end
local ids = redis.call("LRANGE", KEYS[1], 0, -1)
for _, id in ipairs(ids) do
local key = ARGV[1] .. id
redis.call("DEL", key)
end
redis.call("DEL", KEYS[1])
return table.getn(msgs)`)
return table.getn(ids)`)
// DeleteAllPendingTasks deletes all pending tasks from the given queue
// and returns the number of tasks deleted.
func (r *RDB) DeleteAllPendingTasks(qname string) (int64, error) {
res, err := deleteAllPendingCmd.Run(r.client, []string{base.QueueKey(qname)}).Result()
res, err := deleteAllPendingCmd.Run(r.client,
[]string{base.PendingKey(qname)}, base.TaskKeyPrefix(qname)).Result()
if err != nil {
return 0, err
}
n, ok := res.(int64)
if !ok {
return 0, fmt.Errorf("could not cast %v to int64", res)
return 0, fmt.Errorf("command error: unexpected return value %v", res)
}
return n, nil
}
@@ -868,11 +895,27 @@ func (e *ErrQueueNotEmpty) Error() string {
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueForceCmd = redis.NewScript(`
local active = redis.call("LLEN", KEYS[2])
if active > 0 then
return redis.error_reply("Queue has tasks active")
end
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
@@ -882,22 +925,36 @@ redis.call("DEL", KEYS[6])
return redis.status_reply("OK")`)
// Checks whether queue is empty before removing.
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:scheduled
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:archived
// KEYS[6] -> asynq:{<qname>}:deadlines
// ARGV[1] -> task key prefix
var removeQueueCmd = redis.NewScript(`
local pending = redis.call("LLEN", KEYS[1])
local active = redis.call("LLEN", KEYS[2])
local scheduled = redis.call("SCARD", KEYS[3])
local retry = redis.call("SCARD", KEYS[4])
local archived = redis.call("SCARD", KEYS[5])
local total = pending + active + scheduled + retry + archived
if total > 0 then
local ids = {}
for _, id in ipairs(redis.call("LRANGE", KEYS[1], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("LRANGE", KEYS[2], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[3], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[4], 0, -1)) do
table.insert(ids, id)
end
for _, id in ipairs(redis.call("ZRANGE", KEYS[5], 0, -1)) do
table.insert(ids, id)
end
if table.getn(ids) > 0 then
return redis.error_reply("QUEUE NOT EMPTY")
end
for _, id in ipairs(ids) do
redis.call("DEL", ARGV[1] .. id)
end
redis.call("DEL", KEYS[1])
redis.call("DEL", KEYS[2])
redis.call("DEL", KEYS[3])
@@ -927,14 +984,14 @@ func (r *RDB) RemoveQueue(qname string, force bool) error {
script = removeQueueCmd
}
keys := []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.ActiveKey(qname),
base.ScheduledKey(qname),
base.RetryKey(qname),
base.ArchivedKey(qname),
base.DeadlinesKey(qname),
}
if err := script.Run(r.client, keys).Err(); err != nil {
if err := script.Run(r.client, keys, base.TaskKeyPrefix(qname)).Err(); err != nil {
if err.Error() == "QUEUE NOT EMPTY" {
return &ErrQueueNotEmpty{qname}
}
@@ -967,46 +1024,47 @@ func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
if err != nil {
continue // skip bad data
}
var info base.ServerInfo
if err := json.Unmarshal([]byte(data), &info); err != nil {
info, err := base.DecodeServerInfo([]byte(data))
if err != nil {
continue // skip bad data
}
servers = append(servers, &info)
servers = append(servers, info)
}
return servers, nil
}
// Note: Script also removes stale keys.
var listWorkerKeysCmd = redis.NewScript(`
var listWorkersCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)
local res = {}
for _, key in ipairs(keys) do
local vals = redis.call("HVALS", key)
for _, v in ipairs(vals) do
table.insert(res, v)
end
end
return res`)
// ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
now := time.Now()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
res, err := listWorkersCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil {
return nil, err
}
keys, err := cast.ToStringSliceE(res)
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var workers []*base.WorkerInfo
for _, key := range keys {
data, err := r.client.HVals(key).Result()
for _, s := range data {
w, err := base.DecodeWorkerInfo([]byte(s))
if err != nil {
continue // skip bad data
}
for _, s := range data {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(s), &w); err != nil {
continue // skip bad data
}
workers = append(workers, &w)
}
workers = append(workers, w)
}
return workers, nil
}
@@ -1036,11 +1094,11 @@ func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
continue // skip bad data
}
for _, s := range data {
var e base.SchedulerEntry
if err := json.Unmarshal([]byte(s), &e); err != nil {
e, err := base.DecodeSchedulerEntry([]byte(s))
if err != nil {
continue // skip bad data
}
entries = append(entries, &e)
entries = append(entries, e)
}
}
return entries, nil
@@ -1059,11 +1117,11 @@ func (r *RDB) ListSchedulerEnqueueEvents(entryID string, pgn Pagination) ([]*bas
if err != nil {
return nil, err
}
var e base.SchedulerEnqueueEvent
if err := json.Unmarshal([]byte(data), &e); err != nil {
e, err := base.DecodeSchedulerEnqueueEvent([]byte(data))
if err != nil {
return nil, err
}
events = append(events, &e)
events = append(events, e)
}
return events, nil
}
@@ -1096,7 +1154,7 @@ func (r *RDB) Unpause(qname string) error {
// ClusterKeySlot returns an integer identifying the hash slot the given queue hashes to.
func (r *RDB) ClusterKeySlot(qname string) (int64, error) {
key := base.QueueKey(qname)
key := base.PendingKey(qname)
return r.client.ClusterKeySlot(key).Result()
}

View File

@@ -386,7 +386,7 @@ func TestListPendingPagination(t *testing.T) {
msgs = []*base.TaskMessage(nil) // empty list
for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("custom %d", i), nil)
msg := h.NewTaskMessageWithQueue(fmt.Sprintf("custom %d", i), nil, "custom")
msgs = append(msgs, msg)
}
// create 100 tasks in custom queue
@@ -841,7 +841,7 @@ func TestListRetryPagination(t *testing.T) {
}
}
func TestListDead(t *testing.T) {
func TestListArchived(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
@@ -932,7 +932,7 @@ func TestListDead(t *testing.T) {
}
}
func TestListDeadPagination(t *testing.T) {
func TestListArchivedPagination(t *testing.T) {
r := setup(t)
defer r.Close()
var entries []base.Z
@@ -996,7 +996,7 @@ var (
zScoreCmpOpt = h.EquateInt64Approx(2) // allow for 2 seconds margin in Z.Score
)
func TestRunDeadTask(t *testing.T) {
func TestRunArchivedTask(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
@@ -1008,9 +1008,8 @@ func TestRunDeadTask(t *testing.T) {
tests := []struct {
archived map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunDeadTask
want error // expected return value from calling RunArchivedTask
wantArchived map[string][]*base.TaskMessage
wantPending map[string][]*base.TaskMessage
}{
@@ -1022,7 +1021,6 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantArchived: map[string][]*base.TaskMessage{
@@ -1040,8 +1038,7 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantArchived: map[string][]*base.TaskMessage{
"default": {t1, t2},
@@ -1061,7 +1058,6 @@ func TestRunDeadTask(t *testing.T) {
},
},
qname: "critical",
score: s1,
id: t3.ID,
want: nil,
wantArchived: map[string][]*base.TaskMessage{
@@ -1079,16 +1075,16 @@ func TestRunDeadTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.RunArchivedTask(tc.qname, tc.id, tc.score)
got := r.RunArchivedTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunDeadTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunDeadTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1113,7 +1109,6 @@ func TestRunRetryTask(t *testing.T) {
tests := []struct {
retry map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunRetryTask
wantRetry map[string][]*base.TaskMessage
@@ -1127,7 +1122,6 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
@@ -1145,8 +1139,7 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantRetry: map[string][]*base.TaskMessage{
"default": {t1, t2},
@@ -1166,7 +1159,6 @@ func TestRunRetryTask(t *testing.T) {
},
},
qname: "low",
score: s2,
id: t3.ID,
want: nil,
wantRetry: map[string][]*base.TaskMessage{
@@ -1184,16 +1176,16 @@ func TestRunRetryTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry) // initialize retry queue
got := r.RunRetryTask(tc.qname, tc.id, tc.score)
got := r.RunRetryTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1218,7 +1210,6 @@ func TestRunScheduledTask(t *testing.T) {
tests := []struct {
scheduled map[string][]base.Z
qname string
score int64
id uuid.UUID
want error // expected return value from calling RunScheduledTask
wantScheduled map[string][]*base.TaskMessage
@@ -1232,7 +1223,6 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "default",
score: s2,
id: t2.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
@@ -1250,8 +1240,7 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "default",
score: 123,
id: t2.ID,
id: uuid.New(),
want: ErrTaskNotFound,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1, t2},
@@ -1271,7 +1260,6 @@ func TestRunScheduledTask(t *testing.T) {
},
},
qname: "notifications",
score: s1,
id: t3.ID,
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
@@ -1289,16 +1277,16 @@ func TestRunScheduledTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.RunScheduledTask(tc.qname, tc.id, tc.score)
got := r.RunScheduledTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.RunRetryTask(%q, %s, %d) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.RunRetryTask(%q, %s) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
@@ -1405,7 +1393,7 @@ func TestRunAllScheduledTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
@@ -1511,7 +1499,7 @@ func TestRunAllRetryTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
@@ -1523,7 +1511,7 @@ func TestRunAllRetryTasks(t *testing.T) {
}
}
func TestRunAllDeadTasks(t *testing.T) {
func TestRunAllArchivedTasks(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
@@ -1617,7 +1605,7 @@ func TestRunAllDeadTasks(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s; mismatch found in %q; (-want, +got)\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
@@ -1629,7 +1617,7 @@ func TestRunAllDeadTasks(t *testing.T) {
}
}
func TestKillRetryTask(t *testing.T) {
func TestArchiveRetryTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@@ -1646,7 +1634,6 @@ func TestKillRetryTask(t *testing.T) {
archived map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantRetry map[string][]base.Z
wantArchived map[string][]base.Z
@@ -1663,7 +1650,6 @@ func TestKillRetryTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
@@ -1680,8 +1666,7 @@ func TestKillRetryTask(t *testing.T) {
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
id: uuid.New(),
want: ErrTaskNotFound,
wantRetry: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
@@ -1707,7 +1692,6 @@ func TestKillRetryTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantRetry: map[string][]base.Z{
"default": {
@@ -1730,10 +1714,10 @@ func TestKillRetryTask(t *testing.T) {
h.SeedAllRetryQueues(t, r.client, tc.retry)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchiveRetryTask(tc.qname, tc.id, tc.score)
got := r.ArchiveRetryTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).KillRetryTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("(*RDB).ArchiveRetryTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}
@@ -1755,7 +1739,7 @@ func TestKillRetryTask(t *testing.T) {
}
}
func TestKillScheduledTask(t *testing.T) {
func TestArchiveScheduledTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@@ -1772,7 +1756,6 @@ func TestKillScheduledTask(t *testing.T) {
archived map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantScheduled map[string][]base.Z
wantArchived map[string][]base.Z
@@ -1789,7 +1772,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
@@ -1807,7 +1789,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
want: ErrTaskNotFound,
wantScheduled: map[string][]base.Z{
"default": {{Message: m1, Score: t1.Unix()}},
@@ -1833,7 +1814,6 @@ func TestKillScheduledTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantScheduled: map[string][]base.Z{
"default": {
@@ -1856,10 +1836,10 @@ func TestKillScheduledTask(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchiveScheduledTask(tc.qname, tc.id, tc.score)
got := r.ArchiveScheduledTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).KillScheduledTask(%q, %v, %v) = %v, want %v",
tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("(*RDB).ArchiveScheduledTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}
@@ -1881,7 +1861,244 @@ func TestKillScheduledTask(t *testing.T) {
}
}
func TestKillAllRetryTasks(t *testing.T) {
func TestArchivePendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
oneHourAgo := time.Now().Add(-1 * time.Hour)
tests := []struct {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
id uuid.UUID
want error
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
archived: map[string][]base.Z{
"default": {},
},
qname: "default",
id: m1.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m2},
},
wantArchived: map[string][]base.Z{
"default": {{Message: m1, Score: time.Now().Unix()}},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
},
archived: map[string][]base.Z{
"default": {{Message: m2, Score: oneHourAgo.Unix()}},
},
qname: "default",
id: m2.ID,
want: ErrTaskNotFound,
wantPending: map[string][]*base.TaskMessage{
"default": {m1},
},
wantArchived: map[string][]base.Z{
"default": {{Message: m2, Score: oneHourAgo.Unix()}},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
id: m3.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m4},
},
wantArchived: map[string][]base.Z{
"default": {},
"custom": {{Message: m3, Score: time.Now().Unix()}},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.ArchivePendingTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("(*RDB).ArchivePendingTask(%q, %v) = %v, want %v",
tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
gotDead := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff)
}
}
}
}
func TestArchiveAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
m4 := h.NewTaskMessageWithQueue("task4", nil, "custom")
t1 := time.Now().Add(1 * time.Minute)
t2 := time.Now().Add(1 * time.Hour)
tests := []struct {
pending map[string][]*base.TaskMessage
archived map[string][]base.Z
qname string
want int64
wantPending map[string][]*base.TaskMessage
wantArchived map[string][]base.Z
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
archived: map[string][]base.Z{
"default": {},
},
qname: "default",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: time.Now().Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1},
},
archived: map[string][]base.Z{
"default": {{Message: m2, Score: t2.Unix()}},
},
qname: "default",
want: 1,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: time.Now().Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {},
},
archived: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
qname: "default",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"default": {},
},
wantArchived: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
{Message: m2, Score: t2.Unix()},
},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3, m4},
},
archived: map[string][]base.Z{
"default": {},
"custom": {},
},
qname: "custom",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
wantArchived: map[string][]base.Z{
"default": {},
"custom": {
{Message: m3, Score: time.Now().Unix()},
{Message: m4, Score: time.Now().Unix()},
},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got, err := r.ArchiveAllPendingTasks(tc.qname)
if got != tc.want || err != nil {
t.Errorf("(*RDB).KillAllRetryTasks(%q) = %v, %v; want %v, nil",
tc.qname, got, err, tc.want)
continue
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantArchived {
gotDead := h.GetArchivedEntries(t, r.client, qname)
if diff := cmp.Diff(want, gotDead, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want,+got)\n%s",
base.ArchivedKey(qname), diff)
}
}
}
}
func TestArchiveAllRetryTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@@ -2028,7 +2245,7 @@ func TestKillAllRetryTasks(t *testing.T) {
}
}
func TestKillAllScheduledTasks(t *testing.T) {
func TestArchiveAllScheduledTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@@ -2175,7 +2392,7 @@ func TestKillAllScheduledTasks(t *testing.T) {
}
}
func TestDeleteDeadTask(t *testing.T) {
func TestDeleteArchivedTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
@@ -2189,7 +2406,6 @@ func TestDeleteDeadTask(t *testing.T) {
archived map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantArchived map[string][]*base.TaskMessage
}{
@@ -2202,7 +2418,6 @@ func TestDeleteDeadTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantArchived: map[string][]*base.TaskMessage{
"default": {m2},
@@ -2220,7 +2435,6 @@ func TestDeleteDeadTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantArchived: map[string][]*base.TaskMessage{
"default": {m1, m2},
@@ -2235,8 +2449,7 @@ func TestDeleteDeadTask(t *testing.T) {
},
},
qname: "default",
id: m1.ID,
score: t2.Unix(), // id and score mismatch
id: uuid.New(),
want: ErrTaskNotFound,
wantArchived: map[string][]*base.TaskMessage{
"default": {m1, m2},
@@ -2248,7 +2461,6 @@ func TestDeleteDeadTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: ErrTaskNotFound,
wantArchived: map[string][]*base.TaskMessage{
"default": {},
@@ -2260,9 +2472,9 @@ func TestDeleteDeadTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllArchivedQueues(t, r.client, tc.archived)
got := r.DeleteArchivedTask(tc.qname, tc.id, tc.score)
got := r.DeleteArchivedTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.DeleteDeadTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.DeleteArchivedTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@@ -2289,7 +2501,6 @@ func TestDeleteRetryTask(t *testing.T) {
retry map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantRetry map[string][]*base.TaskMessage
}{
@@ -2302,7 +2513,6 @@ func TestDeleteRetryTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {m2},
@@ -2320,7 +2530,6 @@ func TestDeleteRetryTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantRetry: map[string][]*base.TaskMessage{
"default": {m1, m2},
@@ -2332,8 +2541,7 @@ func TestDeleteRetryTask(t *testing.T) {
"default": {{Message: m1, Score: t1.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
id: uuid.New(),
want: ErrTaskNotFound,
wantRetry: map[string][]*base.TaskMessage{
"default": {m1},
@@ -2345,9 +2553,9 @@ func TestDeleteRetryTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllRetryQueues(t, r.client, tc.retry)
got := r.DeleteRetryTask(tc.qname, tc.id, tc.score)
got := r.DeleteRetryTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.DeleteRetryTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.DeleteRetryTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@@ -2374,7 +2582,6 @@ func TestDeleteScheduledTask(t *testing.T) {
scheduled map[string][]base.Z
qname string
id uuid.UUID
score int64
want error
wantScheduled map[string][]*base.TaskMessage
}{
@@ -2387,7 +2594,6 @@ func TestDeleteScheduledTask(t *testing.T) {
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m2},
@@ -2405,7 +2611,6 @@ func TestDeleteScheduledTask(t *testing.T) {
},
qname: "custom",
id: m3.ID,
score: t3.Unix(),
want: nil,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m1, m2},
@@ -2417,8 +2622,7 @@ func TestDeleteScheduledTask(t *testing.T) {
"default": {{Message: m1, Score: t1.Unix()}},
},
qname: "default",
id: m2.ID,
score: t2.Unix(),
id: uuid.New(),
want: ErrTaskNotFound,
wantScheduled: map[string][]*base.TaskMessage{
"default": {m1},
@@ -2430,9 +2634,9 @@ func TestDeleteScheduledTask(t *testing.T) {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
got := r.DeleteScheduledTask(tc.qname, tc.id, tc.score)
got := r.DeleteScheduledTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) = %v, want %v", tc.qname, tc.id, tc.score, got, tc.want)
t.Errorf("r.DeleteScheduledTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
@@ -2445,67 +2649,76 @@ func TestDeleteScheduledTask(t *testing.T) {
}
}
func TestDeleteUniqueTask(t *testing.T) {
func TestDeletePendingTask(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := &base.TaskMessage{
ID: uuid.New(),
Type: "reindex",
Payload: nil,
Timeout: 1800,
Deadline: 0,
UniqueKey: "asynq:{default}:unique:reindex:nil",
Queue: "default",
}
t1 := time.Now().Add(5 * time.Minute)
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
scheduled map[string][]base.Z
qname string
id uuid.UUID
score int64
uniqueKey string
wantScheduled map[string][]*base.TaskMessage
pending map[string][]*base.TaskMessage
qname string
id uuid.UUID
want error
wantPending map[string][]*base.TaskMessage
}{
{
scheduled: map[string][]base.Z{
"default": {
{Message: m1, Score: t1.Unix()},
},
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
qname: "default",
id: m1.ID,
score: t1.Unix(),
uniqueKey: m1.UniqueKey,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
qname: "default",
id: m1.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m2},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
qname: "custom",
id: m3.ID,
want: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {},
},
},
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
qname: "default",
id: uuid.New(),
want: ErrTaskNotFound,
wantPending: map[string][]*base.TaskMessage{
"default": {m1, m2},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
if err := r.client.SetNX(tc.uniqueKey, tc.id.String(), time.Minute).Err(); err != nil {
t.Fatalf("Could not set unique lock in redis: %v", err)
}
h.FlushDB(t, r.client)
h.SeedAllPendingQueues(t, r.client, tc.pending)
if err := r.DeleteScheduledTask(tc.qname, tc.id, tc.score); err != nil {
t.Errorf("r.DeleteScheduledTask(%q, %v, %v) returned error: %v", tc.qname, tc.id, tc.score, err)
got := r.DeletePendingTask(tc.qname, tc.id)
if got != tc.want {
t.Errorf("r.DeletePendingTask(%q, %v) = %v, want %v", tc.qname, tc.id, got, tc.want)
continue
}
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.ScheduledKey(qname), diff)
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
if r.client.Exists(tc.uniqueKey).Val() != 0 {
t.Errorf("Uniqueness lock %q still exists", tc.uniqueKey)
}
}
}
func TestDeleteAllArchivedTasks(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -2775,6 +2988,63 @@ func TestDeleteAllScheduledTasks(t *testing.T) {
}
}
func TestDeleteAllPendingTasks(t *testing.T) {
r := setup(t)
defer r.Close()
m1 := h.NewTaskMessage("task1", nil)
m2 := h.NewTaskMessage("task2", nil)
m3 := h.NewTaskMessageWithQueue("task3", nil, "custom")
tests := []struct {
pending map[string][]*base.TaskMessage
qname string
want int64
wantPending map[string][]*base.TaskMessage
}{
{
pending: map[string][]*base.TaskMessage{
"default": {m1, m2},
"custom": {m3},
},
qname: "default",
want: 2,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"custom": {m3},
},
},
{
pending: map[string][]*base.TaskMessage{
"custom": {},
},
qname: "custom",
want: 0,
wantPending: map[string][]*base.TaskMessage{
"custom": {},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
got, err := r.DeleteAllPendingTasks(tc.qname)
if err != nil {
t.Errorf("r.DeleteAllPendingTasks(%q) returned error: %v", tc.qname, err)
}
if got != tc.want {
t.Errorf("r.DeleteAllPendingTasks(%q) = %d, nil, want %d, nil", tc.qname, got, tc.want)
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
}
}
func TestRemoveQueue(t *testing.T) {
r := setup(t)
defer r.Close()
@@ -2861,7 +3131,7 @@ func TestRemoveQueue(t *testing.T) {
}
keys := []string{
base.QueueKey(tc.qname),
base.PendingKey(tc.qname),
base.ActiveKey(tc.qname),
base.DeadlinesKey(tc.qname),
base.ScheduledKey(tc.qname),
@@ -2873,6 +3143,10 @@ func TestRemoveQueue(t *testing.T) {
t.Errorf("key %q still exists", key)
}
}
if n := len(r.client.Keys(base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 {
t.Errorf("%d keys still exists for tasks", n)
}
}
}
@@ -2990,7 +3264,7 @@ func TestRemoveQueueError(t *testing.T) {
for qname, want := range tc.pending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.QueueKey(qname), diff)
t.Errorf("%s;mismatch found in %q; (-want,+got):\n%s", tc.desc, base.PendingKey(qname), diff)
}
}
for qname, want := range tc.inProgress {

View File

@@ -6,10 +6,8 @@
package rdb
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"
"github.com/go-redis/redis/v7"
@@ -50,7 +48,19 @@ func (r *RDB) Ping() error {
return r.client.Ping().Err()
}
// Enqueue inserts the given task to the tail of the queue.
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:pending
// ARGV[1] -> task message data
// ARGV[2] -> task ID
// ARGV[3] -> task timeout in seconds (0 if not timeout)
// ARGV[4] -> task deadline in unix time (0 if no deadline)
var enqueueCmd = redis.NewScript(`
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[3], "deadline", ARGV[4])
redis.call("LPUSH", KEYS[2], ARGV[2])
return 1
`)
// Enqueue adds the given task to the pending list of the queue.
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
@@ -59,21 +69,34 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
key := base.QueueKey(msg.Queue)
return r.client.LPush(key, encoded).Err()
keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()),
base.PendingKey(msg.Queue),
}
argv := []interface{}{
encoded,
msg.ID.String(),
msg.Timeout,
msg.Deadline,
}
return enqueueCmd.Run(r.client, keys, argv...).Err()
}
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
// KEYS[3] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("LPUSH", KEYS[2], ARGV[3])
redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5])
redis.call("LPUSH", KEYS[3], ARGV[1])
return 1
`)
@@ -87,9 +110,19 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
res, err := enqueueUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.QueueKey(msg.Queue)},
msg.ID.String(), int(ttl.Seconds()), encoded).Result()
keys := []string{
msg.UniqueKey,
base.TaskKey(msg.Queue, msg.ID.String()),
base.PendingKey(msg.Queue),
}
argv := []interface{}{
msg.ID.String(),
int(ttl.Seconds()),
encoded,
msg.Timeout,
msg.Deadline,
}
res, err := enqueueUniqueCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
@@ -108,21 +141,22 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
// Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
data, d, err := r.dequeue(qnames...)
encoded, d, err := r.dequeue(qnames...)
if err != nil {
return nil, time.Time{}, err
}
if msg, err = base.DecodeMessage(data); err != nil {
if msg, err = base.DecodeMessage([]byte(encoded)); err != nil {
return nil, time.Time{}, err
}
return msg, time.Unix(d, 0), nil
}
// KEYS[1] -> asynq:{<qname>}
// KEYS[1] -> asynq:{<qname>}:pending
// KEYS[2] -> asynq:{<qname>}:paused
// KEYS[3] -> asynq:{<qname>}:active
// KEYS[4] -> asynq:{<qname>}:deadlines
// ARGV[1] -> current time in Unix time
// ARGV[1] -> current time in Unix time
// ARGV[2] -> task key prefix
//
// dequeueCmd checks whether a queue is paused first, before
// calling RPOPLPUSH to pop a task from the queue.
@@ -130,11 +164,13 @@ func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Ti
// and inserts the task with deadlines set.
var dequeueCmd = redis.NewScript(`
if redis.call("EXISTS", KEYS[2]) == 0 then
local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if msg then
local decoded = cjson.decode(msg)
local timeout = decoded["Timeout"]
local deadline = decoded["Deadline"]
local id = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
if id then
local key = ARGV[2] .. id
local data = redis.call("HMGET", key, "msg", "timeout", "deadline")
local msg = data[1]
local timeout = tonumber(data[2])
local deadline = tonumber(data[3])
local score
if timeout ~= 0 and deadline ~= 0 then
score = math.min(ARGV[1]+timeout, deadline)
@@ -145,21 +181,25 @@ if redis.call("EXISTS", KEYS[2]) == 0 then
else
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
end
redis.call("ZADD", KEYS[4], score, msg)
redis.call("ZADD", KEYS[4], score, id)
return {msg, score}
end
end
return nil`)
func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) {
func (r *RDB) dequeue(qnames ...string) (encoded string, deadline int64, err error) {
for _, qname := range qnames {
keys := []string{
base.QueueKey(qname),
base.PendingKey(qname),
base.PausedKey(qname),
base.ActiveKey(qname),
base.DeadlinesKey(qname),
}
res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result()
argv := []interface{}{
time.Now().Unix(),
base.TaskKeyPrefix(qname),
}
res, err := dequeueCmd.Run(r.client, keys, argv...).Result()
if err == redis.Nil {
continue
} else if err != nil {
@@ -172,21 +212,22 @@ func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err err
if len(data) != 2 {
return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data))
}
if msgjson, err = cast.ToStringE(data[0]); err != nil {
if encoded, err = cast.ToStringE(data[0]); err != nil {
return "", 0, err
}
if deadline, err = cast.ToInt64E(data[1]); err != nil {
return "", 0, err
}
return msgjson, deadline, nil
return encoded, deadline, nil
}
return "", 0, ErrNoProcessableTask
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
var doneCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
@@ -195,20 +236,23 @@ end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[3])
if redis.call("DEL", KEYS[3]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
redis.call("EXPIREAT", KEYS[4], ARGV[2])
end
return redis.status_reply("OK")
`)
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[4] -> unique key
// ARGV[1] -> base.TaskMessage value
// KEYS[3] -> asynq:{<qname>}:t:<task_id>
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> unique key
// ARGV[1] -> task ID
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
var doneUniqueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
@@ -216,12 +260,15 @@ end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[3], ARGV[2])
if redis.call("DEL", KEYS[3]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("GET", KEYS[4]) == ARGV[3] then
redis.call("DEL", KEYS[4])
local n = redis.call("INCR", KEYS[4])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[2])
end
if redis.call("GET", KEYS[5]) == ARGV[1] then
redis.call("DEL", KEYS[5])
end
return redis.status_reply("OK")
`)
@@ -229,30 +276,29 @@ return redis.status_reply("OK")
// Done removes the task from active queue to mark the task as done.
// It removes a uniqueness lock acquired by the task, if any.
func (r *RDB) Done(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
now := time.Now()
expireAt := now.Add(statsTTL)
keys := []string{
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.TaskKey(msg.Queue, msg.ID.String()),
base.ProcessedKey(msg.Queue, now),
}
args := []interface{}{encoded, expireAt.Unix()}
argv := []interface{}{
msg.ID.String(),
expireAt.Unix(),
}
if len(msg.UniqueKey) > 0 {
keys = append(keys, msg.UniqueKey)
args = append(args, msg.ID.String())
return doneUniqueCmd.Run(r.client, keys, args...).Err()
return doneUniqueCmd.Run(r.client, keys, argv...).Err()
}
return doneCmd.Run(r.client, keys, args...).Err()
return doneCmd.Run(r.client, keys, argv...).Err()
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}
// ARGV[1] -> base.TaskMessage value
// KEYS[3] -> asynq:{<qname>}:pending
// ARGV[1] -> task ID
// Note: Use RPUSH to push to the head of the queue.
var requeueCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
@@ -266,16 +312,25 @@ return redis.status_reply("OK")`)
// Requeue moves the task from active queue to the specified queue.
func (r *RDB) Requeue(msg *base.TaskMessage) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
return err
}
return requeueCmd.Run(r.client,
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)},
encoded).Err()
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue)},
msg.ID.String()).Err()
}
// Schedule adds the task to the backlog queue to be processed in the future.
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:scheduled
// ARGV[1] -> task message data
// ARGV[2] -> process_at time in Unix time
// ARGV[3] -> task ID
// ARGV[4] -> task timeout in seconds (0 if not timeout)
// ARGV[5] -> task deadline in unix time (0 if no deadline)
var scheduleCmd = redis.NewScript(`
redis.call("HSET", KEYS[1], "msg", ARGV[1], "timeout", ARGV[4], "deadline", ARGV[5])
redis.call("ZADD", KEYS[2], ARGV[2], ARGV[3])
return 1
`)
// Schedule adds the task to the scheduled set to be processed in the future.
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
encoded, err := base.EncodeMessage(msg)
if err != nil {
@@ -284,22 +339,36 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
score := float64(processAt.Unix())
return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err()
keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()),
base.ScheduledKey(msg.Queue),
}
argv := []interface{}{
encoded,
processAt.Unix(),
msg.ID.String(),
msg.Timeout,
msg.Deadline,
}
return scheduleCmd.Run(r.client, keys, argv...).Err()
}
// KEYS[1] -> unique key
// KEYS[2] -> asynq:{<qname>}:scheduled
// KEYS[2] -> asynq:{<qname>}:t:<task_id>
// KEYS[3] -> asynq:{<qname>}:scheduled
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> task timeout in seconds (0 if not timeout)
// ARGV[6] -> task deadline in unix time (0 if no deadline)
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
return 0
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
redis.call("HSET", KEYS[2], "msg", ARGV[4], "timeout", ARGV[5], "deadline", ARGV[6])
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[1])
return 1
`)
@@ -313,10 +382,20 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
return err
}
score := float64(processAt.Unix())
res, err := scheduleUniqueCmd.Run(r.client,
[]string{msg.UniqueKey, base.ScheduledKey(msg.Queue)},
msg.ID.String(), int(ttl.Seconds()), score, encoded).Result()
keys := []string{
msg.UniqueKey,
base.TaskKey(msg.Queue, msg.ID.String()),
base.ScheduledKey(msg.Queue),
}
argv := []interface{}{
msg.ID.String(),
int(ttl.Seconds()),
processAt.Unix(),
encoded,
msg.Timeout,
msg.Deadline,
}
res, err := scheduleUniqueCmd.Run(r.client, keys, argv...).Result()
if err != nil {
return err
}
@@ -330,54 +409,62 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
return nil
}
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:retry
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
// ARGV[2] -> base.TaskMessage value to add to Retry queue
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:deadlines
// KEYS[4] -> asynq:{<qname>}:retry
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp
var retryCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
local n = redis.call("INCR", KEYS[4])
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("HSET", KEYS[1], "msg", ARGV[2])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[4])
end
local m = redis.call("INCR", KEYS[5])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[4])
end
local m = redis.call("INCR", KEYS[6])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[6], ARGV[4])
end
return redis.status_reply("OK")`)
// Retry moves the task from active to retry queue, incrementing retry count
// and assigning error message to the task message.
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
msgToRemove, err := base.EncodeMessage(msg)
if err != nil {
return err
}
modified := *msg
modified.Retried++
modified.ErrorMsg = errMsg
msgToAdd, err := base.EncodeMessage(&modified)
encoded, err := base.EncodeMessage(&modified)
if err != nil {
return err
}
now := time.Now()
processedKey := base.ProcessedKey(msg.Queue, now)
failedKey := base.FailedKey(msg.Queue, now)
expireAt := now.Add(statsTTL)
return retryCmd.Run(r.client,
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err()
keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()),
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.RetryKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now),
}
argv := []interface{}{
msg.ID.String(),
encoded,
processAt.Unix(),
expireAt.Unix(),
}
return retryCmd.Run(r.client, keys, argv...).Err()
}
const (
@@ -385,68 +472,78 @@ const (
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
)
// KEYS[1] -> asynq:{<qname>}:active
// KEYS[2] -> asynq:{<qname>}:deadlines
// KEYS[3] -> asynq:{<qname>}:archived
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove
// ARGV[2] -> base.TaskMessage value to add
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
// KEYS[2] -> asynq:{<qname>}:active
// KEYS[3] -> asynq:{<qname>}:deadlines
// KEYS[4] -> asynq:{<qname>}:archived
// KEYS[5] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
// KEYS[6] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
// ARGV[1] -> task ID
// ARGV[2] -> updated base.TaskMessage value
// ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in archive (e.g., 100)
// ARGV[6] -> stats expiration timestamp
var archiveCmd = redis.NewScript(`
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
if redis.call("LREM", KEYS[2], 0, ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
if redis.call("ZREM", KEYS[3], ARGV[1]) == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[3], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[3], 0, -ARGV[5])
local n = redis.call("INCR", KEYS[4])
redis.call("ZADD", KEYS[4], ARGV[3], ARGV[1])
redis.call("ZREMRANGEBYSCORE", KEYS[4], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[4], 0, -ARGV[5])
redis.call("HSET", KEYS[1], "msg", ARGV[2])
local n = redis.call("INCR", KEYS[5])
if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[4], ARGV[6])
end
local m = redis.call("INCR", KEYS[5])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[5], ARGV[6])
end
local m = redis.call("INCR", KEYS[6])
if tonumber(m) == 1 then
redis.call("EXPIREAT", KEYS[6], ARGV[6])
end
return redis.status_reply("OK")`)
// Archive sends the given task to archive, attaching the error message to the task.
// It also trims the archive by timestamp and set size.
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
msgToRemove, err := base.EncodeMessage(msg)
if err != nil {
return err
}
modified := *msg
modified.ErrorMsg = errMsg
msgToAdd, err := base.EncodeMessage(&modified)
encoded, err := base.EncodeMessage(&modified)
if err != nil {
return err
}
now := time.Now()
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
processedKey := base.ProcessedKey(msg.Queue, now)
failedKey := base.FailedKey(msg.Queue, now)
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
expireAt := now.Add(statsTTL)
return archiveCmd.Run(r.client,
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey},
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err()
keys := []string{
base.TaskKey(msg.Queue, msg.ID.String()),
base.ActiveKey(msg.Queue),
base.DeadlinesKey(msg.Queue),
base.ArchivedKey(msg.Queue),
base.ProcessedKey(msg.Queue, now),
base.FailedKey(msg.Queue, now),
}
argv := []interface{}{
msg.ID.String(),
encoded,
now.Unix(),
cutoff.Unix(),
maxArchiveSize,
expireAt.Unix(),
}
return archiveCmd.Run(r.client, keys, argv...).Err()
}
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues
//and enqueues any tasks that are ready to be processed.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
// ForwardIfReady checks scheduled and retry sets of the given queues
// and move any tasks that are ready to be processed to the pending set.
func (r *RDB) ForwardIfReady(qnames ...string) error {
for _, qname := range qnames {
if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil {
if err := r.forwardAll(base.ScheduledKey(qname), base.PendingKey(qname)); err != nil {
return err
}
if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil {
if err := r.forwardAll(base.RetryKey(qname), base.PendingKey(qname)); err != nil {
return err
}
}
@@ -458,12 +555,12 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error {
// ARGV[1] -> current unix time
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, id in ipairs(ids) do
redis.call("LPUSH", KEYS[2], id)
redis.call("ZREM", KEYS[1], id)
end
return table.getn(msgs)`)
return table.getn(ids)`)
// forward moves tasks with a score less than the current unix time
// from the src zset to the dst list. It returns the number of tasks moved.
@@ -489,20 +586,35 @@ func (r *RDB) forwardAll(src, dst string) (err error) {
return nil
}
// KEYS[1] -> asynq:{<qname>}:deadlines
// ARGV[1] -> deadline in unix time
// ARGV[2] -> task key prefix
var listDeadlineExceededCmd = redis.NewScript(`
local res = {}
local ids = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, id in ipairs(ids) do
local key = ARGV[2] .. id
table.insert(res, redis.call("HGET", key, "msg"))
end
return res
`)
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
var msgs []*base.TaskMessage
opt := &redis.ZRangeBy{
Min: "-inf",
Max: strconv.FormatInt(deadline.Unix(), 10),
}
for _, qname := range qnames {
res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result()
res, err := listDeadlineExceededCmd.Run(r.client,
[]string{base.DeadlinesKey(qname)},
deadline.Unix(), base.TaskKeyPrefix(qname)).Result()
if err != nil {
return nil, err
}
for _, s := range res {
msg, err := base.DecodeMessage(s)
data, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
for _, s := range data {
msg, err := base.DecodeMessage([]byte(s))
if err != nil {
return nil, err
}
@@ -530,14 +642,14 @@ return redis.status_reply("OK")`)
// WriteServerState writes server state data to redis with expiration set to the value ttl.
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
bytes, err := json.Marshal(info)
bytes, err := base.EncodeServerInfo(info)
if err != nil {
return err
}
exp := time.Now().Add(ttl).UTC()
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
for _, w := range workers {
bytes, err := json.Marshal(w)
bytes, err := base.EncodeWorkerInfo(w)
if err != nil {
continue // skip bad data
}
@@ -589,7 +701,7 @@ return redis.status_reply("OK")`)
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
args := []interface{}{ttl.Seconds()}
for _, e := range entries {
bytes, err := json.Marshal(e)
bytes, err := base.EncodeSchedulerEntry(e)
if err != nil {
continue // skip bad data
}
@@ -644,7 +756,7 @@ const maxEvents = 1000
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
key := base.SchedulerHistoryKey(entryID)
data, err := json.Marshal(event)
data, err := base.EncodeSchedulerEnqueueEvent(event)
if err != nil {
return err
}

View File

@@ -83,7 +83,7 @@ func TestEnqueue(t *testing.T) {
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
if len(gotPending) != 1 {
t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotPending))
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending))
continue
}
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
@@ -101,7 +101,7 @@ func TestEnqueueUnique(t *testing.T) {
m1 := base.TaskMessage{
ID: uuid.New(),
Type: "email",
Payload: map[string]interface{}{"user_id": 123},
Payload: map[string]interface{}{"user_id": json.Number("123")},
Queue: base.DefaultQueueName,
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}),
}
@@ -116,13 +116,26 @@ func TestEnqueueUnique(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case.
// Enqueue the first message, should succeed.
err := r.EnqueueUnique(tc.msg, tc.ttl)
if err != nil {
t.Errorf("First message: (*RDB).EnqueueUnique(%v, %v) = %v, want nil",
tc.msg, tc.ttl, err)
continue
}
gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue)
if len(gotPending) != 1 {
t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending))
continue
}
if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" {
t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff)
}
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
// Enqueue the second message, should fail.
got := r.EnqueueUnique(tc.msg, tc.ttl)
if got != ErrDuplicateTask {
t.Errorf("Second message: (*RDB).EnqueueUnique(%v, %v) = %v, want %v",
@@ -134,9 +147,6 @@ func TestEnqueueUnique(t *testing.T) {
t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl)
continue
}
if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() {
t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues)
}
}
}
@@ -148,6 +158,7 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "send_email",
Payload: map[string]interface{}{"subject": "hello!"},
Queue: "default",
Timeout: 1800,
Deadline: 0,
}
@@ -156,6 +167,7 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "export_csv",
Payload: nil,
Queue: "critical",
Timeout: 0,
Deadline: 1593021600,
}
@@ -164,10 +176,10 @@ func TestDequeue(t *testing.T) {
ID: uuid.New(),
Type: "reindex",
Payload: nil,
Queue: "low",
Timeout: int64((5 * time.Minute).Seconds()),
Deadline: time.Now().Add(10 * time.Minute).Unix(),
}
t3Deadline := now.Unix() + t3.Timeout // use whichever is earliest
tests := []struct {
pending map[string][]*base.TaskMessage
@@ -243,26 +255,26 @@ func TestDequeue(t *testing.T) {
},
{
pending: map[string][]*base.TaskMessage{
"default": {t3},
"default": {t1},
"critical": {},
"low": {t2, t1},
"low": {t3},
},
args: []string{"critical", "default", "low"},
wantMsg: t3,
wantDeadline: time.Unix(t3Deadline, 0),
wantMsg: t1,
wantDeadline: time.Unix(t1Deadline, 0),
err: nil,
wantPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
"low": {t2, t1},
"low": {t3},
},
wantActive: map[string][]*base.TaskMessage{
"default": {t3},
"default": {t1},
"critical": {},
"low": {},
},
wantDeadlines: map[string][]base.Z{
"default": {{Message: t3, Score: t3Deadline}},
"default": {{Message: t1, Score: t1Deadline}},
"critical": {},
"low": {},
},
@@ -319,7 +331,7 @@ func TestDequeue(t *testing.T) {
for queue, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff)
}
}
for queue, want := range tc.wantActive {
@@ -438,7 +450,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) {
for queue, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, queue)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff)
t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.PendingKey(queue), diff)
}
}
for queue, want := range tc.wantActive {
@@ -485,7 +497,7 @@ func TestDone(t *testing.T) {
tests := []struct {
desc string
inProgress map[string][]*base.TaskMessage // initial state of the active list
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of deadlines set
target *base.TaskMessage // task to remove
wantActive map[string][]*base.TaskMessage // final state of the active list
@@ -493,7 +505,7 @@ func TestDone(t *testing.T) {
}{
{
desc: "removes message from the correct queue",
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1},
"custom": {t2},
},
@@ -513,7 +525,7 @@ func TestDone(t *testing.T) {
},
{
desc: "with one queue",
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1},
},
deadlines: map[string][]base.Z{
@@ -529,7 +541,7 @@ func TestDone(t *testing.T) {
},
{
desc: "with multiple messages in a queue",
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t3},
"custom": {t2},
},
@@ -552,8 +564,8 @@ func TestDone(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
for _, msgs := range tc.inProgress {
h.SeedAllActiveQueues(t, r.client, tc.active)
for _, msgs := range tc.active {
for _, msg := range msgs {
// Set uniqueness lock if unique key is present.
if len(msg.UniqueKey) > 0 {
@@ -634,7 +646,7 @@ func TestRequeue(t *testing.T) {
tests := []struct {
pending map[string][]*base.TaskMessage // initial state of queues
inProgress map[string][]*base.TaskMessage // initial state of the active list
active map[string][]*base.TaskMessage // initial state of the active list
deadlines map[string][]base.Z // initial state of the deadlines set
target *base.TaskMessage // task to requeue
wantPending map[string][]*base.TaskMessage // final state of queues
@@ -645,7 +657,7 @@ func TestRequeue(t *testing.T) {
pending: map[string][]*base.TaskMessage{
"default": {},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
@@ -671,7 +683,7 @@ func TestRequeue(t *testing.T) {
pending: map[string][]*base.TaskMessage{
"default": {t1},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t2},
},
deadlines: map[string][]base.Z{
@@ -695,7 +707,7 @@ func TestRequeue(t *testing.T) {
"default": {t1},
"critical": {},
},
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t2},
"critical": {t3},
},
@@ -722,7 +734,7 @@ func TestRequeue(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllPendingQueues(t, r.client, tc.pending)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
err := r.Requeue(tc.target)
@@ -734,7 +746,7 @@ func TestRequeue(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantActive {
@@ -755,12 +767,12 @@ func TestRequeue(t *testing.T) {
func TestSchedule(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
tests := []struct {
msg *base.TaskMessage
processAt time.Time
}{
{t1, time.Now().Add(15 * time.Minute)},
{msg, time.Now().Add(15 * time.Minute)},
}
for _, tc := range tests {
@@ -886,7 +898,7 @@ func TestRetry(t *testing.T) {
errMsg := "SMTP server is not responding"
tests := []struct {
inProgress map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
deadlines map[string][]base.Z
retry map[string][]base.Z
msg *base.TaskMessage
@@ -897,7 +909,7 @@ func TestRetry(t *testing.T) {
wantRetry map[string][]base.Z
}{
{
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
@@ -923,7 +935,7 @@ func TestRetry(t *testing.T) {
},
},
{
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
"custom": {t4},
},
@@ -957,7 +969,7 @@ func TestRetry(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client)
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllRetryQueues(t, r.client, tc.retry)
@@ -1056,7 +1068,7 @@ func TestArchive(t *testing.T) {
// TODO(hibiken): add test cases for trimming
tests := []struct {
inProgress map[string][]*base.TaskMessage
active map[string][]*base.TaskMessage
deadlines map[string][]base.Z
archived map[string][]base.Z
target *base.TaskMessage // task to archive
@@ -1065,7 +1077,7 @@ func TestArchive(t *testing.T) {
wantArchived map[string][]base.Z
}{
{
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2},
},
deadlines: map[string][]base.Z{
@@ -1094,7 +1106,7 @@ func TestArchive(t *testing.T) {
},
},
{
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1, t2, t3},
},
deadlines: map[string][]base.Z{
@@ -1124,7 +1136,7 @@ func TestArchive(t *testing.T) {
},
},
{
inProgress: map[string][]*base.TaskMessage{
active: map[string][]*base.TaskMessage{
"default": {t1},
"custom": {t4},
},
@@ -1160,7 +1172,7 @@ func TestArchive(t *testing.T) {
for _, tc := range tests {
h.FlushDB(t, r.client) // clean up db before each test case
h.SeedAllActiveQueues(t, r.client, tc.inProgress)
h.SeedAllActiveQueues(t, r.client, tc.active)
h.SeedAllDeadlines(t, r.client, tc.deadlines)
h.SeedAllArchivedQueues(t, r.client, tc.archived)
@@ -1211,7 +1223,7 @@ func TestArchive(t *testing.T) {
}
}
func TestCheckAndEnqueue(t *testing.T) {
func TestForwardIfReady(t *testing.T) {
r := setup(t)
defer r.Close()
t1 := h.NewTaskMessage("send_email", nil)
@@ -1328,7 +1340,7 @@ func TestCheckAndEnqueue(t *testing.T) {
h.SeedAllScheduledQueues(t, r.client, tc.scheduled)
h.SeedAllRetryQueues(t, r.client, tc.retry)
err := r.CheckAndEnqueue(tc.qnames...)
err := r.ForwardIfReady(tc.qnames...)
if err != nil {
t.Errorf("(*RDB).CheckScheduled(%v) = %v, want nil", tc.qnames, err)
continue
@@ -1337,7 +1349,7 @@ func TestCheckAndEnqueue(t *testing.T) {
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r.client, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.QueueKey(qname), diff)
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.PendingKey(qname), diff)
}
}
for qname, want := range tc.wantScheduled {
@@ -1462,7 +1474,7 @@ func TestWriteServerState(t *testing.T) {
Concurrency: 10,
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false,
Started: time.Now(),
Started: time.Now().UTC(),
Status: "running",
ActiveWorkerCount: 0,
}
@@ -1475,12 +1487,11 @@ func TestWriteServerState(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
var got base.ServerInfo
err = json.Unmarshal([]byte(data), &got)
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode json: %v", err)
t.Fatalf("could not decode server info: %v", err)
}
if diff := cmp.Diff(info, got); diff != "" {
if diff := cmp.Diff(info, *got); diff != "" {
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
got, info, diff)
}
@@ -1553,7 +1564,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
Concurrency: 10,
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
StrictPriority: false,
Started: time.Now().Add(-10 * time.Minute),
Started: time.Now().Add(-10 * time.Minute).UTC(),
Status: "running",
ActiveWorkerCount: len(workers),
}
@@ -1566,12 +1577,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
// Check ServerInfo was written correctly.
skey := base.ServerInfoKey(host, pid, serverID)
data := r.client.Get(skey).Val()
var got base.ServerInfo
err = json.Unmarshal([]byte(data), &got)
got, err := base.DecodeServerInfo([]byte(data))
if err != nil {
t.Fatalf("could not decode json: %v", err)
t.Fatalf("could not decode server info: %v", err)
}
if diff := cmp.Diff(serverInfo, got); diff != "" {
if diff := cmp.Diff(serverInfo, *got); diff != "" {
t.Errorf("persisted ServerInfo was %v, want %v; (-want,+got)\n%s",
got, serverInfo, diff)
}
@@ -1595,11 +1605,11 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
}
var gotWorkers []*base.WorkerInfo
for _, val := range wdata {
var w base.WorkerInfo
if err := json.Unmarshal([]byte(val), &w); err != nil {
w, err := base.DecodeWorkerInfo([]byte(val))
if err != nil {
t.Fatalf("could not unmarshal worker's data: %v", err)
}
gotWorkers = append(gotWorkers, &w)
gotWorkers = append(gotWorkers, w)
}
if diff := cmp.Diff(workers, gotWorkers, h.SortWorkerInfoOpt); diff != "" {
t.Errorf("persisted workers info was %v, want %v; (-want,+got)\n%s",