diff --git a/background.go b/background.go index 9cec5d9..73f1dcb 100644 --- a/background.go +++ b/background.go @@ -24,7 +24,7 @@ type Background struct { // NewBackground returns a new Background instance. func NewBackground(numWorkers int, config *RedisConfig) *Background { r := rdb.NewRDB(newRedisClient(config)) - poller := newPoller(r, 5*time.Second, []string{rdb.Scheduled, rdb.Retry}) + poller := newPoller(r, 5*time.Second) processor := newProcessor(r, numWorkers, nil) return &Background{ rdb: r, diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index c6be7d3..d0279fb 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -229,10 +229,21 @@ func (r *RDB) RestoreUnfinished() error { return err } +// CheckScheduled checks for all scheduled tasks and moves any tasks that +// have to be processed to the queue. +func (r *RDB) CheckScheduled() error { + delayed := []string{Scheduled, Retry} + for _, zset := range delayed { + if err := r.forward(zset); err != nil { + return err + } + } + return nil +} + // Forward moves all tasks with a score less than the current unix time // from the given zset to the default queue. -// TODO(hibiken): Find a better method name that reflects what this does. -func (r *RDB) Forward(from string) error { +func (r *RDB) forward(from string) error { script := redis.NewScript(` local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) for _, msg in ipairs(msgs) do diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 197dd7d..80a3362 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -323,38 +323,50 @@ func TestRestoreUnfinished(t *testing.T) { } } -func TestForward(t *testing.T) { +func TestCheckScheduled(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("generate_csv", "default", nil) + t3 := randomTask("gen_thumbnail", "default", nil) secondAgo := time.Now().Add(-time.Second) hourFromNow := time.Now().Add(time.Hour) tests := []struct { - tasks []*redis.Z // scheduled tasks with timestamp as a score + initScheduled []*redis.Z // tasks to be processed later + initRetry []*redis.Z // tasks to be retired later wantQueued []*TaskMessage // queue after calling forward - wantScheduled []*TaskMessage // scheduled queue after calling forward + wantScheduled []*TaskMessage // tasks in scheduled queue after calling CheckScheduled + wantRetry []*TaskMessage // tasks in retry queue after calling CheckScheduled }{ { - tasks: []*redis.Z{ + initScheduled: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - wantQueued: []*TaskMessage{t1, t2}, + initRetry: []*redis.Z{ + &redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}}, + wantQueued: []*TaskMessage{t1, t2, t3}, wantScheduled: []*TaskMessage{}, + wantRetry: []*TaskMessage{}, }, { - tasks: []*redis.Z{ + initScheduled: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}}, - wantQueued: []*TaskMessage{t2}, + initRetry: []*redis.Z{ + &redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}}, + wantQueued: []*TaskMessage{t2, t3}, wantScheduled: []*TaskMessage{t1}, + wantRetry: []*TaskMessage{}, }, { - tasks: []*redis.Z{ + initScheduled: []*redis.Z{ &redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())}, &redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}}, + initRetry: []*redis.Z{ + &redis.Z{Member: mustMarshal(t, t3), Score: float64(hourFromNow.Unix())}}, wantQueued: []*TaskMessage{}, wantScheduled: []*TaskMessage{t1, t2}, + wantRetry: []*TaskMessage{t3}, }, } @@ -363,14 +375,18 @@ func TestForward(t *testing.T) { if err := r.client.FlushDB().Err(); err != nil { t.Fatal(err) } - if err := r.client.ZAdd(Scheduled, tc.tasks...).Err(); err != nil { + if err := r.client.ZAdd(Scheduled, tc.initScheduled...).Err(); err != nil { + t.Error(err) + continue + } + if err := r.client.ZAdd(Retry, tc.initRetry...).Err(); err != nil { t.Error(err) continue } - err := r.Forward(Scheduled) + err := r.CheckScheduled() if err != nil { - t.Errorf("(*RDB).Forward(%q) = %v, want nil", Scheduled, err) + t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) continue } queued := r.client.LRange(DefaultQueue, 0, -1).Val() diff --git a/poller.go b/poller.go index a4ef6c9..defadb4 100644 --- a/poller.go +++ b/poller.go @@ -15,17 +15,13 @@ type poller struct { // poll interval on average avgInterval time.Duration - - // redis ZSETs to poll - zsets []string } -func newPoller(r *rdb.RDB, avgInterval time.Duration, zsets []string) *poller { +func newPoller(r *rdb.RDB, avgInterval time.Duration) *poller { return &poller{ rdb: r, done: make(chan struct{}), avgInterval: avgInterval, - zsets: zsets, } } @@ -52,9 +48,7 @@ func (p *poller) start() { } func (p *poller) exec() { - for _, zset := range p.zsets { - if err := p.rdb.Forward(zset); err != nil { - log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err) - } + if err := p.rdb.CheckScheduled(); err != nil { + log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err) } } diff --git a/poller_test.go b/poller_test.go index 4821a7d..b754956 100644 --- a/poller_test.go +++ b/poller_test.go @@ -16,7 +16,7 @@ func TestPoller(t *testing.T) { r := setup(t) rdbClient := rdb.NewRDB(r) const pollInterval = time.Second - p := newPoller(rdbClient, pollInterval, []string{rdb.Scheduled, rdb.Retry}) + p := newPoller(rdbClient, pollInterval) t1 := randomTask("gen_thumbnail", "default", nil) t2 := randomTask("send_email", "default", nil) t3 := randomTask("reindex", "default", nil)