From 96c51fdc23b75c0239356b6e11366484ffeebc21 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Wed, 19 May 2021 16:23:38 -0700 Subject: [PATCH] Update WorkerInfo and remove unnecessary types --- CHANGELOG.md | 1 + inspector.go | 144 +++++---------------------------------------------- 2 files changed, 15 insertions(+), 130 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aac4782..79143fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `Inspector.DeleteTaskByKey` is replaced with `Inspector.DeleteTask` - `Inspector.ArchiveTaskByKey` is replaced with `Inspector.ArchiveTask` - `inspeq` package is removed. All types and functions from the package is moved to `asynq` package. +- `WorkerInfo` field names have changed. ## [0.17.2] - 2021-06-06 diff --git a/inspector.go b/inspector.go index 2c72bda..3635b34 100644 --- a/inspector.go +++ b/inspector.go @@ -167,127 +167,6 @@ func (i *Inspector) DeleteQueue(qname string, force bool) error { return err } -// PendingTask is a task in a queue and is ready to be processed. -type PendingTask struct { - *Task - ID string - Queue string - MaxRetry int - Retried int - LastError string -} - -// ActiveTask is a task that's currently being processed. -type ActiveTask struct { - *Task - ID string - Queue string - MaxRetry int - Retried int - LastError string -} - -// ScheduledTask is a task scheduled to be processed in the future. -type ScheduledTask struct { - *Task - ID string - Queue string - MaxRetry int - Retried int - LastError string - NextProcessAt time.Time - - score int64 -} - -// RetryTask is a task scheduled to be retried in the future. -type RetryTask struct { - *Task - ID string - Queue string - NextProcessAt time.Time - MaxRetry int - Retried int - LastError string - // TODO: LastFailedAt time.Time - - score int64 -} - -// ArchivedTask is a task archived for debugging and inspection purposes, and -// it won't be retried automatically. -// A task can be archived when the task exhausts its retry counts or manually -// archived by a user via the CLI or Inspector. -type ArchivedTask struct { - *Task - ID string - Queue string - MaxRetry int - Retried int - LastFailedAt time.Time - LastError string - - score int64 -} - -// Format string used for task key. -// Format is ::. -const taskKeyFormat = "%s:%v:%v" - -// Prefix used for task key. -const ( - keyPrefixPending = "p" - keyPrefixScheduled = "s" - keyPrefixRetry = "r" - keyPrefixArchived = "a" - - allKeyPrefixes = keyPrefixPending + keyPrefixScheduled + keyPrefixRetry + keyPrefixArchived -) - -// Key returns a key used to delete, and archive the pending task. -func (t *PendingTask) Key() string { - // Note: Pending tasks are stored in redis LIST, therefore no score. - // Use zero for the score to use the same key format. - return fmt.Sprintf(taskKeyFormat, keyPrefixPending, t.ID, 0) -} - -// Key returns a key used to delete, run, and archive the scheduled task. -func (t *ScheduledTask) Key() string { - return fmt.Sprintf(taskKeyFormat, keyPrefixScheduled, t.ID, t.score) -} - -// Key returns a key used to delete, run, and archive the retry task. -func (t *RetryTask) Key() string { - return fmt.Sprintf(taskKeyFormat, keyPrefixRetry, t.ID, t.score) -} - -// Key returns a key used to delete and run the archived task. -func (t *ArchivedTask) Key() string { - return fmt.Sprintf(taskKeyFormat, keyPrefixArchived, t.ID, t.score) -} - -// parseTaskKey parses a key string and returns each part of key with proper -// type if valid, otherwise it reports an error. -func parseTaskKey(key string) (prefix string, id uuid.UUID, score int64, err error) { - parts := strings.Split(key, ":") - if len(parts) != 3 { - return "", uuid.Nil, 0, fmt.Errorf("invalid id") - } - id, err = uuid.Parse(parts[1]) - if err != nil { - return "", uuid.Nil, 0, fmt.Errorf("invalid id") - } - score, err = strconv.ParseInt(parts[2], 10, 64) - if err != nil { - return "", uuid.Nil, 0, fmt.Errorf("invalid id") - } - prefix = parts[0] - if len(prefix) != 1 || !strings.Contains(allKeyPrefixes, prefix) { - return "", uuid.Nil, 0, fmt.Errorf("invalid id") - } - return prefix, id, score, nil -} - // ListOption specifies behavior of list operation. type ListOption interface{} @@ -724,13 +603,12 @@ func (i *Inspector) Servers() ([]*ServerInfo, error) { continue } wrkInfo := &WorkerInfo{ - Started: w.Started, - Deadline: w.Deadline, - Task: &ActiveTask{ - Task: NewTask(w.Type, w.Payload), - ID: w.ID, - Queue: w.Queue, - }, + TaskID: w.ID, + TaskType: w.Type, + TaskPayload: w.Payload, + Queue: w.Queue, + Started: w.Started, + Deadline: w.Deadline, } srvInfo.ActiveWorkers = append(srvInfo.ActiveWorkers, wrkInfo) } @@ -767,8 +645,14 @@ type ServerInfo struct { // WorkerInfo describes a running worker processing a task. type WorkerInfo struct { - // The task the worker is processing. - Task *ActiveTask + // ID of the task the worker is processing. + TaskID string + // Type of the task the worker is processing. + TaskType string + // Payload of the task the worker is processing. + TaskPayload []byte + // Queue from which the worker got its task. + Queue string // Time the worker started processing the task. Started time.Time // Time the worker needs to finish processing the task by.