diff --git a/poller.go b/poller.go index 7f5452f..7569278 100644 --- a/poller.go +++ b/poller.go @@ -32,8 +32,7 @@ func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller { } func (p *poller) terminate() { - // send a signal to the manager goroutine to stop - // processing tasks from the queue. + // Signal the poller goroutine to stop polling. p.done <- struct{}{} } @@ -47,29 +46,27 @@ func (p *poller) start() { fmt.Println("------------------------------------") return default: - p.enqueue() + p.exec() time.Sleep(p.avgInterval) } } }() } -func (p *poller) enqueue() { +func (p *poller) exec() { for _, zset := range p.zsets { // Get next items in the queue with scores (time to execute) <= now. now := time.Now().Unix() - fmt.Printf("[DEBUG] polling ZSET %q\n", zset) - msgs, err := p.rdb.zRangeByScore(zset, - &redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))}) + msgs, err := p.rdb.zRangeByScore(zset, &redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))}) if err != nil { log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) continue } + fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset) for _, m := range msgs { if err := p.rdb.move(zset, m); err != nil { - log.Printf("could not move task %+v to queue %q: %v", - m, m.Queue, err) + log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err) continue } } diff --git a/processor.go b/processor.go index cabf37d..05a9ffe 100644 --- a/processor.go +++ b/processor.go @@ -31,8 +31,7 @@ func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor { } func (p *processor) terminate() { - // send a signal to the processor goroutine to stop - // processing tasks from the queue. + // Signal he processor goroutine to stop processing tasks from the queue. p.done <- struct{}{} fmt.Println("--- Waiting for all workers to finish ---") diff --git a/rdb.go b/rdb.go index 4492a0e..33136b0 100644 --- a/rdb.go +++ b/rdb.go @@ -67,12 +67,12 @@ func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) return nil, errQueuePopTimeout } q, data := res[0], res[1] - fmt.Printf("perform task %v from %s\n", data, q) var msg taskMessage err = json.Unmarshal([]byte(data), &msg) if err != nil { return nil, errDeserializeTask } + fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, q) return &msg, nil } @@ -92,7 +92,6 @@ func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error { func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) { jobs, err := r.client.ZRangeByScore(key, opt).Result() - fmt.Printf("len(jobs) = %d\n", len(jobs)) if err != nil { return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err) }