2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Use sync.Once

This commit is contained in:
Ken Hibino 2019-12-17 20:34:56 -08:00
parent c40e779fdb
commit 69b46a7f0d
2 changed files with 11 additions and 22 deletions

View File

@ -123,6 +123,7 @@ func (r *RDB) Requeue(msg *TaskMessage) error {
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
} }
// Note: Use RPUSH to push to the head of the queue.
// KEYS[1] -> asynq:in_progress // KEYS[1] -> asynq:in_progress
// KEYS[2] -> asynq:queues:default // KEYS[2] -> asynq:queues:default
// ARGV[1] -> taskMessage value // ARGV[1] -> taskMessage value

View File

@ -21,16 +21,14 @@ type processor struct {
// in case of a program shutdown or additon of a new queue. // in case of a program shutdown or additon of a new queue.
dequeueTimeout time.Duration dequeueTimeout time.Duration
// running represents the state of the "processor" goroutine
mu sync.Mutex
running bool
// sema is a counting semaphore to ensure the number of active workers // sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit. // does not exceed the limit.
sema chan struct{} sema chan struct{}
// channel to communicate back to the long running "processor" goroutine. // channel to communicate back to the long running "processor" goroutine.
// once is used to send value to the channel only once.
done chan struct{} done chan struct{}
once sync.Once
// shutdown channel is closed when the shutdown of the "processor" goroutine starts. // shutdown channel is closed when the shutdown of the "processor" goroutine starts.
shutdown chan struct{} shutdown chan struct{}
@ -54,18 +52,14 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
// Note: stops only the "processor" goroutine, does not stop workers. // Note: stops only the "processor" goroutine, does not stop workers.
// It's safe to call this method multiple times. // It's safe to call this method multiple times.
func (p *processor) stop() { func (p *processor) stop() {
p.mu.Lock() p.once.Do(func() {
defer p.mu.Unlock() log.Println("[INFO] Processor shutting down...")
if !p.running { // Unblock if processor is waiting for sema token.
return close(p.shutdown)
} // Signal the processor goroutine to stop processing tasks
log.Println("[INFO] Processor shutting down...") // from the queue.
// Unblock if processor is waiting for sema token. p.done <- struct{}{}
close(p.shutdown) })
// Signal the processor goroutine to stop processing tasks
// from the queue.
p.done <- struct{}{}
p.running = false
} }
// NOTE: once terminated, processor cannot be re-started. // NOTE: once terminated, processor cannot be re-started.
@ -85,11 +79,6 @@ func (p *processor) terminate() {
} }
func (p *processor) start() { func (p *processor) start() {
p.mu.Lock()
defer p.mu.Unlock()
if p.running {
return
}
// NOTE: The call to "restore" needs to complete before starting // NOTE: The call to "restore" needs to complete before starting
// the processor goroutine. // the processor goroutine.
p.restore() p.restore()
@ -104,7 +93,6 @@ func (p *processor) start() {
} }
} }
}() }()
p.running = true
} }
// exec pulls a task out of the queue and starts a worker goroutine to // exec pulls a task out of the queue and starts a worker goroutine to