mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-27 00:02:19 +08:00
Record last_failed_at time on Retry or Archive event
This commit is contained in:
parent
62168b8d0d
commit
87264b66f3
@ -10,6 +10,7 @@ import (
|
|||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v7"
|
"github.com/go-redis/redis/v7"
|
||||||
"github.com/google/go-cmp/cmp"
|
"github.com/google/go-cmp/cmp"
|
||||||
@ -126,12 +127,14 @@ func JSON(kv map[string]interface{}) []byte {
|
|||||||
func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage {
|
func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage {
|
||||||
t.Retried = t.Retried + 1
|
t.Retried = t.Retried + 1
|
||||||
t.ErrorMsg = errMsg
|
t.ErrorMsg = errMsg
|
||||||
|
t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff
|
||||||
return &t
|
return &t
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskMessageWithError returns an updated copy of t with the given error message.
|
// TaskMessageWithError returns an updated copy of t with the given error message.
|
||||||
func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage {
|
func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage {
|
||||||
t.ErrorMsg = errMsg
|
t.ErrorMsg = errMsg
|
||||||
|
t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff
|
||||||
return &t
|
return &t
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -483,14 +483,15 @@ return redis.status_reply("OK")`)
|
|||||||
// and assigning error message to the task message.
|
// and assigning error message to the task message.
|
||||||
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
||||||
var op errors.Op = "rdb.Retry"
|
var op errors.Op = "rdb.Retry"
|
||||||
|
now := time.Now()
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.Retried++
|
modified.Retried++
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
|
modified.LastFailedAt = now.Unix()
|
||||||
encoded, err := base.EncodeMessage(&modified)
|
encoded, err := base.EncodeMessage(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
now := time.Now()
|
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
keys := []string{
|
keys := []string{
|
||||||
base.TaskKey(msg.Queue, msg.ID.String()),
|
base.TaskKey(msg.Queue, msg.ID.String()),
|
||||||
@ -551,13 +552,14 @@ return redis.status_reply("OK")`)
|
|||||||
// It also trims the archive by timestamp and set size.
|
// It also trims the archive by timestamp and set size.
|
||||||
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error {
|
||||||
var op errors.Op = "rdb.Archive"
|
var op errors.Op = "rdb.Archive"
|
||||||
|
now := time.Now()
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
|
modified.LastFailedAt = now.Unix()
|
||||||
encoded, err := base.EncodeMessage(&modified)
|
encoded, err := base.EncodeMessage(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err))
|
||||||
}
|
}
|
||||||
now := time.Now()
|
|
||||||
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
|
cutoff := now.AddDate(0, 0, -archivedExpirationInDays)
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
keys := []string{
|
keys := []string{
|
||||||
|
@ -1163,9 +1163,13 @@ func TestRetry(t *testing.T) {
|
|||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
cmpOpts := []cmp.Option{
|
||||||
|
h.SortZSetEntryOpt,
|
||||||
|
cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field
|
||||||
|
}
|
||||||
for queue, want := range tc.wantRetry {
|
for queue, want := range tc.wantRetry {
|
||||||
gotRetry := h.GetRetryEntries(t, r.client, queue)
|
gotRetry := h.GetRetryEntries(t, r.client, queue)
|
||||||
if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" {
|
||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff)
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1368,7 +1372,7 @@ func TestArchive(t *testing.T) {
|
|||||||
}
|
}
|
||||||
for queue, want := range tc.wantArchived {
|
for queue, want := range tc.wantArchived {
|
||||||
gotArchived := h.GetArchivedEntries(t, r.client, queue)
|
gotArchived := h.GetArchivedEntries(t, r.client, queue)
|
||||||
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" {
|
if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
|
t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user