diff --git a/asynq.go b/asynq.go index f34d5bf..387bac5 100644 --- a/asynq.go +++ b/asynq.go @@ -51,7 +51,6 @@ func NewClient(opt *RedisOpt) *Client { // Enqueue pushes a given task to the specified queue. func (c *Client) Enqueue(queue string, task *Task, executeAt time.Time) error { if time.Now().After(executeAt) { - fmt.Println("going directly to queue") bytes, err := json.Marshal(task) if err != nil { return err @@ -156,24 +155,23 @@ func (w *Workers) pollScheduledTasks() { continue } - // TODO(hibiken): Acquire lock for job.ID - pipe := w.rdb.TxPipeline() - pipe.ZRem(scheduled, j) - // Do we need to encode this again? - // Can we skip this entirely by defining Task field to be a string field? - bytes, err := json.Marshal(job.Task) - if err != nil { - log.Printf("could not marshal job.Task %v: %v\n", job.Task, err) - pipe.Discard() - continue + if w.rdb.ZRem(scheduled, j).Val() > 0 { + // Do we need to encode this again? + // Can we skip this entirely by defining Task field to be a string field? + bytes, err := json.Marshal(job.Task) + if err != nil { + log.Printf("could not marshal job.Task %v: %v\n", job.Task, err) + // TODO(hibiken): Put item that cannot marshal into dead queue. + continue + } + err = w.rdb.RPush(queuePrefix+job.Queue, string(bytes)).Err() + if err != nil { + log.Printf("command RPUSH %q %s failed: %v", + queuePrefix+job.Queue, string(bytes), err) + // TODO(hibiken): Handle this error properly. Add back to scheduled ZSET? + continue + } } - pipe.RPush(queuePrefix+job.Queue, string(bytes)) - _, err = pipe.Exec() - if err != nil { - log.Printf("could not execute pipeline: %v\n", err) - continue - } - // TODO(hibiken): Release lock for job.ID } } }