mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 07:12:17 +08:00
Extract redis logic to type rdb
This commit is contained in:
parent
4c5b6081de
commit
85a04cbabb
94
asynq.go
94
asynq.go
@ -9,32 +9,18 @@ TODOs:
|
||||
*/
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
)
|
||||
|
||||
// Redis keys
|
||||
const (
|
||||
queuePrefix = "asynq:queues:" // LIST
|
||||
allQueues = "asynq:queues" // SET
|
||||
scheduled = "asynq:scheduled" // ZSET
|
||||
retry = "asynq:retry" // ZSET
|
||||
dead = "asynq:dead" // ZSET
|
||||
)
|
||||
|
||||
// Max retry count by default
|
||||
const defaultMaxRetry = 25
|
||||
|
||||
// Client is an interface for scheduling tasks.
|
||||
type Client struct {
|
||||
rdb *redis.Client
|
||||
rdb *rdb
|
||||
}
|
||||
|
||||
// Task represents a task to be performed.
|
||||
@ -76,8 +62,8 @@ type RedisOpt struct {
|
||||
|
||||
// NewClient creates and returns a new client.
|
||||
func NewClient(opt *RedisOpt) *Client {
|
||||
rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password})
|
||||
return &Client{rdb: rdb}
|
||||
client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password})
|
||||
return &Client{rdb: newRDB(client)}
|
||||
}
|
||||
|
||||
// Process enqueues the task to be performed at a given time.
|
||||
@ -94,17 +80,15 @@ func (c *Client) Process(task *Task, executeAt time.Time) error {
|
||||
// enqueue pushes a given task to the specified queue.
|
||||
func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error {
|
||||
if time.Now().After(executeAt) {
|
||||
return push(c.rdb, msg)
|
||||
return c.rdb.push(msg)
|
||||
}
|
||||
return zadd(c.rdb, scheduled, float64(executeAt.Unix()), msg)
|
||||
return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg)
|
||||
}
|
||||
|
||||
//-------------------- Launcher --------------------
|
||||
|
||||
// Launcher starts the manager and poller.
|
||||
type Launcher struct {
|
||||
rdb *redis.Client
|
||||
|
||||
// running indicates whether manager and poller are both running.
|
||||
running bool
|
||||
mu sync.Mutex
|
||||
@ -116,16 +100,11 @@ type Launcher struct {
|
||||
|
||||
// NewLauncher creates and returns a new Launcher.
|
||||
func NewLauncher(poolSize int, opt *RedisOpt) *Launcher {
|
||||
rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password})
|
||||
poller := &poller{
|
||||
rdb: rdb,
|
||||
done: make(chan struct{}),
|
||||
avgInterval: 5 * time.Second,
|
||||
zsets: []string{scheduled, retry},
|
||||
}
|
||||
client := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password})
|
||||
rdb := newRDB(client)
|
||||
poller := newPoller(rdb, 5*time.Second, []string{scheduled, retry})
|
||||
manager := newManager(rdb, poolSize, nil)
|
||||
return &Launcher{
|
||||
rdb: rdb,
|
||||
poller: poller,
|
||||
manager: manager,
|
||||
}
|
||||
@ -160,60 +139,3 @@ func (l *Launcher) Stop() {
|
||||
l.poller.terminate()
|
||||
l.manager.terminate()
|
||||
}
|
||||
|
||||
// push pushes the task to the specified queue to get picked up by a worker.
|
||||
func push(rdb *redis.Client, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
qname := queuePrefix + msg.Queue
|
||||
err = rdb.SAdd(allQueues, qname).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not execute command SADD %q %q: %v",
|
||||
allQueues, qname, err)
|
||||
}
|
||||
return rdb.RPush(qname, string(bytes)).Err()
|
||||
}
|
||||
|
||||
// zadd serializes the given message and adds to the specified sorted set.
|
||||
func zadd(rdb *redis.Client, zset string, zscore float64, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
return rdb.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err()
|
||||
}
|
||||
|
||||
const maxDeadTask = 100
|
||||
const deadExpirationInDays = 90
|
||||
|
||||
// kill sends the task to "dead" sorted set. It also trim the sorted set by
|
||||
// timestamp and set size.
|
||||
func kill(rdb *redis.Client, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
now := time.Now()
|
||||
pipe := rdb.Pipeline()
|
||||
pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100
|
||||
_, err = pipe.Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
// listQueues returns the list of all queues.
|
||||
func listQueues(rdb *redis.Client) []string {
|
||||
return rdb.SMembers(allQueues).Val()
|
||||
}
|
||||
|
||||
// delaySeconds returns a number seconds to delay before retrying.
|
||||
// Formula taken from https://github.com/mperham/sidekiq.
|
||||
func delaySeconds(count int) time.Duration {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
|
||||
return time.Duration(s) * time.Second
|
||||
}
|
||||
|
44
manager.go
44
manager.go
@ -1,16 +1,15 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
)
|
||||
|
||||
type manager struct {
|
||||
rdb *redis.Client
|
||||
rdb *rdb
|
||||
|
||||
handler TaskHandler
|
||||
|
||||
@ -22,7 +21,7 @@ type manager struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func newManager(rdb *redis.Client, numWorkers int, handler TaskHandler) *manager {
|
||||
func newManager(rdb *rdb, numWorkers int, handler TaskHandler) *manager {
|
||||
return &manager{
|
||||
rdb: rdb,
|
||||
handler: handler,
|
||||
@ -63,22 +62,21 @@ func (m *manager) start() {
|
||||
func (m *manager) processTasks() {
|
||||
// pull message out of the queue and process it
|
||||
// TODO(hibiken): sort the list of queues in order of priority
|
||||
res, err := m.rdb.BLPop(5*time.Second, listQueues(m.rdb)...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added.
|
||||
msg, err := m.rdb.bpop(5*time.Second, m.rdb.listQueues()...)
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
log.Printf("BLPOP command failed: %v\n", err)
|
||||
switch err {
|
||||
case errQueuePopTimeout:
|
||||
// timed out, this is a normal behavior.
|
||||
return
|
||||
case errDeserializeTask:
|
||||
log.Println("[Servere Error] could not parse json encoded message")
|
||||
return
|
||||
default:
|
||||
log.Printf("[Servere Error] unexpected error while pulling message out of queues: %v\n", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
q, data := res[0], res[1]
|
||||
fmt.Printf("perform task %v from %s\n", data, q)
|
||||
var msg taskMessage
|
||||
err = json.Unmarshal([]byte(data), &msg)
|
||||
if err != nil {
|
||||
log.Printf("[Servere Error] could not parse json encoded message %s: %v", data, err)
|
||||
return
|
||||
}
|
||||
t := &Task{Type: msg.Type, Payload: msg.Payload}
|
||||
m.sema <- struct{}{} // acquire token
|
||||
go func(task *Task) {
|
||||
@ -87,7 +85,7 @@ func (m *manager) processTasks() {
|
||||
if err != nil {
|
||||
if msg.Retried >= msg.Retry {
|
||||
fmt.Println("Retry exhausted!!!")
|
||||
if err := kill(m.rdb, &msg); err != nil {
|
||||
if err := m.rdb.kill(msg); err != nil {
|
||||
log.Printf("[SERVER ERROR] could not add task %+v to 'dead' set\n", err)
|
||||
}
|
||||
return
|
||||
@ -97,7 +95,7 @@ func (m *manager) processTasks() {
|
||||
fmt.Printf("[DEBUG] retying the task in %v\n", retryAt.Sub(time.Now()))
|
||||
msg.Retried++
|
||||
msg.ErrorMsg = err.Error()
|
||||
if err := zadd(m.rdb, retry, float64(retryAt.Unix()), &msg); err != nil {
|
||||
if err := m.rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil {
|
||||
// TODO(hibiken): Not sure how to handle this error
|
||||
log.Printf("[SEVERE ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err)
|
||||
return
|
||||
@ -105,3 +103,11 @@ func (m *manager) processTasks() {
|
||||
}
|
||||
}(t)
|
||||
}
|
||||
|
||||
// delaySeconds returns a number seconds to delay before retrying.
|
||||
// Formula taken from https://github.com/mperham/sidekiq.
|
||||
func delaySeconds(count int) time.Duration {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
s := int(math.Pow(float64(count), 4)) + 15 + (r.Intn(30) * (count + 1))
|
||||
return time.Duration(s) * time.Second
|
||||
}
|
||||
|
43
poller.go
43
poller.go
@ -1,7 +1,6 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
@ -11,7 +10,7 @@ import (
|
||||
)
|
||||
|
||||
type poller struct {
|
||||
rdb *redis.Client
|
||||
rdb *rdb
|
||||
|
||||
// channel to communicate back to the long running "poller" goroutine.
|
||||
done chan struct{}
|
||||
@ -23,6 +22,15 @@ type poller struct {
|
||||
zsets []string
|
||||
}
|
||||
|
||||
func newPoller(rdb *rdb, avgInterval time.Duration, zsets []string) *poller {
|
||||
return &poller{
|
||||
rdb: rdb,
|
||||
done: make(chan struct{}),
|
||||
avgInterval: avgInterval,
|
||||
zsets: zsets,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *poller) terminate() {
|
||||
// send a signal to the manager goroutine to stop
|
||||
// processing tasks from the queue.
|
||||
@ -51,38 +59,19 @@ func (p *poller) enqueue() {
|
||||
// Get next items in the queue with scores (time to execute) <= now.
|
||||
now := time.Now().Unix()
|
||||
fmt.Printf("[DEBUG] polling ZSET %q\n", zset)
|
||||
jobs, err := p.rdb.ZRangeByScore(zset,
|
||||
&redis.ZRangeBy{
|
||||
Min: "-inf",
|
||||
Max: strconv.Itoa(int(now))}).Result()
|
||||
fmt.Printf("len(jobs) = %d\n", len(jobs))
|
||||
msgs, err := p.rdb.zRangeByScore(zset,
|
||||
&redis.ZRangeBy{Min: "-inf", Max: strconv.Itoa(int(now))})
|
||||
if err != nil {
|
||||
log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err)
|
||||
continue
|
||||
}
|
||||
if len(jobs) == 0 {
|
||||
fmt.Println("jobs empty")
|
||||
continue
|
||||
}
|
||||
|
||||
for _, j := range jobs {
|
||||
fmt.Printf("[debug] j = %v\n", j)
|
||||
var msg taskMessage
|
||||
err = json.Unmarshal([]byte(j), &msg)
|
||||
if err != nil {
|
||||
fmt.Println("unmarshal failed")
|
||||
for _, m := range msgs {
|
||||
if err := p.rdb.move(zset, m); err != nil {
|
||||
log.Printf("could not move task %+v to queue %q: %v",
|
||||
m, m.Queue, err)
|
||||
continue
|
||||
}
|
||||
|
||||
fmt.Println("[debug] ZREM")
|
||||
if p.rdb.ZRem(zset, j).Val() > 0 {
|
||||
err = push(p.rdb, &msg)
|
||||
if err != nil {
|
||||
log.Printf("could not push task to queue %q: %v", msg.Queue, err)
|
||||
// TODO(hibiken): Handle this error properly. Add back to scheduled ZSET?
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
155
rdb.go
Normal file
155
rdb.go
Normal file
@ -0,0 +1,155 @@
|
||||
package asynq
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v7"
|
||||
)
|
||||
|
||||
// Redis keys
|
||||
const (
|
||||
queuePrefix = "asynq:queues:" // LIST
|
||||
allQueues = "asynq:queues" // SET
|
||||
scheduled = "asynq:scheduled" // ZSET
|
||||
retry = "asynq:retry" // ZSET
|
||||
dead = "asynq:dead" // ZSET
|
||||
)
|
||||
|
||||
var (
|
||||
errQueuePopTimeout = errors.New("blocking queue pop operation timed out")
|
||||
errSerializeTask = errors.New("could not encode task message into json")
|
||||
errDeserializeTask = errors.New("could not decode task message from json")
|
||||
)
|
||||
|
||||
// rdb encapsulates the interaction with redis server.
|
||||
type rdb struct {
|
||||
client *redis.Client
|
||||
}
|
||||
|
||||
func newRDB(client *redis.Client) *rdb {
|
||||
return &rdb{client}
|
||||
}
|
||||
|
||||
// push enqueues the task to queue.
|
||||
func (r *rdb) push(msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
qname := queuePrefix + msg.Queue
|
||||
err = r.client.SAdd(allQueues, qname).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command SADD %q %q failed: %v",
|
||||
allQueues, qname, err)
|
||||
}
|
||||
err = r.client.RPush(qname, string(bytes)).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command RPUSH %q %q failed: %v",
|
||||
qname, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// bpop blocks until there is a taskMessage available to be processed.
|
||||
// bpop returns immediately if there are already taskMessages waiting to be processed.
|
||||
func (r *rdb) bpop(timeout time.Duration, keys ...string) (*taskMessage, error) {
|
||||
res, err := r.client.BLPop(5*time.Second, keys...).Result() // NOTE: BLPOP needs to time out because if case a new queue is added.
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
return nil, fmt.Errorf("command BLPOP %v %v failed: %v",
|
||||
timeout, keys, err)
|
||||
}
|
||||
return nil, errQueuePopTimeout
|
||||
}
|
||||
q, data := res[0], res[1]
|
||||
fmt.Printf("perform task %v from %s\n", data, q)
|
||||
var msg taskMessage
|
||||
err = json.Unmarshal([]byte(data), &msg)
|
||||
if err != nil {
|
||||
return nil, errDeserializeTask
|
||||
}
|
||||
return &msg, nil
|
||||
}
|
||||
|
||||
// zadd adds the taskMessage to the specified zset (sorted set) with the given score.
|
||||
func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err()
|
||||
if err != nil {
|
||||
return fmt.Errorf("command ZADD %s %.1f %s failed: %v",
|
||||
zset, zscore, string(bytes), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rdb) zRangeByScore(key string, opt *redis.ZRangeBy) ([]*taskMessage, error) {
|
||||
jobs, err := r.client.ZRangeByScore(key, opt).Result()
|
||||
fmt.Printf("len(jobs) = %d\n", len(jobs))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("command ZRANGEBYSCORE %s %v failed: %v", key, opt, err)
|
||||
}
|
||||
var msgs []*taskMessage
|
||||
for _, j := range jobs {
|
||||
fmt.Printf("[debug] j = %v\n", j)
|
||||
var msg taskMessage
|
||||
err = json.Unmarshal([]byte(j), &msg)
|
||||
if err != nil {
|
||||
log.Printf("[WARNING] could not unmarshal task data %s: %v\n", j, err)
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, &msg)
|
||||
}
|
||||
return msgs, nil
|
||||
}
|
||||
|
||||
// move moves taskMessage from zfrom to the specified queue.
|
||||
func (r *rdb) move(from string, msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return errSerializeTask
|
||||
}
|
||||
if r.client.ZRem(from, string(bytes)).Val() > 0 {
|
||||
err = r.push(msg)
|
||||
if err != nil {
|
||||
log.Printf("[SERVERE ERROR] could not push task to queue %q: %v\n",
|
||||
msg.Queue, err)
|
||||
// TODO(hibiken): Handle this error properly.
|
||||
// Add back to zfrom?
|
||||
return fmt.Errorf("could not push task %v from %q: %v", msg, msg.Queue, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
const maxDeadTask = 100
|
||||
const deadExpirationInDays = 90
|
||||
|
||||
// kill sends the taskMessage to "dead" set.
|
||||
// It also trims the sorted set by timestamp and set size.
|
||||
func (r *rdb) kill(msg *taskMessage) error {
|
||||
bytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||
}
|
||||
now := time.Now()
|
||||
pipe := r.client.Pipeline()
|
||||
pipe.ZAdd(dead, &redis.Z{Member: string(bytes), Score: float64(now.Unix())})
|
||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||
pipe.ZRemRangeByScore(dead, "-inf", strconv.Itoa(int(limit)))
|
||||
pipe.ZRemRangeByRank(dead, 0, -maxDeadTask) // trim the set to 100
|
||||
_, err = pipe.Exec()
|
||||
return err
|
||||
}
|
||||
|
||||
// listQueues returns the list of all queues.
|
||||
func (r *rdb) listQueues() []string {
|
||||
return r.client.SMembers(allQueues).Val()
|
||||
}
|
Loading…
Reference in New Issue
Block a user