2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add logic to restore unfinished tasks back into the default queue if

there are any uncompleted tasks
This commit is contained in:
Ken Hibino 2019-11-23 15:09:50 -08:00
parent 4a327933bd
commit fd80126a67
3 changed files with 71 additions and 12 deletions

View File

@ -41,6 +41,9 @@ func (p *processor) terminate() {
} }
func (p *processor) start() { func (p *processor) start() {
// NOTE: The call to "restore" needs to complete before starting
// the processor goroutine.
p.restore()
go func() { go func() {
for { for {
select { select {
@ -92,3 +95,12 @@ func (p *processor) exec() {
} }
}(task) }(task)
} }
// restore moves all tasks from "in-progress" back to queue
// to restore all unfinished tasks.
func (p *processor) restore() {
err := p.rdb.moveAll(inProgress, defaultQueue)
if err != nil {
log.Printf("[SERVER ERROR] could not move tasks from %q to %q\n", inProgress, defaultQueue)
}
}

13
rdb.go
View File

@ -171,3 +171,16 @@ func (r *rdb) listQueues() []string {
} }
return queues return queues
} }
// moveAll moves all tasks from src list to dst list.
func (r *rdb) moveAll(src, dst string) error {
// TODO(hibiken): Lua script
txf := func(tx *redis.Tx) error {
length := tx.LLen(src).Val()
for i := 0; i < int(length); i++ {
tx.RPopLPush(src, dst)
}
return nil
}
return r.client.Watch(txf, src)
}

View File

@ -2,6 +2,7 @@ package asynq
import ( import (
"encoding/json" "encoding/json"
"math/rand"
"testing" "testing"
"time" "time"
@ -12,6 +13,10 @@ import (
var client *redis.Client var client *redis.Client
func init() {
rand.Seed(time.Now().UnixNano())
}
// setup connects to a redis database and flush all keys // setup connects to a redis database and flush all keys
// before returning an instance of rdb. // before returning an instance of rdb.
func setup() *rdb { func setup() *rdb {
@ -26,14 +31,18 @@ func setup() *rdb {
return newRDB(client) return newRDB(client)
} }
func randomTask(taskType, qname string) *taskMessage {
return &taskMessage{
ID: uuid.New(),
Type: taskType,
Queue: qname,
Retry: rand.Intn(100),
}
}
func TestPush(t *testing.T) { func TestPush(t *testing.T) {
r := setup() r := setup()
msg := &taskMessage{ msg := randomTask("send_email", "default")
ID: uuid.New(),
Type: "sendEmail",
Queue: "default",
Retry: 10,
}
err := r.push(msg) err := r.push(msg)
if err != nil { if err != nil {
@ -55,12 +64,7 @@ func TestPush(t *testing.T) {
func TestDequeueImmediateReturn(t *testing.T) { func TestDequeueImmediateReturn(t *testing.T) {
r := setup() r := setup()
msg := &taskMessage{ msg := randomTask("export_csv", "csv")
ID: uuid.New(),
Type: "GenerateCSVExport",
Queue: "csv",
Retry: 10,
}
r.push(msg) r.push(msg)
res, err := r.dequeue("asynq:queues:csv", time.Second) res, err := r.dequeue("asynq:queues:csv", time.Second)
@ -92,3 +96,33 @@ func TestDequeueTimeout(t *testing.T) {
t.Errorf("err = %v, want %v", err, errQueuePopTimeout) t.Errorf("err = %v, want %v", err, errQueuePopTimeout)
} }
} }
func TestMoveAll(t *testing.T) {
r := setup()
seed := []*taskMessage{
randomTask("send_email", "default"),
randomTask("export_csv", "csv"),
randomTask("sync_stuff", "sync"),
}
for _, task := range seed {
bytes, err := json.Marshal(task)
if err != nil {
t.Errorf("json.Marhsal() failed: %v", err)
}
if err := client.LPush(inProgress, string(bytes)).Err(); err != nil {
t.Errorf("LPUSH %q %s failed: %v", inProgress, string(bytes), err)
}
}
err := r.moveAll(inProgress, defaultQueue)
if err != nil {
t.Errorf("moveAll failed: %v", err)
}
if l := client.LLen(inProgress).Val(); l != 0 {
t.Errorf("LLEN %q = %d, want 0", inProgress, l)
}
if l := client.LLen(defaultQueue).Val(); int(l) != len(seed) {
t.Errorf("LLEN %q = %d, want %d", defaultQueue, l, len(seed))
}
}