From d2a6cc127d5fc6980d27b26f27408f4717c8c413 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 26 Nov 2019 06:52:58 -0800 Subject: [PATCH] Use (*rdb).forward in poller --- poller.go | 20 ++------------------ rdb.go | 44 +++----------------------------------------- 2 files changed, 5 insertions(+), 59 deletions(-) diff --git a/poller.go b/poller.go index 3d09eb0..046dc5b 100644 --- a/poller.go +++ b/poller.go @@ -3,10 +3,7 @@ package asynq import ( "fmt" "log" - "strconv" "time" - - "github.com/go-redis/redis/v7" ) type poller struct { @@ -56,21 +53,8 @@ func (p *poller) start() { func (p *poller) exec() { for _, zset := range p.zsets { - // Get next items in the queue with scores (time to execute) <= now. - now := time.Now().Unix() - msgs, err := p.rdb.zRangeByScore(zset, &redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))}) - if err != nil { - log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) - continue - } - fmt.Printf("[DEBUG] got %d tasks from %q\n", len(msgs), zset) - - for _, m := range msgs { - // TODO(hibiken): Make this move operation atomic. - if err := p.rdb.move(zset, m); err != nil { - log.Printf("could not move task %+v to queue %q: %v", m, m.Queue, err) - continue - } + if err := p.rdb.forward(zset); err != nil { + log.Printf("[ERROR] could not forward scheduled tasks from %q: %v", zset, err) } } } diff --git a/rdb.go b/rdb.go index 65ccc16..67dd2d6 100644 --- a/rdb.go +++ b/rdb.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "log" "strconv" "time" @@ -103,45 +102,6 @@ func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error { return nil } -func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) { - jobs, err := r.client.ZRangeByScore(key, opt).Result() - if err != nil { - return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err) - } - var msgs []*taskMessage - for _, j := range jobs { - fmt.Printf("[debug] j = %v\n", j) - var msg taskMessage - err = json.Unmarshal([]byte(j), &msg) - if err != nil { - log.Printf("[WARNING] could not unmarshal task data %s: %v\n", j, err) - continue - } - msgs = append(msgs, &msg) - } - return msgs, nil -} - -// move moves taskMessage from zfrom to the specified queue. -func (r *rdb) move(from string, msg *taskMessage) error { - // TODO(hibiken): Lua script, make this atomic. - bytes, err := json.Marshal(msg) - if err != nil { - return errSerializeTask - } - if r.client.ZRem(from, string(bytes)).Val() > 0 { - err = r.enqueue(msg) - if err != nil { - log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n", - msg.Queue, err) - // TODO(hibiken): Handle this error properly. - // Add back to zfrom? - return fmt.Errorf("could not push task %v from %q: %v", msg, msg.Queue, err) - } - } - return nil -} - const maxDeadTask = 100 const deadExpirationInDays = 90 @@ -188,6 +148,7 @@ func (r *rdb) moveAll(src, dst string) error { // forward moves all tasks with a score less than the current unix time // from the given zset to the default queue. +// TODO(hibiken): Find a better method name that reflects what this does. func (r *rdb) forward(from string) error { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) @@ -199,6 +160,7 @@ func (r *rdb) forward(from string) error { return msgs `) now := float64(time.Now().Unix()) - _, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result() + res, err := script.Run(r.client, []string{from, allQueues, defaultQueue}, now).Result() + fmt.Printf("[DEBUG] got %d tasks from %q\n", len(res.([]interface{})), from) return err }