mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Merge pull request #5 from 22dm/custom-unique-key
feat: add custom unique key option
This commit is contained in:
		
							
								
								
									
										30
									
								
								client.go
									
									
									
									
									
								
							
							
						
						
									
										30
									
								
								client.go
									
									
									
									
									
								
							| @@ -44,6 +44,7 @@ const ( | |||||||
| 	TimeoutOpt | 	TimeoutOpt | ||||||
| 	DeadlineOpt | 	DeadlineOpt | ||||||
| 	UniqueOpt | 	UniqueOpt | ||||||
|  | 	UniqueKeyOpt | ||||||
| 	ProcessAtOpt | 	ProcessAtOpt | ||||||
| 	ProcessInOpt | 	ProcessInOpt | ||||||
| 	TaskIDOpt | 	TaskIDOpt | ||||||
| @@ -71,6 +72,7 @@ type ( | |||||||
| 	timeoutOption   time.Duration | 	timeoutOption   time.Duration | ||||||
| 	deadlineOption  time.Time | 	deadlineOption  time.Time | ||||||
| 	uniqueOption    time.Duration | 	uniqueOption    time.Duration | ||||||
|  | 	uniqueKeyOption string | ||||||
| 	processAtOption time.Time | 	processAtOption time.Time | ||||||
| 	processInOption time.Duration | 	processInOption time.Duration | ||||||
| 	retentionOption time.Duration | 	retentionOption time.Duration | ||||||
| @@ -149,10 +151,11 @@ func (t deadlineOption) Value() interface{} { return time.Time(t) } | |||||||
| // ErrDuplicateTask error is returned when enqueueing a duplicate task. | // ErrDuplicateTask error is returned when enqueueing a duplicate task. | ||||||
| // TTL duration must be greater than or equal to 1 second. | // TTL duration must be greater than or equal to 1 second. | ||||||
| // | // | ||||||
| // Uniqueness of a task is based on the following properties: | // By default, the uniqueness of a task is based on the following properties: | ||||||
| //     - Task Type | //     - Task Type | ||||||
| //     - Task Payload | //     - Task Payload | ||||||
| //     - Queue Name | //     - Queue Name | ||||||
|  | // UniqueKey can be used to specify a custom string for calculating uniqueness, instead of task payload. | ||||||
| func Unique(ttl time.Duration) Option { | func Unique(ttl time.Duration) Option { | ||||||
| 	return uniqueOption(ttl) | 	return uniqueOption(ttl) | ||||||
| } | } | ||||||
| @@ -161,6 +164,24 @@ func (ttl uniqueOption) String() string     { return fmt.Sprintf("Unique(%v)", t | |||||||
| func (ttl uniqueOption) Type() OptionType   { return UniqueOpt } | func (ttl uniqueOption) Type() OptionType   { return UniqueOpt } | ||||||
| func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) } | func (ttl uniqueOption) Value() interface{} { return time.Duration(ttl) } | ||||||
|  |  | ||||||
|  | // UniqueKey returns an option to define the custom uniqueness of a task. | ||||||
|  | // If uniqueKey is not empty, the uniqueness of a task is based on the following properties: | ||||||
|  | //     - Task Type | ||||||
|  | //     - UniqueKey | ||||||
|  | //     - Queue Name | ||||||
|  | // Otherwise, task payload will be used, see Unique. | ||||||
|  | // | ||||||
|  | // UniqueKey should be used together with Unique. | ||||||
|  | func UniqueKey(uniqueKey string) Option { | ||||||
|  | 	return uniqueKeyOption(uniqueKey) | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (uniqueKey uniqueKeyOption) String() string { | ||||||
|  | 	return fmt.Sprintf("UniqueKey(%q)", string(uniqueKey)) | ||||||
|  | } | ||||||
|  | func (uniqueKey uniqueKeyOption) Type() OptionType   { return UniqueKeyOpt } | ||||||
|  | func (uniqueKey uniqueKeyOption) Value() interface{} { return string(uniqueKey) } | ||||||
|  |  | ||||||
| // ProcessAt returns an option to specify when to process the given task. | // ProcessAt returns an option to specify when to process the given task. | ||||||
| // | // | ||||||
| // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. | // If there's a conflicting ProcessIn option, the last option passed to Enqueue overrides the others. | ||||||
| @@ -223,6 +244,7 @@ type option struct { | |||||||
| 	timeout   time.Duration | 	timeout   time.Duration | ||||||
| 	deadline  time.Time | 	deadline  time.Time | ||||||
| 	uniqueTTL time.Duration | 	uniqueTTL time.Duration | ||||||
|  | 	uniqueKey string | ||||||
| 	processAt time.Time | 	processAt time.Time | ||||||
| 	retention time.Duration | 	retention time.Duration | ||||||
| 	group     string | 	group     string | ||||||
| @@ -267,6 +289,8 @@ func composeOptions(opts ...Option) (option, error) { | |||||||
| 				return option{}, errors.New("Unique TTL cannot be less than 1s") | 				return option{}, errors.New("Unique TTL cannot be less than 1s") | ||||||
| 			} | 			} | ||||||
| 			res.uniqueTTL = ttl | 			res.uniqueTTL = ttl | ||||||
|  | 		case uniqueKeyOption: | ||||||
|  | 			res.uniqueKey = string(opt) | ||||||
| 		case processAtOption: | 		case processAtOption: | ||||||
| 			res.processAt = time.Time(opt) | 			res.processAt = time.Time(opt) | ||||||
| 		case processInOption: | 		case processInOption: | ||||||
| @@ -365,8 +389,12 @@ func (c *Client) EnqueueContext(ctx context.Context, task *Task, opts ...Option) | |||||||
| 	} | 	} | ||||||
| 	var uniqueKey string | 	var uniqueKey string | ||||||
| 	if opt.uniqueTTL > 0 { | 	if opt.uniqueTTL > 0 { | ||||||
|  | 		if opt.uniqueKey != "" { | ||||||
|  | 			uniqueKey = base.CustomUniqueKey(opt.queue, task.Type(), opt.uniqueKey) | ||||||
|  | 		} else { | ||||||
| 			uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) | 			uniqueKey = base.UniqueKey(opt.queue, task.Type(), task.Payload()) | ||||||
| 		} | 		} | ||||||
|  | 	} | ||||||
| 	msg := &base.TaskMessage{ | 	msg := &base.TaskMessage{ | ||||||
| 		ID:        opt.taskID, | 		ID:        opt.taskID, | ||||||
| 		Type:      task.Type(), | 		Type:      task.Type(), | ||||||
|   | |||||||
| @@ -1176,3 +1176,46 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { | |||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | func TestClientEnqueueUniqueWithUniqueKeyOption(t *testing.T) { | ||||||
|  | 	r := setup(t) | ||||||
|  | 	c := NewClient(getRedisConnOpt(t)) | ||||||
|  | 	defer c.Close() | ||||||
|  |  | ||||||
|  | 	tests := []struct { | ||||||
|  | 		task *Task | ||||||
|  | 		ttl  time.Duration | ||||||
|  | 	}{ | ||||||
|  | 		{ | ||||||
|  | 			NewTask("email", h.JSON(map[string]interface{}{"user_id": 123})), | ||||||
|  | 			time.Hour, | ||||||
|  | 		}, | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	for _, tc := range tests { | ||||||
|  | 		h.FlushDB(t, r) // clean up db before each test case. | ||||||
|  |  | ||||||
|  | 		// Enqueue the task first. It should succeed. | ||||||
|  | 		_, err := c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key")) | ||||||
|  | 		if err != nil { | ||||||
|  | 			t.Fatal(err) | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		gotTTL := r.TTL(context.Background(), base.CustomUniqueKey(base.DefaultQueueName, tc.task.Type(), "custom_unique_key")).Val() | ||||||
|  | 		if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { | ||||||
|  | 			t.Errorf("TTL = %v, want %v", gotTTL, tc.ttl) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// Enqueue the task again. It should fail. | ||||||
|  | 		_, err = c.Enqueue(tc.task, Unique(tc.ttl), UniqueKey("custom_unique_key")) | ||||||
|  | 		if err == nil { | ||||||
|  | 			t.Errorf("Enqueueing %+v did not return an error", tc.task) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 		if !errors.Is(err, ErrDuplicateTask) { | ||||||
|  | 			t.Errorf("Enqueueing %+v returned an error that is not ErrDuplicateTask", tc.task) | ||||||
|  | 			continue | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|   | |||||||
| @@ -945,6 +945,12 @@ func parseOption(s string) (Option, error) { | |||||||
| 			return nil, err | 			return nil, err | ||||||
| 		} | 		} | ||||||
| 		return Unique(d), nil | 		return Unique(d), nil | ||||||
|  | 	case "UniqueKey": | ||||||
|  | 		key, err := strconv.Unquote(arg) | ||||||
|  | 		if err != nil { | ||||||
|  | 			return nil, err | ||||||
|  | 		} | ||||||
|  | 		return UniqueKey(key), nil | ||||||
| 	case "ProcessAt": | 	case "ProcessAt": | ||||||
| 		t, err := time.Parse(time.UnixDate, arg) | 		t, err := time.Parse(time.UnixDate, arg) | ||||||
| 		if err != nil { | 		if err != nil { | ||||||
|   | |||||||
| @@ -205,6 +205,12 @@ func UniqueKey(qname, tasktype string, payload []byte) string { | |||||||
| 	return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) | 	return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // CustomUniqueKey returns a redis key with the given type, custom key, and queue name. | ||||||
|  | func CustomUniqueKey(qname, tasktype string, customKey string) string { | ||||||
|  | 	checksum := md5.Sum([]byte(customKey)) | ||||||
|  | 	return fmt.Sprintf("%sunique:%s:%s", QueueKeyPrefix(qname), tasktype, hex.EncodeToString(checksum[:])) | ||||||
|  | } | ||||||
|  |  | ||||||
| // GroupKeyPrefix returns a prefix for group key. | // GroupKeyPrefix returns a prefix for group key. | ||||||
| func GroupKeyPrefix(qname string) string { | func GroupKeyPrefix(qname string) string { | ||||||
| 	return fmt.Sprintf("%sg:", QueueKeyPrefix(qname)) | 	return fmt.Sprintf("%sg:", QueueKeyPrefix(qname)) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user