mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 07:12:17 +08:00
Rename (*rdb).zadd to (*rdb).schedule
This commit is contained in:
parent
fab2dcb56e
commit
47e2a57d05
10
client.go
10
client.go
@ -17,7 +17,7 @@ func NewClient(opt *RedisOpt) *Client {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Process enqueues the task to be performed at a given time.
|
// Process enqueues the task to be performed at a given time.
|
||||||
func (c *Client) Process(task *Task, executeAt time.Time) error {
|
func (c *Client) Process(task *Task, processAt time.Time) error {
|
||||||
msg := &taskMessage{
|
msg := &taskMessage{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Type: task.Type,
|
Type: task.Type,
|
||||||
@ -25,13 +25,13 @@ func (c *Client) Process(task *Task, executeAt time.Time) error {
|
|||||||
Queue: "default",
|
Queue: "default",
|
||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
}
|
}
|
||||||
return c.enqueue(msg, executeAt)
|
return c.enqueue(msg, processAt)
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueue pushes a given task to the specified queue.
|
// enqueue pushes a given task to the specified queue.
|
||||||
func (c *Client) enqueue(msg *taskMessage, executeAt time.Time) error {
|
func (c *Client) enqueue(msg *taskMessage, processAt time.Time) error {
|
||||||
if time.Now().After(executeAt) {
|
if time.Now().After(processAt) {
|
||||||
return c.rdb.enqueue(msg)
|
return c.rdb.enqueue(msg)
|
||||||
}
|
}
|
||||||
return c.rdb.zadd(scheduled, float64(executeAt.Unix()), msg)
|
return c.rdb.schedule(scheduled, processAt, msg)
|
||||||
}
|
}
|
||||||
|
9
rdb.go
9
rdb.go
@ -93,16 +93,17 @@ func (r *rdb) lrem(key string, msg *taskMessage) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// zadd adds the taskMessage to the specified zset (sorted set) with the given score.
|
// schedule adds the task to the zset to be processd at the specified time.
|
||||||
func (r *rdb) zadd(zset string, zscore float64, msg *taskMessage) error {
|
func (r *rdb) schedule(zset string, processAt time.Time, msg *taskMessage) error {
|
||||||
bytes, err := json.Marshal(msg)
|
bytes, err := json.Marshal(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not encode task into JSON: %v", err)
|
return fmt.Errorf("could not encode task into JSON: %v", err)
|
||||||
}
|
}
|
||||||
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: zscore}).Err()
|
score := float64(processAt.Unix())
|
||||||
|
err = r.client.ZAdd(zset, &redis.Z{Member: string(bytes), Score: score}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("command ZADD %s %.1f %s failed: %v",
|
return fmt.Errorf("command ZADD %s %.1f %s failed: %v",
|
||||||
zset, zscore, string(bytes), err)
|
zset, score, string(bytes), err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
46
rdb_test.go
46
rdb_test.go
@ -2,6 +2,7 @@ package asynq
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
@ -265,3 +266,48 @@ func TestForward(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSchedule(t *testing.T) {
|
||||||
|
r := setup(t)
|
||||||
|
tests := []struct {
|
||||||
|
msg *taskMessage
|
||||||
|
processAt time.Time
|
||||||
|
zset string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
randomTask("send_email", "default", map[string]interface{}{"subject": "hello"}),
|
||||||
|
time.Now().Add(15 * time.Minute),
|
||||||
|
scheduled,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range tests {
|
||||||
|
// clean up db before each test case.
|
||||||
|
if err := r.client.FlushDB().Err(); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.schedule(tc.zset, tc.processAt, tc.msg)
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := r.client.ZRangeWithScores(tc.zset, 0, -1).Result()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
desc := fmt.Sprintf("(*rdb).schedule(%q, %v, %v)", tc.zset, tc.processAt, tc.msg)
|
||||||
|
if len(res) != 1 {
|
||||||
|
t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(res), tc.zset)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if res[0].Score != float64(tc.processAt.Unix()) {
|
||||||
|
t.Errorf("%s inserted an item with score %f, want %f", desc, res[0].Score, float64(tc.processAt.Unix()))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
2
retry.go
2
retry.go
@ -20,7 +20,7 @@ func retryTask(rdb *rdb, msg *taskMessage, err error) {
|
|||||||
fmt.Printf("[DEBUG] Retrying the task in %v\n", retryAt.Sub(time.Now()))
|
fmt.Printf("[DEBUG] Retrying the task in %v\n", retryAt.Sub(time.Now()))
|
||||||
msg.Retried++
|
msg.Retried++
|
||||||
msg.ErrorMsg = err.Error()
|
msg.ErrorMsg = err.Error()
|
||||||
if err := rdb.zadd(retry, float64(retryAt.Unix()), msg); err != nil {
|
if err := rdb.schedule(retry, retryAt, msg); err != nil {
|
||||||
// TODO(hibiken): Not sure how to handle this error
|
// TODO(hibiken): Not sure how to handle this error
|
||||||
log.Printf("[ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err)
|
log.Printf("[ERROR] could not add msg %+v to 'retry' set: %v\n", msg, err)
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user