mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Update Enqueue and Schedule commands in rdb
This commit is contained in:
		| @@ -50,27 +50,22 @@ func (r *RDB) Ping() error { | ||||
| 	return r.client.Ping().Err() | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> asynq:queues:<qname> | ||||
| // KEYS[2] -> asynq:queues | ||||
| // ARGV[1] -> task message data | ||||
| var enqueueCmd = redis.NewScript(` | ||||
| redis.call("LPUSH", KEYS[1], ARGV[1]) | ||||
| redis.call("SADD", KEYS[2], KEYS[1]) | ||||
| return 1`) | ||||
|  | ||||
| // Enqueue inserts the given task to the tail of the queue. | ||||
| func (r *RDB) Enqueue(msg *base.TaskMessage) error { | ||||
| 	encoded, err := base.EncodeMessage(msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	err := r.client.SAdd(base.AllQueues, msg.Queue).Err() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	key := base.QueueKey(msg.Queue) | ||||
| 	return enqueueCmd.Run(r.client, []string{key, base.AllQueues}, encoded).Err() | ||||
| 	return r.client.LPush(key, encoded).Err() | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> unique key in the form <type>:<payload>:<qname> | ||||
| // KEYS[2] -> asynq:queues:<qname> | ||||
| // KEYS[2] -> asynq:queues | ||||
| // KEYS[1] -> unique key | ||||
| // KEYS[2] -> asynq:{<qname>} | ||||
| // ARGV[1] -> task ID | ||||
| // ARGV[2] -> uniqueness lock TTL | ||||
| // ARGV[3] -> task message data | ||||
| @@ -80,7 +75,6 @@ if not ok then | ||||
|   return 0 | ||||
| end | ||||
| redis.call("LPUSH", KEYS[2], ARGV[3]) | ||||
| redis.call("SADD", KEYS[3], KEYS[2]) | ||||
| return 1 | ||||
| `) | ||||
|  | ||||
| @@ -91,9 +85,12 @@ func (r *RDB) EnqueueUnique(msg *base.TaskMessage, ttl time.Duration) error { | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	key := base.QueueKey(msg.Queue) | ||||
| 	err := r.client.SAdd(base.AllQueues, msg.Queue).Err() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	res, err := enqueueUniqueCmd.Run(r.client, | ||||
| 		[]string{msg.UniqueKey, key, base.AllQueues}, | ||||
| 		[]string{msg.UniqueKey, base.QueueKey(msg.Queue)}, | ||||
| 		msg.ID.String(), int(ttl.Seconds()), encoded).Result() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| @@ -194,7 +191,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err | ||||
| // KEYS[1] -> asynq:in_progress | ||||
| // KEYS[2] -> asynq:deadlines | ||||
| // KEYS[3] -> asynq:processed:<yyyy-mm-dd> | ||||
| // KEYS[4] -> unique key in the format <type>:<payload>:<qname> | ||||
| // KEYS[4] -> unique key | ||||
| // ARGV[1] -> base.TaskMessage value | ||||
| // ARGV[2] -> stats expiration timestamp | ||||
| // ARGV[3] -> task ID | ||||
| @@ -257,45 +254,32 @@ func (r *RDB) Requeue(msg *base.TaskMessage) error { | ||||
| 		encoded).Err() | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> asynq:scheduled | ||||
| // KEYS[2] -> asynq:queues | ||||
| // ARGV[1] -> score (process_at timestamp) | ||||
| // ARGV[2] -> task message | ||||
| // ARGV[3] -> queue key | ||||
| var scheduleCmd = redis.NewScript(` | ||||
| redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2]) | ||||
| redis.call("SADD", KEYS[2], ARGV[3]) | ||||
| return 1 | ||||
| `) | ||||
|  | ||||
| // Schedule adds the task to the backlog queue to be processed in the future. | ||||
| func (r *RDB) Schedule(msg *base.TaskMessage, processAt time.Time) error { | ||||
| 	encoded, err := base.EncodeMessage(msg) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	qkey := base.QueueKey(msg.Queue) | ||||
| 	err := r.client.SAdd(base.AllQueues, msg.Queue).Err() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	score := float64(processAt.Unix()) | ||||
| 	return scheduleCmd.Run(r.client, | ||||
| 		[]string{base.ScheduledQueue, base.AllQueues}, | ||||
| 		score, encoded, qkey).Err() | ||||
| 	return r.client.ZAdd(base.ScheduledKey(msg.Queue), &redis.Z{Score: score, Member: encoded}).Err() | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> unique key in the format <type>:<payload>:<qname> | ||||
| // KEYS[2] -> asynq:scheduled | ||||
| // KEYS[3] -> asynq:queues | ||||
| // KEYS[1] -> unique key | ||||
| // KEYS[2] -> asynq:{<qname>}:scheduled | ||||
| // ARGV[1] -> task ID | ||||
| // ARGV[2] -> uniqueness lock TTL | ||||
| // ARGV[3] -> score (process_at timestamp) | ||||
| // ARGV[4] -> task message | ||||
| // ARGV[5] -> queue key | ||||
| var scheduleUniqueCmd = redis.NewScript(` | ||||
| local ok = redis.call("SET", KEYS[1], ARGV[1], "NX", "EX", ARGV[2]) | ||||
| if not ok then | ||||
|   return 0 | ||||
| end | ||||
| redis.call("ZADD", KEYS[2], ARGV[3], ARGV[4]) | ||||
| redis.call("SADD", KEYS[3], ARGV[5]) | ||||
| return 1 | ||||
| `) | ||||
|  | ||||
| @@ -306,11 +290,14 @@ func (r *RDB) ScheduleUnique(msg *base.TaskMessage, processAt time.Time, ttl tim | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	qkey := base.QueueKey(msg.Queue) | ||||
| 	err := r.client.SAdd(base.AllQueues, msg.Queue).Err() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 	score := float64(processAt.Unix()) | ||||
| 	res, err := scheduleUniqueCmd.Run(r.client, | ||||
| 		[]string{msg.UniqueKey, base.ScheduledQueue, base.AllQueues}, | ||||
| 		msg.ID.String(), int(ttl.Seconds()), score, encoded, qkey).Result() | ||||
| 		[]string{msg.UniqueKey, base.ScheduledKey(msg.Queue)}, | ||||
| 		msg.ID.String(), int(ttl.Seconds()), score, encoded).Result() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|   | ||||
| @@ -34,10 +34,8 @@ func setup(t *testing.T) *RDB { | ||||
| func TestEnqueue(t *testing.T) { | ||||
| 	r := setup(t) | ||||
| 	t1 := h.NewTaskMessage("send_email", map[string]interface{}{"to": "exampleuser@gmail.com", "from": "noreply@example.com"}) | ||||
| 	t2 := h.NewTaskMessage("generate_csv", map[string]interface{}{}) | ||||
| 	t2.Queue = "csv" | ||||
| 	t3 := h.NewTaskMessage("sync", nil) | ||||
| 	t3.Queue = "low" | ||||
| 	t2 := h.NewTaskMessageWithQueue("generate_csv", map[string]interface{}{}, "csv") | ||||
| 	t3 := h.NewTaskMessageWithQueue("sync", nil, "low") | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		msg *base.TaskMessage | ||||
| @@ -55,17 +53,16 @@ func TestEnqueue(t *testing.T) { | ||||
| 			t.Errorf("(*RDB).Enqueue(msg) = %v, want nil", err) | ||||
| 		} | ||||
|  | ||||
| 		qkey := base.QueueKey(tc.msg.Queue) | ||||
| 		gotEnqueued := h.GetEnqueuedMessages(t, r.client, tc.msg.Queue) | ||||
| 		if len(gotEnqueued) != 1 { | ||||
| 			t.Errorf("%q has length %d, want 1", qkey, len(gotEnqueued)) | ||||
| 			t.Errorf("%q has length %d, want 1", base.QueueKey(tc.msg.Queue), len(gotEnqueued)) | ||||
| 			continue | ||||
| 		} | ||||
| 		if diff := cmp.Diff(tc.msg, gotEnqueued[0]); diff != "" { | ||||
| 			t.Errorf("persisted data differed from the original input (-want, +got)\n%s", diff) | ||||
| 		} | ||||
| 		if !r.client.SIsMember(base.AllQueues, qkey).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", qkey, base.AllQueues) | ||||
| 		if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
| @@ -77,7 +74,7 @@ func TestEnqueueUnique(t *testing.T) { | ||||
| 		Type:      "email", | ||||
| 		Payload:   map[string]interface{}{"user_id": 123}, | ||||
| 		Queue:     base.DefaultQueueName, | ||||
| 		UniqueKey: "email:user_id=123:default", | ||||
| 		UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| @@ -103,12 +100,14 @@ func TestEnqueueUnique(t *testing.T) { | ||||
| 				tc.msg, tc.ttl, got, ErrDuplicateTask) | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		gotTTL := r.client.TTL(tc.msg.UniqueKey).Val() | ||||
| 		if !cmp.Equal(tc.ttl.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { | ||||
| 			t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -673,15 +672,20 @@ func TestSchedule(t *testing.T) { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		gotScheduled := h.GetScheduledEntries(t, r.client) | ||||
| 		gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue) | ||||
| 		if len(gotScheduled) != 1 { | ||||
| 			t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) | ||||
| 			t.Errorf("%s inserted %d items to %q, want 1 items inserted", | ||||
| 				desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue)) | ||||
| 			continue | ||||
| 		} | ||||
| 		if int64(gotScheduled[0].Score) != tc.processAt.Unix() { | ||||
| 			t.Errorf("%s inserted an item with score %d, want %d", desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) | ||||
| 			t.Errorf("%s inserted an item with score %d, want %d", | ||||
| 				desc, int64(gotScheduled[0].Score), tc.processAt.Unix()) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
| @@ -692,7 +696,7 @@ func TestScheduleUnique(t *testing.T) { | ||||
| 		Type:      "email", | ||||
| 		Payload:   map[string]interface{}{"user_id": 123}, | ||||
| 		Queue:     base.DefaultQueueName, | ||||
| 		UniqueKey: "email:user_id=123:default", | ||||
| 		UniqueKey: base.UniqueKey(base.DefaultQueueName, "email", map[string]interface{}{"user_id": 123}), | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| @@ -713,9 +717,9 @@ func TestScheduleUnique(t *testing.T) { | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		gotScheduled := h.GetScheduledEntries(t, r.client) | ||||
| 		gotScheduled := h.GetScheduledEntries(t, r.client, tc.msg.Queue) | ||||
| 		if len(gotScheduled) != 1 { | ||||
| 			t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledQueue) | ||||
| 			t.Errorf("%s inserted %d items to %q, want 1 items inserted", desc, len(gotScheduled), base.ScheduledKey(tc.msg.Queue)) | ||||
| 			continue | ||||
| 		} | ||||
| 		if int64(gotScheduled[0].Score) != tc.processAt.Unix() { | ||||
| @@ -734,6 +738,10 @@ func TestScheduleUnique(t *testing.T) { | ||||
| 			t.Errorf("TTL %q = %v, want %v", tc.msg.UniqueKey, gotTTL, tc.ttl) | ||||
| 			continue | ||||
| 		} | ||||
| 		if !r.client.SIsMember(base.AllQueues, tc.msg.Queue).Val() { | ||||
| 			t.Errorf("%q is not a member of SET %q", tc.msg.Queue, base.AllQueues) | ||||
| 			continue | ||||
| 		} | ||||
| 	} | ||||
| } | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user