diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 5bf1f2f..48be550 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -159,16 +159,16 @@ func (r *RDB) Dequeue(timeout time.Duration) (*TaskMessage, error) { return &msg, nil } -// Remove deletes all elements equal to msg from a redis list with the given key. -func (r *RDB) Remove(key string, msg *TaskMessage) error { +// Done removes the task from in-progress queue to mark the task as done. +func (r *RDB) Done(msg *TaskMessage) error { bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) } // NOTE: count ZERO means "remove all elements equal to val" - err = r.client.LRem(key, 0, string(bytes)).Err() + err = r.client.LRem(InProgress, 0, string(bytes)).Err() if err != nil { - return fmt.Errorf("command `LREM %s 0 %s` failed: %v", key, string(bytes), err) + return fmt.Errorf("command `LREM %s 0 %s` failed: %v", InProgress, string(bytes), err) } return nil } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index cdfc7b6..549e4de 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -154,15 +154,15 @@ func TestDequeue(t *testing.T) { } } -func TestRemove(t *testing.T) { +func TestDone(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("export_csv", "csv", nil) tests := []struct { - initial []*TaskMessage // initial state of the list + initial []*TaskMessage // initial state of the in-progress list target *TaskMessage // task to remove - final []*TaskMessage // final state of the list + final []*TaskMessage // final state of the in-progress list }{ { initial: []*TaskMessage{t1, t2}, @@ -188,20 +188,20 @@ func TestRemove(t *testing.T) { } // set up initial state for _, task := range tc.initial { - err := r.client.LPush(DefaultQueue, mustMarshal(t, task)).Err() + err := r.client.LPush(InProgress, mustMarshal(t, task)).Err() if err != nil { t.Fatal(err) } } - err := r.Remove(DefaultQueue, tc.target) + err := r.Done(tc.target) if err != nil { t.Error(err) continue } var got []*TaskMessage - data := r.client.LRange(DefaultQueue, 0, -1).Val() + data := r.client.LRange(InProgress, 0, -1).Val() for _, s := range data { got = append(got, mustUnmarshal(t, s)) } diff --git a/processor.go b/processor.go index eae667e..4e290d5 100644 --- a/processor.go +++ b/processor.go @@ -86,7 +86,7 @@ func (p *processor) exec() { // NOTE: This deferred anonymous function needs to take taskMessage as a value because // the message can be mutated by the time this function is called. defer func(msg rdb.TaskMessage) { - if err := p.rdb.Remove(rdb.InProgress, &msg); err != nil { + if err := p.rdb.Done(&msg); err != nil { log.Printf("[ERROR] could not remove %+v from %q: %v\n", msg, rdb.InProgress, err) } <-p.sema // release token