mirror of
				https://github.com/hibiken/asynq.git
				synced 2025-10-26 11:16:12 +08:00 
			
		
		
		
	Minor code cleanup
This commit is contained in:
		| @@ -27,13 +27,13 @@ type ctxKey int | ||||
| const metadataCtxKey ctxKey = 0 | ||||
|  | ||||
| // createContext returns a context and cancel function for a given task message. | ||||
| func createContext(msg *base.TaskMessage, deadline time.Time) (ctx context.Context, cancel context.CancelFunc) { | ||||
| func createContext(msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) { | ||||
| 	metadata := taskMetadata{ | ||||
| 		id:         msg.ID.String(), | ||||
| 		maxRetry:   msg.Retry, | ||||
| 		retryCount: msg.Retried, | ||||
| 	} | ||||
| 	ctx = context.WithValue(context.Background(), metadataCtxKey, metadata) | ||||
| 	ctx := context.WithValue(context.Background(), metadataCtxKey, metadata) | ||||
| 	return context.WithDeadline(ctx, deadline) | ||||
| } | ||||
|  | ||||
|   | ||||
| @@ -150,7 +150,7 @@ for i = 2, table.getn(ARGV) do | ||||
| 			elseif timeout ~= 0 then | ||||
| 				score = ARGV[1] + timeout | ||||
| 			elseif deadline ~= 0 then | ||||
| 			    score = deadline | ||||
| 				score = deadline | ||||
| 			else | ||||
| 				return redis.error_reply("asynq internal error: both timeout and deadline are not set") | ||||
| 			end | ||||
| @@ -175,7 +175,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err | ||||
| 		return "", 0, err | ||||
| 	} | ||||
| 	if len(data) != 2 { | ||||
| 		return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %v values", len(data)) | ||||
| 		return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) | ||||
| 	} | ||||
| 	if msgjson, err = cast.ToStringE(data[0]); err != nil { | ||||
| 		return "", 0, err | ||||
| @@ -471,23 +471,6 @@ func (r *RDB) forward(src string) (int, error) { | ||||
| 	return cast.ToInt(res), nil | ||||
| } | ||||
|  | ||||
| // KEYS[1] -> asynq:deadlines | ||||
| // KEYS[2] -> asynq:in_progress | ||||
| // ARGV[1] -> max deadline score in unix time | ||||
| // ARGV[2] -> queue prefix | ||||
| /* | ||||
| var requeueDeadlineExceededCmd = redis.NewScript(` | ||||
| local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) | ||||
| for _, msg in ipairs(msgs) do | ||||
| 	local decoded = cjson.decode(msg) | ||||
| 	local qkey = ARGV[2] .. decoded["Queue"] | ||||
| 	redis.call("LPUSH", qkey, msg) | ||||
| 	redis.call("ZREM", KEYS[1], msg) | ||||
| 	redis.call("LREM", KEYS[2], 0, msg) | ||||
| end | ||||
| return table.getn(msgs)`) | ||||
| */ | ||||
|  | ||||
| // ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. | ||||
| func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { | ||||
| 	var msgs []*base.TaskMessage | ||||
|   | ||||
| @@ -156,7 +156,7 @@ func TestDequeue(t *testing.T) { | ||||
| 			}, | ||||
| 			args:         []string{"default"}, | ||||
| 			wantMsg:      t1, | ||||
| 			wantDeadline: time.Unix(int64(t1Deadline), 0), | ||||
| 			wantDeadline: time.Unix(t1Deadline, 0), | ||||
| 			err:          nil, | ||||
| 			wantEnqueued: map[string][]*base.TaskMessage{ | ||||
| 				"default": {}, | ||||
| @@ -191,7 +191,7 @@ func TestDequeue(t *testing.T) { | ||||
| 			}, | ||||
| 			args:         []string{"critical", "default", "low"}, | ||||
| 			wantMsg:      t2, | ||||
| 			wantDeadline: time.Unix(int64(t2Deadline), 0), | ||||
| 			wantDeadline: time.Unix(t2Deadline, 0), | ||||
| 			err:          nil, | ||||
| 			wantEnqueued: map[string][]*base.TaskMessage{ | ||||
| 				"default":  {t1}, | ||||
| @@ -214,7 +214,7 @@ func TestDequeue(t *testing.T) { | ||||
| 			}, | ||||
| 			args:         []string{"critical", "default", "low"}, | ||||
| 			wantMsg:      t3, | ||||
| 			wantDeadline: time.Unix(int64(t3Deadline), 0), | ||||
| 			wantDeadline: time.Unix(t3Deadline, 0), | ||||
| 			err:          nil, | ||||
| 			wantEnqueued: map[string][]*base.TaskMessage{ | ||||
| 				"default":  {}, | ||||
| @@ -278,12 +278,10 @@ func TestDequeue(t *testing.T) { | ||||
| 				t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) | ||||
| 			} | ||||
| 		} | ||||
|  | ||||
| 		gotInProgress := h.GetInProgressMessages(t, r.client) | ||||
| 		if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { | ||||
| 			t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) | ||||
| 		} | ||||
|  | ||||
| 		gotDeadlines := h.GetDeadlinesEntries(t, r.client) | ||||
| 		if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { | ||||
| 			t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff) | ||||
| @@ -746,6 +744,7 @@ func TestRetry(t *testing.T) { | ||||
| 		ID:      xid.New(), | ||||
| 		Type:    "send_email", | ||||
| 		Payload: map[string]interface{}{"subject": "Hola!"}, | ||||
| 		Retried: 10, | ||||
| 		Timeout: 1800, | ||||
| 	} | ||||
| 	t1Deadline := now.Unix() + t1.Timeout | ||||
| @@ -762,18 +761,7 @@ func TestRetry(t *testing.T) { | ||||
| 		Payload: nil, | ||||
| 		Timeout: 60, | ||||
| 	} | ||||
| 	t1.Retried = 10 | ||||
| 	errMsg := "SMTP server is not responding" | ||||
| 	t1AfterRetry := &base.TaskMessage{ | ||||
| 		ID:       t1.ID, | ||||
| 		Type:     t1.Type, | ||||
| 		Payload:  t1.Payload, | ||||
| 		Queue:    t1.Queue, | ||||
| 		Retry:    t1.Retry, | ||||
| 		Retried:  t1.Retried + 1, | ||||
| 		Timeout:  t1.Timeout, | ||||
| 		ErrorMsg: errMsg, | ||||
| 	} | ||||
|  | ||||
| 	tests := []struct { | ||||
| 		inProgress     []*base.TaskMessage | ||||
| @@ -807,7 +795,7 @@ func TestRetry(t *testing.T) { | ||||
| 			}, | ||||
| 			wantRetry: []h.ZSetEntry{ | ||||
| 				{ | ||||
| 					Msg:   t1AfterRetry, | ||||
| 					Msg:   h.TaskMessageAfterRetry(*t1, errMsg), | ||||
| 					Score: float64(now.Add(5 * time.Minute).Unix()), | ||||
| 				}, | ||||
| 				{ | ||||
| @@ -899,16 +887,6 @@ func TestKill(t *testing.T) { | ||||
| 	} | ||||
| 	t3Deadline := now.Unix() + t3.Timeout | ||||
| 	errMsg := "SMTP server not responding" | ||||
| 	t1AfterKill := &base.TaskMessage{ | ||||
| 		ID:       t1.ID, | ||||
| 		Type:     t1.Type, | ||||
| 		Payload:  t1.Payload, | ||||
| 		Queue:    t1.Queue, | ||||
| 		Retry:    t1.Retry, | ||||
| 		Retried:  t1.Retried, | ||||
| 		Timeout:  t1.Timeout, | ||||
| 		ErrorMsg: errMsg, | ||||
| 	} | ||||
|  | ||||
| 	// TODO(hibiken): add test cases for trimming | ||||
| 	tests := []struct { | ||||
| @@ -939,7 +917,7 @@ func TestKill(t *testing.T) { | ||||
| 			}, | ||||
| 			wantDead: []h.ZSetEntry{ | ||||
| 				{ | ||||
| 					Msg:   t1AfterKill, | ||||
| 					Msg:   h.TaskMessageWithError(*t1, errMsg), | ||||
| 					Score: float64(now.Unix()), | ||||
| 				}, | ||||
| 				{ | ||||
| @@ -964,7 +942,7 @@ func TestKill(t *testing.T) { | ||||
| 			}, | ||||
| 			wantDead: []h.ZSetEntry{ | ||||
| 				{ | ||||
| 					Msg:   t1AfterKill, | ||||
| 					Msg:   h.TaskMessageWithError(*t1, errMsg), | ||||
| 					Score: float64(now.Unix()), | ||||
| 				}, | ||||
| 			}, | ||||
|   | ||||
| @@ -26,6 +26,9 @@ type TestBroker struct { | ||||
| 	real base.Broker | ||||
| } | ||||
|  | ||||
| // Make sure TestBroker implements Broker interface at compile time. | ||||
| var _ base.Broker = (*TestBroker)(nil) | ||||
|  | ||||
| func NewTestBroker(b base.Broker) *TestBroker { | ||||
| 	return &TestBroker{real: b} | ||||
| } | ||||
|   | ||||
		Reference in New Issue
	
	Block a user