mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-23 10:16:12 +08:00 
			
		
		
		
	Change Handler interface to take context.Context
This commit is contained in:
		| @@ -5,6 +5,7 @@ | |||||||
| package asynq | package asynq | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math" | 	"math" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
| @@ -141,18 +142,18 @@ func NewBackground(r RedisConnOpt, cfg *Config) *Background { | |||||||
| // If ProcessTask return a non-nil error or panics, the task | // If ProcessTask return a non-nil error or panics, the task | ||||||
| // will be retried after delay. | // will be retried after delay. | ||||||
| type Handler interface { | type Handler interface { | ||||||
| 	ProcessTask(*Task) error | 	ProcessTask(context.Context, *Task) error | ||||||
| } | } | ||||||
|  |  | ||||||
| // The HandlerFunc type is an adapter to allow the use of | // The HandlerFunc type is an adapter to allow the use of | ||||||
| // ordinary functions as a Handler. If f is a function | // ordinary functions as a Handler. If f is a function | ||||||
| // with the appropriate signature, HandlerFunc(f) is a | // with the appropriate signature, HandlerFunc(f) is a | ||||||
| // Handler that calls f. | // Handler that calls f. | ||||||
| type HandlerFunc func(*Task) error | type HandlerFunc func(context.Context, *Task) error | ||||||
|  |  | ||||||
| // ProcessTask calls fn(task) | // ProcessTask calls fn(ctx, task) | ||||||
| func (fn HandlerFunc) ProcessTask(task *Task) error { | func (fn HandlerFunc) ProcessTask(ctx context.Context, task *Task) error { | ||||||
| 	return fn(task) | 	return fn(ctx, task) | ||||||
| } | } | ||||||
|  |  | ||||||
| // Run starts the background-task processing and blocks until | // Run starts the background-task processing and blocks until | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ | |||||||
| package asynq | package asynq | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"testing" | 	"testing" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -27,7 +28,7 @@ func TestBackground(t *testing.T) { | |||||||
| 	}) | 	}) | ||||||
|  |  | ||||||
| 	// no-op handler | 	// no-op handler | ||||||
| 	h := func(task *Task) error { | 	h := func(ctx context.Context, task *Task) error { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ | |||||||
| package asynq | package asynq | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -37,7 +38,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { | |||||||
|  |  | ||||||
| 		var wg sync.WaitGroup | 		var wg sync.WaitGroup | ||||||
| 		wg.Add(count) | 		wg.Add(count) | ||||||
| 		handler := func(t *Task) error { | 		handler := func(ctx context.Context, t *Task) error { | ||||||
| 			wg.Done() | 			wg.Done() | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
| @@ -82,7 +83,7 @@ func BenchmarkEndToEnd(b *testing.B) { | |||||||
|  |  | ||||||
| 		var wg sync.WaitGroup | 		var wg sync.WaitGroup | ||||||
| 		wg.Add(count * 2) | 		wg.Add(count * 2) | ||||||
| 		handler := func(t *Task) error { | 		handler := func(ctx context.Context, t *Task) error { | ||||||
| 			// randomly fail 1% of tasks | 			// randomly fail 1% of tasks | ||||||
| 			if rand.Intn(100) == 1 { | 			if rand.Intn(100) == 1 { | ||||||
| 				return fmt.Errorf(":(") | 				return fmt.Errorf(":(") | ||||||
| @@ -141,7 +142,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { | |||||||
|  |  | ||||||
| 		var wg sync.WaitGroup | 		var wg sync.WaitGroup | ||||||
| 		wg.Add(highCount + defaultCount + lowCount) | 		wg.Add(highCount + defaultCount + lowCount) | ||||||
| 		handler := func(t *Task) error { | 		handler := func(ctx context.Context, t *Task) error { | ||||||
| 			wg.Done() | 			wg.Done() | ||||||
| 			return nil | 			return nil | ||||||
| 		} | 		} | ||||||
|   | |||||||
							
								
								
									
										11
									
								
								processor.go
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								processor.go
									
									
									
									
									
								
							| @@ -5,6 +5,7 @@ | |||||||
| package asynq | package asynq | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
| 	"sort" | 	"sort" | ||||||
| @@ -73,7 +74,7 @@ func newProcessor(r *rdb.RDB, pinfo *base.ProcessInfo, fn retryDelayFunc, syncRe | |||||||
| 		done:           make(chan struct{}), | 		done:           make(chan struct{}), | ||||||
| 		abort:          make(chan struct{}), | 		abort:          make(chan struct{}), | ||||||
| 		quit:           make(chan struct{}), | 		quit:           make(chan struct{}), | ||||||
| 		handler:        HandlerFunc(func(t *Task) error { return fmt.Errorf("handler not set") }), | 		handler:        HandlerFunc(func(ctx context.Context, t *Task) error { return fmt.Errorf("handler not set") }), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -160,8 +161,10 @@ func (p *processor) exec() { | |||||||
|  |  | ||||||
| 			resCh := make(chan error, 1) | 			resCh := make(chan error, 1) | ||||||
| 			task := NewTask(msg.Type, msg.Payload) | 			task := NewTask(msg.Type, msg.Payload) | ||||||
|  | 			// TODO: Set timeout if provided | ||||||
|  | 			ctx := context.Background() | ||||||
| 			go func() { | 			go func() { | ||||||
| 				resCh <- perform(p.handler, task) | 				resCh <- perform(ctx, task, p.handler) | ||||||
| 			}() | 			}() | ||||||
|  |  | ||||||
| 			select { | 			select { | ||||||
| @@ -282,13 +285,13 @@ func (p *processor) queues() []string { | |||||||
| // perform calls the handler with the given task. | // perform calls the handler with the given task. | ||||||
| // If the call returns without panic, it simply returns the value, | // If the call returns without panic, it simply returns the value, | ||||||
| // otherwise, it recovers from panic and returns an error. | // otherwise, it recovers from panic and returns an error. | ||||||
| func perform(h Handler, task *Task) (err error) { | func perform(ctx context.Context, task *Task, h Handler) (err error) { | ||||||
| 	defer func() { | 	defer func() { | ||||||
| 		if x := recover(); x != nil { | 		if x := recover(); x != nil { | ||||||
| 			err = fmt.Errorf("panic: %v", x) | 			err = fmt.Errorf("panic: %v", x) | ||||||
| 		} | 		} | ||||||
| 	}() | 	}() | ||||||
| 	return h.ProcessTask(task) | 	return h.ProcessTask(ctx, task) | ||||||
| } | } | ||||||
|  |  | ||||||
| // uniq dedupes elements and returns a slice of unique names of length l. | // uniq dedupes elements and returns a slice of unique names of length l. | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ | |||||||
| package asynq | package asynq | ||||||
|  |  | ||||||
| import ( | import ( | ||||||
|  | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"sort" | 	"sort" | ||||||
| 	"sync" | 	"sync" | ||||||
| @@ -59,7 +60,7 @@ func TestProcessorSuccess(t *testing.T) { | |||||||
| 		// instantiate a new processor | 		// instantiate a new processor | ||||||
| 		var mu sync.Mutex | 		var mu sync.Mutex | ||||||
| 		var processed []*Task | 		var processed []*Task | ||||||
| 		handler := func(task *Task) error { | 		handler := func(ctx context.Context, task *Task) error { | ||||||
| 			mu.Lock() | 			mu.Lock() | ||||||
| 			defer mu.Unlock() | 			defer mu.Unlock() | ||||||
| 			processed = append(processed, task) | 			processed = append(processed, task) | ||||||
| @@ -146,7 +147,7 @@ func TestProcessorRetry(t *testing.T) { | |||||||
| 		delayFunc := func(n int, e error, t *Task) time.Duration { | 		delayFunc := func(n int, e error, t *Task) time.Duration { | ||||||
| 			return tc.delay | 			return tc.delay | ||||||
| 		} | 		} | ||||||
| 		handler := func(task *Task) error { | 		handler := func(ctx context.Context, task *Task) error { | ||||||
| 			return fmt.Errorf(errMsg) | 			return fmt.Errorf(errMsg) | ||||||
| 		} | 		} | ||||||
| 		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) | 		pi := base.NewProcessInfo("localhost", 1234, 10, defaultQueueConfig, false) | ||||||
| @@ -264,7 +265,7 @@ func TestProcessorWithStrictPriority(t *testing.T) { | |||||||
| 		// instantiate a new processor | 		// instantiate a new processor | ||||||
| 		var mu sync.Mutex | 		var mu sync.Mutex | ||||||
| 		var processed []*Task | 		var processed []*Task | ||||||
| 		handler := func(task *Task) error { | 		handler := func(ctx context.Context, task *Task) error { | ||||||
| 			mu.Lock() | 			mu.Lock() | ||||||
| 			defer mu.Unlock() | 			defer mu.Unlock() | ||||||
| 			processed = append(processed, task) | 			processed = append(processed, task) | ||||||
| @@ -303,7 +304,7 @@ func TestPerform(t *testing.T) { | |||||||
| 	}{ | 	}{ | ||||||
| 		{ | 		{ | ||||||
| 			desc: "handler returns nil", | 			desc: "handler returns nil", | ||||||
| 			handler: func(t *Task) error { | 			handler: func(ctx context.Context, t *Task) error { | ||||||
| 				return nil | 				return nil | ||||||
| 			}, | 			}, | ||||||
| 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | ||||||
| @@ -311,7 +312,7 @@ func TestPerform(t *testing.T) { | |||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			desc: "handler returns error", | 			desc: "handler returns error", | ||||||
| 			handler: func(t *Task) error { | 			handler: func(ctx context.Context, t *Task) error { | ||||||
| 				return fmt.Errorf("something went wrong") | 				return fmt.Errorf("something went wrong") | ||||||
| 			}, | 			}, | ||||||
| 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | ||||||
| @@ -319,7 +320,7 @@ func TestPerform(t *testing.T) { | |||||||
| 		}, | 		}, | ||||||
| 		{ | 		{ | ||||||
| 			desc: "handler panics", | 			desc: "handler panics", | ||||||
| 			handler: func(t *Task) error { | 			handler: func(ctx context.Context, t *Task) error { | ||||||
| 				panic("something went terribly wrong") | 				panic("something went terribly wrong") | ||||||
| 			}, | 			}, | ||||||
| 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | 			task:    NewTask("gen_thumbnail", map[string]interface{}{"src": "some/img/path"}), | ||||||
| @@ -328,7 +329,7 @@ func TestPerform(t *testing.T) { | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	for _, tc := range tests { | 	for _, tc := range tests { | ||||||
| 		got := perform(tc.handler, tc.task) | 		got := perform(context.Background(), tc.task, tc.handler) | ||||||
| 		if !tc.wantErr && got != nil { | 		if !tc.wantErr && got != nil { | ||||||
| 			t.Errorf("%s: perform() = %v, want nil", tc.desc, got) | 			t.Errorf("%s: perform() = %v, want nil", tc.desc, got) | ||||||
| 			continue | 			continue | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user