diff --git a/rdb.go b/rdb.go index 4dcb107..bd22df0 100644 --- a/rdb.go +++ b/rdb.go @@ -21,11 +21,7 @@ const ( inProgress = "asynq:in_progress" // SET ) -var ( - errDequeueTimeout = errors.New("blocking dequeue operation timed out") - errSerializeTask = errors.New("could not encode task message into json") - errDeserializeTask = errors.New("could not decode task message from json") -) +var errDequeueTimeout = errors.New("blocking dequeue operation timed out") // rdb encapsulates the interactions with redis server. type rdb struct { @@ -46,7 +42,7 @@ func newRDB(opt *RedisOpt) *rdb { func (r *rdb) enqueue(msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } qname := queuePrefix + msg.Queue pipe := r.client.Pipeline() @@ -54,8 +50,7 @@ func (r *rdb) enqueue(msg *taskMessage) error { pipe.LPush(qname, string(bytes)) _, err = pipe.Exec() if err != nil { - return fmt.Errorf("could not enqueue task %+v to %q: %v", - msg, qname, err) + return fmt.Errorf("could not enqueue the task %+v to %q: %v", msg, qname, err) } return nil } @@ -69,12 +64,12 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) return nil, errDequeueTimeout } if err != nil { - return nil, fmt.Errorf("command BRPOPLPUSH %q %q %v failed: %v", qname, inProgress, timeout, err) + return nil, fmt.Errorf("command `BRPOPLPUSH %q %q %v` failed: %v", qname, inProgress, timeout, err) } var msg taskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { - return nil, errDeserializeTask + return nil, fmt.Errorf("could not unmarshal %v to json: %v", data, err) } fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, qname) return &msg, nil @@ -83,12 +78,12 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) func (r *rdb) lrem(key string, msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } // NOTE: count ZERO means "remove all elements equal to val" err = r.client.LRem(key, 0, string(bytes)).Err() if err != nil { - return fmt.Errorf("command LREM %s 0 %s failed: %v", key, string(bytes), err) + return fmt.Errorf("command `LREM %s 0 %s` failed: %v", key, string(bytes), err) } return nil } @@ -97,13 +92,12 @@ func (r *rdb) lrem(key string, msg *taskMessage) error { func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } score := float64(processAt.Unix()) err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err() if err != nil { - return fmt.Errorf("command ZADD %s %.1f %s failed: %v", - zset, score, string(bytes), err) + return fmt.Errorf("command `ZADD %s %.1f %s` failed: %v", zset, score, string(bytes), err) } return nil } @@ -116,7 +110,7 @@ const deadExpirationInDays = 90 func (r *rdb) kill(msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) + return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } now := time.Now() pipe := r.client.Pipeline()