diff --git a/processor.go b/processor.go index eb91ad8..87a3570 100644 --- a/processor.go +++ b/processor.go @@ -78,18 +78,37 @@ func (p *processor) exec() { } } - t := &Task{Type: msg.Type, Payload: msg.Payload} + task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token go func(task *Task) { + quit := make(chan struct{}) // channel to signal heartbeat goroutine defer func() { + quit <- struct{}{} if err := p.rdb.srem(inProgress, msg); err != nil { log.Printf("[SERVER ERROR] SREM failed: %v\n", err) } + if err := p.rdb.clearHeartbeat(msg.ID); err != nil { + log.Printf("[SERVER ERROR] DEL heartbeat failed: %v\n", err) + } <-p.sema // release token }() - err := p.handler(task) + // start "heartbeat" goroutine + go func() { + ticker := time.NewTicker(5 * time.Second) + for { + select { + case <-quit: + return + case t := <-ticker.C: + if err := p.rdb.heartbeat(msg.ID, t); err != nil { + log.Printf("[ERROR] heartbeat failed for %v at %v: %v", msg.ID, t, err) + } + } + } + }() + err := p.handler(task) // TODO(hibiken): maybe also handle panic? if err != nil { retryTask(p.rdb, msg, err) } - }(t) + }(task) } diff --git a/rdb.go b/rdb.go index 4aa7da5..d4e00e7 100644 --- a/rdb.go +++ b/rdb.go @@ -9,16 +9,18 @@ import ( "time" "github.com/go-redis/redis/v7" + "github.com/google/uuid" ) // Redis keys const ( - queuePrefix = "asynq:queues:" // LIST - allQueues = "asynq:queues" // SET - scheduled = "asynq:scheduled" // ZSET - retry = "asynq:retry" // ZSET - dead = "asynq:dead" // ZSET - inProgress = "asynq:in_progress" // SET + queuePrefix = "asynq:queues:" // LIST - asynq:queues: + allQueues = "asynq:queues" // SET + scheduled = "asynq:scheduled" // ZSET + retry = "asynq:retry" // ZSET + dead = "asynq:dead" // ZSET + inProgress = "asynq:in_progress" // SET + heartbeatPrefix = "asynq:heartbeat:" // STRING - asynq:heartbeat: ) var ( @@ -145,6 +147,24 @@ func (r *rdb) move(from string, msg *taskMessage) error { return nil } +func (r *rdb) heartbeat(id uuid.UUID, timestamp time.Time) error { + key := heartbeatPrefix + id.String() + err := r.client.Set(key, timestamp, 0).Err() // zero expiration means no expiration + if err != nil { + return fmt.Errorf("command SET %s %v failed: %v", key, timestamp, err) + } + return nil +} + +func (r *rdb) clearHeartbeat(id uuid.UUID) error { + key := heartbeatPrefix + id.String() + err := r.client.Del(key).Err() + if err != nil { + return fmt.Errorf("command DEL %s failed: %v", key, err) + } + return nil +} + const maxDeadTask = 100 const deadExpirationInDays = 90