2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00
asynq/servemux.go

159 lines
4.5 KiB
Go
Raw Normal View History

// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"context"
"fmt"
"sort"
"strings"
"sync"
)
// ServeMux is a multiplexer for asynchronous tasks.
// It matches the type of each task against a list of registered patterns
// and calls the handler for the pattern that most closely matches the
// taks's type name.
//
// Longer patterns take precedence over shorter ones, so that if there are
// handlers registered for both "images" and "images:thumbnails",
// the latter handler will be called for tasks with a type name beginning with
// "images:thumbnails" and the former will receive tasks with type name beginning
// with "images".
type ServeMux struct {
mu sync.RWMutex
m map[string]muxEntry
es []muxEntry // slice of entries sorted from longest to shortest.
mws []MiddlewareFunc
}
type muxEntry struct {
h Handler
pattern string
}
// MiddlewareFunc is a function which receives an asynq.Handler and returns another asynq.Handler.
// Typically, the returned handler is a closure which does something with the context and task passed
// to it, and then calls the handler passed as parameter to the MiddlewareFunc.
type MiddlewareFunc func(Handler) Handler
// NewServeMux allocates and returns a new ServeMux.
func NewServeMux() *ServeMux {
return new(ServeMux)
}
// ProcessTask dispatches the task to the handler whose
// pattern most closely matches the task type.
func (mux *ServeMux) ProcessTask(ctx context.Context, task *Task) error {
h, _ := mux.Handler(task)
return h.ProcessTask(ctx, task)
}
// Handler returns the handler to use for the given task.
// It always return a non-nil handler.
//
// Handler also returns the registered pattern that matches the task.
//
// If there is no registered handler that applies to the task,
// handler returns a 'not found' handler which returns an error.
func (mux *ServeMux) Handler(t *Task) (h Handler, pattern string) {
mux.mu.RLock()
defer mux.mu.RUnlock()
h, pattern = mux.match(t.Type)
if h == nil {
h, pattern = NotFoundHandler(), ""
}
for i := len(mux.mws) - 1; i >= 0; i-- {
h = mux.mws[i](h)
}
return h, pattern
}
// Find a handler on a handler map given a typename string.
// Most-specific (longest) pattern wins.
func (mux *ServeMux) match(typename string) (h Handler, pattern string) {
// Check for exact match first.
v, ok := mux.m[typename]
if ok {
return v.h, v.pattern
}
// Check for longest valid match.
// mux.es contains all patterns from longest to shortest.
for _, e := range mux.es {
if strings.HasPrefix(typename, e.pattern) {
return e.h, e.pattern
}
}
return nil, ""
}
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (mux *ServeMux) Handle(pattern string, handler Handler) {
mux.mu.Lock()
defer mux.mu.Unlock()
if pattern == "" {
panic("asynq: invalid pattern")
}
if handler == nil {
panic("asynq: nil handler")
}
if _, exist := mux.m[pattern]; exist {
panic("asynq: multiple registrations for " + pattern)
}
if mux.m == nil {
mux.m = make(map[string]muxEntry)
}
e := muxEntry{h: handler, pattern: pattern}
mux.m[pattern] = e
mux.es = appendSorted(mux.es, e)
}
func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
n := len(es)
i := sort.Search(n, func(i int) bool {
return len(es[i].pattern) < len(e.pattern)
})
if i == n {
return append(es, e)
}
// we now know that i points at where we want to insert.
es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
copy(es[i+1:], es[i:]) // shift shorter entries down.
es[i] = e
return es
}
// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(context.Context, *Task) error) {
if handler == nil {
panic("asynq: nil handler")
}
mux.Handle(pattern, HandlerFunc(handler))
}
// Use appends a MiddlewareFunc to the chain.
// Middlewares are executed in the order that they are applied to the ServeMux.
func (mux *ServeMux) Use(mws ...MiddlewareFunc) {
mux.mu.Lock()
defer mux.mu.Unlock()
for _, fn := range mws {
mux.mws = append(mux.mws, fn)
}
}
// NotFound returns an error indicating that the handler was not found for the given task.
func NotFound(ctx context.Context, task *Task) error {
return fmt.Errorf("handler not found for task %q", task.Type)
}
// NotFoundHandler returns a simple task handler that returns a ``not found`` error.
func NotFoundHandler() Handler { return HandlerFunc(NotFound) }