mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 13:21:58 +08:00
Introduce Task Results
* Added Retention Option to specify retention TTL for tasks * Added ResultWriter as a client interface to write result data for the associated task
This commit is contained in:
@@ -14,11 +14,18 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
h "github.com/hibiken/asynq/internal/asynqtest"
|
||||
"github.com/hibiken/asynq/internal/base"
|
||||
"github.com/hibiken/asynq/internal/rdb"
|
||||
)
|
||||
|
||||
var taskCmpOpts = []cmp.Option{
|
||||
sortTaskOpt, // sort the tasks
|
||||
cmp.AllowUnexported(Task{}), // allow typename, payload fields to be compared
|
||||
cmpopts.IgnoreFields(Task{}, "opts", "w"), // ignore opts, w fields
|
||||
}
|
||||
|
||||
// fakeHeartbeater receives from starting and finished channels and do nothing.
|
||||
func fakeHeartbeater(starting <-chan *workerInfo, finished <-chan *base.TaskMessage, done <-chan struct{}) {
|
||||
for {
|
||||
@@ -42,6 +49,34 @@ func fakeSyncer(syncCh <-chan *syncRequest, done <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// Returns a processor instance configured for testing purpose.
|
||||
func newProcessorForTest(t *testing.T, r *rdb.RDB, h Handler) *processor {
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
syncCh := make(chan *syncRequest)
|
||||
done := make(chan struct{})
|
||||
t.Cleanup(func() { close(done) })
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
go fakeSyncer(syncCh, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: r,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: defaultQueueConfig,
|
||||
strictPriority: false,
|
||||
errHandler: nil,
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
p.handler = h
|
||||
return p
|
||||
}
|
||||
|
||||
func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
@@ -87,29 +122,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
processed = append(processed, task)
|
||||
return nil
|
||||
}
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
syncCh := make(chan *syncRequest)
|
||||
done := make(chan struct{})
|
||||
defer func() { close(done) }()
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
go fakeSyncer(syncCh, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: defaultQueueConfig,
|
||||
strictPriority: false,
|
||||
errHandler: nil,
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
p.handler = HandlerFunc(handler)
|
||||
p := newProcessorForTest(t, rdbClient, HandlerFunc(handler))
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
for _, msg := range tc.incoming {
|
||||
@@ -126,7 +139,7 @@ func TestProcessorSuccessWithSingleQueue(t *testing.T) {
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, taskCmpOpts...); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
}
|
||||
mu.Unlock()
|
||||
@@ -180,33 +193,12 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
processed = append(processed, task)
|
||||
return nil
|
||||
}
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
syncCh := make(chan *syncRequest)
|
||||
done := make(chan struct{})
|
||||
defer func() { close(done) }()
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
go fakeSyncer(syncCh, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: map[string]int{
|
||||
"default": 2,
|
||||
"high": 3,
|
||||
"low": 1,
|
||||
},
|
||||
strictPriority: false,
|
||||
errHandler: nil,
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
p.handler = HandlerFunc(handler)
|
||||
p := newProcessorForTest(t, rdbClient, HandlerFunc(handler))
|
||||
p.queueConfig = map[string]int{
|
||||
"default": 2,
|
||||
"high": 3,
|
||||
"low": 1,
|
||||
}
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
// Wait for two second to allow all pending tasks to be processed.
|
||||
@@ -220,7 +212,7 @@ func TestProcessorSuccessWithMultipleQueues(t *testing.T) {
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, taskCmpOpts...); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
}
|
||||
mu.Unlock()
|
||||
@@ -267,29 +259,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
processed = append(processed, task)
|
||||
return nil
|
||||
}
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
syncCh := make(chan *syncRequest)
|
||||
done := make(chan struct{})
|
||||
defer func() { close(done) }()
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
go fakeSyncer(syncCh, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: syncCh,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: defaultQueueConfig,
|
||||
strictPriority: false,
|
||||
errHandler: nil,
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
p.handler = HandlerFunc(handler)
|
||||
p := newProcessorForTest(t, rdbClient, HandlerFunc(handler))
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
time.Sleep(2 * time.Second) // wait for two second to allow all pending tasks to be processed.
|
||||
@@ -299,7 +269,7 @@ func TestProcessTasksWithLargeNumberInPayload(t *testing.T) {
|
||||
p.shutdown()
|
||||
|
||||
mu.Lock()
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, taskCmpOpts...); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
}
|
||||
mu.Unlock()
|
||||
@@ -389,27 +359,9 @@ func TestProcessorRetry(t *testing.T) {
|
||||
defer mu.Unlock()
|
||||
n++
|
||||
}
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
done := make(chan struct{})
|
||||
defer func() { close(done) }()
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: rdbClient,
|
||||
retryDelayFunc: delayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: nil,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: defaultQueueConfig,
|
||||
strictPriority: false,
|
||||
errHandler: ErrorHandlerFunc(errHandler),
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
p.handler = tc.handler
|
||||
p := newProcessorForTest(t, rdbClient, tc.handler)
|
||||
p.errHandler = ErrorHandlerFunc(errHandler)
|
||||
p.retryDelayFunc = delayFunc
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
runTime := time.Now() // time when processor is running
|
||||
@@ -453,6 +405,81 @@ func TestProcessorRetry(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorMarkAsComplete(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
rdbClient := rdb.NewRDB(r)
|
||||
|
||||
msg1 := h.NewTaskMessage("one", nil)
|
||||
msg2 := h.NewTaskMessage("two", nil)
|
||||
msg3 := h.NewTaskMessageWithQueue("three", nil, "custom")
|
||||
msg1.Retention = 3600
|
||||
msg3.Retention = 7200
|
||||
|
||||
handler := func(ctx context.Context, task *Task) error { return nil }
|
||||
|
||||
tests := []struct {
|
||||
pending map[string][]*base.TaskMessage
|
||||
completed map[string][]base.Z
|
||||
queueCfg map[string]int
|
||||
wantPending map[string][]*base.TaskMessage
|
||||
wantCompleted func(completedAt time.Time) map[string][]base.Z
|
||||
}{
|
||||
{
|
||||
pending: map[string][]*base.TaskMessage{
|
||||
"default": {msg1, msg2},
|
||||
"custom": {msg3},
|
||||
},
|
||||
completed: map[string][]base.Z{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
queueCfg: map[string]int{
|
||||
"default": 1,
|
||||
"custom": 1,
|
||||
},
|
||||
wantPending: map[string][]*base.TaskMessage{
|
||||
"default": {},
|
||||
"custom": {},
|
||||
},
|
||||
wantCompleted: func(completedAt time.Time) map[string][]base.Z {
|
||||
return map[string][]base.Z{
|
||||
"default": {{Message: h.TaskMessageWithCompletedAt(*msg1, completedAt), Score: completedAt.Unix() + msg1.Retention}},
|
||||
"custom": {{Message: h.TaskMessageWithCompletedAt(*msg3, completedAt), Score: completedAt.Unix() + msg3.Retention}},
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r)
|
||||
h.SeedAllPendingQueues(t, r, tc.pending)
|
||||
h.SeedAllCompletedQueues(t, r, tc.completed)
|
||||
|
||||
p := newProcessorForTest(t, rdbClient, HandlerFunc(handler))
|
||||
p.queueConfig = tc.queueCfg
|
||||
|
||||
p.start(&sync.WaitGroup{})
|
||||
runTime := time.Now() // time when processor is running
|
||||
time.Sleep(2 * time.Second)
|
||||
p.shutdown()
|
||||
|
||||
for qname, want := range tc.wantPending {
|
||||
gotPending := h.GetPendingMessages(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotPending, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("diff found in %q pending set; want=%v, got=%v\n%s", qname, want, gotPending, diff)
|
||||
}
|
||||
}
|
||||
|
||||
for qname, want := range tc.wantCompleted(runTime) {
|
||||
gotCompleted := h.GetCompletedEntries(t, r, qname)
|
||||
if diff := cmp.Diff(want, gotCompleted, cmpopts.EquateEmpty()); diff != "" {
|
||||
t.Errorf("diff found in %q completed set; want=%v, got=%v\n%s", qname, want, gotCompleted, diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessorQueues(t *testing.T) {
|
||||
sortOpt := cmp.Transformer("SortStrings", func(in []string) []string {
|
||||
out := append([]string(nil), in...) // Copy input to avoid mutating it
|
||||
@@ -481,26 +508,10 @@ func TestProcessorQueues(t *testing.T) {
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
starting := make(chan *workerInfo)
|
||||
finished := make(chan *base.TaskMessage)
|
||||
done := make(chan struct{})
|
||||
defer func() { close(done) }()
|
||||
go fakeHeartbeater(starting, finished, done)
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
broker: nil,
|
||||
retryDelayFunc: DefaultRetryDelayFunc,
|
||||
isFailureFunc: defaultIsFailureFunc,
|
||||
syncCh: nil,
|
||||
cancelations: base.NewCancelations(),
|
||||
concurrency: 10,
|
||||
queues: tc.queueCfg,
|
||||
strictPriority: false,
|
||||
errHandler: nil,
|
||||
shutdownTimeout: defaultShutdownTimeout,
|
||||
starting: starting,
|
||||
finished: finished,
|
||||
})
|
||||
// Note: rdb and handler not needed for this test.
|
||||
p := newProcessorForTest(t, nil, nil)
|
||||
p.queueConfig = tc.queueCfg
|
||||
|
||||
got := p.queues()
|
||||
if diff := cmp.Diff(tc.want, got, sortOpt); diff != "" {
|
||||
t.Errorf("with queue config: %v\n(*processor).queues() = %v, want %v\n(-want,+got):\n%s",
|
||||
@@ -605,7 +616,7 @@ func TestProcessorWithStrictPriority(t *testing.T) {
|
||||
}
|
||||
p.shutdown()
|
||||
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, sortTaskOpt, cmp.AllowUnexported(Task{})); diff != "" {
|
||||
if diff := cmp.Diff(tc.wantProcessed, processed, taskCmpOpts...); diff != "" {
|
||||
t.Errorf("mismatch found in processed tasks; (-want, +got)\n%s", diff)
|
||||
}
|
||||
|
||||
@@ -644,12 +655,9 @@ func TestProcessorPerform(t *testing.T) {
|
||||
wantErr: true,
|
||||
},
|
||||
}
|
||||
// Note: We don't need to fully initialize the processor since we are only testing
|
||||
// Note: We don't need to fully initialized the processor since we are only testing
|
||||
// perform method.
|
||||
p := newProcessor(processorParams{
|
||||
logger: testLogger,
|
||||
queues: defaultQueueConfig,
|
||||
})
|
||||
p := newProcessorForTest(t, nil, nil)
|
||||
|
||||
for _, tc := range tests {
|
||||
p.handler = tc.handler
|
||||
|
Reference in New Issue
Block a user