mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-26 07:42:17 +08:00
Run golangci-lint in CI (#927)
* Setup golangci-lint in CI and local-dev * Fix linting error or locally disable linter
This commit is contained in:
parent
3f4e211a3b
commit
03f4799712
14
.github/workflows/build.yml
vendored
14
.github/workflows/build.yml
vendored
@ -67,3 +67,17 @@ jobs:
|
|||||||
- name: Test tools module
|
- name: Test tools module
|
||||||
run: cd tools && go test -race -v ./... && cd ..
|
run: cd tools && go test -race -v ./... && cd ..
|
||||||
|
|
||||||
|
golangci:
|
||||||
|
name: lint
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- uses: actions/setup-go@v5
|
||||||
|
with:
|
||||||
|
go-version: stable
|
||||||
|
|
||||||
|
- name: golangci-lint
|
||||||
|
uses: golangci/golangci-lint-action@v6
|
||||||
|
with:
|
||||||
|
version: v1.61
|
||||||
|
4
Makefile
4
Makefile
@ -5,3 +5,7 @@ proto: internal/proto/asynq.proto
|
|||||||
--go_out=$(ROOT_DIR)/internal/proto \
|
--go_out=$(ROOT_DIR)/internal/proto \
|
||||||
--go_opt=module=github.com/hibiken/asynq/internal/proto \
|
--go_opt=module=github.com/hibiken/asynq/internal/proto \
|
||||||
$(ROOT_DIR)/internal/proto/asynq.proto
|
$(ROOT_DIR)/internal/proto/asynq.proto
|
||||||
|
|
||||||
|
.PHONY: lint
|
||||||
|
lint:
|
||||||
|
golangci-lint run
|
||||||
|
@ -55,7 +55,7 @@ func BenchmarkEndToEndSimple(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
srv.Start(HandlerFunc(handler))
|
_ = srv.Start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
@ -117,7 +117,7 @@ func BenchmarkEndToEnd(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
srv.Start(HandlerFunc(handler))
|
_ = srv.Start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
@ -174,7 +174,7 @@ func BenchmarkEndToEndMultipleQueues(b *testing.B) {
|
|||||||
}
|
}
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
srv.Start(HandlerFunc(handler))
|
_ = srv.Start(HandlerFunc(handler))
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
b.StopTimer() // begin teardown
|
b.StopTimer() // begin teardown
|
||||||
@ -215,7 +215,7 @@ func BenchmarkClientWhileServerRunning(b *testing.B) {
|
|||||||
handler := func(ctx context.Context, t *Task) error {
|
handler := func(ctx context.Context, t *Task) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
srv.Start(HandlerFunc(handler))
|
_ = srv.Start(HandlerFunc(handler))
|
||||||
|
|
||||||
b.StartTimer() // end setup
|
b.StartTimer() // end setup
|
||||||
|
|
||||||
|
@ -433,7 +433,7 @@ func (c *Client) enqueue(ctx context.Context, msg *base.TaskMessage, uniqueTTL t
|
|||||||
|
|
||||||
func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
|
func (c *Client) schedule(ctx context.Context, msg *base.TaskMessage, t time.Time, uniqueTTL time.Duration) error {
|
||||||
if uniqueTTL > 0 {
|
if uniqueTTL > 0 {
|
||||||
ttl := t.Add(uniqueTTL).Sub(time.Now())
|
ttl := time.Until(t.Add(uniqueTTL))
|
||||||
return c.broker.ScheduleUnique(ctx, msg, t, ttl)
|
return c.broker.ScheduleUnique(ctx, msg, t, ttl)
|
||||||
}
|
}
|
||||||
return c.broker.Schedule(ctx, msg, t)
|
return c.broker.Schedule(ctx, msg, t)
|
||||||
|
@ -1173,7 +1173,7 @@ func TestClientEnqueueUniqueWithProcessAtOption(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
gotTTL := r.TTL(context.Background(), base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
|
gotTTL := r.TTL(context.Background(), base.UniqueKey(base.DefaultQueueName, tc.task.Type(), tc.task.Payload())).Val()
|
||||||
wantTTL := tc.at.Add(tc.ttl).Sub(time.Now())
|
wantTTL := time.Until(tc.at.Add(tc.ttl))
|
||||||
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
if !cmp.Equal(wantTTL.Seconds(), gotTTL.Seconds(), cmpopts.EquateApprox(0, 1)) {
|
||||||
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
t.Errorf("TTL = %v, want %v", gotTTL, wantTTL)
|
||||||
continue
|
continue
|
||||||
|
@ -120,7 +120,9 @@ func (h *heartbeater) start(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-h.done:
|
case <-h.done:
|
||||||
h.broker.ClearServerState(h.host, h.pid, h.serverID)
|
if err := h.broker.ClearServerState(h.host, h.pid, h.serverID); err != nil {
|
||||||
|
h.logger.Errorf("Failed to clear server state: %v", err)
|
||||||
|
}
|
||||||
h.logger.Debug("Heartbeater done")
|
h.logger.Debug("Heartbeater done")
|
||||||
timer.Stop()
|
timer.Stop()
|
||||||
return
|
return
|
||||||
|
@ -14,7 +14,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/protobuf/ptypes"
|
"github.com/golang/protobuf/ptypes" //nolint: staticcheck
|
||||||
"github.com/hibiken/asynq/internal/errors"
|
"github.com/hibiken/asynq/internal/errors"
|
||||||
pb "github.com/hibiken/asynq/internal/proto"
|
pb "github.com/hibiken/asynq/internal/proto"
|
||||||
"github.com/hibiken/asynq/internal/timeutil"
|
"github.com/hibiken/asynq/internal/timeutil"
|
||||||
@ -379,7 +379,7 @@ func EncodeServerInfo(info *ServerInfo) ([]byte, error) {
|
|||||||
for q, p := range info.Queues {
|
for q, p := range info.Queues {
|
||||||
queues[q] = int32(p)
|
queues[q] = int32(p)
|
||||||
}
|
}
|
||||||
started, err := ptypes.TimestampProto(info.Started)
|
started, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -406,7 +406,7 @@ func DecodeServerInfo(b []byte) (*ServerInfo, error) {
|
|||||||
for q, p := range pbmsg.GetQueues() {
|
for q, p := range pbmsg.GetQueues() {
|
||||||
queues[q] = int(p)
|
queues[q] = int(p)
|
||||||
}
|
}
|
||||||
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
|
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -441,11 +441,11 @@ func EncodeWorkerInfo(info *WorkerInfo) ([]byte, error) {
|
|||||||
if info == nil {
|
if info == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil worker info")
|
return nil, fmt.Errorf("cannot encode nil worker info")
|
||||||
}
|
}
|
||||||
startTime, err := ptypes.TimestampProto(info.Started)
|
startTime, err := ptypes.TimestampProto(info.Started) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
deadline, err := ptypes.TimestampProto(info.Deadline)
|
deadline, err := ptypes.TimestampProto(info.Deadline) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -468,11 +468,11 @@ func DecodeWorkerInfo(b []byte) (*WorkerInfo, error) {
|
|||||||
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime())
|
startTime, err := ptypes.Timestamp(pbmsg.GetStartTime()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
deadline, err := ptypes.Timestamp(pbmsg.GetDeadline())
|
deadline, err := ptypes.Timestamp(pbmsg.GetDeadline()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -519,11 +519,11 @@ func EncodeSchedulerEntry(entry *SchedulerEntry) ([]byte, error) {
|
|||||||
if entry == nil {
|
if entry == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil scheduler entry")
|
return nil, fmt.Errorf("cannot encode nil scheduler entry")
|
||||||
}
|
}
|
||||||
next, err := ptypes.TimestampProto(entry.Next)
|
next, err := ptypes.TimestampProto(entry.Next) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
prev, err := ptypes.TimestampProto(entry.Prev)
|
prev, err := ptypes.TimestampProto(entry.Prev) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -544,11 +544,11 @@ func DecodeSchedulerEntry(b []byte) (*SchedulerEntry, error) {
|
|||||||
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime())
|
next, err := ptypes.Timestamp(pbmsg.GetNextEnqueueTime()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime())
|
prev, err := ptypes.Timestamp(pbmsg.GetPrevEnqueueTime()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -578,7 +578,7 @@ func EncodeSchedulerEnqueueEvent(event *SchedulerEnqueueEvent) ([]byte, error) {
|
|||||||
if event == nil {
|
if event == nil {
|
||||||
return nil, fmt.Errorf("cannot encode nil enqueue event")
|
return nil, fmt.Errorf("cannot encode nil enqueue event")
|
||||||
}
|
}
|
||||||
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt)
|
enqueuedAt, err := ptypes.TimestampProto(event.EnqueuedAt) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -595,7 +595,7 @@ func DecodeSchedulerEnqueueEvent(b []byte) (*SchedulerEnqueueEvent, error) {
|
|||||||
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
if err := proto.Unmarshal(b, &pbmsg); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime())
|
enqueuedAt, err := ptypes.Timestamp(pbmsg.GetEnqueueTime()) //nolint: staticcheck
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -3206,7 +3206,7 @@ func TestCancelationPubSub(t *testing.T) {
|
|||||||
publish := []string{"one", "two", "three"}
|
publish := []string{"one", "two", "three"}
|
||||||
|
|
||||||
for _, msg := range publish {
|
for _, msg := range publish {
|
||||||
r.PublishCancelation(msg)
|
_ = r.PublishCancelation(msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
// allow for message to reach subscribers.
|
// allow for message to reach subscribers.
|
||||||
|
@ -7,7 +7,6 @@ package asynq
|
|||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"sort"
|
"sort"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -79,13 +78,13 @@ type PeriodicTaskConfig struct {
|
|||||||
|
|
||||||
func (c *PeriodicTaskConfig) hash() string {
|
func (c *PeriodicTaskConfig) hash() string {
|
||||||
h := sha256.New()
|
h := sha256.New()
|
||||||
io.WriteString(h, c.Cronspec)
|
_, _ = h.Write([]byte(c.Cronspec))
|
||||||
io.WriteString(h, c.Task.Type())
|
_, _ = h.Write([]byte(c.Task.Type()))
|
||||||
h.Write(c.Task.Payload())
|
h.Write(c.Task.Payload())
|
||||||
opts := stringifyOptions(c.Opts)
|
opts := stringifyOptions(c.Opts)
|
||||||
sort.Strings(opts)
|
sort.Strings(opts)
|
||||||
for _, opt := range opts {
|
for _, opt := range opts {
|
||||||
io.WriteString(h, opt)
|
_, _ = h.Write([]byte(opt))
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("%x", h.Sum(nil))
|
return fmt.Sprintf("%x", h.Sum(nil))
|
||||||
}
|
}
|
||||||
|
@ -313,7 +313,7 @@ func TestProcessorRetry(t *testing.T) {
|
|||||||
pending: []*base.TaskMessage{m1, m2, m3, m4},
|
pending: []*base.TaskMessage{m1, m2, m3, m4},
|
||||||
delay: time.Minute,
|
delay: time.Minute,
|
||||||
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
handler: HandlerFunc(func(ctx context.Context, task *Task) error {
|
||||||
return fmt.Errorf(errMsg)
|
return errors.New(errMsg)
|
||||||
}),
|
}),
|
||||||
wait: 2 * time.Second,
|
wait: 2 * time.Second,
|
||||||
wantErrMsg: errMsg,
|
wantErrMsg: errMsg,
|
||||||
|
@ -289,7 +289,9 @@ func (s *Scheduler) runHeartbeater() {
|
|||||||
select {
|
select {
|
||||||
case <-s.done:
|
case <-s.done:
|
||||||
s.logger.Debugf("Scheduler heatbeater shutting down")
|
s.logger.Debugf("Scheduler heatbeater shutting down")
|
||||||
s.rdb.ClearSchedulerEntries(s.id)
|
if err := s.rdb.ClearSchedulerEntries(s.id); err != nil {
|
||||||
|
s.logger.Errorf("Failed to clear the scheduler entries: %v", err)
|
||||||
|
}
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -91,7 +91,7 @@ func TestServerRun(t *testing.T) {
|
|||||||
// Make sure server exits when receiving TERM signal.
|
// Make sure server exits when receiving TERM signal.
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(2 * time.Second)
|
time.Sleep(2 * time.Second)
|
||||||
syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
|
_ = syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
|
||||||
done <- struct{}{}
|
done <- struct{}{}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user