diff --git a/README.md b/README.md index a957c78..3662c0f 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ # Asynq -[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/License-MIT-green.svg)](https://opensource.org/licenses/MIT) +[![Build Status](https://travis-ci.com/hibiken/asynq.svg?token=paqzfpSkF4p23s5Ux39b&branch=master)](https://travis-ci.com/hibiken/asynq) [![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT) -Simple, efficent asynchronous task processing library in Go. +Simple and efficent asynchronous task processing library in Go. + +**Important Note**: Current major version is zero (v0.x.x) to accomodate rapid development and fast iteration while getting early feedback from users. The public API could change without a major version update before the release of verson 1.0.0. ## Table of Contents @@ -10,6 +12,7 @@ Simple, efficent asynchronous task processing library in Go. - [Requirements](#requirements) - [Installation](#installation) - [Getting Started](#getting-started) +- [Monitoring CLI](#monitoring-cli) - [Acknowledgements](#acknowledgements) - [License](#license) @@ -27,44 +30,49 @@ Asynq provides: - Ability to configure max retry count per task - Ability to configure max number of worker goroutines to process tasks - Unix signal handling to safely shutdown background processing -- Enhanced reliability TODO(hibiken): link to wiki page describing this. - CLI to query and mutate queues state for mointoring and administrative purposes ## Requirements -| Dependency | Version | -| -------------------------------------------------------------- | ------- | -| [Redis](https://redis.io/) | v2.6+ | -| [Go](https://golang.org/) | v1.12+ | -| [github.com/go-redis/redis](https://github.com/go-redis/redis) | v.7.0+ | +| Dependency | Version | +| -------------------------- | ------- | +| [Redis](https://redis.io/) | v2.8+ | +| [Go](https://golang.org/) | v1.12+ | ## Installation ``` -go get github.com/hibiken/asynq +go get -u github.com/hibiken/asynq ``` ## Getting Started -1. Import `asynq` and `redis` in your file. +1. Import `asynq` in your file. ```go -import ( - "github.com/go-redis/redis/v7" - "github.com/hibiken/asynq" -) +import "github.com/hibiken/asynq" ``` -2. Create a `Client` instance to create tasks. +2. Use one of `RedisConnOpt` types to specify how to connect to Redis. + +```go +var redis = &asynq.RedisClientOpt{ + Addr: "localhost:6379", + // Omit if no password is required + Password: "mypassword", + // Use a dedicated db number for asynq. + // By default, Redis offers 16 databases (0..15) + DB: 0, +} +``` + +3. Create a `Client` instance to create and schedule tasks. ```go func main() { - r := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - } - client := asynq.NewClient(r) + client := asynq.NewClient(redis) - // create a task with typename and payload. + // Create a task with typename and payload. t1 := asynq.NewTask( "send_welcome_email", map[string]interface{}{"user_id": 42}) @@ -73,35 +81,34 @@ func main() { "send_reminder_email", map[string]interface{}{"user_id": 42}) - // process the task immediately. + // Process the task immediately. err := client.Schedule(t1, time.Now()) - // process the task 24 hours later. + // Process the task 24 hours later. err = client.Schedule(t2, time.Now().Add(24 * time.Hour)) - // specify the max number of retry (default: 25) + // Specify the max number of retry (default: 25) err = client.Schedule(t1, time.Now(), asynq.MaxRetry(1)) } ``` -3. Create a `Background` instance to process tasks. +4. Create a `Background` instance to process tasks. ```go func main() { - r := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - } - bg := asynq.NewBackground(r, &asynq.Config{ - Concurrency: 20, + bg := asynq.NewBackground(redis, &asynq.Config{ + Concurrency: 10, }) // Blocks until signal TERM or INT is received. // For graceful shutdown, send signal TSTP to stop processing more tasks - // before sending TERM or INT signal. + // before sending TERM or INT signal to terminate the process. bg.Run(handler) } ``` +Note that `Client` and `Background` are intended to be used in separate executable binaries. + The argument to `(*asynq.Background).Run` is an interface `asynq.Handler` which has one method `ProcessTask`. ```go @@ -136,11 +143,8 @@ func handler(t *asynq.Task) error { } func main() { - r := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - } - bg := asynq.NewBackground(r, &asynq.Config{ - Concurrency: 20, + bg := asynq.NewBackground(redis, &asynq.Config{ + Concurrency: 10, }) // Use asynq.HandlerFunc adapter for a handler function @@ -148,6 +152,10 @@ func main() { } ``` +## Monitoring CLI + +TODO(hibiken): Describe basic usage of `asynqmon` CLI + ## Acknowledgements - [Sidekiq](https://github.com/mperham/sidekiq) : Many of the design ideas are taken from sidekiq and its Web UI diff --git a/asynq.go b/asynq.go index 51fcceb..b3044ac 100644 --- a/asynq.go +++ b/asynq.go @@ -4,10 +4,12 @@ package asynq -/* -TODOs: -- [P0] Go docs -*/ +import ( + "crypto/tls" + "fmt" + + "github.com/go-redis/redis/v7" +) // Task represents a task to be performed. type Task struct { @@ -28,3 +30,109 @@ func NewTask(typename string, payload map[string]interface{}) *Task { Payload: Payload{payload}, } } + +// RedisConnOpt is a discriminated union of redis-client-option types. +// +// RedisConnOpt represents a sum of following types: +// RedisClientOpt | *RedisClientOpt | RedisFailoverClientOpt | *RedisFailoverClientOpt +type RedisConnOpt interface{} + +// RedisClientOpt is used to specify redis client options to connect +// to a redis server running as a stand alone instance. +type RedisClientOpt struct { + // Network type to use, either tcp or unix. + // Default is tcp. + Network string + + // Redis server address in the format 'host:port'. + Addr string + + // Redis server password. + Password string + + // Redis DB to select after connecting to the server. + // See: https://redis.io/commands/select. + DB int + + // Maximum number of socket connections. + // Default is 10 connections per every CPU as reported by runtime.NumCPU. + PoolSize int + + // TLS Config used to connect to the server. + // TLS will be negotiated only if this field is set. + TLSConfig *tls.Config +} + +// RedisFailoverClientOpt is used to specify redis failover client. +type RedisFailoverClientOpt struct { + // Redis master name that monitored by sentinels. + MasterName string + + // Addresses of sentinels in the form "host:port". + // Use at least three sentinels to avoid problems described in + // https://redis.io/topics/sentinel. + SentinelAddrs []string + + // Redis sentinel password. + SentinelPassword string + + // Redis server password. + Password string + + // Redis DB to select after connecting to the server. + // See: https://redis.io/commands/select. + DB int + + // Maximum number of socket connections. + // Default is 10 connections per every CPU as reported by runtime.NumCPU. + PoolSize int + + // TLS Config used to connect to the server. + // TLS will be negotiated only if this field is set. + TLSConfig *tls.Config +} + +func createRedisClient(r RedisConnOpt) *redis.Client { + switch r := r.(type) { + case *RedisClientOpt: + return redis.NewClient(&redis.Options{ + Network: r.Network, + Addr: r.Addr, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case RedisClientOpt: + return redis.NewClient(&redis.Options{ + Network: r.Network, + Addr: r.Addr, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case *RedisFailoverClientOpt: + return redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: r.MasterName, + SentinelAddrs: r.SentinelAddrs, + SentinelPassword: r.SentinelPassword, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + case RedisFailoverClientOpt: + return redis.NewFailoverClient(&redis.FailoverOptions{ + MasterName: r.MasterName, + SentinelAddrs: r.SentinelAddrs, + SentinelPassword: r.SentinelPassword, + Password: r.Password, + DB: r.DB, + PoolSize: r.PoolSize, + TLSConfig: r.TLSConfig, + }) + default: + panic(fmt.Sprintf("unexpected type %T for RedisConnOpt", r)) + } +} diff --git a/background.go b/background.go index 4f30d1d..58121b8 100644 --- a/background.go +++ b/background.go @@ -15,7 +15,6 @@ import ( "syscall" "time" - "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" ) @@ -95,7 +94,7 @@ var defaultQueueConfig = map[string]uint{ // NewBackground returns a new Background instance given a redis client // and background processing configuration. -func NewBackground(r *redis.Client, cfg *Config) *Background { +func NewBackground(r RedisConnOpt, cfg *Config) *Background { n := cfg.Concurrency if n < 1 { n = 1 @@ -110,7 +109,7 @@ func NewBackground(r *redis.Client, cfg *Config) *Background { } qcfg := normalizeQueueCfg(queues) - rdb := rdb.NewRDB(r) + rdb := rdb.NewRDB(createRedisClient(r)) scheduler := newScheduler(rdb, 5*time.Second, qcfg) processor := newProcessor(rdb, n, qcfg, cfg.StrictPriority, delayFunc) return &Background{ diff --git a/client.go b/client.go index a4f58a3..3dfcfb2 100644 --- a/client.go +++ b/client.go @@ -8,7 +8,6 @@ import ( "strings" "time" - "github.com/go-redis/redis/v7" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/rdb" "github.com/rs/xid" @@ -25,8 +24,8 @@ type Client struct { } // NewClient and returns a new Client given a redis configuration. -func NewClient(r *redis.Client) *Client { - rdb := rdb.NewRDB(r) +func NewClient(r RedisConnOpt) *Client { + rdb := rdb.NewRDB(createRedisClient(r)) return &Client{rdb} } diff --git a/doc.go b/doc.go index 75bc785..237b200 100644 --- a/doc.go +++ b/doc.go @@ -5,15 +5,27 @@ /* Package asynq provides a framework for background task processing. +Asynq uses Redis as a message broker. To connect to redis server, +specify the options using one of RedisConnOpt types. + + redis = &asynq.RedisClientOpt{ + Addr: "localhost:6379", + Password: "secretpassword", + DB: 3, + } + The Client is used to register a task to be processed at the specified time. - client := asynq.NewClient(redis) +Task is created with two parameters: its type and payload. - t := asynq.NewTask( - "send_email", - map[string]interface{}{"user_id": 42}) + client := asynq.NewClient(redis) - err := client.Schedule(t, time.Now().Add(time.Minute)) + t := asynq.NewTask( + "send_email", + map[string]interface{}{"user_id": 42}) + + // Schedule the task t to be processed a minute from now. + err := client.Schedule(t, time.Now().Add(time.Minute)) The Background is used to run the background task processing with a given handler. @@ -26,7 +38,7 @@ handler. Handler is an interface with one method ProcessTask which takes a task and returns an error. Handler should return nil if the processing is successful, otherwise return a non-nil error. -If handler returns a non-nil error, the task will be retried in the future. +If handler panics or returns a non-nil error, the task will be retried in the future. Example of a type that implements the Handler interface. type TaskHandler struct { @@ -42,7 +54,7 @@ Example of a type that implements the Handler interface. // generate thumbnail image //... default: - return fmt.Errorf("unepected task type %q", task.Type) + return fmt.Errorf("unexpected task type %q", task.Type) } return nil }