2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-25 23:32:17 +08:00

Remove stale field in processor struct

This commit is contained in:
Ken Hibino 2020-01-08 20:50:22 -08:00
parent 718336ff44
commit 390eb13149
3 changed files with 1 additions and 9 deletions

View File

@ -52,7 +52,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error {
} }
// Dequeue queries given queues in order and pops a task message if there // Dequeue queries given queues in order and pops a task message if there
// is one and returns it. If all queues are empty, ErrNoProcessableTask // is one and returns it. If all queues are empty, ErrNoProcessableTask
// error is returned. // error is returned.
func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) { func (r *RDB) Dequeue(qnames ...string) (*base.TaskMessage, error) {
var keys []string var keys []string

View File

@ -24,11 +24,6 @@ type processor struct {
retryDelayFunc retryDelayFunc retryDelayFunc retryDelayFunc
// timeout for blocking dequeue operation.
// dequeue needs to timeout to avoid blocking forever
// in case of a program shutdown or additon of a new queue.
dequeueTimeout time.Duration
// sema is a counting semaphore to ensure the number of active workers // sema is a counting semaphore to ensure the number of active workers
// does not exceed the limit. // does not exceed the limit.
sema chan struct{} sema chan struct{}
@ -52,7 +47,6 @@ func newProcessor(r *rdb.RDB, n int, qcfg map[string]uint, fn retryDelayFunc) *p
rdb: r, rdb: r,
queueConfig: qcfg, queueConfig: qcfg,
retryDelayFunc: fn, retryDelayFunc: fn,
dequeueTimeout: 2 * time.Second,
sema: make(chan struct{}, n), sema: make(chan struct{}, n),
done: make(chan struct{}), done: make(chan struct{}),
abort: make(chan struct{}), abort: make(chan struct{}),

View File

@ -66,7 +66,6 @@ func TestProcessorSuccess(t *testing.T) {
} }
p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc) p := newProcessor(rdbClient, 10, defaultQueueConfig, defaultDelayFunc)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start() p.start()
for _, msg := range tc.incoming { for _, msg := range tc.incoming {
@ -150,7 +149,6 @@ func TestProcessorRetry(t *testing.T) {
} }
p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc) p := newProcessor(rdbClient, 10, defaultQueueConfig, delayFunc)
p.handler = HandlerFunc(handler) p.handler = HandlerFunc(handler)
p.dequeueTimeout = time.Second // short time out for test purpose
p.start() p.start()
for _, msg := range tc.incoming { for _, msg := range tc.incoming {