From 50e7f38365389836c35719c2037480c77c822400 Mon Sep 17 00:00:00 2001 From: Ken Hibino Date: Sat, 26 Sep 2020 17:33:29 -0700 Subject: [PATCH] Add Scheduler - Renamed previously called scheduler to forwarder to resolve name conflicts --- client.go | 18 ++- client_test.go | 36 +++-- forwarder.go | 75 +++++++++ forwarder_test.go | 137 ++++++++++++++++ go.mod | 1 + go.sum | 2 + internal/asynqtest/asynqtest.go | 18 +++ internal/base/base.go | 65 ++++++-- internal/base/base_test.go | 35 +++++ internal/rdb/inspect.go | 61 ++++++- internal/rdb/inspect_test.go | 97 ++++++++++++ internal/rdb/rdb.go | 62 ++++++++ scheduler.go | 271 ++++++++++++++++++++++++++------ scheduler_test.go | 194 ++++++++++------------- server.go | 10 +- server_test.go | 4 +- signals_unix.go | 7 + signals_windows.go | 7 + tools/asynq/cmd/cron.go | 102 ++++++++++++ 19 files changed, 1002 insertions(+), 200 deletions(-) create mode 100644 forwarder.go create mode 100644 forwarder_test.go create mode 100644 tools/asynq/cmd/cron.go diff --git a/client.go b/client.go index 1380968..6d6b80b 100644 --- a/client.go +++ b/client.go @@ -28,7 +28,7 @@ type Client struct { rdb *rdb.RDB } -// NewClient and returns a new Client given a redis connection option. +// NewClient returns a new Client instance given a redis connection option. func NewClient(r RedisConnOpt) *Client { rdb := rdb.NewRDB(createRedisClient(r)) return &Client{ @@ -208,6 +208,9 @@ type Result struct { // ID is a unique identifier for the task. ID string + // EnqueuedAt is the time the task was enqueued in UTC. + EnqueuedAt time.Time + // ProcessAt indicates when the task should be processed. ProcessAt time.Time @@ -298,12 +301,13 @@ func (c *Client) Enqueue(task *Task, opts ...Option) (*Result, error) { return nil, err } return &Result{ - ID: msg.ID.String(), - ProcessAt: opt.processAt, - Queue: msg.Queue, - Retry: msg.Retry, - Timeout: timeout, - Deadline: deadline, + ID: msg.ID.String(), + EnqueuedAt: time.Now().UTC(), + ProcessAt: opt.processAt, + Queue: msg.Queue, + Retry: msg.Retry, + Timeout: timeout, + Deadline: deadline, }, nil } diff --git a/client_test.go b/client_test.go index fc9429e..fa0dd3d 100644 --- a/client_test.go +++ b/client_test.go @@ -42,11 +42,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { processAt: now, opts: []Option{}, wantRes: &Result{ - ProcessAt: now, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + EnqueuedAt: now.UTC(), + ProcessAt: now, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantPending: map[string][]*base.TaskMessage{ "default": { @@ -70,11 +71,12 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { processAt: oneHourLater, opts: []Option{}, wantRes: &Result{ - ProcessAt: oneHourLater, - Queue: "default", - Retry: defaultMaxRetry, - Timeout: defaultTimeout, - Deadline: noDeadline, + EnqueuedAt: now.UTC(), + ProcessAt: oneHourLater, + Queue: "default", + Retry: defaultMaxRetry, + Timeout: defaultTimeout, + Deadline: noDeadline, }, wantPending: map[string][]*base.TaskMessage{ "default": {}, @@ -111,8 +113,8 @@ func TestClientEnqueueWithProcessAtOption(t *testing.T) { cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueueAt(processAt, task) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) + t.Errorf("%s;\nEnqueue(task, ProcessAt(%v)) returned %v, want %v; (-want,+got)\n%s", + tc.desc, 
tc.processAt, gotRes, tc.wantRes, diff) } for qname, want := range tc.wantPending { @@ -366,7 +368,7 @@ func TestClientEnqueue(t *testing.T) { continue } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { @@ -471,12 +473,12 @@ func TestClientEnqueueWithProcessInOption(t *testing.T) { continue } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { - t.Errorf("%s;\nEnqueueIn(delay, task) returned %v, want %v; (-want,+got)\n%s", - tc.desc, gotRes, tc.wantRes, diff) + t.Errorf("%s;\nEnqueue(task, ProcessIn(%v)) returned %v, want %v; (-want,+got)\n%s", + tc.desc, tc.delay, gotRes, tc.wantRes, diff) } for qname, want := range tc.wantPending { @@ -617,7 +619,7 @@ func TestClientDefaultOptions(t *testing.T) { t.Fatal(err) } cmpOptions := []cmp.Option{ - cmpopts.IgnoreFields(Result{}, "ID"), + cmpopts.IgnoreFields(Result{}, "ID", "EnqueuedAt"), cmpopts.EquateApproxTime(500 * time.Millisecond), } if diff := cmp.Diff(tc.wantRes, gotRes, cmpOptions...); diff != "" { diff --git a/forwarder.go b/forwarder.go new file mode 100644 index 0000000..3b2babf --- /dev/null +++ b/forwarder.go @@ -0,0 +1,75 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. + +package asynq + +import ( + "sync" + "time" + + "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/log" +) + +// A forwarder is responsible for moving scheduled and retry tasks to pending state +// so that the tasks get processed by the workers. +type forwarder struct { + logger *log.Logger + broker base.Broker + + // channel to communicate back to the long running "forwarder" goroutine. + done chan struct{} + + // list of queue names to check and enqueue. + queues []string + + // poll interval on average + avgInterval time.Duration +} + +type forwarderParams struct { + logger *log.Logger + broker base.Broker + queues []string + interval time.Duration +} + +func newForwarder(params forwarderParams) *forwarder { + return &forwarder{ + logger: params.logger, + broker: params.broker, + done: make(chan struct{}), + queues: params.queues, + avgInterval: params.interval, + } +} + +func (f *forwarder) terminate() { + f.logger.Debug("Forwarder shutting down...") + // Signal the forwarder goroutine to stop polling. + f.done <- struct{}{} +} + +// start starts the "forwarder" goroutine. +func (f *forwarder) start(wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-f.done: + f.logger.Debug("Forwarder done") + return + case <-time.After(f.avgInterval): + f.exec() + } + } + }() +} + +func (f *forwarder) exec() { + if err := f.broker.CheckAndEnqueue(f.queues...); err != nil { + f.logger.Errorf("Could not enqueue scheduled tasks: %v", err) + } +} diff --git a/forwarder_test.go b/forwarder_test.go new file mode 100644 index 0000000..27728b3 --- /dev/null +++ b/forwarder_test.go @@ -0,0 +1,137 @@ +// Copyright 2020 Kentaro Hibino. All rights reserved. +// Use of this source code is governed by a MIT license +// that can be found in the LICENSE file. 
+ +package asynq + +import ( + "sync" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + h "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq/internal/base" + "github.com/hibiken/asynq/internal/rdb" +) + +func TestForwarder(t *testing.T) { + r := setup(t) + defer r.Close() + rdbClient := rdb.NewRDB(r) + const pollInterval = time.Second + s := newForwarder(forwarderParams{ + logger: testLogger, + broker: rdbClient, + queues: []string{"default", "critical"}, + interval: pollInterval, + }) + t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default") + t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical") + t3 := h.NewTaskMessageWithQueue("reindex", nil, "default") + t4 := h.NewTaskMessageWithQueue("sync", nil, "critical") + now := time.Now() + + tests := []struct { + initScheduled map[string][]base.Z // scheduled queue initial state + initRetry map[string][]base.Z // retry queue initial state + initPending map[string][]*base.TaskMessage // default queue initial state + wait time.Duration // wait duration before checking for final state + wantScheduled map[string][]*base.TaskMessage // schedule queue final state + wantRetry map[string][]*base.TaskMessage // retry queue final state + wantPending map[string][]*base.TaskMessage // default queue final state + }{ + { + initScheduled: map[string][]base.Z{ + "default": {{Message: t1, Score: now.Add(time.Hour).Unix()}}, + "critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}}, + }, + initRetry: map[string][]base.Z{ + "default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}}, + "critical": {}, + }, + initPending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t4}, + }, + wait: pollInterval * 2, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {t1}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": {t3}, + "critical": {t2, t4}, + }, + }, + { + initScheduled: map[string][]base.Z{ + "default": { + {Message: t1, Score: now.Unix()}, + {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, + }, + "critical": { + {Message: t2, Score: now.Add(-2 * time.Second).Unix()}, + }, + }, + initRetry: map[string][]base.Z{ + "default": {}, + "critical": {}, + }, + initPending: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {t4}, + }, + wait: pollInterval * 2, + wantScheduled: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantRetry: map[string][]*base.TaskMessage{ + "default": {}, + "critical": {}, + }, + wantPending: map[string][]*base.TaskMessage{ + "default": {t1, t3}, + "critical": {t2, t4}, + }, + }, + } + + for _, tc := range tests { + h.FlushDB(t, r) // clean up db before each test case. 
+ h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue + h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue + h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue + + var wg sync.WaitGroup + s.start(&wg) + time.Sleep(tc.wait) + s.terminate() + + for qname, want := range tc.wantScheduled { + gotScheduled := h.GetScheduledMessages(t, r, qname) + if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.ScheduledKey(qname), diff) + } + } + + for qname, want := range tc.wantRetry { + gotRetry := h.GetRetryMessages(t, r, qname) + if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.RetryKey(qname), diff) + } + } + + for qname, want := range tc.wantPending { + gotPending := h.GetPendingMessages(t, r, qname) + if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { + t.Errorf("mismatch found in %q after running forwarder: (-want, +got)\n%s", base.QueueKey(qname), diff) + } + } + } +} diff --git a/go.mod b/go.mod index 6566a52..88612af 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/go-redis/redis/v7 v7.4.0 github.com/google/go-cmp v0.4.0 github.com/google/uuid v1.1.1 + github.com/robfig/cron/v3 v3.0.1 github.com/spf13/cast v1.3.1 go.uber.org/goleak v0.10.0 golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e diff --git a/go.sum b/go.sum index a7abac2..d963f7a 100644 --- a/go.sum +++ b/go.sum @@ -27,6 +27,8 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng= github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= diff --git a/internal/asynqtest/asynqtest.go b/internal/asynqtest/asynqtest.go index 8e02358..d1d5f12 100644 --- a/internal/asynqtest/asynqtest.go +++ b/internal/asynqtest/asynqtest.go @@ -65,6 +65,24 @@ var SortWorkerInfoOpt = cmp.Transformer("SortWorkerInfo", func(in []*base.Worker return out }) +// SortSchedulerEntryOpt is a cmp.Option to sort base.SchedulerEntry for comparing slice of entries. +var SortSchedulerEntryOpt = cmp.Transformer("SortSchedulerEntry", func(in []*base.SchedulerEntry) []*base.SchedulerEntry { + out := append([]*base.SchedulerEntry(nil), in...) // Copy input to avoid mutating it + sort.Slice(out, func(i, j int) bool { + return out[i].Spec < out[j].Spec + }) + return out +}) + +// SortSchedulerEnqueueEventOpt is a cmp.Option to sort base.SchedulerEnqueueEvent for comparing slice of events. +var SortSchedulerEnqueueEventOpt = cmp.Transformer("SortSchedulerEnqueueEvent", func(in []*base.SchedulerEnqueueEvent) []*base.SchedulerEnqueueEvent { + out := append([]*base.SchedulerEnqueueEvent(nil), in...) 
+ sort.Slice(out, func(i, j int) bool { + return out[i].EnqueuedAt.Unix() < out[j].EnqueuedAt.Unix() + }) + return out +}) + // SortStringSliceOpt is a cmp.Option to sort string slice. var SortStringSliceOpt = cmp.Transformer("SortStringSlice", func(in []string) []string { out := append([]string(nil), in...) diff --git a/internal/base/base.go b/internal/base/base.go index 35ec0f3..15a6b2d 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -29,10 +29,11 @@ var DefaultQueue = QueueKey(DefaultQueueName) // Global Redis keys. const ( - AllServers = "asynq:servers" // ZSET - AllWorkers = "asynq:workers" // ZSET - AllQueues = "asynq:queues" // SET - CancelChannel = "asynq:cancel" // PubSub channel + AllServers = "asynq:servers" // ZSET + AllWorkers = "asynq:workers" // ZSET + AllSchedulers = "asynq:schedulers" // ZSET + AllQueues = "asynq:queues" // SET + CancelChannel = "asynq:cancel" // PubSub channel ) // QueueKey returns a redis key for the given queue name. @@ -81,13 +82,23 @@ func FailedKey(qname string, t time.Time) string { } // ServerInfoKey returns a redis key for process info. -func ServerInfoKey(hostname string, pid int, sid string) string { - return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, sid) +func ServerInfoKey(hostname string, pid int, serverID string) string { + return fmt.Sprintf("asynq:servers:{%s:%d:%s}", hostname, pid, serverID) } // WorkersKey returns a redis key for the workers given hostname, pid, and server ID. -func WorkersKey(hostname string, pid int, sid string) string { - return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, sid) +func WorkersKey(hostname string, pid int, serverID string) string { + return fmt.Sprintf("asynq:workers:{%s:%d:%s}", hostname, pid, serverID) +} + +// SchedulerEntriesKey returns a redis key for the scheduler entries given scheduler ID. +func SchedulerEntriesKey(schedulerID string) string { + return fmt.Sprintf("asynq:schedulers:{%s}", schedulerID) +} + +// SchedulerHistoryKey returns a redis key for the scheduler's history for the given entry. +func SchedulerHistoryKey(entryID string) string { + return fmt.Sprintf("asynq:scheduler_history:%s", entryID) } // UniqueKey returns a redis key with the given type, payload, and queue name. @@ -208,10 +219,10 @@ const ( // StatusIdle indicates the server is in idle state. StatusIdle ServerStatusValue = iota - // StatusRunning indicates the servier is up and processing tasks. + // StatusRunning indicates the server is up and active. StatusRunning - // StatusQuiet indicates the server is up but not processing new tasks. + // StatusQuiet indicates the server is up but not active. StatusQuiet // StatusStopped indicates the server server has been stopped. @@ -273,6 +284,40 @@ type WorkerInfo struct { Started time.Time } +// SchedulerEntry holds information about a periodic task registered with a scheduler. +type SchedulerEntry struct { + // Identifier of this entry. + ID string + + // Spec describes the schedule of this entry. + Spec string + + // Type is the task type of the periodic task. + Type string + + // Payload is the payload of the periodic task. + Payload map[string]interface{} + + // Opts is the options for the periodic task. + Opts string + + // Next shows the next time the task will be enqueued. + Next time.Time + + // Prev shows the last time the task was enqueued. + // Zero time if task was never enqueued. + Prev time.Time +} + +// SchedulerEnqueueEvent holds information about an enqueue event by a scheduler. 
+type SchedulerEnqueueEvent struct { + // ID of the task that was enqueued. + TaskID string + + // Time the task was enqueued. + EnqueuedAt time.Time +} + // Cancelations is a collection that holds cancel functions for all active tasks. // // Cancelations are safe for concurrent use by multipel goroutines. diff --git a/internal/base/base_test.go b/internal/base/base_test.go index c12fc80..428271b 100644 --- a/internal/base/base_test.go +++ b/internal/base/base_test.go @@ -212,6 +212,41 @@ func TestWorkersKey(t *testing.T) { } } +func TestSchedulerEntriesKey(t *testing.T) { + tests := []struct { + schedulerID string + want string + }{ + {"localhost:9876:scheduler123", "asynq:schedulers:{localhost:9876:scheduler123}"}, + {"127.0.0.1:1234:scheduler987", "asynq:schedulers:{127.0.0.1:1234:scheduler987}"}, + } + + for _, tc := range tests { + got := SchedulerEntriesKey(tc.schedulerID) + if got != tc.want { + t.Errorf("SchedulerEntriesKey(%q) = %q, want %q", tc.schedulerID, got, tc.want) + } + } +} + +func TestSchedulerHistoryKey(t *testing.T) { + tests := []struct { + entryID string + want string + }{ + {"entry876", "asynq:scheduler_history:entry876"}, + {"entry345", "asynq:scheduler_history:entry345"}, + } + + for _, tc := range tests { + got := SchedulerHistoryKey(tc.entryID) + if got != tc.want { + t.Errorf("SchedulerHistoryKey(%q) = %q, want %q", + tc.entryID, got, tc.want) + } + } +} + func TestUniqueKey(t *testing.T) { tests := []struct { desc string diff --git a/internal/rdb/inspect.go b/internal/rdb/inspect.go index bec0752..4d70a1c 100644 --- a/internal/rdb/inspect.go +++ b/internal/rdb/inspect.go @@ -758,7 +758,7 @@ return keys`) // ListServers returns the list of server info. func (r *RDB) ListServers() ([]*base.ServerInfo, error) { - now := time.Now().UTC() + now := time.Now() res, err := listServerKeysCmd.Run(r.client, []string{base.AllServers}, now.Unix()).Result() if err != nil { return nil, err @@ -791,7 +791,7 @@ return keys`) // ListWorkers returns the list of worker stats. func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { - now := time.Now().UTC() + now := time.Now() res, err := listWorkerKeysCmd.Run(r.client, []string{base.AllWorkers}, now.Unix()).Result() if err != nil { return nil, err @@ -818,6 +818,63 @@ func (r *RDB) ListWorkers() ([]*base.WorkerInfo, error) { return workers, nil } +// Note: Script also removes stale keys. +var listSchedulerKeysCmd = redis.NewScript(` +local now = tonumber(ARGV[1]) +local keys = redis.call("ZRANGEBYSCORE", KEYS[1], now, "+inf") +redis.call("ZREMRANGEBYSCORE", KEYS[1], "-inf", now-1) +return keys`) + +// ListSchedulerEntries returns the list of scheduler entries. +func (r *RDB) ListSchedulerEntries() ([]*base.SchedulerEntry, error) { + now := time.Now() + res, err := listSchedulerKeysCmd.Run(r.client, []string{base.AllSchedulers}, now.Unix()).Result() + if err != nil { + return nil, err + } + keys, err := cast.ToStringSliceE(res) + if err != nil { + return nil, err + } + var entries []*base.SchedulerEntry + for _, key := range keys { + data, err := r.client.LRange(key, 0, -1).Result() + if err != nil { + continue // skip bad data + } + for _, s := range data { + var e base.SchedulerEntry + if err := json.Unmarshal([]byte(s), &e); err != nil { + continue // skip bad data + } + entries = append(entries, &e) + } + } + return entries, nil +} + +// ListSchedulerEnqueueEvents returns the list of scheduler enqueue events. 
+func (r *RDB) ListSchedulerEnqueueEvents(entryID string) ([]*base.SchedulerEnqueueEvent, error) {
+	key := base.SchedulerHistoryKey(entryID)
+	zs, err := r.client.ZRangeWithScores(key, 0, -1).Result()
+	if err != nil {
+		return nil, err
+	}
+	var events []*base.SchedulerEnqueueEvent
+	for _, z := range zs {
+		data, err := cast.ToStringE(z.Member)
+		if err != nil {
+			return nil, err
+		}
+		var e base.SchedulerEnqueueEvent
+		if err := json.Unmarshal([]byte(data), &e); err != nil {
+			return nil, err
+		}
+		events = append(events, &e)
+	}
+	return events, nil
+}
+
 // Pause pauses processing of tasks from the given queue.
 func (r *RDB) Pause(qname string) error {
 	key := base.PausedKey(qname)
diff --git a/internal/rdb/inspect_test.go b/internal/rdb/inspect_test.go
index 8d7625b..1f6ba09 100644
--- a/internal/rdb/inspect_test.go
+++ b/internal/rdb/inspect_test.go
@@ -2983,6 +2983,103 @@ func TestListWorkers(t *testing.T) {
 	}
 }
 
+func TestWriteListClearSchedulerEntries(t *testing.T) {
+	r := setup(t)
+	now := time.Now().UTC()
+	schedulerID := "127.0.0.1:9876:abc123"
+
+	data := []*base.SchedulerEntry{
+		&base.SchedulerEntry{
+			Spec:    "* * * * *",
+			Type:    "foo",
+			Payload: nil,
+			Opts:    "",
+			Next:    now.Add(5 * time.Hour),
+			Prev:    now.Add(-2 * time.Hour),
+		},
+		&base.SchedulerEntry{
+			Spec:    "@every 20m",
+			Type:    "bar",
+			Payload: map[string]interface{}{"fiz": "baz"},
+			Opts:    "",
+			Next:    now.Add(1 * time.Minute),
+			Prev:    now.Add(-19 * time.Minute),
+		},
+	}
+
+	if err := r.WriteSchedulerEntries(schedulerID, data, 30*time.Second); err != nil {
+		t.Fatalf("WriteSchedulerEntries failed: %v", err)
+	}
+	entries, err := r.ListSchedulerEntries()
+	if err != nil {
+		t.Fatalf("ListSchedulerEntries failed: %v", err)
+	}
+	if diff := cmp.Diff(data, entries, h.SortSchedulerEntryOpt); diff != "" {
+		t.Errorf("ListSchedulerEntries() = %v, want %v; (-want,+got)\n%s", entries, data, diff)
+	}
+	if err := r.ClearSchedulerEntries(schedulerID); err != nil {
+		t.Fatalf("ClearSchedulerEntries failed: %v", err)
+	}
+	entries, err = r.ListSchedulerEntries()
+	if err != nil {
+		t.Fatalf("ListSchedulerEntries() after clear failed: %v", err)
+	}
+	if len(entries) != 0 {
+		t.Errorf("found %d entries, want 0 after clearing", len(entries))
+	}
+}
+
+func TestSchedulerEnqueueEvents(t *testing.T) {
+	r := setup(t)
+
+	var (
+		now        = time.Now()
+		oneDayAgo  = now.Add(-24 * time.Hour)
+		oneHourAgo = now.Add(-1 * time.Hour)
+	)
+
+	type event struct {
+		entryID    string
+		taskID     string
+		enqueuedAt time.Time
+	}
+
+	tests := []struct {
+		entryID string
+		events  []*base.SchedulerEnqueueEvent
+	}{
+		{
+			entryID: "entry123",
+			events:  []*base.SchedulerEnqueueEvent{{"task123", oneDayAgo}, {"task456", oneHourAgo}},
+		},
+		{
+			entryID: "entry123",
+			events:  []*base.SchedulerEnqueueEvent{},
+		},
+	}
+
+loop:
+	for _, tc := range tests {
+		h.FlushDB(t, r.client)
+
+		for _, e := range tc.events {
+			if err := r.RecordSchedulerEnqueueEvent(tc.entryID, e); err != nil {
+				t.Errorf("RecordSchedulerEnqueueEvent(%q, %v) failed: %v", tc.entryID, e, err)
+				continue loop
+			}
+		}
+		got, err := r.ListSchedulerEnqueueEvents(tc.entryID)
+		if err != nil {
+			t.Errorf("ListSchedulerEnqueueEvents(%q) failed: %v", tc.entryID, err)
+			continue
+		}
+		if diff := cmp.Diff(tc.events, got, h.SortSchedulerEnqueueEventOpt, timeCmpOpt); diff != "" {
+			t.Errorf("ListSchedulerEnqueueEvents(%q) = %v, want %v; (-want,+got)\n%s",
+				tc.entryID, got, tc.events, diff)
+		}
+	}
+}
+
 func TestPause(t *testing.T) {
 	r := setup(t)
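A hedged sketch of how the two read APIs above compose (the Redis address is illustrative; internal/rdb is importable only from inside the asynq module, which is how the cron CLI at the end of this patch uses it):

package main

import (
	"fmt"
	"log"

	"github.com/go-redis/redis/v7"
	"github.com/hibiken/asynq/internal/rdb"
)

func main() {
	r := rdb.NewRDB(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))
	defer r.Close()

	entries, err := r.ListSchedulerEntries()
	if err != nil {
		log.Fatal(err)
	}
	for _, e := range entries {
		fmt.Printf("entry %s: spec=%q type=%q next=%v\n", e.ID, e.Spec, e.Type, e.Next)
		// Each entry keeps its own enqueue history under a separate key.
		events, err := r.ListSchedulerEnqueueEvents(e.ID)
		if err != nil {
			log.Fatal(err)
		}
		for _, ev := range events {
			fmt.Printf("  enqueued task %s at %v\n", ev.TaskID, ev.EnqueuedAt)
		}
	}
}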
diff --git a/internal/rdb/rdb.go b/internal/rdb/rdb.go
index baf3280..09a962d 100644
--- a/internal/rdb/rdb.go
+++ b/internal/rdb/rdb.go
@@ -575,6 +575,45 @@ func (r *RDB) ClearServerState(host string, pid int, serverID string) error {
 	return clearServerStateCmd.Run(r.client, []string{skey, wkey}).Err()
 }
 
+// KEYS[1] -> asynq:schedulers:{<schedulerID>}
+// ARGV[1] -> TTL in seconds
+// ARGV[2:] -> scheduler entries
+var writeSchedulerEntriesCmd = redis.NewScript(`
+redis.call("DEL", KEYS[1])
+for i = 2, #ARGV do
+	redis.call("LPUSH", KEYS[1], ARGV[i])
+end
+redis.call("EXPIRE", KEYS[1], ARGV[1])
+return redis.status_reply("OK")`)
+
+// WriteSchedulerEntries writes scheduler entries data to redis with expiration set to the value ttl.
+func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.SchedulerEntry, ttl time.Duration) error {
+	args := []interface{}{ttl.Seconds()}
+	for _, e := range entries {
+		bytes, err := json.Marshal(e)
+		if err != nil {
+			continue // skip bad data
+		}
+		args = append(args, bytes)
+	}
+	exp := time.Now().Add(ttl).UTC()
+	key := base.SchedulerEntriesKey(schedulerID)
+	err := r.client.ZAdd(base.AllSchedulers, &redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
+	if err != nil {
+		return err
+	}
+	return writeSchedulerEntriesCmd.Run(r.client, []string{key}, args...).Err()
+}
+
+// ClearSchedulerEntries deletes scheduler entries data from redis.
+func (r *RDB) ClearSchedulerEntries(schedulerID string) error {
+	key := base.SchedulerEntriesKey(schedulerID)
+	if err := r.client.ZRem(base.AllSchedulers, key).Err(); err != nil {
+		return err
+	}
+	return r.client.Del(key).Err()
+}
+
 // CancelationPubSub returns a pubsub for cancelation messages.
 func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
 	pubsub := r.client.Subscribe(base.CancelChannel)
@@ -590,3 +629,26 @@ func (r *RDB) CancelationPubSub() (*redis.PubSub, error) {
 func (r *RDB) PublishCancelation(id string) error {
 	return r.client.Publish(base.CancelChannel, id).Err()
 }
+
+// KEYS[1] -> asynq:scheduler_history:<entryID>
+// ARGV[1] -> enqueued_at timestamp
+// ARGV[2] -> serialized SchedulerEnqueueEvent data
+// ARGV[3] -> max number of events to be persisted
+var recordSchedulerEnqueueEventCmd = redis.NewScript(`
+redis.call("ZADD", KEYS[1], ARGV[1], ARGV[2])
+redis.call("ZREMRANGEBYRANK", KEYS[1], 0, -ARGV[3]-1)
+return redis.status_reply("OK")`)
+
+// Maximum number of enqueue events to store per entry.
+const maxEvents = 10000
+
+// RecordSchedulerEnqueueEvent records the time when the given task was enqueued.
+func (r *RDB) RecordSchedulerEnqueueEvent(entryID string, event *base.SchedulerEnqueueEvent) error {
+	key := base.SchedulerHistoryKey(entryID)
+	data, err := json.Marshal(event)
+	if err != nil {
+		return err
+	}
+	return recordSchedulerEnqueueEventCmd.Run(
+		r.client, []string{key}, event.EnqueuedAt.Unix(), data, maxEvents).Err()
+}
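For orientation, this completes the scheduler's Redis footprint: base.AllSchedulers ("asynq:schedulers") is a ZSET of per-scheduler entry-list keys scored by heartbeat expiration (stale keys are pruned by the list script earlier in the patch), each asynq:schedulers:{<schedulerID>} key is a LIST of JSON-encoded SchedulerEntry values rewritten on every heartbeat, and each asynq:scheduler_history:<entryID> key is a ZSET of JSON-encoded SchedulerEnqueueEvent values scored by enqueue time and capped at maxEvents. A hedged sketch of peeking at these keys directly with go-redis v7 (the address is illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/go-redis/redis/v7"
	"github.com/hibiken/asynq/internal/base"
)

func main() {
	c := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	defer c.Close()

	// base.AllSchedulers is a ZSET whose members are per-scheduler entry-list
	// keys, scored by heartbeat expiration time.
	keys, err := c.ZRange(base.AllSchedulers, 0, -1).Result()
	if err != nil {
		log.Fatal(err)
	}
	for _, key := range keys {
		// Each asynq:schedulers:{<schedulerID>} key is a LIST of JSON-encoded
		// base.SchedulerEntry values, rewritten on every heartbeat.
		entries, err := c.LRange(key, 0, -1).Result()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s -> %d entries\n", key, len(entries))
	}
}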
diff --git a/scheduler.go b/scheduler.go
index 6effb90..9654232 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -5,69 +5,244 @@
 package asynq
 
 import (
+	"fmt"
+	"os"
+	"strings"
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/hibiken/asynq/internal/base"
 	"github.com/hibiken/asynq/internal/log"
+	"github.com/hibiken/asynq/internal/rdb"
+	"github.com/robfig/cron/v3"
 )
 
-type scheduler struct {
-	logger *log.Logger
-	broker base.Broker
-
-	// channel to communicate back to the long running "scheduler" goroutine.
-	done chan struct{}
-
-	// list of queue names to check and enqueue.
-	queues []string
-
-	// poll interval on average
-	avgInterval time.Duration
-}
-
-type schedulerParams struct {
+// A Scheduler kicks off tasks at regular intervals based on the user-defined schedule.
+type Scheduler struct {
+	id       string
+	status   *base.ServerStatus
 	logger   *log.Logger
-	broker   base.Broker
-	queues   []string
-	interval time.Duration
+	client   *Client
+	rdb      *rdb.RDB
+	cron     *cron.Cron
+	location *time.Location
+	done     chan struct{}
+	wg       sync.WaitGroup
 }
 
-func newScheduler(params schedulerParams) *scheduler {
-	return &scheduler{
-		logger:      params.logger,
-		broker:      params.broker,
-		done:        make(chan struct{}),
-		queues:      params.queues,
-		avgInterval: params.interval,
+// NewScheduler returns a new Scheduler instance given the redis connection option.
+// The parameter opts is optional; if opts is nil, default options are used.
+func NewScheduler(r RedisConnOpt, opts *SchedulerOpts) *Scheduler {
+	if opts == nil {
+		opts = &SchedulerOpts{}
+	}
+
+	logger := log.NewLogger(opts.Logger)
+	loglevel := opts.LogLevel
+	if loglevel == level_unspecified {
+		loglevel = InfoLevel
+	}
+	logger.SetLevel(toInternalLogLevel(loglevel))
+
+	loc := opts.Location
+	if loc == nil {
+		loc = time.UTC
+	}
+
+	return &Scheduler{
+		id:       generateSchedulerID(),
+		status:   base.NewServerStatus(base.StatusIdle),
+		logger:   logger,
+		client:   NewClient(r),
+		rdb:      rdb.NewRDB(createRedisClient(r)),
+		cron:     cron.New(cron.WithLocation(loc)),
+		location: loc,
+		done:     make(chan struct{}),
 	}
 }
 
-func (s *scheduler) terminate() {
-	s.logger.Debug("Scheduler shutting down...")
-	// Signal the scheduler goroutine to stop polling.
-	s.done <- struct{}{}
+func generateSchedulerID() string {
+	host, err := os.Hostname()
+	if err != nil {
+		host = "unknown-host"
+	}
+	return fmt.Sprintf("%s:%d:%v", host, os.Getpid(), uuid.New())
 }
 
-// start starts the "scheduler" goroutine.
-func (s *scheduler) start(wg *sync.WaitGroup) {
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		for {
-			select {
-			case <-s.done:
-				s.logger.Debug("Scheduler done")
-				return
-			case <-time.After(s.avgInterval):
-				s.exec()
-			}
+
+// SchedulerOpts specifies scheduler options.
+type SchedulerOpts struct {
+	// Logger specifies the logger used by the scheduler instance.
+	//
+	// If unset, the default logger is used.
+	Logger Logger
+
+	// LogLevel specifies the minimum log level to enable.
+	//
+	// If unset, InfoLevel is used by default.
+	LogLevel LogLevel
+
+	// Location specifies the time zone location.
+	//
+	// If unset, the UTC time zone (time.UTC) is used.
+	Location *time.Location
+
+	// TODO: Add ErrorHandler
+}
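A hedged usage sketch of these options from application code (the time zone and log level values are illustrative; DebugLevel comes from the package's existing LogLevel API):

package main

import (
	"log"
	"time"

	"github.com/hibiken/asynq"
)

func main() {
	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		log.Fatal(err)
	}
	scheduler := asynq.NewScheduler(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		&asynq.SchedulerOpts{
			Location: loc,              // cronspecs are evaluated in this zone instead of UTC
			LogLevel: asynq.DebugLevel, // default is InfoLevel
		},
	)
	_ = scheduler // register entries and call Run; see the full sketch at the end of the patch
}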
+
+// enqueueJob encapsulates the job of enqueuing a task and recording the event.
+type enqueueJob struct {
+	id       uuid.UUID
+	cronspec string
+	task     *Task
+	opts     []Option
+	location *time.Location
+	logger   *log.Logger
+	client   *Client
+	rdb      *rdb.RDB
+}
+
+func (j *enqueueJob) Run() {
+	res, err := j.client.Enqueue(j.task, j.opts...)
+	if err != nil {
+		j.logger.Errorf("scheduler could not enqueue a task %+v: %v", j.task, err)
+		return
+	}
+	j.logger.Infof("scheduler enqueued a task: %+v", res)
+	event := &base.SchedulerEnqueueEvent{
+		TaskID:     res.ID,
+		EnqueuedAt: res.EnqueuedAt.In(j.location),
+	}
+	err = j.rdb.RecordSchedulerEnqueueEvent(j.id.String(), event)
+	if err != nil {
+		j.logger.Errorf("scheduler could not record enqueue event for task %+v: %v", j.task, err)
+	}
+}
+
+// Register registers a task to be enqueued on the schedule specified by the cronspec.
+// It returns the ID of the newly registered entry.
+func (s *Scheduler) Register(cronspec string, task *Task, opts ...Option) (entryID string, err error) {
+	job := &enqueueJob{
+		id:       uuid.New(),
+		cronspec: cronspec,
+		task:     task,
+		opts:     opts,
+		location: s.location,
+		client:   s.client,
+		rdb:      s.rdb,
+		logger:   s.logger,
+	}
+	if _, err = s.cron.AddJob(cronspec, job); err != nil {
+		return "", err
+	}
+	return job.id.String(), nil
+}
+
+// Run starts the scheduler until an OS signal to exit the program is received.
+// It returns an error if the scheduler is already running or has been stopped.
+func (s *Scheduler) Run() error {
+	if err := s.Start(); err != nil {
+		return err
+	}
+	s.waitForSignals()
+	return s.Stop()
+}
+
+// Start starts the scheduler.
+// It returns an error if the scheduler is already running or has been stopped.
+func (s *Scheduler) Start() error {
+	switch s.status.Get() {
+	case base.StatusRunning:
+		return fmt.Errorf("asynq: the scheduler is already running")
+	case base.StatusStopped:
+		return fmt.Errorf("asynq: the scheduler has already been stopped")
+	}
+	s.logger.Info("Scheduler starting")
+	s.logger.Infof("Scheduler timezone is set to %v", s.location)
+	s.cron.Start()
+	s.wg.Add(1)
+	go s.runHeartbeater()
+	s.status.Set(base.StatusRunning)
+	return nil
+}
+
+// Stop stops the scheduler.
+// It returns an error if the scheduler is not currently running.
+func (s *Scheduler) Stop() error {
+	if s.status.Get() != base.StatusRunning {
+		return fmt.Errorf("asynq: the scheduler is not running")
+	}
+	s.logger.Info("Scheduler shutting down")
+	close(s.done) // signal heartbeater to stop
+	ctx := s.cron.Stop()
+	<-ctx.Done()
+	s.wg.Wait()
+
+	s.client.Close()
+	s.rdb.Close()
+	s.status.Set(base.StatusStopped)
+	s.logger.Info("Scheduler stopped")
+	return nil
+}
+
+func (s *Scheduler) runHeartbeater() {
+	defer s.wg.Done()
+	ticker := time.NewTicker(5 * time.Second)
+	for {
+		select {
+		case <-s.done:
+			s.logger.Debugf("Scheduler heartbeater shutting down")
+			s.rdb.ClearSchedulerEntries(s.id)
+			return
+		case <-ticker.C:
+			s.beat()
 		}
-	}()
-}
-
-func (s *scheduler) exec() {
-	if err := s.broker.CheckAndEnqueue(s.queues...); err != nil {
-		s.logger.Errorf("Could not enqueue scheduled tasks: %v", err)
 	}
 }
+
+// beat writes a snapshot of entries to redis.
+func (s *Scheduler) beat() { + var entries []*base.SchedulerEntry + for _, entry := range s.cron.Entries() { + job := entry.Job.(*enqueueJob) + e := &base.SchedulerEntry{ + ID: job.id.String(), + Spec: job.cronspec, + Type: job.task.Type, + Payload: job.task.Payload.data, + Opts: stringifyOptions(job.opts), + Next: entry.Next, + Prev: entry.Prev, + } + entries = append(entries, e) + } + s.logger.Debugf("Writing entries %v", entries) + if err := s.rdb.WriteSchedulerEntries(s.id, entries, 5*time.Second); err != nil { + s.logger.Warnf("Scheduler could not write heartbeat data: %v", err) + } +} + +func stringifyOptions(opts []Option) string { + var res []string + for _, opt := range opts { + switch opt := opt.(type) { + case retryOption: + res = append(res, fmt.Sprintf("MaxRetry(%d)", int(opt))) + case queueOption: + res = append(res, fmt.Sprintf("Queue(%q)", string(opt))) + case timeoutOption: + res = append(res, fmt.Sprintf("Timeout(%v)", time.Duration(opt))) + case deadlineOption: + res = append(res, fmt.Sprintf("Deadline(%v)", time.Time(opt))) + case uniqueOption: + res = append(res, fmt.Sprintf("Unique(%v)", time.Duration(opt))) + case processAtOption: + res = append(res, fmt.Sprintf("ProcessAt(%v)", time.Time(opt))) + case processInOption: + res = append(res, fmt.Sprintf("ProcessIn(%v)", time.Duration(opt))) + default: + // ignore unexpected option + } + } + return strings.Join(res, ", ") +} diff --git a/scheduler_test.go b/scheduler_test.go index 965d24c..0cf4bc4 100644 --- a/scheduler_test.go +++ b/scheduler_test.go @@ -5,133 +5,109 @@ package asynq import ( - "sync" + "fmt" "testing" "time" "github.com/google/go-cmp/cmp" - h "github.com/hibiken/asynq/internal/asynqtest" + "github.com/hibiken/asynq/internal/asynqtest" "github.com/hibiken/asynq/internal/base" - "github.com/hibiken/asynq/internal/rdb" ) func TestScheduler(t *testing.T) { - r := setup(t) - defer r.Close() - rdbClient := rdb.NewRDB(r) - const pollInterval = time.Second - s := newScheduler(schedulerParams{ - logger: testLogger, - broker: rdbClient, - queues: []string{"default", "critical"}, - interval: pollInterval, - }) - t1 := h.NewTaskMessageWithQueue("gen_thumbnail", nil, "default") - t2 := h.NewTaskMessageWithQueue("send_email", nil, "critical") - t3 := h.NewTaskMessageWithQueue("reindex", nil, "default") - t4 := h.NewTaskMessageWithQueue("sync", nil, "critical") - now := time.Now() - tests := []struct { - initScheduled map[string][]base.Z // scheduled queue initial state - initRetry map[string][]base.Z // retry queue initial state - initPending map[string][]*base.TaskMessage // default queue initial state - wait time.Duration // wait duration before checking for final state - wantScheduled map[string][]*base.TaskMessage // schedule queue final state - wantRetry map[string][]*base.TaskMessage // retry queue final state - wantPending map[string][]*base.TaskMessage // default queue final state + cronspec string + task *Task + opts []Option + wait time.Duration + queue string + want []*base.TaskMessage }{ { - initScheduled: map[string][]base.Z{ - "default": {{Message: t1, Score: now.Add(time.Hour).Unix()}}, - "critical": {{Message: t2, Score: now.Add(-2 * time.Second).Unix()}}, - }, - initRetry: map[string][]base.Z{ - "default": {{Message: t3, Score: time.Now().Add(-500 * time.Millisecond).Unix()}}, - "critical": {}, - }, - initPending: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {t4}, - }, - wait: pollInterval * 2, - wantScheduled: map[string][]*base.TaskMessage{ - "default": {t1}, - "critical": {}, - 
}, - wantRetry: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {}, - }, - wantPending: map[string][]*base.TaskMessage{ - "default": {t3}, - "critical": {t2, t4}, + cronspec: "@every 3s", + task: NewTask("task1", nil), + opts: []Option{MaxRetry(10)}, + wait: 10 * time.Second, + queue: "default", + want: []*base.TaskMessage{ + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + }, + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + }, + { + Type: "task1", + Payload: nil, + Retry: 10, + Timeout: int64(defaultTimeout.Seconds()), + Queue: "default", + }, }, }, + } + + r := setup(t) + + for _, tc := range tests { + scheduler := NewScheduler(getRedisConnOpt(t), nil) + if _, err := scheduler.Register(tc.cronspec, tc.task, tc.opts...); err != nil { + t.Fatal(err) + } + + if err := scheduler.Start(); err != nil { + t.Fatal(err) + } + time.Sleep(tc.wait) + if err := scheduler.Stop(); err != nil { + t.Fatal(err) + } + + got := asynqtest.GetPendingMessages(t, r, tc.queue) + if diff := cmp.Diff(tc.want, got, asynqtest.IgnoreIDOpt); diff != "" { + t.Errorf("mismatch found in queue %q: (-want,+got)\n%s", tc.queue, diff) + } + } +} + +func TestStringifyOptions(t *testing.T) { + now := time.Now() + oneHourFromNow := now.Add(1 * time.Hour) + twoHoursFromNow := now.Add(2 * time.Hour) + tests := []struct { + opts []Option + want string + }{ { - initScheduled: map[string][]base.Z{ - "default": { - {Message: t1, Score: now.Unix()}, - {Message: t3, Score: now.Add(-500 * time.Millisecond).Unix()}, - }, - "critical": { - {Message: t2, Score: now.Add(-2 * time.Second).Unix()}, - }, - }, - initRetry: map[string][]base.Z{ - "default": {}, - "critical": {}, - }, - initPending: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {t4}, - }, - wait: pollInterval * 2, - wantScheduled: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {}, - }, - wantRetry: map[string][]*base.TaskMessage{ - "default": {}, - "critical": {}, - }, - wantPending: map[string][]*base.TaskMessage{ - "default": {t1, t3}, - "critical": {t2, t4}, - }, + opts: []Option{MaxRetry(10)}, + want: "MaxRetry(10)", + }, + { + opts: []Option{Queue("custom"), Timeout(1 * time.Minute)}, + want: `Queue("custom"), Timeout(1m0s)`, + }, + { + opts: []Option{ProcessAt(oneHourFromNow), Deadline(twoHoursFromNow)}, + want: fmt.Sprintf("ProcessAt(%v), Deadline(%v)", oneHourFromNow, twoHoursFromNow), + }, + { + opts: []Option{ProcessIn(30 * time.Minute), Unique(1 * time.Hour)}, + want: "ProcessIn(30m0s), Unique(1h0m0s)", }, } for _, tc := range tests { - h.FlushDB(t, r) // clean up db before each test case. 
- h.SeedAllScheduledQueues(t, r, tc.initScheduled) // initialize scheduled queue - h.SeedAllRetryQueues(t, r, tc.initRetry) // initialize retry queue - h.SeedAllPendingQueues(t, r, tc.initPending) // initialize default queue - - var wg sync.WaitGroup - s.start(&wg) - time.Sleep(tc.wait) - s.terminate() - - for qname, want := range tc.wantScheduled { - gotScheduled := h.GetScheduledMessages(t, r, qname) - if diff := cmp.Diff(want, gotScheduled, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.ScheduledKey(qname), diff) - } - } - - for qname, want := range tc.wantRetry { - gotRetry := h.GetRetryMessages(t, r, qname) - if diff := cmp.Diff(want, gotRetry, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.RetryKey(qname), diff) - } - } - - for qname, want := range tc.wantPending { - gotPending := h.GetPendingMessages(t, r, qname) - if diff := cmp.Diff(want, gotPending, h.SortMsgOpt); diff != "" { - t.Errorf("mismatch found in %q after running scheduler: (-want, +got)\n%s", base.QueueKey(qname), diff) - } + got := stringifyOptions(tc.opts) + if got != tc.want { + t.Errorf("got %v, want %v", got, tc.want) } } } diff --git a/server.go b/server.go index 3ad30cf..418c54a 100644 --- a/server.go +++ b/server.go @@ -41,7 +41,7 @@ type Server struct { // wait group to wait for all goroutines to finish. wg sync.WaitGroup - scheduler *scheduler + forwarder *forwarder processor *processor syncer *syncer heartbeater *heartbeater @@ -333,7 +333,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { starting: starting, finished: finished, }) - scheduler := newScheduler(schedulerParams{ + forwarder := newForwarder(forwarderParams{ logger: logger, broker: rdb, queues: qnames, @@ -375,7 +375,7 @@ func NewServer(r RedisConnOpt, cfg Config) *Server { logger: logger, broker: rdb, status: status, - scheduler: scheduler, + forwarder: forwarder, processor: processor, syncer: syncer, heartbeater: heartbeater, @@ -453,7 +453,7 @@ func (srv *Server) Start(handler Handler) error { srv.subscriber.start(&srv.wg) srv.syncer.start(&srv.wg) srv.recoverer.start(&srv.wg) - srv.scheduler.start(&srv.wg) + srv.forwarder.start(&srv.wg) srv.processor.start(&srv.wg) return nil } @@ -474,7 +474,7 @@ func (srv *Server) Stop() { // Sender goroutines should be terminated before the receiver goroutines. 
 	// processor -> syncer (via syncCh)
 	// processor -> heartbeater (via starting, finished channels)
-	srv.scheduler.terminate()
+	srv.forwarder.terminate()
 	srv.processor.terminate()
 	srv.recoverer.terminate()
 	srv.syncer.terminate()
diff --git a/server_test.go b/server_test.go
index 1b72570..32e79b5 100644
--- a/server_test.go
+++ b/server_test.go
@@ -127,7 +127,7 @@ func TestServerWithRedisDown(t *testing.T) {
 	testBroker := testbroker.NewTestBroker(r)
 	srv := NewServer(RedisClientOpt{Addr: ":6379"}, Config{LogLevel: testLogLevel})
 	srv.broker = testBroker
-	srv.scheduler.broker = testBroker
+	srv.forwarder.broker = testBroker
 	srv.heartbeater.broker = testBroker
 	srv.processor.broker = testBroker
 	srv.subscriber.broker = testBroker
@@ -160,7 +160,7 @@ func TestServerWithFlakyBroker(t *testing.T) {
 	redisConnOpt := getRedisConnOpt(t)
 	srv := NewServer(redisConnOpt, Config{LogLevel: testLogLevel})
 	srv.broker = testBroker
-	srv.scheduler.broker = testBroker
+	srv.forwarder.broker = testBroker
 	srv.heartbeater.broker = testBroker
 	srv.processor.broker = testBroker
 	srv.subscriber.broker = testBroker
diff --git a/signals_unix.go b/signals_unix.go
index 0a2d93b..41f878b 100644
--- a/signals_unix.go
+++ b/signals_unix.go
@@ -28,3 +28,10 @@ func (srv *Server) waitForSignals() {
 		break
 	}
 }
+
+func (s *Scheduler) waitForSignals() {
+	s.logger.Info("Send signal TERM or INT to stop the scheduler")
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, unix.SIGTERM, unix.SIGINT)
+	<-sigs
+}
diff --git a/signals_windows.go b/signals_windows.go
index b06e7ec..da22e68 100644
--- a/signals_windows.go
+++ b/signals_windows.go
@@ -20,3 +20,10 @@ func (srv *Server) waitForSignals() {
 	signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
 	<-sigs
 }
+
+func (s *Scheduler) waitForSignals() {
+	s.logger.Info("Send signal TERM or INT to stop the scheduler")
+	sigs := make(chan os.Signal, 1)
+	signal.Notify(sigs, windows.SIGTERM, windows.SIGINT)
+	<-sigs
+}
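The final file wires two subcommands into the existing asynq CLI on top of the inspect functions added earlier: `asynq cron ls` prints one row per registered entry (ID, spec, task type, payload, options, next and previous enqueue times), and `asynq cron history <entryID>...` prints the recorded enqueue events for each given entry ID.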
diff --git a/tools/asynq/cmd/cron.go b/tools/asynq/cmd/cron.go
new file mode 100644
index 0000000..4367ffb
--- /dev/null
+++ b/tools/asynq/cmd/cron.go
@@ -0,0 +1,102 @@
+// Copyright 2020 Kentaro Hibino. All rights reserved.
+// Use of this source code is governed by a MIT license
+// that can be found in the LICENSE file.
+
+package cmd
+
+import (
+	"fmt"
+	"io"
+	"os"
+	"sort"
+
+	"github.com/spf13/cobra"
+)
+
+func init() {
+	rootCmd.AddCommand(cronCmd)
+	cronCmd.AddCommand(cronListCmd)
+	cronCmd.AddCommand(cronHistoryCmd)
+}
+
+var cronCmd = &cobra.Command{
+	Use:   "cron",
+	Short: "Manage cron",
+}
+
+var cronListCmd = &cobra.Command{
+	Use:   "ls",
+	Short: "List cron entries",
+	Run:   cronList,
+}
+
+var cronHistoryCmd = &cobra.Command{
+	Use:   "history",
+	Short: "Show history of each cron task",
+	Args:  cobra.MinimumNArgs(1),
+	Run:   cronHistory,
+}
+
+func cronList(cmd *cobra.Command, args []string) {
+	r := createRDB()
+
+	entries, err := r.ListSchedulerEntries()
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+	if len(entries) == 0 {
+		fmt.Println("No scheduler entries")
+		return
+	}
+
+	// Sort entries by spec.
+	sort.Slice(entries, func(i, j int) bool {
+		x, y := entries[i], entries[j]
+		return x.Spec < y.Spec
+	})
+
+	cols := []string{"EntryID", "Spec", "Type", "Payload", "Options", "Next", "Prev"}
+	printRows := func(w io.Writer, tmpl string) {
+		for _, e := range entries {
+			fmt.Fprintf(w, tmpl, e.ID, e.Spec, e.Type, e.Payload, e.Opts, e.Next, e.Prev)
+		}
+	}
+	printTable(cols, printRows)
+}
+
+func cronHistory(cmd *cobra.Command, args []string) {
+	r := createRDB()
+	for i, entryID := range args {
+		if i > 0 {
+			fmt.Printf("\n%s\n", separator)
+		}
+		fmt.Println()
+
+		fmt.Printf("Entry: %s\n\n", entryID)
+
+		events, err := r.ListSchedulerEnqueueEvents(entryID)
+		if err != nil {
+			fmt.Printf("error: %v\n", err)
+			continue
+		}
+		if len(events) == 0 {
+			fmt.Printf("No scheduler enqueue events found for entry: %s\n", entryID)
+			continue
+		}
+
+		// Sort events by enqueuedAt timestamp.
+		sort.Slice(events, func(i, j int) bool {
+			x, y := events[i], events[j]
+			return x.EnqueuedAt.Unix() > y.EnqueuedAt.Unix()
+		})
+
+		cols := []string{"TaskID", "EnqueuedAt"}
+		printRows := func(w io.Writer, tmpl string) {
+			for _, e := range events {
+				fmt.Fprintf(w, tmpl, e.TaskID, e.EnqueuedAt)
+			}
+		}
+		printTable(cols, printRows)
+	}
+}
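To close the loop, a usage sketch of the public API introduced by this patch (task type names, cronspecs, and the Redis address are illustrative):

package main

import (
	"log"

	"github.com/hibiken/asynq"
)

func main() {
	scheduler := asynq.NewScheduler(
		asynq.RedisClientOpt{Addr: "localhost:6379"},
		nil, // nil opts: UTC time zone, info-level logging
	)

	// Enqueue a "reindex" task every night at 2am with default task options.
	if _, err := scheduler.Register("0 2 * * *", asynq.NewTask("reindex", nil)); err != nil {
		log.Fatal(err)
	}
	// Enqueue a "send_digest" task every 30 minutes on the "critical" queue.
	entryID, err := scheduler.Register("@every 30m", asynq.NewTask("send_digest", nil), asynq.Queue("critical"))
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("registered entry: %s", entryID)

	// Run starts the cron schedule and blocks until SIGTERM/SIGINT, then stops cleanly.
	if err := scheduler.Run(); err != nil {
		log.Fatal(err)
	}
}

While this process runs, its heartbeat makes the registered entries visible to `asynq cron ls`, and each successful enqueue is recorded for `asynq cron history`.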