mirror of
https://github.com/hibiken/asynq.git
synced 2025-04-22 16:50:18 +08:00
Fix merge conflicts
This commit is contained in:
parent
7ba05e6a78
commit
978c608124
@ -6,7 +6,6 @@
|
|||||||
package base
|
package base
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
@ -180,21 +179,10 @@ type TaskMessage struct {
|
|||||||
func EncodeMessage(msg *TaskMessage) ([]byte, error) {
|
func EncodeMessage(msg *TaskMessage) ([]byte, error) {
|
||||||
if msg == nil {
|
if msg == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil message")
|
return nil, fmt.Errorf("cannot encode nil message")
|
||||||
<<<<<<< HEAD
|
|
||||||
}
|
}
|
||||||
return proto.Marshal(&pb.TaskMessage{
|
return proto.Marshal(&pb.TaskMessage{
|
||||||
Type: msg.Type,
|
Type: msg.Type,
|
||||||
Payload: msg.Payload,
|
Payload: msg.Payload,
|
||||||
=======
|
|
||||||
}
|
|
||||||
payload, err := json.Marshal(msg.Payload)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return proto.Marshal(&pb.TaskMessage{
|
|
||||||
Type: msg.Type,
|
|
||||||
Payload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Id: msg.ID.String(),
|
Id: msg.ID.String(),
|
||||||
Queue: msg.Queue,
|
Queue: msg.Queue,
|
||||||
Retry: int32(msg.Retry),
|
Retry: int32(msg.Retry),
|
||||||
@ -210,22 +198,11 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) {
|
|||||||
func DecodeMessage(data []byte) (*TaskMessage, error) {
|
func DecodeMessage(data []byte) (*TaskMessage, error) {
|
||||||
var pbmsg pb.TaskMessage
|
var pbmsg pb.TaskMessage
|
||||||
if err := proto.Unmarshal(data, &pbmsg); err != nil {
|
if err := proto.Unmarshal(data, &pbmsg); err != nil {
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
payload, err := decodePayload(pbmsg.GetPayload())
|
|
||||||
if err != nil {
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &TaskMessage{
|
return &TaskMessage{
|
||||||
Type: pbmsg.GetType(),
|
Type: pbmsg.GetType(),
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: pbmsg.GetPayload(),
|
Payload: pbmsg.GetPayload(),
|
||||||
=======
|
|
||||||
Payload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
ID: uuid.MustParse(pbmsg.GetId()),
|
ID: uuid.MustParse(pbmsg.GetId()),
|
||||||
Queue: pbmsg.GetQueue(),
|
Queue: pbmsg.GetQueue(),
|
||||||
Retry: int(pbmsg.GetRetry()),
|
Retry: int(pbmsg.GetRetry()),
|
||||||
@ -403,11 +380,7 @@ type WorkerInfo struct {
|
|||||||
ServerID string
|
ServerID string
|
||||||
ID string
|
ID string
|
||||||
Type string
|
Type string
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload []byte
|
Payload []byte
|
||||||
=======
|
|
||||||
Payload map[string]interface{}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue string
|
Queue string
|
||||||
Started time.Time
|
Started time.Time
|
||||||
Deadline time.Time
|
Deadline time.Time
|
||||||
@ -418,13 +391,6 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
|
|||||||
if info == nil {
|
if info == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil worker info")
|
return nil, fmt.Errorf("cannot encode nil worker info")
|
||||||
}
|
}
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
payload, err := json.Marshal(info.Payload)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
startTime, err := ptypes.TimestampProto(info.Started)
|
startTime, err := ptypes.TimestampProto(info.Started)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -439,43 +405,19 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
|
|||||||
ServerId: info.ServerID,
|
ServerId: info.ServerID,
|
||||||
TaskId: info.ID,
|
TaskId: info.ID,
|
||||||
TaskType: info.Type,
|
TaskType: info.Type,
|
||||||
<<<<<<< HEAD
|
|
||||||
TaskPayload: info.Payload,
|
TaskPayload: info.Payload,
|
||||||
=======
|
|
||||||
TaskPayload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue: info.Queue,
|
Queue: info.Queue,
|
||||||
StartTime: startTime,
|
StartTime: startTime,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
func decodePayload(b []byte) (map[string]interface{}, error) {
|
|
||||||
d := json.NewDecoder(bytes.NewReader(b))
|
|
||||||
d.UseNumber()
|
|
||||||
payload := make(map[string]interface{})
|
|
||||||
if err := d.Decode(&payload); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return payload, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
// DecodeWorkerInfo decodes the given bytes into WorkerInfo.
|
// DecodeWorkerInfo decodes the given bytes into WorkerInfo.
|
||||||
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
|
func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
|
||||||
var pbmsg pb.WorkerInfo
|
var pbmsg pb.WorkerInfo
|
||||||
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
payload, err := decodePayload(pbmsg.GetTaskPayload())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
|
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -490,11 +432,7 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
|
|||||||
ServerID: pbmsg.GetServerId(),
|
ServerID: pbmsg.GetServerId(),
|
||||||
ID: pbmsg.GetTaskId(),
|
ID: pbmsg.GetTaskId(),
|
||||||
Type: pbmsg.GetTaskType(),
|
Type: pbmsg.GetTaskType(),
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: pbmsg.GetTaskPayload(),
|
Payload: pbmsg.GetTaskPayload(),
|
||||||
=======
|
|
||||||
Payload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue: pbmsg.GetQueue(),
|
Queue: pbmsg.GetQueue(),
|
||||||
Started: startTime,
|
Started: startTime,
|
||||||
Deadline: deadline,
|
Deadline: deadline,
|
||||||
@ -531,13 +469,6 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
|
|||||||
if entry == nil {
|
if entry == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil scheduler entry")
|
return nil, fmt.Errorf("cannot encode nil scheduler entry")
|
||||||
}
|
}
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
payload, err := json.Marshal(entry.Payload)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
next, err := ptypes.TimestampProto(entry.Next)
|
next, err := ptypes.TimestampProto(entry.Next)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -550,11 +481,7 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
|
|||||||
Id: entry.ID,
|
Id: entry.ID,
|
||||||
Spec: entry.Spec,
|
Spec: entry.Spec,
|
||||||
TaskType: entry.Type,
|
TaskType: entry.Type,
|
||||||
<<<<<<< HEAD
|
|
||||||
TaskPayload: entry.Payload,
|
TaskPayload: entry.Payload,
|
||||||
=======
|
|
||||||
TaskPayload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
EnqueueOptions: entry.Opts,
|
EnqueueOptions: entry.Opts,
|
||||||
NextEnqueueTime: next,
|
NextEnqueueTime: next,
|
||||||
PrevEnqueueTime: prev,
|
PrevEnqueueTime: prev,
|
||||||
@ -567,13 +494,6 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
|
|||||||
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
<<<<<<< HEAD
|
|
||||||
=======
|
|
||||||
payload, err := decodePayload(pbmsg.GetTaskPayload())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
|
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -586,11 +506,7 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
|
|||||||
ID: pbmsg.GetId(),
|
ID: pbmsg.GetId(),
|
||||||
Spec: pbmsg.GetSpec(),
|
Spec: pbmsg.GetSpec(),
|
||||||
Type: pbmsg.GetTaskType(),
|
Type: pbmsg.GetTaskType(),
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: pbmsg.GetTaskPayload(),
|
Payload: pbmsg.GetTaskPayload(),
|
||||||
=======
|
|
||||||
Payload: payload,
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Opts: pbmsg.GetEnqueueOptions(),
|
Opts: pbmsg.GetEnqueueOptions(),
|
||||||
Next: next,
|
Next: next,
|
||||||
Prev: prev,
|
Prev: prev,
|
||||||
|
@ -400,11 +400,7 @@ func TestServerInfoEncoding(t *testing.T) {
|
|||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
Queues: map[string]int{"default": 1, "critical": 2},
|
Queues: map[string]int{"default": 1, "critical": 2},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
<<<<<<< HEAD
|
|
||||||
Status: "active",
|
Status: "active",
|
||||||
=======
|
|
||||||
Status: "running",
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Started: time.Now().Add(-3 * time.Hour),
|
Started: time.Now().Add(-3 * time.Hour),
|
||||||
ActiveWorkerCount: 8,
|
ActiveWorkerCount: 8,
|
||||||
},
|
},
|
||||||
@ -440,11 +436,7 @@ func TestWorkerInfoEncoding(t *testing.T) {
|
|||||||
ServerID: "abc123",
|
ServerID: "abc123",
|
||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Type: "taskA",
|
Type: "taskA",
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
||||||
=======
|
|
||||||
Payload: map[string]interface{}{"foo": "bar"},
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Started: time.Now().Add(-3 * time.Hour),
|
Started: time.Now().Add(-3 * time.Hour),
|
||||||
Deadline: time.Now().Add(30 * time.Second),
|
Deadline: time.Now().Add(30 * time.Second),
|
||||||
@ -479,11 +471,7 @@ func TestSchedulerEntryEncoding(t *testing.T) {
|
|||||||
ID: uuid.NewString(),
|
ID: uuid.NewString(),
|
||||||
Spec: "* * * * *",
|
Spec: "* * * * *",
|
||||||
Type: "task_A",
|
Type: "task_A",
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
||||||
=======
|
|
||||||
Payload: map[string]interface{}{"foo": "bar"},
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Opts: []string{"Queue('email')"},
|
Opts: []string{"Queue('email')"},
|
||||||
Next: time.Now().Add(30 * time.Second).UTC(),
|
Next: time.Now().Add(30 * time.Second).UTC(),
|
||||||
Prev: time.Now().Add(-2 * time.Minute).UTC(),
|
Prev: time.Now().Add(-2 * time.Minute).UTC(),
|
||||||
|
@ -488,27 +488,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]*base.TaskIn
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
<<<<<<< HEAD
|
|
||||||
var zs []base.Z
|
|
||||||
for i := 0; i < len(data); i += 2 {
|
|
||||||
s, err := cast.ToStringE(data[i])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
<<<<<<< HEAD
|
|
||||||
}
|
|
||||||
score, err := cast.ToInt64E(data[i+1])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
=======
|
|
||||||
}
|
|
||||||
score, err := cast.ToInt64E(data[i+1])
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
msg, err := base.DecodeMessage([]byte(s))
|
|
||||||
=======
|
|
||||||
var tasks []*base.TaskInfo
|
var tasks []*base.TaskInfo
|
||||||
for _, s := range data {
|
for _, s := range data {
|
||||||
vals, err := cast.ToSliceE(s)
|
vals, err := cast.ToSliceE(s)
|
||||||
@ -516,7 +495,6 @@ func (r *RDB) listZSetEntries(key, qname string, pgn Pagination) ([]*base.TaskIn
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
info, err := makeTaskInfo(vals)
|
info, err := makeTaskInfo(vals)
|
||||||
>>>>>>> 4c699a2... Update RDB.ListScheduled, ListRetry, and ListArchived to return list of
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue // bad data, ignore and continue
|
continue // bad data, ignore and continue
|
||||||
}
|
}
|
||||||
|
@ -101,11 +101,7 @@ func TestEnqueueUnique(t *testing.T) {
|
|||||||
m1 := base.TaskMessage{
|
m1 := base.TaskMessage{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Type: "email",
|
Type: "email",
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}),
|
Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}),
|
||||||
=======
|
|
||||||
Payload: map[string]interface{}{"user_id": json.Number("123")},
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue: base.DefaultQueueName,
|
Queue: base.DefaultQueueName,
|
||||||
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})),
|
UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", h.JSON(map[string]interface{}{"user_id": 123})),
|
||||||
}
|
}
|
||||||
@ -161,11 +157,7 @@ func TestDequeue(t *testing.T) {
|
|||||||
t1 := &base.TaskMessage{
|
t1 := &base.TaskMessage{
|
||||||
ID: uuid.New(),
|
ID: uuid.New(),
|
||||||
Type: "send_email",
|
Type: "send_email",
|
||||||
<<<<<<< HEAD
|
|
||||||
Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
|
Payload: h.JSON(map[string]interface{}{"subject": "hello!"}),
|
||||||
=======
|
|
||||||
Payload: map[string]interface{}{"subject": "hello!"},
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: 1800,
|
Timeout: 1800,
|
||||||
Deadline: 0,
|
Deadline: 0,
|
||||||
@ -775,11 +767,7 @@ func TestRequeue(t *testing.T) {
|
|||||||
func TestSchedule(t *testing.T) {
|
func TestSchedule(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
<<<<<<< HEAD
|
|
||||||
msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
|
msg := h.NewTaskMessage("send_email", h.JSON(map[string]interface{}{"subject": "hello"}))
|
||||||
=======
|
|
||||||
msg := h.NewTaskMessage("send_email", map[string]interface{}{"subject": "hello"})
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
msg *base.TaskMessage
|
msg *base.TaskMessage
|
||||||
processAt time.Time
|
processAt time.Time
|
||||||
@ -1487,11 +1475,7 @@ func TestWriteServerState(t *testing.T) {
|
|||||||
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
Started: time.Now().UTC(),
|
Started: time.Now().UTC(),
|
||||||
<<<<<<< HEAD
|
|
||||||
Status: "active",
|
Status: "active",
|
||||||
=======
|
|
||||||
Status: "running",
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
ActiveWorkerCount: 0,
|
ActiveWorkerCount: 0,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1581,11 +1565,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
|
|||||||
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
Started: time.Now().Add(-10 * time.Minute).UTC(),
|
Started: time.Now().Add(-10 * time.Minute).UTC(),
|
||||||
<<<<<<< HEAD
|
|
||||||
Status: "active",
|
Status: "active",
|
||||||
=======
|
|
||||||
Status: "running",
|
|
||||||
>>>>>>> 138bd7f... Refactor redis keys and store messages in protobuf
|
|
||||||
ActiveWorkerCount: len(workers),
|
ActiveWorkerCount: len(workers),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user