2020-01-03 10:13:16 +08:00
|
|
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
|
|
|
// Use of this source code is governed by a MIT license
|
|
|
|
// that can be found in the LICENSE file.
|
|
|
|
|
2019-12-04 22:25:58 +08:00
|
|
|
// Package rdb encapsulates the interactions with redis.
|
2019-12-04 13:01:26 +08:00
|
|
|
package rdb
|
2019-11-20 11:44:41 +08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/go-redis/redis/v7"
|
2019-12-22 23:15:45 +08:00
|
|
|
"github.com/hibiken/asynq/internal/base"
|
2020-01-07 22:28:34 +08:00
|
|
|
"github.com/spf13/cast"
|
2019-11-20 11:44:41 +08:00
|
|
|
)
|
|
|
|
|
2019-12-08 22:46:04 +08:00
|
|
|
// Sentinel errors returned by RDB operations.
var (
	// ErrNoProcessableTask is returned when no task is ready to be processed.
	ErrNoProcessableTask = errors.New("no tasks are ready for processing")

	// ErrTaskNotFound is returned when no task matches the given identifier.
	ErrTaskNotFound = errors.New("could not find a task")

	// ErrDuplicateTask is returned when another task with the same unique key
	// already holds the uniqueness lock.
	ErrDuplicateTask = errors.New("task already exists")
)
|
2019-11-20 11:44:41 +08:00
|
|
|
|
2019-12-23 21:33:48 +08:00
|
|
|
// statsTTL is the lifetime given to the daily processed/failure counter keys
// when they are first created (90 days).
const statsTTL = 90 * 24 * time.Hour
|
|
|
|
|
2019-12-04 22:25:58 +08:00
|
|
|
// RDB is a client interface to query and mutate task queues.
|
2019-12-04 13:01:26 +08:00
|
|
|
type RDB struct {
|
2019-11-20 11:44:41 +08:00
|
|
|
client *redis.Client
|
|
|
|
}
|
|
|
|
|
2019-12-04 13:01:26 +08:00
|
|
|
// NewRDB returns a new instance of RDB.
|
|
|
|
func NewRDB(client *redis.Client) *RDB {
|
|
|
|
return &RDB{client}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the connection with redis server.
|
|
|
|
func (r *RDB) Close() error {
|
|
|
|
return r.client.Close()
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// enqueueCmd pushes a task message onto a queue list and records the queue's
// key in the set of all queues. It always returns 1.
//
// KEYS[1] -> asynq:queues:<qname>
// KEYS[2] -> asynq:queues
// ARGV[1] -> task message data
var enqueueCmd = redis.NewScript(`
redis.call("LPUSH", KEYS[1], ARGV[1])
redis.call("SADD", KEYS[2], KEYS[1])
return 1`)
|
|
|
|
|
2019-12-28 12:37:15 +08:00
|
|
|
// Enqueue inserts the given task to the tail of the queue.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
2019-11-20 11:44:41 +08:00
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return err
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
2020-01-07 13:27:51 +08:00
|
|
|
key := base.QueueKey(msg.Queue)
|
2020-02-09 03:06:14 +08:00
|
|
|
return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, bytes).Err()
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
|
|
|
|
2020-03-18 21:49:39 +08:00
|
|
|
// enqueueUniqueCmd acquires the uniqueness lock (SET NX with an expiry) and,
// only if the lock was acquired, pushes the task message onto the queue and
// records the queue key in the set of all queues.
// It returns 0 when the lock is already held and 1 otherwise.
//
// KEYS[1] -> unique key in the form <type>:<payload>:<qname>
// KEYS[2] -> asynq:queues:<qname>
// KEYS[3] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> task message data
var enqueueUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
	return 0
end
redis.call("LPUSH", KEYS[2], ARGV[3])
redis.call("SADD", KEYS[3], KEYS[2])
return 1
`)
|
|
|
|
|
|
|
|
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
|
|
|
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
|
|
|
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
key := base.QueueKey(msg.Queue)
|
|
|
|
res, err := enqueueUniqueCmd.Run(r.client,
|
|
|
|
[]string{msg.UniqueKey, key, base.AllQueues},
|
|
|
|
msg.ID.String(), int(ttl.Seconds()), bytes).Result()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrDuplicateTask
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// Dequeue queries given queues in order and pops a task message if there is one and returns it.
|
2020-06-04 20:37:17 +08:00
|
|
|
// Dequeue skips a queue if the queue is paused.
|
2020-02-09 03:06:14 +08:00
|
|
|
// If all queues are empty, ErrNoProcessableTask error is returned.
|
2020-01-07 22:28:34 +08:00
|
|
|
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
|
2020-06-04 20:37:17 +08:00
|
|
|
var keys []string
|
|
|
|
for _, q := range qnames {
|
|
|
|
keys = append(keys, base.QueueKey(q))
|
2020-01-07 22:28:34 +08:00
|
|
|
}
|
2020-06-04 20:37:17 +08:00
|
|
|
data, err := r.dequeue(keys...)
|
2019-11-28 11:36:56 +08:00
|
|
|
if err == redis.Nil {
|
2020-01-07 22:28:34 +08:00
|
|
|
return nil, ErrNoProcessableTask
|
2019-11-28 11:36:56 +08:00
|
|
|
}
|
2019-11-20 11:44:41 +08:00
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return nil, err
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
2019-12-22 23:15:45 +08:00
|
|
|
var msg base.TaskMessage
|
2019-11-20 11:44:41 +08:00
|
|
|
err = json.Unmarshal([]byte(data), &msg)
|
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return nil, err
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
|
|
|
return &msg, nil
|
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// dequeueCmd walks the given queue keys in order. For each queue that is not
// a member of the paused set it calls RPOPLPUSH to move one task from the
// queue to the in-progress list, and returns the first task obtained.
// It returns nil when no queue yields a task.
//
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:paused
// ARGV    -> List of queues to query in order
var dequeueCmd = redis.NewScript(`
for _, qkey in ipairs(ARGV) do
	if redis.call("SISMEMBER", KEYS[2], qkey) == 0 then
		local res = redis.call("RPOPLPUSH", qkey, KEYS[1])
		if res then
			return res
		end
	end
end
return nil`)
|
2020-02-09 03:06:14 +08:00
|
|
|
|
2020-01-07 22:28:34 +08:00
|
|
|
func (r *RDB) dequeue(queues ...string) (data string, err error) {
|
|
|
|
var args []interface{}
|
|
|
|
for _, qkey := range queues {
|
|
|
|
args = append(args, qkey)
|
|
|
|
}
|
2020-06-04 20:37:17 +08:00
|
|
|
res, err := dequeueCmd.Run(r.client,
|
|
|
|
[]string{base.InProgressQueue, base.PausedQueues}, args...).Result()
|
2020-01-07 22:28:34 +08:00
|
|
|
if err != nil {
|
|
|
|
return "", err
|
|
|
|
}
|
|
|
|
return cast.ToStringE(res)
|
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// doneCmd removes the task message from the in-progress list, increments the
// daily processed counter (setting its expiration when the counter is first
// created), and deletes the uniqueness lock if a unique key was given and the
// lock still holds this task's ID.
//
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:processed:<yyyy-mm-dd>
// KEYS[3] -> unique key in the format <type>:<payload>:<qname>
// ARGV[1] -> base.TaskMessage value
// ARGV[2] -> stats expiration timestamp
// ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[2], ARGV[2])
end
if string.len(KEYS[3]) > 0 and redis.call("GET", KEYS[3]) == ARGV[3] then
	redis.call("DEL", KEYS[3])
end
return redis.status_reply("OK")
`)
|
|
|
|
|
2019-12-04 22:33:05 +08:00
|
|
|
// Done removes the task from in-progress queue to mark the task as done.
|
2020-03-18 21:49:39 +08:00
|
|
|
// It removes a uniqueness lock acquired by the task, if any.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Done(msg *base.TaskMessage) error {
|
2019-11-22 13:45:27 +08:00
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return err
|
2019-11-22 13:45:27 +08:00
|
|
|
}
|
2019-12-23 21:33:48 +08:00
|
|
|
now := time.Now()
|
|
|
|
processedKey := base.ProcessedKey(now)
|
|
|
|
expireAt := now.Add(statsTTL)
|
2020-02-09 03:06:14 +08:00
|
|
|
return doneCmd.Run(r.client,
|
2020-03-18 21:49:39 +08:00
|
|
|
[]string{base.InProgressQueue, processedKey, msg.UniqueKey},
|
|
|
|
bytes, expireAt.Unix(), msg.ID.String()).Err()
|
2019-11-22 13:45:27 +08:00
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// requeueCmd removes the task message from the in-progress list and pushes
// it back onto its queue.
//
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:queues:<qname>
// ARGV[1] -> base.TaskMessage value
// Note: Use RPUSH to push to the head of the queue, i.e. the end that
// dequeueCmd pops from with RPOPLPUSH.
var requeueCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("RPUSH", KEYS[2], ARGV[1])
return redis.status_reply("OK")`)
|
|
|
|
|
|
|
|
// Requeue moves the task from in-progress queue to the specified queue.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Requeue(msg *base.TaskMessage) error {
|
2019-12-18 12:07:17 +08:00
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return err
|
2019-12-18 12:07:17 +08:00
|
|
|
}
|
2020-02-09 03:06:14 +08:00
|
|
|
return requeueCmd.Run(r.client,
|
2020-01-27 05:41:06 +08:00
|
|
|
[]string{base.InProgressQueue, base.QueueKey(msg.Queue)},
|
2019-12-28 12:37:15 +08:00
|
|
|
string(bytes)).Err()
|
2019-12-18 12:07:17 +08:00
|
|
|
}
|
|
|
|
|
2020-03-18 21:49:39 +08:00
|
|
|
// scheduleCmd adds the task message to the scheduled sorted set, scored by
// its process-at timestamp, and records the task's queue key in the set of
// all queues. It always returns 1.
//
// KEYS[1] -> asynq:scheduled
// KEYS[2] -> asynq:queues
// ARGV[1] -> score (process_at timestamp)
// ARGV[2] -> task message
// ARGV[3] -> queue key
var scheduleCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("SADD", KEYS[2], ARGV[3])
return 1
`)
|
|
|
|
|
2019-12-04 22:45:30 +08:00
|
|
|
// Schedule adds the task to the backlog queue to be processed in the future.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
2019-12-16 09:18:43 +08:00
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
2019-12-28 12:37:15 +08:00
|
|
|
return err
|
2019-12-16 09:18:43 +08:00
|
|
|
}
|
2020-03-18 21:49:39 +08:00
|
|
|
qkey := base.QueueKey(msg.Queue)
|
|
|
|
score := float64(processAt.Unix())
|
|
|
|
return scheduleCmd.Run(r.client,
|
|
|
|
[]string{base.ScheduledQueue, base.AllQueues},
|
|
|
|
score, bytes, qkey).Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
// scheduleUniqueCmd acquires the uniqueness lock (SET NX with an expiry) and,
// only if the lock was acquired, adds the task message to the scheduled
// sorted set and records the task's queue key in the set of all queues.
// It returns 0 when the lock is already held and 1 otherwise.
//
// KEYS[1] -> unique key in the format <type>:<payload>:<qname>
// KEYS[2] -> asynq:scheduled
// KEYS[3] -> asynq:queues
// ARGV[1] -> task ID
// ARGV[2] -> uniqueness lock TTL
// ARGV[3] -> score (process_at timestamp)
// ARGV[4] -> task message
// ARGV[5] -> queue key
var scheduleUniqueCmd = redis.NewScript(`
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
if not ok then
	return 0
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
redis.call("SADD", KEYS[3], ARGV[5])
return 1
`)
|
|
|
|
|
2020-03-22 02:44:26 +08:00
|
|
|
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
|
2020-03-18 21:49:39 +08:00
|
|
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
|
|
|
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
|
|
|
|
bytes, err := json.Marshal(msg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
qkey := base.QueueKey(msg.Queue)
|
2019-12-16 09:18:43 +08:00
|
|
|
score := float64(processAt.Unix())
|
2020-03-18 21:49:39 +08:00
|
|
|
res, err := scheduleUniqueCmd.Run(r.client,
|
|
|
|
[]string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues},
|
|
|
|
msg.ID.String(), int(ttl.Seconds()), score, bytes, qkey).Result()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrDuplicateTask
|
|
|
|
}
|
|
|
|
return nil
|
2019-12-04 22:45:30 +08:00
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// retryCmd removes the original task message from the in-progress list, adds
// the updated message to the retry sorted set scored by its retry-at
// timestamp, and increments both the daily processed and daily failure
// counters (setting each counter's expiration when it is first created).
//
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:retry
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
// KEYS[4] -> asynq:failure:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
// ARGV[2] -> base.TaskMessage value to add to Retry queue
// ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp
var retryCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[3], ARGV[4])
end
local m = redis.call("INCR", KEYS[4])
if tonumber(m) == 1 then
	redis.call("EXPIREAT", KEYS[4], ARGV[4])
end
return redis.status_reply("OK")`)
|
|
|
|
|
2019-12-16 08:15:07 +08:00
|
|
|
// Retry moves the task from in-progress to retry queue, incrementing retry count
|
|
|
|
// and assigning error message to the task message.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
2019-12-16 12:05:56 +08:00
|
|
|
bytesToRemove, err := json.Marshal(msg)
|
2019-12-16 08:15:07 +08:00
|
|
|
if err != nil {
|
2019-12-29 02:51:37 +08:00
|
|
|
return err
|
2019-12-16 08:15:07 +08:00
|
|
|
}
|
2019-12-16 12:05:56 +08:00
|
|
|
modified := *msg
|
|
|
|
modified.Retried++
|
|
|
|
modified.ErrorMsg = errMsg
|
|
|
|
bytesToAdd, err := json.Marshal(&modified)
|
|
|
|
if err != nil {
|
2019-12-29 02:51:37 +08:00
|
|
|
return err
|
2019-12-16 12:05:56 +08:00
|
|
|
}
|
2019-12-23 21:33:48 +08:00
|
|
|
now := time.Now()
|
|
|
|
processedKey := base.ProcessedKey(now)
|
|
|
|
failureKey := base.FailureKey(now)
|
|
|
|
expireAt := now.Add(statsTTL)
|
2020-02-09 03:06:14 +08:00
|
|
|
return retryCmd.Run(r.client,
|
2019-12-23 21:33:48 +08:00
|
|
|
[]string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey},
|
2019-12-29 02:51:37 +08:00
|
|
|
string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Err()
|
2019-12-16 08:15:07 +08:00
|
|
|
}
|
|
|
|
|
2019-12-26 23:17:26 +08:00
|
|
|
// Limits passed to killCmd when trimming the dead queue.
const (
	// maxDeadTasks is the size limit handed to the rank-based trim.
	maxDeadTasks = 10000

	// deadExpirationInDays is the age, in days, beyond which dead tasks are
	// removed.
	deadExpirationInDays = 90
)
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// killCmd removes the original task message from the in-progress list, adds
// the updated message to the dead sorted set scored by its died-at
// timestamp, trims the dead set by age and by size, and increments both the
// daily processed and daily failure counters (setting each counter's
// expiration when it is first created).
//
// KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:dead
// KEYS[3] -> asynq:processed:<yyyy-mm-dd>
// KEYS[4] -> asynq:failure:<yyyy-mm-dd>
// ARGV[1] -> base.TaskMessage value to remove from base.InProgressQueue queue
// ARGV[2] -> base.TaskMessage value to add to Dead queue
// ARGV[3] -> died_at UNIX timestamp
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
// ARGV[5] -> max number of tasks in dead queue (e.g., 10000)
// ARGV[6] -> stats expiration timestamp
//
// NOTE(review): ZREMRANGEBYRANK treats both ends as inclusive, so the range
// 0..-ARGV[5] appears to leave ARGV[5]-1 entries rather than ARGV[5] —
// confirm this is intended.
var killCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1])
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then
	redis.call("EXPIREAT", KEYS[3], ARGV[6])
end
local m = redis.call("INCR", KEYS[4])
if tonumber(m) == 1 then
	redis.call("EXPIREAT", KEYS[4], ARGV[6])
end
return redis.status_reply("OK")`)
|
|
|
|
|
2019-12-16 09:16:13 +08:00
|
|
|
// Kill sends the task to "dead" queue from in-progress queue, assigning
|
|
|
|
// the error message to the task.
|
2019-12-04 22:50:52 +08:00
|
|
|
// It also trims the set by timestamp and set size.
|
2019-12-22 23:15:45 +08:00
|
|
|
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
2019-12-16 12:05:56 +08:00
|
|
|
bytesToRemove, err := json.Marshal(msg)
|
2019-11-20 11:44:41 +08:00
|
|
|
if err != nil {
|
2019-12-29 02:51:37 +08:00
|
|
|
return err
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
2019-12-16 12:05:56 +08:00
|
|
|
modified := *msg
|
|
|
|
modified.ErrorMsg = errMsg
|
|
|
|
bytesToAdd, err := json.Marshal(&modified)
|
|
|
|
if err != nil {
|
2019-12-29 02:51:37 +08:00
|
|
|
return err
|
2019-12-16 12:05:56 +08:00
|
|
|
}
|
2019-11-20 11:44:41 +08:00
|
|
|
now := time.Now()
|
|
|
|
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
2019-12-23 21:33:48 +08:00
|
|
|
processedKey := base.ProcessedKey(now)
|
|
|
|
failureKey := base.FailureKey(now)
|
|
|
|
expireAt := now.Add(statsTTL)
|
2020-02-09 03:06:14 +08:00
|
|
|
return killCmd.Run(r.client,
|
2019-12-23 21:33:48 +08:00
|
|
|
[]string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey},
|
2019-12-29 02:51:37 +08:00
|
|
|
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
|
2019-11-20 11:44:41 +08:00
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// requeueAllCmd reads every message in the in-progress list, pushes each one
// back onto the queue named by the message's "Queue" field (queue prefix +
// name), removes it from the in-progress list, and returns the number of
// messages read.
//
// KEYS[1] -> asynq:in_progress
// ARGV[1] -> queue prefix
var requeueAllCmd = redis.NewScript(`
local msgs = redis.call("LRANGE", KEYS[1], 0, -1)
for _, msg in ipairs(msgs) do
	local decoded = cjson.decode(msg)
	local qkey = ARGV[1] .. decoded["Queue"]
	redis.call("RPUSH", qkey, msg)
	redis.call("LREM", KEYS[1], 0, msg)
end
return table.getn(msgs)`)
|
|
|
|
|
|
|
|
// RequeueAll moves all tasks from in-progress list to the queue
|
2019-12-19 10:55:08 +08:00
|
|
|
// and reports the number of tasks restored.
|
2020-02-09 03:06:14 +08:00
|
|
|
func (r *RDB) RequeueAll() (int64, error) {
|
|
|
|
res, err := requeueAllCmd.Run(r.client, []string{base.InProgressQueue}, base.QueuePrefix).Result()
|
2019-12-19 10:55:08 +08:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return 0, fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
return n, nil
|
2019-11-24 07:09:50 +08:00
|
|
|
}
|
2019-11-25 23:09:39 +08:00
|
|
|
|
2019-12-04 23:28:57 +08:00
|
|
|
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that
|
|
|
|
// have to be processed.
|
2020-01-14 23:26:41 +08:00
|
|
|
//
|
|
|
|
// qnames specifies to which queues to send tasks.
|
|
|
|
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
|
2019-12-22 23:15:45 +08:00
|
|
|
delayed := []string{base.ScheduledQueue, base.RetryQueue}
|
2019-12-04 23:14:37 +08:00
|
|
|
for _, zset := range delayed {
|
2020-01-14 23:26:41 +08:00
|
|
|
var err error
|
|
|
|
if len(qnames) == 1 {
|
|
|
|
err = r.forwardSingle(zset, base.QueueKey(qnames[0]))
|
|
|
|
} else {
|
|
|
|
err = r.forward(zset)
|
|
|
|
}
|
|
|
|
if err != nil {
|
2019-12-04 23:14:37 +08:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// forwardCmd takes every message in the source sorted set whose score is at
// most the given time, pushes it onto the queue named by the message's
// "Queue" field (queue prefix + name), and removes it from the source set.
// It returns the messages that were moved.
//
// KEYS[1] -> source queue (e.g. scheduled or retry queue)
// ARGV[1] -> current unix time
// ARGV[2] -> queue prefix
var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
	local decoded = cjson.decode(msg)
	local qkey = ARGV[2] .. decoded["Queue"]
	redis.call("LPUSH", qkey, msg)
	redis.call("ZREM", KEYS[1], msg)
end
return msgs`)
|
|
|
|
|
2019-12-08 22:46:04 +08:00
|
|
|
// forward moves all tasks with a score less than the current unix time
|
2020-01-14 23:26:41 +08:00
|
|
|
// from the src zset.
|
|
|
|
func (r *RDB) forward(src string) error {
|
2019-11-25 23:09:39 +08:00
|
|
|
now := float64(time.Now().Unix())
|
2020-02-09 03:06:14 +08:00
|
|
|
return forwardCmd.Run(r.client,
|
2020-01-14 23:26:41 +08:00
|
|
|
[]string{src}, now, base.QueuePrefix).Err()
|
|
|
|
}
|
|
|
|
|
2020-02-09 03:06:14 +08:00
|
|
|
// forwardSingleCmd takes every message in the source sorted set whose score
// is at most the given time, pushes it onto the single destination queue,
// and removes it from the source set. It returns the messages that were
// moved.
//
// KEYS[1] -> source queue (e.g. scheduled or retry queue)
// KEYS[2] -> destination queue
// ARGV[1] -> current unix time
var forwardSingleCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
	redis.call("LPUSH", KEYS[2], msg)
	redis.call("ZREM", KEYS[1], msg)
end
return msgs`)
|
|
|
|
|
2020-01-14 23:26:41 +08:00
|
|
|
// forwardSingle moves all tasks with a score less than the current unix time
|
|
|
|
// from the src zset to dst list.
|
|
|
|
func (r *RDB) forwardSingle(src, dst string) error {
|
|
|
|
now := float64(time.Now().Unix())
|
2020-02-09 03:06:14 +08:00
|
|
|
return forwardSingleCmd.Run(r.client,
|
2020-01-14 23:26:41 +08:00
|
|
|
[]string{src, dst}, now).Err()
|
2019-12-07 14:29:40 +08:00
|
|
|
}
|
2020-01-31 22:48:58 +08:00
|
|
|
|
2020-04-13 07:42:11 +08:00
|
|
|
// writeServerStateCmd stores the server info under its own key with a TTL
// and indexes that key in the servers sorted set. It then replaces the
// server's workers hash with the given (worker id, worker data) pairs, gives
// the hash the same TTL, and indexes it in the workers sorted set.
//
// KEYS[1]  -> asynq:servers:<host:pid:sid>
// KEYS[2]  -> asynq:servers
// KEYS[3]  -> asynq:workers<host:pid:sid>
// KEYS[4]  -> asynq:workers
// ARGV[1]  -> expiration time
// ARGV[2]  -> TTL in seconds
// ARGV[3]  -> server info
// ARGV[4:] -> alternate key-value pair of (worker id, worker data)
// Note: Add key to ZSET with expiration time as score.
// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
var writeServerStateCmd = redis.NewScript(`
redis.call("SETEX", KEYS[1], ARGV[2], ARGV[3])
redis.call("ZADD", KEYS[2], ARGV[1], KEYS[1])
redis.call("DEL", KEYS[3])
for i = 4, table.getn(ARGV)-1, 2 do
	redis.call("HSET", KEYS[3], ARGV[i], ARGV[i+1])
end
redis.call("EXPIRE", KEYS[3], ARGV[2])
redis.call("ZADD", KEYS[4], ARGV[1], KEYS[3])
return redis.status_reply("OK")`)
|
|
|
|
|
2020-05-19 11:47:35 +08:00
|
|
|
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
|
|
|
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
|
2020-02-21 23:18:22 +08:00
|
|
|
bytes, err := json.Marshal(info)
|
2020-01-31 22:48:58 +08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-02-02 14:22:48 +08:00
|
|
|
exp := time.Now().Add(ttl).UTC()
|
2020-05-19 11:47:35 +08:00
|
|
|
args := []interface{}{float64(exp.Unix()), ttl.Seconds(), bytes} // args to the lua script
|
2020-02-21 23:18:22 +08:00
|
|
|
for _, w := range workers {
|
|
|
|
bytes, err := json.Marshal(w)
|
|
|
|
if err != nil {
|
|
|
|
continue // skip bad data
|
|
|
|
}
|
2020-05-19 11:47:35 +08:00
|
|
|
args = append(args, w.ID, bytes)
|
2020-01-31 22:48:58 +08:00
|
|
|
}
|
2020-04-13 07:42:11 +08:00
|
|
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
|
|
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
2020-05-19 11:47:35 +08:00
|
|
|
return writeServerStateCmd.Run(r.client,
|
2020-04-13 07:42:11 +08:00
|
|
|
[]string{skey, base.AllServers, wkey, base.AllWorkers},
|
2020-02-21 23:18:22 +08:00
|
|
|
args...).Err()
|
2020-02-02 14:22:48 +08:00
|
|
|
}
|
|
|
|
|
2020-04-13 07:42:11 +08:00
|
|
|
// clearProcessInfoCmd removes the server's key from the servers sorted set
// and deletes it, then does the same for the server's workers key and the
// workers sorted set.
//
// KEYS[1] -> asynq:servers
// KEYS[2] -> asynq:servers:<host:pid:sid>
// KEYS[3] -> asynq:workers
// KEYS[4] -> asynq:workers<host:pid:sid>
var clearProcessInfoCmd = redis.NewScript(`
redis.call("ZREM", KEYS[1], KEYS[2])
redis.call("DEL", KEYS[2])
redis.call("ZREM", KEYS[3], KEYS[4])
redis.call("DEL", KEYS[4])
return redis.status_reply("OK")`)
|
|
|
|
|
2020-04-13 02:41:50 +08:00
|
|
|
// ClearServerState deletes server state data from redis.
|
2020-05-19 11:47:35 +08:00
|
|
|
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
|
|
|
|
skey := base.ServerInfoKey(host, pid, serverID)
|
|
|
|
wkey := base.WorkersKey(host, pid, serverID)
|
2020-02-21 23:18:22 +08:00
|
|
|
return clearProcessInfoCmd.Run(r.client,
|
2020-04-13 07:42:11 +08:00
|
|
|
[]string{base.AllServers, skey, base.AllWorkers, wkey}).Err()
|
2020-01-31 22:48:58 +08:00
|
|
|
}
|
2020-02-13 09:12:09 +08:00
|
|
|
|
|
|
|
// CancelationPubSub returns a pubsub for cancelation messages.
|
|
|
|
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|
|
|
pubsub := r.client.Subscribe(base.CancelChannel)
|
|
|
|
_, err := pubsub.Receive()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return pubsub, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// PublishCancelation publish cancelation message to all subscribers.
|
2020-02-13 09:33:41 +08:00
|
|
|
// The message is the ID for the task to be canceled.
|
2020-02-13 09:12:09 +08:00
|
|
|
func (r *RDB) PublishCancelation(id string) error {
|
|
|
|
return r.client.Publish(base.CancelChannel, id).Err()
|
|
|
|
}
|