Mirror of https://github.com/hibiken/asynq.git
Unblock processor shutdown process if processor is waiting for semaphore token

parent 24dd78b31c
commit 5ddba8ca98
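For orientation before the diff below: the change makes the processor acquire its per-worker semaphore token inside a select that also watches a new shutdown channel, so a processor goroutine blocked waiting for a free token can still notice shutdown instead of hanging. A minimal standalone sketch of that pattern, for illustration only (not asynq code):

package main

import (
    "fmt"
    "time"
)

func main() {
    sema := make(chan struct{}, 2) // buffered channel used as a counting semaphore
    shutdown := make(chan struct{})

    // Fill the semaphore so the next acquire would block.
    sema <- struct{}{}
    sema <- struct{}{}

    // Simulate stop() closing the shutdown channel a bit later.
    go func() {
        time.Sleep(100 * time.Millisecond)
        close(shutdown)
    }()

    select {
    case sema <- struct{}{}: // acquire token
        defer func() { <-sema }() // release token
        fmt.Println("token acquired, run the task")
    case <-shutdown:
        // Without this case, the send on sema above would block forever,
        // which is the hang this commit fixes in processor.exec.
        fmt.Println("shutting down, requeue the task instead")
    }
}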
@@ -116,6 +116,25 @@ func (r *RDB) Done(msg *TaskMessage) error {
     return nil
 }

+// Requeue moves the task from in-progress queue to the default
+// queue.
+func (r *RDB) Requeue(msg *TaskMessage) error {
+    bytes, err := json.Marshal(msg)
+    if err != nil {
+        return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
+    }
+    // KEYS[1] -> asynq:in_progress
+    // KEYS[2] -> asynq:queues:default
+    // ARGV[1] -> taskMessage value
+    script := redis.NewScript(`
+    redis.call("LREM", KEYS[1], 0, ARGV[1])
+    redis.call("RPUSH", KEYS[2], ARGV[1])
+    return redis.status_reply("OK")
+    `)
+    _, err = script.Run(r.client, []string{inProgressQ, defaultQ}, string(bytes)).Result()
+    return err
+}
+
 // Schedule adds the task to the backlog queue to be processed in the future.
 func (r *RDB) Schedule(msg *TaskMessage, processAt time.Time) error {
     bytes, err := json.Marshal(msg)
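A note on the design: the LREM and RPUSH above run inside a single Lua script, so Redis moves the task atomically; issued as two separate client calls, a crash between them could leave the task in neither list. A rough contrast sketch of that weaker, non-atomic variant, assuming the same pre-context go-redis API the surrounding code uses (hypothetical helper, not part of this commit):

package rdbsketch

import "github.com/go-redis/redis" // pre-context go-redis API, matching the calls in the diff

// requeueNonAtomic is a hypothetical contrast to (*RDB).Requeue above:
// the same two commands issued separately instead of via one Lua script.
// A crash between the two calls would strand the task, which the script avoids.
func requeueNonAtomic(client *redis.Client, encoded string) error {
    if err := client.LRem("asynq:in_progress", 0, encoded).Err(); err != nil {
        return err
    }
    return client.RPush("asynq:queues:default", encoded).Err()
}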
@@ -112,6 +112,59 @@ func TestDone(t *testing.T) {
     }
 }

+func TestRequeue(t *testing.T) {
+    r := setup(t)
+    t1 := newTaskMessage("send_email", nil)
+    t2 := newTaskMessage("export_csv", nil)
+
+    tests := []struct {
+        enqueued       []*TaskMessage // initial state of the default queue
+        inProgress     []*TaskMessage // initial state of the in-progress list
+        target         *TaskMessage   // task to requeue
+        wantEnqueued   []*TaskMessage // final state of the default queue
+        wantInProgress []*TaskMessage // final state of the in-progress list
+    }{
+        {
+            enqueued:       []*TaskMessage{},
+            inProgress:     []*TaskMessage{t1, t2},
+            target:         t1,
+            wantEnqueued:   []*TaskMessage{t1},
+            wantInProgress: []*TaskMessage{t2},
+        },
+        {
+            enqueued:       []*TaskMessage{t1},
+            inProgress:     []*TaskMessage{t2},
+            target:         t2,
+            wantEnqueued:   []*TaskMessage{t1, t2},
+            wantInProgress: []*TaskMessage{},
+        },
+    }
+
+    for _, tc := range tests {
+        flushDB(t, r) // clean up db before each test case
+        seedDefaultQueue(t, r, tc.enqueued)
+        seedInProgressQueue(t, r, tc.inProgress)
+
+        err := r.Requeue(tc.target)
+        if err != nil {
+            t.Errorf("(*RDB).Requeue(task) = %v, want nil", err)
+            continue
+        }
+
+        gotEnqueuedRaw := r.client.LRange(defaultQ, 0, -1).Val()
+        gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
+        if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
+            t.Errorf("mismatch found in %q: (-want, +got):\n%s", defaultQ, diff)
+        }
+
+        gotInProgressRaw := r.client.LRange(inProgressQ, 0, -1).Val()
+        gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
+        if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
+            t.Errorf("mismatch found in %q: (-want, +got):\n%s", inProgressQ, diff)
+        }
+    }
+}
+
 func TestKill(t *testing.T) {
     r := setup(t)
     t1 := newTaskMessage("send_email", nil)
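flushDB, seedDefaultQueue, seedInProgressQueue, mustUnmarshalSlice, and sortMsgOpt are pre-existing helpers in the test package and are not part of this diff. For orientation only, sortMsgOpt is a go-cmp option that makes the slice comparison order-insensitive; a hypothetical stand-in built with cmpopts.SortSlices could look like the sketch below (the real helper, and the TaskMessage fields it sorts on, may differ):

package rdbsketch

import "github.com/google/go-cmp/cmp/cmpopts"

// msg stands in for the package's TaskMessage type; only Type is used here,
// purely for illustration.
type msg struct {
    Type    string
    Payload map[string]interface{}
}

// sortMsgOpt (hypothetical): sort both slices before diffing so the test does
// not depend on the order in which tasks come back from Redis.
var sortMsgOpt = cmpopts.SortSlices(func(x, y *msg) bool {
    return x.Type < y.Type
})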
processor.go (99 lines changed)
@@ -21,14 +21,19 @@ type processor struct {
     // in case of a program shutdown or additon of a new queue.
     dequeueTimeout time.Duration

+    // running represents the state of the "processor" goroutine
+    mu      sync.Mutex
+    running bool
+
     // sema is a counting semaphore to ensure the number of active workers
     // does not exceed the limit.
     sema chan struct{}

     // channel to communicate back to the long running "processor" goroutine.
-    // once is used to send value to the channel only once.
     done chan struct{}
-    once sync.Once

+    // shutdown channel is closed when the shutdown of the "processor" goroutine starts.
+    shutdown chan struct{}
+
     // quit channel communicates to the in-flight worker goroutines to stop.
     quit chan struct{}
@@ -38,9 +43,10 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
     return &processor{
         rdb:            r,
         handler:        handler,
-        dequeueTimeout: 5 * time.Second,
+        dequeueTimeout: 2 * time.Second,
         sema:           make(chan struct{}, numWorkers),
         done:           make(chan struct{}),
+        shutdown:       make(chan struct{}),
         quit:           make(chan struct{}),
     }
 }
@@ -48,12 +54,18 @@ func newProcessor(r *rdb.RDB, numWorkers int, handler Handler) *processor {
 // Note: stops only the "processor" goroutine, does not stop workers.
 // It's safe to call this method multiple times.
 func (p *processor) stop() {
-    p.once.Do(func() {
-        log.Println("[INFO] Processor shutting down...")
-        // Signal the processor goroutine to stop processing tasks
-        // from the queue.
-        p.done <- struct{}{}
-    })
+    p.mu.Lock()
+    defer p.mu.Unlock()
+    if !p.running {
+        return
+    }
+    log.Println("[INFO] Processor shutting down...")
+    // Unblock if processor is waiting for sema token.
+    close(p.shutdown)
+    // Signal the processor goroutine to stop processing tasks
+    // from the queue.
+    p.done <- struct{}{}
+    p.running = false
 }

 // NOTE: once terminated, processor cannot be re-started.
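Note the two different signals in the new stop(): close(p.shutdown) is a broadcast, observed by every goroutine currently blocked on <-p.shutdown, while p.done <- struct{}{} hands exactly one value to the single processor loop. A small standalone sketch of that close-to-broadcast idiom, for illustration only (not asynq code):

package main

import (
    "fmt"
    "sync"
)

func main() {
    shutdown := make(chan struct{})
    var wg sync.WaitGroup

    // Several goroutines all wait on the same channel.
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            <-shutdown // a closed channel unblocks every receiver
            fmt.Println("worker", id, "observed shutdown")
        }(i)
    }

    // One close reaches all waiters, which is why stop() can use it to unblock
    // exec() wherever it is parked; a plain send would wake only one receiver.
    close(shutdown)
    wg.Wait()
}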
@@ -73,6 +85,11 @@ func (p *processor) terminate() {
 }

 func (p *processor) start() {
+    p.mu.Lock()
+    defer p.mu.Unlock()
+    if p.running {
+        return
+    }
     // NOTE: The call to "restore" needs to complete before starting
     // the processor goroutine.
     p.restore()
@@ -87,6 +104,7 @@ func (p *processor) start() {
             }
         }
     }()
+    p.running = true
 }

 // exec pulls a task out of the queue and starts a worker goroutine to
@@ -102,37 +120,43 @@ func (p *processor) exec() {
         return
     }

-    p.sema <- struct{}{} // acquire token
-    go func() {
-        defer func() { <-p.sema /* release token */ }()
-
-        resCh := make(chan error, 1)
-        task := &Task{Type: msg.Type, Payload: msg.Payload}
+    select {
+    case <-p.shutdown:
+        // shutdown is starting, return immediately after requeuing the message.
+        p.requeue(msg)
+        return
+    case p.sema <- struct{}{}: // acquire token
         go func() {
-            resCh <- perform(p.handler, task)
-        }()
+            defer func() { <-p.sema /* release token */ }()

-        select {
-        case <-p.quit:
-            // time is up, quit this worker goroutine.
-            return
-        case resErr := <-resCh:
-            // Note: One of three things should happen.
-            // 1) Done -> Removes the message from InProgress
-            // 2) Retry -> Removes the message from InProgress & Adds the message to Retry
-            // 3) Kill -> Removes the message from InProgress & Adds the message to Dead
-            if resErr != nil {
-                if msg.Retried >= msg.Retry {
-                    p.kill(msg, resErr.Error())
+            resCh := make(chan error, 1)
+            task := &Task{Type: msg.Type, Payload: msg.Payload}
+            go func() {
+                resCh <- perform(p.handler, task)
+            }()
+
+            select {
+            case <-p.quit:
+                // time is up, quit this worker goroutine.
+                return
+            case resErr := <-resCh:
+                // Note: One of three things should happen.
+                // 1) Done -> Removes the message from InProgress
+                // 2) Retry -> Removes the message from InProgress & Adds the message to Retry
+                // 3) Kill -> Removes the message from InProgress & Adds the message to Dead
+                if resErr != nil {
+                    if msg.Retried >= msg.Retry {
+                        p.kill(msg, resErr.Error())
+                        return
+                    }
+                    p.retry(msg, resErr.Error())
                     return
                 }
-                p.retry(msg, resErr.Error())
+                p.markAsDone(msg)
                 return
             }
-            p.markAsDone(msg)
-            return
-        }
-    }()
+        }()
+    }
 }

 // restore moves all tasks from "in-progress" back to queue
@@ -144,6 +168,13 @@ func (p *processor) restore() {
     }
 }

+func (p *processor) requeue(msg *rdb.TaskMessage) {
+    err := p.rdb.Requeue(msg)
+    if err != nil {
+        log.Printf("[ERROR] Could not move task from InProgress back to queue: %v\n", err)
+    }
+}
+
 func (p *processor) markAsDone(msg *rdb.TaskMessage) {
     err := p.rdb.Done(msg)
     if err != nil {