diff --git a/asynq.go b/asynq.go index ba463b6..1ffcc41 100644 --- a/asynq.go +++ b/asynq.go @@ -9,32 +9,18 @@ TODOs: */ import ( - "encoding/json" - "fmt" - "math" - "math/rand" - "strconv" "sync" "time" "github.com/go-redis/redis/v7" ) -// Redis keys -const ( - queuePrefix = "asynq:queues:" // LIST - allQueues = "asynq:queues" // SET - scheduled = "asynq:scheduled" // ZSET - retry = "asynq:retry" // ZSET - dead = "asynq:dead" // ZSET -) - // Max retry count by default const defaultMaxRetry = 25 // Client is an interface for scheduling tasks. type Client struct { - rdb *redis.Client + rdb *rdb } // Task represents a task to be performed. @@ -76,8 +62,8 @@ type RedisOpt struct { // NewClient creates and returns a new client. func NewClient(opt *RedisOpt) *Client { - rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) - return &Client{rdb: rdb} + client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + return &Client{rdb: newRDB(client)} } // Process enqueues the task to be performed at a given time. @@ -94,17 +80,15 @@ func (c *Client) Process(task *Task, executeAt time.Time) error { // enqueue pushes a given task to the specified queue. func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error { if time.Now().After(executeAt) { - return push(c.rdb, msg) + return c.rdb.push(msg) } - return zadd(c.rdb, scheduled, float64(executeAt.Unix()), msg) + return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg) } //-------------------- Launcher -------------------- // Launcher starts the manager and poller. type Launcher struct { - rdb *redis.Client - // running indicates whether manager and poller are both running. running bool mu sync.Mutex @@ -116,16 +100,11 @@ type Launcher struct { // NewLauncher creates and returns a new Launcher. func NewLauncher(poolSize int, opt *RedisOpt) *Launcher { - rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) - poller := &poller{ - rdb: rdb, - done: make(chan struct{}), - avgInterval: 5 * time.Second, - zsets: []string{scheduled, retry}, - } + client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + rdb := newRDB(client) + poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry}) manager := newManager(rdb, poolSize, nil) return &Launcher{ - rdb: rdb, poller: poller, manager: manager, } @@ -160,60 +139,3 @@ func (l *Launcher) Stop() { l.poller.terminate() l.manager.terminate() } - -// push pushes the task to the specified queue to get picked up by a worker. -func push(rdb *redis.Client, msg *taskMessage) error { - bytes, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) - } - qname := queuePrefix + msg.Queue - err = rdb.SAdd(allQueues, qname).Err() - if err != nil { - return fmt.Errorf("could not execute command SADD %q %q: %v", - allQueues, qname, err) - } - return rdb.RPush(qname, string(bytes)).Err() -} - -// zadd serializes the given message and adds to the specified sorted set. -func zadd(rdb *redis.Client, zset string, zscore float64, msg *taskMessage) error { - bytes, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) - } - return rdb.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err() -} - -const maxDeadTask = 100 -const deadExpirationInDays = 90 - -// kill sends the task to "dead" sorted set. It also trim the sorted set by -// timestamp and set size. -func kill(rdb *redis.Client, msg *taskMessage) error { - bytes, err := json.Marshal(msg) - if err != nil { - return fmt.Errorf("could not encode task into JSON: %v", err) - } - now := time.Now() - pipe := rdb.Pipeline() - pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) - limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago - pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit))) - pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100 - _, err = pipe.Exec() - return err -} - -// listQueues returns the list of all queues. -func listQueues(rdb *redis.Client) []string { - return rdb.SMembers(allQueues).Val() -} - -// delaySeconds returns a number seconds to delay before retrying. -// Formula taken from https://github.com/mperham/sidekiq. -func delaySeconds(count int) time.Duration { - r := rand.New(rand.NewSource(time.Now().UnixNano())) - s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) - return time.Duration(s) * time.Second -} diff --git a/manager.go b/manager.go index 8339549..dad8bee 100644 --- a/manager.go +++ b/manager.go @@ -1,16 +1,15 @@ package asynq import ( - "encoding/json" "fmt" "log" + "math" + "math/rand" "time" - - "github.com/go-redis/redis/v7" ) type manager struct { - rdb *redis.Client + rdb *rdb handler TaskHandler @@ -22,7 +21,7 @@ type manager struct { done chan struct{} } -func newManager(rdb *redis.Client, numWorkers int, handler TaskHandler) *manager { +func newManager(rdb *rdb, numWorkers int, handler TaskHandler) *manager { return &manager{ rdb: rdb, handler: handler, @@ -63,22 +62,21 @@ func (m *manager) start() { func (m *manager) processTasks() { // pull message out of the queue and process it // TODO(hibiken): sort the list of queues in order of priority - res, err := m.rdb.BLPop(5*time.Second, listQueues(m.rdb)...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. + msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...) if err != nil { - if err != redis.Nil { - log.Printf("BLPOP command failed: %v\n", err) + switch err { + case errQueuePopTimeout: + // timed out, this is a normal behavior. + return + case errDeserializeTask: + log.Println("[Servere Error] could not parse json encoded message") + return + default: + log.Printf("[Servere Error] unexpected error while pulling message out of queues: %v\n", err) + return } - return } - q, data := res[0], res[1] - fmt.Printf("perform task %v from %s\n", data, q) - var msg taskMessage - err = json.Unmarshal([]byte(data), &msg) - if err != nil { - log.Printf("[Servere Error] could not parse json encoded message %s: %v", data, err) - return - } t := &Task{Type: msg.Type, Payload: msg.Payload} m.sema <- struct{}{} // acquire token go func(task *Task) { @@ -87,7 +85,7 @@ func (m *manager) processTasks() { if err != nil { if msg.Retried >= msg.Retry { fmt.Println("Retry exhausted!!!") - if err := kill(m.rdb, &msg); err != nil { + if err := m.rdb.kill(msg); err != nil { log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err) } return @@ -97,7 +95,7 @@ func (m *manager) processTasks() { fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now())) msg.Retried++ msg.ErrorMsg = err.Error() - if err := zadd(m.rdb, retry, float64(retryAt.Unix()), &msg); err != nil { + if err := m.rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil { // TODO(hibiken): Not sure how to handle this error log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err) return @@ -105,3 +103,11 @@ func (m *manager) processTasks() { } }(t) } + +// delaySeconds returns a number seconds to delay before retrying. +// Formula taken from https://github.com/mperham/sidekiq. +func delaySeconds(count int) time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1)) + return time.Duration(s) * time.Second +} diff --git a/poller.go b/poller.go index 5f969ee..7f5452f 100644 --- a/poller.go +++ b/poller.go @@ -1,7 +1,6 @@ package asynq import ( - "encoding/json" "fmt" "log" "strconv" @@ -11,7 +10,7 @@ import ( ) type poller struct { - rdb *redis.Client + rdb *rdb // channel to communicate back to the long running "poller" goroutine. done chan struct{} @@ -23,6 +22,15 @@ type poller struct { zsets []string } +func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller { + return &poller{ + rdb: rdb, + done: make(chan struct{}), + avgInterval: avgInterval, + zsets: zsets, + } +} + func (p *poller) terminate() { // send a signal to the manager goroutine to stop // processing tasks from the queue. @@ -51,38 +59,19 @@ func (p *poller) enqueue() { // Get next items in the queue with scores (time to execute) <= now. now := time.Now().Unix() fmt.Printf("[DEBUG] polling ZSET %q\n", zset) - jobs, err := p.rdb.ZRangeByScore(zset, - &redis.ZRangeBy{ - Min: "-inf", - Max: strconv.Itoa(int(now))}).Result() - fmt.Printf("len(jobs) = %d\n", len(jobs)) + msgs, err := p.rdb.zRangeByScore(zset, + &redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))}) if err != nil { log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) continue } - if len(jobs) == 0 { - fmt.Println("jobs empty") - continue - } - for _, j := range jobs { - fmt.Printf("[debug] j = %v\n", j) - var msg taskMessage - err = json.Unmarshal([]byte(j), &msg) - if err != nil { - fmt.Println("unmarshal failed") + for _, m := range msgs { + if err := p.rdb.move(zset, m); err != nil { + log.Printf("could not move task %+v to queue %q: %v", + m, m.Queue, err) continue } - - fmt.Println("[debug] ZREM") - if p.rdb.ZRem(zset, j).Val() > 0 { - err = push(p.rdb, &msg) - if err != nil { - log.Printf("could not push task to queue %q: %v", msg.Queue, err) - // TODO(hibiken): Handle this error properly. Add back to scheduled ZSET? - continue - } - } } } } diff --git a/rdb.go b/rdb.go new file mode 100644 index 0000000..d2371bf --- /dev/null +++ b/rdb.go @@ -0,0 +1,155 @@ +package asynq + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "strconv" + "time" + + "github.com/go-redis/redis/v7" +) + +// Redis keys +const ( + queuePrefix = "asynq:queues:" // LIST + allQueues = "asynq:queues" // SET + scheduled = "asynq:scheduled" // ZSET + retry = "asynq:retry" // ZSET + dead = "asynq:dead" // ZSET +) + +var ( + errQueuePopTimeout = errors.New("blocking queue pop operation timed out") + errSerializeTask = errors.New("could not encode task message into json") + errDeserializeTask = errors.New("could not decode task message from json") +) + +// rdb encapsulates the interaction with redis server. +type rdb struct { + client *redis.Client +} + +func newRDB(client *redis.Client) *rdb { + return &rdb{client} +} + +// push enqueues the task to queue. +func (r *rdb) push(msg *taskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not encode task into JSON: %v", err) + } + qname := queuePrefix + msg.Queue + err = r.client.SAdd(allQueues, qname).Err() + if err != nil { + return fmt.Errorf("command SADD %q %q failed: %v", + allQueues, qname, err) + } + err = r.client.RPush(qname, string(bytes)).Err() + if err != nil { + return fmt.Errorf("command RPUSH %q %q failed: %v", + qname, string(bytes), err) + } + return nil +} + +// bpop blocks until there is a taskMessage available to be processed. +// bpop returns immediately if there are already taskMessages waiting to be processed. +func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) { + res, err := r.client.BLPop(5*time.Second, keys...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added. + if err != nil { + if err != redis.Nil { + return nil, fmt.Errorf("command BLPOP %v %v failed: %v", + timeout, keys, err) + } + return nil, errQueuePopTimeout + } + q, data := res[0], res[1] + fmt.Printf("perform task %v from %s\n", data, q) + var msg taskMessage + err = json.Unmarshal([]byte(data), &msg) + if err != nil { + return nil, errDeserializeTask + } + return &msg, nil +} + +// zadd adds the taskMessage to the specified zset (sorted set) with the given score. +func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not encode task into JSON: %v", err) + } + err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err() + if err != nil { + return fmt.Errorf("command ZADD %s %.1f %s failed: %v", + zset, zscore, string(bytes), err) + } + return nil +} + +func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) { + jobs, err := r.client.ZRangeByScore(key, opt).Result() + fmt.Printf("len(jobs) = %d\n", len(jobs)) + if err != nil { + return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err) + } + var msgs []*taskMessage + for _, j := range jobs { + fmt.Printf("[debug] j = %v\n", j) + var msg taskMessage + err = json.Unmarshal([]byte(j), &msg) + if err != nil { + log.Printf("[WARNING] could not unmarshal task data %s: %v\n", j, err) + continue + } + msgs = append(msgs, &msg) + } + return msgs, nil +} + +// move moves taskMessage from zfrom to the specified queue. +func (r *rdb) move(from string, msg *taskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return errSerializeTask + } + if r.client.ZRem(from, string(bytes)).Val() > 0 { + err = r.push(msg) + if err != nil { + log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n", + msg.Queue, err) + // TODO(hibiken): Handle this error properly. + // Add back to zfrom? + return fmt.Errorf("could not push task %v from %q: %v", msg, msg.Queue, err) + } + } + return nil +} + +const maxDeadTask = 100 +const deadExpirationInDays = 90 + +// kill sends the taskMessage to "dead" set. +// It also trims the sorted set by timestamp and set size. +func (r *rdb) kill(msg *taskMessage) error { + bytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("could not encode task into JSON: %v", err) + } + now := time.Now() + pipe := r.client.Pipeline() + pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())}) + limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago + pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit))) + pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100 + _, err = pipe.Exec() + return err +} + +// listQueues returns the list of all queues. +func (r *rdb) listQueues() []string { + return r.client.SMembers(allQueues).Val() +}