diff --git a/README.md b/README.md index fa5d2d0..a7aa6ef 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,9 @@ func main() { r := redis.NewClient(&redis.Options{ Addr: "localhost:6379", } - bg := asynq.NewBackground(r, 10) + bg := asynq.NewBackground(r, &asynq.Config{ + Concurrency: 20, + }) // Blocks until signal TERM or INT is received. // For graceful shutdown, send signal TSTP to stop processing more tasks @@ -136,7 +138,9 @@ func main() { r := redis.NewClient(&redis.Options{ Addr: "localhost:6379", } - bg := asynq.NewBackground(r, 10) + bg := asynq.NewBackground(r, &asynq.Config{ + Concurrency: 20, + }) // Use asynq.HandlerFunc adapter for a handler function bg.Run(asynq.HandlerFunc(handler)) diff --git a/background.go b/background.go index 9d5b719..bfa4caa 100644 --- a/background.go +++ b/background.go @@ -33,12 +33,29 @@ type Background struct { processor *processor } -// NewBackground returns a new Background with the specified number of workers -// given a redis configuration . -func NewBackground(r *redis.Client, numWorkers int) *Background { +// Config specifies the background-task processing behavior. +type Config struct { + // Max number of concurrent workers to process tasks. + // + // If set to zero or negative value, NewBackground will overwrite the value to one. + Concurrency int + + // TODO(hibiken): Add ShutdownTimeout + // ShutdownTimeout time.Duration + + // TODO(hibiken): Add RetryDelayFunc +} + +// NewBackground returns a new Background instance given a redis client +// and background processing configuration. +func NewBackground(r *redis.Client, cfg *Config) *Background { + n := cfg.Concurrency + if n < 1 { + n = 1 + } rdb := rdb.NewRDB(r) scheduler := newScheduler(rdb, 5*time.Second) - processor := newProcessor(rdb, numWorkers, nil) + processor := newProcessor(rdb, n, nil) return &Background{ rdb: rdb, scheduler: scheduler, diff --git a/background_test.go b/background_test.go index 489bebc..5b648ca 100644 --- a/background_test.go +++ b/background_test.go @@ -18,7 +18,9 @@ func TestBackground(t *testing.T) { DB: 15, }) client := NewClient(r) - bg := NewBackground(r, 10) + bg := NewBackground(r, &Config{ + Concurrency: 10, + }) // no-op handler h := func(task *Task) error { diff --git a/client.go b/client.go index e51e385..ac6afd8 100644 --- a/client.go +++ b/client.go @@ -25,7 +25,7 @@ func NewClient(r *redis.Client) *Client { return &Client{rdb} } -// Option configures the behavior of task processing. +// Option specifies the processing behavior for the associated task. type Option interface{} // max number of times a task will be retried. diff --git a/doc.go b/doc.go index 1836fb4..7e08e11 100644 --- a/doc.go +++ b/doc.go @@ -3,9 +3,7 @@ Package asynq provides a framework for background task processing. The Client is used to register a task to be processed at the specified time. - client := asynq.NewClient(&asynq.RedisConfig{ - Addr: "localhost:6379", - }) + client := asynq.NewClient(redis) t := &asynq.Task{ Type: "send_email", @@ -16,8 +14,8 @@ The Client is used to register a task to be processed at the specified time. The Background is used to run the background processing with a given handler with the specified number of workers. - bg := asynq.NewBackground(20, &asynq.RedisConfig{ - Addr: "localhost:6379", + bg := asynq.NewBackground(redis, &asynq.Config{ + Concurrency: 20, }) bg.Run(handler)