mirror of
https://github.com/hibiken/asynq.git
synced 2025-09-19 13:21:58 +08:00
Add ResultWriter type
This commit is contained in:
@@ -1052,3 +1052,13 @@ func (r *RDB) ClearSchedulerHistory(entryID string) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WriteResult writes the given result data for the specified task.
|
||||
func (r *RDB) WriteResult(qname, taskID string, data []byte) (int, error) {
|
||||
var op errors.Op = "rdb.WriteResult"
|
||||
taskKey := base.TaskKey(qname, taskID)
|
||||
if err := r.client.HSet(context.Background(), taskKey, "result", data).Err(); err != nil {
|
||||
return 0, errors.E(op, errors.Unknown, &errors.RedisCommandError{Command: "hset", Err: err})
|
||||
}
|
||||
return len(data), nil
|
||||
}
|
||||
|
@@ -2573,3 +2573,39 @@ func TestCancelationPubSub(t *testing.T) {
|
||||
}
|
||||
mu.Unlock()
|
||||
}
|
||||
|
||||
func TestWriteResult(t *testing.T) {
|
||||
r := setup(t)
|
||||
defer r.Close()
|
||||
|
||||
tests := []struct {
|
||||
qname string
|
||||
taskID string
|
||||
data []byte
|
||||
}{
|
||||
{
|
||||
qname: "default",
|
||||
taskID: uuid.NewString(),
|
||||
data: []byte("hello"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
h.FlushDB(t, r.client)
|
||||
|
||||
n, err := r.WriteResult(tc.qname, tc.taskID, tc.data)
|
||||
if err != nil {
|
||||
t.Errorf("WriteResult failed: %v", err)
|
||||
continue
|
||||
}
|
||||
if n != len(tc.data) {
|
||||
t.Errorf("WriteResult returned %d, want %d", n, len(tc.data))
|
||||
}
|
||||
|
||||
taskKey := base.TaskKey(tc.qname, tc.taskID)
|
||||
got := r.client.HGet(context.Background(), taskKey, "result").Val()
|
||||
if got != string(tc.data) {
|
||||
t.Errorf("`result` field under %q key is set to %q, want %q", taskKey, got, string(tc.data))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user