diff --git a/context.go b/context.go index 3a64150..1ed6634 100644 --- a/context.go +++ b/context.go @@ -27,13 +27,13 @@ type ctxKey int const metadataCtxKey ctxKey = 0 // createContext returns a context and cancel function for a given task message. -func createContext(msg *base.TaskMessage, deadline time.Time) (ctx context.Context, cancel context.CancelFunc) { +func createContext(msg *base.TaskMessage, deadline time.Time) (context.Context, context.CancelFunc) { metadata := taskMetadata{ id: msg.ID.String(), maxRetry: msg.Retry, retryCount: msg.Retried, } - ctx = context.WithValue(context.Background(), metadataCtxKey, metadata) + ctx := context.WithValue(context.Background(), metadataCtxKey, metadata) return context.WithDeadline(ctx, deadline) } diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go index 81fbd82..e33f11e 100644 --- a/internal/rdb/rdb.go +++ b/internal/rdb/rdb.go @@ -150,7 +150,7 @@ for i = 2, table.getn(ARGV) do elseif timeout ~= 0 then score = ARGV[1] + timeout elseif deadline ~= 0 then - score = deadline + score = deadline else return redis.error_reply("asynq internal error: both timeout and deadline are not set") end @@ -175,7 +175,7 @@ func (r *RDB) dequeue(qkeys ...interface{}) (msgjson string, deadline int64, err return "", 0, err } if len(data) != 2 { - return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %v values", len(data)) + return "", 0, fmt.Errorf("asynq: internal error: dequeue command returned %d values", len(data)) } if msgjson, err = cast.ToStringE(data[0]); err != nil { return "", 0, err @@ -471,23 +471,6 @@ func (r *RDB) forward(src string) (int, error) { return cast.ToInt(res), nil } -// KEYS[1] -> asynq:deadlines -// KEYS[2] -> asynq:in_progress -// ARGV[1] -> max deadline score in unix time -// ARGV[2] -> queue prefix -/* -var requeueDeadlineExceededCmd = redis.NewScript(` -local msgs = redis.call("ZRANGEBYSCORE", KEYS[1], "-inf", ARGV[1], "LIMIT", 0, 100) -for _, msg in ipairs(msgs) do - local decoded = cjson.decode(msg) - local qkey = ARGV[2] .. decoded["Queue"] - redis.call("LPUSH", qkey, msg) - redis.call("ZREM", KEYS[1], msg) - redis.call("LREM", KEYS[2], 0, msg) -end -return table.getn(msgs)`) -*/ - // ListDeadlineExceeded returns a list of task messages that have exceeded the given deadline. func (r *RDB) ListDeadlineExceeded(deadline time.Time) ([]*base.TaskMessage, error) { var msgs []*base.TaskMessage diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 3712887..e7a95e8 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -156,7 +156,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"default"}, wantMsg: t1, - wantDeadline: time.Unix(int64(t1Deadline), 0), + wantDeadline: time.Unix(t1Deadline, 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -191,7 +191,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"critical", "default", "low"}, wantMsg: t2, - wantDeadline: time.Unix(int64(t2Deadline), 0), + wantDeadline: time.Unix(t2Deadline, 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {t1}, @@ -214,7 +214,7 @@ func TestDequeue(t *testing.T) { }, args: []string{"critical", "default", "low"}, wantMsg: t3, - wantDeadline: time.Unix(int64(t3Deadline), 0), + wantDeadline: time.Unix(t3Deadline, 0), err: nil, wantEnqueued: map[string][]*base.TaskMessage{ "default": {}, @@ -278,12 +278,10 @@ func TestDequeue(t *testing.T) { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.QueueKey(queue), diff) } } - gotInProgress := h.GetInProgressMessages(t, r.client) if diff := cmp.Diff(tc.wantInProgress, gotInProgress, h.SortMsgOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.InProgressQueue, diff) } - gotDeadlines := h.GetDeadlinesEntries(t, r.client) if diff := cmp.Diff(tc.wantDeadlines, gotDeadlines, h.SortZSetEntryOpt); diff != "" { t.Errorf("mismatch found in %q: (-want,+got):\n%s", base.KeyDeadlines, diff) @@ -746,6 +744,7 @@ func TestRetry(t *testing.T) { ID: xid.New(), Type: "send_email", Payload: map[string]interface{}{"subject": "Hola!"}, + Retried: 10, Timeout: 1800, } t1Deadline := now.Unix() + t1.Timeout @@ -762,18 +761,7 @@ func TestRetry(t *testing.T) { Payload: nil, Timeout: 60, } - t1.Retried = 10 errMsg := "SMTP server is not responding" - t1AfterRetry := &base.TaskMessage{ - ID: t1.ID, - Type: t1.Type, - Payload: t1.Payload, - Queue: t1.Queue, - Retry: t1.Retry, - Retried: t1.Retried + 1, - Timeout: t1.Timeout, - ErrorMsg: errMsg, - } tests := []struct { inProgress []*base.TaskMessage @@ -807,7 +795,7 @@ func TestRetry(t *testing.T) { }, wantRetry: []h.ZSetEntry{ { - Msg: t1AfterRetry, + Msg: h.TaskMessageAfterRetry(*t1, errMsg), Score: float64(now.Add(5 * time.Minute).Unix()), }, { @@ -899,16 +887,6 @@ func TestKill(t *testing.T) { } t3Deadline := now.Unix() + t3.Timeout errMsg := "SMTP server not responding" - t1AfterKill := &base.TaskMessage{ - ID: t1.ID, - Type: t1.Type, - Payload: t1.Payload, - Queue: t1.Queue, - Retry: t1.Retry, - Retried: t1.Retried, - Timeout: t1.Timeout, - ErrorMsg: errMsg, - } // TODO(hibiken): add test cases for trimming tests := []struct { @@ -939,7 +917,7 @@ func TestKill(t *testing.T) { }, wantDead: []h.ZSetEntry{ { - Msg: t1AfterKill, + Msg: h.TaskMessageWithError(*t1, errMsg), Score: float64(now.Unix()), }, { @@ -964,7 +942,7 @@ func TestKill(t *testing.T) { }, wantDead: []h.ZSetEntry{ { - Msg: t1AfterKill, + Msg: h.TaskMessageWithError(*t1, errMsg), Score: float64(now.Unix()), }, }, diff --git a/internal/testbroker/testbroker.go b/internal/testbroker/testbroker.go index 227f61e..85d388b 100644 --- a/internal/testbroker/testbroker.go +++ b/internal/testbroker/testbroker.go @@ -26,6 +26,9 @@ type TestBroker struct { real base.Broker } +// Make sure TestBroker implements Broker interface at compile time. +var _ base.Broker = (*TestBroker)(nil) + func NewTestBroker(b base.Broker) *TestBroker { return &TestBroker{real: b} }