mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Rename (*rdb).lrem to (*rdb).remove
This commit is contained in:
parent
efaceb8a03
commit
1617278d86
@ -80,7 +80,7 @@ func (p *processor) exec() {
|
|||||||
// NOTE: This deferred anonymous function needs to take taskMessage as a value because
|
// NOTE: This deferred anonymous function needs to take taskMessage as a value because
|
||||||
// the message can be mutated by the time this function is called.
|
// the message can be mutated by the time this function is called.
|
||||||
defer func(msg taskMessage) {
|
defer func(msg taskMessage) {
|
||||||
if err := p.rdb.lrem(inProgress, &msg); err != nil {
|
if err := p.rdb.remove(inProgress, &msg); err != nil {
|
||||||
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err)
|
log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, inProgress, err)
|
||||||
}
|
}
|
||||||
<-p.sema // release token
|
<-p.sema // release token
|
||||||
|
14
rdb.go
14
rdb.go
@ -75,7 +75,8 @@ func (r *rdb) dequeue(qname string, timeout time.Duration) (*taskMessage, error)
|
|||||||
return &msg, nil
|
return &msg, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *rdb) lrem(key string, msg *taskMessage) error {
|
// remove deletes all elements equal to msg from a redis list with the given key.
|
||||||
|
func (r *rdb) remove(key string, msg *taskMessage) error {
|
||||||
bytes, err := json.Marshal(msg)
|
bytes, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
||||||
@ -122,17 +123,6 @@ func (r *rdb) kill(msg *taskMessage) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// listQueues returns the list of all queues.
|
|
||||||
// NOTE: Add default to the slice if empty because
|
|
||||||
// BLPOP will error out if empty list is passed.
|
|
||||||
func (r *rdb) listQueues() []string {
|
|
||||||
queues := r.client.SMembers(allQueues).Val()
|
|
||||||
if len(queues) == 0 {
|
|
||||||
queues = append(queues, queuePrefix+"default")
|
|
||||||
}
|
|
||||||
return queues
|
|
||||||
}
|
|
||||||
|
|
||||||
// moveAll moves all tasks from src list to dst list.
|
// moveAll moves all tasks from src list to dst list.
|
||||||
func (r *rdb) moveAll(src, dst string) error {
|
func (r *rdb) moveAll(src, dst string) error {
|
||||||
script := redis.NewScript(`
|
script := redis.NewScript(`
|
||||||
|
@ -68,6 +68,7 @@ func TestEnqueue(t *testing.T) {
|
|||||||
err := r.enqueue(tc.msg)
|
err := r.enqueue(tc.msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
res := r.client.LRange(defaultQueue, 0, -1).Val()
|
res := r.client.LRange(defaultQueue, 0, -1).Val()
|
||||||
if len(res) != 1 {
|
if len(res) != 1 {
|
||||||
|
Loading…
Reference in New Issue
Block a user