From 2261c7c9a0a969a788447a305707f7183d8225a7 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Fri, 10 Sep 2021 06:29:37 -0700 Subject: [PATCH] Change TaskMessage.ID type from uuid.UUID to string --- asynq.go | 2 +- client.go | 2 +- heartbeat.go | 4 +- inspector_test.go | 12 ++--- internal/asynqtest/asynqtest.go | 18 +++---- internal/base/base.go | 7 ++- internal/base/base_test.go | 2 +- internal/context/context_test.go | 14 +++--- internal/rdb/inspect.go | 35 +++++++------ internal/rdb/inspect_test.go | 68 +++++++++++++------------- internal/rdb/rdb.go | 32 ++++++------ internal/rdb/rdb_test.go | 84 ++++++++++++++++---------------- processor.go | 4 +- 13 files changed, 141 insertions(+), 143 deletions(-) diff --git a/asynq.go b/asynq.go index 6b5bb55..b3231ab 100644 --- a/asynq.go +++ b/asynq.go @@ -85,7 +85,7 @@ type TaskInfo struct { func newTaskInfo(msg *base.TaskMessage, state base.TaskState, nextProcessAt time.Time) *TaskInfo { info := TaskInfo{ - ID: msg.ID.String(), + ID: msg.ID, Queue: msg.Queue, Type: msg.Type, Payload: msg.Payload, // Do we need to make a copy? diff --git a/client.go b/client.go index 624f146..d14c17e 100644 --- a/client.go +++ b/client.go @@ -276,7 +276,7 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*TaskInfo, error) { uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) } msg := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: task.Type(), Payload: task.Payload(), Queue: opt.queue, diff --git a/heartbeat.go b/heartbeat.go index a51158f..fd695ae 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -125,10 +125,10 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { timer.Reset(h.interval) case w := <-h.starting: - h.workers[w.msg.ID.String()] = w + h.workers[w.msg.ID] = w case msg := <-h.finished: - delete(h.workers, msg.ID.String()) + delete(h.workers, msg.ID) } } }() diff --git a/inspector_test.go b/inspector_test.go index 89be4b4..88c13c7 100644 --- a/inspector_test.go +++ b/inspector_test.go @@ -484,7 +484,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { }{ { qname: "default", - id: m1.ID.String(), + id: m1.ID, want: newTaskInfo( m1, base.TaskStateActive, @@ -493,7 +493,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { }, { qname: "default", - id: m2.ID.String(), + id: m2.ID, want: newTaskInfo( m2, base.TaskStateScheduled, @@ -502,7 +502,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { }, { qname: "custom", - id: m3.ID.String(), + id: m3.ID, want: newTaskInfo( m3, base.TaskStateRetry, @@ -511,7 +511,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { }, { qname: "custom", - id: m4.ID.String(), + id: m4.ID, want: newTaskInfo( m4, base.TaskStateArchived, @@ -520,7 +520,7 @@ func TestInspectorGetTaskInfo(t *testing.T) { }, { qname: "custom", - id: m5.ID.String(), + id: m5.ID, want: newTaskInfo( m5, base.TaskStatePending, @@ -603,7 +603,7 @@ func TestInspectorGetTaskInfoError(t *testing.T) { }{ { qname: "nonexistent", - id: m1.ID.String(), + id: m1.ID, wantErr: ErrQueueNotFound, }, { diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 0cf924b..4094b84 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -32,7 +32,7 @@ func EquateInt64Approx(margin int64) cmp.Option { var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage) []*base.TaskMessage { out := append([]*base.TaskMessage(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].ID.String() < out[j].ID.String() + return out[i].ID < out[j].ID }) return out }) @@ -41,7 +41,7 @@ var SortMsgOpt = cmp.Transformer("SortTaskMessages", func(in []*base.TaskMessage var SortZSetEntryOpt = cmp.Transformer("SortZSetEntries", func(in []base.Z) []base.Z { out := append([]base.Z(nil), in...) // Copy input to avoid mutating it sort.Slice(out, func(i, j int) bool { - return out[i].Message.ID.String() < out[j].Message.ID.String() + return out[i].Message.ID < out[j].Message.ID }) return out }) @@ -104,7 +104,7 @@ func NewTaskMessage(taskType string, payload []byte) *base.TaskMessage { // task type, payload and queue name. func NewTaskMessageWithQueue(taskType string, payload []byte, qname string) *base.TaskMessage { return &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: taskType, Queue: qname, Retry: 25, @@ -279,10 +279,10 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, tb.Helper() for _, msg := range msgs { encoded := MustMarshal(tb, msg) - if err := c.LPush(context.Background(), key, msg.ID.String()).Err(); err != nil { + if err := c.LPush(context.Background(), key, msg.ID).Err(); err != nil { tb.Fatal(err) } - key := base.TaskKey(msg.Queue, msg.ID.String()) + key := base.TaskKey(msg.Queue, msg.ID) data := map[string]interface{}{ "msg": encoded, "state": state.String(), @@ -294,7 +294,7 @@ func seedRedisList(tb testing.TB, c redis.UniversalClient, key string, tb.Fatal(err) } if len(msg.UniqueKey) > 0 { - err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID.String(), 1*time.Minute).Err() + err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID, 1*time.Minute).Err() if err != nil { tb.Fatalf("Failed to set unique lock in redis: %v", err) } @@ -308,11 +308,11 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, for _, item := range items { msg := item.Message encoded := MustMarshal(tb, msg) - z := &redis.Z{Member: msg.ID.String(), Score: float64(item.Score)} + z := &redis.Z{Member: msg.ID, Score: float64(item.Score)} if err := c.ZAdd(context.Background(), key, z).Err(); err != nil { tb.Fatal(err) } - key := base.TaskKey(msg.Queue, msg.ID.String()) + key := base.TaskKey(msg.Queue, msg.ID) data := map[string]interface{}{ "msg": encoded, "state": state.String(), @@ -324,7 +324,7 @@ func seedRedisZSet(tb testing.TB, c redis.UniversalClient, key string, tb.Fatal(err) } if len(msg.UniqueKey) > 0 { - err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID.String(), 1*time.Minute).Err() + err := c.SetNX(context.Background(), msg.UniqueKey, msg.ID, 1*time.Minute).Err() if err != nil { tb.Fatalf("Failed to set unique lock in redis: %v", err) } diff --git a/internal/base/base.go b/internal/base/base.go index e74bbca..606b1da 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -16,7 +16,6 @@ import ( "github.com/go-redis/redis/v8" "github.com/golang/protobuf/ptypes" - "github.com/google/uuid" "github.com/hibiken/asynq/internal/errors" pb "github.com/hibiken/asynq/internal/proto" "google.golang.org/protobuf/proto" @@ -191,7 +190,7 @@ type TaskMessage struct { Payload []byte // ID is a unique identifier for each task. - ID uuid.UUID + ID string // Queue is a name this message should be enqueued to. Queue string @@ -240,7 +239,7 @@ func EncodeMessage(msg *TaskMessage) ([]byte, error) { return proto.Marshal(&pb.TaskMessage{ Type: msg.Type, Payload: msg.Payload, - Id: msg.ID.String(), + Id: msg.ID, Queue: msg.Queue, Retry: int32(msg.Retry), Retried: int32(msg.Retried), @@ -261,7 +260,7 @@ func DecodeMessage(data []byte) (*TaskMessage, error) { return &TaskMessage{ Type: pbmsg.GetType(), Payload: pbmsg.GetPayload(), - ID: uuid.MustParse(pbmsg.GetId()), + ID: pbmsg.GetId(), Queue: pbmsg.GetQueue(), Retry: int(pbmsg.GetRetry()), Retried: int(pbmsg.GetRetried()), diff --git a/internal/base/base_test.go b/internal/base/base_test.go index 9df725e..c504401 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -344,7 +344,7 @@ func TestUniqueKey(t *testing.T) { } func TestMessageEncoding(t *testing.T) { - id := uuid.New() + id := uuid.NewString() tests := []struct { in *TaskMessage out *TaskMessage diff --git a/internal/context/context_test.go b/internal/context/context_test.go index 50498d7..9e127cc 100644 --- a/internal/context/context_test.go +++ b/internal/context/context_test.go @@ -24,7 +24,7 @@ func TestCreateContextWithFutureDeadline(t *testing.T) { for _, tc := range tests { msg := &base.TaskMessage{ Type: "something", - ID: uuid.New(), + ID: uuid.NewString(), Payload: nil, } @@ -64,7 +64,7 @@ func TestCreateContextWithPastDeadline(t *testing.T) { for _, tc := range tests { msg := &base.TaskMessage{ Type: "something", - ID: uuid.New(), + ID: uuid.NewString(), Payload: nil, } @@ -92,9 +92,9 @@ func TestGetTaskMetadataFromContext(t *testing.T) { desc string msg *base.TaskMessage }{ - {"with zero retried message", &base.TaskMessage{Type: "something", ID: uuid.New(), Retry: 25, Retried: 0, Timeout: 1800, Queue: "default"}}, - {"with non-zero retried message", &base.TaskMessage{Type: "something", ID: uuid.New(), Retry: 10, Retried: 5, Timeout: 1800, Queue: "default"}}, - {"with custom queue name", &base.TaskMessage{Type: "something", ID: uuid.New(), Retry: 25, Retried: 0, Timeout: 1800, Queue: "custom"}}, + {"with zero retried message", &base.TaskMessage{Type: "something", ID: uuid.NewString(), Retry: 25, Retried: 0, Timeout: 1800, Queue: "default"}}, + {"with non-zero retried message", &base.TaskMessage{Type: "something", ID: uuid.NewString(), Retry: 10, Retried: 5, Timeout: 1800, Queue: "default"}}, + {"with custom queue name", &base.TaskMessage{Type: "something", ID: uuid.NewString(), Retry: 25, Retried: 0, Timeout: 1800, Queue: "custom"}}, } for _, tc := range tests { @@ -105,8 +105,8 @@ func TestGetTaskMetadataFromContext(t *testing.T) { if !ok { t.Errorf("%s: GetTaskID(ctx) returned ok == false", tc.desc) } - if ok && id != tc.msg.ID.String() { - t.Errorf("%s: GetTaskID(ctx) returned id == %q, want %q", tc.desc, id, tc.msg.ID.String()) + if ok && id != tc.msg.ID { + t.Errorf("%s: GetTaskID(ctx) returned id == %q, want %q", tc.desc, id, tc.msg.ID) } retried, ok := GetRetryCount(ctx) diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index b53a155..2641540 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -11,7 +11,6 @@ import ( "time" "github.com/go-redis/redis/v8" - "github.com/google/uuid" "github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/errors" "github.com/spf13/cast" @@ -386,21 +385,21 @@ var getTaskInfoCmd = redis.NewScript(` `) // GetTaskInfo returns a TaskInfo describing the task from the given queue. -func (r *RDB) GetTaskInfo(qname string, id uuid.UUID) (*base.TaskInfo, error) { +func (r *RDB) GetTaskInfo(qname, id string) (*base.TaskInfo, error) { var op errors.Op = "rdb.GetTaskInfo" if err := r.checkQueueExists(qname); err != nil { return nil, errors.E(op, errors.CanonicalCode(err), err) } - keys := []string{base.TaskKey(qname, id.String())} + keys := []string{base.TaskKey(qname, id)} argv := []interface{}{ - id.String(), + id, time.Now().Unix(), base.QueueKeyPrefix(qname), } res, err := getTaskInfoCmd.Run(context.Background(), r.client, keys, argv...).Result() if err != nil { if err.Error() == "NOT FOUND" { - return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) + return nil, errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id}) } return nil, errors.E(op, errors.Unknown, err) } @@ -704,17 +703,17 @@ return 1 // If a queue with the given name doesn't exist, it returns QueueNotFoundError. // If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError // If a task is in active or pending state it returns non-nil error with Code FailedPrecondition. -func (r *RDB) RunTask(qname string, id uuid.UUID) error { +func (r *RDB) RunTask(qname, id string) error { var op errors.Op = "rdb.RunTask" if err := r.checkQueueExists(qname); err != nil { return errors.E(op, errors.CanonicalCode(err), err) } keys := []string{ - base.TaskKey(qname, id.String()), + base.TaskKey(qname, id), base.PendingKey(qname), } argv := []interface{}{ - id.String(), + id, base.QueueKeyPrefix(qname), } res, err := runTaskCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -729,7 +728,7 @@ func (r *RDB) RunTask(qname string, id uuid.UUID) error { case 1: return nil case 0: - return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) + return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id}) case -1: return errors.E(op, errors.FailedPrecondition, "task is already running") case -2: @@ -922,18 +921,18 @@ return 1 // If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError // If a task is already archived, it returns TaskAlreadyArchivedError. // If a task is in active state it returns non-nil error with FailedPrecondition code. -func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { +func (r *RDB) ArchiveTask(qname, id string) error { var op errors.Op = "rdb.ArchiveTask" if err := r.checkQueueExists(qname); err != nil { return errors.E(op, errors.CanonicalCode(err), err) } keys := []string{ - base.TaskKey(qname, id.String()), + base.TaskKey(qname, id), base.ArchivedKey(qname), } now := time.Now() argv := []interface{}{ - id.String(), + id, now.Unix(), now.AddDate(0, 0, -archivedExpirationInDays).Unix(), maxArchiveSize, @@ -951,9 +950,9 @@ func (r *RDB) ArchiveTask(qname string, id uuid.UUID) error { case 1: return nil case 0: - return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) + return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id}) case -1: - return errors.E(op, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: qname, ID: id.String()}) + return errors.E(op, errors.FailedPrecondition, &errors.TaskAlreadyArchivedError{Queue: qname, ID: id}) case -2: return errors.E(op, errors.FailedPrecondition, "cannot archive task in active state. use CancelTask instead.") case -3: @@ -1059,16 +1058,16 @@ return redis.call("DEL", KEYS[1]) // If a queue with the given name doesn't exist, it returns QueueNotFoundError. // If a task with the given id doesn't exist in the queue, it returns TaskNotFoundError // If a task is in active state it returns non-nil error with Code FailedPrecondition. -func (r *RDB) DeleteTask(qname string, id uuid.UUID) error { +func (r *RDB) DeleteTask(qname, id string) error { var op errors.Op = "rdb.DeleteTask" if err := r.checkQueueExists(qname); err != nil { return errors.E(op, errors.CanonicalCode(err), err) } keys := []string{ - base.TaskKey(qname, id.String()), + base.TaskKey(qname, id), } argv := []interface{}{ - id.String(), + id, base.QueueKeyPrefix(qname), } res, err := deleteTaskCmd.Run(context.Background(), r.client, keys, argv...).Result() @@ -1083,7 +1082,7 @@ func (r *RDB) DeleteTask(qname string, id uuid.UUID) error { case 1: return nil case 0: - return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id.String()}) + return errors.E(op, errors.NotFound, &errors.TaskNotFoundError{Queue: qname, ID: id}) case -1: return errors.E(op, errors.FailedPrecondition, "cannot delete task in active state. use CancelTask instead.") default: diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go index 787d46c..2d8b1c0 100644 --- a/internal/rdb/inspect_test.go +++ b/internal/rdb/inspect_test.go @@ -363,7 +363,7 @@ func TestGetTaskInfo(t *testing.T) { tests := []struct { qname string - id uuid.UUID + id string want *base.TaskInfo }{ { @@ -478,7 +478,7 @@ func TestGetTaskInfoError(t *testing.T) { tests := []struct { qname string - id uuid.UUID + id string match func(err error) bool }{ { @@ -488,7 +488,7 @@ func TestGetTaskInfoError(t *testing.T) { }, { qname: "default", - id: uuid.New(), + id: uuid.NewString(), match: errors.IsTaskNotFound, }, } @@ -882,7 +882,7 @@ func TestListRetry(t *testing.T) { r := setup(t) defer r.Close() m1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task1", Queue: "default", Payload: nil, @@ -891,7 +891,7 @@ func TestListRetry(t *testing.T) { Retried: 10, } m2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task2", Queue: "default", Payload: nil, @@ -900,7 +900,7 @@ func TestListRetry(t *testing.T) { Retried: 2, } m3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task3", Queue: "custom", Payload: nil, @@ -1041,21 +1041,21 @@ func TestListArchived(t *testing.T) { r := setup(t) defer r.Close() m1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task1", Queue: "default", Payload: nil, ErrorMsg: "some error occurred", } m2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task2", Queue: "default", Payload: nil, ErrorMsg: "some error occurred", } m3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task3", Queue: "custom", Payload: nil, @@ -1240,7 +1240,7 @@ func TestRunArchivedTask(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - id uuid.UUID + id string wantArchived map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1320,7 +1320,7 @@ func TestRunRetryTask(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - id uuid.UUID + id string wantRetry map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1400,7 +1400,7 @@ func TestRunScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - id uuid.UUID + id string wantScheduled map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage }{ @@ -1480,7 +1480,7 @@ func TestRunTaskError(t *testing.T) { pending map[string][]*base.TaskMessage scheduled map[string][]base.Z qname string - id uuid.UUID + id string match func(err error) bool wantActive map[string][]*base.TaskMessage wantPending map[string][]*base.TaskMessage @@ -1526,7 +1526,7 @@ func TestRunTaskError(t *testing.T) { }, }, qname: "default", - id: uuid.New(), + id: uuid.NewString(), match: errors.IsTaskNotFound, wantActive: map[string][]*base.TaskMessage{ "default": {}, @@ -1987,7 +1987,7 @@ func TestArchiveRetryTask(t *testing.T) { retry map[string][]base.Z archived map[string][]base.Z qname string - id uuid.UUID + id string wantRetry map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -2088,7 +2088,7 @@ func TestArchiveScheduledTask(t *testing.T) { scheduled map[string][]base.Z archived map[string][]base.Z qname string - id uuid.UUID + id string wantScheduled map[string][]base.Z wantArchived map[string][]base.Z }{ @@ -2185,7 +2185,7 @@ func TestArchivePendingTask(t *testing.T) { pending map[string][]*base.TaskMessage archived map[string][]base.Z qname string - id uuid.UUID + id string wantPending map[string][]*base.TaskMessage wantArchived map[string][]base.Z }{ @@ -2270,7 +2270,7 @@ func TestArchiveTaskError(t *testing.T) { scheduled map[string][]base.Z archived map[string][]base.Z qname string - id uuid.UUID + id string match func(err error) bool wantActive map[string][]*base.TaskMessage wantScheduled map[string][]base.Z @@ -2312,7 +2312,7 @@ func TestArchiveTaskError(t *testing.T) { "default": {{Message: m2, Score: t2.Unix()}}, }, qname: "default", - id: uuid.New(), + id: uuid.NewString(), match: errors.IsTaskNotFound, wantActive: map[string][]*base.TaskMessage{ "default": {}, @@ -2879,7 +2879,7 @@ func TestDeleteArchivedTask(t *testing.T) { tests := []struct { archived map[string][]base.Z qname string - id uuid.UUID + id string wantArchived map[string][]*base.TaskMessage }{ { @@ -2945,7 +2945,7 @@ func TestDeleteRetryTask(t *testing.T) { tests := []struct { retry map[string][]base.Z qname string - id uuid.UUID + id string wantRetry map[string][]*base.TaskMessage }{ { @@ -3011,7 +3011,7 @@ func TestDeleteScheduledTask(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - id uuid.UUID + id string wantScheduled map[string][]*base.TaskMessage }{ { @@ -3074,7 +3074,7 @@ func TestDeletePendingTask(t *testing.T) { tests := []struct { pending map[string][]*base.TaskMessage qname string - id uuid.UUID + id string wantPending map[string][]*base.TaskMessage }{ { @@ -3123,7 +3123,7 @@ func TestDeleteTaskWithUniqueLock(t *testing.T) { r := setup(t) defer r.Close() m1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "email", Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), Queue: base.DefaultQueueName, @@ -3134,7 +3134,7 @@ func TestDeleteTaskWithUniqueLock(t *testing.T) { tests := []struct { scheduled map[string][]base.Z qname string - id uuid.UUID + id string uniqueKey string wantScheduled map[string][]*base.TaskMessage }{ @@ -3186,7 +3186,7 @@ func TestDeleteTaskError(t *testing.T) { active map[string][]*base.TaskMessage scheduled map[string][]base.Z qname string - id uuid.UUID + id string match func(err error) bool wantActive map[string][]*base.TaskMessage wantScheduled map[string][]*base.TaskMessage @@ -3200,7 +3200,7 @@ func TestDeleteTaskError(t *testing.T) { "default": {{Message: m1, Score: t1.Unix()}}, }, qname: "default", - id: uuid.New(), + id: uuid.NewString(), match: errors.IsTaskNotFound, wantActive: map[string][]*base.TaskMessage{ "default": {}, @@ -3218,7 +3218,7 @@ func TestDeleteTaskError(t *testing.T) { "default": {{Message: m1, Score: t1.Unix()}}, }, qname: "nonexistent", - id: uuid.New(), + id: uuid.NewString(), match: errors.IsQueueNotFound, wantActive: map[string][]*base.TaskMessage{ "default": {}, @@ -3340,7 +3340,7 @@ func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) { r := setup(t) defer r.Close() m1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task1", Payload: nil, Timeout: 1800, @@ -3349,7 +3349,7 @@ func TestDeleteAllArchivedTasksWithUniqueKey(t *testing.T) { Queue: "default", } m2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "task2", Payload: nil, Timeout: 1800, @@ -3721,7 +3721,7 @@ func TestRemoveQueue(t *testing.T) { } } - if n := len(r.client.Keys(context.Background(), base.TaskKeyPrefix(tc.qname) + "*").Val()); n != 0 { + if n := len(r.client.Keys(context.Background(), base.TaskKeyPrefix(tc.qname)+"*").Val()); n != 0 { t.Errorf("%d keys still exists for tasks", n) } } @@ -3960,7 +3960,7 @@ func TestListWorkers(t *testing.T) { Host: host, PID: pid, ServerID: serverID, - ID: m1.ID.String(), + ID: m1.ID, Type: m1.Type, Queue: m1.Queue, Payload: m1.Payload, @@ -3971,7 +3971,7 @@ func TestListWorkers(t *testing.T) { Host: host, PID: pid, ServerID: serverID, - ID: m2.ID.String(), + ID: m2.ID, Type: m2.Type, Queue: m2.Queue, Payload: m2.Payload, @@ -3982,7 +3982,7 @@ func TestListWorkers(t *testing.T) { Host: host, PID: pid, ServerID: serverID, - ID: m3.ID.String(), + ID: m3.ID, Type: m3.Type, Queue: m3.Queue, Payload: m3.Payload, diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 9dd3513..de12ca7 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -84,12 +84,12 @@ func (r *RDB) Enqueue(msg *base.TaskMessage) error { return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.PendingKey(msg.Queue), } argv := []interface{}{ encoded, - msg.ID.String(), + msg.ID, msg.Timeout, msg.Deadline, } @@ -139,11 +139,11 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { } keys := []string{ msg.UniqueKey, - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.PendingKey(msg.Queue), } argv := []interface{}{ - msg.ID.String(), + msg.ID, int(ttl.Seconds()), encoded, msg.Timeout, @@ -312,11 +312,11 @@ func (r *RDB) Done(msg *base.TaskMessage) error { keys := []string{ base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.ProcessedKey(msg.Queue, now), } argv := []interface{}{ - msg.ID.String(), + msg.ID, expireAt.Unix(), } if len(msg.UniqueKey) > 0 { @@ -350,9 +350,9 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.PendingKey(msg.Queue), - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), } - return r.runScript(op, requeueCmd, keys, msg.ID.String()) + return r.runScript(op, requeueCmd, keys, msg.ID) } // KEYS[1] -> asynq:{}:t: @@ -383,13 +383,13 @@ func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { return errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "sadd", Err: err}) } keys := []string{ - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.ScheduledKey(msg.Queue), } argv := []interface{}{ encoded, processAt.Unix(), - msg.ID.String(), + msg.ID, msg.Timeout, msg.Deadline, } @@ -433,11 +433,11 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim } keys := []string{ msg.UniqueKey, - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.ScheduledKey(msg.Queue), } argv := []interface{}{ - msg.ID.String(), + msg.ID, int(ttl.Seconds()), processAt.Unix(), encoded, @@ -508,7 +508,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i } expireAt := now.Add(statsTTL) keys := []string{ - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.RetryKey(msg.Queue), @@ -516,7 +516,7 @@ func (r *RDB) Retry(msg *base.TaskMessage, processAt time.Time, errMsg string, i base.FailedKey(msg.Queue, now), } argv := []interface{}{ - msg.ID.String(), + msg.ID, encoded, processAt.Unix(), expireAt.Unix(), @@ -578,7 +578,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { cutoff := now.AddDate(0, 0, -archivedExpirationInDays) expireAt := now.Add(statsTTL) keys := []string{ - base.TaskKey(msg.Queue, msg.ID.String()), + base.TaskKey(msg.Queue, msg.ID), base.ActiveKey(msg.Queue), base.DeadlinesKey(msg.Queue), base.ArchivedKey(msg.Queue), @@ -586,7 +586,7 @@ func (r *RDB) Archive(msg *base.TaskMessage, errMsg string) error { base.FailedKey(msg.Queue, now), } argv := []interface{}{ - msg.ID.String(), + msg.ID, encoded, now.Unix(), cutoff.Unix(), diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index c566722..8dbc179 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -91,13 +91,13 @@ func TestEnqueue(t *testing.T) { t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, n) continue } - if pendingIDs[0] != tc.msg.ID.String() { - t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()}) + if pendingIDs[0] != tc.msg.ID { + t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID}) continue } // Check the value under the task key. - taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID) encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field decoded := h.MustUnmarshal(t, encoded) if diff := cmp.Diff(tc.msg, decoded); diff != "" { @@ -127,7 +127,7 @@ func TestEnqueueUnique(t *testing.T) { r := setup(t) defer r.Close() m1 := base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "email", Payload: h.JSON(map[string]interface{}{"user_id": json.Number("123")}), Queue: base.DefaultQueueName, @@ -170,13 +170,13 @@ func TestEnqueueUnique(t *testing.T) { t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs)) continue } - if pendingIDs[0] != tc.msg.ID.String() { - t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()}) + if pendingIDs[0] != tc.msg.ID { + t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID}) continue } // Check the value under the task key. - taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID) encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field decoded := h.MustUnmarshal(t, encoded) if diff := cmp.Diff(tc.msg, decoded); diff != "" { @@ -223,7 +223,7 @@ func TestDequeue(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", @@ -232,7 +232,7 @@ func TestDequeue(t *testing.T) { } t1Deadline := now.Unix() + t1.Timeout t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "export_csv", Payload: nil, Queue: "critical", @@ -241,7 +241,7 @@ func TestDequeue(t *testing.T) { } t2Deadline := t2.Deadline t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "reindex", Payload: nil, Queue: "low", @@ -466,7 +466,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { r := setup(t) defer r.Close() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: h.JSON(map[string]interface{}{"subject": "hello!"}), Queue: "default", @@ -474,7 +474,7 @@ func TestDequeueIgnoresPausedQueues(t *testing.T) { Deadline: 0, } t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "export_csv", Payload: nil, Queue: "critical", @@ -580,7 +580,7 @@ func TestDone(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: nil, Timeout: 1800, @@ -588,7 +588,7 @@ func TestDone(t *testing.T) { Queue: "default", } t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "export_csv", Payload: nil, Timeout: 0, @@ -596,7 +596,7 @@ func TestDone(t *testing.T) { Queue: "custom", } t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "reindex", Payload: nil, Timeout: 1800, @@ -682,7 +682,7 @@ func TestDone(t *testing.T) { for _, msg := range msgs { // Set uniqueness lock if unique key is present. if len(msg.UniqueKey) > 0 { - err := r.client.SetNX(context.Background(), msg.UniqueKey, msg.ID.String(), time.Minute).Err() + err := r.client.SetNX(context.Background(), msg.UniqueKey, msg.ID, time.Minute).Err() if err != nil { t.Fatal(err) } @@ -733,21 +733,21 @@ func TestRequeue(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: nil, Queue: "default", Timeout: 1800, } t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "export_csv", Payload: nil, Queue: "default", Timeout: 3000, } t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: nil, Queue: "critical", @@ -906,9 +906,9 @@ func TestSchedule(t *testing.T) { scheduledKey, n) continue } - if got := zs[0].Member.(string); got != tc.msg.ID.String() { + if got := zs[0].Member.(string); got != tc.msg.ID { t.Errorf("Redis ZSET %q member: got %v, want %v", - scheduledKey, got, tc.msg.ID.String()) + scheduledKey, got, tc.msg.ID) continue } if got := int64(zs[0].Score); got != tc.processAt.Unix() { @@ -918,7 +918,7 @@ func TestSchedule(t *testing.T) { } // Check the values under the task key. - taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID) encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field decoded := h.MustUnmarshal(t, encoded) if diff := cmp.Diff(tc.msg, decoded); diff != "" { @@ -950,7 +950,7 @@ func TestScheduleUnique(t *testing.T) { r := setup(t) defer r.Close() m1 := base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "email", Payload: h.JSON(map[string]interface{}{"user_id": 123}), Queue: base.DefaultQueueName, @@ -983,9 +983,9 @@ func TestScheduleUnique(t *testing.T) { scheduledKey, n) continue } - if got := zs[0].Member.(string); got != tc.msg.ID.String() { + if got := zs[0].Member.(string); got != tc.msg.ID { t.Errorf("Redis ZSET %q member: got %v, want %v", - scheduledKey, got, tc.msg.ID.String()) + scheduledKey, got, tc.msg.ID) continue } if got := int64(zs[0].Score); got != tc.processAt.Unix() { @@ -995,7 +995,7 @@ func TestScheduleUnique(t *testing.T) { } // Check the values under the task key. - taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) + taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID) encoded := r.client.HGet(context.Background(), taskKey, "msg").Val() // "msg" field decoded := h.MustUnmarshal(t, encoded) if diff := cmp.Diff(tc.msg, decoded); diff != "" { @@ -1045,7 +1045,7 @@ func TestRetry(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: h.JSON(map[string]interface{}{"subject": "Hola!"}), Retried: 10, @@ -1053,21 +1053,21 @@ func TestRetry(t *testing.T) { Queue: "default", } t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "gen_thumbnail", Payload: h.JSON(map[string]interface{}{"path": "some/path/to/image.jpg"}), Timeout: 3000, Queue: "default", } t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "reindex", Payload: nil, Timeout: 60, Queue: "default", } t4 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_notification", Payload: nil, Timeout: 1800, @@ -1216,7 +1216,7 @@ func TestRetryWithNonFailureError(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: h.JSON(map[string]interface{}{"subject": "Hola!"}), Retried: 10, @@ -1224,21 +1224,21 @@ func TestRetryWithNonFailureError(t *testing.T) { Queue: "default", } t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "gen_thumbnail", Payload: h.JSON(map[string]interface{}{"path": "some/path/to/image.jpg"}), Timeout: 3000, Queue: "default", } t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "reindex", Payload: nil, Timeout: 60, Queue: "default", } t4 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_notification", Payload: nil, Timeout: 1800, @@ -1383,7 +1383,7 @@ func TestArchive(t *testing.T) { defer r.Close() now := time.Now() t1 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: nil, Queue: "default", @@ -1393,7 +1393,7 @@ func TestArchive(t *testing.T) { } t1Deadline := now.Unix() + t1.Timeout t2 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "reindex", Payload: nil, Queue: "default", @@ -1403,7 +1403,7 @@ func TestArchive(t *testing.T) { } t2Deadline := now.Unix() + t2.Timeout t3 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "generate_csv", Payload: nil, Queue: "default", @@ -1413,7 +1413,7 @@ func TestArchive(t *testing.T) { } t3Deadline := now.Unix() + t3.Timeout t4 := &base.TaskMessage{ - ID: uuid.New(), + ID: uuid.NewString(), Type: "send_email", Payload: nil, Queue: "custom", @@ -1905,7 +1905,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { { Host: host, PID: pid, - ID: msg1.ID.String(), + ID: msg1.ID, Type: msg1.Type, Queue: msg1.Queue, Payload: msg1.Payload, @@ -1914,7 +1914,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) { { Host: host, PID: pid, - ID: msg2.ID.String(), + ID: msg2.ID, Type: msg2.Type, Queue: msg2.Queue, Payload: msg2.Payload, @@ -2017,7 +2017,7 @@ func TestClearServerState(t *testing.T) { { Host: host, PID: pid, - ID: msg1.ID.String(), + ID: msg1.ID, Type: msg1.Type, Queue: msg1.Queue, Payload: msg1.Payload, @@ -2040,7 +2040,7 @@ func TestClearServerState(t *testing.T) { { Host: otherHost, PID: otherPID, - ID: msg2.ID.String(), + ID: msg2.ID, Type: msg2.Type, Queue: msg2.Queue, Payload: msg2.Payload, diff --git a/processor.go b/processor.go index fcf7cb4..e4c50b1 100644 --- a/processor.go +++ b/processor.go @@ -191,10 +191,10 @@ func (p *processor) exec() { }() ctx, cancel := asynqcontext.New(msg, deadline) - p.cancelations.Add(msg.ID.String(), cancel) + p.cancelations.Add(msg.ID, cancel) defer func() { cancel() - p.cancelations.Delete(msg.ID.String()) + p.cancelations.Delete(msg.ID) }() // check context before starting a worker goroutine.