mirror of
https://github.com/hibiken/asynq.git
synced 2024-12-25 23:32:17 +08:00
Change UTC timezone to system time
This commit is contained in:
parent
123d560a44
commit
020309f472
@ -355,7 +355,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
desc: "With deadline option",
|
desc: "With deadline option",
|
||||||
task: task,
|
task: task,
|
||||||
opts: []Option{
|
opts: []Option{
|
||||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local)),
|
||||||
},
|
},
|
||||||
wantInfo: &TaskInfo{
|
wantInfo: &TaskInfo{
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
@ -367,7 +367,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
LastErr: "",
|
LastErr: "",
|
||||||
LastFailedAt: time.Time{},
|
LastFailedAt: time.Time{},
|
||||||
Timeout: noTimeout,
|
Timeout: noTimeout,
|
||||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local),
|
||||||
NextProcessAt: now,
|
NextProcessAt: now,
|
||||||
},
|
},
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
@ -378,7 +378,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: int64(noTimeout.Seconds()),
|
Timeout: int64(noTimeout.Seconds()),
|
||||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local).Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -388,7 +388,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
task: task,
|
task: task,
|
||||||
opts: []Option{
|
opts: []Option{
|
||||||
Timeout(20 * time.Second),
|
Timeout(20 * time.Second),
|
||||||
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC)),
|
Deadline(time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local)),
|
||||||
},
|
},
|
||||||
wantInfo: &TaskInfo{
|
wantInfo: &TaskInfo{
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
@ -400,7 +400,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
LastErr: "",
|
LastErr: "",
|
||||||
LastFailedAt: time.Time{},
|
LastFailedAt: time.Time{},
|
||||||
Timeout: 20 * time.Second,
|
Timeout: 20 * time.Second,
|
||||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local),
|
||||||
NextProcessAt: now,
|
NextProcessAt: now,
|
||||||
},
|
},
|
||||||
wantPending: map[string][]*base.TaskMessage{
|
wantPending: map[string][]*base.TaskMessage{
|
||||||
@ -411,7 +411,7 @@ func TestClientEnqueue(t *testing.T) {
|
|||||||
Retry: defaultMaxRetry,
|
Retry: defaultMaxRetry,
|
||||||
Queue: "default",
|
Queue: "default",
|
||||||
Timeout: 20,
|
Timeout: 20,
|
||||||
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.UTC).Unix(),
|
Deadline: time.Date(2020, time.June, 24, 0, 0, 0, 0, time.Local).Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -414,7 +414,7 @@ func TestInspectorGetQueueInfo(t *testing.T) {
|
|||||||
func TestInspectorHistory(t *testing.T) {
|
func TestInspectorHistory(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
now := time.Now().UTC()
|
now := time.Now()
|
||||||
inspector := NewInspector(getRedisConnOpt(t))
|
inspector := NewInspector(getRedisConnOpt(t))
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
@ -3295,7 +3295,7 @@ func TestInspectorSchedulerEntries(t *testing.T) {
|
|||||||
rdbClient := rdb.NewRDB(r)
|
rdbClient := rdb.NewRDB(r)
|
||||||
inspector := NewInspector(getRedisConnOpt(t))
|
inspector := NewInspector(getRedisConnOpt(t))
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now()
|
||||||
schedulerID := "127.0.0.1:9876:abc123"
|
schedulerID := "127.0.0.1:9876:abc123"
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
|
@ -168,12 +168,12 @@ func FailedTotalKey(qname string) string {
|
|||||||
|
|
||||||
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
// ProcessedKey returns a redis key for processed count for the given day for the queue.
|
||||||
func ProcessedKey(qname string, t time.Time) string {
|
func ProcessedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sprocessed:%s", QueueKeyPrefix(qname), t.In(time.Local).Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// FailedKey returns a redis key for failure count for the given day for the queue.
|
// FailedKey returns a redis key for failure count for the given day for the queue.
|
||||||
func FailedKey(qname string, t time.Time) string {
|
func FailedKey(qname string, t time.Time) string {
|
||||||
return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.UTC().Format("2006-01-02"))
|
return fmt.Sprintf("%sfailed:%s", QueueKeyPrefix(qname), t.In(time.Local).Format("2006-01-02"))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServerInfoKey returns a redis key for process info.
|
// ServerInfoKey returns a redis key for process info.
|
||||||
|
@ -214,9 +214,9 @@ func TestProcessedKey(t *testing.T) {
|
|||||||
input time.Time
|
input time.Time
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:processed:2019-11-14"},
|
{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.Local), "asynq:{default}:processed:2019-11-14"},
|
||||||
{"critical", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{critical}:processed:2020-12-01"},
|
{"critical", time.Date(2020, 12, 1, 1, 0, 1, 1, time.Local), "asynq:{critical}:processed:2020-12-01"},
|
||||||
{"default", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{default}:processed:2020-01-06"},
|
{"default", time.Date(2020, 1, 6, 15, 02, 1, 1, time.Local), "asynq:{default}:processed:2020-01-06"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@ -233,9 +233,9 @@ func TestFailedKey(t *testing.T) {
|
|||||||
input time.Time
|
input time.Time
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.UTC), "asynq:{default}:failed:2019-11-14"},
|
{"default", time.Date(2019, 11, 14, 10, 30, 1, 1, time.Local), "asynq:{default}:failed:2019-11-14"},
|
||||||
{"custom", time.Date(2020, 12, 1, 1, 0, 1, 1, time.UTC), "asynq:{custom}:failed:2020-12-01"},
|
{"custom", time.Date(2020, 12, 1, 1, 0, 1, 1, time.Local), "asynq:{custom}:failed:2020-12-01"},
|
||||||
{"low", time.Date(2020, 1, 6, 15, 02, 1, 1, time.UTC), "asynq:{low}:failed:2020-01-06"},
|
{"low", time.Date(2020, 1, 6, 15, 02, 1, 1, time.Local), "asynq:{low}:failed:2020-01-06"},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
@ -336,7 +336,7 @@ func TestUniqueKey(t *testing.T) {
|
|||||||
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
|
"address": map[string]string{"line": "123 Main St", "city": "Boston", "state": "MA"},
|
||||||
"names": []string{"bob", "mike", "rob"}})
|
"names": []string{"bob", "mike", "rob"}})
|
||||||
payload4 := toBytes(map[string]interface{}{
|
payload4 := toBytes(map[string]interface{}{
|
||||||
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.UTC),
|
"time": time.Date(2020, time.July, 28, 0, 0, 0, 0, time.Local),
|
||||||
"duration": time.Hour})
|
"duration": time.Hour})
|
||||||
|
|
||||||
checksum := func(data []byte) string {
|
checksum := func(data []byte) string {
|
||||||
@ -633,8 +633,8 @@ func TestSchedulerEntryEncoding(t *testing.T) {
|
|||||||
Type: "task_A",
|
Type: "task_A",
|
||||||
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
Payload: toBytes(map[string]interface{}{"foo": "bar"}),
|
||||||
Opts: []string{"Queue('email')"},
|
Opts: []string{"Queue('email')"},
|
||||||
Next: time.Now().Add(30 * time.Second).UTC(),
|
Next: time.Now().Add(30 * time.Second),
|
||||||
Prev: time.Now().Add(-2 * time.Minute).UTC(),
|
Prev: time.Now().Add(-2 * time.Minute),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@ -664,7 +664,7 @@ func TestSchedulerEnqueueEventEncoding(t *testing.T) {
|
|||||||
{
|
{
|
||||||
event: SchedulerEnqueueEvent{
|
event: SchedulerEnqueueEvent{
|
||||||
TaskID: uuid.NewString(),
|
TaskID: uuid.NewString(),
|
||||||
EnqueuedAt: time.Now().Add(-30 * time.Second).UTC(),
|
EnqueuedAt: time.Now().Add(-30 * time.Second),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -74,7 +74,7 @@ func (l *baseLogger) prefixPrint(prefix string, args ...interface{}) {
|
|||||||
func newBase(out io.Writer) *baseLogger {
|
func newBase(out io.Writer) *baseLogger {
|
||||||
prefix := fmt.Sprintf("asynq: pid=%d ", os.Getpid())
|
prefix := fmt.Sprintf("asynq: pid=%d ", os.Getpid())
|
||||||
return &baseLogger{
|
return &baseLogger{
|
||||||
stdlog.New(out, prefix, stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds|stdlog.LUTC),
|
stdlog.New(out, prefix, stdlog.Ldate|stdlog.Ltime|stdlog.Lmicroseconds),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -373,7 +373,7 @@ func (r *RDB) HistoricalStats(qname string, n int) ([]*DailyStats, error) {
|
|||||||
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
|
return nil, errors.E(op, errors.NotFound, &errors.QueueNotFoundError{Queue: qname})
|
||||||
}
|
}
|
||||||
const day = 24 * time.Hour
|
const day = 24 * time.Hour
|
||||||
now := r.clock.Now().UTC()
|
now := r.clock.Now()
|
||||||
var days []time.Time
|
var days []time.Time
|
||||||
var keys []string
|
var keys []string
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
|
@ -343,7 +343,7 @@ func TestCurrentStatsWithNonExistentQueue(t *testing.T) {
|
|||||||
func TestHistoricalStats(t *testing.T) {
|
func TestHistoricalStats(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
now := time.Now().UTC()
|
now := time.Now()
|
||||||
|
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
qname string // queue of interest
|
qname string // queue of interest
|
||||||
@ -5326,7 +5326,7 @@ func TestListWorkers(t *testing.T) {
|
|||||||
|
|
||||||
func TestWriteListClearSchedulerEntries(t *testing.T) {
|
func TestWriteListClearSchedulerEntries(t *testing.T) {
|
||||||
r := setup(t)
|
r := setup(t)
|
||||||
now := time.Now().UTC()
|
now := time.Now()
|
||||||
schedulerID := "127.0.0.1:9876:abc123"
|
schedulerID := "127.0.0.1:9876:abc123"
|
||||||
|
|
||||||
data := []*base.SchedulerEntry{
|
data := []*base.SchedulerEntry{
|
||||||
|
@ -1356,7 +1356,7 @@ func (r *RDB) WriteServerState(info *base.ServerInfo, workers []*base.WorkerInfo
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode server info: %v", err))
|
return errors.E(op, errors.Internal, fmt.Sprintf("cannot encode server info: %v", err))
|
||||||
}
|
}
|
||||||
exp := r.clock.Now().Add(ttl).UTC()
|
exp := r.clock.Now().Add(ttl)
|
||||||
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
|
args := []interface{}{ttl.Seconds(), bytes} // args to the lua script
|
||||||
for _, w := range workers {
|
for _, w := range workers {
|
||||||
bytes, err := base.EncodeWorkerInfo(w)
|
bytes, err := base.EncodeWorkerInfo(w)
|
||||||
@ -1421,7 +1421,7 @@ func (r *RDB) WriteSchedulerEntries(schedulerID string, entries []*base.Schedule
|
|||||||
}
|
}
|
||||||
args = append(args, bytes)
|
args = append(args, bytes)
|
||||||
}
|
}
|
||||||
exp := r.clock.Now().Add(ttl).UTC()
|
exp := r.clock.Now().Add(ttl)
|
||||||
key := base.SchedulerEntriesKey(schedulerID)
|
key := base.SchedulerEntriesKey(schedulerID)
|
||||||
err := r.client.ZAdd(ctx, base.AllSchedulers, redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
err := r.client.ZAdd(ctx, base.AllSchedulers, redis.Z{Score: float64(exp.Unix()), Member: key}).Err()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2767,7 +2767,7 @@ func TestWriteServerState(t *testing.T) {
|
|||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
Started: time.Now().UTC(),
|
Started: time.Now(),
|
||||||
Status: "active",
|
Status: "active",
|
||||||
ActiveWorkerCount: 0,
|
ActiveWorkerCount: 0,
|
||||||
}
|
}
|
||||||
@ -2857,7 +2857,7 @@ func TestWriteServerStateWithWorkers(t *testing.T) {
|
|||||||
Concurrency: 10,
|
Concurrency: 10,
|
||||||
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
Queues: map[string]int{"default": 2, "email": 5, "low": 1},
|
||||||
StrictPriority: false,
|
StrictPriority: false,
|
||||||
Started: time.Now().Add(-10 * time.Minute).UTC(),
|
Started: time.Now().Add(-10 * time.Minute),
|
||||||
Status: "active",
|
Status: "active",
|
||||||
ActiveWorkerCount: len(workers),
|
ActiveWorkerCount: len(workers),
|
||||||
}
|
}
|
||||||
|
@ -50,7 +50,7 @@ func TestTaskMessageBuilder(t *testing.T) {
|
|||||||
ops: func(b *TaskMessageBuilder) {
|
ops: func(b *TaskMessageBuilder) {
|
||||||
b.SetRetry(1).
|
b.SetRetry(1).
|
||||||
SetTimeout(20 * time.Second).
|
SetTimeout(20 * time.Second).
|
||||||
SetDeadline(time.Date(2017, 3, 6, 0, 0, 0, 0, time.UTC))
|
SetDeadline(time.Date(2017, 3, 6, 0, 0, 0, 0, time.Local))
|
||||||
},
|
},
|
||||||
want: &base.TaskMessage{
|
want: &base.TaskMessage{
|
||||||
Type: "default_task",
|
Type: "default_task",
|
||||||
@ -58,7 +58,7 @@ func TestTaskMessageBuilder(t *testing.T) {
|
|||||||
Payload: nil,
|
Payload: nil,
|
||||||
Retry: 1,
|
Retry: 1,
|
||||||
Timeout: 20,
|
Timeout: 20,
|
||||||
Deadline: time.Date(2017, 3, 6, 0, 0, 0, 0, time.UTC).Unix(),
|
Deadline: time.Date(2017, 3, 6, 0, 0, 0, 0, time.Local).Unix(),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -180,7 +180,7 @@ func printQueueInfo(info *asynq.QueueInfo) {
|
|||||||
},
|
},
|
||||||
)
|
)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
bold.Printf("Daily Stats %s UTC\n", info.Timestamp.UTC().Format("2006-01-02"))
|
bold.Printf("Daily Stats %s\n", info.Timestamp.In(time.Local).Format("2006-01-02"))
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"processed", "failed", "error rate"},
|
[]string{"processed", "failed", "error rate"},
|
||||||
func(w io.Writer, tmpl string) {
|
func(w io.Writer, tmpl string) {
|
||||||
@ -218,7 +218,7 @@ func queueHistory(cmd *cobra.Command, args []string) {
|
|||||||
|
|
||||||
func printDailyStats(stats []*asynq.DailyStats) {
|
func printDailyStats(stats []*asynq.DailyStats) {
|
||||||
printTable(
|
printTable(
|
||||||
[]string{"date (UTC)", "processed", "failed", "error rate"},
|
[]string{"date", "processed", "failed", "error rate"},
|
||||||
func(w io.Writer, tmpl string) {
|
func(w io.Writer, tmpl string) {
|
||||||
for _, s := range stats {
|
for _, s := range stats {
|
||||||
var errRate string
|
var errRate string
|
||||||
|
@ -139,7 +139,7 @@ func stats(cmd *cobra.Command, args []string) {
|
|||||||
printStatsByQueue(stats)
|
printStatsByQueue(stats)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
|
|
||||||
bold.Printf("Daily Stats %s UTC\n", aggStats.Timestamp.UTC().Format("2006-01-02"))
|
bold.Printf("Daily Stats %s\n", aggStats.Timestamp.In(time.Local).Format("2006-01-02"))
|
||||||
printSuccessFailureStats(&aggStats)
|
printSuccessFailureStats(&aggStats)
|
||||||
fmt.Println()
|
fmt.Println()
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user