2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-27 16:13:40 +08:00
asynq/internal/base/base.go

354 lines
9.8 KiB
Go
Raw Normal View History

2020-01-03 10:13:16 +08:00
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
2019-12-22 23:15:45 +08:00
// Package base defines foundational types and constants used in asynq package.
package base
import (
"context"
2020-01-31 22:48:58 +08:00
"fmt"
"strings"
2020-02-02 14:22:48 +08:00
"sync"
"time"
2020-04-18 22:55:10 +08:00
"github.com/go-redis/redis/v7"
"github.com/rs/xid"
)
2019-12-22 23:15:45 +08:00
// DefaultQueueName is the queue name used if none are specified by user.
const DefaultQueueName = "default"
2019-12-22 23:15:45 +08:00
// Redis keys
const (
AllServers = "asynq:servers" // ZSET
serversPrefix = "asynq:servers:" // STRING - asynq:ps:<host>:<pid>:<serverid>
AllWorkers = "asynq:workers" // ZSET
workersPrefix = "asynq:workers:" // HASH - asynq:workers:<host:<pid>:<serverid>
processedPrefix = "asynq:processed:" // STRING - asynq:processed:<yyyy-mm-dd>
failurePrefix = "asynq:failure:" // STRING - asynq:failure:<yyyy-mm-dd>
QueuePrefix = "asynq:queues:" // LIST - asynq:queues:<qname>
AllQueues = "asynq:queues" // SET
DefaultQueue = QueuePrefix + DefaultQueueName // LIST
ScheduledQueue = "asynq:scheduled" // ZSET
RetryQueue = "asynq:retry" // ZSET
DeadQueue = "asynq:dead" // ZSET
InProgressQueue = "asynq:in_progress" // LIST
CancelChannel = "asynq:cancel" // PubSub channel
2019-12-22 23:15:45 +08:00
)
2020-02-20 22:34:09 +08:00
// QueueKey returns a redis key for the given queue name.
func QueueKey(qname string) string {
return QueuePrefix + strings.ToLower(qname)
}
2020-02-20 22:34:09 +08:00
// ProcessedKey returns a redis key for processed count for the given day.
func ProcessedKey(t time.Time) string {
return processedPrefix + t.UTC().Format("2006-01-02")
}
2020-02-20 22:34:09 +08:00
// FailureKey returns a redis key for failure count for the given day.
func FailureKey(t time.Time) string {
return failurePrefix + t.UTC().Format("2006-01-02")
}
// ServerInfoKey returns a redis key for process info.
func ServerInfoKey(hostname string, pid int, sid string) string {
return fmt.Sprintf("%s%s:%d:%s", serversPrefix, hostname, pid, sid)
2020-01-31 22:48:58 +08:00
}
// WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
func WorkersKey(hostname string, pid int, sid string) string {
return fmt.Sprintf("%s%s:%d:%s", workersPrefix, hostname, pid, sid)
2020-02-20 22:34:09 +08:00
}
2019-12-22 23:15:45 +08:00
// TaskMessage is the internal representation of a task with additional metadata fields.
2019-12-28 12:37:15 +08:00
// Serialized data of this type gets written to redis.
2019-12-22 23:15:45 +08:00
type TaskMessage struct {
2019-12-28 12:37:15 +08:00
// Type indicates the kind of the task to be performed.
2019-12-22 23:15:45 +08:00
Type string
2019-12-28 12:37:15 +08:00
2019-12-22 23:15:45 +08:00
// Payload holds data needed to process the task.
Payload map[string]interface{}
2019-12-28 12:37:15 +08:00
// ID is a unique identifier for each task.
2019-12-22 23:15:45 +08:00
ID xid.ID
2019-12-28 12:37:15 +08:00
// Queue is a name this message should be enqueued to.
2019-12-22 23:15:45 +08:00
Queue string
2019-12-28 12:37:15 +08:00
2019-12-22 23:15:45 +08:00
// Retry is the max number of retry for this task.
Retry int
2019-12-28 12:37:15 +08:00
// Retried is the number of times we've retried this task so far.
2019-12-22 23:15:45 +08:00
Retried int
2019-12-28 12:37:15 +08:00
// ErrorMsg holds the error message from the last failure.
2019-12-22 23:15:45 +08:00
ErrorMsg string
2020-02-12 13:53:59 +08:00
// Timeout specifies how long a task may run.
// The string value should be compatible with time.Duration.ParseDuration.
//
// Zero means no limit.
Timeout string
// Deadline specifies the deadline for the task.
// Task won't be processed if it exceeded its deadline.
// The string shoulbe be in RFC3339 format.
//
// time.Time's zero value means no deadline.
Deadline string
// UniqueKey holds the redis key used for uniqueness lock for this task.
//
// Empty string indicates that no uniqueness lock was used.
UniqueKey string
2019-12-22 23:15:45 +08:00
}
2020-01-31 22:48:58 +08:00
2020-04-13 02:41:50 +08:00
// ServerState holds process level information.
2020-02-18 22:57:39 +08:00
//
2020-04-13 02:41:50 +08:00
// ServerStates are safe for concurrent use by multiple goroutines.
type ServerState struct {
2020-02-20 23:44:13 +08:00
mu sync.Mutex // guards all data fields
id xid.ID
2020-02-20 23:44:13 +08:00
concurrency int
queues map[string]int
strictPriority bool
pid int
host string
2020-04-13 02:41:50 +08:00
status ServerStatus
2020-02-20 23:44:13 +08:00
started time.Time
workers map[string]*workerStats
2020-02-18 22:57:39 +08:00
}
2020-04-13 02:41:50 +08:00
// ServerStatus represents status of a server.
type ServerStatus int
2020-02-18 22:57:39 +08:00
const (
2020-04-13 02:41:50 +08:00
// StatusIdle indicates the server is in idle state.
StatusIdle ServerStatus = iota
2020-02-18 22:57:39 +08:00
2020-04-13 02:41:50 +08:00
// StatusRunning indicates the servier is up and processing tasks.
2020-02-18 22:57:39 +08:00
StatusRunning
2020-04-13 02:41:50 +08:00
// StatusQuiet indicates the server is up but not processing new tasks.
StatusQuiet
// StatusStopped indicates the server server has been stopped.
2020-02-18 22:57:39 +08:00
StatusStopped
)
var statuses = []string{
"idle",
"running",
2020-04-13 02:41:50 +08:00
"quiet",
2020-02-18 22:57:39 +08:00
"stopped",
}
2020-04-13 02:41:50 +08:00
func (s ServerStatus) String() string {
2020-02-18 22:57:39 +08:00
if StatusIdle <= s && s <= StatusStopped {
return statuses[s]
}
return "unknown status"
}
2020-02-20 23:44:13 +08:00
type workerStats struct {
msg *TaskMessage
started time.Time
}
2020-04-13 02:41:50 +08:00
// NewServerState returns a new instance of ServerState.
func NewServerState(host string, pid, concurrency int, queues map[string]int, strict bool) *ServerState {
return &ServerState{
2020-02-18 22:57:39 +08:00
host: host,
pid: pid,
id: xid.New(),
2020-02-18 22:57:39 +08:00
concurrency: concurrency,
queues: cloneQueueConfig(queues),
strictPriority: strict,
status: StatusIdle,
2020-02-20 23:44:13 +08:00
workers: make(map[string]*workerStats),
2020-02-18 22:57:39 +08:00
}
}
2020-04-13 02:41:50 +08:00
// SetStatus updates the status of server.
func (ss *ServerState) SetStatus(status ServerStatus) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.status = status
}
// Status returns the status of server.
2020-04-13 02:41:50 +08:00
func (ss *ServerState) Status() ServerStatus {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.status
2020-02-18 22:57:39 +08:00
}
// SetStarted records when the process started processing.
2020-04-13 02:41:50 +08:00
func (ss *ServerState) SetStarted(t time.Time) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.started = t
2020-02-18 22:57:39 +08:00
}
2020-02-20 23:44:13 +08:00
// AddWorkerStats records when a worker started and which task it's processing.
2020-04-13 02:41:50 +08:00
func (ss *ServerState) AddWorkerStats(msg *TaskMessage, started time.Time) {
ss.mu.Lock()
defer ss.mu.Unlock()
ss.workers[msg.ID.String()] = &workerStats{msg, started}
2020-02-20 23:44:13 +08:00
}
// DeleteWorkerStats removes a worker's entry from the process state.
2020-04-13 02:41:50 +08:00
func (ss *ServerState) DeleteWorkerStats(msg *TaskMessage) {
ss.mu.Lock()
defer ss.mu.Unlock()
delete(ss.workers, msg.ID.String())
2020-02-18 22:57:39 +08:00
}
// GetInfo returns current state of server as a ServerInfo.
func (ss *ServerState) GetInfo() *ServerInfo {
2020-04-13 02:41:50 +08:00
ss.mu.Lock()
defer ss.mu.Unlock()
return &ServerInfo{
2020-04-13 02:41:50 +08:00
Host: ss.host,
PID: ss.pid,
ServerID: ss.id.String(),
2020-04-13 02:41:50 +08:00
Concurrency: ss.concurrency,
Queues: cloneQueueConfig(ss.queues),
StrictPriority: ss.strictPriority,
Status: ss.status.String(),
Started: ss.started,
ActiveWorkerCount: len(ss.workers),
2020-02-18 22:57:39 +08:00
}
}
2020-02-20 23:44:13 +08:00
// GetWorkers returns a list of currently running workers' info.
2020-04-13 02:41:50 +08:00
func (ss *ServerState) GetWorkers() []*WorkerInfo {
ss.mu.Lock()
defer ss.mu.Unlock()
2020-02-20 23:44:13 +08:00
var res []*WorkerInfo
2020-04-13 02:41:50 +08:00
for _, w := range ss.workers {
2020-02-20 23:44:13 +08:00
res = append(res, &WorkerInfo{
2020-04-13 02:41:50 +08:00
Host: ss.host,
PID: ss.pid,
2020-02-20 23:44:13 +08:00
ID: w.msg.ID,
Type: w.msg.Type,
Queue: w.msg.Queue,
Payload: clonePayload(w.msg.Payload),
Started: w.started,
})
}
return res
}
2020-02-18 22:57:39 +08:00
func cloneQueueConfig(qcfg map[string]int) map[string]int {
res := make(map[string]int)
for qname, n := range qcfg {
res[qname] = n
}
return res
}
2020-02-20 23:44:13 +08:00
func clonePayload(payload map[string]interface{}) map[string]interface{} {
res := make(map[string]interface{})
for k, v := range payload {
res[k] = v
}
return res
}
// ServerInfo holds information about a running server.
type ServerInfo struct {
2020-02-20 23:44:13 +08:00
Host string
PID int
ServerID string
2020-02-02 14:22:48 +08:00
Concurrency int
Queues map[string]int
2020-02-02 14:22:48 +08:00
StrictPriority bool
2020-02-18 22:57:39 +08:00
Status string
2020-02-02 14:22:48 +08:00
Started time.Time
ActiveWorkerCount int
}
2020-02-20 23:44:13 +08:00
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
Host string
PID int
ID xid.ID
Type string
Queue string
Payload map[string]interface{}
Started time.Time
}
2020-02-13 09:33:41 +08:00
// Cancelations is a collection that holds cancel functions for all in-progress tasks.
//
2020-02-18 22:57:39 +08:00
// Cancelations are safe for concurrent use by multipel goroutines.
type Cancelations struct {
mu sync.Mutex
cancelFuncs map[string]context.CancelFunc
}
// NewCancelations returns a Cancelations instance.
func NewCancelations() *Cancelations {
return &Cancelations{
cancelFuncs: make(map[string]context.CancelFunc),
}
}
2020-02-13 09:33:41 +08:00
// Add adds a new cancel func to the collection.
func (c *Cancelations) Add(id string, fn context.CancelFunc) {
c.mu.Lock()
defer c.mu.Unlock()
c.cancelFuncs[id] = fn
}
2020-02-13 09:33:41 +08:00
// Delete deletes a cancel func from the collection given an id.
func (c *Cancelations) Delete(id string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.cancelFuncs, id)
}
// Get returns a cancel func given an id.
2020-02-20 23:44:13 +08:00
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
c.mu.Lock()
defer c.mu.Unlock()
2020-02-20 23:44:13 +08:00
fn, ok = c.cancelFuncs[id]
return fn, ok
}
// GetAll returns all cancel funcs.
func (c *Cancelations) GetAll() []context.CancelFunc {
c.mu.Lock()
defer c.mu.Unlock()
var res []context.CancelFunc
for _, fn := range c.cancelFuncs {
res = append(res, fn)
}
return res
}
2020-04-18 22:55:10 +08:00
// Broker is a message broker that supports operations to manage task queues.
//
// See rdb.RDB as a reference implementation.
type Broker interface {
Enqueue(msg *TaskMessage) error
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
Dequeue(qnames ...string) (*TaskMessage, error)
Done(msg *TaskMessage) error
Requeue(msg *TaskMessage) error
Schedule(msg *TaskMessage, processAt time.Time) error
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
Kill(msg *TaskMessage, errMsg string) error
RequeueAll() (int64, error)
CheckAndEnqueue(qnames ...string) error
WriteServerState(ss *ServerState, ttl time.Duration) error
ClearServerState(ss *ServerState) error
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
PublishCancelation(id string) error
Close() error
}