diff --git a/asynq.go b/asynq.go index 0c5cfe7..5aa98cc 100644 --- a/asynq.go +++ b/asynq.go @@ -100,11 +100,11 @@ func (w *Workers) Run() { for { // pull message out of the queue and process it - // TODO(hibiken): get a list of queues in order of priority - res, err := w.rdb.BLPop(0, "asynq:queues:test").Result() // A timeout of zero means block indefinitely. + // TODO(hibiken): sort the list of queues in order of priority + res, err := w.rdb.BLPop(0, listQueues(w.rdb)...).Result() // A timeout of zero means block indefinitely. if err != nil { if err != redis.Nil { - log.Printf("error when BLPOP from %s: %v\n", "aysnq:queues:test", err) + log.Printf("BLPOP command failed: %v\n", err) } continue } @@ -163,15 +163,22 @@ func (w *Workers) pollScheduledTasks() { } } +// push pushes the task to the specified queue to get picked up by a worker. func push(rdb *redis.Client, queue string, t *Task) error { bytes, err := json.Marshal(t) if err != nil { return fmt.Errorf("could not encode task into JSON: %v", err) } - err = rdb.SAdd(allQueues, queue).Err() + qname := queuePrefix + queue + err = rdb.SAdd(allQueues, qname).Err() if err != nil { return fmt.Errorf("could not execute command SADD %q %q: %v", - allQueues, queue, err) + allQueues, qname, err) } - return rdb.RPush(queuePrefix+queue, string(bytes)).Err() + return rdb.RPush(qname, string(bytes)).Err() +} + +// listQueues returns the list of all queues. +func listQueues(rdb *redis.Client) []string { + return rdb.SMembers(allQueues).Val() }