mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Refactor forwarding of scheduled tasks
This commit is contained in:
parent
318b24b3b8
commit
4684f961c0
@ -24,7 +24,7 @@ type Background struct {
|
|||||||
// NewBackground returns a new Background instance.
|
// NewBackground returns a new Background instance.
|
||||||
func NewBackground(numWorkers int, config *RedisConfig) *Background {
|
func NewBackground(numWorkers int, config *RedisConfig) *Background {
|
||||||
r := rdb.NewRDB(newRedisClient(config))
|
r := rdb.NewRDB(newRedisClient(config))
|
||||||
poller := newPoller(r, 5*time.Second, []string{rdb.Scheduled, rdb.Retry})
|
poller := newPoller(r, 5*time.Second)
|
||||||
processor := newProcessor(r, numWorkers, nil)
|
processor := newProcessor(r, numWorkers, nil)
|
||||||
return &Background{
|
return &Background{
|
||||||
rdb: r,
|
rdb: r,
|
||||||
|
@ -229,10 +229,21 @@ func (r *RDB) RestoreUnfinished() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CheckScheduled checks for all scheduled tasks and moves any tasks that
|
||||||
|
// have to be processed to the queue.
|
||||||
|
func (r *RDB) CheckScheduled() error {
|
||||||
|
delayed := []string{Scheduled, Retry}
|
||||||
|
for _, zset := range delayed {
|
||||||
|
if err := r.forward(zset); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Forward moves all tasks with a score less than the current unix time
|
// Forward moves all tasks with a score less than the current unix time
|
||||||
// from the given zset to the default queue.
|
// from the given zset to the default queue.
|
||||||
// TODO(hibiken): Find a better method name that reflects what this does.
|
func (r *RDB) forward(from string) error {
|
||||||
func (r *RDB) Forward(from string) error {
|
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
|
||||||
for _, msg in ipairs(msgs) do
|
for _, msg in ipairs(msgs) do
|
||||||
|
@ -323,38 +323,50 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestForward(t *testing.T) {
|
func TestCheckScheduled(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
t1 := randomTask("send_email", "default", nil)
|
t1 := randomTask("send_email", "default", nil)
|
||||||
t2 := randomTask("generate_csv", "default", nil)
|
t2 := randomTask("generate_csv", "default", nil)
|
||||||
|
t3 := randomTask("gen_thumbnail", "default", nil)
|
||||||
secondAgo := time.Now().Add(-time.Second)
|
secondAgo := time.Now().Add(-time.Second)
|
||||||
hourFromNow := time.Now().Add(time.Hour)
|
hourFromNow := time.Now().Add(time.Hour)
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
tasks []*redis.Z // scheduled tasks with timestamp as a score
|
initScheduled []*redis.Z // tasks to be processed later
|
||||||
|
initRetry []*redis.Z // tasks to be retired later
|
||||||
wantQueued []*TaskMessage // queue after calling forward
|
wantQueued []*TaskMessage // queue after calling forward
|
||||||
wantScheduled []*TaskMessage // scheduled queue after calling forward
|
wantScheduled []*TaskMessage // tasks in scheduled queue after calling CheckScheduled
|
||||||
|
wantRetry []*TaskMessage // tasks in retry queue after calling CheckScheduled
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
tasks: []*redis.Z{
|
initScheduled: []*redis.Z{
|
||||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
|
&redis.Z{Member: mustMarshal(t, t1), Score: float64(secondAgo.Unix())},
|
||||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||||
wantQueued: []*TaskMessage{t1, t2},
|
initRetry: []*redis.Z{
|
||||||
|
&redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}},
|
||||||
|
wantQueued: []*TaskMessage{t1, t2, t3},
|
||||||
wantScheduled: []*TaskMessage{},
|
wantScheduled: []*TaskMessage{},
|
||||||
|
wantRetry: []*TaskMessage{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tasks: []*redis.Z{
|
initScheduled: []*redis.Z{
|
||||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
&redis.Z{Member: mustMarshal(t, t2), Score: float64(secondAgo.Unix())}},
|
||||||
wantQueued: []*TaskMessage{t2},
|
initRetry: []*redis.Z{
|
||||||
|
&redis.Z{Member: mustMarshal(t, t3), Score: float64(secondAgo.Unix())}},
|
||||||
|
wantQueued: []*TaskMessage{t2, t3},
|
||||||
wantScheduled: []*TaskMessage{t1},
|
wantScheduled: []*TaskMessage{t1},
|
||||||
|
wantRetry: []*TaskMessage{},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
tasks: []*redis.Z{
|
initScheduled: []*redis.Z{
|
||||||
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
&redis.Z{Member: mustMarshal(t, t1), Score: float64(hourFromNow.Unix())},
|
||||||
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
|
&redis.Z{Member: mustMarshal(t, t2), Score: float64(hourFromNow.Unix())}},
|
||||||
|
initRetry: []*redis.Z{
|
||||||
|
&redis.Z{Member: mustMarshal(t, t3), Score: float64(hourFromNow.Unix())}},
|
||||||
wantQueued: []*TaskMessage{},
|
wantQueued: []*TaskMessage{},
|
||||||
wantScheduled: []*TaskMessage{t1, t2},
|
wantScheduled: []*TaskMessage{t1, t2},
|
||||||
|
wantRetry: []*TaskMessage{t3},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -363,14 +375,18 @@ func TestForward(t *testing.T) {
|
|||||||
if err := r.client.FlushDB().Err(); err != nil {
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := r.client.ZAdd(Scheduled, tc.tasks...).Err(); err != nil {
|
if err := r.client.ZAdd(Scheduled, tc.initScheduled...).Err(); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := r.client.ZAdd(Retry, tc.initRetry...).Err(); err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err := r.Forward(Scheduled)
|
err := r.CheckScheduled()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("(*RDB).Forward(%q) = %v, want nil", Scheduled, err)
|
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
queued := r.client.LRange(DefaultQueue, 0, -1).Val()
|
queued := r.client.LRange(DefaultQueue, 0, -1).Val()
|
||||||
|
12
poller.go
12
poller.go
@ -15,17 +15,13 @@ type poller struct {
|
|||||||
|
|
||||||
// poll interval on average
|
// poll interval on average
|
||||||
avgInterval time.Duration
|
avgInterval time.Duration
|
||||||
|
|
||||||
// redis ZSETs to poll
|
|
||||||
zsets []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPoller(r *rdb.RDB, avgInterval time.Duration, zsets []string) *poller {
|
func newPoller(r *rdb.RDB, avgInterval time.Duration) *poller {
|
||||||
return &poller{
|
return &poller{
|
||||||
rdb: r,
|
rdb: r,
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
avgInterval: avgInterval,
|
avgInterval: avgInterval,
|
||||||
zsets: zsets,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -52,9 +48,7 @@ func (p *poller) start() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *poller) exec() {
|
func (p *poller) exec() {
|
||||||
for _, zset := range p.zsets {
|
if err := p.rdb.CheckScheduled(); err != nil {
|
||||||
if err := p.rdb.Forward(zset); err != nil {
|
log.Printf("[ERROR] could not forward scheduled tasks: %v\n", err)
|
||||||
log.Printf("[ERROR] could not forward scheduled tasks from %q: %v\n", zset, err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -16,7 +16,7 @@ func TestPoller(t *testing.T) {
|
|||||||
r := setup(t)
|
r := setup(t)
|
||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
const pollInterval = time.Second
|
const pollInterval = time.Second
|
||||||
p := newPoller(rdbClient, pollInterval, []string{rdb.Scheduled, rdb.Retry})
|
p := newPoller(rdbClient, pollInterval)
|
||||||
t1 := randomTask("gen_thumbnail", "default", nil)
|
t1 := randomTask("gen_thumbnail", "default", nil)
|
||||||
t2 := randomTask("send_email", "default", nil)
|
t2 := randomTask("send_email", "default", nil)
|
||||||
t3 := randomTask("reindex", "default", nil)
|
t3 := randomTask("reindex", "default", nil)
|
||||||
|
Loading…
Reference in New Issue
Block a user