diff --git a/processor.go b/processor.go index 7bf0694..feffb2c 100644 --- a/processor.go +++ b/processor.go @@ -80,7 +80,7 @@ func (p *processor) exec() { // NOTE: This deferred anonymous function needs to take taskMessage as a value because // the message can be mutated by the time this function is called. defer func(msg taskMessage) { - if err := p.rdb.lrem(inProgress, &msg); err != nil { + if err := p.rdb.remove(inProgress, &msg); err != nil { log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err) } <-p.sema // release token diff --git a/rdb.go b/rdb.go index bd22df0..e5fc337 100644 --- a/rdb.go +++ b/rdb.go @@ -75,7 +75,8 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error) return &msg, nil } -func (r *rdb) lrem(key string, msg *taskMessage) error { +// remove deletes all elements equal to msg from a redis list with the given key. +func (r *rdb) remove(key string, msg *taskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -122,17 +123,6 @@ func (r *rdb) kill(msg *taskMessage) error { return err } -// listQueues returns the list of all queues. -// NOTE: Add default to the slice if empty because -// BLPOP will error out if empty list is passed. -func (r *rdb) listQueues() []string { - queues := r.client.SMembers(allQueues).Val() - if len(queues) == 0 { - queues = append(queues, queuePrefix+"default") - } - return queues -} - // moveAll moves all tasks from src list to dst list. func (r *rdb) moveAll(src, dst string) error { script := redis.NewScript(` diff --git a/rdb_test.go b/rdb_test.go index b5bf734..c769a4a 100644 --- a/rdb_test.go +++ b/rdb_test.go @@ -68,6 +68,7 @@ func TestEnqueue(t *testing.T) { err := r.enqueue(tc.msg) if err != nil { t.Error(err) + continue } res := r.client.LRange(defaultQueue, 0, -1).Val() if len(res) != 1 {