diff --git a/processor.go b/processor.go index e7d41e0..7bf0694 100644 --- a/processor.go +++ b/processor.go @@ -34,8 +34,8 @@ func (p *processor) terminate() { p.done <- struct{}{} fmt.Print("Waiting for all workers to finish...") + // block until all workers have released the token for i := 0; i < cap(p.sema); i++ { - // block until all workers have released the token p.sema <- struct{}{} } fmt.Println("Done") @@ -65,18 +65,13 @@ func (p *processor) exec() { // in case of a program shutdown or additon of a new queue. const timeout = 5 * time.Second msg, err := p.rdb.dequeue(defaultQueue, timeout) + if err == errDequeueTimeout { + // timed out, this is a normal behavior. + return + } if err != nil { - switch err { - case errQueuePopTimeout: - // timed out, this is a normal behavior. - return - case errDeserializeTask: - log.Println("[Error] could not parse json encoded message") - return - default: - log.Printf("[Error] unexpected error while pulling message out of queues: %v\n", err) - return - } + log.Printf("[ERROR] unexpected error while pulling a task out of queue: %v\n", err) + return } task := &Task{Type: msg.Type, Payload: msg.Payload} diff --git a/rdb.go b/rdb.go index c84ffd0..4dcb107 100644 --- a/rdb.go +++ b/rdb.go @@ -22,7 +22,7 @@ const ( ) var ( - errQueuePopTimeout = errors.New("blocking queue pop operation timed out") + errDequeueTimeout = errors.New("blocking dequeue operation timed out") errSerializeTask = errors.New("could not encode task message into json") errDeserializeTask = errors.New("could not decode task message from json") ) @@ -65,11 +65,11 @@ func (r *rdb) enqueue(msg *taskMessage) error { // and returns the task. func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) { data, err := r.client.BRPopLPush(qname, inProgress, timeout).Result() + if err == redis.Nil { + return nil, errDequeueTimeout + } if err != nil { - if err != redis.Nil { - return nil, fmt.Errorf("command BRPOPLPUSH %q %q %v failed: %v", qname, inProgress, timeout, err) - } - return nil, errQueuePopTimeout + return nil, fmt.Errorf("command BRPOPLPUSH %q %q %v failed: %v", qname, inProgress, timeout, err) } var msg taskMessage err = json.Unmarshal([]byte(data), &msg) diff --git a/rdb_test.go b/rdb_test.go index 0ed8af5..b5bf734 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -98,7 +98,7 @@ func TestDequeue(t *testing.T) { inProgress int64 // length of "in-progress" tasks after dequeue }{ {queued: []*taskMessage{t1}, want: t1, err: nil, inProgress: 1}, - {queued: []*taskMessage{}, want: nil, err: errQueuePopTimeout, inProgress: 0}, + {queued: []*taskMessage{}, want: nil, err: errDequeueTimeout, inProgress: 0}, } for _, tc := range tests {