mirror of
				https://github.com/hibiken/asynqmon.git
				synced 2025-10-26 16:26:12 +08:00 
			
		
		
		
	Add completed state
This commit is contained in:
		| @@ -14,7 +14,7 @@ import ( | ||||
| //   - conversion function from an external type to an internal type | ||||
| // **************************************************************************** | ||||
|  | ||||
| // PayloadFormatter is used to convert payload bytes to string shown in the UI. | ||||
| // PayloadFormatter is used to convert payload bytes to a string shown in the UI. | ||||
| type PayloadFormatter interface { | ||||
| 	// FormatPayload takes the task's typename and payload and returns a string representation of the payload. | ||||
| 	FormatPayload(taskType string, payload []byte) string | ||||
| @@ -22,11 +22,22 @@ type PayloadFormatter interface { | ||||
|  | ||||
| type PayloadFormatterFunc func(string, []byte) string | ||||
|  | ||||
| // FormatPayload returns a string representation of the payload of the given taskType. | ||||
| func (f PayloadFormatterFunc) FormatPayload(taskType string, payload []byte) string { | ||||
| 	return f(taskType, payload) | ||||
| } | ||||
|  | ||||
| // ResultFormatter is used to convert result bytes to a string shown in the UI. | ||||
| type ResultFormatter interface { | ||||
| 	// FormatResult takes the task's typename and result and returns a string representation of the result. | ||||
| 	FormatResult(taskType string, result []byte) string | ||||
| } | ||||
|  | ||||
| type ResultFormatterFunc func(string, []byte) string | ||||
|  | ||||
| func (f ResultFormatterFunc) FormatResult(taskType string, result []byte) string { | ||||
| 	return f(taskType, result) | ||||
| } | ||||
|  | ||||
| // DefaultPayloadFormatter is the PayloadFormater used by default. | ||||
| // It prints the given payload bytes as is if the bytes are printable, otherwise it prints a message to indicate | ||||
| // that the bytes are not printable. | ||||
| @@ -37,6 +48,16 @@ var DefaultPayloadFormatter = PayloadFormatterFunc(func(_ string, payload []byte | ||||
| 	return string(payload) | ||||
| }) | ||||
|  | ||||
| // DefaultResultFormatter is the ResultFormatter used by default. | ||||
| // It prints the given result bytes as is if the bytes are printable, otherwise it prints a message to indicate | ||||
| // that the bytes are not printable. | ||||
| var DefaultResultFormatter = ResultFormatterFunc(func(_ string, result []byte) string { | ||||
| 	if !isPrintable(result) { | ||||
| 		return "non-printable bytes" | ||||
| 	} | ||||
| 	return string(result) | ||||
| }) | ||||
|  | ||||
| // isPrintable reports whether the given data is comprised of all printable runes. | ||||
| func isPrintable(data []byte) bool { | ||||
| 	if !utf8.Valid(data) { | ||||
| @@ -67,6 +88,7 @@ type queueStateSnapshot struct { | ||||
| 	Scheduled int `json:"scheduled"` | ||||
| 	Retry     int `json:"retry"` | ||||
| 	Archived  int `json:"archived"` | ||||
| 	Completed int `json:"completed"` | ||||
|  | ||||
| 	// Total number of tasks processed during the given date. | ||||
| 	// The number includes both succeeded and failed tasks. | ||||
| @@ -91,6 +113,7 @@ func toQueueStateSnapshot(s *asynq.QueueInfo) *queueStateSnapshot { | ||||
| 		Scheduled:   s.Scheduled, | ||||
| 		Retry:       s.Retry, | ||||
| 		Archived:    s.Archived, | ||||
| 		Completed:   s.Completed, | ||||
| 		Processed:   s.Processed, | ||||
| 		Succeeded:   s.Processed - s.Failed, | ||||
| 		Failed:      s.Failed, | ||||
| @@ -152,6 +175,22 @@ type taskInfo struct { | ||||
| 	// NextProcessAt is the time the task is scheduled to be processed in RFC3339 format. | ||||
| 	// If not applicable, empty string. | ||||
| 	NextProcessAt string `json:"next_process_at"` | ||||
| 	// CompletedAt is the time the task was successfully processed in RFC3339 format. | ||||
| 	// If not applicable, empty string. | ||||
| 	CompletedAt string `json:"completed_at"` | ||||
| 	// Result is the result data associated with the task. | ||||
| 	Result string `json:"result"` | ||||
| 	// TTL is the number of seconds the task has left to be retained in the queue. | ||||
| 	// This is calculated by (CompletedAt + ResultTTL) - Now. | ||||
| 	TTL int64 `json:"ttl_seconds"` | ||||
| } | ||||
|  | ||||
| // taskTTL calculates TTL for the given task. | ||||
| func taskTTL(task *asynq.TaskInfo) time.Duration { | ||||
| 	if task.State != asynq.TaskStateCompleted { | ||||
| 		return 0 // N/A | ||||
| 	} | ||||
| 	return task.CompletedAt.Add(task.Retention).Sub(time.Now()) | ||||
| } | ||||
|  | ||||
| // formatTimeInRFC3339 formats t in RFC3339 if the value is non-zero. | ||||
| @@ -163,7 +202,7 @@ func formatTimeInRFC3339(t time.Time) string { | ||||
| 	return t.Format(time.RFC3339) | ||||
| } | ||||
|  | ||||
| func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo { | ||||
| func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *taskInfo { | ||||
| 	return &taskInfo{ | ||||
| 		ID:            info.ID, | ||||
| 		Queue:         info.Queue, | ||||
| @@ -177,6 +216,9 @@ func toTaskInfo(info *asynq.TaskInfo, pf PayloadFormatter) *taskInfo { | ||||
| 		Timeout:       int(info.Timeout.Seconds()), | ||||
| 		Deadline:      formatTimeInRFC3339(info.Deadline), | ||||
| 		NextProcessAt: formatTimeInRFC3339(info.NextProcessAt), | ||||
| 		CompletedAt:   formatTimeInRFC3339(info.CompletedAt), | ||||
| 		Result:        rf.FormatResult("", info.Result), | ||||
| 		TTL:           int64(taskTTL(info).Seconds()), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -343,6 +385,40 @@ func toArchivedTasks(in []*asynq.TaskInfo, pf PayloadFormatter) []*archivedTask | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| type completedTask struct { | ||||
| 	*baseTask | ||||
| 	CompletedAt time.Time `json:"completed_at"` | ||||
| 	Result      string    `json:"result"` | ||||
| 	// Number of seconds left for retention (i.e. (CompletedAt + ResultTTL) - Now) | ||||
| 	TTL int64 `json:"ttl_seconds"` | ||||
| } | ||||
|  | ||||
| func toCompletedTask(ti *asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) *completedTask { | ||||
| 	base := &baseTask{ | ||||
| 		ID:        ti.ID, | ||||
| 		Type:      ti.Type, | ||||
| 		Payload:   pf.FormatPayload(ti.Type, ti.Payload), | ||||
| 		Queue:     ti.Queue, | ||||
| 		MaxRetry:  ti.MaxRetry, | ||||
| 		Retried:   ti.Retried, | ||||
| 		LastError: ti.LastErr, | ||||
| 	} | ||||
| 	return &completedTask{ | ||||
| 		baseTask:    base, | ||||
| 		CompletedAt: ti.CompletedAt, | ||||
| 		TTL:         int64(taskTTL(ti).Seconds()), | ||||
| 		Result:      rf.FormatResult(ti.Type, ti.Result), | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func toCompletedTasks(in []*asynq.TaskInfo, pf PayloadFormatter, rf ResultFormatter) []*completedTask { | ||||
| 	out := make([]*completedTask, len(in)) | ||||
| 	for i, ti := range in { | ||||
| 		out[i] = toCompletedTask(ti, pf, rf) | ||||
| 	} | ||||
| 	return out | ||||
| } | ||||
|  | ||||
| type schedulerEntry struct { | ||||
| 	ID            string   `json:"id"` | ||||
| 	Spec          string   `json:"spec"` | ||||
|   | ||||
		Reference in New Issue
	
	Block a user