diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index e44c051..b75922c 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -123,6 +123,7 @@ func (r *RDB) Requeue(msg *TaskMessage) error { if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } + // Note: Use RPUSH to push to the head of the queue. // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:queues:default // ARGV[1] -> taskMessage value diff --git a/processor.go b/processor.go index 9d88ffd..f0650fa 100644 --- a/processor.go +++ b/processor.go @@ -21,16 +21,14 @@ type processor struct { // in case of a program shutdown or additon of a new queue. dequeueTimeout time.Duration - // running represents the state of the "processor" goroutine - mu sync.Mutex - running bool - // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} // channel to communicate back to the long running "processor" goroutine. + // once is used to send value to the channel only once. done chan struct{} + once sync.Once // shutdown channel is closed when the shutdown of the "processor" goroutine starts. shutdown chan struct{} @@ -54,18 +52,14 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor { // Note: stops only the "processor" goroutine, does not stop workers. // It's safe to call this method multiple times. func (p *processor) stop() { - p.mu.Lock() - defer p.mu.Unlock() - if !p.running { - return - } - log.Println("[INFO] Processor shutting down...") - // Unblock if processor is waiting for sema token. - close(p.shutdown) - // Signal the processor goroutine to stop processing tasks - // from the queue. - p.done <- struct{}{} - p.running = false + p.once.Do(func() { + log.Println("[INFO] Processor shutting down...") + // Unblock if processor is waiting for sema token. + close(p.shutdown) + // Signal the processor goroutine to stop processing tasks + // from the queue. + p.done <- struct{}{} + }) } // NOTE: once terminated, processor cannot be re-started. @@ -85,11 +79,6 @@ func (p *processor) terminate() { } func (p *processor) start() { - p.mu.Lock() - defer p.mu.Unlock() - if p.running { - return - } // NOTE: The call to "restore" needs to complete before starting // the processor goroutine. p.restore() @@ -104,7 +93,6 @@ func (p *processor) start() { } } }() - p.running = true } // exec pulls a task out of the queue and starts a worker goroutine to