2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Rename MoveAll to RestoreUnfinished

This commit is contained in:
Ken Hibino 2019-12-04 06:50:52 -08:00
parent 4531e90b9d
commit 318b24b3b8
3 changed files with 11 additions and 12 deletions

View File

@ -197,12 +197,11 @@ func (r *RDB) schedule(zset string, processAt time.Time, msg *TaskMessage) error
return nil return nil
} }
const maxDeadTask = 100 // Kill sends the task to "dead" set.
const deadExpirationInDays = 90 // It also trims the set by timestamp and set size.
// Kill sends the taskMessage to "dead" set.
// It also trims the sorted set by timestamp and set size.
func (r *RDB) Kill(msg *TaskMessage) error { func (r *RDB) Kill(msg *TaskMessage) error {
const maxDeadTask = 10
const deadExpirationInDays = 90
bytes, err := json.Marshal(msg) bytes, err := json.Marshal(msg)
if err != nil { if err != nil {
return fmt.Errorf("could not marshal %+v to json: %v", msg, err) return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
@ -217,8 +216,8 @@ func (r *RDB) Kill(msg *TaskMessage) error {
return err return err
} }
// MoveAll moves all tasks from src list to dst list. // RestoreUnfinished moves all tasks from in-progress list to the queue.
func (r *RDB) MoveAll(src, dst string) error { func (r *RDB) RestoreUnfinished() error {
script := redis.NewScript(` script := redis.NewScript(`
local len = redis.call("LLEN", KEYS[1]) local len = redis.call("LLEN", KEYS[1])
for i = len, 1, -1 do for i = len, 1, -1 do
@ -226,7 +225,7 @@ func (r *RDB) MoveAll(src, dst string) error {
end end
return len return len
`) `)
_, err := script.Run(r.client, []string{src, dst}).Result() _, err := script.Run(r.client, []string{InProgress, DefaultQueue}).Result()
return err return err
} }

View File

@ -258,7 +258,7 @@ func TestKill(t *testing.T) {
} }
} }
func TestMoveAll(t *testing.T) { func TestRestoreUnfinished(t *testing.T) {
r := setup(t) r := setup(t)
t1 := randomTask("send_email", "default", nil) t1 := randomTask("send_email", "default", nil)
t2 := randomTask("export_csv", "csv", nil) t2 := randomTask("export_csv", "csv", nil)
@ -305,8 +305,8 @@ func TestMoveAll(t *testing.T) {
r.client.LPush(DefaultQueue, mustMarshal(t, msg)) r.client.LPush(DefaultQueue, mustMarshal(t, msg))
} }
if err := r.MoveAll(InProgress, DefaultQueue); err != nil { if err := r.RestoreUnfinished(); err != nil {
t.Errorf("(*RDB).MoveAll(%q, %q) = %v, want nil", InProgress, DefaultQueue, err) t.Errorf("(*RDB).RestoreUnfinished() = %v, want nil", err)
continue continue
} }

View File

@ -101,7 +101,7 @@ func (p *processor) exec() {
// restore moves all tasks from "in-progress" back to queue // restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks. // to restore all unfinished tasks.
func (p *processor) restore() { func (p *processor) restore() {
err := p.rdb.MoveAll(rdb.InProgress, rdb.DefaultQueue) err := p.rdb.RestoreUnfinished()
if err != nil { if err != nil {
log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue) log.Printf("[ERROR] could not move tasks from %q to %q\n", rdb.InProgress, rdb.DefaultQueue)
} }