diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index b8c0d5b..c6be7d3 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -197,12 +197,11 @@ func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error return nil } -const maxDeadTask = 100 -const deadExpirationInDays = 90 - -// Kill sends the taskMessage to "dead" set. -// It also trims the sorted set by timestamp and set size. +// Kill sends the task to "dead" set. +// It also trims the set by timestamp and set size. func (r *RDB) Kill(msg *TaskMessage) error { + const maxDeadTask = 10 + const deadExpirationInDays = 90 bytes, err := json.Marshal(msg) if err != nil { return fmt.Errorf("could not marshal %+v to json: %v", msg, err) @@ -217,8 +216,8 @@ func (r *RDB) Kill(msg *TaskMessage) error { return err } -// MoveAll moves all tasks from src list to dst list. -func (r *RDB) MoveAll(src, dst string) error { +// RestoreUnfinished moves all tasks from in-progress list to the queue. +func (r *RDB) RestoreUnfinished() error { script := redis.NewScript(` local len = redis.call("LLEN", KEYS[1]) for i = len, 1, -1 do @@ -226,7 +225,7 @@ func (r *RDB) MoveAll(src, dst string) error { end return len `) - _, err := script.Run(r.client, []string{src, dst}).Result() + _, err := script.Run(r.client, []string{InProgress, DefaultQueue}).Result() return err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index cefa518..197dd7d 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -258,7 +258,7 @@ func TestKill(t *testing.T) { } } -func TestMoveAll(t *testing.T) { +func TestRestoreUnfinished(t *testing.T) { r := setup(t) t1 := randomTask("send_email", "default", nil) t2 := randomTask("export_csv", "csv", nil) @@ -305,8 +305,8 @@ func TestMoveAll(t *testing.T) { r.client.LPush(DefaultQueue, mustMarshal(t, msg)) } - if err := r.MoveAll(InProgress, DefaultQueue); err != nil { - t.Errorf("(*RDB).MoveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err) + if err := r.RestoreUnfinished(); err != nil { + t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err) continue } diff --git a/processor.go b/processor.go index 4e290d5..327792c 100644 --- a/processor.go +++ b/processor.go @@ -101,7 +101,7 @@ func (p *processor) exec() { // restore moves all tasks from "in-progress" back to queue // to restore all unfinished tasks. func (p *processor) restore() { - err := p.rdb.MoveAll(rdb.InProgress, rdb.DefaultQueue) + err := p.rdb.RestoreUnfinished() if err != nil { log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue) }