Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acknowledgment of Configuration Updates in Watcher Mode. #91 #95

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func New(
wholeConf any,
log log.Modular,
stats metrics.Type,
count *int,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rather than slowly adding more parameters here it might be worth replacing wholeConf with an abstraction that allows these other peices of information to be accessed, which we can expand over time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it, Updated

opts ...OptFunc,
) (*Type, error) {
gMux := mux.NewRouter()
Expand Down Expand Up @@ -146,6 +147,10 @@ func New(
}
}

handleConfigAcknowledgement := func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "{\"success_reload_count\":\"%v\"}", *count)
}

if t.conf.DebugEndpoints {
t.RegisterEndpoint(
"/debug/config/json", "DEBUG: Returns the loaded config as JSON.",
Expand Down Expand Up @@ -200,6 +205,7 @@ func New(

t.RegisterEndpoint("/ping", "Ping me.", handlePing)
t.RegisterEndpoint("/version", "Returns the service version.", handleVersion)
t.RegisterEndpoint("/config/ack", "Returns the count of success watcher", handleConfigAcknowledgement)
t.RegisterEndpoint("/endpoints", "Returns this map of endpoints.", handleEndpoints)

// If we want to expose a stats endpoint we register the endpoints.
Expand Down
8 changes: 4 additions & 4 deletions internal/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestAPIEnableCORS(t *testing.T) {
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"*"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.NoError(t, err)

handler := s.Handler()
Expand All @@ -41,7 +41,7 @@ func TestAPIEnableCORSOrigins(t *testing.T) {
conf.CORS.Enabled = true
conf.CORS.AllowedOrigins = []string{"foo", "bar"}

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.NoError(t, err)

handler := s.Handler()
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestAPIEnableCORSNoHeaders(t *testing.T) {
conf := api.NewConfig()
conf.CORS.Enabled = true

_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
_, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "must specify at least one allowed origin")
}
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestAPIBasicAuth(t *testing.T) {
conf.BasicAuth.PasswordHash = tc.correctPass
conf.BasicAuth.Salt = "EzrwNJYw2wkErVVV1P36FQ=="

s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop())
s, err := api.New("", "", conf, nil, log.Noop(), metrics.Noop(), nil)
if ok := tc.expectedErr(t, err); !(ok && err == nil) {
return
}
Expand Down
3 changes: 2 additions & 1 deletion internal/cli/common/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func CreateManager(
logger log.Modular,
streamsMode bool,
conf config.Type,
count *int,
mgrOpts ...manager.OptFunc,
) (stoppableMgr *StoppableManager, err error) {
var stats *metrics.Namespaced
Expand Down Expand Up @@ -88,7 +89,7 @@ func CreateManager(
}

var httpServer *api.Type
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats); err != nil {
if httpServer, err = api.New(cliOpts.Version, cliOpts.DateBuilt, conf.HTTP, sanitNode, logger, stats, count); err != nil {
err = fmt.Errorf("failed to initialise API: %w", err)
return
}
Expand Down
17 changes: 11 additions & 6 deletions internal/cli/common/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
}

verLogger := logger.With("benthos_version", cliOpts.Version)

if mainPath == "" {
verLogger.Info("Running without a main config file")
} else if inferredMainPath {
Expand All @@ -73,8 +74,9 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
if strict && len(lints) > 0 {
return errors.New(cliOpts.ExecTemplate("shutting down due to linter errors, to prevent shutdown run {{.ProductName}} with --chilled"))
}

stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf)
//Success Watcher Count Is Used to for to get count of the config which was updated with the watcher flag.
successReloadCount := 0
stoppableManager, err := CreateManager(c, cliOpts, logger, streamsMode, conf, &successReloadCount)
if err != nil {
return err
}
Expand All @@ -90,9 +92,10 @@ func RunService(c *cli.Context, cliOpts *CLIOpts, streamsMode bool) error {
watching := cliOpts.RootFlags.GetWatcher(c)
if streamsMode {
enableStreamsAPI := !c.Bool("no-api")
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager())
stoppableStream, err = initStreamsMode(cliOpts, strict, watching, enableStreamsAPI, confReader, stoppableManager.Manager(), &successReloadCount)
} else {
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager())
logger.Info("InitMode Get Initiated... strict:%v", strict)
stoppableStream, dataStreamClosedChan, err = initNormalMode(cliOpts, conf, strict, watching, confReader, stoppableManager.Manager(), &successReloadCount)
}
if err != nil {
return err
Expand Down Expand Up @@ -133,6 +136,7 @@ func initStreamsMode(
strict, watching, enableAPI bool,
confReader *config.Reader,
mgr *manager.Type,
successReloadCount *int,
) (RunningStream, error) {
logger := mgr.Logger()
streamMgr := strmmgr.New(mgr, strmmgr.OptAPIEnabled(enableAPI))
Expand Down Expand Up @@ -181,7 +185,7 @@ func initStreamsMode(
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil {
return nil, fmt.Errorf("failed to create stream config watcher: %w", err)
}
}
Expand All @@ -194,6 +198,7 @@ func initNormalMode(
strict, watching bool,
confReader *config.Reader,
mgr *manager.Type,
successReloadCount *int,
) (newStream RunningStream, stoppedChan chan struct{}, err error) {
logger := mgr.Logger()

Expand Down Expand Up @@ -231,7 +236,7 @@ func initNormalMode(
}

if watching {
if err := confReader.BeginFileWatching(mgr, strict); err != nil {
if err := confReader.BeginFileWatching(mgr, strict, successReloadCount); err != nil {
return nil, nil, fmt.Errorf("failed to create config file watcher: %w", err)
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/cli/studio/pull_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (r *PullRunner) bootstrapConfigReader(ctx context.Context) (bootstrapErr er
tmpTracingSummary.SetEnabled(false)

stopMgrTmp, err := common.CreateManager(
r.cliContext, r.cliOpts, r.logger, false, conf,
r.cliContext, r.cliOpts, r.logger, false, conf, nil,
manager.OptSetEnvironment(tmpEnv),
manager.OptSetBloblangEnvironment(bloblEnv),
manager.OptSetFS(sessFS))
Expand Down Expand Up @@ -413,13 +413,13 @@ func (r *PullRunner) Sync(ctx context.Context) {
}
}
for _, res := range diff.AddResources {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name); err != nil {
if err := r.confReader.TriggerResourceUpdate(r.mgr, r.strictMode, res.Name, nil); err != nil {
r.logger.Error("Failed to reflect resource file '%v' update: %v", res.Name, err)
runErr = err
}
}
if diff.MainConfig != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name); err != nil {
if err := r.confReader.TriggerMainUpdate(r.mgr, r.strictMode, diff.MainConfig.Name, nil); err != nil {
r.logger.Error("Failed to reflect main config file '%v' update: %v", diff.MainConfig.Name, err)
runErr = err
}
Expand Down
9 changes: 8 additions & 1 deletion internal/config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (r *Reader) readMain(mainPath string) (conf Type, pConf *docs.ParsedConfig,
// TriggerMainUpdate attempts to re-read the main configuration file, trigger
// the provided main update func, and apply changes to resources to the provided
// manager as appropriate.
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string) error {
func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPath string, successReloadCount *int) error {
conf, _, lints, err := r.readMain(newPath)
if errors.Is(err, fs.ErrNotExist) {
if r.mainPath != newPath {
Expand Down Expand Up @@ -416,6 +416,13 @@ func (r *Reader) TriggerMainUpdate(mgr bundle.NewManagement, strict bool, newPat
mgr.Logger().Error("Failed to apply updated config: %v", err)
return err
}

// Success Watcher Count denotes if the configuration in the benthos gets updated with the watcher
// flag then success watcher count gets increased
if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Normal Config", *successReloadCount)
}
mgr.Logger().Info("Updated main config")
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions internal/config/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ processor_resources:
assert.False(t, testMgr.ProbeProcessor("c"))
assert.False(t, testMgr.ProbeProcessor("d"))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "bar_main.yaml", nil))

// Wait for the config watcher to reload the config
select {
Expand Down Expand Up @@ -226,10 +226,10 @@ processor_resources:
return nil
}))

require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml"))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "a.yaml", nil))
require.NoError(t, rdr.TriggerResourceUpdate(testMgr, true, "b.yaml", nil))

require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml"))
require.NoError(t, rdr.TriggerMainUpdate(testMgr, true, "foo_main.yaml", nil))

assert.Equal(t, "fooin", conf.Input.Label)
assert.Equal(t, "fooout", conf.Output.Label)
Expand Down
7 changes: 6 additions & 1 deletion internal/config/resource_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (r *Reader) readResource(path string) (conf manager.ResourceConfig, lints [

// TriggerResourceUpdate attempts to re-read a resource configuration file and
// apply changes to the provided manager as appropriate.
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error {
newResConf, lints, err := r.readResource(path)
if errors.Is(err, fs.ErrNotExist) {
return r.TriggerResourceDelete(mgr, path)
Expand Down Expand Up @@ -273,6 +273,11 @@ func (r *Reader) TriggerResourceUpdate(mgr bundle.NewManagement, strict bool, pa
}

r.resourceFileInfo[path] = newInfo

if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount)
}
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/config/resource_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down Expand Up @@ -175,7 +175,7 @@ processor_resources:
// Watch for configuration changes.
testMgr, err := manager.New(conf.ResourceConfig)
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

tCtx, done := context.WithTimeout(context.Background(), time.Second*30)
defer done()
Expand Down
7 changes: 6 additions & 1 deletion internal/config/stream_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (r *Reader) findStreamPathWalkedDir(streamPath string) (dir string) {

// TriggerStreamUpdate attempts to re-read a stream configuration file, and
// trigger the provided stream update func.
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string) error {
func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path string, successReloadCount *int) error {
if r.streamUpdateFn == nil {
return nil
}
Expand Down Expand Up @@ -236,5 +236,10 @@ func (r *Reader) TriggerStreamUpdate(mgr bundle.NewManagement, strict bool, path
return err
}
mgr.Logger().Info("Updated stream %v config from file.", info.id)

if successReloadCount != nil {
*successReloadCount = *successReloadCount + 1
mgr.Logger().Info("Success Reload Count: %v, For Stream Config", *successReloadCount)
}
return nil
}
11 changes: 7 additions & 4 deletions internal/config/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (r *Reader) modifiedSinceLastRead(name string) bool {
// WARNING: Either SubscribeConfigChanges or SubscribeStreamChanges must be
// called before this, as otherwise it is unsafe to register them during
// watching.
func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error {
func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool, successReloadCount *int) error {
if r.watcher != nil {
return errors.New("a file watcher has already been started")
}
Expand Down Expand Up @@ -102,9 +102,11 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
}

refreshFiles := func() error {
mgr.Logger().Info("Inside the Refresh Files")
if !r.streamsMode && r.mainPath != "" {
if _, err := r.fs.Stat(r.mainPath); err == nil {
if err := addNotWatching([]string{r.mainPath}); err != nil {
mgr.Logger().Error("addNotWatching Error")
return err
}
}
Expand Down Expand Up @@ -173,13 +175,14 @@ func (r *Reader) BeginFileWatching(mgr bundle.NewManagement, strict bool) error
}
var succeeded bool
if nameClean == r.mainPath {
succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath))
succeeded = !ShouldReread(r.TriggerMainUpdate(mgr, strict, r.mainPath, successReloadCount))
} else if _, exists := r.streamFileInfo[nameClean]; exists {
succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean))
succeeded = !ShouldReread(r.TriggerStreamUpdate(mgr, strict, nameClean, successReloadCount))
} else {
succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean))
succeeded = !ShouldReread(r.TriggerResourceUpdate(mgr, strict, nameClean, successReloadCount))
}
if succeeded {
mgr.Logger().Info("This is the collaps changes %v", collapsedChanges)
delete(collapsedChanges, nameClean)
} else {
change.at = time.Now()
Expand Down
19 changes: 11 additions & 8 deletions internal/config/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ output:
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

// Overwrite original config
require.NoError(t, os.WriteFile(confFilePath, dummyConfig, 0o644))
Expand Down Expand Up @@ -91,16 +91,19 @@ output:

changeChan := make(chan struct{})
var updatedConf stream.Config
var once sync.Once
require.NoError(t, rdr.SubscribeConfigChanges(func(conf *Type) error {
updatedConf = conf.Config
close(changeChan)
once.Do(func() {
close(changeChan)
})
return nil
}))

// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

// Create a new config folder and place in it a new copy of the config file
newConfDir := filepath.Join(rootDir, "config_new")
Expand Down Expand Up @@ -184,7 +187,7 @@ func TestReaderStreamDirectWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -268,7 +271,7 @@ func TestReaderStreamWildcardWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -352,7 +355,7 @@ func TestReaderStreamDirWatching(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, []byte(`output: { label: a2, drop: {} }`), 0o644))
require.NoError(t, os.WriteFile(confBPath, []byte(`output: { label: b2, drop: {} }`), 0o644))
Expand Down Expand Up @@ -443,7 +446,7 @@ func TestReaderWatcherRace(t *testing.T) {
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

for i := 0; i < 2; i++ {
// Wait for the config watcher to reload each config
Expand Down Expand Up @@ -523,7 +526,7 @@ processor_resources:
// Watch for configuration changes
testMgr, err := manager.New(manager.ResourceConfig{})
require.NoError(t, err)
require.NoError(t, rdr.BeginFileWatching(testMgr, true))
require.NoError(t, rdr.BeginFileWatching(testMgr, true, nil))

require.NoError(t, os.WriteFile(confAPath, procConfig("a", "a2"), 0o644))
require.NoError(t, os.WriteFile(confBPath, procConfig("b", "b2"), 0o644))
Expand Down
Loading