mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	
		
			
				
	
	
		
			306 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			306 lines
		
	
	
		
			8.4 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| // Copyright 2020 Kentaro Hibino. All rights reserved.
 | |
| // Use of this source code is governed by a MIT license
 | |
| // that can be found in the LICENSE file.
 | |
| 
 | |
| // Package base defines foundational types and constants used in asynq package.
 | |
| package base
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"strings"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-redis/redis/v7"
 | |
| 	"github.com/google/uuid"
 | |
| )
 | |
| 
 | |
| // Version of asynq library and CLI.
 | |
| const Version = "0.10.0"
 | |
| 
 | |
| // DefaultQueueName is the queue name used if none are specified by user.
 | |
| const DefaultQueueName = "default"
 | |
| 
 | |
| // DefaultQueue is the redis key for the default queue.
 | |
| var DefaultQueue = QueueKey(DefaultQueueName)
 | |
| 
 | |
| // Global Redis keys.
 | |
| const (
 | |
| 	AllServers    = "asynq:servers" // ZSET
 | |
| 	AllWorkers    = "asynq:workers" // ZSET
 | |
| 	AllQueues     = "asynq:queues"  // SET
 | |
| 	CancelChannel = "asynq:cancel"  // PubSub channel
 | |
| )
 | |
| 
 | |
| // QueueKey returns a redis key for the given queue name.
 | |
| func QueueKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}", qname)
 | |
| }
 | |
| 
 | |
| // TODO: Should we rename this to "active"?
 | |
| // InProgressKey returns a redis key for the in-progress tasks.
 | |
| func InProgressKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:in_progress", qname)
 | |
| }
 | |
| 
 | |
| // ScheduledKey returns a redis key for the scheduled tasks.
 | |
| func ScheduledKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:scheduled", qname)
 | |
| }
 | |
| 
 | |
| // RetryKey returns a redis key for the retry tasks.
 | |
| func RetryKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:retry", qname)
 | |
| }
 | |
| 
 | |
| // DeadKey returns a redis key for the dead tasks.
 | |
| func DeadKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:dead", qname)
 | |
| }
 | |
| 
 | |
| // DeadlinesKey returns a redis key for the deadlines.
 | |
| func DeadlinesKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:deadlines", qname)
 | |
| }
 | |
| 
 | |
| // PausedKey returns a redis key to indicate that the given queue is paused.
 | |
| func PausedKey(qname string) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:paused", qname)
 | |
| }
 | |
| 
 | |
| // ProcessedKey returns a redis key for processed count for the given day for the queue.
 | |
| func ProcessedKey(qname string, t time.Time) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:processed:%s", qname, t.UTC().Format("2006-01-02"))
 | |
| }
 | |
| 
 | |
| // FailedKey returns a redis key for failure count for the given day for the queue.
 | |
| func FailedKey(qname string, t time.Time) string {
 | |
| 	return fmt.Sprintf("asynq:{%s}:failed:%s", qname, t.UTC().Format("2006-01-02"))
 | |
| }
 | |
| 
 | |
| // ServerInfoKey returns a redis key for process info.
 | |
| func ServerInfoKey(hostname string, pid int, sid string) string {
 | |
| 	return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, sid)
 | |
| }
 | |
| 
 | |
| // WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
 | |
| func WorkersKey(hostname string, pid int, sid string) string {
 | |
| 	return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid)
 | |
| }
 | |
| 
 | |
| // TaskMessage is the internal representation of a task with additional metadata fields.
 | |
| // Serialized data of this type gets written to redis.
 | |
| type TaskMessage struct {
 | |
| 	// Type indicates the kind of the task to be performed.
 | |
| 	Type string
 | |
| 
 | |
| 	// Payload holds data needed to process the task.
 | |
| 	Payload map[string]interface{}
 | |
| 
 | |
| 	// ID is a unique identifier for each task.
 | |
| 	ID uuid.UUID
 | |
| 
 | |
| 	// Queue is a name this message should be enqueued to.
 | |
| 	Queue string
 | |
| 
 | |
| 	// Retry is the max number of retry for this task.
 | |
| 	Retry int
 | |
| 
 | |
| 	// Retried is the number of times we've retried this task so far.
 | |
| 	Retried int
 | |
| 
 | |
| 	// ErrorMsg holds the error message from the last failure.
 | |
| 	ErrorMsg string
 | |
| 
 | |
| 	// Timeout specifies timeout in seconds.
 | |
| 	// If task processing doesn't complete within the timeout, the task will be retried
 | |
| 	// if retry count is remaining. Otherwise it will be moved to the dead queue.
 | |
| 	//
 | |
| 	// Use zero to indicate no timeout.
 | |
| 	Timeout int64
 | |
| 
 | |
| 	// Deadline specifies the deadline for the task in Unix time,
 | |
| 	// the number of seconds elapsed since January 1, 1970 UTC.
 | |
| 	// If task processing doesn't complete before the deadline, the task will be retried
 | |
| 	// if retry count is remaining. Otherwise it will be moved to the dead queue.
 | |
| 	//
 | |
| 	// Use zero to indicate no deadline.
 | |
| 	Deadline int64
 | |
| 
 | |
| 	// UniqueKey holds the redis key used for uniqueness lock for this task.
 | |
| 	//
 | |
| 	// Empty string indicates that no uniqueness lock was used.
 | |
| 	UniqueKey string
 | |
| }
 | |
| 
 | |
| // EncodeMessage marshals the given task message in JSON and returns an encoded string.
 | |
| func EncodeMessage(msg *TaskMessage) (string, error) {
 | |
| 	b, err := json.Marshal(msg)
 | |
| 	if err != nil {
 | |
| 		return "", err
 | |
| 	}
 | |
| 	return string(b), nil
 | |
| }
 | |
| 
 | |
| // DecodeMessage unmarshals the given encoded string and returns a decoded task message.
 | |
| func DecodeMessage(s string) (*TaskMessage, error) {
 | |
| 	d := json.NewDecoder(strings.NewReader(s))
 | |
| 	d.UseNumber()
 | |
| 	var msg TaskMessage
 | |
| 	if err := d.Decode(&msg); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	return &msg, nil
 | |
| }
 | |
| 
 | |
| // Z represents sorted set member.
 | |
| type Z struct {
 | |
| 	Message *TaskMessage
 | |
| 	Score   int64
 | |
| }
 | |
| 
 | |
| // ServerStatus represents status of a server.
 | |
| // ServerStatus methods are concurrency safe.
 | |
| type ServerStatus struct {
 | |
| 	mu  sync.Mutex
 | |
| 	val ServerStatusValue
 | |
| }
 | |
| 
 | |
| // NewServerStatus returns a new status instance given an initial value.
 | |
| func NewServerStatus(v ServerStatusValue) *ServerStatus {
 | |
| 	return &ServerStatus{val: v}
 | |
| }
 | |
| 
 | |
| type ServerStatusValue int
 | |
| 
 | |
| const (
 | |
| 	// StatusIdle indicates the server is in idle state.
 | |
| 	StatusIdle ServerStatusValue = iota
 | |
| 
 | |
| 	// StatusRunning indicates the servier is up and processing tasks.
 | |
| 	StatusRunning
 | |
| 
 | |
| 	// StatusQuiet indicates the server is up but not processing new tasks.
 | |
| 	StatusQuiet
 | |
| 
 | |
| 	// StatusStopped indicates the server server has been stopped.
 | |
| 	StatusStopped
 | |
| )
 | |
| 
 | |
| var statuses = []string{
 | |
| 	"idle",
 | |
| 	"running",
 | |
| 	"quiet",
 | |
| 	"stopped",
 | |
| }
 | |
| 
 | |
| func (s *ServerStatus) String() string {
 | |
| 	s.mu.Lock()
 | |
| 	defer s.mu.Unlock()
 | |
| 	if StatusIdle <= s.val && s.val <= StatusStopped {
 | |
| 		return statuses[s.val]
 | |
| 	}
 | |
| 	return "unknown status"
 | |
| }
 | |
| 
 | |
| // Get returns the status value.
 | |
| func (s *ServerStatus) Get() ServerStatusValue {
 | |
| 	s.mu.Lock()
 | |
| 	v := s.val
 | |
| 	s.mu.Unlock()
 | |
| 	return v
 | |
| }
 | |
| 
 | |
| // Set sets the status value.
 | |
| func (s *ServerStatus) Set(v ServerStatusValue) {
 | |
| 	s.mu.Lock()
 | |
| 	s.val = v
 | |
| 	s.mu.Unlock()
 | |
| }
 | |
| 
 | |
| // ServerInfo holds information about a running server.
 | |
| type ServerInfo struct {
 | |
| 	Host              string
 | |
| 	PID               int
 | |
| 	ServerID          string
 | |
| 	Concurrency       int
 | |
| 	Queues            map[string]int
 | |
| 	StrictPriority    bool
 | |
| 	Status            string
 | |
| 	Started           time.Time
 | |
| 	ActiveWorkerCount int
 | |
| }
 | |
| 
 | |
| // WorkerInfo holds information about a running worker.
 | |
| type WorkerInfo struct {
 | |
| 	Host    string
 | |
| 	PID     int
 | |
| 	ID      string
 | |
| 	Type    string
 | |
| 	Queue   string
 | |
| 	Payload map[string]interface{}
 | |
| 	Started time.Time
 | |
| }
 | |
| 
 | |
| // Cancelations is a collection that holds cancel functions for all in-progress tasks.
 | |
| //
 | |
| // Cancelations are safe for concurrent use by multipel goroutines.
 | |
| type Cancelations struct {
 | |
| 	mu          sync.Mutex
 | |
| 	cancelFuncs map[string]context.CancelFunc
 | |
| }
 | |
| 
 | |
| // NewCancelations returns a Cancelations instance.
 | |
| func NewCancelations() *Cancelations {
 | |
| 	return &Cancelations{
 | |
| 		cancelFuncs: make(map[string]context.CancelFunc),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Add adds a new cancel func to the collection.
 | |
| func (c *Cancelations) Add(id string, fn context.CancelFunc) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	c.cancelFuncs[id] = fn
 | |
| }
 | |
| 
 | |
| // Delete deletes a cancel func from the collection given an id.
 | |
| func (c *Cancelations) Delete(id string) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	delete(c.cancelFuncs, id)
 | |
| }
 | |
| 
 | |
| // Get returns a cancel func given an id.
 | |
| func (c *Cancelations) Get(id string) (fn context.CancelFunc, ok bool) {
 | |
| 	c.mu.Lock()
 | |
| 	defer c.mu.Unlock()
 | |
| 	fn, ok = c.cancelFuncs[id]
 | |
| 	return fn, ok
 | |
| }
 | |
| 
 | |
| // Broker is a message broker that supports operations to manage task queues.
 | |
| //
 | |
| // See rdb.RDB as a reference implementation.
 | |
| type Broker interface {
 | |
| 	Ping() error
 | |
| 	Enqueue(msg *TaskMessage) error
 | |
| 	EnqueueUnique(msg *TaskMessage, ttl time.Duration) error
 | |
| 	Dequeue(qnames ...string) (*TaskMessage, time.Time, error)
 | |
| 	Done(msg *TaskMessage) error
 | |
| 	Requeue(msg *TaskMessage) error
 | |
| 	Schedule(msg *TaskMessage, processAt time.Time) error
 | |
| 	ScheduleUnique(msg *TaskMessage, processAt time.Time, ttl time.Duration) error
 | |
| 	Retry(msg *TaskMessage, processAt time.Time, errMsg string) error
 | |
| 	Kill(msg *TaskMessage, errMsg string) error
 | |
| 	CheckAndEnqueue() error
 | |
| 	ListDeadlineExceeded(deadline time.Time) ([]*TaskMessage, error)
 | |
| 	WriteServerState(info *ServerInfo, workers []*WorkerInfo, ttl time.Duration) error
 | |
| 	ClearServerState(host string, pid int, serverID string) error
 | |
| 	CancelationPubSub() (*redis.PubSub, error) // TODO: Need to decouple from redis to support other brokers
 | |
| 	PublishCancelation(id string) error
 | |
| 	Close() error
 | |
| }
 |