mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-24 23:02:18 +08:00
Add ServeMux type
Allow user to use ServeMux type to be used as a Handler. ServeMux API is design to be similar to net/http.ServeMux API.
This commit is contained in:
parent
897ab4e28b
commit
742ed6546f
@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||||||
|
|
||||||
### Added
|
### Added
|
||||||
|
|
||||||
|
- Added `ServeMux` type to make it easy for users to implement Handler interface.
|
||||||
- `ErrorHandler` type was added. Allow users to specify error handling function (e.g. Report error to error reporting service such as Honeybadger, Bugsnag, etc)
|
- `ErrorHandler` type was added. Allow users to specify error handling function (e.g. Report error to error reporting service such as Honeybadger, Bugsnag, etc)
|
||||||
|
|
||||||
## [0.5.0] - 2020-02-23
|
## [0.5.0] - 2020-02-23
|
||||||
|
52
README.md
52
README.md
@ -30,7 +30,7 @@ First, make sure you are running a Redis server locally.
|
|||||||
$ redis-server
|
$ redis-server
|
||||||
```
|
```
|
||||||
|
|
||||||
To create and schedule tasks, use `Client` and provide a task and when to process the task.
|
To create and schedule tasks, use `Client` and provide a task and when to enqueue the task.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
func main() {
|
func main() {
|
||||||
@ -41,9 +41,9 @@ func main() {
|
|||||||
client := asynq.NewClient(r)
|
client := asynq.NewClient(r)
|
||||||
|
|
||||||
// Create a task with task type and payload
|
// Create a task with task type and payload
|
||||||
t1 := asynq.NewTask("send_welcome_email", map[string]interface{}{"user_id": 42})
|
t1 := asynq.NewTask("email:signup", map[string]interface{}{"user_id": 42})
|
||||||
|
|
||||||
t2 := asynq.NewTask("send_reminder_email", map[string]interface{}{"user_id": 42})
|
t2 := asynq.NewTask("email:reminder", map[string]interface{}{"user_id": 42})
|
||||||
|
|
||||||
// Process immediately
|
// Process immediately
|
||||||
err := client.Enqueue(t1)
|
err := client.Enqueue(t1)
|
||||||
@ -52,8 +52,8 @@ func main() {
|
|||||||
err = client.EnqueueIn(24*time.Hour, t2)
|
err = client.EnqueueIn(24*time.Hour, t2)
|
||||||
|
|
||||||
// Process at specified time.
|
// Process at specified time.
|
||||||
t := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
|
target := time.Date(2020, time.March, 6, 10, 0, 0, 0, time.UTC)
|
||||||
err = client.EnqueueAt(t, t2)
|
err = client.EnqueueAt(target, t2)
|
||||||
|
|
||||||
// Pass options to specify processing behavior for a given task.
|
// Pass options to specify processing behavior for a given task.
|
||||||
//
|
//
|
||||||
@ -66,6 +66,21 @@ func main() {
|
|||||||
|
|
||||||
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
|
To start the background workers, use `Background` and provide your `Handler` to process the tasks.
|
||||||
|
|
||||||
|
`Handler` is an interface with one method `ProcessTask` with the following signature.
|
||||||
|
|
||||||
|
```go
|
||||||
|
// ProcessTask should return nil if the processing of a task
|
||||||
|
// is successful.
|
||||||
|
//
|
||||||
|
// If ProcessTask return a non-nil error or panics, the task
|
||||||
|
// will be retried after delay.
|
||||||
|
type Handler interface {
|
||||||
|
ProcessTask(context.Context, *asynq.Task) error
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
You can optionally use `ServeMux` to create a handler, just as you would with `"net/http"` Handler.
|
||||||
|
|
||||||
```go
|
```go
|
||||||
func main() {
|
func main() {
|
||||||
r := &asynq.RedisClientOpt{
|
r := &asynq.RedisClientOpt{
|
||||||
@ -84,20 +99,23 @@ func main() {
|
|||||||
// See the godoc for other configuration options
|
// See the godoc for other configuration options
|
||||||
})
|
})
|
||||||
|
|
||||||
bg.Run(handler)
|
mux := asynq.NewServeMux()
|
||||||
|
mux.HandleFunc("email:signup", signupEmailHandler)
|
||||||
|
mux.HandleFunc("email:reminder", reminderEmailHandler)
|
||||||
|
// ...register other handlers...
|
||||||
|
|
||||||
|
bg.Run(mux)
|
||||||
}
|
}
|
||||||
```
|
|
||||||
|
|
||||||
`Handler` is an interface with one method `ProcessTask` with the following signature.
|
// function with the same signature as the ProcessTask method for the Handler interface.
|
||||||
|
func signupEmailHandler(ctx context.Context, t *asynq.Task) error {
|
||||||
```go
|
id, err := t.Payload.GetInt("user_id")
|
||||||
// ProcessTask should return nil if the processing of a task
|
if err != nil {
|
||||||
// is successful.
|
return err
|
||||||
//
|
}
|
||||||
// If ProcessTask return a non-nil error or panics, the task
|
fmt.Printf("Send welcome email to user %d\n", id)
|
||||||
// will be retried after delay.
|
// ...your email sending logic...
|
||||||
type Handler interface {
|
return nil
|
||||||
ProcessTask(context.Context, *asynq.Task) error
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
2
asynq.go
2
asynq.go
@ -138,6 +138,6 @@ func createRedisClient(r RedisConnOpt) *redis.Client {
|
|||||||
TLSConfig: r.TLSConfig,
|
TLSConfig: r.TLSConfig,
|
||||||
})
|
})
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("unexpected type %T for RedisConnOpt", r))
|
panic(fmt.Sprintf("asynq: unexpected type %T for RedisConnOpt", r))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
139
servemux.go
Normal file
139
servemux.go
Normal file
@ -0,0 +1,139 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ServeMux is a multiplexer for asynchronous tasks.
|
||||||
|
// It matches the type of each task against a list of registered patterns
|
||||||
|
// and calls the handler for the pattern that most closely matches the
|
||||||
|
// taks's type name.
|
||||||
|
//
|
||||||
|
// Longer patterns take precedence over shorter ones, so that if there are
|
||||||
|
// handlers registered for both "images" and "images:thumbnails",
|
||||||
|
// the latter handler will be called for tasks with a type name beginning with
|
||||||
|
// "images:thumbnails" and the former will receive tasks with type name beginning
|
||||||
|
// with "images".
|
||||||
|
type ServeMux struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
m map[string]muxEntry
|
||||||
|
es []muxEntry // slice of entries sorted from longest to shortest.
|
||||||
|
}
|
||||||
|
|
||||||
|
type muxEntry struct {
|
||||||
|
h Handler
|
||||||
|
pattern string
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServeMux allocates and returns a new ServeMux.
|
||||||
|
func NewServeMux() *ServeMux {
|
||||||
|
return new(ServeMux)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessTask dispatches the task to the handler whose
|
||||||
|
// pattern most closely matches the task type.
|
||||||
|
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
|
||||||
|
h, _ := mux.Handler(task)
|
||||||
|
return h.ProcessTask(ctx, task)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler returns the handler to use for the given task.
|
||||||
|
// It always return a non-nil handler.
|
||||||
|
//
|
||||||
|
// Handler also returns the registered pattern that matches the task.
|
||||||
|
//
|
||||||
|
// If there is no registered handler that applies to the task,
|
||||||
|
// handler returns a 'not found' handler which returns an error.
|
||||||
|
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
|
||||||
|
mux.mu.RLock()
|
||||||
|
defer mux.mu.RUnlock()
|
||||||
|
|
||||||
|
h, pattern = mux.match(t.Type)
|
||||||
|
if h == nil {
|
||||||
|
h, pattern = NotFoundHandler(), ""
|
||||||
|
}
|
||||||
|
return h, pattern
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find a handler on a handler map given a typename string.
|
||||||
|
// Most-specific (longest) pattern wins.
|
||||||
|
func (mux *ServeMux) match(typename string) (h Handler, pattern string) {
|
||||||
|
// Check for exact match first.
|
||||||
|
v, ok := mux.m[typename]
|
||||||
|
if ok {
|
||||||
|
return v.h, v.pattern
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for longest valid match.
|
||||||
|
// mux.es contains all patterns from longest to shortest.
|
||||||
|
for _, e := range mux.es {
|
||||||
|
if strings.HasPrefix(typename, e.pattern) {
|
||||||
|
return e.h, e.pattern
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, ""
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle registers the handler for the given pattern.
|
||||||
|
// If a handler already exists for pattern, Handle panics.
|
||||||
|
func (mux *ServeMux) Handle(pattern string, handler Handler) {
|
||||||
|
mux.mu.Lock()
|
||||||
|
defer mux.mu.Unlock()
|
||||||
|
|
||||||
|
if pattern == "" {
|
||||||
|
panic("asynq: invalid pattern")
|
||||||
|
}
|
||||||
|
if handler == nil {
|
||||||
|
panic("asynq: nil handler")
|
||||||
|
}
|
||||||
|
if _, exist := mux.m[pattern]; exist {
|
||||||
|
panic("asynq: multiple registrations for " + pattern)
|
||||||
|
}
|
||||||
|
|
||||||
|
if mux.m == nil {
|
||||||
|
mux.m = make(map[string]muxEntry)
|
||||||
|
}
|
||||||
|
e := muxEntry{h: handler, pattern: pattern}
|
||||||
|
mux.m[pattern] = e
|
||||||
|
mux.es = appendSorted(mux.es, e)
|
||||||
|
}
|
||||||
|
|
||||||
|
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
|
||||||
|
n := len(es)
|
||||||
|
i := sort.Search(n, func(i int) bool {
|
||||||
|
return len(es[i].pattern) < len(e.pattern)
|
||||||
|
})
|
||||||
|
if i == n {
|
||||||
|
return append(es, e)
|
||||||
|
}
|
||||||
|
// we now know that i points at where we want to insert.
|
||||||
|
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
|
||||||
|
copy(es[i+1:], es[i:]) // shift shorter entries down.
|
||||||
|
es[i] = e
|
||||||
|
return es
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleFunc registers the handler function for the given pattern.
|
||||||
|
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) {
|
||||||
|
if handler == nil {
|
||||||
|
panic("asynq: nil handler")
|
||||||
|
}
|
||||||
|
mux.Handle(pattern, HandlerFunc(handler))
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotFound returns an error indicating that the handler was not found for the given task.
|
||||||
|
func NotFound(ctx context.Context, task *Task) error {
|
||||||
|
return fmt.Errorf("handler not found for task %q", task.Type)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
|
||||||
|
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }
|
116
servemux_test.go
Normal file
116
servemux_test.go
Normal file
@ -0,0 +1,116 @@
|
|||||||
|
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
||||||
|
// Use of this source code is governed by a MIT license
|
||||||
|
// that can be found in the LICENSE file.
|
||||||
|
|
||||||
|
package asynq
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
var called string
|
||||||
|
|
||||||
|
// makeFakeHandler returns a handler that updates the global called variable
|
||||||
|
// to the given identity.
|
||||||
|
func makeFakeHandler(identity string) Handler {
|
||||||
|
return HandlerFunc(func(ctx context.Context, t *Task) error {
|
||||||
|
called = identity
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// A list of pattern, handler pair that is registered with mux.
|
||||||
|
var serveMuxRegister = []struct {
|
||||||
|
pattern string
|
||||||
|
h Handler
|
||||||
|
}{
|
||||||
|
{"email:", makeFakeHandler("default email handler")},
|
||||||
|
{"email:signup", makeFakeHandler("signup email handler")},
|
||||||
|
{"csv:export", makeFakeHandler("csv export handler")},
|
||||||
|
}
|
||||||
|
|
||||||
|
var serveMuxTests = []struct {
|
||||||
|
typename string // task's type name
|
||||||
|
want string // identifier of the handler that should be called
|
||||||
|
}{
|
||||||
|
{"email:signup", "signup email handler"},
|
||||||
|
{"csv:export", "csv export handler"},
|
||||||
|
{"email:daily", "default email handler"},
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServeMux(t *testing.T) {
|
||||||
|
mux := NewServeMux()
|
||||||
|
for _, e := range serveMuxRegister {
|
||||||
|
mux.Handle(e.pattern, e.h)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range serveMuxTests {
|
||||||
|
called = "" // reset to zero value
|
||||||
|
|
||||||
|
task := NewTask(tc.typename, nil)
|
||||||
|
if err := mux.ProcessTask(context.Background(), task); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if called != tc.want {
|
||||||
|
t.Errorf("%q handler was called for task %q, want %q to be called", called, task.Type, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServeMuxRegisterNilHandler(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("expected call to mux.HandleFunc to panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
mux := NewServeMux()
|
||||||
|
mux.HandleFunc("email:signup", nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServeMuxRegisterEmptyPattern(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("expected call to mux.HandleFunc to panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
mux := NewServeMux()
|
||||||
|
mux.Handle("", makeFakeHandler("email"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServeMuxRegisterDuplicatePattern(t *testing.T) {
|
||||||
|
defer func() {
|
||||||
|
if err := recover(); err == nil {
|
||||||
|
t.Error("expected call to mux.HandleFunc to panic")
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
mux := NewServeMux()
|
||||||
|
mux.Handle("email", makeFakeHandler("email"))
|
||||||
|
mux.Handle("email", makeFakeHandler("email:default"))
|
||||||
|
}
|
||||||
|
|
||||||
|
var notFoundTests = []struct {
|
||||||
|
typename string // task's type name
|
||||||
|
}{
|
||||||
|
{"image:minimize"},
|
||||||
|
{"csv:"}, // registered patterns match the task's type prefix, not the other way around.
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestServeMuxNotFound(t *testing.T) {
|
||||||
|
mux := NewServeMux()
|
||||||
|
for _, e := range serveMuxRegister {
|
||||||
|
mux.Handle(e.pattern, e.h)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range notFoundTests {
|
||||||
|
task := NewTask(tc.typename, nil)
|
||||||
|
err := mux.ProcessTask(context.Background(), task)
|
||||||
|
if err == nil {
|
||||||
|
t.Errorf("ProcessTask did not return error for task %q, should return 'not found' error", task.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user