2
0
mirror of https://github.com/hibiken/asynq.git synced 2024-11-10 11:31:58 +08:00

Add Scheduler

- Renamed previously called scheduler to forwarder to resolve name
  conflicts
This commit is contained in:
Ken Hibino 2020-09-26 17:33:29 -07:00
parent fadcae76d6
commit 50e7f38365
19 changed files with 1002 additions and 200 deletions

View File

@ -28,7 +28,7 @@ type Client struct {
rdb *rdb.RDB rdb *rdb.RDB
} }
// NewClient and returns a new Client given a redis connection option. // NewClient returns a new Client instance given a redis connection option.
func NewClient(r RedisConnOpt) *Client { func NewClient(r RedisConnOpt) *Client {
rdb := rdb.NewRDB(createRedisClient(r)) rdb := rdb.NewRDB(createRedisClient(r))
return &Client{ return &Client{
@ -208,6 +208,9 @@ type Result struct {
// ID is a unique identifier for the task. // ID is a unique identifier for the task.
ID string ID string
// EnqueuedAt is the time the task was enqueued in UTC.
EnqueuedAt time.Time
// ProcessAt indicates when the task should be processed. // ProcessAt indicates when the task should be processed.
ProcessAt time.Time ProcessAt time.Time
@ -298,12 +301,13 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) {
return nil, err return nil, err
} }
return &Result{ return &Result{
ID: msg.ID.String(), ID: msg.ID.String(),
ProcessAt: opt.processAt, EnqueuedAt: time.Now().UTC(),
Queue: msg.Queue, ProcessAt: opt.processAt,
Retry: msg.Retry, Queue: msg.Queue,
Timeout: timeout, Retry: msg.Retry,
Deadline: deadline, Timeout: timeout,
Deadline: deadline,
}, nil }, nil
} }

View File

@ -42,11 +42,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: now, processAt: now,
opts: []Option{}, opts: []Option{},
wantRes: &Result{ wantRes: &Result{
ProcessAt: now, EnqueuedAt: now.UTC(),
Queue: "default", ProcessAt: now,
Retry: defaultMaxRetry, Queue: "default",
Timeout: defaultTimeout, Retry: defaultMaxRetry,
Deadline: noDeadline, Timeout: defaultTimeout,
Deadline: noDeadline,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": { "default": {
@ -70,11 +71,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
processAt: oneHourLater, processAt: oneHourLater,
opts: []Option{}, opts: []Option{},
wantRes: &Result{ wantRes: &Result{
ProcessAt: oneHourLater, EnqueuedAt: now.UTC(),
Queue: "default", ProcessAt: oneHourLater,
Retry: defaultMaxRetry, Queue: "default",
Timeout: defaultTimeout, Retry: defaultMaxRetry,
Deadline: noDeadline, Timeout: defaultTimeout,
Deadline: noDeadline,
}, },
wantPending: map[string][]*base.TaskMessage{ wantPending: map[string][]*base.TaskMessage{
"default": {}, "default": {},
@ -111,8 +113,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) {
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s", t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotRes, tc.wantRes, diff) tc.desc, tc.processAt, gotRes, tc.wantRes, diff)
} }
for qname, want := range tc.wantPending { for qname, want := range tc.wantPending {
@ -366,7 +368,7 @@ func TestClientEnqueue(t *testing.T) {
continue continue
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"), cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
@ -471,12 +473,12 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) {
continue continue
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"), cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {
t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s", t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s",
tc.desc, gotRes, tc.wantRes, diff) tc.desc, tc.delay, gotRes, tc.wantRes, diff)
} }
for qname, want := range tc.wantPending { for qname, want := range tc.wantPending {
@ -617,7 +619,7 @@ func TestClientDefaultOptions(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
cmpOptions := []cmp.Option{ cmpOptions := []cmp.Option{
cmpopts.IgnoreFields(Result{}, "ID"), cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"),
cmpopts.EquateApproxTime(500 * time.Millisecond), cmpopts.EquateApproxTime(500 * time.Millisecond),
} }
if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" {

75
forwarder.go Normal file
View File

@ -0,0 +1,75 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"time"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log"
)
// A forwarder is responsible for moving scheduled and retry tasks to pending state
// so that the tasks get processed by the workers.
type forwarder struct {
logger *log.Logger
broker base.Broker
// channel to communicate back to the long running "forwarder" goroutine.
done chan struct{}
// list of queue names to check and enqueue.
queues []string
// poll interval on average
avgInterval time.Duration
}
type forwarderParams struct {
logger *log.Logger
broker base.Broker
queues []string
interval time.Duration
}
func newForwarder(params forwarderParams) *forwarder {
return &forwarder{
logger: params.logger,
broker: params.broker,
done: make(chan struct{}),
queues: params.queues,
avgInterval: params.interval,
}
}
func (f *forwarder) terminate() {
f.logger.Debug("Forwarder shutting down...")
// Signal the forwarder goroutine to stop polling.
f.done <- struct{}{}
}
// start starts the "forwarder" goroutine.
func (f *forwarder) start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-f.done:
f.logger.Debug("Forwarder done")
return
case <-time.After(f.avgInterval):
f.exec()
}
}
}()
}
func (f *forwarder) exec() {
if err := f.broker.CheckAndEnqueue(f.queues...); err != nil {
f.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
}
}

137
forwarder_test.go Normal file
View File

@ -0,0 +1,137 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package asynq
import (
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
)
func TestForwarder(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
s := newForwarder(forwarderParams{
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "critical"},
interval: pollInterval,
})
t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default")
t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical")
t3 := h.NewTaskMessageWithQueue("reindex", nil, "default")
t4 := h.NewTaskMessageWithQueue("sync", nil, "critical")
now := time.Now()
tests := []struct {
initScheduled map[string][]base.Z // scheduled queue initial state
initRetry map[string][]base.Z // retry queue initial state
initPending map[string][]*base.TaskMessage // default queue initial state
wait time.Duration // wait duration before checking for final state
wantScheduled map[string][]*base.TaskMessage // schedule queue final state
wantRetry map[string][]*base.TaskMessage // retry queue final state
wantPending map[string][]*base.TaskMessage // default queue final state
}{
{
initScheduled: map[string][]base.Z{
"default": {{Message: t1, Score: now.Add(time.Hour).Unix()}},
"critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}},
},
initRetry: map[string][]base.Z{
"default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}},
"critical": {},
},
initPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {t4},
},
wait: pollInterval * 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {t1},
"critical": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t3},
"critical": {t2, t4},
},
},
{
initScheduled: map[string][]base.Z{
"default": {
{Message: t1, Score: now.Unix()},
{Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()},
},
"critical": {
{Message: t2, Score: now.Add(-2 * time.Second).Unix()},
},
},
initRetry: map[string][]base.Z{
"default": {},
"critical": {},
},
initPending: map[string][]*base.TaskMessage{
"default": {},
"critical": {t4},
},
wait: pollInterval * 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t3},
"critical": {t2, t4},
},
},
}
for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case.
h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue
h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue
h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue
var wg sync.WaitGroup
s.start(&wg)
time.Sleep(tc.wait)
s.terminate()
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.QueueKey(qname), diff)
}
}
}
}

1
go.mod
View File

@ -6,6 +6,7 @@ require (
github.com/go-redis/redis/v7 v7.4.0 github.com/go-redis/redis/v7 v7.4.0
github.com/google/go-cmp v0.4.0 github.com/google/go-cmp v0.4.0
github.com/google/uuid v1.1.1 github.com/google/uuid v1.1.1
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/cast v1.3.1 github.com/spf13/cast v1.3.1
go.uber.org/goleak v0.10.0 go.uber.org/goleak v0.10.0
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e

2
go.sum
View File

@ -27,6 +27,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=

View File

@ -65,6 +65,24 @@ var SortWorkerInfoOpt = cmp.Transformer("SortWorkerInfo", func(in []*base.Worker
return out return out
}) })
// SortSchedulerEntryOpt is a cmp.Option to sort base.SchedulerEntry for comparing slice of entries.
var SortSchedulerEntryOpt = cmp.Transformer("SortSchedulerEntry", func(in []*base.SchedulerEntry) []*base.SchedulerEntry {
out := append([]*base.SchedulerEntry(nil), in...) // Copy input to avoid mutating it
sort.Slice(out, func(i, j int) bool {
return out[i].Spec < out[j].Spec
})
return out
})
// SortSchedulerEnqueueEventOpt is a cmp.Option to sort base.SchedulerEnqueueEvent for comparing slice of events.
var SortSchedulerEnqueueEventOpt = cmp.Transformer("SortSchedulerEnqueueEvent", func(in []*base.SchedulerEnqueueEvent) []*base.SchedulerEnqueueEvent {
out := append([]*base.SchedulerEnqueueEvent(nil), in...)
sort.Slice(out, func(i, j int) bool {
return out[i].EnqueuedAt.Unix() < out[j].EnqueuedAt.Unix()
})
return out
})
// SortStringSliceOpt is a cmp.Option to sort string slice. // SortStringSliceOpt is a cmp.Option to sort string slice.
var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string { var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string {
out := append([]string(nil), in...) out := append([]string(nil), in...)

View File

@ -29,10 +29,11 @@ var DefaultQueue = QueueKey(DefaultQueueName)
// Global Redis keys. // Global Redis keys.
const ( const (
AllServers = "asynq:servers" // ZSET AllServers = "asynq:servers" // ZSET
AllWorkers = "asynq:workers" // ZSET AllWorkers = "asynq:workers" // ZSET
AllQueues = "asynq:queues" // SET AllSchedulers = "asynq:schedulers" // ZSET
CancelChannel = "asynq:cancel" // PubSub channel AllQueues = "asynq:queues" // SET
CancelChannel = "asynq:cancel" // PubSub channel
) )
// QueueKey returns a redis key for the given queue name. // QueueKey returns a redis key for the given queue name.
@ -81,13 +82,23 @@ func FailedKey(qname string, t time.Time) string {
} }
// ServerInfoKey returns a redis key for process info. // ServerInfoKey returns a redis key for process info.
func ServerInfoKey(hostname string, pid int, sid string) string { func ServerInfoKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, sid) return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, serverID)
} }
// WorkersKey returns a redis key for the workers given hostname, pid, and server ID. // WorkersKey returns a redis key for the workers given hostname, pid, and server ID.
func WorkersKey(hostname string, pid int, sid string) string { func WorkersKey(hostname string, pid int, serverID string) string {
return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, serverID)
}
// SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID.
func SchedulerEntriesKey(schedulerID string) string {
return fmt.Sprintf("asynq:schedulers:{%s}", schedulerID)
}
// SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry.
func SchedulerHistoryKey(entryID string) string {
return fmt.Sprintf("asynq:scheduler_history:%s", entryID)
} }
// UniqueKey returns a redis key with the given type, payload, and queue name. // UniqueKey returns a redis key with the given type, payload, and queue name.
@ -208,10 +219,10 @@ const (
// StatusIdle indicates the server is in idle state. // StatusIdle indicates the server is in idle state.
StatusIdle ServerStatusValue = iota StatusIdle ServerStatusValue = iota
// StatusRunning indicates the servier is up and processing tasks. // StatusRunning indicates the server is up and active.
StatusRunning StatusRunning
// StatusQuiet indicates the server is up but not processing new tasks. // StatusQuiet indicates the server is up but not active.
StatusQuiet StatusQuiet
// StatusStopped indicates the server server has been stopped. // StatusStopped indicates the server server has been stopped.
@ -273,6 +284,40 @@ type WorkerInfo struct {
Started time.Time Started time.Time
} }
// SchedulerEntry holds information about a periodic task registered with a scheduler.
type SchedulerEntry struct {
// Identifier of this entry.
ID string
// Spec describes the schedule of this entry.
Spec string
// Type is the task type of the periodic task.
Type string
// Payload is the payload of the periodic task.
Payload map[string]interface{}
// Opts is the options for the periodic task.
Opts string
// Next shows the next time the task will be enqueued.
Next time.Time
// Prev shows the last time the task was enqueued.
// Zero time if task was never enqueued.
Prev time.Time
}
// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler.
type SchedulerEnqueueEvent struct {
// ID of the task that was enqueued.
TaskID string
// Time the task was enqueued.
EnqueuedAt time.Time
}
// Cancelations is a collection that holds cancel functions for all active tasks. // Cancelations is a collection that holds cancel functions for all active tasks.
// //
// Cancelations are safe for concurrent use by multipel goroutines. // Cancelations are safe for concurrent use by multipel goroutines.

View File

@ -212,6 +212,41 @@ func TestWorkersKey(t *testing.T) {
} }
} }
func TestSchedulerEntriesKey(t *testing.T) {
tests := []struct {
schedulerID string
want string
}{
{"localhost:9876:scheduler123", "asynq:schedulers:{localhost:9876:scheduler123}"},
{"127.0.0.1:1234:scheduler987", "asynq:schedulers:{127.0.0.1:1234:scheduler987}"},
}
for _, tc := range tests {
got := SchedulerEntriesKey(tc.schedulerID)
if got != tc.want {
t.Errorf("SchedulerEntriesKey(%q) = %q, want %q", tc.schedulerID, got, tc.want)
}
}
}
func TestSchedulerHistoryKey(t *testing.T) {
tests := []struct {
entryID string
want string
}{
{"entry876", "asynq:scheduler_history:entry876"},
{"entry345", "asynq:scheduler_history:entry345"},
}
for _, tc := range tests {
got := SchedulerHistoryKey(tc.entryID)
if got != tc.want {
t.Errorf("SchedulerHistoryKey(%q) = %q, want %q",
tc.entryID, got, tc.want)
}
}
}
func TestUniqueKey(t *testing.T) { func TestUniqueKey(t *testing.T) {
tests := []struct { tests := []struct {
desc string desc string

View File

@ -758,7 +758,7 @@ return keys`)
// ListServers returns the list of server info. // ListServers returns the list of server info.
func (r *RDB) ListServers() ([]*base.ServerInfo, error) { func (r *RDB) ListServers() ([]*base.ServerInfo, error) {
now := time.Now().UTC() now := time.Now()
res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result() res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -791,7 +791,7 @@ return keys`)
// ListWorkers returns the list of worker stats. // ListWorkers returns the list of worker stats.
func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
now := time.Now().UTC() now := time.Now()
res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result()
if err != nil { if err != nil {
return nil, err return nil, err
@ -818,6 +818,63 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) {
return workers, nil return workers, nil
} }
// Note: Script also removes stale keys.
var listSchedulerKeysCmd = redis.NewScript(`
local now = tonumber(ARGV[1])
local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf")
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1)
return keys`)
// ListSchedulerEntries returns the list of scheduler entries.
func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) {
now := time.Now()
res, err := listSchedulerKeysCmd.Run(r.client, []string{base.AllSchedulers}, now.Unix()).Result()
if err != nil {
return nil, err
}
keys, err := cast.ToStringSliceE(res)
if err != nil {
return nil, err
}
var entries []*base.SchedulerEntry
for _, key := range keys {
data, err := r.client.LRange(key, 0, -1).Result()
if err != nil {
continue // skip bad data
}
for _, s := range data {
var e base.SchedulerEntry
if err := json.Unmarshal([]byte(s), &e); err != nil {
continue // skip bad data
}
entries = append(entries, &e)
}
}
return entries, nil
}
// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events.
func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) {
key := base.SchedulerHistoryKey(entryID)
zs, err := r.client.ZRangeWithScores(key, 0, -1).Result()
if err != nil {
return nil, err
}
var events []*base.SchedulerEnqueueEvent
for _, z := range zs {
data, err := cast.ToStringE(z.Member)
if err != nil {
return nil, err
}
var e base.SchedulerEnqueueEvent
if err := json.Unmarshal([]byte(data), &e); err != nil {
return nil, err
}
events = append(events, &e)
}
return events, nil
}
// Pause pauses processing of tasks from the given queue. // Pause pauses processing of tasks from the given queue.
func (r *RDB) Pause(qname string) error { func (r *RDB) Pause(qname string) error {
key := base.PausedKey(qname) key := base.PausedKey(qname)

View File

@ -2983,6 +2983,103 @@ func TestListWorkers(t *testing.T) {
} }
} }
func TestWriteListClearSchedulerEntries(t *testing.T) {
r := setup(t)
now := time.Now().UTC()
schedulerID := "127.0.0.1:9876:abc123"
data := []*base.SchedulerEntry{
&base.SchedulerEntry{
Spec: "* * * * *",
Type: "foo",
Payload: nil,
Opts: "",
Next: now.Add(5 * time.Hour),
Prev: now.Add(-2 * time.Hour),
},
&base.SchedulerEntry{
Spec: "@every 20m",
Type: "bar",
Payload: map[string]interface{}{"fiz": "baz"},
Opts: "",
Next: now.Add(1 * time.Minute),
Prev: now.Add(-19 * time.Minute),
},
}
if err := r.WriteSchedulerEntries(schedulerID, data, 30*time.Second); err != nil {
t.Fatalf("WriteSchedulerEnties failed: %v", err)
}
entries, err := r.ListSchedulerEntries()
if err != nil {
t.Fatalf("ListSchedulerEntries failed: %v", err)
}
if diff := cmp.Diff(data, entries, h.SortSchedulerEntryOpt); diff != "" {
t.Errorf("ListSchedulerEntries() = %v, want %v; (-want,+got)\n%s", entries, data, diff)
}
if err := r.ClearSchedulerEntries(schedulerID); err != nil {
t.Fatalf("ClearSchedulerEntries failed: %v", err)
}
entries, err = r.ListSchedulerEntries()
if err != nil {
t.Fatalf("ListSchedulerEntries() after clear failed: %v", err)
}
if len(entries) != 0 {
t.Errorf("found %d entries, want 0 after clearing", len(entries))
}
}
func TestSchedulerEnqueueEvents(t *testing.T) {
r := setup(t)
var (
now = time.Now()
oneDayAgo = now.Add(-24 * time.Hour)
oneHourAgo = now.Add(-1 * time.Hour)
)
type event struct {
entryID string
taskID string
enqueuedAt time.Time
}
tests := []struct {
entryID string
events []*base.SchedulerEnqueueEvent
}{
{
entryID: "entry123",
events: []*base.SchedulerEnqueueEvent{{"task123", oneDayAgo}, {"task456", oneHourAgo}},
},
{
entryID: "entry123",
events: []*base.SchedulerEnqueueEvent{},
},
}
loop:
for _, tc := range tests {
h.FlushDB(t, r.client)
for _, e := range tc.events {
if err := r.RecordSchedulerEnqueueEvent(tc.entryID, e); err != nil {
t.Errorf("RecordSchedulerEnqueueEvent(%q, %v) failed: %v", tc.entryID, e, err)
continue loop
}
}
got, err := r.ListSchedulerEnqueueEvents(tc.entryID)
if err != nil {
t.Errorf("ListSchedulerEnqueueEvents(%q) failed: %v", tc.entryID, err)
continue
}
if diff := cmp.Diff(tc.events, got, h.SortSchedulerEnqueueEventOpt, timeCmpOpt); diff != "" {
t.Errorf("ListSchedulerEnqueueEvent(%q) = %v, want %v; (-want,+got)\n%s",
tc.entryID, got, tc.events, diff)
}
}
}
func TestPause(t *testing.T) { func TestPause(t *testing.T) {
r := setup(t) r := setup(t)

View File

@ -575,6 +575,45 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err() return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err()
} }
// KEYS[1] -> asynq:schedulers:{<schedulerID>}
// ARGV[1] -> TTL in seconds
// ARGV[2:] -> schedler entries
var writeSchedulerEntriesCmd = redis.NewScript(`
redis.call("DEL", KEYS[1])
for i = 2, #ARGV do
redis.call("LPUSH", KEYS[1], ARGV[i])
end
redis.call("EXPIRE", KEYS[1], ARGV[1])
return redis.status_reply("OK")`)
// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
args := []interface{}{ttl.Seconds()}
for _, e := range entries {
bytes, err := json.Marshal(e)
if err != nil {
continue // skip bad data
}
args = append(args, bytes)
}
exp := time.Now().Add(ttl).UTC()
key := base.SchedulerEntriesKey(schedulerID)
err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
if err != nil {
return err
}
return writeSchedulerEntriesCmd.Run(r.client, []string{key}, args...).Err()
}
// ClearSchedulerEntries deletes scheduler entries data from redis.
func (r *RDB) ClearSchedulerEntries(scheduelrID string) error {
key := base.SchedulerEntriesKey(scheduelrID)
if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
return err
}
return r.client.Del(key).Err()
}
// CancelationPubSub returns a pubsub for cancelation messages. // CancelationPubSub returns a pubsub for cancelation messages.
func (r *RDB) CancelationPubSub() (*redis.PubSub, error) { func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
pubsub := r.client.Subscribe(base.CancelChannel) pubsub := r.client.Subscribe(base.CancelChannel)
@ -590,3 +629,26 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
func (r *RDB) PublishCancelation(id string) error { func (r *RDB) PublishCancelation(id string) error {
return r.client.Publish(base.CancelChannel, id).Err() return r.client.Publish(base.CancelChannel, id).Err()
} }
// KEYS[1] -> asynq:scheduler_history:<entryID>
// ARGV[1] -> enqueued_at timestamp
// ARGV[2] -> serialized SchedulerEnqueueEvent data
// ARGV[3] -> max number of events to be persisted
var recordSchedulerEnqueueEventCmd = redis.NewScript(`
redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", ARGV[3])
return redis.status_reply("OK")`)
// Maximum number of enqueue events to store per entry.
const maxEvents = 10000
// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
key := base.SchedulerHistoryKey(entryID)
data, err := json.Marshal(event)
if err != nil {
return err
}
return recordSchedulerEnqueueEventCmd.Run(
r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
}

View File

@ -5,69 +5,244 @@
package asynq package asynq
import ( import (
"fmt"
"os"
"strings"
"sync" "sync"
"time" "time"
"github.com/google/uuid"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/log" "github.com/hibiken/asynq/internal/log"
"github.com/hibiken/asynq/internal/rdb"
"github.com/robfig/cron/v3"
) )
type scheduler struct { // A Scheduler kicks off tasks at regular intervals based on the user defined schedule.
logger *log.Logger type Scheduler struct {
broker base.Broker id string
status *base.ServerStatus
// channel to communicate back to the long running "scheduler" goroutine.
done chan struct{}
// list of queue names to check and enqueue.
queues []string
// poll interval on average
avgInterval time.Duration
}
type schedulerParams struct {
logger *log.Logger logger *log.Logger
broker base.Broker client *Client
queues []string rdb *rdb.RDB
interval time.Duration cron *cron.Cron
location *time.Location
done chan struct{}
wg sync.WaitGroup
} }
func newScheduler(params schedulerParams) *scheduler { // NewScheduler returns a new Scheduler instance given the redis connection option.
return &scheduler{ // The parameter opts is optional, defaults will be used if opts is set to nil
logger: params.logger, func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
broker: params.broker, if opts == nil {
done: make(chan struct{}), opts = &SchedulerOpts{}
queues: params.queues, }
avgInterval: params.interval,
logger := log.NewLogger(opts.Logger)
loglevel := opts.LogLevel
if loglevel == level_unspecified {
loglevel = InfoLevel
}
logger.SetLevel(toInternalLogLevel(loglevel))
loc := opts.Location
if loc == nil {
loc = time.UTC
}
return &Scheduler{
id: generateSchedulerID(),
status: base.NewServerStatus(base.StatusIdle),
logger: logger,
client: NewClient(r),
rdb: rdb.NewRDB(createRedisClient(r)),
cron: cron.New(cron.WithLocation(loc)),
location: loc,
done: make(chan struct{}),
} }
} }
func (s *scheduler) terminate() { func generateSchedulerID() string {
s.logger.Debug("Scheduler shutting down...") host, err := os.Hostname()
// Signal the scheduler goroutine to stop polling. if err != nil {
s.done <- struct{}{} host = "unknown-host"
}
return fmt.Sprintf("%s:%d:%v", host, os.Getpid(), uuid.New())
} }
// start starts the "scheduler" goroutine. // SchedulerOpts specifies scheduler options.
func (s *scheduler) start(wg *sync.WaitGroup) { type SchedulerOpts struct {
wg.Add(1) // Logger specifies the logger used by the scheduler instance.
go func() { //
defer wg.Done() // If unset, the default logger is used.
for { Logger Logger
select {
case <-s.done: // LogLevel specifies the minimum log level to enable.
s.logger.Debug("Scheduler done") //
return // If unset, InfoLevel is used by default.
case <-time.After(s.avgInterval): LogLevel LogLevel
s.exec()
} // Location specifies the time zone location.
//
// If unset, the UTC time zone (time.UTC) is used.
Location *time.Location
// TODO: Add ErrorHandler
}
// enqueueJob encapsulates the job of enqueing a task and recording the event.
type enqueueJob struct {
id uuid.UUID
cronspec string
task *Task
opts []Option
location *time.Location
logger *log.Logger
client *Client
rdb *rdb.RDB
}
func (j *enqueueJob) Run() {
res, err := j.client.Enqueue(j.task, j.opts...)
if err != nil {
j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err)
return
}
j.logger.Infof("scheduler enqueued a task: %+v", res)
event := &base.SchedulerEnqueueEvent{
TaskID: res.ID,
EnqueuedAt: res.EnqueuedAt.In(j.location),
}
err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
if err != nil {
j.logger.Errorf("scheduler could not record enqueue event of enqueued task %+v: %v", j.task, err)
}
}
// Register registers a task to be enqueued with given schedule specified by the cronspec.
// It returns an ID of the newly registered entry.
func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
job := &enqueueJob{
id: uuid.New(),
cronspec: cronspec,
task: task,
opts: opts,
location: s.location,
client: s.client,
rdb: s.rdb,
logger: s.logger,
}
if _, err = s.cron.AddJob(cronspec, job); err != nil {
return "", err
}
return job.id.String(), nil
}
// Run starts the scheduler until an os signal to exit the program is received.
// It returns an error if scheduler is already running or has been stopped.
func (s *Scheduler) Run() error {
if err := s.Start(); err != nil {
return err
}
s.waitForSignals()
return s.Stop()
}
// Start starts the scheduler.
// It returns an error if the scheduler is already running or has been stopped.
func (s *Scheduler) Start() error {
switch s.status.Get() {
case base.StatusRunning:
return fmt.Errorf("asynq: the scheduler is already running")
case base.StatusStopped:
return fmt.Errorf("asynq: the scheduler has already been stopped")
}
s.logger.Info("Scheduler starting")
s.logger.Infof("Scheduler timezone is set to %v", s.location)
s.cron.Start()
s.wg.Add(1)
go s.runHeartbeater()
s.status.Set(base.StatusRunning)
return nil
}
// Stop stops the scheduler.
// It returns an error if the scheduler is not currently running.
func (s *Scheduler) Stop() error {
if s.status.Get() != base.StatusRunning {
return fmt.Errorf("asynq: the scheduler is not running")
}
s.logger.Info("Scheduler shutting down")
close(s.done) // signal heartbeater to stop
ctx := s.cron.Stop()
<-ctx.Done()
s.wg.Wait()
s.client.Close()
s.rdb.Close()
s.status.Set(base.StatusStopped)
s.logger.Info("Scheduler stopped")
return nil
}
func (s *Scheduler) runHeartbeater() {
defer s.wg.Done()
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-s.done:
s.logger.Debugf("Scheduler heatbeater shutting down")
s.rdb.ClearSchedulerEntries(s.id)
return
case <-ticker.C:
s.beat()
} }
}()
}
func (s *scheduler) exec() {
if err := s.broker.CheckAndEnqueue(s.queues...); err != nil {
s.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
} }
} }
// beat writes a snapshot of entries to redis.
func (s *Scheduler) beat() {
var entries []*base.SchedulerEntry
for _, entry := range s.cron.Entries() {
job := entry.Job.(*enqueueJob)
e := &base.SchedulerEntry{
ID: job.id.String(),
Spec: job.cronspec,
Type: job.task.Type,
Payload: job.task.Payload.data,
Opts: stringifyOptions(job.opts),
Next: entry.Next,
Prev: entry.Prev,
}
entries = append(entries, e)
}
s.logger.Debugf("Writing entries %v", entries)
if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil {
s.logger.Warnf("Scheduler could not write heartbeat data: %v", err)
}
}
func stringifyOptions(opts []Option) string {
var res []string
for _, opt := range opts {
switch opt := opt.(type) {
case retryOption:
res = append(res, fmt.Sprintf("MaxRetry(%d)", int(opt)))
case queueOption:
res = append(res, fmt.Sprintf("Queue(%q)", string(opt)))
case timeoutOption:
res = append(res, fmt.Sprintf("Timeout(%v)", time.Duration(opt)))
case deadlineOption:
res = append(res, fmt.Sprintf("Deadline(%v)", time.Time(opt)))
case uniqueOption:
res = append(res, fmt.Sprintf("Unique(%v)", time.Duration(opt)))
case processAtOption:
res = append(res, fmt.Sprintf("ProcessAt(%v)", time.Time(opt)))
case processInOption:
res = append(res, fmt.Sprintf("ProcessIn(%v)", time.Duration(opt)))
default:
// ignore unexpected option
}
}
return strings.Join(res, ", ")
}

View File

@ -5,133 +5,109 @@
package asynq package asynq
import ( import (
"sync" "fmt"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
h "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/asynqtest"
"github.com/hibiken/asynq/internal/base" "github.com/hibiken/asynq/internal/base"
"github.com/hibiken/asynq/internal/rdb"
) )
func TestScheduler(t *testing.T) { func TestScheduler(t *testing.T) {
r := setup(t)
defer r.Close()
rdbClient := rdb.NewRDB(r)
const pollInterval = time.Second
s := newScheduler(schedulerParams{
logger: testLogger,
broker: rdbClient,
queues: []string{"default", "critical"},
interval: pollInterval,
})
t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default")
t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical")
t3 := h.NewTaskMessageWithQueue("reindex", nil, "default")
t4 := h.NewTaskMessageWithQueue("sync", nil, "critical")
now := time.Now()
tests := []struct { tests := []struct {
initScheduled map[string][]base.Z // scheduled queue initial state cronspec string
initRetry map[string][]base.Z // retry queue initial state task *Task
initPending map[string][]*base.TaskMessage // default queue initial state opts []Option
wait time.Duration // wait duration before checking for final state wait time.Duration
wantScheduled map[string][]*base.TaskMessage // schedule queue final state queue string
wantRetry map[string][]*base.TaskMessage // retry queue final state want []*base.TaskMessage
wantPending map[string][]*base.TaskMessage // default queue final state
}{ }{
{ {
initScheduled: map[string][]base.Z{ cronspec: "@every 3s",
"default": {{Message: t1, Score: now.Add(time.Hour).Unix()}}, task: NewTask("task1", nil),
"critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}}, opts: []Option{MaxRetry(10)},
}, wait: 10 * time.Second,
initRetry: map[string][]base.Z{ queue: "default",
"default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}}, want: []*base.TaskMessage{
"critical": {}, {
}, Type: "task1",
initPending: map[string][]*base.TaskMessage{ Payload: nil,
"default": {}, Retry: 10,
"critical": {t4}, Timeout: int64(defaultTimeout.Seconds()),
}, Queue: "default",
wait: pollInterval * 2, },
wantScheduled: map[string][]*base.TaskMessage{ {
"default": {t1}, Type: "task1",
"critical": {}, Payload: nil,
}, Retry: 10,
wantRetry: map[string][]*base.TaskMessage{ Timeout: int64(defaultTimeout.Seconds()),
"default": {}, Queue: "default",
"critical": {}, },
}, {
wantPending: map[string][]*base.TaskMessage{ Type: "task1",
"default": {t3}, Payload: nil,
"critical": {t2, t4}, Retry: 10,
Timeout: int64(defaultTimeout.Seconds()),
Queue: "default",
},
}, },
}, },
}
r := setup(t)
for _, tc := range tests {
scheduler := NewScheduler(getRedisConnOpt(t), nil)
if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil {
t.Fatal(err)
}
if err := scheduler.Start(); err != nil {
t.Fatal(err)
}
time.Sleep(tc.wait)
if err := scheduler.Stop(); err != nil {
t.Fatal(err)
}
got := asynqtest.GetPendingMessages(t, r, tc.queue)
if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" {
t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff)
}
}
}
func TestStringifyOptions(t *testing.T) {
now := time.Now()
oneHourFromNow := now.Add(1 * time.Hour)
twoHoursFromNow := now.Add(2 * time.Hour)
tests := []struct {
opts []Option
want string
}{
{ {
initScheduled: map[string][]base.Z{ opts: []Option{MaxRetry(10)},
"default": { want: "MaxRetry(10)",
{Message: t1, Score: now.Unix()}, },
{Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, {
}, opts: []Option{Queue("custom"), Timeout(1 * time.Minute)},
"critical": { want: `Queue("custom"), Timeout(1m0s)`,
{Message: t2, Score: now.Add(-2 * time.Second).Unix()}, },
}, {
}, opts: []Option{ProcessAt(oneHourFromNow), Deadline(twoHoursFromNow)},
initRetry: map[string][]base.Z{ want: fmt.Sprintf("ProcessAt(%v), Deadline(%v)", oneHourFromNow, twoHoursFromNow),
"default": {}, },
"critical": {}, {
}, opts: []Option{ProcessIn(30 * time.Minute), Unique(1 * time.Hour)},
initPending: map[string][]*base.TaskMessage{ want: "ProcessIn(30m0s), Unique(1h0m0s)",
"default": {},
"critical": {t4},
},
wait: pollInterval * 2,
wantScheduled: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantRetry: map[string][]*base.TaskMessage{
"default": {},
"critical": {},
},
wantPending: map[string][]*base.TaskMessage{
"default": {t1, t3},
"critical": {t2, t4},
},
}, },
} }
for _, tc := range tests { for _, tc := range tests {
h.FlushDB(t, r) // clean up db before each test case. got := stringifyOptions(tc.opts)
h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue if got != tc.want {
h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue t.Errorf("got %v, want %v", got, tc.want)
h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue
var wg sync.WaitGroup
s.start(&wg)
time.Sleep(tc.wait)
s.terminate()
for qname, want := range tc.wantScheduled {
gotScheduled := h.GetScheduledMessages(t, r, qname)
if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledKey(qname), diff)
}
}
for qname, want := range tc.wantRetry {
gotRetry := h.GetRetryMessages(t, r, qname)
if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryKey(qname), diff)
}
}
for qname, want := range tc.wantPending {
gotPending := h.GetPendingMessages(t, r, qname)
if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" {
t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff)
}
} }
} }
} }

View File

@ -41,7 +41,7 @@ type Server struct {
// wait group to wait for all goroutines to finish. // wait group to wait for all goroutines to finish.
wg sync.WaitGroup wg sync.WaitGroup
scheduler *scheduler forwarder *forwarder
processor *processor processor *processor
syncer *syncer syncer *syncer
heartbeater *heartbeater heartbeater *heartbeater
@ -333,7 +333,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
starting: starting, starting: starting,
finished: finished, finished: finished,
}) })
scheduler := newScheduler(schedulerParams{ forwarder := newForwarder(forwarderParams{
logger: logger, logger: logger,
broker: rdb, broker: rdb,
queues: qnames, queues: qnames,
@ -375,7 +375,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server {
logger: logger, logger: logger,
broker: rdb, broker: rdb,
status: status, status: status,
scheduler: scheduler, forwarder: forwarder,
processor: processor, processor: processor,
syncer: syncer, syncer: syncer,
heartbeater: heartbeater, heartbeater: heartbeater,
@ -453,7 +453,7 @@ func (srv *Server) Start(handler Handler) error {
srv.subscriber.start(&srv.wg) srv.subscriber.start(&srv.wg)
srv.syncer.start(&srv.wg) srv.syncer.start(&srv.wg)
srv.recoverer.start(&srv.wg) srv.recoverer.start(&srv.wg)
srv.scheduler.start(&srv.wg) srv.forwarder.start(&srv.wg)
srv.processor.start(&srv.wg) srv.processor.start(&srv.wg)
return nil return nil
} }
@ -474,7 +474,7 @@ func (srv *Server) Stop() {
// Sender goroutines should be terminated before the receiver goroutines. // Sender goroutines should be terminated before the receiver goroutines.
// processor -> syncer (via syncCh) // processor -> syncer (via syncCh)
// processor -> heartbeater (via starting, finished channels) // processor -> heartbeater (via starting, finished channels)
srv.scheduler.terminate() srv.forwarder.terminate()
srv.processor.terminate() srv.processor.terminate()
srv.recoverer.terminate() srv.recoverer.terminate()
srv.syncer.terminate() srv.syncer.terminate()

View File

@ -127,7 +127,7 @@ func TestServerWithRedisDown(t *testing.T) {
testBroker := testbroker.NewTestBroker(r) testBroker := testbroker.NewTestBroker(r)
srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel}) srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
srv.broker = testBroker srv.broker = testBroker
srv.scheduler.broker = testBroker srv.forwarder.broker = testBroker
srv.heartbeater.broker = testBroker srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker srv.processor.broker = testBroker
srv.subscriber.broker = testBroker srv.subscriber.broker = testBroker
@ -160,7 +160,7 @@ func TestServerWithFlakyBroker(t *testing.T) {
redisConnOpt := getRedisConnOpt(t) redisConnOpt := getRedisConnOpt(t)
srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel}) srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel})
srv.broker = testBroker srv.broker = testBroker
srv.scheduler.broker = testBroker srv.forwarder.broker = testBroker
srv.heartbeater.broker = testBroker srv.heartbeater.broker = testBroker
srv.processor.broker = testBroker srv.processor.broker = testBroker
srv.subscriber.broker = testBroker srv.subscriber.broker = testBroker

View File

@ -28,3 +28,10 @@ func (srv *Server) waitForSignals() {
break break
} }
} }
func (s *Scheduler) waitForSignals() {
s.logger.Info("Send signal TERM or INT to stop the scheduler")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
<-sigs
}

View File

@ -20,3 +20,10 @@ func (srv *Server) waitForSignals() {
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT) signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
<-sigs <-sigs
} }
func (s *Scheduler) waitForSignals() {
s.logger.Info("Send signal TERM or INT to stop the scheduler")
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
<-sigs
}

102
tools/asynq/cmd/cron.go Normal file
View File

@ -0,0 +1,102 @@
// Copyright 2020 Kentaro Hibino. All rights reserved.
// Use of this source code is governed by a MIT license
// that can be found in the LICENSE file.
package cmd
import (
"fmt"
"io"
"os"
"sort"
"github.com/spf13/cobra"
)
func init() {
rootCmd.AddCommand(cronCmd)
cronCmd.AddCommand(cronListCmd)
cronCmd.AddCommand(cronHistoryCmd)
}
var cronCmd = &cobra.Command{
Use: "cron",
Short: "Manage cron",
}
var cronListCmd = &cobra.Command{
Use: "ls",
Short: "List cron entries",
Run: cronList,
}
var cronHistoryCmd = &cobra.Command{
Use: "history",
Short: "Show history of each cron tasks",
Args: cobra.MinimumNArgs(1),
Run: cronHistory,
}
func cronList(cmd *cobra.Command, args []string) {
r := createRDB()
entries, err := r.ListSchedulerEntries()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if len(entries) == 0 {
fmt.Println("No scheduler entries")
return
}
// Sort entries by spec.
sort.Slice(entries, func(i, j int) bool {
x, y := entries[i], entries[j]
return x.Spec < y.Spec
})
cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"}
printRows := func(w io.Writer, tmpl string) {
for _, e := range entries {
fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Type, e.Payload, e.Opts, e.Next, e.Prev)
}
}
printTable(cols, printRows)
}
func cronHistory(cmd *cobra.Command, args []string) {
r := createRDB()
for i, entryID := range args {
if i > 0 {
fmt.Printf("\n%s\n", separator)
}
fmt.Println()
fmt.Printf("Entry: %s\n\n", entryID)
events, err := r.ListSchedulerEnqueueEvents(entryID)
if err != nil {
fmt.Printf("error: %v\n", err)
continue
}
if len(events) == 0 {
fmt.Printf("No scheduler enqueue events found for entry: %s\n", entryID)
continue
}
// Sort entries by enqueuedAt timestamp.
sort.Slice(events, func(i, j int) bool {
x, y := events[i], events[j]
return x.EnqueuedAt.Unix() > y.EnqueuedAt.Unix()
})
cols := []string{"TaskID", "EnqueuedAt"}
printRows := func(w io.Writer, tmpl string) {
for _, e := range events {
fmt.Fprintf(w, tmpl, e.TaskID, e.EnqueuedAt)
}
}
printTable(cols, printRows)
}
}