2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00
asynq/poller_test.go
Ken Hibino 989b2b6d55 Add timeout to worker goroutines when TERM signal is received
Wait for a certain amount of time to allow for worker goroutines to
finish. If the goroutines don't finish with the timeout duration,
processor will quit the goroutines and restore any unfinished tasks from
the in_progress queue back to the default queue.
2019-12-15 21:00:09 -08:00

117 lines
3.4 KiB
Go

package asynq
import (
"testing"
"time"
"github.com/go-redis/redis/v7"
"github.com/google/go-cmp/cmp"
"github.com/hibiken/asynq/internal/rdb"
)
func TestPoller(t *testing.T) {
type scheduledTask struct {
msg *rdb.TaskMessage
processAt time.Time
}
r := setup(t)
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
p := newPoller(rdbClient, pollInterval)
t1 := randomTask("gen_thumbnail", "default", nil)
t2 := randomTask("send_email", "default", nil)
t3 := randomTask("reindex", "default", nil)
t4 := randomTask("sync", "default", nil)
tests := []struct {
initScheduled []scheduledTask // scheduled queue initial state
initRetry []scheduledTask // retry queue initial state
initQueue []*rdb.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state
wantScheduled []*rdb.TaskMessage // schedule queue final state
wantRetry []*rdb.TaskMessage // retry queue final state
wantQueue []*rdb.TaskMessage // default queue final state
}{
{
initScheduled: []scheduledTask{
{t1, time.Now().Add(time.Hour)},
{t2, time.Now().Add(-2 * time.Second)},
},
initRetry: []scheduledTask{
{t3, time.Now().Add(-500 * time.Millisecond)},
},
initQueue: []*rdb.TaskMessage{t4},
wait: pollInterval * 2,
wantScheduled: []*rdb.TaskMessage{t1},
wantRetry: []*rdb.TaskMessage{},
wantQueue: []*rdb.TaskMessage{t2, t3, t4},
},
{
initScheduled: []scheduledTask{
{t1, time.Now()},
{t2, time.Now().Add(-2 * time.Second)},
{t3, time.Now().Add(-500 * time.Millisecond)},
},
initRetry: []scheduledTask{},
initQueue: []*rdb.TaskMessage{t4},
wait: pollInterval * 2,
wantScheduled: []*rdb.TaskMessage{},
wantRetry: []*rdb.TaskMessage{},
wantQueue: []*rdb.TaskMessage{t1, t2, t3, t4},
},
}
for _, tc := range tests {
// clean up db before each test case.
if err := r.FlushDB().Err(); err != nil {
t.Fatal(err)
}
// initialize scheduled queue
for _, st := range tc.initScheduled {
err := rdbClient.Schedule(st.msg, st.processAt)
if err != nil {
t.Fatal(err)
}
}
// initialize retry queue
for _, st := range tc.initRetry {
err := r.ZAdd(retryQ, &redis.Z{
Member: mustMarshal(t, st.msg),
Score: float64(st.processAt.Unix()),
}).Err()
if err != nil {
t.Fatal(err)
}
}
// initialize default queue
for _, msg := range tc.initQueue {
err := rdbClient.Enqueue(msg)
if err != nil {
t.Fatal(err)
}
}
p.start()
time.Sleep(tc.wait)
p.terminate()
gotScheduledRaw := r.ZRange(scheduledQ, 0, -1).Val()
gotScheduled := mustUnmarshalSlice(t, gotScheduledRaw)
if diff := cmp.Diff(tc.wantScheduled, gotScheduled, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", scheduledQ, diff)
}
gotRetryRaw := r.ZRange(retryQ, 0, -1).Val()
gotRetry := mustUnmarshalSlice(t, gotRetryRaw)
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", retryQ, diff)
}
gotQueueRaw := r.LRange(defaultQ, 0, -1).Val()
gotQueue := mustUnmarshalSlice(t, gotQueueRaw)
if diff := cmp.Diff(tc.wantQueue, gotQueue, sortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running poller: (-want, +got)\n%s", defaultQ, diff)
}
}
}