2
0
mirror of https://github.com/hibiken/asynq.git synced 2025-10-21 21:46:12 +08:00

chore(): 从内存里面读取任务

This commit is contained in:
pacinochen
2022-02-24 14:47:04 +08:00
parent c04fd41653
commit e55a693fa0

View File

@@ -6,6 +6,7 @@ package asynq
import (
"fmt"
"strconv"
"strings"
"time"
@@ -14,12 +15,14 @@ import (
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/errors"
"github.com/hibiken/asynq/internal/rdb"
"github.com/robfig/cron/v3"
)
// Inspector is a client interface to inspect and mutate the state of
// queues and tasks.
type Inspector struct {
rdb *rdb.RDB
rdb *rdb.RDB
cron *cron.Cron
}
// New returns a new instance of Inspector.
@@ -29,7 +32,8 @@ func NewInspector(r RedisConnOpt) *Inspector {
panic(fmt.Sprintf("inspeq: unsupported RedisConnOpt type %T", r))
}
return &Inspector{
rdb: rdb.NewRDB(c),
rdb: rdb.NewRDB(c),
cron: cron.New(cron.WithLocation(time.Local)),
}
}
@@ -795,27 +799,54 @@ type SchedulerEntry struct {
// currently running schedulers.
func (i *Inspector) SchedulerEntries() ([]*SchedulerEntry, error) {
var entries []*SchedulerEntry
fmt.Printf("start to record log file")
res, err := i.rdb.ListSchedulerEntries()
if err != nil {
fmt.Errorf("ListSchedulerEntries err:%s", err.Error())
return nil, err
}
for _, e := range res {
task := NewTask(e.Type, e.Payload)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
opts = append(opts, o)
// 如果查询不到数据 从内存里面获取数据
if len(res) == 0 {
fmt.Println("search entries from mem")
for _, entry := range i.cron.Entries() {
job := entry.Job.(*enqueueJob)
var opts []Option
opt := stringifyOptions(job.opts)
for _, s := range opt {
if o, err := parseOption(s); err == nil {
// ignore bad data
opts = append(opts, o)
}
}
entries = append(entries, &SchedulerEntry{
ID: job.id.String(),
Spec: job.cronspec,
Task: job.task,
Opts: opts,
Next: entry.Next,
Prev: entry.Prev,
})
}
fmt.Printf("entries lens is %d \n", len(res))
} else {
for _, e := range res {
task := NewTask(e.Type, e.Payload)
var opts []Option
for _, s := range e.Opts {
if o, err := parseOption(s); err == nil {
// ignore bad data
opts = append(opts, o)
}
}
entries = append(entries, &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
Task: task,
Opts: opts,
Next: e.Next,
Prev: e.Prev,
})
}
entries = append(entries, &SchedulerEntry{
ID: e.ID,
Spec: e.Spec,
Task: task,
Opts: opts,
Next: e.Next,
Prev: e.Prev,
})
}
return entries, nil
}