diff --git a/processor.go b/processor.go index 87a3570..71770cb 100644 --- a/processor.go +++ b/processor.go @@ -81,31 +81,12 @@ func (p *processor) exec() { task := &Task{Type: msg.Type, Payload: msg.Payload} p.sema <- struct{}{} // acquire token go func(task *Task) { - quit := make(chan struct{}) // channel to signal heartbeat goroutine defer func() { - quit <- struct{}{} if err := p.rdb.srem(inProgress, msg); err != nil { log.Printf("[SERVER ERROR] SREM failed: %v\n", err) } - if err := p.rdb.clearHeartbeat(msg.ID); err != nil { - log.Printf("[SERVER ERROR] DEL heartbeat failed: %v\n", err) - } <-p.sema // release token }() - // start "heartbeat" goroutine - go func() { - ticker := time.NewTicker(5 * time.Second) - for { - select { - case <-quit: - return - case t := <-ticker.C: - if err := p.rdb.heartbeat(msg.ID, t); err != nil { - log.Printf("[ERROR] heartbeat failed for %v at %v: %v", msg.ID, t, err) - } - } - } - }() err := p.handler(task) // TODO(hibiken): maybe also handle panic? if err != nil { retryTask(p.rdb, msg, err) diff --git a/rdb.go b/rdb.go index d4e00e7..5a4b692 100644 --- a/rdb.go +++ b/rdb.go @@ -9,7 +9,6 @@ import ( "time" "github.com/go-redis/redis/v7" - "github.com/google/uuid" ) // Redis keys @@ -147,24 +146,6 @@ func (r *rdb) move(from string, msg *taskMessage) error { return nil } -func (r *rdb) heartbeat(id uuid.UUID, timestamp time.Time) error { - key := heartbeatPrefix + id.String() - err := r.client.Set(key, timestamp, 0).Err() // zero expiration means no expiration - if err != nil { - return fmt.Errorf("command SET %s %v failed: %v", key, timestamp, err) - } - return nil -} - -func (r *rdb) clearHeartbeat(id uuid.UUID) error { - key := heartbeatPrefix + id.String() - err := r.client.Del(key).Err() - if err != nil { - return fmt.Errorf("command DEL %s failed: %v", key, err) - } - return nil -} - const maxDeadTask = 100 const deadExpirationInDays = 90