2020-01-02 18:13:16 -08:00
|
|
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
|
|
|
// Use of this source code is governed by a MIT license
|
|
|
|
// that can be found in the LICENSE file.
|
|
|
|
|
2019-12-04 06:25:58 -08:00
|
|
|
// Package rdb encapsulates the interactions with redis.
|
2019-12-03 21:01:26 -08:00
|
|
|
package rdb
|
2019-11-19 19:44:41 -08:00
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/json"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
2020-06-20 06:29:58 -07:00
|
|
|
"strconv"
|
2019-11-19 19:44:41 -08:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/go-redis/redis/v7"
|
2019-12-22 07:15:45 -08:00
|
|
|
"github.com/hibiken/asynq/internal/base"
|
2020-01-07 06:28:34 -08:00
|
|
|
"github.com/spf13/cast"
|
2019-11-19 19:44:41 -08:00
|
|
|
)
|
|
|
|
|
2019-12-08 06:46:04 -08:00
|
|
|
var (
|
2020-01-07 06:28:34 -08:00
|
|
|
// ErrNoProcessableTask indicates that there are no tasks ready to be processed.
|
|
|
|
ErrNoProcessableTask = errors.New("no tasks are ready for processing")
|
2019-12-08 06:46:04 -08:00
|
|
|
|
|
|
|
// ErrTaskNotFound indicates that a task that matches the given identifier was not found.
|
|
|
|
ErrTaskNotFound = errors.New("could not find a task")
|
2020-03-18 06:49:39 -07:00
|
|
|
|
|
|
|
// ErrDuplicateTask indicates that another task with the same unique key holds the uniqueness lock.
|
|
|
|
ErrDuplicateTask = errors.New("task already exists")
|
2019-12-08 06:46:04 -08:00
|
|
|
)
|
2019-11-19 19:44:41 -08:00
|
|
|
|
2019-12-23 05:33:48 -08:00
|
|
|
const statsTTL = 90 * 24 * time.Hour // 90 days
|
|
|
|
|
2019-12-04 06:25:58 -08:00
|
|
|
// RDB is a client interface to query and mutate task queues.
|
2019-12-03 21:01:26 -08:00
|
|
|
type RDB struct {
|
2020-08-27 07:19:21 -07:00
|
|
|
client redis.UniversalClient
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
|
|
|
|
2019-12-03 21:01:26 -08:00
|
|
|
// NewRDB returns a new instance of RDB.
|
2020-08-27 07:19:21 -07:00
|
|
|
func NewRDB(client redis.UniversalClient) *RDB {
|
2019-12-03 21:01:26 -08:00
|
|
|
return &RDB{client}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close closes the connection with redis server.
|
|
|
|
func (r *RDB) Close() error {
|
|
|
|
return r.client.Close()
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
|
|
|
|
2020-07-25 18:49:27 -07:00
|
|
|
// Ping checks the connection with redis server.
|
|
|
|
func (r *RDB) Ping() error {
|
|
|
|
return r.client.Ping().Err()
|
|
|
|
}
|
|
|
|
|
2019-12-27 20:37:15 -08:00
|
|
|
// Enqueue inserts the given task to the tail of the queue.
|
2019-12-22 07:15:45 -08:00
|
|
|
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2019-11-19 19:44:41 -08:00
|
|
|
if err != nil {
|
2019-12-27 20:37:15 -08:00
|
|
|
return err
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
2020-08-10 06:13:06 -07:00
|
|
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
2020-08-07 06:31:02 -07:00
|
|
|
return err
|
|
|
|
}
|
2020-01-06 21:27:51 -08:00
|
|
|
key := base.QueueKey(msg.Queue)
|
2020-08-07 06:31:02 -07:00
|
|
|
return r.client.LPush(key, encoded).Err()
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
|
|
|
|
2020-08-07 06:31:02 -07:00
|
|
|
// KEYS[1] -> unique key
|
|
|
|
// KEYS[2] -> asynq:{<qname>}
|
2020-03-18 06:49:39 -07:00
|
|
|
// ARGV[1] -> task ID
|
|
|
|
// ARGV[2] -> uniqueness lock TTL
|
|
|
|
// ARGV[3] -> task message data
|
|
|
|
var enqueueUniqueCmd = redis.NewScript(`
|
|
|
|
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
|
|
|
if not ok then
|
|
|
|
return 0
|
|
|
|
end
|
|
|
|
redis.call("LPUSH", KEYS[2], ARGV[3])
|
|
|
|
return 1
|
|
|
|
`)
|
|
|
|
|
|
|
|
// EnqueueUnique inserts the given task if the task's uniqueness lock can be acquired.
|
|
|
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
|
|
|
func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2020-03-18 06:49:39 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-08-10 06:13:06 -07:00
|
|
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
2020-08-07 06:31:02 -07:00
|
|
|
return err
|
|
|
|
}
|
2020-03-18 06:49:39 -07:00
|
|
|
res, err := enqueueUniqueCmd.Run(r.client,
|
2020-08-07 06:31:02 -07:00
|
|
|
[]string{msg.UniqueKey, base.QueueKey(msg.Queue)},
|
2020-06-11 20:58:27 -07:00
|
|
|
msg.ID.String(), int(ttl.Seconds()), encoded).Result()
|
2020-03-18 06:49:39 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrDuplicateTask
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-06-18 05:45:10 -07:00
|
|
|
// Dequeue queries given queues in order and pops a task message
|
2020-06-18 11:25:33 -07:00
|
|
|
// off a queue if one exists and returns the message and deadline.
|
2020-06-04 05:37:17 -07:00
|
|
|
// Dequeue skips a queue if the queue is paused.
|
2020-02-08 11:06:14 -08:00
|
|
|
// If all queues are empty, ErrNoProcessableTask error is returned.
|
2020-06-18 11:25:33 -07:00
|
|
|
func (r *RDB) Dequeue(qnames ...string) (msg *base.TaskMessage, deadline time.Time, err error) {
|
2020-08-27 07:19:21 -07:00
|
|
|
data, d, err := r.dequeue(qnames...)
|
2019-11-19 19:44:41 -08:00
|
|
|
if err != nil {
|
2020-06-18 11:25:33 -07:00
|
|
|
return nil, time.Time{}, err
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
2020-06-18 05:45:10 -07:00
|
|
|
if msg, err = base.DecodeMessage(data); err != nil {
|
2020-06-18 11:25:33 -07:00
|
|
|
return nil, time.Time{}, err
|
2020-06-18 05:45:10 -07:00
|
|
|
}
|
2020-06-18 11:25:33 -07:00
|
|
|
return msg, time.Unix(d, 0), nil
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
|
|
|
|
2020-08-27 07:19:21 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}
|
|
|
|
// KEYS[2] -> asynq:{<qname>}:paused
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[3] -> asynq:{<qname>}:active
|
2020-08-27 07:19:21 -07:00
|
|
|
// KEYS[4] -> asynq:{<qname>}:deadlines
|
2020-06-17 06:46:54 -07:00
|
|
|
// ARGV[1] -> current time in Unix time
|
2020-06-04 05:37:17 -07:00
|
|
|
//
|
|
|
|
// dequeueCmd checks whether a queue is paused first, before
|
|
|
|
// calling RPOPLPUSH to pop a task from the queue.
|
2020-06-17 06:46:54 -07:00
|
|
|
// It computes the task deadline by inspecting Timout and Deadline fields,
|
|
|
|
// and inserts the task with deadlines set.
|
2020-02-08 11:06:14 -08:00
|
|
|
var dequeueCmd = redis.NewScript(`
|
2020-08-27 07:19:21 -07:00
|
|
|
if redis.call("EXISTS", KEYS[2]) == 0 then
|
|
|
|
local msg = redis.call("RPOPLPUSH", KEYS[1], KEYS[3])
|
|
|
|
if msg then
|
|
|
|
local decoded = cjson.decode(msg)
|
|
|
|
local timeout = decoded["Timeout"]
|
|
|
|
local deadline = decoded["Deadline"]
|
|
|
|
local score
|
|
|
|
if timeout ~= 0 and deadline ~= 0 then
|
|
|
|
score = math.min(ARGV[1]+timeout, deadline)
|
|
|
|
elseif timeout ~= 0 then
|
|
|
|
score = ARGV[1] + timeout
|
|
|
|
elseif deadline ~= 0 then
|
|
|
|
score = deadline
|
|
|
|
else
|
|
|
|
return redis.error_reply("asynq internal error: both timeout and deadline are not set")
|
2020-06-04 05:37:17 -07:00
|
|
|
end
|
2020-08-27 07:19:21 -07:00
|
|
|
redis.call("ZADD", KEYS[4], score, msg)
|
|
|
|
return {msg, score}
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
|
|
|
end
|
2020-06-04 05:37:17 -07:00
|
|
|
return nil`)
|
2020-02-08 11:06:14 -08:00
|
|
|
|
2020-08-27 07:19:21 -07:00
|
|
|
func (r *RDB) dequeue(qnames ...string) (msgjson string, deadline int64, err error) {
|
|
|
|
for _, qname := range qnames {
|
|
|
|
keys := []string{
|
|
|
|
base.QueueKey(qname),
|
|
|
|
base.PausedKey(qname),
|
2020-09-05 12:43:15 -07:00
|
|
|
base.ActiveKey(qname),
|
2020-08-27 07:19:21 -07:00
|
|
|
base.DeadlinesKey(qname),
|
|
|
|
}
|
|
|
|
res, err := dequeueCmd.Run(r.client, keys, time.Now().Unix()).Result()
|
|
|
|
if err == redis.Nil {
|
|
|
|
continue
|
|
|
|
} else if err != nil {
|
|
|
|
return "", 0, err
|
|
|
|
}
|
|
|
|
data, err := cast.ToSliceE(res)
|
|
|
|
if err != nil {
|
|
|
|
return "", 0, err
|
|
|
|
}
|
|
|
|
if len(data) != 2 {
|
|
|
|
return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data))
|
|
|
|
}
|
|
|
|
if msgjson, err = cast.ToStringE(data[0]); err != nil {
|
|
|
|
return "", 0, err
|
|
|
|
}
|
|
|
|
if deadline, err = cast.ToInt64E(data[1]); err != nil {
|
|
|
|
return "", 0, err
|
|
|
|
}
|
|
|
|
return msgjson, deadline, nil
|
|
|
|
}
|
|
|
|
return "", 0, ErrNoProcessableTask
|
2020-01-07 06:28:34 -08:00
|
|
|
}
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}:active
|
2020-08-28 05:37:40 -07:00
|
|
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
|
|
|
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
|
|
|
// ARGV[1] -> base.TaskMessage value
|
|
|
|
// ARGV[2] -> stats expiration timestamp
|
|
|
|
var doneCmd = redis.NewScript(`
|
|
|
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
local n = redis.call("INCR", KEYS[3])
|
|
|
|
if tonumber(n) == 1 then
|
|
|
|
redis.call("EXPIREAT", KEYS[3], ARGV[2])
|
|
|
|
end
|
|
|
|
return redis.status_reply("OK")
|
|
|
|
`)
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}:active
|
2020-08-08 06:48:49 -07:00
|
|
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
|
|
|
// KEYS[3] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
2020-08-07 06:31:02 -07:00
|
|
|
// KEYS[4] -> unique key
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[1] -> base.TaskMessage value
|
|
|
|
// ARGV[2] -> stats expiration timestamp
|
2020-03-18 06:49:39 -07:00
|
|
|
// ARGV[3] -> task ID
|
2020-08-28 05:37:40 -07:00
|
|
|
var doneUniqueCmd = redis.NewScript(`
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
2020-06-07 13:04:27 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
2020-06-18 07:10:57 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
local n = redis.call("INCR", KEYS[3])
|
2020-02-08 11:06:14 -08:00
|
|
|
if tonumber(n) == 1 then
|
2020-06-18 07:10:57 -07:00
|
|
|
redis.call("EXPIREAT", KEYS[3], ARGV[2])
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
2020-08-28 05:37:40 -07:00
|
|
|
if redis.call("GET", KEYS[4]) == ARGV[3] then
|
2020-06-18 07:10:57 -07:00
|
|
|
redis.call("DEL", KEYS[4])
|
2020-03-18 06:49:39 -07:00
|
|
|
end
|
2020-02-08 11:06:14 -08:00
|
|
|
return redis.status_reply("OK")
|
|
|
|
`)
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// Done removes the task from active queue to mark the task as done.
|
2020-03-18 06:49:39 -07:00
|
|
|
// It removes a uniqueness lock acquired by the task, if any.
|
2019-12-22 07:15:45 -08:00
|
|
|
func (r *RDB) Done(msg *base.TaskMessage) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2019-11-21 21:45:27 -08:00
|
|
|
if err != nil {
|
2019-12-27 20:37:15 -08:00
|
|
|
return err
|
2019-11-21 21:45:27 -08:00
|
|
|
}
|
2019-12-23 05:33:48 -08:00
|
|
|
now := time.Now()
|
|
|
|
expireAt := now.Add(statsTTL)
|
2020-08-28 05:37:40 -07:00
|
|
|
keys := []string{
|
2020-09-05 12:43:15 -07:00
|
|
|
base.ActiveKey(msg.Queue),
|
2020-08-28 05:37:40 -07:00
|
|
|
base.DeadlinesKey(msg.Queue),
|
|
|
|
base.ProcessedKey(msg.Queue, now),
|
|
|
|
}
|
|
|
|
args := []interface{}{encoded, expireAt.Unix()}
|
|
|
|
if len(msg.UniqueKey) > 0 {
|
|
|
|
keys = append(keys, msg.UniqueKey)
|
|
|
|
args = append(args, msg.ID.String())
|
|
|
|
return doneUniqueCmd.Run(r.client, keys, args...).Err()
|
|
|
|
}
|
|
|
|
return doneCmd.Run(r.client, keys, args...).Err()
|
2019-11-21 21:45:27 -08:00
|
|
|
}
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}:active
|
2020-08-09 05:40:44 -07:00
|
|
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
|
|
|
// KEYS[3] -> asynq:{<qname>}
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[1] -> base.TaskMessage value
|
|
|
|
// Note: Use RPUSH to push to the head of the queue.
|
|
|
|
var requeueCmd = redis.NewScript(`
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
redis.call("RPUSH", KEYS[3], ARGV[1])
|
2020-02-08 11:06:14 -08:00
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// Requeue moves the task from active queue to the specified queue.
|
2019-12-22 07:15:45 -08:00
|
|
|
func (r *RDB) Requeue(msg *base.TaskMessage) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2019-12-17 20:07:17 -08:00
|
|
|
if err != nil {
|
2019-12-27 20:37:15 -08:00
|
|
|
return err
|
2019-12-17 20:07:17 -08:00
|
|
|
}
|
2020-02-08 11:06:14 -08:00
|
|
|
return requeueCmd.Run(r.client,
|
2020-09-05 12:43:15 -07:00
|
|
|
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.QueueKey(msg.Queue)},
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded).Err()
|
2019-12-17 20:07:17 -08:00
|
|
|
}
|
|
|
|
|
2019-12-04 06:45:30 -08:00
|
|
|
// Schedule adds the task to the backlog queue to be processed in the future.
|
2019-12-22 07:15:45 -08:00
|
|
|
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2019-12-15 17:18:43 -08:00
|
|
|
if err != nil {
|
2019-12-27 20:37:15 -08:00
|
|
|
return err
|
2019-12-15 17:18:43 -08:00
|
|
|
}
|
2020-08-10 06:13:06 -07:00
|
|
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
2020-08-07 06:31:02 -07:00
|
|
|
return err
|
|
|
|
}
|
2020-03-18 06:49:39 -07:00
|
|
|
score := float64(processAt.Unix())
|
2020-08-07 06:31:02 -07:00
|
|
|
return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err()
|
2020-03-18 06:49:39 -07:00
|
|
|
}
|
|
|
|
|
2020-08-07 06:31:02 -07:00
|
|
|
// KEYS[1] -> unique key
|
|
|
|
// KEYS[2] -> asynq:{<qname>}:scheduled
|
2020-03-18 06:49:39 -07:00
|
|
|
// ARGV[1] -> task ID
|
|
|
|
// ARGV[2] -> uniqueness lock TTL
|
|
|
|
// ARGV[3] -> score (process_at timestamp)
|
|
|
|
// ARGV[4] -> task message
|
|
|
|
var scheduleUniqueCmd = redis.NewScript(`
|
|
|
|
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
|
|
|
if not ok then
|
|
|
|
return 0
|
|
|
|
end
|
|
|
|
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4])
|
|
|
|
return 1
|
|
|
|
`)
|
|
|
|
|
2020-03-21 11:44:26 -07:00
|
|
|
// ScheduleUnique adds the task to the backlog queue to be processed in the future if the uniqueness lock can be acquired.
|
2020-03-18 06:49:39 -07:00
|
|
|
// It returns ErrDuplicateTask if the lock cannot be acquired.
|
|
|
|
func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl time.Duration) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
encoded, err := base.EncodeMessage(msg)
|
2020-03-18 06:49:39 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-08-10 06:13:06 -07:00
|
|
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
2020-08-07 06:31:02 -07:00
|
|
|
return err
|
|
|
|
}
|
2019-12-15 17:18:43 -08:00
|
|
|
score := float64(processAt.Unix())
|
2020-03-18 06:49:39 -07:00
|
|
|
res, err := scheduleUniqueCmd.Run(r.client,
|
2020-08-07 06:31:02 -07:00
|
|
|
[]string{msg.UniqueKey, base.ScheduledKey(msg.Queue)},
|
|
|
|
msg.ID.String(), int(ttl.Seconds()), score, encoded).Result()
|
2020-03-18 06:49:39 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
n, ok := res.(int64)
|
|
|
|
if !ok {
|
|
|
|
return fmt.Errorf("could not cast %v to int64", res)
|
|
|
|
}
|
|
|
|
if n == 0 {
|
|
|
|
return ErrDuplicateTask
|
|
|
|
}
|
|
|
|
return nil
|
2019-12-04 06:45:30 -08:00
|
|
|
}
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}:active
|
2020-08-08 12:17:33 -07:00
|
|
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
|
|
|
// KEYS[3] -> asynq:{<qname>}:retry
|
|
|
|
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
2020-08-10 21:49:12 -07:00
|
|
|
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
2020-09-05 12:43:15 -07:00
|
|
|
// ARGV[1] -> base.TaskMessage value to remove from base.ActiveQueue queue
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[2] -> base.TaskMessage value to add to Retry queue
|
|
|
|
// ARGV[3] -> retry_at UNIX timestamp
|
|
|
|
// ARGV[4] -> stats expiration timestamp
|
|
|
|
var retryCmd = redis.NewScript(`
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
2020-06-07 13:04:27 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
2020-06-18 10:25:01 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
|
|
|
|
local n = redis.call("INCR", KEYS[4])
|
2020-02-08 11:06:14 -08:00
|
|
|
if tonumber(n) == 1 then
|
2020-06-18 10:25:01 -07:00
|
|
|
redis.call("EXPIREAT", KEYS[4], ARGV[4])
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
2020-06-18 10:25:01 -07:00
|
|
|
local m = redis.call("INCR", KEYS[5])
|
2020-02-08 11:06:14 -08:00
|
|
|
if tonumber(m) == 1 then
|
2020-06-18 10:25:01 -07:00
|
|
|
redis.call("EXPIREAT", KEYS[5], ARGV[4])
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// Retry moves the task from active to retry queue, incrementing retry count
|
2019-12-15 16:15:07 -08:00
|
|
|
// and assigning error message to the task message.
|
2019-12-22 07:15:45 -08:00
|
|
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
msgToRemove, err := base.EncodeMessage(msg)
|
2019-12-15 16:15:07 -08:00
|
|
|
if err != nil {
|
2019-12-28 10:51:37 -08:00
|
|
|
return err
|
2019-12-15 16:15:07 -08:00
|
|
|
}
|
2019-12-15 20:05:56 -08:00
|
|
|
modified := *msg
|
|
|
|
modified.Retried++
|
|
|
|
modified.ErrorMsg = errMsg
|
2020-06-11 20:58:27 -07:00
|
|
|
msgToAdd, err := base.EncodeMessage(&modified)
|
2019-12-15 20:05:56 -08:00
|
|
|
if err != nil {
|
2019-12-28 10:51:37 -08:00
|
|
|
return err
|
2019-12-15 20:05:56 -08:00
|
|
|
}
|
2019-12-23 05:33:48 -08:00
|
|
|
now := time.Now()
|
2020-08-08 12:17:33 -07:00
|
|
|
processedKey := base.ProcessedKey(msg.Queue, now)
|
|
|
|
failedKey := base.FailedKey(msg.Queue, now)
|
2019-12-23 05:33:48 -08:00
|
|
|
expireAt := now.Add(statsTTL)
|
2020-02-08 11:06:14 -08:00
|
|
|
return retryCmd.Run(r.client,
|
2020-09-05 12:43:15 -07:00
|
|
|
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), processedKey, failedKey},
|
2020-06-11 20:58:27 -07:00
|
|
|
msgToRemove, msgToAdd, processAt.Unix(), expireAt.Unix()).Err()
|
2019-12-15 16:15:07 -08:00
|
|
|
}
|
|
|
|
|
2019-12-26 07:17:26 -08:00
|
|
|
const (
|
2021-01-12 11:01:21 -08:00
|
|
|
maxArchiveSize = 10000 // maximum number of tasks in archive
|
|
|
|
archivedExpirationInDays = 90 // number of days before an archived task gets deleted permanently
|
2019-12-26 07:17:26 -08:00
|
|
|
)
|
|
|
|
|
2020-09-05 12:43:15 -07:00
|
|
|
// KEYS[1] -> asynq:{<qname>}:active
|
2020-08-08 12:44:08 -07:00
|
|
|
// KEYS[2] -> asynq:{<qname>}:deadlines
|
2021-01-12 11:01:21 -08:00
|
|
|
// KEYS[3] -> asynq:{<qname>}:archived
|
2020-08-08 12:44:08 -07:00
|
|
|
// KEYS[4] -> asynq:{<qname>}:processed:<yyyy-mm-dd>
|
2020-08-10 21:49:12 -07:00
|
|
|
// KEYS[5] -> asynq:{<qname>}:failed:<yyyy-mm-dd>
|
2021-01-12 11:01:21 -08:00
|
|
|
// ARGV[1] -> base.TaskMessage value to remove
|
|
|
|
// ARGV[2] -> base.TaskMessage value to add
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[3] -> died_at UNIX timestamp
|
|
|
|
// ARGV[4] -> cutoff timestamp (e.g., 90 days ago)
|
2021-01-12 11:01:21 -08:00
|
|
|
// ARGV[5] -> max number of tasks in archive (e.g., 100)
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[6] -> stats expiration timestamp
|
2021-01-12 11:01:21 -08:00
|
|
|
var archiveCmd = redis.NewScript(`
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("LREM", KEYS[1], 0, ARGV[1]) == 0 then
|
2020-06-07 13:04:27 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
2020-06-18 12:12:29 -07:00
|
|
|
if redis.call("ZREM", KEYS[2], ARGV[1]) == 0 then
|
2020-06-18 10:53:58 -07:00
|
|
|
return redis.error_reply("NOT FOUND")
|
|
|
|
end
|
|
|
|
redis.call("ZADD", KEYS[3], ARGV[3], ARGV[2])
|
|
|
|
redis.call("ZREMRANGEBYSCORE", KEYS[3], "-inf", ARGV[4])
|
|
|
|
redis.call("ZREMRANGEBYRANK", KEYS[3], 0, -ARGV[5])
|
|
|
|
local n = redis.call("INCR", KEYS[4])
|
2020-02-08 11:06:14 -08:00
|
|
|
if tonumber(n) == 1 then
|
2020-06-18 10:53:58 -07:00
|
|
|
redis.call("EXPIREAT", KEYS[4], ARGV[6])
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
2020-06-18 10:53:58 -07:00
|
|
|
local m = redis.call("INCR", KEYS[5])
|
2020-02-08 11:06:14 -08:00
|
|
|
if tonumber(m) == 1 then
|
2020-06-18 10:53:58 -07:00
|
|
|
redis.call("EXPIREAT", KEYS[5], ARGV[6])
|
2020-02-08 11:06:14 -08:00
|
|
|
end
|
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
2021-01-12 11:01:21 -08:00
|
|
|
// Archive sends the given task to archive, attaching the error message to the task.
|
|
|
|
// It also trims the archive by timestamp and set size.
|
|
|
|
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
2020-06-11 20:58:27 -07:00
|
|
|
msgToRemove, err := base.EncodeMessage(msg)
|
2019-11-19 19:44:41 -08:00
|
|
|
if err != nil {
|
2019-12-28 10:51:37 -08:00
|
|
|
return err
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
2019-12-15 20:05:56 -08:00
|
|
|
modified := *msg
|
|
|
|
modified.ErrorMsg = errMsg
|
2020-06-11 20:58:27 -07:00
|
|
|
msgToAdd, err := base.EncodeMessage(&modified)
|
2019-12-15 20:05:56 -08:00
|
|
|
if err != nil {
|
2019-12-28 10:51:37 -08:00
|
|
|
return err
|
2019-12-15 20:05:56 -08:00
|
|
|
}
|
2019-11-19 19:44:41 -08:00
|
|
|
now := time.Now()
|
2021-01-12 11:01:21 -08:00
|
|
|
limit := now.AddDate(0, 0, -archivedExpirationInDays).Unix() // 90 days ago
|
2020-08-08 12:44:08 -07:00
|
|
|
processedKey := base.ProcessedKey(msg.Queue, now)
|
2020-08-10 21:49:12 -07:00
|
|
|
failedKey := base.FailedKey(msg.Queue, now)
|
2019-12-23 05:33:48 -08:00
|
|
|
expireAt := now.Add(statsTTL)
|
2021-01-12 11:01:21 -08:00
|
|
|
return archiveCmd.Run(r.client,
|
|
|
|
[]string{base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), processedKey, failedKey},
|
|
|
|
msgToRemove, msgToAdd, now.Unix(), limit, maxArchiveSize, expireAt.Unix()).Err()
|
2019-11-19 19:44:41 -08:00
|
|
|
}
|
|
|
|
|
2020-08-09 06:26:14 -07:00
|
|
|
// CheckAndEnqueue checks for scheduled/retry tasks for the given queues
|
|
|
|
//and enqueues any tasks that are ready to be processed.
|
|
|
|
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
|
|
|
|
for _, qname := range qnames {
|
|
|
|
if err := r.forwardAll(base.ScheduledKey(qname), base.QueueKey(qname)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := r.forwardAll(base.RetryKey(qname), base.QueueKey(qname)); err != nil {
|
|
|
|
return err
|
2019-12-04 07:14:37 -08:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-09 06:26:14 -07:00
|
|
|
// KEYS[1] -> source queue (e.g. asynq:{<qname>:scheduled or asynq:{<qname>}:retry})
|
|
|
|
// KEYS[2] -> destination queue (e.g. asynq:{<qname>})
|
2020-02-08 11:06:14 -08:00
|
|
|
// ARGV[1] -> current unix time
|
2020-06-07 13:04:27 -07:00
|
|
|
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
|
2020-02-08 11:06:14 -08:00
|
|
|
var forwardCmd = redis.NewScript(`
|
2020-06-07 13:04:27 -07:00
|
|
|
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
|
2020-02-08 11:06:14 -08:00
|
|
|
for _, msg in ipairs(msgs) do
|
2020-08-09 06:26:14 -07:00
|
|
|
redis.call("LPUSH", KEYS[2], msg)
|
2020-02-08 11:06:14 -08:00
|
|
|
redis.call("ZREM", KEYS[1], msg)
|
|
|
|
end
|
2020-06-07 13:04:27 -07:00
|
|
|
return table.getn(msgs)`)
|
2020-02-08 11:06:14 -08:00
|
|
|
|
2020-06-07 13:04:27 -07:00
|
|
|
// forward moves tasks with a score less than the current unix time
|
2020-08-09 06:26:14 -07:00
|
|
|
// from the src zset to the dst list. It returns the number of tasks moved.
|
|
|
|
func (r *RDB) forward(src, dst string) (int, error) {
|
2020-01-14 07:26:41 -08:00
|
|
|
now := float64(time.Now().Unix())
|
2020-08-09 06:26:14 -07:00
|
|
|
res, err := forwardCmd.Run(r.client, []string{src, dst}, now).Result()
|
2020-06-07 13:04:27 -07:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return cast.ToInt(res), nil
|
2019-12-06 22:29:40 -08:00
|
|
|
}
|
2020-01-31 06:48:58 -08:00
|
|
|
|
2020-08-09 06:26:14 -07:00
|
|
|
// forwardAll moves tasks with a score less than the current unix time from the src zset,
|
|
|
|
// until there's no more tasks.
|
2020-08-10 21:49:12 -07:00
|
|
|
func (r *RDB) forwardAll(src, dst string) (err error) {
|
2020-08-09 06:26:14 -07:00
|
|
|
n := 1
|
|
|
|
for n != 0 {
|
|
|
|
n, err = r.forward(src, dst)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-08-10 05:37:49 -07:00
|
|
|
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
|
|
|
|
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
2020-06-20 06:29:58 -07:00
|
|
|
var msgs []*base.TaskMessage
|
|
|
|
opt := &redis.ZRangeBy{
|
|
|
|
Min: "-inf",
|
|
|
|
Max: strconv.FormatInt(deadline.Unix(), 10),
|
|
|
|
}
|
2020-08-10 05:37:49 -07:00
|
|
|
for _, qname := range qnames {
|
|
|
|
res, err := r.client.ZRangeByScore(base.DeadlinesKey(qname), opt).Result()
|
2020-06-20 06:29:58 -07:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-08-10 05:37:49 -07:00
|
|
|
for _, s := range res {
|
|
|
|
msg, err := base.DecodeMessage(s)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
}
|
2020-06-20 06:29:58 -07:00
|
|
|
}
|
|
|
|
return msgs, nil
|
|
|
|
}
|
|
|
|
|
2020-08-10 21:49:12 -07:00
|
|
|
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
|
|
|
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
|
|
|
|
// ARGV[1] -> TTL in seconds
|
|
|
|
// ARGV[2] -> server info
|
|
|
|
// ARGV[3:] -> alternate key-value pair of (worker id, worker data)
|
2020-02-21 07:18:22 -08:00
|
|
|
// Note: Add key to ZSET with expiration time as score.
|
|
|
|
// ref: https://github.com/antirez/redis/issues/135#issuecomment-2361996
|
2020-05-18 20:47:35 -07:00
|
|
|
var writeServerStateCmd = redis.NewScript(`
|
2020-08-10 21:49:12 -07:00
|
|
|
redis.call("SETEX", KEYS[1], ARGV[1], ARGV[2])
|
|
|
|
redis.call("DEL", KEYS[2])
|
|
|
|
for i = 3, table.getn(ARGV)-1, 2 do
|
|
|
|
redis.call("HSET", KEYS[2], ARGV[i], ARGV[i+1])
|
2020-02-21 07:18:22 -08:00
|
|
|
end
|
2020-08-10 21:49:12 -07:00
|
|
|
redis.call("EXPIRE", KEYS[2], ARGV[1])
|
2020-02-08 11:06:14 -08:00
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
2020-05-18 20:47:35 -07:00
|
|
|
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
|
|
|
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
|
2020-02-21 07:18:22 -08:00
|
|
|
bytes, err := json.Marshal(info)
|
2020-01-31 06:48:58 -08:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2020-02-01 22:22:48 -08:00
|
|
|
exp := time.Now().Add(ttl).UTC()
|
2020-08-10 21:49:12 -07:00
|
|
|
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
|
2020-02-21 07:18:22 -08:00
|
|
|
for _, w := range workers {
|
|
|
|
bytes, err := json.Marshal(w)
|
|
|
|
if err != nil {
|
|
|
|
continue // skip bad data
|
|
|
|
}
|
2020-05-18 20:47:35 -07:00
|
|
|
args = append(args, w.ID, bytes)
|
2020-01-31 06:48:58 -08:00
|
|
|
}
|
2020-04-12 16:42:11 -07:00
|
|
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
|
|
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
2020-08-10 21:49:12 -07:00
|
|
|
if err := r.client.ZAdd(base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return writeServerStateCmd.Run(r.client, []string{skey, wkey}, args...).Err()
|
2020-02-01 22:22:48 -08:00
|
|
|
}
|
|
|
|
|
2020-08-10 21:49:12 -07:00
|
|
|
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
|
|
|
// KEYS[2] -> asynq:workers:{<host:pid:sid>}
|
2020-06-20 06:29:58 -07:00
|
|
|
var clearServerStateCmd = redis.NewScript(`
|
2020-08-10 21:49:12 -07:00
|
|
|
redis.call("DEL", KEYS[1])
|
2020-02-08 11:06:14 -08:00
|
|
|
redis.call("DEL", KEYS[2])
|
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
2020-04-12 11:41:50 -07:00
|
|
|
// ClearServerState deletes server state data from redis.
|
2020-05-18 20:47:35 -07:00
|
|
|
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
|
|
|
|
skey := base.ServerInfoKey(host, pid, serverID)
|
|
|
|
wkey := base.WorkersKey(host, pid, serverID)
|
2020-08-10 21:49:12 -07:00
|
|
|
if err := r.client.ZRem(base.AllServers, skey).Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if err := r.client.ZRem(base.AllWorkers, wkey).Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err()
|
2020-01-31 06:48:58 -08:00
|
|
|
}
|
2020-02-12 17:12:09 -08:00
|
|
|
|
2020-09-26 17:33:29 -07:00
|
|
|
// KEYS[1] -> asynq:schedulers:{<schedulerID>}
|
|
|
|
// ARGV[1] -> TTL in seconds
|
|
|
|
// ARGV[2:] -> schedler entries
|
|
|
|
var writeSchedulerEntriesCmd = redis.NewScript(`
|
|
|
|
redis.call("DEL", KEYS[1])
|
|
|
|
for i = 2, #ARGV do
|
|
|
|
redis.call("LPUSH", KEYS[1], ARGV[i])
|
|
|
|
end
|
|
|
|
redis.call("EXPIRE", KEYS[1], ARGV[1])
|
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
|
|
|
// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
|
|
|
|
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
|
|
|
|
args := []interface{}{ttl.Seconds()}
|
|
|
|
for _, e := range entries {
|
|
|
|
bytes, err := json.Marshal(e)
|
|
|
|
if err != nil {
|
|
|
|
continue // skip bad data
|
|
|
|
}
|
|
|
|
args = append(args, bytes)
|
|
|
|
}
|
|
|
|
exp := time.Now().Add(ttl).UTC()
|
|
|
|
key := base.SchedulerEntriesKey(schedulerID)
|
|
|
|
err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return writeSchedulerEntriesCmd.Run(r.client, []string{key}, args...).Err()
|
|
|
|
}
|
|
|
|
|
|
|
|
// ClearSchedulerEntries deletes scheduler entries data from redis.
|
|
|
|
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
|
|
|
|
key := base.SchedulerEntriesKey(scheduelrID)
|
|
|
|
if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return r.client.Del(key).Err()
|
|
|
|
}
|
|
|
|
|
2020-02-12 17:12:09 -08:00
|
|
|
// CancelationPubSub returns a pubsub for cancelation messages.
|
|
|
|
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|
|
|
pubsub := r.client.Subscribe(base.CancelChannel)
|
|
|
|
_, err := pubsub.Receive()
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return pubsub, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// PublishCancelation publish cancelation message to all subscribers.
|
2020-02-12 17:33:41 -08:00
|
|
|
// The message is the ID for the task to be canceled.
|
2020-02-12 17:12:09 -08:00
|
|
|
func (r *RDB) PublishCancelation(id string) error {
|
|
|
|
return r.client.Publish(base.CancelChannel, id).Err()
|
|
|
|
}
|
2020-09-26 17:33:29 -07:00
|
|
|
|
|
|
|
// KEYS[1] -> asynq:scheduler_history:<entryID>
|
|
|
|
// ARGV[1] -> enqueued_at timestamp
|
|
|
|
// ARGV[2] -> serialized SchedulerEnqueueEvent data
|
|
|
|
// ARGV[3] -> max number of events to be persisted
|
|
|
|
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
|
2021-01-25 22:32:37 -08:00
|
|
|
redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3])
|
2020-09-26 17:33:29 -07:00
|
|
|
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
|
|
|
|
return redis.status_reply("OK")`)
|
|
|
|
|
|
|
|
// Maximum number of enqueue events to store per entry.
|
2021-01-25 22:32:37 -08:00
|
|
|
const maxEvents = 1000
|
2020-09-26 17:33:29 -07:00
|
|
|
|
|
|
|
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
|
|
|
|
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
|
|
|
|
key := base.SchedulerHistoryKey(entryID)
|
|
|
|
data, err := json.Marshal(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return recordSchedulerEnqueueEventCmd.Run(
|
|
|
|
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
|
|
|
|
}
|
2021-01-25 22:32:37 -08:00
|
|
|
|
|
|
|
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
|
|
|
|
func (r *RDB) ClearSchedulerHistory(entryID string) error {
|
|
|
|
key := base.SchedulerHistoryKey(entryID)
|
|
|
|
return r.client.Del(key).Err()
|
|
|
|
}
|