2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Minor cleanup

This commit is contained in:
Ken Hibino 2019-11-20 20:27:01 -08:00
parent 2dd5f2c5ab
commit 3dddcfbb14
3 changed files with 8 additions and 13 deletions

View File

@ -32,8 +32,7 @@ func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller {
} }
func (p *poller) terminate() { func (p *poller) terminate() {
// send a signal to the manager goroutine to stop // Signal the poller goroutine to stop polling.
// processing tasks from the queue.
p.done <- struct{}{} p.done <- struct{}{}
} }
@ -47,29 +46,27 @@ func (p *poller) start() {
fmt.Println("------------------------------------") fmt.Println("------------------------------------")
return return
default: default:
p.enqueue() p.exec()
time.Sleep(p.avgInterval) time.Sleep(p.avgInterval)
} }
} }
}() }()
} }
func (p *poller) enqueue() { func (p *poller) exec() {
for _, zset := range p.zsets { for _, zset := range p.zsets {
// Get next items in the queue with scores (time to execute) <= now. // Get next items in the queue with scores (time to execute) <= now.
now := time.Now().Unix() now := time.Now().Unix()
fmt.Printf("[DEBUG] polling ZSET %q\n", zset) msgs, err := p.rdb.zRangeByScore(zset, &redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))})
msgs, err := p.rdb.zRangeByScore(zset,
&redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))})
if err != nil { if err != nil {
log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err)
continue continue
} }
fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset)
for _, m := range msgs { for _, m := range msgs {
if err := p.rdb.move(zset, m); err != nil { if err := p.rdb.move(zset, m); err != nil {
log.Printf("could not move task %+v to queue %q: %v", log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err)
m, m.Queue, err)
continue continue
} }
} }

View File

@ -31,8 +31,7 @@ func newProcessor(rdb *rdb, numWorkers int, handler TaskHandler) *processor {
} }
func (p *processor) terminate() { func (p *processor) terminate() {
// send a signal to the processor goroutine to stop // Signal he processor goroutine to stop processing tasks from the queue.
// processing tasks from the queue.
p.done <- struct{}{} p.done <- struct{}{}
fmt.Println("--- Waiting for all workers to finish ---") fmt.Println("--- Waiting for all workers to finish ---")

3
rdb.go
View File

@ -67,12 +67,12 @@ func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error)
return nil, errQueuePopTimeout return nil, errQueuePopTimeout
} }
q, data := res[0], res[1] q, data := res[0], res[1]
fmt.Printf("perform task %v from %s\n", data, q)
var msg taskMessage var msg taskMessage
err = json.Unmarshal([]byte(data), &msg) err = json.Unmarshal([]byte(data), &msg)
if err != nil { if err != nil {
return nil, errDeserializeTask return nil, errDeserializeTask
} }
fmt.Printf("[DEBUG] perform task %+v from %s\n", msg, q)
return &msg, nil return &msg, nil
} }
@ -92,7 +92,6 @@ func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error {
func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) { func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) {
jobs, err := r.client.ZRangeByScore(key, opt).Result() jobs, err := r.client.ZRangeByScore(key, opt).Result()
fmt.Printf("len(jobs) = %d\n", len(jobs))
if err != nil { if err != nil {
return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err) return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err)
} }