diff --git a/asynq.go b/asynq.go index 0d1c814..15f6adb 100644 --- a/asynq.go +++ b/asynq.go @@ -17,7 +17,6 @@ import ( "time" "github.com/go-redis/redis/v7" - "github.com/google/uuid" ) // Redis keys @@ -27,6 +26,9 @@ const ( scheduled = "asynq:scheduled" ) +// Max retry count by default +const defaultMaxRetry = 25 + // Client is an interface for scheduling tasks. type Client struct { rdb *redis.Client @@ -42,10 +44,25 @@ type Task struct { Payload map[string]interface{} } -type delayedTask struct { - ID string +// taskMessage is an internal representation of a task with additional metadata fields. +// This data gets written in redis. +type taskMessage struct { + // fields from type Task + Type string + Payload map[string]interface{} + + //------- metadata fields ---------- + // queue name this message should be enqueued to Queue string - Task *Task + + // remainig retry count + Retry int + + // number of times we've retried so far + Retried int + + // error message from the last failure + ErrorMsg string } // RedisOpt specifies redis options. @@ -67,15 +84,16 @@ func (c *Client) Process(task *Task, executeAt time.Time) error { // enqueue pushes a given task to the specified queue. func (c *Client) enqueue(queue string, task *Task, executeAt time.Time) error { + msg := &taskMessage{ + Type: task.Type, + Payload: task.Payload, + Queue: queue, + Retry: defaultMaxRetry, + } if time.Now().After(executeAt) { - return push(c.rdb, queue, task) + return push(c.rdb, msg) } - dt := &delayedTask{ - ID: uuid.New().String(), - Queue: queue, - Task: task, - } - bytes, err := json.Marshal(dt) + bytes, err := json.Marshal(msg) if err != nil { return err } @@ -120,19 +138,25 @@ func (w *Workers) Run(handler TaskHandler) { continue } - q, msg := res[0], res[1] - fmt.Printf("perform task %v from %s\n", msg, q) - var task Task - err = json.Unmarshal([]byte(msg), &task) + q, data := res[0], res[1] + fmt.Printf("perform task %v from %s\n", data, q) + var msg taskMessage + err = json.Unmarshal([]byte(data), &msg) if err != nil { - log.Printf("[Servere Error] could not parse json encoded message %s: %v", msg, err) + log.Printf("[Servere Error] could not parse json encoded message %s: %v", data, err) continue } w.poolTokens <- struct{}{} // acquire a token + t := &Task{Type: msg.Type, Payload: msg.Payload} go func(task *Task) { - handler(task) - <-w.poolTokens - }(&task) + err := handler(task) + if err != nil { + fmt.Println("RETRY!!!") + //timeout := 10 * time.Second // TODO(hibiken): Implement exponential backoff. + // TODO(hibiken): Enqueue the task to "retry" ZSET with some timeout + } + <-w.poolTokens // release the token + }(t) } } @@ -156,17 +180,17 @@ func (w *Workers) pollScheduledTasks() { } for _, j := range jobs { - var job delayedTask - err = json.Unmarshal([]byte(j), &job) + var msg taskMessage + err = json.Unmarshal([]byte(j), &msg) if err != nil { fmt.Println("unmarshal failed") continue } if w.rdb.ZRem(scheduled, j).Val() > 0 { - err = push(w.rdb, job.Queue, job.Task) + err = push(w.rdb, &msg) if err != nil { - log.Printf("could not push task to queue %q: %v", job.Queue, err) + log.Printf("could not push task to queue %q: %v", msg.Queue, err) // TODO(hibiken): Handle this error properly. Add back to scheduled ZSET? continue } @@ -176,12 +200,12 @@ func (w *Workers) pollScheduledTasks() { } // push pushes the task to the specified queue to get picked up by a worker. -func push(rdb *redis.Client, queue string, t *Task) error { - bytes, err := json.Marshal(t) +func push(rdb *redis.Client, msg *taskMessage) error { + bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not encode task into JSON: %v", err) } - qname := queuePrefix + queue + qname := queuePrefix + msg.Queue err = rdb.SAdd(allQueues, qname).Err() if err != nil { return fmt.Errorf("could not execute command SADD %q %q: %v",