From c9bcb849dbda9b413bef2adf60e7933abc1f9c8c Mon Sep 17 00:00:00 2001 From: Denis Rechkunov Date: Mon, 27 Jan 2025 19:55:48 +0100 Subject: [PATCH] Add simplified integration framework That can be used for running Beats binaries in integration tests. This framework has a simplified API comparing to the existing one and uses a more efficient way to search for logs in the output of the command. --- filebeat/testing/integration/integration.go | 72 +++++ filebeat/testing/integration/log_generator.go | 138 ++++++++ filebeat/testing/integration/sample_test.go | 135 ++++++++ libbeat/testing/integration/integration.go | 306 ++++++++++++++++++ libbeat/testing/integration/output_watcher.go | 187 +++++++++++ libbeat/testing/integration/run_beat.go | 244 ++++++++++++++ libbeat/testing/integration/sample_test.go | 93 ++++++ 7 files changed, 1175 insertions(+) create mode 100644 filebeat/testing/integration/integration.go create mode 100644 filebeat/testing/integration/log_generator.go create mode 100644 filebeat/testing/integration/sample_test.go create mode 100644 libbeat/testing/integration/integration.go create mode 100644 libbeat/testing/integration/output_watcher.go create mode 100644 libbeat/testing/integration/run_beat.go create mode 100644 libbeat/testing/integration/sample_test.go diff --git a/filebeat/testing/integration/integration.go b/filebeat/testing/integration/integration.go new file mode 100644 index 00000000000..57caf967a4a --- /dev/null +++ b/filebeat/testing/integration/integration.go @@ -0,0 +1,72 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "fmt" + "testing" + + "github.com/elastic/beats/v7/libbeat/testing/integration" +) + +// Test describes all operations for testing Filebeat +// +// Due to interface composition all Filebeat-specific functions +// must be used first in the call-chain. +type Test interface { + integration.BeatTest + // ExpectEOF sets an expectation that Filebeat will read the given + // files to EOF. + ExpectEOF(...string) Test +} + +// TestOptions describes all available options for the test. +type TestOptions struct { + // Config for the Beat written in YAML + Config string + // Args sets additional arguments to pass when running the binary. + Args []string +} + +// NewTest creates a new integration test for Filebeat. +func NewTest(t *testing.T, opts TestOptions) Test { + return &test{ + BeatTest: integration.NewBeatTest(t, integration.BeatTestOptions{ + RunBeatOptions: integration.RunBeatOptions{ + Beatname: "filebeat", + Config: opts.Config, + Args: opts.Args, + }, + }), + } +} + +type test struct { + integration.BeatTest +} + +// ExpectEOF implements the Test interface. +func (fbt *test) ExpectEOF(files ...string) Test { + // Ensuring we completely ingest every file + for _, filename := range files { + line := fmt.Sprintf("End of file reached: %s; Backoff now.", filename) + fbt.ExpectOutput(line) + } + + return fbt +} diff --git a/filebeat/testing/integration/log_generator.go b/filebeat/testing/integration/log_generator.go new file mode 100644 index 00000000000..d4961c76302 --- /dev/null +++ b/filebeat/testing/integration/log_generator.go @@ -0,0 +1,138 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + uuid "github.com/gofrs/uuid/v5" +) + +// LogGenerator used for generating log files +type LogGenerator interface { + // GenerateLine generates a single line for a log file. + // Expected no new line character at the end. + GenerateLine(filename string, index int) string + // FileExtension sets the extension of the file where lines are written. + FileExtension() string +} + +// NewPlainTextGenerator creates is a simple plain text generator. +// +// It's using the given message prefix following by the filename +// and the line number, e.g. `filename:128` +func NewPlainTextGenerator(prefix string) LogGenerator { + return plainTextGenerator{ + prefix: prefix, + } +} + +type plainTextGenerator struct { + prefix string +} + +func (g plainTextGenerator) GenerateLine(filename string, index int) string { + return fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index) +} + +func (g plainTextGenerator) FileExtension() string { + return ".log" +} + +// JSONLineGenerator creates a JSON log line generator. +// Forms a JSON object with a message +// prefixed by the given prefix and followed by the filename +// and the line number, e.g. `filename:128` +func NewJSONGenerator(prefix string) LogGenerator { + return jsonGenerator{ + prefix: prefix, + } +} + +type jsonGenerator struct { + prefix string +} + +func (g jsonGenerator) GenerateLine(filename string, index int) string { + message := fmt.Sprintf("%s %s:%d", g.prefix, filepath.Base(filename), index) + + line := struct{ Message string }{Message: message} + bytes, _ := json.Marshal(line) + return string(bytes) +} + +func (g jsonGenerator) FileExtension() string { + return ".ndjson" +} + +// GenerateLogFiles generate given amount of files with given +// amount of lines in them. +// +// Returns the path value to put in the Filebeat configuration and +// filenames for all created files. +func GenerateLogFiles(t *testing.T, files, lines int, generator LogGenerator) (path string, filenames []string) { + t.Helper() + t.Logf("generating %d log files with %d lines each...", files, lines) + logsPath := filepath.Join(t.TempDir(), "logs") + err := os.MkdirAll(logsPath, 0777) + if err != nil { + t.Fatalf("failed to create a directory for logs %q: %s", logsPath, err) + return "", nil + } + + filenames = make([]string, 0, files) + for i := 0; i < files; i++ { + id, err := uuid.NewV4() + if err != nil { + t.Fatalf("failed to generate a unique filename: %s", err) + return "", nil + } + filename := filepath.Join(logsPath, id.String()+generator.FileExtension()) + filenames = append(filenames, filename) + GenerateLogFile(t, filename, lines, generator) + } + + t.Logf("finished generating %d log files with %d lines each", files, lines) + + return filepath.Join(logsPath, "*"+generator.FileExtension()), filenames +} + +// GenerateLogFile generates a single log file with the given full +// filename, amount of lines using the given generator +// to create each line. +func GenerateLogFile(t *testing.T, filename string, lines int, generator LogGenerator) { + t.Helper() + file, err := os.Create(filename) + if err != nil { + t.Fatalf("failed to create a log file: %q", filename) + return + } + defer file.Close() + for i := 1; i <= lines; i++ { + line := generator.GenerateLine(filename, i) + "\n" + _, err := file.WriteString(line) + if err != nil { + t.Fatalf("cannot write a generated log line to %s", filename) + return + } + } +} diff --git a/filebeat/testing/integration/sample_test.go b/filebeat/testing/integration/sample_test.go new file mode 100644 index 00000000000..69da968cb52 --- /dev/null +++ b/filebeat/testing/integration/sample_test.go @@ -0,0 +1,135 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "fmt" + "path/filepath" + "testing" + "time" + + "github.com/elastic/beats/v7/libbeat/testing/integration" +) + +func TestFilebeat(t *testing.T) { + messagePrefix := "sample test message" + fileCount := 5 + lineCount := 128 + + reportOptions := integration.ReportOptions{ + PrintLinesOnFail: 10, + } + + t.Run("Filebeat starts and ingests files", func(t *testing.T) { + configTemplate := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" + paths: + - %s +# we want to check that all messages are ingested +# without using an external service, this is an easy way +output.console: + enabled: true +` + // we can generate any amount of expectations + // they are light-weight + expectIngestedFiles := func(test Test, files []string) { + // ensuring we ingest every line from every file + for _, filename := range files { + for i := 1; i <= lineCount; i++ { + line := fmt.Sprintf("%s %s:%d", messagePrefix, filepath.Base(filename), i) + test.ExpectOutput(line) + } + } + } + + t.Run("plain text files", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + generator := NewPlainTextGenerator(messagePrefix) + path, files := GenerateLogFiles(t, fileCount, lineCount, generator) + config := fmt.Sprintf(configTemplate, path) + test := NewTest(t, TestOptions{ + Config: config, + }) + + expectIngestedFiles(test, files) + + test. + // we expect to read all generated files to EOF + ExpectEOF(files...). + WithReportOptions(reportOptions). + // we should observe the start message of the Beat + ExpectStart(). + // check that the first and the last line of the file get ingested + Start(ctx). + // wait until all the expectations are met + // or we hit the timeout set by the context + Wait() + }) + + t.Run("JSON files", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + generator := NewJSONGenerator(messagePrefix) + path, files := GenerateLogFiles(t, fileCount, lineCount, generator) + config := fmt.Sprintf(configTemplate, path) + test := NewTest(t, TestOptions{ + Config: config, + }) + + expectIngestedFiles(test, files) + + test. + ExpectEOF(files...). + WithReportOptions(reportOptions). + ExpectStart(). + Start(ctx). + Wait() + }) + }) + + t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // path items are required, this config is invalid + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" +output.console: + enabled: true +` + test := NewTest(t, TestOptions{ + Config: config, + }) + + test. + WithReportOptions(reportOptions). + ExpectStart(). + ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). + ExpectStop(1). + Start(ctx). + Wait() + }) +} diff --git a/libbeat/testing/integration/integration.go b/libbeat/testing/integration/integration.go new file mode 100644 index 00000000000..706d0a9790c --- /dev/null +++ b/libbeat/testing/integration/integration.go @@ -0,0 +1,306 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "errors" + "fmt" + "os/exec" + "regexp" + "sync" + "testing" +) + +const ( + expectErrMsg = "cannot set expectations once the test started" +) + +// BeatTest describes all operations involved +// in integration testing of a Beat +type BeatTest interface { + // Start the integration test + // + // The test runs until all the expectations are met (unless `ExpectStop` is used) or context was canceled or the Beat exits on its own. + Start(context.Context) BeatTest + + // Wait until the test is over. + // + // `PrintOutput` might be helpful for debugging after calling this function. + Wait() + + // ExpectStart sets an expectation that the Beat will report that it started. + ExpectStart() BeatTest + + // ExpectStop sets an expectation that the Beat will exit by itself. + // The process exit code will be checked against the given value. + // + // User controls the timeout by passing the context in `Start`. + // + // All the output expectations would still work as usual, however, + // satisfying all expectations would not stop the Beat. + ExpectStop(exitCode int) BeatTest + + // ExpectOutput registers an output watch for the given substrings. + // + // Every future output line produced by the Beat will be checked + // if it contains one of the given strings. + // + // If given multiple strings, they get checked in order: + // The first substring must be found first, then second, etc. + // + // For `AND` behavior use this function multiple times. + // + // This function should be used before `Start` because it's + // inspecting only the new output lines. + ExpectOutput(...string) BeatTest + + // ExpectOutputRegex registers an output watch for the given regular expression.. + // + // Every future output line produced by the Beat will be matched + // against the given regular expression. + // + // If given multiple expressions, they get checked in order. + // The first expression must match first, then second, etc. + // + // For `AND` behavior use this function multiple times. + // + // This function should be used before `Start` because it's + // inspecting only new outputs. + ExpectOutputRegex(...regexp.Regexp) BeatTest + + // PrintOutput prints last `limit` lines of the output + // + // It might be handy for inspecting the output in case of a failure. + // Use `limit=-1` to print the entire output (strongly discouraged). + // + // JSON lines of the output are formatted. + PrintOutput(lineCount int) + + // PrintExpectations prints all currently set expectations + PrintExpectations() + + // WithReportOptions sets the reporting options for the test. + WithReportOptions(ReportOptions) BeatTest +} + +// ReportOptions describes all reporting options +type ReportOptions struct { + // PrintExpectationsBeforeStart if set to `true`, all the defined + // expectations will be printed before the test starts. + // + // Use it only if you have a manageable amount of expectations that + // would be readable in the output. + PrintExpectationsBeforeStart bool + + // PrintLinesOnFail defines how many lines of the Beat output + // the test should print in case of failure (default 0). + // + // It uses `PrintOutput`, see its documentation for details. + PrintLinesOnFail int +} + +// BeatTestOptions describes all options to run the test +type BeatTestOptions struct { + RunBeatOptions +} + +// NewBeatTest creates a new integration test for a Beat. +func NewBeatTest(t *testing.T, opts BeatTestOptions) BeatTest { + test := &beatTest{ + t: t, + opts: opts, + } + + return test +} + +type beatTest struct { + t *testing.T + opts BeatTestOptions + reportOpts ReportOptions + expectations []OutputWatcher + expectedExitCode *int + beat *RunningBeat + mtx sync.Mutex +} + +// Start implements the BeatTest interface. +func (b *beatTest) Start(ctx context.Context) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + if b.beat != nil { + b.t.Fatal("test cannot be startd multiple times") + return b + } + watcher := NewOverallWatcher(b.expectations) + b.t.Logf("running %s integration test...", b.opts.Beatname) + if b.reportOpts.PrintExpectationsBeforeStart { + b.printExpectations() + } + b.beat = RunBeat(ctx, b.t, b.opts.RunBeatOptions, watcher) + + return b +} + +// Wait implements the BeatTest interface. +func (b *beatTest) Wait() { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat == nil { + b.t.Fatal("test must start first before calling wait on it") + return + } + + err := b.beat.c.Wait() + exitErr := &exec.ExitError{} + if !errors.As(err, &exitErr) { + b.t.Fatalf("unexpected error when stopping %s: %s", b.opts.Beatname, err) + return + } + + exitCode := 0 + if err != nil { + exitCode = exitErr.ExitCode() + } + b.t.Logf("%s stopped, exit code %d", b.opts.Beatname, exitCode) + + if b.expectedExitCode != nil && exitCode != *b.expectedExitCode { + b.t.Cleanup(func() { + b.t.Logf("expected exit code %d, actual %d", b.expectedExitCode, exitCode) + }) + + b.t.Fail() + } + + if b.beat.watcher != nil { + b.t.Cleanup(func() { + b.t.Logf("\n\nExpectations are not met:\n\n%s\n\n", b.beat.watcher.String()) + if b.reportOpts.PrintLinesOnFail != 0 { + b.PrintOutput(b.reportOpts.PrintLinesOnFail) + } + }) + b.t.Fail() + } +} + +// ExpectOutput implements the BeatTest interface. +func (b *beatTest) ExpectOutput(lines ...string) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + watchers := make([]OutputWatcher, 0, len(lines)) + for _, l := range lines { + watchers = append(watchers, NewStringWatcher(l)) + } + b.expectations = append(b.expectations, NewInOrderWatcher(watchers)) + return b +} + +// ExpectOutputRegex implements the BeatTest interface. +func (b *beatTest) ExpectOutputRegex(exprs ...regexp.Regexp) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + watchers := make([]OutputWatcher, 0, len(exprs)) + for _, e := range exprs { + watchers = append(watchers, NewRegexpWatcher(e)) + } + b.expectations = append(b.expectations, NewInOrderWatcher(watchers)) + + return b +} + +// ExpectStart implements the BeatTest interface. +func (b *beatTest) ExpectStart() BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + expectedLine := fmt.Sprintf("%s start running.", b.opts.Beatname) + b.expectations = append(b.expectations, NewStringWatcher(expectedLine)) + return b +} + +// ExpectStop implements the BeatTest interface. +func (b *beatTest) ExpectStop(exitCode int) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat != nil { + b.t.Fatal(expectErrMsg) + return b + } + + b.opts.KeepRunning = true + b.expectedExitCode = &exitCode + return b +} + +// PrintOutput implements the BeatTest interface. +func (b *beatTest) PrintOutput(lineCount int) { + b.mtx.Lock() + defer b.mtx.Unlock() + + if b.beat == nil { + return + } + + b.t.Logf("\n\nLast %d lines of the output:\n\n%s\n\n", lineCount, b.beat.CollectOutput(lineCount)) +} + +// WithReportOptions implements the BeatTest interface. +func (b *beatTest) WithReportOptions(opts ReportOptions) BeatTest { + b.mtx.Lock() + defer b.mtx.Unlock() + + b.reportOpts = opts + return b +} + +// PrintExpectations implements the BeatTest interface. +func (b *beatTest) PrintExpectations() { + b.mtx.Lock() + defer b.mtx.Unlock() + b.printExpectations() +} + +// lock-free, so it can be used inside a lock +func (b *beatTest) printExpectations() { + overall := NewOverallWatcher(b.expectations) + b.t.Logf("set expectations:\n%s", overall) + if b.expectedExitCode != nil { + b.t.Logf("\nprocess expected to exit with code %d\n\n", *b.expectedExitCode) + } else { + b.t.Log("\nprocess expected to be killed once expectations are met\n\n") + } +} diff --git a/libbeat/testing/integration/output_watcher.go b/libbeat/testing/integration/output_watcher.go new file mode 100644 index 00000000000..137b5176677 --- /dev/null +++ b/libbeat/testing/integration/output_watcher.go @@ -0,0 +1,187 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "fmt" + "regexp" + "strings" +) + +// OutputWatcher describes operations for watching output. +type OutputWatcher interface { + // Inspect the line of the output and adjust the state accordingly. + Inspect(string) + // Observed is `true` if every expected output has been observed. + Observed() bool + // String is the string representation of the current state. + // Describes what output is still expected. + String() string +} + +// NewStringWatcher creates a new output watcher that watches for a +// substring. +// +// The given string must be a substring of an output line +// to be marked as observed. +func NewStringWatcher(str string) OutputWatcher { + return &stringWatcher{ + expecting: &str, + } +} + +type stringWatcher struct { + expecting *string +} + +func (w *stringWatcher) Inspect(line string) { + if w.Observed() { + return + } + if strings.Contains(line, *w.expecting) { + w.expecting = nil + return + } +} + +func (w *stringWatcher) Observed() bool { + return w.expecting == nil +} + +func (w *stringWatcher) String() string { + if w.Observed() { + return "" + } + return fmt.Sprintf("to have a substring %q", *w.expecting) +} + +// NewRegexpWatcher create a new output watcher that watches for an +// output line to match the given regular expression. +func NewRegexpWatcher(expr regexp.Regexp) OutputWatcher { + return ®expWatcher{ + expecting: &expr, + } +} + +type regexpWatcher struct { + expecting *regexp.Regexp +} + +func (w *regexpWatcher) Inspect(line string) { + if w.Observed() { + return + } + if w.expecting.MatchString(line) { + w.expecting = nil + } +} + +func (w *regexpWatcher) Observed() bool { + return w.expecting == nil +} + +func (w *regexpWatcher) String() string { + if w.Observed() { + return "" + } + return fmt.Sprintf("to match %s", w.expecting.String()) +} + +// NewInOrderWatcher creates a watcher that makes sure that the first +// watcher has `Observed() == true` then it moves on to the second, +// then third, etc. +// +// Reports overall state of all watchers on the list. +func NewInOrderWatcher(watchers []OutputWatcher) OutputWatcher { + return &inOrderWatcher{ + watchers: watchers, + } +} + +type inOrderWatcher struct { + watchers []OutputWatcher +} + +func (w *inOrderWatcher) Inspect(line string) { + if w.Observed() { + return + } + w.watchers[0].Inspect(line) + if w.watchers[0].Observed() { + w.watchers = w.watchers[1:] + return + } +} + +func (w *inOrderWatcher) Observed() bool { + return len(w.watchers) == 0 +} + +func (w *inOrderWatcher) String() string { + if w.Observed() { + return "" + } + + expectations := make([]string, 0, len(w.watchers)) + for _, watcher := range w.watchers { + expectations = append(expectations, watcher.String()) + } + return strings.Join(expectations, " -> ") +} + +// NewOverallWatcher creates a watcher that reports an overall state +// of the list of other watchers. +// +// It's state marked as observed when all the nested watchers have +// `Observed() == true`. +func NewOverallWatcher(watchers []OutputWatcher) OutputWatcher { + return &metaWatcher{ + active: watchers, + } +} + +type metaWatcher struct { + active []OutputWatcher +} + +func (w *metaWatcher) Inspect(line string) { + var active []OutputWatcher + for _, watcher := range w.active { + watcher.Inspect(line) + if !watcher.Observed() { + active = append(active, watcher) + } + } + w.active = active +} + +func (w *metaWatcher) Observed() bool { + return len(w.active) == 0 +} + +func (w *metaWatcher) String() string { + if w.Observed() { + return "" + } + + expectations := make([]string, 0, len(w.active)) + for _, watcher := range w.active { + expectations = append(expectations, watcher.String()) + } + return " * " + strings.Join(expectations, "\n * ") +} diff --git a/libbeat/testing/integration/run_beat.go b/libbeat/testing/integration/run_beat.go new file mode 100644 index 00000000000..f1a3be6474f --- /dev/null +++ b/libbeat/testing/integration/run_beat.go @@ -0,0 +1,244 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "testing" +) + +var ( + compiling sync.Mutex +) + +// RunningBeat describes the running Beat binary. +type RunningBeat struct { + c *exec.Cmd + outputRW sync.RWMutex + output []string + watcher OutputWatcher + keepRunning bool +} + +// CollectOutput returns the last `limit` lines of the currently +// accumulated output. +// `limit=-1` returns the entire output from the beginning. +func (b *RunningBeat) CollectOutput(limit int) string { + b.outputRW.RLock() + defer b.outputRW.RUnlock() + if limit < 0 { + limit = len(b.output) + } + + builder := strings.Builder{} + output := b.output + if len(output) > limit { + output = output[len(output)-limit:] + } + + m := make(map[string]any) + for i, l := range output { + err := json.Unmarshal([]byte(l), &m) + if err != nil { + builder.WriteString(l) + } else { + pretty, _ := json.MarshalIndent(m, "", " ") + builder.Write(pretty) + } + if i < len(output)-1 { + builder.WriteByte('\n') + } + } + + return builder.String() +} + +func (b *RunningBeat) writeOutputLine(line string) { + b.outputRW.Lock() + defer b.outputRW.Unlock() + + b.output = append(b.output, line) + + if b.watcher == nil { + return + } + + b.watcher.Inspect(line) + if b.watcher.Observed() { + if !b.keepRunning { + _ = b.c.Process.Kill() + } + b.watcher = nil + } +} + +// RunBeatOptions describes the options for running a Beat +type RunBeatOptions struct { + // Beatname, for example "filebeat". + Beatname string + // Config for the Beat written in YAML + Config string + // Args sets additional arguments to pass when running the binary. + Args []string + // KeepRunning if set to `true` observing all + // the expected output would not kill the process. + // + // In this case user controls the runtime through the context + // passed in `RunBeat`. + KeepRunning bool +} + +// Runs a Beat binary with the given config and args. +// Returns a `RunningBeat` that allow to collect the output and wait until the exit. +func RunBeat(ctx context.Context, t *testing.T, opts RunBeatOptions, watcher OutputWatcher) *RunningBeat { + t.Logf("preparing to run %s...", opts.Beatname) + + binaryFilename := ensureCompiled(ctx, t, opts) + + // create a temporary Beat config + cfgPath := filepath.Join(t.TempDir(), fmt.Sprintf("%s.yml", opts.Beatname)) + homePath := filepath.Join(t.TempDir(), "home") + + err := os.WriteFile(cfgPath, []byte(opts.Config), 0777) + if err != nil { + t.Fatalf("failed to create a temporary config file: %s", err) + return nil + } + t.Logf("temporary config has been created at %s", cfgPath) + + // compute the args for execution + baseArgs := []string{ + // logging to stderr instead of log files + "-e", + "-c", cfgPath, + // we want all the logs + "-E", "logging.level=debug", + // so we can run multiple Beats at the same time + "--path.home", homePath, + } + execArgs := make([]string, 0, len(baseArgs)+len(opts.Args)) + execArgs = append(execArgs, baseArgs...) + execArgs = append(execArgs, opts.Args...) + + t.Logf("running %s %s", binaryFilename, strings.Join(execArgs, " ")) + c := exec.CommandContext(ctx, binaryFilename, execArgs...) + + output, err := c.StdoutPipe() + if err != nil { + t.Fatalf("failed to create the stdout pipe: %s", err) + return nil + } + c.Stderr = c.Stdout + + b := &RunningBeat{ + c: c, + watcher: watcher, + keepRunning: opts.KeepRunning, + } + + go func() { + scanner := bufio.NewScanner(output) + for scanner.Scan() { + b.writeOutputLine(scanner.Text()) + } + }() + + err = c.Start() + if err != nil { + t.Fatalf("failed to start Filebeat command: %s", err) + return nil + } + + t.Logf("%s is running (pid: %d)", binaryFilename, c.Process.Pid) + + return b +} + +// ensureCompiled ensures that the given Beat is compiled and ready +// to run. +func ensureCompiled(ctx context.Context, t *testing.T, opts RunBeatOptions) (path string) { + compiling.Lock() + defer compiling.Unlock() + + t.Logf("ensuring the %s binary is available...", opts.Beatname) + + baseDir := findBeatDir(t, opts.Beatname) + t.Logf("found %s directory at %s", opts.Beatname, baseDir) + + binaryFilename := filepath.Join(baseDir, opts.Beatname) + _, err := os.Stat(binaryFilename) + if err == nil { + t.Logf("found existing %s binary at %s", opts.Beatname, binaryFilename) + return binaryFilename + } + + if !errors.Is(err, os.ErrNotExist) { + t.Fatalf("failed to check for compiled binary %s: %s", binaryFilename, err) + return "" + } + + args := []string{"build"} + t.Logf("existing %s binary not found, building with \"mage %s\"... ", binaryFilename, strings.Join(args, " ")) + c := exec.CommandContext(ctx, "mage", args...) + c.Dir = baseDir + output, err := c.CombinedOutput() + if err != nil { + t.Fatalf("failed to build %s binary: %s\n%s", opts.Beatname, err, output) + return "" + } + + _, err = os.Stat(binaryFilename) + if err == nil { + t.Logf("%s binary has been successfully built ", binaryFilename) + return binaryFilename + } + if !errors.Is(err, os.ErrNotExist) { + t.Fatalf("building command for binary %s succeeded but the binary was not created: %s", binaryFilename, err) + return "" + } + + return "" +} + +func findBeatDir(t *testing.T, beatName string) string { + pwd, err := os.Getwd() + if err != nil { + t.Fatalf("failed to get the working directory: %s", err) + return "" + } + t.Logf("searching for the %s directory, starting with %s...", beatName, pwd) + for pwd != "" { + stat, err := os.Stat(filepath.Join(pwd, beatName)) + if errors.Is(err, os.ErrNotExist) || !stat.IsDir() { + pwd = filepath.Dir(pwd) + continue + } + return filepath.Join(pwd, beatName) + } + t.Fatalf("could not find the %s base directory", beatName) + return "" +} diff --git a/libbeat/testing/integration/sample_test.go b/libbeat/testing/integration/sample_test.go new file mode 100644 index 00000000000..62fd7353c97 --- /dev/null +++ b/libbeat/testing/integration/sample_test.go @@ -0,0 +1,93 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration + +import ( + "context" + "testing" + "time" +) + +func TestFilebeat(t *testing.T) { + reportOptions := ReportOptions{ + PrintExpectationsBeforeStart: true, + // last 10 output lines would suffice + PrintLinesOnFail: 10, + } + + t.Run("Filebeat starts", func(t *testing.T) { + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" + paths: + - /var/log/*.log +# we want to check that all messages are ingested +# without using an external service, this is an easy way +output.console: + enabled: true +` + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + test := NewBeatTest(t, BeatTestOptions{ + RunBeatOptions: RunBeatOptions{ + Beatname: "filebeat", + Config: config, + }, + }) + + test. + WithReportOptions(reportOptions). + // we should observe the start message of the Beat + ExpectStart(). + // check that the first and the last line of the file get ingested + Start(ctx). + // wait until all the expectations are met + // or we hit the timeout set by the context + Wait() + }) + + t.Run("Filebeat crashes due to incorrect config", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // path items are required, this config is invalid + config := ` +filebeat.inputs: + - type: filestream + id: "test-filestream" +output.console: + enabled: true +` + test := NewBeatTest(t, BeatTestOptions{ + RunBeatOptions: RunBeatOptions{ + Beatname: "filebeat", + Config: config, + }, + }) + + test. + WithReportOptions(reportOptions). + ExpectStart(). + ExpectOutput("Exiting: Failed to start crawler: starting input failed: error while initializing input: no path is configured"). + ExpectStop(1). + Start(ctx). + Wait() + }) +}