mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Clean up rdb package
This commit is contained in:
parent
4ceb49cfd1
commit
49d6ab5df0
@ -130,14 +130,14 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error {
|
|||||||
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) error {
|
||||||
bytesToRemove, err := json.Marshal(msg)
|
bytesToRemove, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return err
|
||||||
}
|
}
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.Retried++
|
modified.Retried++
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
bytesToAdd, err := json.Marshal(&modified)
|
bytesToAdd, err := json.Marshal(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
return err
|
||||||
}
|
}
|
||||||
// KEYS[1] -> asynq:in_progress
|
// KEYS[1] -> asynq:in_progress
|
||||||
// KEYS[2] -> asynq:retry
|
// KEYS[2] -> asynq:retry
|
||||||
@ -164,10 +164,9 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string) e
|
|||||||
processedKey := base.ProcessedKey(now)
|
processedKey := base.ProcessedKey(now)
|
||||||
failureKey := base.FailureKey(now)
|
failureKey := base.FailureKey(now)
|
||||||
expireAt := now.Add(statsTTL)
|
expireAt := now.Add(statsTTL)
|
||||||
_, err = script.Run(r.client,
|
return script.Run(r.client,
|
||||||
[]string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey},
|
[]string{base.InProgressQueue, base.RetryQueue, processedKey, failureKey},
|
||||||
string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Result()
|
string(bytesToRemove), string(bytesToAdd), processAt.Unix(), expireAt.Unix()).Err()
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -181,13 +180,13 @@ const (
|
|||||||
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
||||||
bytesToRemove, err := json.Marshal(msg)
|
bytesToRemove, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", msg, err)
|
return err
|
||||||
}
|
}
|
||||||
modified := *msg
|
modified := *msg
|
||||||
modified.ErrorMsg = errMsg
|
modified.ErrorMsg = errMsg
|
||||||
bytesToAdd, err := json.Marshal(&modified)
|
bytesToAdd, err := json.Marshal(&modified)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not marshal %+v to json: %v", modified, err)
|
return err
|
||||||
}
|
}
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
limit := now.AddDate(0, 0, -deadExpirationInDays).Unix() // 90 days ago
|
||||||
@ -219,10 +218,9 @@ func (r *RDB) Kill(msg *base.TaskMessage, errMsg string) error {
|
|||||||
end
|
end
|
||||||
return redis.status_reply("OK")
|
return redis.status_reply("OK")
|
||||||
`)
|
`)
|
||||||
_, err = script.Run(r.client,
|
return script.Run(r.client,
|
||||||
[]string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey},
|
[]string{base.InProgressQueue, base.DeadQueue, processedKey, failureKey},
|
||||||
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Result()
|
string(bytesToRemove), string(bytesToAdd), now.Unix(), limit, maxDeadTasks, expireAt.Unix()).Err()
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreUnfinished moves all tasks from in-progress list to the queue
|
// RestoreUnfinished moves all tasks from in-progress list to the queue
|
||||||
@ -270,6 +268,6 @@ func (r *RDB) forward(from string) error {
|
|||||||
return msgs
|
return msgs
|
||||||
`)
|
`)
|
||||||
now := float64(time.Now().Unix())
|
now := float64(time.Now().Unix())
|
||||||
_, err := script.Run(r.client, []string{from, base.DefaultQueue}, now).Result()
|
return script.Run(r.client,
|
||||||
return err
|
[]string{from, base.DefaultQueue}, now).Err()
|
||||||
}
|
}
|
||||||
|
@ -285,8 +285,7 @@ func TestRetry(t *testing.T) {
|
|||||||
Score: int64(z.Score),
|
Score: int64(z.Score),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
cmpOpt := cmp.AllowUnexported(sortedSetEntry{})
|
if diff := cmp.Diff(tc.wantRetry, gotRetry, sortZSetEntryOpt); diff != "" {
|
||||||
if diff := cmp.Diff(tc.wantRetry, gotRetry, cmpOpt, sortZSetEntryOpt); diff != "" {
|
|
||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
|
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.RetryQueue, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -373,7 +372,7 @@ func TestKill(t *testing.T) {
|
|||||||
gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val()
|
gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val()
|
||||||
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
||||||
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q; (-want, +got)\n%s", base.InProgressQueue, diff)
|
t.Errorf("mismatch found in %q: (-want, +got)\n%s", base.InProgressQueue, diff)
|
||||||
}
|
}
|
||||||
|
|
||||||
var gotDead []sortedSetEntry
|
var gotDead []sortedSetEntry
|
||||||
@ -384,7 +383,6 @@ func TestKill(t *testing.T) {
|
|||||||
Score: int64(z.Score),
|
Score: int64(z.Score),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantDead, gotDead, sortZSetEntryOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
|
t.Errorf("mismatch found in %q after calling (*RDB).Kill: (-want, +got):\n%s", base.DeadQueue, diff)
|
||||||
}
|
}
|
||||||
@ -453,7 +451,6 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
seedDefaultQueue(t, r, tc.enqueued)
|
seedDefaultQueue(t, r, tc.enqueued)
|
||||||
|
|
||||||
got, err := r.RestoreUnfinished()
|
got, err := r.RestoreUnfinished()
|
||||||
|
|
||||||
if got != tc.want || err != nil {
|
if got != tc.want || err != nil {
|
||||||
t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want)
|
t.Errorf("(*RDB).RestoreUnfinished() = %v %v, want %v nil", got, err, tc.want)
|
||||||
continue
|
continue
|
||||||
@ -462,12 +459,12 @@ func TestRestoreUnfinished(t *testing.T) {
|
|||||||
gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val()
|
gotInProgressRaw := r.client.LRange(base.InProgressQueue, 0, -1).Val()
|
||||||
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
gotInProgress := mustUnmarshalSlice(t, gotInProgressRaw)
|
||||||
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantInProgress, gotInProgress, sortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", base.InProgressQueue, diff)
|
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.InProgressQueue, diff)
|
||||||
}
|
}
|
||||||
gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val()
|
gotEnqueuedRaw := r.client.LRange(base.DefaultQueue, 0, -1).Val()
|
||||||
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
gotEnqueued := mustUnmarshalSlice(t, gotEnqueuedRaw)
|
||||||
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
if diff := cmp.Diff(tc.wantEnqueued, gotEnqueued, sortMsgOpt); diff != "" {
|
||||||
t.Errorf("mismatch found in %q (-want, +got)\n%s", base.DefaultQueue, diff)
|
t.Errorf("mismatch found in %q: (-want, +got):\n%s", base.DefaultQueue, diff)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user