diff --git a/internal/bisect/bisect.go b/internal/bisect/bisect.go new file mode 100644 index 0000000..66b0eb7 --- /dev/null +++ b/internal/bisect/bisect.go @@ -0,0 +1,189 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package bisect is used for bisecting a target repository +// with the goal of finding a commit introducing a regression. +package bisect + +import ( + "context" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log/slog" + "time" + + "golang.org/x/oscar/internal/github" + "golang.org/x/oscar/internal/queue" + "golang.org/x/oscar/internal/storage" + "golang.org/x/oscar/internal/storage/timed" + "rsc.io/ordered" +) + +const ( + taskKind = "bisection.Task" + taskUpdateKind = "bisection.TaskUpdate" // used for storing task updates in a timed db +) + +// This package stores the following key schemas in the database: +// +// ["bisection.Task", ID] => JSON of Task structure +// ["bisection.TaskUpdateByTime", DBTime, ID] => [] +// +// Bisecting a repository for a change regression can take considerable +// time. This has an effect on how the bisection is run in gaby. If +// bisection is being run as part of a batch job, other jobs will be +// blocked by the bisection. Spawning a bisection in a goroutine +// or a process will in principle not work on Cloud Run, which can +// move or kill a gaby instance if there are no requests served [1], +// even if several bisections are being ran in the background. +// +// This package addresses this problem by asynchronous bisection. +// [Client.BisectAsync] spawns a bisection [Task] by sending it to +// a [queue.Queue], which in practice will be a Cloud Tasks [2] +// queue. The latter will then send a request to gaby, which in +// turn will call [Client.Bisect]. The results and partial progress +// of bisection are saved to the provided database. +// +// [1] https://cloud.google.com/run/docs/about-instance-autoscaling +// [2] https://cloud.google.com/tasks/docs + +// o is short for ordered.Encode. +func o(list ...any) []byte { return ordered.Encode(list...) } + +// A Client is responsible for dispatching +// and executing bisection tasks. +type Client struct { + slog *slog.Logger + db storage.DB + queue queue.Queue +} + +// New returns a new client for bisection. +// The client uses the given logger, database, and queue. +func New(lg *slog.Logger, db storage.DB, q queue.Queue) *Client { + return &Client{ + slog: lg, + db: db, + queue: q, + } +} + +// BisectAsync creates and spawns a bisection task for trigger +// if the latter encodes a request for bisection. Otherwise, it +// does nothing and returns nil. +// +// BisectAsync creates a [Task] and saves it to the database, +// and then triggers an asynchronous execution of [Client.Bisect] +// through [Client] queue. +// +// TODO: generalize trigger beyond GitHub issue comment. +func (c *Client) BisectAsync(ctx context.Context, trigger *github.IssueComment) error { + if trigger.Project() != "golang/go" { + return fmt.Errorf("bisect.Add: only golang/go repo currently supported, got '%s'", trigger.Project()) + } + + now := time.Now() + t := &Task{ + Trigger: trigger.URL, + Issue: trigger.IssueURL, + Repository: "https://go.googlesource.com/go", + Bad: "master", + Good: "go1.22.0", + Regression: regression(trigger.Body), + Created: now, + Updated: now, + } + t.ID = newTaskID(t) + + skey := string(o(taskKind, t.ID)) + // Lock the task for sanity. + // This also helps with testing + // when enqueued bisection starts + // before BisectAsync saves the + // task to the database. + c.db.Lock(skey) + defer c.db.Unlock(skey) + + ok, err := c.queue.Enqueue(ctx, t, &queue.Options{}) + c.slog.Info("bisect.BisectAsync: enqueueing bisection task", "id", t.ID, "issue", t.Issue, "enqueued", ok) + if ok { + // Save the task only if it is enqueued. + t.Status = StatusQueued + c.save(t) + } + return err +} + +// regression extracts a bisection +// test code from body. +func regression(body string) string { + // For now, assume the body is + // the regression code. + return body +} + +// newTaskID creates a unique ID for t based on +// the repository, issue, trigger, command, and +// bisect commit information. +func newTaskID(t *Task) string { + hasher := sha256.New() + io.WriteString(hasher, t.Trigger) + io.WriteString(hasher, t.Repository) + io.WriteString(hasher, t.Issue) + io.WriteString(hasher, t.Good) + io.WriteString(hasher, t.Bad) + io.WriteString(hasher, t.Regression) + return hex.EncodeToString(hasher.Sum(nil)) +} + +// task returns [Task] with ID equal to id from the +// database, if such task exists. It returns nil otherwise. +func (c *Client) task(id string) (*Task, error) { + key := o(taskKind, id) + tj, ok := c.db.Get(key) + if !ok { + return nil, nil + } + var t Task + if err := json.Unmarshal(tj, &t); err != nil { + return nil, err + } + return &t, nil +} + +// save the task to the database. +func (c *Client) save(t *Task) { + b := c.db.Batch() + key := o(taskKind, t.ID) + b.Set(key, storage.JSON(t)) + timed.Set(c.db, b, taskUpdateKind, o(t.ID), nil) + b.Apply() + c.db.Flush() +} + +// Bisect performs bisection on task with task id. +func (c *Client) Bisect(id string) error { + skey := string(o(taskKind, id)) + // Lock the task just in case, so that + // no one else is bisecting it concurrently. + c.db.Lock(skey) + defer c.db.Unlock(skey) + + t, err := c.task(id) + if err != nil || t == nil { + return fmt.Errorf("bisect.Bisect: task could not be found id=%s err=%v", id, err) + } + + // TODO: implement bisection logic + + // TODO: if a task with the t.ID already exists and it has been more + // than cloud-task-deadline minutes since the task has been updated, + // assume the task was killed and restart the task from where it + // stopped the last time it was updated? + + return nil +} diff --git a/internal/bisect/bisect_test.go b/internal/bisect/bisect_test.go new file mode 100644 index 0000000..2d07b09 --- /dev/null +++ b/internal/bisect/bisect_test.go @@ -0,0 +1,100 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bisect + +import ( + "context" + "errors" + "net/url" + "testing" + "time" + + "golang.org/x/oscar/internal/github" + "golang.org/x/oscar/internal/queue" + "golang.org/x/oscar/internal/storage" + "golang.org/x/oscar/internal/testutil" +) + +func TestNewTaskID(t *testing.T) { + created := time.Date(2024, time.January, 0, 0, 0, 0, 0, time.UTC) // fixed date + for _, test := range []struct { + task Task + want string + }{ + { + Task{Trigger: "t", Issue: "i", Repository: "r", Regression: "c", Good: "g", Bad: "b"}, + "182eae594755dfbfbdba6d5c312d3655fbcc9dd634c818ebaf2da1dd7b6bb808", + }, + // Status, ID, Output, Created, and Updated are not important for ID computation. + { + Task{ID: "id", Trigger: "t", Issue: "i", Repository: "r", Regression: "c", Good: "g", + Bad: "b", Output: "o", Updated: time.Now(), Status: StatusSucceeded, Created: created}, + "182eae594755dfbfbdba6d5c312d3655fbcc9dd634c818ebaf2da1dd7b6bb808", + }, + } { + got := newTaskID(&test.task) + if got != test.want { + t.Errorf("%v: got %s, want %s", test.task, got, test.want) + } + } +} + +func TestBisectAsync(t *testing.T) { + check := testutil.Checker(t) + lg := testutil.Slogger(t) + db := storage.MemDB() + ctx := context.Background() + + var c *Client + // Process simulates what [Client.BisectAsync] will do in prod: + // send a task to a Cloud Tasks queue, which will issue a [http.Request] + // to gaby handle, which will then call [Client.Bisect] with the request. + process := func(_ context.Context, t queue.Task) error { + // Actual bisection handler will take an http + // request and parse the id param similarly. + url, err := url.Parse(t.Path() + "?" + t.Params()) + if err != nil { + return err + } + return c.Bisect(url.Query().Get("id")) + } + q := queue.NewInMemory(ctx, 1, process) + c = New(lg, db, q) + + trigger1 := &github.IssueComment{ + URL: "https://api.github.com/repos/golang/go/issues/00001#issuecomment-000001", + IssueURL: "https://api.github.com/repos/golang/go/issues/00001", + Body: "body1", + } + trigger2 := &github.IssueComment{ + URL: "https://api.github.com/repos/golang/go/issues/00002#issuecomment-000002", + IssueURL: "https://api.github.com/repos/golang/go/issues/00002", + Body: "body2", + } + check(c.BisectAsync(ctx, trigger1)) + check(c.BisectAsync(ctx, trigger2)) + + q.Wait(ctx) + check(errors.Join(q.Errors()...)) + + w := c.TaskWatcher("test") + var tasks []*Task + for e := range w.Recent() { + task, err := c.task(e.ID) + if err != nil { + t.Fatal(err) + } + tasks = append(tasks, task) + } + + if len(tasks) != 2 { + t.Errorf("want 2 tasks; got %d", len(tasks)) + } + for _, task := range tasks { + if task.Status != StatusQueued { + t.Errorf("want %d status for %v; got %d", StatusQueued, task, task.Status) + } + } +} diff --git a/internal/bisect/data.go b/internal/bisect/data.go new file mode 100644 index 0000000..797dc1e --- /dev/null +++ b/internal/bisect/data.go @@ -0,0 +1,119 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package bisect + +import ( + "fmt" + "time" + + "golang.org/x/oscar/internal/storage" + "golang.org/x/oscar/internal/storage/timed" + "rsc.io/ordered" +) + +// Status is the status of the bisection task. +type Status int + +const ( + // Bisection task is ready to start. + StatusReady Status = iota + // Bisection task is enqueued. + StatusQueued + // Bisection task is in progress. + StatusStarted + // Bisection task failed. + StatusFailed + // Bisection task finished successfully. + StatusSucceeded +) + +// Task contains metadata and progress information +// on bisection run that is saved to the database +// and also used as a Cloud Task queue entry. +type Task struct { + // ID is the unique identifier for the task. + ID string + // Trigger identifies what triggered the + // bisection task. For instance, it can + // be a GitHub comment requesting a bisection. + Trigger string + // Issue identifies the problem associated + // with the bisection. For instance, Issue + // can be a GitHub issue for which the + // bisection is ran. + Issue string + // Repository is the repo on which the + // bisection is performed. + Repository string + // Bad is the commit hash or tag + // from which the bisection starts. + Bad string + // Good is the commit hash or tag + // at which the bisection ends. + Good string + // Regression is Go code reproducing a + // bug that needs to be bisected. Currently, + // it is expected to be a Go test. + Regression string + // Output is the output of bisection. It + // can contain progress and debug messages. + Output string + // Result encodes the bisection finding. + Result string + // Error is a message describing bisection + // failure, if any. + Error string + // Status is the status of the bisection. + Status Status + // Updated is the last time the bisection + // task data was updated. Together with + // Status, Updated can be used to infer + // when the task finished. + Updated time.Time + // Created is the time the bisection task + // was created and queued for execution. + Created time.Time +} + +// Name of a task is its issue combined with ID. +func (t *Task) Name() string { + return fmt.Sprintf("%s-%s", t.Issue, t.ID) +} + +// Path is always "bisect". This is the gaby +// endpoint to which the task data will be sent. +func (t *Task) Path() string { + return "bisect" +} + +// Params encodes task ID. +func (t *Task) Params() string { + return "id=" + t.ID +} + +// TaskWatcher returns a new [timed.Watcher] with the given name. +// It picks up where any previous Watcher of the same name left off. +func (c *Client) TaskWatcher(name string) *timed.Watcher[*TaskEvent] { + return timed.NewWatcher(c.slog, c.db, name, taskUpdateKind, c.decodeTaskEvent) +} + +// decodeTaskEvent decodes a taskUpdateKind [timed.Entry] into +// a task event. +func (c *Client) decodeTaskEvent(t *timed.Entry) *TaskEvent { + te := TaskEvent{ + DBTime: t.ModTime, + } + if err := ordered.Decode(t.Key, &te.ID); err != nil { + c.db.Panic("bisect task event decode", "key", storage.Fmt(t.Key), "err", err) + } + return &te +} + +// A TaskEvent is a bisection [Task] +// event returned by bisection watchers. +type TaskEvent struct { + DBTime timed.DBTime // when event was created + ID string // ID of the bisection task +} diff --git a/internal/gcp/queue/queue.go b/internal/gcp/queue/queue.go index dafe052..7940301 100644 --- a/internal/gcp/queue/queue.go +++ b/internal/gcp/queue/queue.go @@ -2,8 +2,8 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -// Package queue provides queue implementations that can be used for -// asynchronous scheduling of fetch actions. +// Package queue provides a GCP queue implementation that +// can be used for asynchronous scheduling of fetch actions. package queue import ( @@ -18,36 +18,14 @@ import ( cloudtasks "cloud.google.com/go/cloudtasks/apiv2" taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" + gqueue "golang.org/x/oscar/internal/queue" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" ) -// A Task can produce information needed for Cloud Tasks. -type Task interface { - Name() string // Human-readable string for the task. Need not be unique. - Path() string // URL path - Params() string // URL (escaped) query params -} - -// A Queue provides an interface for asynchronous scheduling of tasks. -type Queue interface { - // Enqueue enqueues a task request. - // It reports whether a new task was actually added to the queue. - Enqueue(context.Context, Task, *Options) (bool, error) -} - -// Metadata is data needed to create a Cloud Task Queue. -type Metadata struct { - Project string // Name of the gaby project. - Location string // Location of the queue (e.g., us-central1). - QueueName string // Unique ID of the queue. - QueueURL string // URL of the Cloud Run service. - ServiceAccount string // Email of the service account associated with the project. -} - // New creates a new Queue based on metadata m. -func New(ctx context.Context, m *Metadata) (Queue, error) { +func New(ctx context.Context, m *gqueue.Metadata) (gqueue.Queue, error) { client, err := cloudtasks.NewClient(ctx) if err != nil { return nil, err @@ -74,7 +52,7 @@ type GCP struct { // newGCP returns a new Queue based on metadata m that can be used to // enqueue tasks using the cloud tasks API. The given m.QueueName // should be the name of the queue in the cloud tasks console. -func newGCP(client *cloudtasks.Client, m *Metadata) (*GCP, error) { +func newGCP(client *cloudtasks.Client, m *gqueue.Metadata) (*GCP, error) { if m.QueueName == "" { return nil, errors.New("empty queue name") } @@ -106,9 +84,9 @@ func newGCP(client *cloudtasks.Client, m *Metadata) (*GCP, error) { // It returns an error if there was an error hashing the task name, or // an error pushing the task to GCP. // If the task was a duplicate, it returns (false, nil). -func (q *GCP) Enqueue(ctx context.Context, task Task, opts *Options) (bool, error) { +func (q *GCP) Enqueue(ctx context.Context, task gqueue.Task, opts *gqueue.Options) (bool, error) { if opts == nil { - opts = &Options{} + opts = &gqueue.Options{} } // Cloud Tasks enforces an RPC timeout of at most 30s. I couldn't find this // in the documentation, but using a larger value, or no timeout, results in @@ -132,18 +110,11 @@ func (q *GCP) Enqueue(ctx context.Context, task Task, opts *Options) (bool, erro return false, fmt.Errorf("q.client.CreateTask(ctx, req): %v", err) } -// Options is used to provide option arguments for a task queue. -type Options struct { - // TaskNameSuffix is appended to the task name to force reprocessing of - // tasks that would normally be de-duplicated. - TaskNameSuffix string -} - // maxCloudTasksTimeout is the maximum timeout for HTTP tasks. // See https://cloud.google.com/tasks/docs/creating-http-target-tasks. const maxCloudTasksTimeout = 30 * time.Minute -func (q *GCP) newTaskRequest(task Task, opts *Options) (*taskspb.CreateTaskRequest, error) { +func (q *GCP) newTaskRequest(task gqueue.Task, opts *gqueue.Options) (*taskspb.CreateTaskRequest, error) { relativeURI := "/" + task.Path() if params := task.Params(); params != "" { relativeURI += "?" + params @@ -177,7 +148,7 @@ func (q *GCP) newTaskRequest(task Task, opts *Options) (*taskspb.CreateTaskReque // Tasks with the same ID that are created within a few hours of each other will be de-duplicated. // See https://cloud.google.com/tasks/docs/reference/rpc/google.cloud.tasks.v2#createtaskrequest // under "Task De-duplication". -func newTaskID(task Task) string { +func newTaskID(task gqueue.Task) string { name := task.Name() // Hash the path, params, and body of the task. hasher := sha256.New() @@ -210,76 +181,3 @@ func escapeTaskID(s string) string { } return b.String() } - -// InMemory is a Queue implementation that schedules in-process fetch -// operations. Unlike the GCP task queue, it will not automatically -// retry tasks on failure. -// -// This should only be used for local development and testing. -type InMemory struct { - queue chan Task - done chan struct{} - errs []error -} - -// NewInMemory creates a new InMemory that asynchronously schedules -// tasks and executes processFunc on them. It uses workerCount parallelism -// to accomplish this. -func NewInMemory(ctx context.Context, workerCount int, processFunc func(context.Context, Task) error) *InMemory { - q := &InMemory{ - queue: make(chan Task, 1000), - done: make(chan struct{}), - } - sem := make(chan struct{}, workerCount) - go func() { - for v := range q.queue { - select { - case <-ctx.Done(): - return - case sem <- struct{}{}: - } - - // If a worker is available, spawn a task in a - // goroutine and wait for it to finish. - go func(t Task) { - defer func() { <-sem }() - - fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) - defer cancel() - - if err := processFunc(fetchCtx, t); err != nil { - q.errs = append(q.errs, err) - } - }(v) - } - for i := 0; i < cap(sem); i++ { - select { - case <-ctx.Done(): - // If context is cancelled here, there is no way for us to - // do cleanup. We panic here since there is no other way to - // report an error. - panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) - case sem <- struct{}{}: - } - } - close(q.done) - }() - return q -} - -// Enqueue pushes a scan task into the local queue to be processed -// asynchronously. -func (q *InMemory) Enqueue(ctx context.Context, task Task, _ *Options) (bool, error) { - q.queue <- task - return true, nil -} - -// Wait waits for all queued requests to finish. -func (q *InMemory) Wait(ctx context.Context) { - close(q.queue) - <-q.done -} - -func (q *InMemory) Errors() []error { - return q.errs -} diff --git a/internal/gcp/queue/queue_test.go b/internal/gcp/queue/queue_test.go index 46e3e57..c3796b2 100644 --- a/internal/gcp/queue/queue_test.go +++ b/internal/gcp/queue/queue_test.go @@ -5,12 +5,11 @@ package queue import ( - "context" - "fmt" "testing" taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" "github.com/google/go-cmp/cmp" + gqueue "golang.org/x/oscar/internal/queue" "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/durationpb" ) @@ -48,7 +47,7 @@ func TestNewTaskID(t *testing.T) { } func TestNewTaskRequest(t *testing.T) { - m := &Metadata{ + m := &gqueue.Metadata{ Project: "Project", QueueName: "queueID", QueueURL: "http://1.2.3.4:8000", @@ -77,7 +76,7 @@ func TestNewTaskRequest(t *testing.T) { if err != nil { t.Fatal(err) } - opts := &Options{ + opts := &gqueue.Options{ TaskNameSuffix: "suf", } sreq := &testTask{ @@ -93,34 +92,3 @@ func TestNewTaskRequest(t *testing.T) { t.Errorf("mismatch (-want, +got):\n%s", diff) } } - -func TestInMemoryQueue(t *testing.T) { - t1 := &testTask{"name1", "path1", "params1"} - t2 := &testTask{"name2", "path2", "params2"} - t3 := &testTask{"", "path1", "params1"} - - process := func(_ context.Context, t Task) error { - if t.Name() == "" { - return fmt.Errorf("name not set for task with path %s", t.Path()) - } - return nil - } - - ctx := context.Background() - q := NewInMemory(ctx, 2, process) - q.Enqueue(ctx, t1, nil) - q.Enqueue(ctx, t2, nil) - q.Enqueue(ctx, t3, nil) - q.Wait(ctx) - - errs := q.Errors() - if len(errs) != 1 { - t.Fatalf("want 1 error; got %d", len(errs)) - } - - want := "name not set for task with path path1" - got := errs[0].Error() - if want != got { - t.Errorf("want '%s' as error message; got '%s'", want, got) - } -} diff --git a/internal/queue/queue.go b/internal/queue/queue.go new file mode 100644 index 0000000..8a5099b --- /dev/null +++ b/internal/queue/queue.go @@ -0,0 +1,117 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package queue provides queue interface and an in-memory +// implementation that can be used for asynchronous scheduling +// of fetch actions. +package queue + +import ( + "context" + "fmt" + "time" +) + +// A Task can produce information needed for Cloud Tasks. +type Task interface { + Name() string // Human-readable string for the task. Need not be unique. + Path() string // URL path + Params() string // URL (escaped) query params +} + +// A Queue provides an interface for asynchronous scheduling of tasks. +type Queue interface { + // Enqueue enqueues a task request. + // It reports whether a new task was actually added to the queue. + Enqueue(context.Context, Task, *Options) (bool, error) +} + +// Options is used to provide option arguments for a task queue. +type Options struct { + // TaskNameSuffix is appended to the task name to force reprocessing of + // tasks that would normally be de-duplicated. + TaskNameSuffix string +} + +// Metadata is data needed to create a Cloud Task Queue. +type Metadata struct { + Project string // Name of the gaby project. + Location string // Location of the queue (e.g., us-central1). + QueueName string // Unique ID of the queue. + QueueURL string // URL of the Cloud Run service. + ServiceAccount string // Email of the service account associated with the project. +} + +// InMemory is a Queue implementation that schedules in-process fetch +// operations. Unlike the GCP task queue, it will not automatically +// retry tasks on failure. +// +// This should only be used for local development and testing. +type InMemory struct { + queue chan Task + done chan struct{} + errs []error +} + +// NewInMemory creates a new InMemory that asynchronously schedules +// tasks and executes processFunc on them. It uses workerCount parallelism +// to accomplish this. +func NewInMemory(ctx context.Context, workerCount int, processFunc func(context.Context, Task) error) *InMemory { + q := &InMemory{ + queue: make(chan Task, 1000), + done: make(chan struct{}), + } + sem := make(chan struct{}, workerCount) + go func() { + for v := range q.queue { + select { + case <-ctx.Done(): + return + case sem <- struct{}{}: + } + + // If a worker is available, spawn a task in a + // goroutine and wait for it to finish. + go func(t Task) { + defer func() { <-sem }() + + fetchCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + if err := processFunc(fetchCtx, t); err != nil { + q.errs = append(q.errs, err) + } + }(v) + } + for i := 0; i < cap(sem); i++ { + select { + case <-ctx.Done(): + // If context is cancelled here, there is no way for us to + // do cleanup. We panic here since there is no other way to + // report an error. + panic(fmt.Sprintf("InMemory queue context done: %v", ctx.Err())) + case sem <- struct{}{}: + } + } + close(q.done) + }() + return q +} + +// Enqueue pushes a scan task into the local queue to be processed +// asynchronously. +func (q *InMemory) Enqueue(ctx context.Context, task Task, _ *Options) (bool, error) { + q.queue <- task + return true, nil +} + +// Wait waits for all queued requests to finish. +func (q *InMemory) Wait(ctx context.Context) { + close(q.queue) + <-q.done +} + +func (q *InMemory) Errors() []error { + return q.errs +} diff --git a/internal/queue/queue_test.go b/internal/queue/queue_test.go new file mode 100644 index 0000000..a10f283 --- /dev/null +++ b/internal/queue/queue_test.go @@ -0,0 +1,52 @@ +// Copyright 2024 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package queue + +import ( + "context" + "fmt" + "testing" +) + +type testTask struct { + name string + path string + params string +} + +func (t *testTask) Name() string { return t.name } +func (t *testTask) Path() string { return t.path } +func (t *testTask) Params() string { return t.params } + +func TestInMemoryQueue(t *testing.T) { + t1 := &testTask{"name1", "path1", "params1"} + t2 := &testTask{"name2", "path2", "params2"} + t3 := &testTask{"", "path1", "params1"} + + process := func(_ context.Context, t Task) error { + if t.Name() == "" { + return fmt.Errorf("name not set for task with path %s", t.Path()) + } + return nil + } + + ctx := context.Background() + q := NewInMemory(ctx, 2, process) + q.Enqueue(ctx, t1, nil) + q.Enqueue(ctx, t2, nil) + q.Enqueue(ctx, t3, nil) + q.Wait(ctx) + + errs := q.Errors() + if len(errs) != 1 { + t.Fatalf("want 1 error; got %d", len(errs)) + } + + want := "name not set for task with path path1" + got := errs[0].Error() + if want != got { + t.Errorf("want '%s' as error message; got '%s'", want, got) + } +}