diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 45a0402..fbb015c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -67,3 +67,17 @@ jobs: - name: Test tools module run: cd tools && go test -race -v ./... && cd .. + golangci: + name: lint + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - uses: actions/setup-go@v5 + with: + go-version: stable + + - name: golangci-lint + uses: golangci/golangci-lint-action@v6 + with: + version: v1.61 diff --git a/Makefile b/Makefile index 0c1c7ff..6f11321 100644 --- a/Makefile +++ b/Makefile @@ -4,4 +4,8 @@ proto: internal/proto/asynq.proto protoc -I=$(ROOT_DIR)/internal/proto \ --go_out=$(ROOT_DIR)/internal/proto \ --go_opt=module=github.com/hibiken/asynq/internal/proto \ - $(ROOT_DIR)/internal/proto/asynq.proto \ No newline at end of file + $(ROOT_DIR)/internal/proto/asynq.proto + +.PHONY: lint +lint: + golangci-lint run diff --git a/benchmark_test.go b/benchmark_test.go index dfae16a..93897dd 100644 --- a/benchmark_test.go +++ b/benchmark_test.go @@ -55,7 +55,7 @@ func BenchmarkEndToEndSimple(b *testing.B) { } b.StartTimer() // end setup - srv.Start(HandlerFunc(handler)) + _ = srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown @@ -117,7 +117,7 @@ func BenchmarkEndToEnd(b *testing.B) { } b.StartTimer() // end setup - srv.Start(HandlerFunc(handler)) + _ = srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown @@ -174,7 +174,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) { } b.StartTimer() // end setup - srv.Start(HandlerFunc(handler)) + _ = srv.Start(HandlerFunc(handler)) wg.Wait() b.StopTimer() // begin teardown @@ -215,7 +215,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) { handler := func(ctx context.Context, t *Task) error { return nil } - srv.Start(HandlerFunc(handler)) + _ = srv.Start(HandlerFunc(handler)) b.StartTimer() // end setup diff --git a/client.go b/client.go index aec50b1..4a0ba39 100644 --- a/client.go +++ b/client.go @@ -433,7 +433,7 @@ func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL t func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error { if uniqueTTL > 0 { - ttl := t.Add(uniqueTTL).Sub(time.Now()) + ttl := time.Until(t.Add(uniqueTTL)) return c.broker.ScheduleUnique(ctx, msg, t, ttl) } return c.broker.Schedule(ctx, msg, t) diff --git a/client_test.go b/client_test.go index 80a5517..62aefc5 100644 --- a/client_test.go +++ b/client_test.go @@ -1173,7 +1173,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) { } gotTTL := r.TTL(context.Background(), base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val() - wantTTL := tc.at.Add(tc.ttl).Sub(time.Now()) + wantTTL := time.Until(tc.at.Add(tc.ttl)) if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) { t.Errorf("TTL = %v, want %v", gotTTL, wantTTL) continue diff --git a/heartbeat.go b/heartbeat.go index 4199600..f426445 100644 --- a/heartbeat.go +++ b/heartbeat.go @@ -120,7 +120,9 @@ func (h *heartbeater) start(wg *sync.WaitGroup) { for { select { case <-h.done: - h.broker.ClearServerState(h.host, h.pid, h.serverID) + if err := h.broker.ClearServerState(h.host, h.pid, h.serverID); err != nil { + h.logger.Errorf("Failed to clear server state: %v", err) + } h.logger.Debug("Heartbeater done") timer.Stop() return diff --git a/internal/base/base.go b/internal/base/base.go index c0eb5bb..054edf5 100644 --- a/internal/base/base.go +++ b/internal/base/base.go @@ -14,7 +14,7 @@ import ( "sync" "time" - "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes" //nolint: staticcheck "github.com/hibiken/asynq/internal/errors" pb "github.com/hibiken/asynq/internal/proto" "github.com/hibiken/asynq/internal/timeutil" @@ -379,7 +379,7 @@ func EncodeServerInfo(info *ServerInfo) ([]byte, error) { for q, p := range info.Queues { queues[q] = int32(p) } - started, err := ptypes.TimestampProto(info.Started) + started, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck if err != nil { return nil, err } @@ -406,7 +406,7 @@ func DecodeServerInfo(b []byte) (*ServerInfo, error) { for q, p := range pbmsg.GetQueues() { queues[q] = int(p) } - startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck if err != nil { return nil, err } @@ -441,11 +441,11 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) { if info == nil { return nil, fmt.Errorf("cannot encode nil worker info") } - startTime, err := ptypes.TimestampProto(info.Started) + startTime, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck if err != nil { return nil, err } - deadline, err := ptypes.TimestampProto(info.Deadline) + deadline, err := ptypes.TimestampProto(info.Deadline) //nolint: staticcheck if err != nil { return nil, err } @@ -468,11 +468,11 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) + startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck if err != nil { return nil, err } - deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) + deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) //nolint: staticcheck if err != nil { return nil, err } @@ -519,11 +519,11 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) { if entry == nil { return nil, fmt.Errorf("cannot encode nil scheduler entry") } - next, err := ptypes.TimestampProto(entry.Next) + next, err := ptypes.TimestampProto(entry.Next) //nolint: staticcheck if err != nil { return nil, err } - prev, err := ptypes.TimestampProto(entry.Prev) + prev, err := ptypes.TimestampProto(entry.Prev) //nolint: staticcheck if err != nil { return nil, err } @@ -544,11 +544,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) + next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) //nolint: staticcheck if err != nil { return nil, err } - prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) + prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) //nolint: staticcheck if err != nil { return nil, err } @@ -578,7 +578,7 @@ func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) { if event == nil { return nil, fmt.Errorf("cannot encode nil enqueue event") } - enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) + enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) //nolint: staticcheck if err != nil { return nil, err } @@ -595,7 +595,7 @@ func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) { if err := proto.Unmarshal(b, &pbmsg); err != nil { return nil, err } - enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) + enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) //nolint: staticcheck if err != nil { return nil, err } diff --git a/internal/rdb/rdb_test.go b/internal/rdb/rdb_test.go index 1b7d138..433d6fb 100644 --- a/internal/rdb/rdb_test.go +++ b/internal/rdb/rdb_test.go @@ -3206,7 +3206,7 @@ func TestCancelationPubSub(t *testing.T) { publish := []string{"one", "two", "three"} for _, msg := range publish { - r.PublishCancelation(msg) + _ = r.PublishCancelation(msg) } // allow for message to reach subscribers. diff --git a/periodic_task_manager.go b/periodic_task_manager.go index 085ca3d..3b927dd 100644 --- a/periodic_task_manager.go +++ b/periodic_task_manager.go @@ -7,7 +7,6 @@ package asynq import ( "crypto/sha256" "fmt" - "io" "sort" "sync" "time" @@ -79,13 +78,13 @@ type PeriodicTaskConfig struct { func (c *PeriodicTaskConfig) hash() string { h := sha256.New() - io.WriteString(h, c.Cronspec) - io.WriteString(h, c.Task.Type()) + _, _ = h.Write([]byte(c.Cronspec)) + _, _ = h.Write([]byte(c.Task.Type())) h.Write(c.Task.Payload()) opts := stringifyOptions(c.Opts) sort.Strings(opts) for _, opt := range opts { - io.WriteString(h, opt) + _, _ = h.Write([]byte(opt)) } return fmt.Sprintf("%x", h.Sum(nil)) } diff --git a/processor_test.go b/processor_test.go index 669ce43..2f7599e 100644 --- a/processor_test.go +++ b/processor_test.go @@ -313,7 +313,7 @@ func TestProcessorRetry(t *testing.T) { pending: []*base.TaskMessage{m1, m2, m3, m4}, delay: time.Minute, handler: HandlerFunc(func(ctx context.Context, task *Task) error { - return fmt.Errorf(errMsg) + return errors.New(errMsg) }), wait: 2 * time.Second, wantErrMsg: errMsg, diff --git a/scheduler.go b/scheduler.go index 5f0b05d..5e8781a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -289,7 +289,9 @@ func (s *Scheduler) runHeartbeater() { select { case <-s.done: s.logger.Debugf("Scheduler heatbeater shutting down") - s.rdb.ClearSchedulerEntries(s.id) + if err := s.rdb.ClearSchedulerEntries(s.id); err != nil { + s.logger.Errorf("Failed to clear the scheduler entries: %v", err) + } ticker.Stop() return case <-ticker.C: diff --git a/server_test.go b/server_test.go index da7aac5..3d39e4a 100644 --- a/server_test.go +++ b/server_test.go @@ -91,7 +91,7 @@ func TestServerRun(t *testing.T) { // Make sure server exits when receiving TERM signal. go func() { time.Sleep(2 * time.Second) - syscall.Kill(syscall.Getpid(), syscall.SIGTERM) + _ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM) done <- struct{}{} }()