mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-15 11:58:47 +08:00
391 lines
11 KiB
Go
391 lines
11 KiB
Go
// Copyright 2020 Kentaro Hibino. All rights reserved.
|
|
// Use of this source code is governed by a MIT license
|
|
// that can be found in the LICENSE file.
|
|
|
|
// Package base defines foundational types and constants used in asynq package.
|
|
package base
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sort"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/v7"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Library-wide constants.
const (
	// Version is the version of the asynq library and CLI.
	Version = "0.17.1"

	// DefaultQueueName is the name of the queue that is used when the user
	// does not specify one.
	DefaultQueueName = "default"
)
|
|
|
|
// DefaultQueue is the redis key for the default queue.
|
|
var DefaultQueue = QueueKey(DefaultQueueName)
|
|
|
|
// Redis keys that are global rather than tied to a single queue.
const (
	// AllServers is the key of a ZSET.
	AllServers = "asynq:servers"

	// AllWorkers is the key of a ZSET.
	AllWorkers = "asynq:workers"

	// AllSchedulers is the key of a ZSET.
	AllSchedulers = "asynq:schedulers"

	// AllQueues is the key of a SET.
	AllQueues = "asynq:queues"

	// CancelChannel is the name of a PubSub channel.
	CancelChannel = "asynq:cancel"
)
|
|
|
|
// ValidateQueueName validates a given qname to be used as a queue name.
//
// A name is rejected when it is empty or consists solely of white space,
// because such a name contains no visible characters. Surrounding white space
// on an otherwise non-blank name is accepted and left untouched.
//
// It returns nil if qname is valid, otherwise a non-nil error.
func ValidateQueueName(qname string) error {
	// TrimSpace is used only for the emptiness check; qname itself is not
	// modified.
	if strings.TrimSpace(qname) == "" {
		return fmt.Errorf("queue name must contain one or more characters")
	}
	return nil
}
|
|
|
|
// QueueKey returns the redis key for the queue named qname.
// The queue name appears inside curly braces in the key.
func QueueKey(qname string) string {
	return "asynq:{" + qname + "}"
}
|
|
|
|
// ActiveKey returns the redis key for the active tasks of the queue named
// qname.
func ActiveKey(qname string) string {
	return "asynq:{" + qname + "}:active"
}
|
|
|
|
// ScheduledKey returns the redis key for the scheduled tasks of the queue
// named qname.
func ScheduledKey(qname string) string {
	return "asynq:{" + qname + "}:scheduled"
}
|
|
|
|
// RetryKey returns the redis key for the retry tasks of the queue named
// qname.
func RetryKey(qname string) string {
	return "asynq:{" + qname + "}:retry"
}
|
|
|
|
// ArchivedKey returns the redis key for the archived tasks of the queue
// named qname.
func ArchivedKey(qname string) string {
	return "asynq:{" + qname + "}:archived"
}
|
|
|
|
// DeadlinesKey returns the redis key for the deadlines of the queue named
// qname.
func DeadlinesKey(qname string) string {
	return "asynq:{" + qname + "}:deadlines"
}
|
|
|
|
// PausedKey returns the redis key that indicates the queue named qname is
// paused.
func PausedKey(qname string) string {
	return "asynq:{" + qname + "}:paused"
}
|
|
|
|
// ProcessedKey returns the redis key for the processed count of the queue
// named qname on the day containing t. The day is taken from t converted to
// UTC and formatted as YYYY-MM-DD.
func ProcessedKey(qname string, t time.Time) string {
	day := t.UTC().Format("2006-01-02")
	return "asynq:{" + qname + "}:processed:" + day
}
|
|
|
|
// FailedKey returns the redis key for the failure count of the queue named
// qname on the day containing t. The day is taken from t converted to UTC and
// formatted as YYYY-MM-DD.
func FailedKey(qname string, t time.Time) string {
	day := t.UTC().Format("2006-01-02")
	return "asynq:{" + qname + "}:failed:" + day
}
|
|
|
|
// ServerInfoKey returns the redis key for the info of the server process
// identified by hostname, pid, and serverID.
func ServerInfoKey(hostname string, pid int, serverID string) string {
	ident := fmt.Sprintf("%s:%d:%s", hostname, pid, serverID)
	return "asynq:servers:{" + ident + "}"
}
|
|
|
|
// WorkersKey returns the redis key for the workers of the server identified
// by hostname, pid, and serverID.
func WorkersKey(hostname string, pid int, serverID string) string {
	ident := fmt.Sprintf("%s:%d:%s", hostname, pid, serverID)
	return "asynq:workers:{" + ident + "}"
}
|
|
|
|
// SchedulerEntriesKey returns the redis key for the entries of the scheduler
// identified by schedulerID.
func SchedulerEntriesKey(schedulerID string) string {
	return "asynq:schedulers:{" + schedulerID + "}"
}
|
|
|
|
// SchedulerHistoryKey returns the redis key for the scheduler history of the
// entry identified by entryID.
func SchedulerHistoryKey(entryID string) string {
	return "asynq:scheduler_history:" + entryID
}
|
|
|
|
// UniqueKey returns a redis key with the given type, payload, and queue name.
|
|
func UniqueKey(qname, tasktype string, payload map[string]interface{}) string {
|
|
return fmt.Sprintf("asynq:{%s}:unique:%s:%s", qname, tasktype, serializePayload(payload))
|
|
}
|
|
|
|
// serializePayload renders payload as a deterministic string.
//
// A nil payload yields "nil". Otherwise every entry is written as "key=value"
// (the value formatted with %v), the entries are ordered by key and joined
// with commas, so two maps with the same contents always produce the same
// string regardless of map iteration order. A non-nil empty map yields the
// empty string.
//
// NOTE(review): keys and values are written unescaped, so a key or value that
// itself contains "," or "=" can make two different payloads serialize to the
// same string. The format is left unchanged here because altering it would
// change the keys derived from it.
func serializePayload(payload map[string]interface{}) string {
	if payload == nil {
		return "nil"
	}

	keys := make([]string, 0, len(payload))
	for k := range payload {
		keys = append(keys, k)
	}
	// Map iteration order is unspecified; sort the keys for a stable result.
	sort.Strings(keys)

	var b strings.Builder
	for i, k := range keys {
		if i > 0 {
			b.WriteByte(',')
		}
		fmt.Fprintf(&b, "%s=%v", k, payload[k])
	}
	return b.String()
}
|
|
|
|
// TaskMessage is the internal representation of a task with additional metadata fields.
|
|
// Serialized data of this type gets written to redis.
|
|
type TaskMessage struct {
|
|
// Type indicates the kind of the task to be performed.
|
|
Type string
|
|
|
|
// Payload holds data needed to process the task.
|
|
Payload map[string]interface{}
|
|
|
|
// ID is a unique identifier for each task.
|
|
ID uuid.UUID
|
|
|
|
// Queue is a name this message should be enqueued to.
|
|
Queue string
|
|
|
|
// Retry is the max number of retry for this task.
|
|
Retry int
|
|
|
|
// Retried is the number of times we've retried this task so far.
|
|
Retried int
|
|
|
|
// ErrorMsg holds the error message from the last failure.
|
|
ErrorMsg string
|
|
|
|
// Timeout specifies timeout in seconds.
|
|
// If task processing doesn't complete within the timeout, the task will be retried
|
|
// if retry count is remaining. Otherwise it will be moved to the archive.
|
|
//
|
|
// Use zero to indicate no timeout.
|
|
Timeout int64
|
|
|
|
// Deadline specifies the deadline for the task in Unix time,
|
|
// the number of seconds elapsed since January 1, 1970 UTC.
|
|
// If task processing doesn't complete before the deadline, the task will be retried
|
|
// if retry count is remaining. Otherwise it will be moved to the archive.
|
|
//
|
|
// Use zero to indicate no deadline.
|
|
Deadline int64
|
|
|
|
// UniqueKey holds the redis key used for uniqueness lock for this task.
|
|
//
|
|
// Empty string indicates that no uniqueness lock was used.
|
|
UniqueKey string
|
|
}
|
|
|
|
// EncodeMessage marshals the given task message in JSON and returns an encoded string.
|
|
func EncodeMessage(msg *TaskMessage) (string, error) {
|
|
b, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(b), nil
|
|
}
|
|
|
|
// DecodeMessage unmarshals the given encoded string and returns a decoded task message.
|
|
func DecodeMessage(s string) (*TaskMessage, error) {
|
|
d := json.NewDecoder(strings.NewReader(s))
|
|
d.UseNumber()
|
|
var msg TaskMessage
|
|
if err := d.Decode(&msg); err != nil {
|
|
return nil, err
|
|
}
|
|
return &msg, nil
|
|
}
|
|
|
|
// Z represents sorted set member.
|
|
type Z struct {
|
|
Message *TaskMessage
|
|
Score int64
|
|
}
|
|
|
|
// ServerStatusValue enumerates the states a server can be in.
type ServerStatusValue int

const (
	// StatusIdle indicates the server is in idle state.
	StatusIdle ServerStatusValue = iota

	// StatusRunning indicates the server is up and active.
	StatusRunning

	// StatusQuiet indicates the server is up but not active.
	StatusQuiet

	// StatusStopped indicates the server has been stopped.
	StatusStopped
)

// statuses holds the name of each status, indexed by its ServerStatusValue.
var statuses = []string{
	"idle",
	"running",
	"quiet",
	"stopped",
}

// ServerStatus represents the status of a server.
// Its methods guard the value with a mutex and are concurrency safe.
type ServerStatus struct {
	mu  sync.Mutex
	val ServerStatusValue
}

// NewServerStatus returns a new status instance whose initial value is v.
func NewServerStatus(v ServerStatusValue) *ServerStatus {
	return &ServerStatus{val: v}
}

// String returns the name of the current status, or "unknown status" when
// the value lies outside the range StatusIdle..StatusStopped.
func (s *ServerStatus) String() string {
	current := s.Get()
	if current < StatusIdle || current > StatusStopped {
		return "unknown status"
	}
	return statuses[current]
}

// Get returns the status value.
func (s *ServerStatus) Get() ServerStatusValue {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.val
}

// Set sets the status value to v.
func (s *ServerStatus) Set(v ServerStatusValue) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.val = v
}
|
|
|
|
// ServerInfo holds information about a running server.
type ServerInfo struct {
	// Identity of the server process.
	Host     string
	PID      int
	ServerID string

	// Configuration of the server.
	Concurrency    int
	Queues         map[string]int
	StrictPriority bool

	// Runtime state of the server.
	Status            string
	Started           time.Time
	ActiveWorkerCount int
}
|
|
|
|
// WorkerInfo holds information about a running worker.
type WorkerInfo struct {
	// Identity of the server process.
	Host     string
	PID      int
	ServerID string

	// Task-related fields.
	ID      string
	Type    string
	Queue   string
	Payload map[string]interface{}

	// Timing information.
	Started  time.Time
	Deadline time.Time
}
|
|
|
|
// SchedulerEntry holds information about a periodic task registered with a
// scheduler.
type SchedulerEntry struct {
	// ID identifies this entry.
	ID string

	// Spec describes the schedule of this entry.
	Spec string

	// Type is the task type of the periodic task.
	Type string

	// Payload is the payload of the periodic task.
	Payload map[string]interface{}

	// Opts holds the options for the periodic task.
	Opts []string

	// Next is the next time the task will be enqueued.
	Next time.Time

	// Prev is the last time the task was enqueued.
	// It is the zero time if the task was never enqueued.
	Prev time.Time
}
|
|
|
|
// SchedulerEnqueueEvent holds information about a single enqueue event
// performed by a scheduler.
type SchedulerEnqueueEvent struct {
	// TaskID is the ID of the task that was enqueued.
	TaskID string

	// EnqueuedAt is the time the task was enqueued.
	EnqueuedAt time.Time
}
|
|
|
|
// Cancelations is a collection that holds cancel functions for all active
// tasks, keyed by id.
//
// Cancelations are safe for concurrent use by multiple goroutines.
type Cancelations struct {
	mu          sync.Mutex
	cancelFuncs map[string]context.CancelFunc
}

// NewCancelations returns a Cancelations instance with an empty collection.
func NewCancelations() *Cancelations {
	return &Cancelations{cancelFuncs: map[string]context.CancelFunc{}}
}

// Add stores fn in the collection under id, replacing any cancel func
// already stored under that id.
func (c *Cancelations) Add(id string, fn context.CancelFunc) {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.cancelFuncs[id] = fn
}

// Delete removes the cancel func stored under id, if any.
func (c *Cancelations) Delete(id string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	delete(c.cancelFuncs, id)
}

// Get returns the cancel func stored under id. The ok result reports whether
// an entry for id was present.
func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
	c.mu.Lock()
	defer c.mu.Unlock()

	cancel, found := c.cancelFuncs[id]
	return cancel, found
}
|
|
|
|
// Broker is a message broker that supports operations to manage task queues.
|
|
//
|
|
// See rdb.RDB as a reference implementation.
|
|
type Broker interface {
|
|
Ping() error
|
|
Enqueue(msg *TaskMessage) error
|
|
EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
|
|
Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
|
|
Done(msg *TaskMessage) error
|
|
Requeue(msg *TaskMessage) error
|
|
Schedule(msg *TaskMessage, processAt time.Time) error
|
|
ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
|
|
Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
|
|
Archive(msg *TaskMessage, errMsg string) error
|
|
CheckAndEnqueue(qnames ...string) error
|
|
ListDeadlineExceeded(deadline time.Time, qnames ...string) ([]*TaskMessage, error)
|
|
WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
|
|
ClearServerState(host string, pid int, serverID string) error
|
|
CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
|
|
PublishCancelation(id string) error
|
|
Close() error
|
|
}
|