mirror of
https://github.com/hibiken/asynq.git
synced 2024-11-10 11:31:58 +08:00
Minor improvement
This commit is contained in:
parent
8830d23388
commit
ea28d3cac1
@ -233,8 +233,8 @@ func (r *RDB) ListDead() ([]*DeadTask, error) {
|
|||||||
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
|
// EnqueueDeadTask finds a task that matches the given id and score from dead queue
|
||||||
// and enqueues it for processing. If a task that matches the id and score
|
// and enqueues it for processing. If a task that matches the id and score
|
||||||
// does not exist, it returns ErrTaskNotFound.
|
// does not exist, it returns ErrTaskNotFound.
|
||||||
func (r *RDB) EnqueueDeadTask(id string, score float64) error {
|
func (r *RDB) EnqueueDeadTask(id uuid.UUID, score int64) error {
|
||||||
n, err := r.removeAndEnqueue(deadQ, id, score)
|
n, err := r.removeAndEnqueue(deadQ, id.String(), float64(score))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -247,8 +247,8 @@ func (r *RDB) EnqueueDeadTask(id string, score float64) error {
|
|||||||
// EnqueueRetryTask finds a task that matches the given id and score from retry queue
|
// EnqueueRetryTask finds a task that matches the given id and score from retry queue
|
||||||
// and enqueues it for processing. If a task that matches the id and score
|
// and enqueues it for processing. If a task that matches the id and score
|
||||||
// does not exist, it returns ErrTaskNotFound.
|
// does not exist, it returns ErrTaskNotFound.
|
||||||
func (r *RDB) EnqueueRetryTask(id string, score float64) error {
|
func (r *RDB) EnqueueRetryTask(id uuid.UUID, score int64) error {
|
||||||
n, err := r.removeAndEnqueue(retryQ, id, score)
|
n, err := r.removeAndEnqueue(retryQ, id.String(), float64(score))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -261,8 +261,8 @@ func (r *RDB) EnqueueRetryTask(id string, score float64) error {
|
|||||||
// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue
|
// EnqueueScheduledTask finds a task that matches the given id and score from scheduled queue
|
||||||
// and enqueues it for processing. If a task that matches the id and score does not
|
// and enqueues it for processing. If a task that matches the id and score does not
|
||||||
// exist, it returns ErrTaskNotFound.
|
// exist, it returns ErrTaskNotFound.
|
||||||
func (r *RDB) EnqueueScheduledTask(id string, score float64) error {
|
func (r *RDB) EnqueueScheduledTask(id uuid.UUID, score int64) error {
|
||||||
n, err := r.removeAndEnqueue(scheduledQ, id, score)
|
n, err := r.removeAndEnqueue(scheduledQ, id.String(), float64(score))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -476,16 +476,16 @@ func TestEnqueueDeadTask(t *testing.T) {
|
|||||||
|
|
||||||
t1 := randomTask("send_email", "default", nil)
|
t1 := randomTask("send_email", "default", nil)
|
||||||
t2 := randomTask("gen_thumbnail", "default", nil)
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
|
s1 := time.Now().Add(-5 * time.Minute).Unix()
|
||||||
s2 := float64(time.Now().Add(-time.Hour).Unix())
|
s2 := time.Now().Add(-time.Hour).Unix()
|
||||||
type deadEntry struct {
|
type deadEntry struct {
|
||||||
msg *TaskMessage
|
msg *TaskMessage
|
||||||
score float64
|
score int64
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dead []deadEntry
|
dead []deadEntry
|
||||||
score float64
|
score int64
|
||||||
id string
|
id uuid.UUID
|
||||||
want error // expected return value from calling EnqueueDeadTask
|
want error // expected return value from calling EnqueueDeadTask
|
||||||
wantDead []*TaskMessage
|
wantDead []*TaskMessage
|
||||||
wantEnqueued []*TaskMessage
|
wantEnqueued []*TaskMessage
|
||||||
@ -496,7 +496,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
|||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: s2,
|
score: s2,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: nil,
|
want: nil,
|
||||||
wantDead: []*TaskMessage{t1},
|
wantDead: []*TaskMessage{t1},
|
||||||
wantEnqueued: []*TaskMessage{t2},
|
wantEnqueued: []*TaskMessage{t2},
|
||||||
@ -506,8 +506,8 @@ func TestEnqueueDeadTask(t *testing.T) {
|
|||||||
{t1, s1},
|
{t1, s1},
|
||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: 123.0,
|
score: 123,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: ErrTaskNotFound,
|
want: ErrTaskNotFound,
|
||||||
wantDead: []*TaskMessage{t1, t2},
|
wantDead: []*TaskMessage{t1, t2},
|
||||||
wantEnqueued: []*TaskMessage{},
|
wantEnqueued: []*TaskMessage{},
|
||||||
@ -521,7 +521,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// initialize dead queue
|
// initialize dead queue
|
||||||
for _, d := range tc.dead {
|
for _, d := range tc.dead {
|
||||||
err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
|
err := r.client.ZAdd(deadQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -529,7 +529,7 @@ func TestEnqueueDeadTask(t *testing.T) {
|
|||||||
|
|
||||||
got := r.EnqueueDeadTask(tc.id, tc.score)
|
got := r.EnqueueDeadTask(tc.id, tc.score)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.EnqueueDeadTask(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
|
t.Errorf("r.EnqueueDeadTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -552,16 +552,16 @@ func TestEnqueueRetryTask(t *testing.T) {
|
|||||||
|
|
||||||
t1 := randomTask("send_email", "default", nil)
|
t1 := randomTask("send_email", "default", nil)
|
||||||
t2 := randomTask("gen_thumbnail", "default", nil)
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
|
s1 := time.Now().Add(-5 * time.Minute).Unix()
|
||||||
s2 := float64(time.Now().Add(-time.Hour).Unix())
|
s2 := time.Now().Add(-time.Hour).Unix()
|
||||||
type retryEntry struct {
|
type retryEntry struct {
|
||||||
msg *TaskMessage
|
msg *TaskMessage
|
||||||
score float64
|
score int64
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dead []retryEntry
|
dead []retryEntry
|
||||||
score float64
|
score int64
|
||||||
id string
|
id uuid.UUID
|
||||||
want error // expected return value from calling EnqueueRetryTask
|
want error // expected return value from calling EnqueueRetryTask
|
||||||
wantRetry []*TaskMessage
|
wantRetry []*TaskMessage
|
||||||
wantEnqueued []*TaskMessage
|
wantEnqueued []*TaskMessage
|
||||||
@ -572,7 +572,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
|||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: s2,
|
score: s2,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: nil,
|
want: nil,
|
||||||
wantRetry: []*TaskMessage{t1},
|
wantRetry: []*TaskMessage{t1},
|
||||||
wantEnqueued: []*TaskMessage{t2},
|
wantEnqueued: []*TaskMessage{t2},
|
||||||
@ -582,8 +582,8 @@ func TestEnqueueRetryTask(t *testing.T) {
|
|||||||
{t1, s1},
|
{t1, s1},
|
||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: 123.0,
|
score: 123,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: ErrTaskNotFound,
|
want: ErrTaskNotFound,
|
||||||
wantRetry: []*TaskMessage{t1, t2},
|
wantRetry: []*TaskMessage{t1, t2},
|
||||||
wantEnqueued: []*TaskMessage{},
|
wantEnqueued: []*TaskMessage{},
|
||||||
@ -597,7 +597,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// initialize retry queue
|
// initialize retry queue
|
||||||
for _, d := range tc.dead {
|
for _, d := range tc.dead {
|
||||||
err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
|
err := r.client.ZAdd(retryQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -605,7 +605,7 @@ func TestEnqueueRetryTask(t *testing.T) {
|
|||||||
|
|
||||||
got := r.EnqueueRetryTask(tc.id, tc.score)
|
got := r.EnqueueRetryTask(tc.id, tc.score)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.EnqueueRetryTask(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
|
t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -628,16 +628,16 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
|
|
||||||
t1 := randomTask("send_email", "default", nil)
|
t1 := randomTask("send_email", "default", nil)
|
||||||
t2 := randomTask("gen_thumbnail", "default", nil)
|
t2 := randomTask("gen_thumbnail", "default", nil)
|
||||||
s1 := float64(time.Now().Add(-5 * time.Minute).Unix())
|
s1 := time.Now().Add(-5 * time.Minute).Unix()
|
||||||
s2 := float64(time.Now().Add(-time.Hour).Unix())
|
s2 := time.Now().Add(-time.Hour).Unix()
|
||||||
type scheduledEntry struct {
|
type scheduledEntry struct {
|
||||||
msg *TaskMessage
|
msg *TaskMessage
|
||||||
score float64
|
score int64
|
||||||
}
|
}
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
dead []scheduledEntry
|
dead []scheduledEntry
|
||||||
score float64
|
score int64
|
||||||
id string
|
id uuid.UUID
|
||||||
want error // expected return value from calling EnqueueScheduledTask
|
want error // expected return value from calling EnqueueScheduledTask
|
||||||
wantScheduled []*TaskMessage
|
wantScheduled []*TaskMessage
|
||||||
wantEnqueued []*TaskMessage
|
wantEnqueued []*TaskMessage
|
||||||
@ -648,7 +648,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: s2,
|
score: s2,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: nil,
|
want: nil,
|
||||||
wantScheduled: []*TaskMessage{t1},
|
wantScheduled: []*TaskMessage{t1},
|
||||||
wantEnqueued: []*TaskMessage{t2},
|
wantEnqueued: []*TaskMessage{t2},
|
||||||
@ -658,8 +658,8 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
{t1, s1},
|
{t1, s1},
|
||||||
{t2, s2},
|
{t2, s2},
|
||||||
},
|
},
|
||||||
score: 123.0,
|
score: 123,
|
||||||
id: t2.ID.String(),
|
id: t2.ID,
|
||||||
want: ErrTaskNotFound,
|
want: ErrTaskNotFound,
|
||||||
wantScheduled: []*TaskMessage{t1, t2},
|
wantScheduled: []*TaskMessage{t1, t2},
|
||||||
wantEnqueued: []*TaskMessage{},
|
wantEnqueued: []*TaskMessage{},
|
||||||
@ -673,7 +673,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
}
|
}
|
||||||
// initialize scheduled queue
|
// initialize scheduled queue
|
||||||
for _, d := range tc.dead {
|
for _, d := range tc.dead {
|
||||||
err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: d.score}).Err()
|
err := r.client.ZAdd(scheduledQ, &redis.Z{Member: mustMarshal(t, d.msg), Score: float64(d.score)}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -681,7 +681,7 @@ func TestEnqueueScheduledTask(t *testing.T) {
|
|||||||
|
|
||||||
got := r.EnqueueScheduledTask(tc.id, tc.score)
|
got := r.EnqueueScheduledTask(tc.id, tc.score)
|
||||||
if got != tc.want {
|
if got != tc.want {
|
||||||
t.Errorf("r.EnqueueRetryTask(%s, %0.f) = %v, want %v", tc.id, tc.score, got, tc.want)
|
t.Errorf("r.EnqueueRetryTask(%s, %d) = %v, want %v", tc.id, tc.score, got, tc.want)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,11 +53,11 @@ func enq(cmd *cobra.Command, args []string) {
|
|||||||
}))
|
}))
|
||||||
switch qtype {
|
switch qtype {
|
||||||
case "s":
|
case "s":
|
||||||
err = r.EnqueueScheduledTask(id.String(), float64(score))
|
err = r.EnqueueScheduledTask(id, score)
|
||||||
case "r":
|
case "r":
|
||||||
err = r.EnqueueRetryTask(id.String(), float64(score))
|
err = r.EnqueueRetryTask(id, score)
|
||||||
case "d":
|
case "d":
|
||||||
err = r.EnqueueDeadTask(id.String(), float64(score))
|
err = r.EnqueueDeadTask(id, score)
|
||||||
default:
|
default:
|
||||||
fmt.Println("invalid argument")
|
fmt.Println("invalid argument")
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -81,7 +81,7 @@ func queryID(id uuid.UUID, score int64, qtype string) string {
|
|||||||
// parseQueryID is a reverse operation of queryID function.
|
// parseQueryID is a reverse operation of queryID function.
|
||||||
// It takes a queryID and return each part of id with proper
|
// It takes a queryID and return each part of id with proper
|
||||||
// type if valid, otherwise it reports an error.
|
// type if valid, otherwise it reports an error.
|
||||||
func parseQueryID(queryID string) (id uuid.UUID, score float64, qtype string, err error) {
|
func parseQueryID(queryID string) (id uuid.UUID, score int64, qtype string, err error) {
|
||||||
parts := strings.Split(queryID, ":")
|
parts := strings.Split(queryID, ":")
|
||||||
if len(parts) != 3 {
|
if len(parts) != 3 {
|
||||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||||
@ -90,7 +90,7 @@ func parseQueryID(queryID string) (id uuid.UUID, score float64, qtype string, er
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||||
}
|
}
|
||||||
score, err = strconv.ParseFloat(parts[1], 64)
|
score, err = strconv.ParseInt(parts[1], 10, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
return uuid.Nil, 0, "", fmt.Errorf("invalid id")
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user