From e362e0dacac4751528dc14be6019464494d7fed6 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 15 Nov 2019 07:21:25 -0800 Subject: [PATCH] Add workers --- asynq.go | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 118 insertions(+), 5 deletions(-) diff --git a/asynq.go b/asynq.go index 2689063..f34d5bf 100644 --- a/asynq.go +++ b/asynq.go @@ -2,6 +2,10 @@ package asynq import ( "encoding/json" + "fmt" + "log" + "strconv" + "strings" "time" "github.com/go-redis/redis/v7" @@ -11,6 +15,7 @@ import ( // Redis keys const ( queuePrefix = "asynq:queues:" + allQueues = "asynq:queues" scheduled = "asynq:scheduled" ) @@ -44,19 +49,19 @@ func NewClient(opt *RedisOpt) *Client { } // Enqueue pushes a given task to the specified queue. -func (c *Client) Enqueue(queue string, task *Task, delay time.Duration) error { - if delay == 0 { +func (c *Client) Enqueue(queue string, task *Task, executeAt time.Time) error { + if time.Now().After(executeAt) { + fmt.Println("going directly to queue") bytes, err := json.Marshal(task) if err != nil { return err } return c.rdb.RPush(queuePrefix+queue, string(bytes)).Err() } - executeAt := time.Now().Add(delay) dt := &delayedTask{ - ID: uuid.New().String(), + ID: uuid.New().String(), Queue: queue, - Task: task, + Task: task, } bytes, err := json.Marshal(dt) if err != nil { @@ -64,3 +69,111 @@ func (c *Client) Enqueue(queue string, task *Task, delay time.Duration) error { } return c.rdb.ZAdd(scheduled, &redis.Z{Member: string(bytes), Score: float64(executeAt.Unix())}).Err() } + +//-------------------- Workers -------------------- + +// Workers represents a pool of workers. +type Workers struct { + rdb *redis.Client + + // poolTokens is a counting semaphore to ensure the number of active workers + // does not exceed the limit. + poolTokens chan struct{} + + // handlers maps queue name to a handler for that queue's messages. + handlers map[string]func(msg string) +} + +// NewWorkers creates and returns a new Workers. +func NewWorkers(poolSize int, opt *RedisOpt) *Workers { + rdb := redis.NewClient(&redis.Options{Addr: opt.Addr, Password: opt.Password}) + return &Workers{ + rdb: rdb, + poolTokens: make(chan struct{}, poolSize), + handlers: make(map[string]func(string)), + } +} + +// Handle registers a handler function for a given queue. +func (w *Workers) Handle(q string, fn func(msg string)) { + w.handlers[q] = fn +} + +// Run starts the workers and scheduler. +func (w *Workers) Run() { + go w.pollScheduledTasks() + + for { + // pull message out of the queue and process it + // TODO(hibiken): get a list of queues in order of priority + res, err := w.rdb.BLPop(0, "asynq:queues:test").Result() // A timeout of zero means block indefinitely. + if err != nil { + if err != redis.Nil { + log.Printf("error when BLPOP from %s: %v\n", "aysnq:queues:test", err) + } + continue + } + + q, msg := res[0], res[1] + fmt.Printf("perform task %v from %s\n", msg, q) + handler, ok := w.handlers[strings.TrimPrefix(q, queuePrefix)] + if !ok { + log.Printf("no handler found for queue %q\n", strings.TrimPrefix(q, queuePrefix)) + continue + } + w.poolTokens <- struct{}{} // acquire a token + go func(msg string) { + handler(msg) + <-w.poolTokens + }(msg) + } +} + +func (w *Workers) pollScheduledTasks() { + for { + // Get next items in the queue with scores (time to execute) <= now. + now := time.Now().Unix() + jobs, err := w.rdb.ZRangeByScore(scheduled, + &redis.ZRangeBy{ + Min: "-inf", + Max: strconv.Itoa(int(now))}).Result() + fmt.Printf("len(jobs) = %d\n", len(jobs)) + if err != nil { + log.Printf("radis command ZRANGEBYSCORE failed: %v\n", err) + continue + } + if len(jobs) == 0 { + fmt.Println("jobs empty") + time.Sleep(5 * time.Second) + continue + } + + for _, j := range jobs { + var job delayedTask + err = json.Unmarshal([]byte(j), &job) + if err != nil { + fmt.Println("unmarshal failed") + continue + } + + // TODO(hibiken): Acquire lock for job.ID + pipe := w.rdb.TxPipeline() + pipe.ZRem(scheduled, j) + // Do we need to encode this again? + // Can we skip this entirely by defining Task field to be a string field? + bytes, err := json.Marshal(job.Task) + if err != nil { + log.Printf("could not marshal job.Task %v: %v\n", job.Task, err) + pipe.Discard() + continue + } + pipe.RPush(queuePrefix+job.Queue, string(bytes)) + _, err = pipe.Exec() + if err != nil { + log.Printf("could not execute pipeline: %v\n", err) + continue + } + // TODO(hibiken): Release lock for job.ID + } + } +}