mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Update RDB.EnqueueUnique with task state
This commit is contained in:
		| @@ -99,7 +99,11 @@ local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) | ||||
| if not ok then | ||||
|   return 0 | ||||
| end | ||||
| redis.call("HSET", KEYS[2], "msg", ARGV[3], "timeout", ARGV[4], "deadline", ARGV[5]) | ||||
| redis.call("HSET", KEYS[2], | ||||
|            "msg", ARGV[3], | ||||
| 		   "state", "pending", | ||||
|            "timeout", ARGV[4], | ||||
| 		   "deadline", ARGV[5]) | ||||
| redis.call("LPUSH", KEYS[3], ARGV[1]) | ||||
| return 1 | ||||
| `) | ||||
|   | ||||
| @@ -150,14 +150,40 @@ func TestEnqueueUnique(t *testing.T) { | ||||
| 				tc.msg, tc.ttl, err) | ||||
| 			continue | ||||
| 		} | ||||
| 		gotPending := h.GetPendingMessages(t, r.client, tc.msg.Queue) | ||||
| 		if len(gotPending) != 1 { | ||||
| 			t.Errorf("%q has length %d, want 1", base.PendingKey(tc.msg.Queue), len(gotPending)) | ||||
|  | ||||
| 		// Check Pending list has task ID. | ||||
| 		pendingKey := base.PendingKey(tc.msg.Queue) | ||||
| 		pendingIDs := r.client.LRange(pendingKey, 0, -1).Val() | ||||
| 		if len(pendingIDs) != 1 { | ||||
| 			t.Errorf("Redis LIST %q contains %d IDs, want 1", pendingKey, len(pendingIDs)) | ||||
| 			continue | ||||
| 		} | ||||
| 		if diff := cmp.Diff(tc.msg, gotPending[0]); diff != "" { | ||||
| 			t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) | ||||
| 		if pendingIDs[0] != tc.msg.ID.String() { | ||||
| 			t.Errorf("Redis LIST %q: got %v, want %v", pendingKey, pendingIDs, []string{tc.msg.ID.String()}) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		// Check the value under the task key. | ||||
| 		taskKey := base.TaskKey(tc.msg.Queue, tc.msg.ID.String()) | ||||
| 		encoded := r.client.HGet(taskKey, "msg").Val() // "msg" field | ||||
| 		decoded := h.MustUnmarshal(t, encoded) | ||||
| 		if diff := cmp.Diff(tc.msg, decoded); diff != "" { | ||||
| 			t.Errorf("persisted message was %v, want %v; (-want, +got)\n%s", decoded, tc.msg, diff) | ||||
| 		} | ||||
| 		state := r.client.HGet(taskKey, "state").Val() // "state" field | ||||
| 		if state != "pending" { | ||||
| 			t.Errorf("state field under task-key is set to %q, want %q", state, "pending") | ||||
| 		} | ||||
| 		timeout := r.client.HGet(taskKey, "timeout").Val() // "timeout" field | ||||
| 		if want := strconv.Itoa(int(tc.msg.Timeout)); timeout != want { | ||||
| 			t.Errorf("timeout field under task-key is set to %v, want %v", timeout, want) | ||||
| 		} | ||||
| 		deadline := r.client.HGet(taskKey, "deadline").Val() // "deadline" field | ||||
| 		if want := strconv.Itoa(int(tc.msg.Deadline)); deadline != want { | ||||
| 			t.Errorf("deadline field under task-ke is set to %v, want %v", deadline, want) | ||||
| 		} | ||||
|  | ||||
| 		// Check queue is in the AllQueues set. | ||||
| 		if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) | ||||
| 		} | ||||
| @@ -170,7 +196,7 @@ func TestEnqueueUnique(t *testing.T) { | ||||
| 			continue | ||||
| 		} | ||||
| 		gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() | ||||
| 		if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { | ||||
| 		if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 2)) { | ||||
| 			t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) | ||||
| 			continue | ||||
| 		} | ||||
|   | ||||
		Reference in New Issue
	
	Block a user