2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-09-20 11:05:58 +08:00

Update WorkerInfo and remove unnecessary types

This commit is contained in:
Ken Hibino 2021-05-19 16:23:38 -07:00
parent ea9086fd8b
commit 96c51fdc23
2 changed files with 15 additions and 130 deletions

View File

@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask` - `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask`
- `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask` - `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask`
- `inspeq` package is removed. All types and functions from the package is moved to `asynq` package. - `inspeq` package is removed. All types and functions from the package is moved to `asynq` package.
- `WorkerInfo` field names have changed.
## [0.17.2] - 2021-06-06 ## [0.17.2] - 2021-06-06

View File

@ -167,127 +167,6 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error {
return err return err
} }
// PendingTask is a task in a queue and is ready to be processed.
type PendingTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
}
// ActiveTask is a task that's currently being processed.
type ActiveTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
}
// ScheduledTask is a task scheduled to be processed in the future.
type ScheduledTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastError string
NextProcessAt time.Time
score int64
}
// RetryTask is a task scheduled to be retried in the future.
type RetryTask struct {
*Task
ID string
Queue string
NextProcessAt time.Time
MaxRetry int
Retried int
LastError string
// TODO: LastFailedAt time.Time
score int64
}
// ArchivedTask is a task archived for debugging and inspection purposes, and
// it won't be retried automatically.
// A task can be archived when the task exhausts its retry counts or manually
// archived by a user via the CLI or Inspector.
type ArchivedTask struct {
*Task
ID string
Queue string
MaxRetry int
Retried int
LastFailedAt time.Time
LastError string
score int64
}
// Format string used for task key.
// Format is <prefix>:<uuid>:<score>.
const taskKeyFormat = "%s:%v:%v"
// Prefix used for task key.
const (
keyPrefixPending = "p"
keyPrefixScheduled = "s"
keyPrefixRetry = "r"
keyPrefixArchived = "a"
allKeyPrefixes = keyPrefixPending + keyPrefixScheduled + keyPrefixRetry + keyPrefixArchived
)
// Key returns a key used to delete, and archive the pending task.
func (t *PendingTask) Key() string {
// Note: Pending tasks are stored in redis LIST, therefore no score.
// Use zero for the score to use the same key format.
return fmt.Sprintf(taskKeyFormat, keyPrefixPending, t.ID, 0)
}
// Key returns a key used to delete, run, and archive the scheduled task.
func (t *ScheduledTask) Key() string {
return fmt.Sprintf(taskKeyFormat, keyPrefixScheduled, t.ID, t.score)
}
// Key returns a key used to delete, run, and archive the retry task.
func (t *RetryTask) Key() string {
return fmt.Sprintf(taskKeyFormat, keyPrefixRetry, t.ID, t.score)
}
// Key returns a key used to delete and run the archived task.
func (t *ArchivedTask) Key() string {
return fmt.Sprintf(taskKeyFormat, keyPrefixArchived, t.ID, t.score)
}
// parseTaskKey parses a key string and returns each part of key with proper
// type if valid, otherwise it reports an error.
func parseTaskKey(key string) (prefix string, id uuid.UUID, score int64, err error) {
parts := strings.Split(key, ":")
if len(parts) != 3 {
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
id, err = uuid.Parse(parts[1])
if err != nil {
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
score, err = strconv.ParseInt(parts[2], 10, 64)
if err != nil {
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
prefix = parts[0]
if len(prefix) != 1 || !strings.Contains(allKeyPrefixes, prefix) {
return "", uuid.Nil, 0, fmt.Errorf("invalid id")
}
return prefix, id, score, nil
}
// ListOption specifies behavior of list operation. // ListOption specifies behavior of list operation.
type ListOption interface{} type ListOption interface{}
@ -724,13 +603,12 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) {
continue continue
} }
wrkInfo := &WorkerInfo{ wrkInfo := &WorkerInfo{
Started: w.Started, TaskID: w.ID,
Deadline: w.Deadline, TaskType: w.Type,
Task: &ActiveTask{ TaskPayload: w.Payload,
Task: NewTask(w.Type, w.Payload), Queue: w.Queue,
ID: w.ID, Started: w.Started,
Queue: w.Queue, Deadline: w.Deadline,
},
} }
srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo) srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo)
} }
@ -767,8 +645,14 @@ type ServerInfo struct {
// WorkerInfo describes a running worker processing a task. // WorkerInfo describes a running worker processing a task.
type WorkerInfo struct { type WorkerInfo struct {
// The task the worker is processing. // ID of the task the worker is processing.
Task *ActiveTask TaskID string
// Type of the task the worker is processing.
TaskType string
// Payload of the task the worker is processing.
TaskPayload []byte
// Queue from which the worker got its task.
Queue string
// Time the worker started processing the task. // Time the worker started processing the task.
Started time.Time Started time.Time
// Time the worker needs to finish processing the task by. // Time the worker needs to finish processing the task by.