diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 300e402..24a264a 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -130,14 +130,14 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error { bytesToRemove, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + return err } modified := *msg modified.Retried++ modified.ErrorMsg = errMsg bytesToAdd, err := json.Marshal(&modified) if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + return err } // KEYS[1] -> asynq:in_progress // KEYS[2] -> asynq:retry @@ -164,10 +164,9 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e processedKey := base.ProcessedKey(now) failureKey := base.FailureKey(now) expireAt := now.Add(statsTTL) - _, err = script.Run(r.client, + return script.Run(r.client, []string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey}, - string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Result() - return err + string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Err() } const ( @@ -181,13 +180,13 @@ const ( func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { bytesToRemove, err := json.Marshal(msg) if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", msg, err) + return err } modified := *msg modified.ErrorMsg = errMsg bytesToAdd, err := json.Marshal(&modified) if err != nil { - return fmt.Errorf("could not marshal %+v to json: %v", modified, err) + return err } now := time.Now() limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago @@ -219,10 +218,9 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error { end return redis.status_reply("OK") `) - _, err = script.Run(r.client, + return script.Run(r.client, []string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey}, - string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Result() - return err + string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err() } // RestoreUnfinished moves all tasks from in-progress list to the queue @@ -270,6 +268,6 @@ func (r *RDB) forward(from string) error { return msgs `) now := float64(time.Now().Unix()) - _, err := script.Run(r.client, []string{from, base.DefaultQueue}, now).Result() - return err + return script.Run(r.client, + []string{from, base.DefaultQueue}, now).Err() } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 026dc93..dbf3ba4 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -285,8 +285,7 @@ func TestRetry(t *testing.T) { Score: int64(z.Score), }) } - cmpOpt := cmp.AllowUnexported(sortedSetEntry{}) - if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" { + if diff := cmp.Diff(tc.wantRetry, gotRetry, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff) } @@ -373,7 +372,7 @@ func TestKill(t *testing.T) { gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff) + t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressQueue, diff) } var gotDead []sortedSetEntry @@ -384,7 +383,6 @@ func TestKill(t *testing.T) { Score: int64(z.Score), }) } - if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff) } @@ -453,7 +451,6 @@ func TestRestoreUnfinished(t *testing.T) { seedDefaultQueue(t, r, tc.enqueued) got, err := r.RestoreUnfinished() - if got != tc.want || err != nil { t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want) continue @@ -462,12 +459,12 @@ func TestRestoreUnfinished(t *testing.T) { gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val() gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", base.InProgressQueue, diff) + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff) } gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val() gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw) if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q (-want, +got)\n%s", base.DefaultQueue, diff) + t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff) } } }