From 4af65d5fa5e10f5afc86bd5efa6f4852a05ad8e6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Mon, 10 May 2021 09:16:13 -0700 Subject: [PATCH] Update RDB methods with new errors package --- internal/rdb/rdb.go | 130 +++++++++++++++++++++++++++++++------------- 1 file changed, 93 insertions(+), 37 deletions(-) diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 8d59afe..21fdfc3 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -37,12 +37,26 @@ func (r *RDB) Ping() error { return r.client.Ping().Err() } +func (r *RDB) runScript(op errors.Op, script *redis.Script, keys []string, args ...interface{}) error { + if err := script.Run(r.client, keys, args...).Err(); err != nil { + return errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) + } + return nil +} + +// enqueueCmd enqueues a given task message. +// +// Input: // KEYS[1] -> asynq:{}:t: // KEYS[2] -> asynq:{}:pending +// -- // ARGV[1] -> task message data // ARGV[2] -> task ID // ARGV[3] -> task timeout in seconds (0 if not timeout) // ARGV[4] -> task deadline in unix time (0 if no deadline) +// +// Output: +// Returns 1 if successfully enqueued var enqueueCmd = redis.NewScript(` redis.call("HSET", KEYS[1], "msg", ARGV[1], @@ -55,12 +69,13 @@ return 1 // Enqueue adds the given task to the pending list of the queue. func (r *RDB) Enqueue(msg *base.TaskMessage) error { + var op errors.Op = "rdb.Enqueue" encoded, err := base.EncodeMessage(msg) if err != nil { - return err + return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID.String()), @@ -72,17 +87,24 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { msg.Timeout, msg.Deadline, } - return enqueueCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, enqueueCmd, keys, argv...) } +// enqueueUniqueCmd enqueues the task message if the task is unique. +// // KEYS[1] -> unique key // KEYS[2] -> asynq:{}:t: // KEYS[3] -> asynq:{}:pending +// -- // ARGV[1] -> task ID // ARGV[2] -> uniqueness lock TTL // ARGV[3] -> task message data // ARGV[4] -> task timeout in seconds (0 if not timeout) // ARGV[5] -> task deadline in unix time (0 if no deadline) +// +// Output: +// Returns 1 if successfully enqueued +// Returns 0 if task already exists var enqueueUniqueCmd = redis.NewScript(` local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) if not ok then @@ -277,6 +299,7 @@ return redis.status_reply("OK") // Done removes the task from active queue to mark the task as done. // It removes a uniqueness lock acquired by the task, if any. func (r *RDB) Done(msg *base.TaskMessage) error { + var op errors.Op = "rdb.Done" now := time.Now() expireAt := now.Add(statsTTL) keys := []string{ @@ -291,9 +314,9 @@ func (r *RDB) Done(msg *base.TaskMessage) error { } if len(msg.UniqueKey) > 0 { keys = append(keys, msg.UniqueKey) - return doneUniqueCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, doneUniqueCmd, keys, argv...) } - return doneCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, doneCmd, keys, argv...) } // KEYS[1] -> asynq:{}:active @@ -315,13 +338,14 @@ return redis.status_reply("OK")`) // Requeue moves the task from active queue to the specified queue. func (r *RDB) Requeue(msg *base.TaskMessage) error { + var op errors.Op = "rdb.Requeue" keys := []string{ base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue), base.TaskKey(msg.Queue, msg.ID.String()), } - return requeueCmd.Run(r.client, keys, msg.ID.String()).Err() + return r.runScript(op, requeueCmd, keys, msg.ID.String()) } // KEYS[1] -> asynq:{}:t: @@ -343,12 +367,13 @@ return 1 // Schedule adds the task to the scheduled set to be processed in the future. func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { + var op errors.Op = "rdb.Schedule" encoded, err := base.EncodeMessage(msg) if err != nil { - return err + return errors.E(op, errors.Unknown, fmt.Sprintf("cannot encode message: %v", err)) } if err := r.client.SAdd(base.AllQueues, msg.Queue).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ base.TaskKey(msg.Queue, msg.ID.String()), @@ -361,7 +386,7 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { msg.Timeout, msg.Deadline, } - return scheduleCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, scheduleCmd, keys, argv...) } // KEYS[1] -> unique key @@ -457,12 +482,13 @@ return redis.status_reply("OK")`) // Retry moves the task from active to retry queue, incrementing retry count // and assigning error message to the task message. func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { + var op errors.Op = "rdb.Retry" modified := *msg modified.Retried++ modified.ErrorMsg = errMsg encoded, err := base.EncodeMessage(&modified) if err != nil { - return err + return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } now := time.Now() expireAt := now.Add(statsTTL) @@ -480,7 +506,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e processAt.Unix(), expireAt.Unix(), } - return retryCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, retryCmd, keys, argv...) } const ( @@ -524,11 +550,12 @@ return redis.status_reply("OK")`) // Archive sends the given task to archive, attaching the error message to the task. // It also trims the archive by timestamp and set size. func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { + var op errors.Op = "rdb.Archive" modified := *msg modified.ErrorMsg = errMsg encoded, err := base.EncodeMessage(&modified) if err != nil { - return err + return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } now := time.Now() cutoff := now.AddDate(0, 0, -archivedExpirationInDays) @@ -549,15 +576,16 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { maxArchiveSize, expireAt.Unix(), } - return archiveCmd.Run(r.client, keys, argv...).Err() + return r.runScript(op, archiveCmd, keys, argv...) } // ForwardIfReady checks scheduled and retry sets of the given queues // and move any tasks that are ready to be processed to the pending set. func (r *RDB) ForwardIfReady(qnames ...string) error { + var op errors.Op = "rdb.ForwardIfReady" for _, qname := range qnames { if err := r.forwardAll(qname); err != nil { - return err + return errors.E(op, errors.CanonicalCode(err), err) } } return nil @@ -583,9 +611,13 @@ func (r *RDB) forward(src, dst, taskKeyPrefix string) (int, error) { now := float64(time.Now().Unix()) res, err := forwardCmd.Run(r.client, []string{src, dst}, now, taskKeyPrefix).Result() if err != nil { - return 0, err + return 0, errors.E(errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } - return cast.ToInt(res), nil + n, err := cast.ToIntE(res) + if err != nil { + return 0, errors.E(errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res)) + } + return n, nil } // forwardAll checks for tasks in scheduled/retry state that are ready to be run, and updates @@ -621,22 +653,23 @@ return res // ListDeadlineExceeded returns a list of task messages that have exceeded the deadline from the given queues. func (r *RDB) ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*base.TaskMessage, error) { + var op errors.Op = "rdb.ListDeadlineExceeded" var msgs []*base.TaskMessage for _, qname := range qnames { res, err := listDeadlineExceededCmd.Run(r.client, []string{base.DeadlinesKey(qname)}, deadline.Unix(), base.TaskKeyPrefix(qname)).Result() if err != nil { - return nil, err + return nil, errors.E(op, errors.Internal, fmt.Sprintf("redis eval error: %v", err)) } data, err := cast.ToStringSliceE(res) if err != nil { - return nil, err + return nil, errors.E(op, errors.Internal, fmt.Sprintf("cast error: Lua script returned unexpected value: %v", res)) } for _, s := range data { msg, err := base.DecodeMessage([]byte(s)) if err != nil { - return nil, err + return nil, errors.E(op, errors.Internal, fmt.Sprintf("cannot decode message: %v", err)) } msgs = append(msgs, msg) } @@ -662,9 +695,10 @@ return redis.status_reply("OK")`) // WriteServerState writes server state data to redis with expiration set to the value ttl. func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo, ttl time.Duration) error { + var op errors.Op = "rdb.WriteServerState" bytes, err := base.EncodeServerInfo(info) if err != nil { - return err + return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode server info: %v", err)) } exp := time.Now().Add(ttl).UTC() args := []interface{}{ttl.Seconds(), bytes} // args to the lua script @@ -678,12 +712,12 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo skey := base.ServerInfoKey(info.Host, info.PID, info.ServerID) wkey := base.WorkersKey(info.Host, info.PID, info.ServerID) if err := r.client.ZAdd(base.AllServers, &redis.Z{Score: float64(exp.Unix()), Member: skey}).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } if err := r.client.ZAdd(base.AllWorkers, &redis.Z{Score: float64(exp.Unix()), Member: wkey}).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err}) } - return writeServerStateCmd.Run(r.client, []string{skey, wkey}, args...).Err() + return r.runScript(op, writeServerStateCmd, []string{skey, wkey}, args...) } // KEYS[1] -> asynq:servers:{} @@ -695,15 +729,16 @@ return redis.status_reply("OK")`) // ClearServerState deletes server state data from redis. func (r *RDB) ClearServerState(host string, pid int, serverID string) error { + var op errors.Op = "rdb.ClearServerState" skey := base.ServerInfoKey(host, pid, serverID) wkey := base.WorkersKey(host, pid, serverID) if err := r.client.ZRem(base.AllServers, skey).Err(); err != nil { - return err + return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err}) } if err := r.client.ZRem(base.AllWorkers, wkey).Err(); err != nil { - return err + return errors.E(op, errors.Internal, &errors.RedisCommandError{Command: "zrem", Err: err}) } - return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err() + return r.runScript(op, clearServerStateCmd, []string{skey, wkey}) } // KEYS[1] -> asynq:schedulers:{} @@ -719,6 +754,7 @@ return redis.status_reply("OK")`) // WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl. func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error { + var op errors.Op = "rdb.WriteSchedulerEntries" args := []interface{}{ttl.Seconds()} for _, e := range entries { bytes, err := base.EncodeSchedulerEntry(e) @@ -731,26 +767,31 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule key := base.SchedulerEntriesKey(schedulerID) err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err() if err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zadd", Err: err}) } - return writeSchedulerEntriesCmd.Run(r.client, []string{key}, args...).Err() + return r.runScript(op, writeSchedulerEntriesCmd, []string{key}, args...) } // ClearSchedulerEntries deletes scheduler entries data from redis. func (r *RDB) ClearSchedulerEntries(scheduelrID string) error { + var op errors.Op = "rdb.ClearSchedulerEntries" key := base.SchedulerEntriesKey(scheduelrID) if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil { - return err + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "zrem", Err: err}) } - return r.client.Del(key).Err() + if err := r.client.Del(key).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err}) + } + return nil } // CancelationPubSub returns a pubsub for cancelation messages. func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { + var op errors.Op = "rdb.CancelationPubSub" pubsub := r.client.Subscribe(base.CancelChannel) _, err := pubsub.Receive() if err != nil { - return nil, err + return nil, errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub receive error: %v", err)) } return pubsub, nil } @@ -758,7 +799,11 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { // PublishCancelation publish cancelation message to all subscribers. // The message is the ID for the task to be canceled. func (r *RDB) PublishCancelation(id string) error { - return r.client.Publish(base.CancelChannel, id).Err() + var op errors.Op = "rdb.PublishCancelation" + if err := r.client.Publish(base.CancelChannel, id).Err(); err != nil { + return errors.E(op, errors.Unknown, fmt.Sprintf("redis pubsub publish error: %v", err)) + } + return nil } // KEYS[1] -> asynq:scheduler_history: @@ -775,17 +820,28 @@ const maxEvents = 1000 // RecordSchedulerEnqueueEvent records the time when the given task was enqueued. func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error { - key := base.SchedulerHistoryKey(entryID) + var op errors.Op = "rdb.RecordSchedulerEnqueueEvent" data, err := base.EncodeSchedulerEnqueueEvent(event) if err != nil { - return err + return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode scheduler enqueue event: %v", err)) } - return recordSchedulerEnqueueEventCmd.Run( - r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err() + keys := []string{ + base.SchedulerHistoryKey(entryID), + } + argv := []interface{}{ + event.EnqueuedAt.Unix(), + data, + maxEvents, + } + return r.runScript(op, recordSchedulerEnqueueEventCmd, keys, argv...) } // ClearSchedulerHistory deletes the enqueue event history for the given scheduler entry. func (r *RDB) ClearSchedulerHistory(entryID string) error { + var op errors.Op = "rdb.ClearSchedulerHistory" key := base.SchedulerHistoryKey(entryID) - return r.client.Del(key).Err() + if err := r.client.Del(key).Err(); err != nil { + return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "del", Err: err}) + } + return nil }