diff --git a/rdb.go b/rdb.go index 425110a..df0a407 100644 --- a/rdb.go +++ b/rdb.go @@ -28,7 +28,7 @@ var ( errDeserializeTask = errors.New("could not decode task message from json") ) -// rdb encapsulates the interaction with redis server. +// rdb encapsulates the interactions with redis server. type rdb struct { client *redis.Client } @@ -45,21 +45,20 @@ func (r *rdb) enqueue(msg *taskMessage) error { return fmt.Errorf("could not encode task into JSON: %v", err) } qname := queuePrefix + msg.Queue - err = r.client.SAdd(allQueues, qname).Err() + pipe := r.client.Pipeline() + pipe.SAdd(allQueues, qname) + pipe.LPush(qname, string(bytes)) + _, err = pipe.Exec() if err != nil { - return fmt.Errorf("command SADD %q %q failed: %v", - allQueues, qname, err) - } - err = r.client.LPush(qname, string(bytes)).Err() - if err != nil { - return fmt.Errorf("command RPUSH %q %q failed: %v", - qname, string(bytes), err) + return fmt.Errorf("could not enqueue task %+v to %q: %v", + msg, qname, err) } return nil } -// dequeue blocks until there is a taskMessage available to be processed, -// once available, it adds the task to "in progress" list and returns the task. +// dequeue blocks until there is a task available to be processed, +// once a task is available, it adds the task to "in progress" list +// and returns the task. func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) { data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result() if err != nil {