diff --git a/background.go b/background.go
index 204b59d..908e7a6 100644
--- a/background.go
+++ b/background.go
@@ -16,6 +16,7 @@ import (
 	"time"
 
 	"github.com/go-redis/redis/v7"
+	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/rdb"
 )
 
@@ -54,6 +55,23 @@ type Config struct {
 	// e is the error returned by the task handler.
 	// t is the task in question.
 	RetryDelayFunc func(n int, e error, t *Task) time.Duration
+
+	// List of queues to process with given priority level. Keys are the names of the
+	// queues and values are the associated priority levels.
+	//
+	// If set to nil or not specified, the background will process only the "default" queue.
+	//
+	// Priority is treated as follows to prevent starving low priority queues.
+	// Example:
+	// Queues: map[string]uint{
+	//     "critical": 6,
+	//     "default":  3,
+	//     "low":      1,
+	// }
+	// With the above config and given that all queues are not empty, the tasks
+	// in "critical", "default", "low" should be processed 60%, 30%, 10% of
+	// the time respectively.
+	Queues map[string]uint
 }
 
 // Formula taken from https://github.com/mperham/sidekiq.
@@ -63,6 +81,10 @@ func defaultDelayFunc(n int, e error, t *Task) time.Duration {
 	return time.Duration(s) * time.Second
 }
 
+var defaultQueueConfig = map[string]uint{
+	base.DefaultQueueName: 1,
+}
+
 // NewBackground returns a new Background instance given a redis client
 // and background processing configuration.
 func NewBackground(r *redis.Client, cfg *Config) *Background {
@@ -74,9 +96,13 @@ func NewBackground(r *redis.Client, cfg *Config) *Background {
 	if delayFunc == nil {
 		delayFunc = defaultDelayFunc
 	}
+	queues := cfg.Queues
+	if queues == nil {
+		queues = defaultQueueConfig
+	}
 	rdb := rdb.NewRDB(r)
 	scheduler := newScheduler(rdb, 5*time.Second)
-	processor := newProcessor(rdb, n, delayFunc)
+	processor := newProcessor(rdb, n, queues, delayFunc)
 	return &Background{
 		rdb:       rdb,
 		scheduler: scheduler,
diff --git a/processor.go b/processor.go
index 7f408c0..06930db 100644
--- a/processor.go
+++ b/processor.go
@@ -19,6 +19,8 @@ type processor struct {
 
 	handler Handler
 
+	queueConfig map[string]uint
+
 	retryDelayFunc retryDelayFunc
 
 	// timeout for blocking dequeue operation.
@@ -44,9 +46,10 @@ type processor struct {
 
 type retryDelayFunc func(n int, err error, task *Task) time.Duration
 
-func newProcessor(r *rdb.RDB, n int, fn retryDelayFunc) *processor {
+func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, fn retryDelayFunc) *processor {
 	return &processor{
 		rdb:            r,
+		queueConfig:    qcfg,
 		retryDelayFunc: fn,
 		dequeueTimeout: 2 * time.Second,
 		sema:           make(chan struct{}, n),
diff --git a/processor_test.go b/processor_test.go
index 79ff0cc..915bb72 100644
--- a/processor_test.go
+++ b/processor_test.go
@@ -63,7 +63,7 @@ func TestProcessorSuccess(t *testing.T) {
 		processed = append(processed, task)
 		return nil
 	}
-	p := newProcessor(rdbClient, 10, defaultDelayFunc)
+	p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc)
 	p.handler = HandlerFunc(handler)
 	p.dequeueTimeout = time.Second // short time out for test purpose
 
@@ -147,7 +147,7 @@ func TestProcessorRetry(t *testing.T) {
 	handler := func(task *Task) error {
 		return fmt.Errorf(errMsg)
 	}
-	p := newProcessor(rdbClient, 10, delayFunc)
+	p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc)
 	p.handler = HandlerFunc(handler)
 	p.dequeueTimeout = time.Second // short time out for test purpose
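
For reference, a minimal usage sketch of the new Queues option from a caller's point of view. This is illustrative only and not part of the patch: it assumes the package's existing exported Config.Concurrency field, (*Background).Run method, and the HandlerFunc/Task types; only Queues itself is introduced by this diff, and the weights below are arbitrary.

package main

import (
	"github.com/go-redis/redis/v7"
	"github.com/hibiken/asynq"
)

func main() {
	r := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	bg := asynq.NewBackground(r, &asynq.Config{
		Concurrency: 10, // assumed existing Config field, not part of this diff
		// Weighted priorities: while all three queues are non-empty, roughly
		// 60% of dequeues come from "critical", 30% from "default", 10% from "low".
		Queues: map[string]uint{
			"critical": 6,
			"default":  3,
			"low":      1,
		},
	})

	// Run is assumed from the existing public API; returning an error from the
	// handler triggers the retry path (RetryDelayFunc).
	bg.Run(asynq.HandlerFunc(func(t *asynq.Task) error {
		return nil
	}))
}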