2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-12-26 07:42:17 +08:00

Inspector support Go context

This commit is contained in:
youngxu 2022-06-21 11:40:39 +08:00
parent 092911854e
commit 7d783d6ae9
6 changed files with 7 additions and 12 deletions

View File

@ -402,7 +402,7 @@ func (i *Inspector) ListActiveTasksContext(ctx context.Context, queue string, op
case err != nil:
return nil, fmt.Errorf("asynq: %v", err)
}
expired, err := i.rdb.ListLeaseExpiredContext(ctx, time.Now(), queue)
expired, err := i.rdb.ListLeaseExpired(ctx, time.Now(), queue)
if err != nil {
return nil, fmt.Errorf("asynq: %v", err)
}

View File

@ -740,7 +740,7 @@ type Broker interface {
DeleteExpiredCompletedTasks(qname string) error
// Lease related methods
ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
ListLeaseExpired(ctx context.Context, cutoff time.Time, qnames ...string) ([]*TaskMessage, error)
ExtendLease(qname string, ids ...string) (time.Time, error)
// State snapshot related methods

View File

@ -1291,12 +1291,7 @@ return res
`)
// ListLeaseExpired returns a list of task messages with an expired lease from the given queues.
func (r *RDB) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
return r.ListLeaseExpiredContext(context.Background(), cutoff, qnames...)
}
// ListLeaseExpiredContext returns a list of task messages with an expired lease from the given queues.
func (r *RDB) ListLeaseExpiredContext(ctx context.Context, cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
func (r *RDB) ListLeaseExpired(ctx context.Context, cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
var op errors.Op = "rdb.ListLeaseExpired"
var msgs []*base.TaskMessage
for _, qname := range qnames {

View File

@ -2630,7 +2630,7 @@ func TestListLeaseExpired(t *testing.T) {
h.FlushDB(t, r.client)
h.SeedAllLease(t, r.client, tc.lease)
got, err := r.ListLeaseExpired(tc.cutoff, tc.qnames...)
got, err := r.ListLeaseExpired(context.Background(), tc.cutoff, tc.qnames...)
if err != nil {
t.Errorf("%s; ListLeaseExpired(%v) returned error: %v", tc.desc, tc.cutoff, err)
continue

View File

@ -154,13 +154,13 @@ func (tb *TestBroker) DeleteExpiredCompletedTasks(qname string) error {
return tb.real.DeleteExpiredCompletedTasks(qname)
}
func (tb *TestBroker) ListLeaseExpired(cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
func (tb *TestBroker) ListLeaseExpired(ctx context.Context, cutoff time.Time, qnames ...string) ([]*base.TaskMessage, error) {
tb.mu.Lock()
defer tb.mu.Unlock()
if tb.sleeping {
return nil, errRedisDown
}
return tb.real.ListLeaseExpired(cutoff, qnames...)
return tb.real.ListLeaseExpired(ctx, cutoff, qnames...)
}
func (tb *TestBroker) ExtendLease(qname string, ids ...string) (time.Time, error) {

View File

@ -89,7 +89,7 @@ func (r *recoverer) recover() {
func (r *recoverer) recoverLeaseExpiredTasks() {
// Get all tasks which have expired 30 seconds ago or earlier to accomodate certain amount of clock skew.
cutoff := time.Now().Add(-30 * time.Second)
msgs, err := r.broker.ListLeaseExpired(cutoff, r.queues...)
msgs, err := r.broker.ListLeaseExpired(context.Background(), cutoff, r.queues...)
if err != nil {
r.logger.Warnf("recoverer: could not list lease expired tasks: %v", err)
return