2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-14 11:31:18 +08:00

Clean up error messages from rdb methods

This commit is contained in:
Ken Hibino 2019-11-27 19:43:33 -08:00
parent c9a8f5fabd
commit efaceb8a03

26
rdb.go
View File

@ -21,11 +21,7 @@ const (
inProgress = "asynq:in_progress" // SET inProgress = "asynq:in_progress" // SET
) )
var ( var errDequeueTimeout = errors.New("blocking dequeue operation timed out")
errDequeueTimeout = errors.New("blocking dequeue operation timed out")
errSerializeTask = errors.New("could not encode task message into json")
errDeserializeTask = errors.New("could not decode task message from json")
)
// rdb encapsulates the interactions with redis server. // rdb encapsulates the interactions with redis server.
type rdb struct { type rdb struct {
@ -46,7 +42,7 @@ func newRDB(opt *RedisOpt) *rdb {
func (r *rdb) enqueue(msg *taskMessage) error { func (r *rdb) enqueue(msg *taskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
qname := queuePrefix + msg.Queue qname := queuePrefix + msg.Queue
pipe := r.client.Pipeline() pipe := r.client.Pipeline()
@ -54,8 +50,7 @@ func (r *rdb) enqueue(msg *taskMessage) error {
pipe.LPush(qname, string(bytes)) pipe.LPush(qname, string(bytes))
_, err = pipe.Exec() _, err = pipe.Exec()
if err != nil { if err != nil {
return fmt.Errorf("could not enqueue task %+v to %q: %v", return fmt.Errorf("could not enqueue the task %+v to %q: %v", msg, qname, err)
msg, qname, err)
} }
return nil return nil
} }
@ -69,12 +64,12 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error)
return nil, errDequeueTimeout return nil, errDequeueTimeout
} }
if err != nil { if err != nil {
return nil, fmt.Errorf("command BRPOPLPUSH %q %q %v failed: %v", qname, inProgress, timeout, err) return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, inProgress, timeout, err)
} }
var msg taskMessage var msg taskMessage
err = json.Unmarshal([]byte(data), &msg) err = json.Unmarshal([]byte(data), &msg)
if err != nil { if err != nil {
return nil, errDeserializeTask return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err)
} }
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname) fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname)
return &msg, nil return &msg, nil
@ -83,12 +78,12 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error)
func (r *rdb) lrem(key string, msg *taskMessage) error { func (r *rdb) lrem(key string, msg *taskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
// NOTE: count ZERO means "remove all elements equal to val" // NOTE: count ZERO means "remove all elements equal to val"
err = r.client.LRem(key, 0, string(bytes)).Err() err = r.client.LRem(key, 0, string(bytes)).Err()
if err != nil { if err != nil {
return fmt.Errorf("command LREM %s 0 %s failed: %v", key, string(bytes), err) return fmt.Errorf("command `LREM %s 0 %s` failed: %v", key, string(bytes), err)
} }
return nil return nil
} }
@ -97,13 +92,12 @@ func (r *rdb) lrem(key string, msg *taskMessage) error {
func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error { func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
score := float64(processAt.Unix()) score := float64(processAt.Unix())
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err() err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
if err != nil { if err != nil {
return fmt.Errorf("command ZADD %s %.1f %s failed: %v", return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err)
zset, score, string(bytes), err)
} }
return nil return nil
} }
@ -116,7 +110,7 @@ const deadExpirationInDays = 90
func (r *rdb) kill(msg *taskMessage) error { func (r *rdb) kill(msg *taskMessage) error {
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
now := time.Now() now := time.Now()
pipe := r.client.Pipeline() pipe := r.client.Pipeline()