From 87264b66f3c1219c578e9855b06159e87eba5b58 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Tue, 18 May 2021 21:00:53 -0700 Subject: [PATCH] Record last_failed_at time on Retry or Archive event --- internal/asynqtest/asynqtest.go | 3 +++ internal/rdb/rdb.go | 6 ++++-- internal/rdb/rdb_test.go | 8 ++++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 50e73ef..9dab962 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -10,6 +10,7 @@ import ( "math" "sort" "testing" + "time" "github.com/go-redis/redis/v7" "github.com/google/go-cmp/cmp" @@ -126,12 +127,14 @@ func JSON(kv map[string]interface{}) []byte { func TaskMessageAfterRetry(t base.TaskMessage, errMsg string) *base.TaskMessage { t.Retried = t.Retried + 1 t.ErrorMsg = errMsg + t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff return &t } // TaskMessageWithError returns an updated copy of t with the given error message. func TaskMessageWithError(t base.TaskMessage, errMsg string) *base.TaskMessage { t.ErrorMsg = errMsg + t.LastFailedAt = time.Now().Unix() // use EquateApproxTime with cmp.Diff return &t } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 21fdfc3..dc94194 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -483,14 +483,15 @@ return redis.status_reply("OK")`) // and assigning error message to the task message. func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { var op errors.Op = "rdb.Retry" + now := time.Now() modified := *msg modified.Retried++ modified.ErrorMsg = errMsg + modified.LastFailedAt = now.Unix() encoded, err := base.EncodeMessage(&modified) if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } - now := time.Now() expireAt := now.Add(statsTTL) keys := []string{ base.TaskKey(msg.Queue, msg.ID.String()), @@ -551,13 +552,14 @@ return redis.status_reply("OK")`) // It also trims the archive by timestamp and set size. func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { var op errors.Op = "rdb.Archive" + now := time.Now() modified := *msg modified.ErrorMsg = errMsg + modified.LastFailedAt = now.Unix() encoded, err := base.EncodeMessage(&modified) if err != nil { return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode message: %v", err)) } - now := time.Now() cutoff := now.AddDate(0, 0, -archivedExpirationInDays) expireAt := now.Add(statsTTL) keys := []string{ diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 95d9e39..7936bf5 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -1163,9 +1163,13 @@ func TestRetry(t *testing.T) { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.DeadlinesKey(queue), diff) } } + cmpOpts := []cmp.Option{ + h.SortZSetEntryOpt, + cmpopts.EquateApproxTime(5 * time.Second), // for LastFailedAt field + } for queue, want := range tc.wantRetry { gotRetry := h.GetRetryEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotRetry, h.SortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(want, gotRetry, cmpOpts...); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryKey(queue), diff) } } @@ -1368,7 +1372,7 @@ func TestArchive(t *testing.T) { } for queue, want := range tc.wantArchived { gotArchived := h.GetArchivedEntries(t, r.client, queue) - if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt); diff != "" { + if diff := cmp.Diff(want, gotArchived, h.SortZSetEntryOpt, zScoreCmpOpt, timeCmpOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Archive: (-want, +got):\n%s", base.ArchivedKey(queue), diff) } }