2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Limit the number of tasks moved by CheckAndEnqueue to prevent a long

running script
This commit is contained in:
Ken Hibino 2020-06-07 13:04:27 -07:00
parent 8af4cbad51
commit 06c4a1c7f8
13 changed files with 91 additions and 105 deletions

View File

@ -5,6 +5,7 @@ git:
go: [1.13.x, 1.14.x] go: [1.13.x, 1.14.x]
script: script:
- go test -race -v -coverprofile=coverage.txt -covermode=atomic ./... - go test -race -v -coverprofile=coverage.txt -covermode=atomic ./...
- go test -run=XXX -bench=. -loglevel=debug ./...
services: services:
- redis-server - redis-server
after_success: after_success:

View File

@ -3,13 +3,16 @@ if [ "${TRAVIS_PULL_REQUEST_BRANCH:-$TRAVIS_BRANCH}" != "master" ]; then
cd ${TRAVIS_BUILD_DIR}/.. && \ cd ${TRAVIS_BUILD_DIR}/.. && \
git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \ git clone ${REMOTE_URL} "${TRAVIS_REPO_SLUG}-bench" && \
cd "${TRAVIS_REPO_SLUG}-bench" && \ cd "${TRAVIS_REPO_SLUG}-bench" && \
# Benchmark master # Benchmark master
git checkout master && \ git checkout master && \
go test -run=XXX -bench=. ./... > master.txt && \ go test -run=XXX -bench=. ./... > master.txt && \
# Benchmark feature branch # Benchmark feature branch
git checkout ${TRAVIS_COMMIT} && \ git checkout ${TRAVIS_COMMIT} && \
go test -run=XXX -bench=. ./... > feature.txt && \ go test -run=XXX -bench=. ./... > feature.txt && \
go get -u golang.org/x/tools/cmd/benchcmp && \
# compare two benchmarks # compare two benchmarks
go get -u golang.org/x/tools/cmd/benchcmp && \
benchcmp master.txt feature.txt; benchcmp master.txt feature.txt;
fi fi

View File

@ -7,7 +7,6 @@ package asynq
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -29,6 +28,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
RetryDelayFunc: func(n int, err error, t *Task) time.Duration { RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second return time.Second
}, },
LogLevel: testLogLevel,
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@ -60,7 +60,6 @@ func BenchmarkEndToEnd(b *testing.B) {
const count = 100000 const count = 100000
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
b.StopTimer() // begin setup b.StopTimer() // begin setup
rand.Seed(time.Now().UnixNano())
setup(b) setup(b)
redis := &RedisClientOpt{ redis := &RedisClientOpt{
Addr: redisAddr, Addr: redisAddr,
@ -72,6 +71,7 @@ func BenchmarkEndToEnd(b *testing.B) {
RetryDelayFunc: func(n int, err error, t *Task) time.Duration { RetryDelayFunc: func(n int, err error, t *Task) time.Duration {
return time.Second return time.Second
}, },
LogLevel: testLogLevel,
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
@ -90,8 +90,16 @@ func BenchmarkEndToEnd(b *testing.B) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(count * 2) wg.Add(count * 2)
handler := func(ctx context.Context, t *Task) error { handler := func(ctx context.Context, t *Task) error {
// randomly fail 1% of tasks n, err := t.Payload.GetInt("data")
if rand.Intn(100) == 1 { if err != nil {
b.Logf("internal error: %v", err)
}
retried, ok := GetRetryCount(ctx)
if !ok {
b.Logf("internal error: %v", err)
}
// Fail 1% of tasks for the first attempt.
if retried == 0 && n%100 == 0 {
return fmt.Errorf(":(") return fmt.Errorf(":(")
} }
wg.Done() wg.Done()
@ -131,6 +139,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
"default": 3, "default": 3,
"low": 1, "low": 1,
}, },
LogLevel: testLogLevel,
}) })
// Create a bunch of tasks // Create a bunch of tasks
for i := 0; i < highCount; i++ { for i := 0; i < highCount; i++ {

View File

@ -251,7 +251,7 @@ type Broker interface {
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *TaskMessage, errMsg string) error Kill(msg *TaskMessage, errMsg string) error
RequeueAll() (int64, error) RequeueAll() (int64, error)
CheckAndEnqueue(qnames ...string) error CheckAndEnqueue() error
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
ClearServerState(host string, pid int, serverID string) error ClearServerState(host string, pid int, serverID string) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers

View File

@ -32,9 +32,16 @@ type Stats struct {
// Queue represents a task queue. // Queue represents a task queue.
type Queue struct { type Queue struct {
Name string // Name of the queue (e.g. "default", "critical").
// Note: It doesn't include the prefix "asynq:queues:".
Name string
// Paused indicates whether the queue is paused.
// If true, tasks in the queue should not be processed.
Paused bool Paused bool
Size int // number of tasks in the queue
// Size is the number of tasks in the queue.
Size int
} }
// DailyStats holds aggregate data for a given day. // DailyStats holds aggregate data for a given day.

View File

@ -100,11 +100,7 @@ func TestCurrentStats(t *testing.T) {
Failed: 10, Failed: 10,
Timestamp: now, Timestamp: now,
Queues: []*Queue{ Queues: []*Queue{
{ {Name: base.DefaultQueueName, Paused: false, Size: 0},
Name: base.DefaultQueueName,
Paused: false,
Size: 0,
},
}, },
}, },
}, },
@ -709,12 +705,14 @@ func TestListRetry(t *testing.T) {
func TestListRetryPagination(t *testing.T) { func TestListRetryPagination(t *testing.T) {
r := setup(t) r := setup(t)
// create 100 tasks with an increasing number of wait time. // create 100 tasks with an increasing number of wait time.
now := time.Now()
var seed []h.ZSetEntry
for i := 0; i < 100; i++ { for i := 0; i < 100; i++ {
msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil) msg := h.NewTaskMessage(fmt.Sprintf("task %d", i), nil)
if err := r.Retry(msg, time.Now().Add(time.Duration(i)*time.Second), "error"); err != nil { processAt := now.Add(time.Duration(i) * time.Second)
t.Fatal(err) seed = append(seed, h.ZSetEntry{Msg: msg, Score: float64(processAt.Unix())})
}
} }
h.SeedRetryQueue(t, r.client, seed)
tests := []struct { tests := []struct {
desc string desc string
@ -2212,9 +2210,9 @@ func TestPause(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
initial []string // initial queue keys in the set initial []string // initial keys in the paused set
qname string // queue name to pause qname string // name of the queue to pause
want []string // expected queue keys in the set want []string // expected keys in the paused set
}{ }{
{[]string{}, "default", []string{"asynq:queues:default"}}, {[]string{}, "default", []string{"asynq:queues:default"}},
{[]string{"asynq:queues:default"}, "critical", []string{"asynq:queues:default", "asynq:queues:critical"}}, {[]string{"asynq:queues:default"}, "critical", []string{"asynq:queues:default", "asynq:queues:critical"}},
@ -2233,7 +2231,6 @@ func TestPause(t *testing.T) {
err := r.Pause(tc.qname) err := r.Pause(tc.qname)
if err != nil { if err != nil {
t.Errorf("Pause(%q) returned error: %v", tc.qname, err) t.Errorf("Pause(%q) returned error: %v", tc.qname, err)
continue
} }
got, err := r.client.SMembers(base.PausedQueues).Result() got, err := r.client.SMembers(base.PausedQueues).Result()
@ -2253,9 +2250,9 @@ func TestPauseError(t *testing.T) {
tests := []struct { tests := []struct {
desc string // test case description desc string // test case description
initial []string // initial queue keys in the set initial []string // initial keys in the paused set
qname string // queue name to pause qname string // name of the queue to pause
want []string // expected queue keys in the set want []string // expected keys in the paused set
}{ }{
{"queue already paused", []string{"asynq:queues:default"}, "default", []string{"asynq:queues:default"}}, {"queue already paused", []string{"asynq:queues:default"}, "default", []string{"asynq:queues:default"}},
} }
@ -2273,7 +2270,6 @@ func TestPauseError(t *testing.T) {
err := r.Pause(tc.qname) err := r.Pause(tc.qname)
if err == nil { if err == nil {
t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname) t.Errorf("%s; Pause(%q) returned nil: want error", tc.desc, tc.qname)
continue
} }
got, err := r.client.SMembers(base.PausedQueues).Result() got, err := r.client.SMembers(base.PausedQueues).Result()
@ -2292,9 +2288,9 @@ func TestUnpause(t *testing.T) {
r := setup(t) r := setup(t)
tests := []struct { tests := []struct {
initial []string // initial queue keys in the set initial []string // initial keys in the paused set
qname string // queue name to unpause qname string // name of the queue to unpause
want []string // expected queue keys in the set want []string // expected keys in the paused set
}{ }{
{[]string{"asynq:queues:default"}, "default", []string{}}, {[]string{"asynq:queues:default"}, "default", []string{}},
{[]string{"asynq:queues:default", "asynq:queues:low"}, "low", []string{"asynq:queues:default"}}, {[]string{"asynq:queues:default", "asynq:queues:low"}, "low", []string{"asynq:queues:default"}},
@ -2313,7 +2309,6 @@ func TestUnpause(t *testing.T) {
err := r.Unpause(tc.qname) err := r.Unpause(tc.qname)
if err != nil { if err != nil {
t.Errorf("Unpause(%q) returned error: %v", tc.qname, err) t.Errorf("Unpause(%q) returned error: %v", tc.qname, err)
continue
} }
got, err := r.client.SMembers(base.PausedQueues).Result() got, err := r.client.SMembers(base.PausedQueues).Result()
@ -2333,9 +2328,9 @@ func TestUnpauseError(t *testing.T) {
tests := []struct { tests := []struct {
desc string // test case description desc string // test case description
initial []string // initial queue keys in the set initial []string // initial keys in the paused set
qname string // queue name to unpause qname string // name of the queue to unpause
want []string // expected queue keys in the set want []string // expected keys in the paused set
}{ }{
{"set is empty", []string{}, "default", []string{}}, {"set is empty", []string{}, "default", []string{}},
{"queue is not in the set", []string{"asynq:queues:default"}, "low", []string{"asynq:queues:default"}}, {"queue is not in the set", []string{"asynq:queues:default"}, "low", []string{"asynq:queues:default"}},
@ -2354,7 +2349,6 @@ func TestUnpauseError(t *testing.T) {
err := r.Unpause(tc.qname) err := r.Unpause(tc.qname)
if err == nil { if err == nil {
t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname) t.Errorf("%s; Unpause(%q) returned nil: want error", tc.desc, tc.qname)
continue
} }
got, err := r.client.SMembers(base.PausedQueues).Result() got, err := r.client.SMembers(base.PausedQueues).Result()

View File

@ -106,11 +106,11 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error {
// Dequeue skips a queue if the queue is paused. // Dequeue skips a queue if the queue is paused.
// If all queues are empty, ErrNoProcessableTask error is returned. // If all queues are empty, ErrNoProcessableTask error is returned.
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
var keys []string var qkeys []interface{}
for _, q := range qnames { for _, q := range qnames {
keys = append(keys, base.QueueKey(q)) qkeys = append(qkeys, base.QueueKey(q))
} }
data, err := r.dequeue(keys...) data, err := r.dequeue(qkeys...)
if err == redis.Nil { if err == redis.Nil {
return nil, ErrNoProcessableTask return nil, ErrNoProcessableTask
} }
@ -142,13 +142,9 @@ for _, qkey in ipairs(ARGV) do
end end
return nil`) return nil`)
func (r *RDB) dequeue(queues ...string) (data string, err error) { func (r *RDB) dequeue(qkeys ...interface{}) (data string, err error) {
var args []interface{}
for _, qkey := range queues {
args = append(args, qkey)
}
res, err := dequeueCmd.Run(r.client, res, err := dequeueCmd.Run(r.client,
[]string{base.InProgressQueue, base.PausedQueues}, args...).Result() []string{base.InProgressQueue, base.PausedQueues}, qkeys...).Result()
if err != nil { if err != nil {
return "", err return "", err
} }
@ -163,7 +159,10 @@ func (r *RDB) dequeue(queues ...string) (data string, err error) {
// ARGV[3] -> task ID // ARGV[3] -> task ID
// Note: LREM count ZERO means "remove all elements equal to val" // Note: LREM count ZERO means "remove all elements equal to val"
var doneCmd = redis.NewScript(` var doneCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if x == 0 then
return redis.error_reply("NOT FOUND")
end
local n = redis.call("INCR", KEYS[2]) local n = redis.call("INCR", KEYS[2])
if tonumber(n) == 1 then if tonumber(n) == 1 then
redis.call("EXPIREAT", KEYS[2], ARGV[2]) redis.call("EXPIREAT", KEYS[2], ARGV[2])
@ -285,7 +284,10 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim
// ARGV[3] -> retry_at UNIX timestamp // ARGV[3] -> retry_at UNIX timestamp
// ARGV[4] -> stats expiration timestamp // ARGV[4] -> stats expiration timestamp
var retryCmd = redis.NewScript(` var retryCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if x == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
local n = redis.call("INCR", KEYS[3]) local n = redis.call("INCR", KEYS[3])
if tonumber(n) == 1 then if tonumber(n) == 1 then
@ -336,7 +338,10 @@ const (
// ARGV[5] -> max number of tasks in dead queue (e.g., 100) // ARGV[5] -> max number of tasks in dead queue (e.g., 100)
// ARGV[6] -> stats expiration timestamp // ARGV[6] -> stats expiration timestamp
var killCmd = redis.NewScript(` var killCmd = redis.NewScript(`
redis.call("LREM", KEYS[1], 0, ARGV[1]) local x = redis.call("LREM", KEYS[1], 0, ARGV[1])
if x == 0 then
return redis.error_reply("NOT FOUND")
end
redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2]) redis.call("ZADD", KEYS[2], ARGV[3], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4]) redis.call("ZREMRANGEBYSCORE", KEYS[2], "-inf", ARGV[4])
redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5]) redis.call("ZREMRANGEBYRANK", KEYS[2], 0, -ARGV[5])
@ -400,21 +405,17 @@ func (r *RDB) RequeueAll() (int64, error) {
return n, nil return n, nil
} }
// CheckAndEnqueue checks for all scheduled tasks and enqueues any tasks that // CheckAndEnqueue checks for all scheduled/retry tasks and enqueues any tasks that
// have to be processed. // are ready to be processed.
// func (r *RDB) CheckAndEnqueue() (err error) {
// qnames specifies to which queues to send tasks.
func (r *RDB) CheckAndEnqueue(qnames ...string) error {
delayed := []string{base.ScheduledQueue, base.RetryQueue} delayed := []string{base.ScheduledQueue, base.RetryQueue}
for _, zset := range delayed { for _, zset := range delayed {
var err error n := 1
if len(qnames) == 1 { for n != 0 {
err = r.forwardSingle(zset, base.QueueKey(qnames[0])) n, err = r.forward(zset)
} else { if err != nil {
err = r.forward(zset) return err
} }
if err != nil {
return err
} }
} }
return nil return nil
@ -423,40 +424,27 @@ func (r *RDB) CheckAndEnqueue(qnames ...string) error {
// KEYS[1] -> source queue (e.g. scheduled or retry queue) // KEYS[1] -> source queue (e.g. scheduled or retry queue)
// ARGV[1] -> current unix time // ARGV[1] -> current unix time
// ARGV[2] -> queue prefix // ARGV[2] -> queue prefix
// Note: Script moves tasks up to 100 at a time to keep the runtime of script short.
var forwardCmd = redis.NewScript(` var forwardCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1]) local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100)
for _, msg in ipairs(msgs) do for _, msg in ipairs(msgs) do
local decoded = cjson.decode(msg) local decoded = cjson.decode(msg)
local qkey = ARGV[2] .. decoded["Queue"] local qkey = ARGV[2] .. decoded["Queue"]
redis.call("LPUSH", qkey, msg) redis.call("LPUSH", qkey, msg)
redis.call("ZREM", KEYS[1], msg) redis.call("ZREM", KEYS[1], msg)
end end
return msgs`) return table.getn(msgs)`)
// forward moves all tasks with a score less than the current unix time // forward moves tasks with a score less than the current unix time
// from the src zset. // from the src zset. It returns the number of tasks moved.
func (r *RDB) forward(src string) error { func (r *RDB) forward(src string) (int, error) {
now := float64(time.Now().Unix()) now := float64(time.Now().Unix())
return forwardCmd.Run(r.client, res, err := forwardCmd.Run(r.client,
[]string{src}, now, base.QueuePrefix).Err() []string{src}, now, base.QueuePrefix).Result()
} if err != nil {
return 0, err
// KEYS[1] -> source queue (e.g. scheduled or retry queue) }
// KEYS[2] -> destination queue return cast.ToInt(res), nil
var forwardSingleCmd = redis.NewScript(`
local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1])
for _, msg in ipairs(msgs) do
redis.call("LPUSH", KEYS[2], msg)
redis.call("ZREM", KEYS[1], msg)
end
return msgs`)
// forwardSingle moves all tasks with a score less than the current unix time
// from the src zset to dst list.
func (r *RDB) forwardSingle(src, dst string) error {
now := float64(time.Now().Unix())
return forwardSingleCmd.Run(r.client,
[]string{src, dst}, now).Err()
} }
// KEYS[1] -> asynq:servers:<host:pid:sid> // KEYS[1] -> asynq:servers:<host:pid:sid>

View File

@ -860,7 +860,6 @@ func TestCheckAndEnqueue(t *testing.T) {
tests := []struct { tests := []struct {
scheduled []h.ZSetEntry scheduled []h.ZSetEntry
retry []h.ZSetEntry retry []h.ZSetEntry
qnames []string
wantEnqueued map[string][]*base.TaskMessage wantEnqueued map[string][]*base.TaskMessage
wantScheduled []*base.TaskMessage wantScheduled []*base.TaskMessage
wantRetry []*base.TaskMessage wantRetry []*base.TaskMessage
@ -872,7 +871,6 @@ func TestCheckAndEnqueue(t *testing.T) {
}, },
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1, t2, t3}, "default": {t1, t2, t3},
}, },
@ -885,7 +883,6 @@ func TestCheckAndEnqueue(t *testing.T) {
{Msg: t2, Score: float64(secondAgo.Unix())}}, {Msg: t2, Score: float64(secondAgo.Unix())}},
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(secondAgo.Unix())}}, {Msg: t3, Score: float64(secondAgo.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t2, t3}, "default": {t2, t3},
}, },
@ -898,7 +895,6 @@ func TestCheckAndEnqueue(t *testing.T) {
{Msg: t2, Score: float64(hourFromNow.Unix())}}, {Msg: t2, Score: float64(hourFromNow.Unix())}},
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t3, Score: float64(hourFromNow.Unix())}}, {Msg: t3, Score: float64(hourFromNow.Unix())}},
qnames: []string{"default"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {}, "default": {},
}, },
@ -912,7 +908,6 @@ func TestCheckAndEnqueue(t *testing.T) {
}, },
retry: []h.ZSetEntry{ retry: []h.ZSetEntry{
{Msg: t5, Score: float64(secondAgo.Unix())}}, {Msg: t5, Score: float64(secondAgo.Unix())}},
qnames: []string{"default", "critical", "low"},
wantEnqueued: map[string][]*base.TaskMessage{ wantEnqueued: map[string][]*base.TaskMessage{
"default": {t1}, "default": {t1},
"critical": {t4}, "critical": {t4},
@ -928,7 +923,7 @@ func TestCheckAndEnqueue(t *testing.T) {
h.SeedScheduledQueue(t, r.client, tc.scheduled) h.SeedScheduledQueue(t, r.client, tc.scheduled)
h.SeedRetryQueue(t, r.client, tc.retry) h.SeedRetryQueue(t, r.client, tc.retry)
err := r.CheckAndEnqueue(tc.qnames...) err := r.CheckAndEnqueue()
if err != nil { if err != nil {
t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err) t.Errorf("(*RDB).CheckScheduled() = %v, want nil", err)
continue continue

View File

@ -132,7 +132,7 @@ func (tb *TestBroker) RequeueAll() (int64, error) {
return tb.real.RequeueAll() return tb.real.RequeueAll()
} }
func (tb *TestBroker) CheckAndEnqueue(qnames ...string) error { func (tb *TestBroker) CheckAndEnqueue() error {
tb.mu.Lock() tb.mu.Lock()
defer tb.mu.Unlock() defer tb.mu.Unlock()
if tb.sleeping { if tb.sleeping {

View File

@ -253,7 +253,7 @@ func (p *processor) requeue(msg *base.TaskMessage) {
func (p *processor) markAsDone(msg *base.TaskMessage) { func (p *processor) markAsDone(msg *base.TaskMessage) {
err := p.broker.Done(msg) err := p.broker.Done(msg)
if err != nil { if err != nil {
errMsg := fmt.Sprintf("Could not remove task id=%s from %q", msg.ID, base.InProgressQueue) errMsg := fmt.Sprintf("Could not remove task id=%s type=%q from %q", msg.ID, msg.Type, base.InProgressQueue)
p.logger.Warnf("%s; Will retry syncing", errMsg) p.logger.Warnf("%s; Will retry syncing", errMsg)
p.syncRequestCh <- &syncRequest{ p.syncRequestCh <- &syncRequest{
fn: func() error { fn: func() error {

View File

@ -21,29 +21,20 @@ type scheduler struct {
// poll interval on average // poll interval on average
avgInterval time.Duration avgInterval time.Duration
// list of queues to move the tasks into.
qnames []string
} }
type schedulerParams struct { type schedulerParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker broker base.Broker
interval time.Duration interval time.Duration
queues map[string]int
} }
func newScheduler(params schedulerParams) *scheduler { func newScheduler(params schedulerParams) *scheduler {
var qnames []string
for q := range params.queues {
qnames = append(qnames, q)
}
return &scheduler{ return &scheduler{
logger: params.logger, logger: params.logger,
broker: params.broker, broker: params.broker,
done: make(chan struct{}), done: make(chan struct{}),
avgInterval: params.interval, avgInterval: params.interval,
qnames: qnames,
} }
} }
@ -71,7 +62,7 @@ func (s *scheduler) start(wg *sync.WaitGroup) {
} }
func (s *scheduler) exec() { func (s *scheduler) exec() {
if err := s.broker.CheckAndEnqueue(s.qnames...); err != nil { if err := s.broker.CheckAndEnqueue(); err != nil {
s.logger.Errorf("Could not enqueue scheduled tasks: %v", err) s.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
} }
} }

View File

@ -23,7 +23,6 @@ func TestScheduler(t *testing.T) {
logger: testLogger, logger: testLogger,
broker: rdbClient, broker: rdbClient,
interval: pollInterval, interval: pollInterval,
queues: defaultQueueConfig,
}) })
t1 := h.NewTaskMessage("gen_thumbnail", nil) t1 := h.NewTaskMessage("gen_thumbnail", nil)
t2 := h.NewTaskMessage("send_email", nil) t2 := h.NewTaskMessage("send_email", nil)

View File

@ -283,8 +283,8 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
logger.SetLevel(toInternalLogLevel(loglevel)) logger.SetLevel(toInternalLogLevel(loglevel))
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(createRedisClient(r))
starting := make(chan *base.TaskMessage, n) starting := make(chan *base.TaskMessage)
finished := make(chan *base.TaskMessage, n) finished := make(chan *base.TaskMessage)
syncCh := make(chan *syncRequest) syncCh := make(chan *syncRequest)
status := base.NewServerStatus(base.StatusIdle) status := base.NewServerStatus(base.StatusIdle)
cancels := base.NewCancelations() cancels := base.NewCancelations()
@ -309,7 +309,6 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
logger: logger, logger: logger,
broker: rdb, broker: rdb,
interval: 5 * time.Second, interval: 5 * time.Second,
queues: queues,
}) })
subscriber := newSubscriber(subscriberParams{ subscriber := newSubscriber(subscriberParams{
logger: logger, logger: logger,