diff --git a/client.go b/client.go index ac6afd8..72d8438 100644 --- a/client.go +++ b/client.go @@ -76,11 +76,11 @@ const ( func (c *Client) Process(task *Task, processAt time.Time, opts ...Option) error { opt := composeOptions(opts...) msg := &base.TaskMessage{ - ID: xid.New(), - Type: task.Type, - Payload: task.Payload, - Queue: "default", - Retry: opt.retry, + ID: xid.New(), + Type: task.Type, + Payload: task.Payload, + Priority: base.PriorityDefault, + Retry: opt.retry, } return c.enqueue(msg, processAt) } diff --git a/client_test.go b/client_test.go index 0f6b04d..c12d98b 100644 --- a/client_test.go +++ b/client_test.go @@ -30,10 +30,10 @@ func TestClient(t *testing.T) { opts: []Option{}, wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload, - Retry: defaultMaxRetry, - Queue: "default", + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Priority: base.PriorityDefault, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -47,10 +47,10 @@ func TestClient(t *testing.T) { wantScheduled: []h.ZSetEntry{ { Msg: &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload, - Retry: defaultMaxRetry, - Queue: "default", + Type: task.Type, + Payload: task.Payload, + Retry: defaultMaxRetry, + Priority: base.PriorityDefault, }, Score: time.Now().Add(2 * time.Hour).Unix(), }, @@ -65,10 +65,10 @@ func TestClient(t *testing.T) { }, wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload, - Retry: 3, - Queue: "default", + Type: task.Type, + Payload: task.Payload, + Retry: 3, + Priority: base.PriorityDefault, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -82,10 +82,10 @@ func TestClient(t *testing.T) { }, wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload, - Retry: 0, // Retry count should be set to zero - Queue: "default", + Type: task.Type, + Payload: task.Payload, + Retry: 0, // Retry count should be set to zero + Priority: base.PriorityDefault, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil @@ -100,10 +100,10 @@ func TestClient(t *testing.T) { }, wantEnqueued: []*base.TaskMessage{ &base.TaskMessage{ - Type: task.Type, - Payload: task.Payload, - Retry: 10, // Last option takes precedence - Queue: "default", + Type: task.Type, + Payload: task.Payload, + Retry: 10, // Last option takes precedence + Priority: base.PriorityDefault, }, }, wantScheduled: nil, // db is flushed in setup so zset does not exist hence nil diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index f3791d2..4ddb19b 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -43,11 +43,11 @@ var IgnoreIDOpt = cmpopts.IgnoreFields(base.TaskMessage{}, "ID") // NewTaskMessage returns a new instance of TaskMessage given a task type and payload. func NewTaskMessage(taskType string, payload map[string]interface{}) *base.TaskMessage { return &base.TaskMessage{ - ID: xid.New(), - Type: taskType, - Queue: "default", - Retry: 25, - Payload: payload, + ID: xid.New(), + Type: taskType, + Priority: base.PriorityDefault, + Retry: 25, + Payload: payload, } } diff --git a/internal/base/base.go b/internal/base/base.go index 41edc98..3b38fba 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -62,8 +62,8 @@ type TaskMessage struct { // ID is a unique identifier for each task. ID xid.ID - // Queue is a name this message should be enqueued to. - Queue string + // Priority is the priority of this task. + Priority Priority // Retry is the max number of retry for this task. Retry int diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index cd6b0a7..ac9adcf 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -42,7 +42,7 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { if err != nil { return err } - qname := base.QueuePrefix + msg.Queue + qname := base.QueueKey(msg.Priority) return r.client.LPush(qname, string(bytes)).Err() }