mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-14 11:31:18 +08:00
Update RDB methods with new errors package
This commit is contained in:
parent
a19ad19382
commit
4af65d5fa5
@ -37,12 +37,26 @@ func (r *RDB) Ping() error {
|
|||||||
return r.client.Ping().Err()
|
return r.client.Ping().Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *RDB) runScript(op errors.Op, script *redis.Script, keys []string, args ...interface{}) error {
|
||||||
|
if err := script.Run(r.client, keys, args...).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// enqueueCmd enqueues a given task message.
|
||||||
|
//
|
||||||
|
// Input:
|
||||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||||
// KEYS[2] -> asynq:{<qname>}:pending
|
// KEYS[2] -> asynq:{<qname>}:pending
|
||||||
|
// --
|
||||||
// ARGV[1] -> task message data
|
// ARGV[1] -> task message data
|
||||||
// ARGV[2] -> task ID
|
// ARGV[2] -> task ID
|
||||||
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
// ARGV[3] -> task timeout in seconds (0 if not timeout)
|
||||||
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
// ARGV[4] -> task deadline in unix time (0 if no deadline)
|
||||||
|
//
|
||||||
|
// Output:
|
||||||
|
// Returns 1 if successfully enqueued
|
||||||
var enqueueCmd = redis.NewScript(`
|
var enqueueCmd = redis.NewScript(`
|
||||||
redis.call("HSET", KEYS[1],
|
redis.call("HSET", KEYS[1],
|
||||||
"msg", ARGV[1],
|
"msg", ARGV[1],
|
||||||
@ -55,12 +69,13 @@ return 1
|
|||||||
|
|
||||||
// Enqueue adds the given task to the pending list of the queue.
|
// Enqueue adds the given task to the pending list of the queue.
|
||||||
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
||||||
|
var op errors.Op = "rdb.Enqueue"
|
||||||
encoded, err := base.EncodeMessage(msg)
|
encoded, err := base.EncodeMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||||
@ -72,17 +87,24 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
|
|||||||
msg.Timeout,
|
msg.Timeout,
|
||||||
msg.Deadline,
|
msg.Deadline,
|
||||||
}
|
}
|
||||||
return enqueueCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, enqueueCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// enqueueUniqueCmd enqueues the task message if the task is unique.
|
||||||
|
//
|
||||||
// KEYS[1] -> unique key
|
// KEYS[1] -> unique key
|
||||||
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
|
// KEYS[2] -> asynq:{<qname>}:t:<taskid>
|
||||||
// KEYS[3] -> asynq:{<qname>}:pending
|
// KEYS[3] -> asynq:{<qname>}:pending
|
||||||
|
// --
|
||||||
// ARGV[1] -> task ID
|
// ARGV[1] -> task ID
|
||||||
// ARGV[2] -> uniqueness lock TTL
|
// ARGV[2] -> uniqueness lock TTL
|
||||||
// ARGV[3] -> task message data
|
// ARGV[3] -> task message data
|
||||||
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
// ARGV[4] -> task timeout in seconds (0 if not timeout)
|
||||||
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
// ARGV[5] -> task deadline in unix time (0 if no deadline)
|
||||||
|
//
|
||||||
|
// Output:
|
||||||
|
// Returns 1 if successfully enqueued
|
||||||
|
// Returns 0 if task already exists
|
||||||
var enqueueUniqueCmd = redis.NewScript(`
|
var enqueueUniqueCmd = redis.NewScript(`
|
||||||
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2])
|
||||||
if not ok then
|
if not ok then
|
||||||
@ -277,6 +299,7 @@ return redis.status_reply("OK")
|
|||||||
// Done removes the task from active queue to mark the task as done.
|
// Done removes the task from active queue to mark the task as done.
|
||||||
// It removes a uniqueness lock acquired by the task, if any.
|
// It removes a uniqueness lock acquired by the task, if any.
|
||||||
func (r *RDB) Done(msg *base.TaskMessage) error {
|
func (r *RDB) Done(msg *base.TaskMessage) error {
|
||||||
|
var op errors.Op = "rdb.Done"
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
keys := []string{
|
keys := []string{
|
||||||
@ -291,9 +314,9 @@ func (r *RDB) Done(msg *base.TaskMessage) error {
|
|||||||
}
|
}
|
||||||
if len(msg.UniqueKey) > 0 {
|
if len(msg.UniqueKey) > 0 {
|
||||||
keys = append(keys, msg.UniqueKey)
|
keys = append(keys, msg.UniqueKey)
|
||||||
return doneUniqueCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, doneUniqueCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
return doneCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, doneCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:active
|
// KEYS[1] -> asynq:{<qname>}:active
|
||||||
@ -315,13 +338,14 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// Requeue moves the task from active queue to the specified queue.
|
// Requeue moves the task from active queue to the specified queue.
|
||||||
func (r *RDB) Requeue(msg *base.TaskMessage) error {
|
func (r *RDB) Requeue(msg *base.TaskMessage) error {
|
||||||
|
var op errors.Op = "rdb.Requeue"
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.ActiveKey(msg.Queue),
|
base.ActiveKey(msg.Queue),
|
||||||
base.DeadlinesKey(msg.Queue),
|
base.DeadlinesKey(msg.Queue),
|
||||||
base.PendingKey(msg.Queue),
|
base.PendingKey(msg.Queue),
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||||
}
|
}
|
||||||
return requeueCmd.Run(r.client, keys, msg.ID.String()).Err()
|
return r.runScript(op, requeueCmd, keys, msg.ID.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
// KEYS[1] -> asynq:{<qname>}:t:<task_id>
|
||||||
@ -343,12 +367,13 @@ return 1
|
|||||||
|
|
||||||
// Schedule adds the task to the scheduled set to be processed in the future.
|
// Schedule adds the task to the scheduled set to be processed in the future.
|
||||||
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
||||||
|
var op errors.Op = "rdb.Schedule"
|
||||||
encoded, err := base.EncodeMessage(msg)
|
encoded, err := base.EncodeMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
}
|
}
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||||
@ -361,7 +386,7 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
|||||||
msg.Timeout,
|
msg.Timeout,
|
||||||
msg.Deadline,
|
msg.Deadline,
|
||||||
}
|
}
|
||||||
return scheduleCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, scheduleCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> unique key
|
// KEYS[1] -> unique key
|
||||||
@ -457,12 +482,13 @@ return redis.status_reply("OK")`)
|
|||||||
// Retry moves the task from active to retry queue, incrementing retry count
|
// Retry moves the task from active to retry queue, incrementing retry count
|
||||||
// and assigning error message to the task message.
|
// and assigning error message to the task message.
|
||||||
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
||||||
|
var op errors.Op = "rdb.Retry"
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.Retried++
|
modified.Retried++
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
encoded, err := base.EncodeMessage(&modified)
|
encoded, err := base.EncodeMessage(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
@ -480,7 +506,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
processAt.Unix(),
|
processAt.Unix(),
|
||||||
expireAt.Unix(),
|
expireAt.Unix(),
|
||||||
}
|
}
|
||||||
return retryCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, retryCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -524,11 +550,12 @@ return redis.status_reply("OK")`)
|
|||||||
// Archive sends the given task to archive, attaching the error message to the task.
|
// Archive sends the given task to archive, attaching the error message to the task.
|
||||||
// It also trims the archive by timestamp and set size.
|
// It also trims the archive by timestamp and set size.
|
||||||
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||||
|
var op errors.Op = "rdb.Archive"
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
encoded, err := base.EncodeMessage(&modified)
|
encoded, err := base.EncodeMessage(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
|
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
|
||||||
@ -549,15 +576,16 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
|||||||
maxArchiveSize,
|
maxArchiveSize,
|
||||||
expireAt.Unix(),
|
expireAt.Unix(),
|
||||||
}
|
}
|
||||||
return archiveCmd.Run(r.client, keys, argv...).Err()
|
return r.runScript(op, archiveCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForwardIfReady checks scheduled and retry sets of the given queues
|
// ForwardIfReady checks scheduled and retry sets of the given queues
|
||||||
// and move any tasks that are ready to be processed to the pending set.
|
// and move any tasks that are ready to be processed to the pending set.
|
||||||
func (r *RDB) ForwardIfReady(qnames ...string) error {
|
func (r *RDB) ForwardIfReady(qnames ...string) error {
|
||||||
|
var op errors.Op = "rdb.ForwardIfReady"
|
||||||
for _, qname := range qnames {
|
for _, qname := range qnames {
|
||||||
if err := r.forwardAll(qname); err != nil {
|
if err := r.forwardAll(qname); err != nil {
|
||||||
return err
|
return errors.E(op, errors.CanonicalCode(err), err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -583,9 +611,13 @@ func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) {
|
|||||||
now := float64(time.Now().Unix())
|
now := float64(time.Now().Unix())
|
||||||
res, err := forwardCmd.Run(r.client, []string{src, dst}, now, taskKeyPrefix).Result()
|
res, err := forwardCmd.Run(r.client, []string{src, dst}, now, taskKeyPrefix).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
return cast.ToInt(res), nil
|
n, err := cast.ToIntE(res)
|
||||||
|
if err != nil {
|
||||||
|
return 0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
|
// forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates
|
||||||
@ -621,22 +653,23 @@ return res
|
|||||||
|
|
||||||
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
|
// ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues.
|
||||||
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) {
|
||||||
|
var op errors.Op = "rdb.ListDeadlineExceeded"
|
||||||
var msgs []*base.TaskMessage
|
var msgs []*base.TaskMessage
|
||||||
for _, qname := range qnames {
|
for _, qname := range qnames {
|
||||||
res, err := listDeadlineExceededCmd.Run(r.client,
|
res, err := listDeadlineExceededCmd.Run(r.client,
|
||||||
[]string{base.DeadlinesKey(qname)},
|
[]string{base.DeadlinesKey(qname)},
|
||||||
deadline.Unix(), base.TaskKeyPrefix(qname)).Result()
|
deadline.Unix(), base.TaskKeyPrefix(qname)).Result()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err))
|
||||||
}
|
}
|
||||||
data, err := cast.ToStringSliceE(res)
|
data, err := cast.ToStringSliceE(res)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res))
|
||||||
}
|
}
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
msg, err := base.DecodeMessage([]byte(s))
|
msg, err := base.DecodeMessage([]byte(s))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err))
|
||||||
}
|
}
|
||||||
msgs = append(msgs, msg)
|
msgs = append(msgs, msg)
|
||||||
}
|
}
|
||||||
@ -662,9 +695,10 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
// WriteServerState writes server state data to redis with expiration set to the value ttl.
|
||||||
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
|
func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error {
|
||||||
|
var op errors.Op = "rdb.WriteServerState"
|
||||||
bytes, err := base.EncodeServerInfo(info)
|
bytes, err := base.EncodeServerInfo(info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode server info: %v", err))
|
||||||
}
|
}
|
||||||
exp := time.Now().Add(ttl).UTC()
|
exp := time.Now().Add(ttl).UTC()
|
||||||
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
|
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
|
||||||
@ -678,12 +712,12 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
|
|||||||
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID)
|
||||||
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
wkey := base.WorkersKey(info.Host, info.PID, info.ServerID)
|
||||||
if err := r.client.ZAdd(base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
|
if err := r.client.ZAdd(base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err})
|
||||||
}
|
}
|
||||||
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
|
if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
|
||||||
}
|
}
|
||||||
return writeServerStateCmd.Run(r.client, []string{skey, wkey}, args...).Err()
|
return r.runScript(op, writeServerStateCmd, []string{skey, wkey}, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
// KEYS[1] -> asynq:servers:{<host:pid:sid>}
|
||||||
@ -695,15 +729,16 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// ClearServerState deletes server state data from redis.
|
// ClearServerState deletes server state data from redis.
|
||||||
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
|
func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
|
||||||
|
var op errors.Op = "rdb.ClearServerState"
|
||||||
skey := base.ServerInfoKey(host, pid, serverID)
|
skey := base.ServerInfoKey(host, pid, serverID)
|
||||||
wkey := base.WorkersKey(host, pid, serverID)
|
wkey := base.WorkersKey(host, pid, serverID)
|
||||||
if err := r.client.ZRem(base.AllServers, skey).Err(); err != nil {
|
if err := r.client.ZRem(base.AllServers, skey).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
|
||||||
}
|
}
|
||||||
if err := r.client.ZRem(base.AllWorkers, wkey).Err(); err != nil {
|
if err := r.client.ZRem(base.AllWorkers, wkey).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err})
|
||||||
}
|
}
|
||||||
return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err()
|
return r.runScript(op, clearServerStateCmd, []string{skey, wkey})
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:schedulers:{<schedulerID>}
|
// KEYS[1] -> asynq:schedulers:{<schedulerID>}
|
||||||
@ -719,6 +754,7 @@ return redis.status_reply("OK")`)
|
|||||||
|
|
||||||
// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
|
// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
|
||||||
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
|
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
|
||||||
|
var op errors.Op = "rdb.WriteSchedulerEntries"
|
||||||
args := []interface{}{ttl.Seconds()}
|
args := []interface{}{ttl.Seconds()}
|
||||||
for _, e := range entries {
|
for _, e := range entries {
|
||||||
bytes, err := base.EncodeSchedulerEntry(e)
|
bytes, err := base.EncodeSchedulerEntry(e)
|
||||||
@ -731,26 +767,31 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
|
|||||||
key := base.SchedulerEntriesKey(schedulerID)
|
key := base.SchedulerEntriesKey(schedulerID)
|
||||||
err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err})
|
||||||
}
|
}
|
||||||
return writeSchedulerEntriesCmd.Run(r.client, []string{key}, args...).Err()
|
return r.runScript(op, writeSchedulerEntriesCmd, []string{key}, args...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearSchedulerEntries deletes scheduler entries data from redis.
|
// ClearSchedulerEntries deletes scheduler entries data from redis.
|
||||||
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
|
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
|
||||||
|
var op errors.Op = "rdb.ClearSchedulerEntries"
|
||||||
key := base.SchedulerEntriesKey(scheduelrID)
|
key := base.SchedulerEntriesKey(scheduelrID)
|
||||||
if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
|
if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
|
||||||
return err
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err})
|
||||||
}
|
}
|
||||||
return r.client.Del(key).Err()
|
if err := r.client.Del(key).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CancelationPubSub returns a pubsub for cancelation messages.
|
// CancelationPubSub returns a pubsub for cancelation messages.
|
||||||
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
||||||
|
var op errors.Op = "rdb.CancelationPubSub"
|
||||||
pubsub := r.client.Subscribe(base.CancelChannel)
|
pubsub := r.client.Subscribe(base.CancelChannel)
|
||||||
_, err := pubsub.Receive()
|
_, err := pubsub.Receive()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err))
|
||||||
}
|
}
|
||||||
return pubsub, nil
|
return pubsub, nil
|
||||||
}
|
}
|
||||||
@ -758,7 +799,11 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
|
|||||||
// PublishCancelation publish cancelation message to all subscribers.
|
// PublishCancelation publish cancelation message to all subscribers.
|
||||||
// The message is the ID for the task to be canceled.
|
// The message is the ID for the task to be canceled.
|
||||||
func (r *RDB) PublishCancelation(id string) error {
|
func (r *RDB) PublishCancelation(id string) error {
|
||||||
return r.client.Publish(base.CancelChannel, id).Err()
|
var op errors.Op = "rdb.PublishCancelation"
|
||||||
|
if err := r.client.Publish(base.CancelChannel, id).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// KEYS[1] -> asynq:scheduler_history:<entryID>
|
// KEYS[1] -> asynq:scheduler_history:<entryID>
|
||||||
@ -775,17 +820,28 @@ const maxEvents = 1000
|
|||||||
|
|
||||||
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
|
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
|
||||||
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
|
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
|
||||||
key := base.SchedulerHistoryKey(entryID)
|
var op errors.Op = "rdb.RecordSchedulerEnqueueEvent"
|
||||||
data, err := base.EncodeSchedulerEnqueueEvent(event)
|
data, err := base.EncodeSchedulerEnqueueEvent(event)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode scheduler enqueue event: %v", err))
|
||||||
}
|
}
|
||||||
return recordSchedulerEnqueueEventCmd.Run(
|
keys := []string{
|
||||||
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
|
base.SchedulerHistoryKey(entryID),
|
||||||
|
}
|
||||||
|
argv := []interface{}{
|
||||||
|
event.EnqueuedAt.Unix(),
|
||||||
|
data,
|
||||||
|
maxEvents,
|
||||||
|
}
|
||||||
|
return r.runScript(op, recordSchedulerEnqueueEventCmd, keys, argv...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
|
// ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry.
|
||||||
func (r *RDB) ClearSchedulerHistory(entryID string) error {
|
func (r *RDB) ClearSchedulerHistory(entryID string) error {
|
||||||
|
var op errors.Op = "rdb.ClearSchedulerHistory"
|
||||||
key := base.SchedulerHistoryKey(entryID)
|
key := base.SchedulerHistoryKey(entryID)
|
||||||
return r.client.Del(key).Err()
|
if err := r.client.Del(key).Err(); err != nil {
|
||||||
|
return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user