diff --git a/asynq.go b/asynq.go index 5aa98cc..0d1c814 100644 --- a/asynq.go +++ b/asynq.go @@ -1,11 +1,19 @@ package asynq +/* +TODOs: +- [P0] Task error handling +- [P0] Retry +- [P0] Dead task (retry exausted) +- [P0] Shutdown all workers gracefully when killed +- [P1] Add Support for multiple queues +*/ + import ( "encoding/json" "fmt" "log" "strconv" - "strings" "time" "github.com/go-redis/redis/v7" @@ -26,8 +34,12 @@ type Client struct { // Task represents a task to be performed. type Task struct { - Handler string - Args []interface{} + // Type indicates the kind of the task to be performed. + Type string + + // Payload is an arbitrary data needed for task execution. + // The value has to be serializable. + Payload map[string]interface{} } type delayedTask struct { @@ -48,8 +60,13 @@ func NewClient(opt *RedisOpt) *Client { return &Client{rdb: rdb} } -// Enqueue pushes a given task to the specified queue. -func (c *Client) Enqueue(queue string, task *Task, executeAt time.Time) error { +// Process enqueues the task to be performed at a given time. +func (c *Client) Process(task *Task, executeAt time.Time) error { + return c.enqueue("default", task, executeAt) +} + +// enqueue pushes a given task to the specified queue. +func (c *Client) enqueue(queue string, task *Task, executeAt time.Time) error { if time.Now().After(executeAt) { return push(c.rdb, queue, task) } @@ -74,9 +91,6 @@ type Workers struct { // poolTokens is a counting semaphore to ensure the number of active workers // does not exceed the limit. poolTokens chan struct{} - - // handlers maps queue name to a handler for that queue's messages. - handlers map[string]func(msg string) } // NewWorkers creates and returns a new Workers. @@ -85,17 +99,14 @@ func NewWorkers(poolSize int, opt *RedisOpt) *Workers { return &Workers{ rdb: rdb, poolTokens: make(chan struct{}, poolSize), - handlers: make(map[string]func(string)), } } -// Handle registers a handler function for a given queue. -func (w *Workers) Handle(q string, fn func(msg string)) { - w.handlers[q] = fn -} +// TaskHandler handles a given task and report any error. +type TaskHandler func(*Task) error -// Run starts the workers and scheduler. -func (w *Workers) Run() { +// Run starts the workers and scheduler with a given handler. +func (w *Workers) Run(handler TaskHandler) { go w.pollScheduledTasks() for { @@ -111,16 +122,17 @@ func (w *Workers) Run() { q, msg := res[0], res[1] fmt.Printf("perform task %v from %s\n", msg, q) - handler, ok := w.handlers[strings.TrimPrefix(q, queuePrefix)] - if !ok { - log.Printf("no handler found for queue %q\n", strings.TrimPrefix(q, queuePrefix)) + var task Task + err = json.Unmarshal([]byte(msg), &task) + if err != nil { + log.Printf("[Servere Error] could not parse json encoded message %s: %v", msg, err) continue } w.poolTokens <- struct{}{} // acquire a token - go func(msg string) { - handler(msg) + go func(task *Task) { + handler(task) <-w.poolTokens - }(msg) + }(&task) } }