diff --git a/CHANGELOG.md b/CHANGELOG.md index 1fc5ca1..8d1abac 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `ServeMux` type to make it easy for users to implement Handler interface. - `ErrorHandler` type was added. Allow users to specify error handling function (e.g. Report error to error reporting service such as Honeybadger, Bugsnag, etc) ## [0.5.0] - 2020-02-23 diff --git a/README.md b/README.md index ffa8b5e..4511057 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ First, make sure you are running a Redis server locally. $ redis-server ``` -To create and schedule tasks, use `Client` and provide a task and when to process the task. +To create and schedule tasks, use `Client` and provide a task and when to enqueue the task. ```go func main() { @@ -41,9 +41,9 @@ func main() { client := asynq.NewClient(r) // Create a task with task type and payload - t1 := asynq.NewTask("send_welcome_email", map[string]interface{}{"user_id": 42}) + t1 := asynq.NewTask("email:signup", map[string]interface{}{"user_id": 42}) - t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42}) + t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42}) // Process immediately err := client.Enqueue(t1) @@ -52,8 +52,8 @@ func main() { err = client.EnqueueIn(24*time.Hour, t2) // Process at specified time. - t := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC) - err = client.EnqueueAt(t, t2) + target := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC) + err = client.EnqueueAt(target, t2) // Pass options to specify processing behavior for a given task. // @@ -66,6 +66,21 @@ func main() { To start the background workers, use `Background` and provide your `Handler` to process the tasks. +`Handler` is an interface with one method `ProcessTask` with the following signature. + +```go +// ProcessTask should return nil if the processing of a task +// is successful. +// +// If ProcessTask return a non-nil error or panics, the task +// will be retried after delay. +type Handler interface { + ProcessTask(context.Context, *asynq.Task) error +} +``` + +You can optionally use `ServeMux` to create a handler, just as you would with `"net/http"` Handler. + ```go func main() { r := &asynq.RedisClientOpt{ @@ -84,20 +99,23 @@ func main() { // See the godoc for other configuration options }) - bg.Run(handler) + mux := asynq.NewServeMux() + mux.HandleFunc("email:signup", signupEmailHandler) + mux.HandleFunc("email:reminder", reminderEmailHandler) + // ...register other handlers... + + bg.Run(mux) } -``` -`Handler` is an interface with one method `ProcessTask` with the following signature. - -```go -// ProcessTask should return nil if the processing of a task -// is successful. -// -// If ProcessTask return a non-nil error or panics, the task -// will be retried after delay. -type Handler interface { - ProcessTask(context.Context, *asynq.Task) error +// function with the same signature as the ProcessTask method for the Handler interface. +func signupEmailHandler(ctx context.Context, t *asynq.Task) error { + id, err := t.Payload.GetInt("user_id") + if err != nil { + return err + } + fmt.Printf("Send welcome email to user %d\n", id) + // ...your email sending logic... + return nil } ``` diff --git a/asynq.go b/asynq.go index 52430e9..7d1b211 100644 --- a/asynq.go +++ b/asynq.go @@ -138,6 +138,6 @@ func createRedisClient(r RedisConnOpt) *redis.Client { TLSConfig: r.TLSConfig, }) default: - panic(fmt.Sprintf("unexpected type %T for RedisConnOpt", r)) + panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r)) } } diff --git a/servemux.go b/servemux.go new file mode 100644 index 0000000..f9e13b4 --- /dev/null +++ b/servemux.go @@ -0,0 +1,139 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" +) + +// ServeMux is a multiplexer for asynchronous tasks. +// It matches the type of each task against a list of registered patterns +// and calls the handler for the pattern that most closely matches the +// taks's type name. +// +// Longer patterns take precedence over shorter ones, so that if there are +// handlers registered for both "images" and "images:thumbnails", +// the latter handler will be called for tasks with a type name beginning with +// "images:thumbnails" and the former will receive tasks with type name beginning +// with "images". +type ServeMux struct { + mu sync.RWMutex + m map[string]muxEntry + es []muxEntry // slice of entries sorted from longest to shortest. +} + +type muxEntry struct { + h Handler + pattern string +} + +// NewServeMux allocates and returns a new ServeMux. +func NewServeMux() *ServeMux { + return new(ServeMux) +} + +// ProcessTask dispatches the task to the handler whose +// pattern most closely matches the task type. +func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error { + h, _ := mux.Handler(task) + return h.ProcessTask(ctx, task) +} + +// Handler returns the handler to use for the given task. +// It always return a non-nil handler. +// +// Handler also returns the registered pattern that matches the task. +// +// If there is no registered handler that applies to the task, +// handler returns a 'not found' handler which returns an error. +func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) { + mux.mu.RLock() + defer mux.mu.RUnlock() + + h, pattern = mux.match(t.Type) + if h == nil { + h, pattern = NotFoundHandler(), "" + } + return h, pattern +} + +// Find a handler on a handler map given a typename string. +// Most-specific (longest) pattern wins. +func (mux *ServeMux) match(typename string) (h Handler, pattern string) { + // Check for exact match first. + v, ok := mux.m[typename] + if ok { + return v.h, v.pattern + } + + // Check for longest valid match. + // mux.es contains all patterns from longest to shortest. + for _, e := range mux.es { + if strings.HasPrefix(typename, e.pattern) { + return e.h, e.pattern + } + } + return nil, "" + +} + +// Handle registers the handler for the given pattern. +// If a handler already exists for pattern, Handle panics. +func (mux *ServeMux) Handle(pattern string, handler Handler) { + mux.mu.Lock() + defer mux.mu.Unlock() + + if pattern == "" { + panic("asynq: invalid pattern") + } + if handler == nil { + panic("asynq: nil handler") + } + if _, exist := mux.m[pattern]; exist { + panic("asynq: multiple registrations for " + pattern) + } + + if mux.m == nil { + mux.m = make(map[string]muxEntry) + } + e := muxEntry{h: handler, pattern: pattern} + mux.m[pattern] = e + mux.es = appendSorted(mux.es, e) +} + +func appendSorted(es []muxEntry, e muxEntry) []muxEntry { + n := len(es) + i := sort.Search(n, func(i int) bool { + return len(es[i].pattern) < len(e.pattern) + }) + if i == n { + return append(es, e) + } + // we now know that i points at where we want to insert. + es = append(es, muxEntry{}) // try to grow the slice in place, any entry works. + copy(es[i+1:], es[i:]) // shift shorter entries down. + es[i] = e + return es +} + +// HandleFunc registers the handler function for the given pattern. +func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) { + if handler == nil { + panic("asynq: nil handler") + } + mux.Handle(pattern, HandlerFunc(handler)) +} + +// NotFound returns an error indicating that the handler was not found for the given task. +func NotFound(ctx context.Context, task *Task) error { + return fmt.Errorf("handler not found for task %q", task.Type) +} + +// NotFoundHandler returns a simple task handler that returns a ``not found`` error. +func NotFoundHandler() Handler { return HandlerFunc(NotFound) } diff --git a/servemux_test.go b/servemux_test.go new file mode 100644 index 0000000..7b0c4cc --- /dev/null +++ b/servemux_test.go @@ -0,0 +1,116 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "context" + "testing" +) + +var called string + +// makeFakeHandler returns a handler that updates the global called variable +// to the given identity. +func makeFakeHandler(identity string) Handler { + return HandlerFunc(func(ctx context.Context, t *Task) error { + called = identity + return nil + }) +} + +// A list of pattern, handler pair that is registered with mux. +var serveMuxRegister = []struct { + pattern string + h Handler +}{ + {"email:", makeFakeHandler("default email handler")}, + {"email:signup", makeFakeHandler("signup email handler")}, + {"csv:export", makeFakeHandler("csv export handler")}, +} + +var serveMuxTests = []struct { + typename string // task's type name + want string // identifier of the handler that should be called +}{ + {"email:signup", "signup email handler"}, + {"csv:export", "csv export handler"}, + {"email:daily", "default email handler"}, +} + +func TestServeMux(t *testing.T) { + mux := NewServeMux() + for _, e := range serveMuxRegister { + mux.Handle(e.pattern, e.h) + } + + for _, tc := range serveMuxTests { + called = "" // reset to zero value + + task := NewTask(tc.typename, nil) + if err := mux.ProcessTask(context.Background(), task); err != nil { + t.Fatal(err) + } + + if called != tc.want { + t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want) + } + } +} + +func TestServeMuxRegisterNilHandler(t *testing.T) { + defer func() { + if err := recover(); err == nil { + t.Error("expected call to mux.HandleFunc to panic") + } + }() + + mux := NewServeMux() + mux.HandleFunc("email:signup", nil) +} + +func TestServeMuxRegisterEmptyPattern(t *testing.T) { + defer func() { + if err := recover(); err == nil { + t.Error("expected call to mux.HandleFunc to panic") + } + }() + + mux := NewServeMux() + mux.Handle("", makeFakeHandler("email")) +} + +func TestServeMuxRegisterDuplicatePattern(t *testing.T) { + defer func() { + if err := recover(); err == nil { + t.Error("expected call to mux.HandleFunc to panic") + } + }() + + mux := NewServeMux() + mux.Handle("email", makeFakeHandler("email")) + mux.Handle("email", makeFakeHandler("email:default")) +} + +var notFoundTests = []struct { + typename string // task's type name +}{ + {"image:minimize"}, + {"csv:"}, // registered patterns match the task's type prefix, not the other way around. +} + +func TestServeMuxNotFound(t *testing.T) { + mux := NewServeMux() + for _, e := range serveMuxRegister { + mux.Handle(e.pattern, e.h) + } + + for _, tc := range notFoundTests { + task := NewTask(tc.typename, nil) + err := mux.ProcessTask(context.Background(), task) + if err == nil { + t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type) + } + } +}