diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index fcf0103..39bbd1d 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -52,7 +52,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { } // Dequeue queries given queues in order and pops a task message if there -// is one and returns it. If all queues are empty, ErrNoProcessableTask +// is one and returns it. If all queues are empty, ErrNoProcessableTask // error is returned. func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { var keys []string diff --git a/processor.go b/processor.go index 214f8db..a9f71a7 100644 --- a/processor.go +++ b/processor.go @@ -24,11 +24,6 @@ type processor struct { retryDelayFunc retryDelayFunc - // timeout for blocking dequeue operation. - // dequeue needs to timeout to avoid blocking forever - // in case of a program shutdown or additon of a new queue. - dequeueTimeout time.Duration - // sema is a counting semaphore to ensure the number of active workers // does not exceed the limit. sema chan struct{} @@ -52,7 +47,6 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, fn retryDelayFunc) *p rdb: r, queueConfig: qcfg, retryDelayFunc: fn, - dequeueTimeout: 2 * time.Second, sema: make(chan struct{}, n), done: make(chan struct{}), abort: make(chan struct{}), diff --git a/processor_test.go b/processor_test.go index 83113b6..cca97ea 100644 --- a/processor_test.go +++ b/processor_test.go @@ -66,7 +66,6 @@ func TestProcessorSuccess(t *testing.T) { } p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc) p.handler = HandlerFunc(handler) - p.dequeueTimeout = time.Second // short time out for test purpose p.start() for _, msg := range tc.incoming { @@ -150,7 +149,6 @@ func TestProcessorRetry(t *testing.T) { } p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc) p.handler = HandlerFunc(handler) - p.dequeueTimeout = time.Second // short time out for test purpose p.start() for _, msg := range tc.incoming {