2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Use pipeline for enqueue

This commit is contained in:
Ken Hibino 2019-11-25 20:10:35 -08:00
parent faa9b6ee22
commit 199dcf8fdb

21
rdb.go
View File

@ -28,7 +28,7 @@ var (
errDeserializeTask = errors.New("could not decode task message from json") errDeserializeTask = errors.New("could not decode task message from json")
) )
// rdb encapsulates the interaction with redis server. // rdb encapsulates the interactions with redis server.
type rdb struct { type rdb struct {
client *redis.Client client *redis.Client
} }
@ -45,21 +45,20 @@ func (r *rdb) enqueue(msg *taskMessage) error {
return fmt.Errorf("could not encode task into JSON: %v", err) return fmt.Errorf("could not encode task into JSON: %v", err)
} }
qname := queuePrefix + msg.Queue qname := queuePrefix + msg.Queue
err = r.client.SAdd(allQueues, qname).Err() pipe := r.client.Pipeline()
pipe.SAdd(allQueues, qname)
pipe.LPush(qname, string(bytes))
_, err = pipe.Exec()
if err != nil { if err != nil {
return fmt.Errorf("command SADD %q %q failed: %v", return fmt.Errorf("could not enqueue task %+v to %q: %v",
allQueues, qname, err) msg, qname, err)
}
err = r.client.LPush(qname, string(bytes)).Err()
if err != nil {
return fmt.Errorf("command RPUSH %q %q failed: %v",
qname, string(bytes), err)
} }
return nil return nil
} }
// dequeue blocks until there is a taskMessage available to be processed, // dequeue blocks until there is a task available to be processed,
// once available, it adds the task to "in progress" list and returns the task. // once a task is available, it adds the task to "in progress" list
// and returns the task.
func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) { func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) {
data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result() data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result()
if err != nil { if err != nil {