Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACK-2416] Add read-only mode for services that doesn't create indexes. #691

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions auth/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ var _ = Describe("Client", func() {
config.Config.UserAgent = testHttp.NewUserAgent()
config.Config.ServiceSecret = authTest.NewServiceSecret()
config.ExternalConfig.Address = testHttp.NewAddress()
config.ExternalConfig.PathPrefix = "auth"
config.ExternalConfig.UserAgent = testHttp.NewUserAgent()
config.ExternalConfig.ServerSessionTokenSecret = serverTokenSecret
config.ExternalConfig.ServerSessionTokenTimeout = time.Duration(serverTokenTimeout) * time.Second
Expand Down Expand Up @@ -115,6 +116,7 @@ var _ = Describe("Client", func() {
config.Config.UserAgent = testHttp.NewUserAgent()
config.Config.ServiceSecret = authTest.NewServiceSecret()
config.ExternalConfig.Address = server.URL()
config.ExternalConfig.PathPrefix = "auth"
config.ExternalConfig.UserAgent = testHttp.NewUserAgent()
config.ExternalConfig.ServerSessionTokenSecret = serverTokenSecret
authorizeAs = platform.AuthorizeAsService
Expand Down
8 changes: 6 additions & 2 deletions auth/client/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type ExternalConfig struct {
*platform.Config
ServerSessionTokenSecret string `envconfig:"TIDEPOOL_AUTH_CLIENT_EXTERNAL_SERVER_SESSION_TOKEN_SECRET"`
ServerSessionTokenTimeout time.Duration `envconfig:"TIDEPOOL_AUTH_CLIENT_EXTERNAL_SERVER_SESSION_TOKEN_TIMEOUT" default:"1h"`
// PathPrefix to prepend to the path of any service calls (if any).
PathPrefix string `envconfig:"TIDEPOOL_AUTH_CLIENT_EXTERNAL_PATH_PREFIX" default:"auth"`
}

func NewExternalConfig() *ExternalConfig {
Expand Down Expand Up @@ -107,6 +109,7 @@ type External struct {
serverSessionTokenMutex sync.Mutex
serverSessionTokenSafe string
closingChannel chan chan bool
PathPrefix string // PathPrefix is the prefix to prepend to all external URL path calls to the auth service (if any)
}

func NewExternal(cfg *ExternalConfig, authorizeAs platform.AuthorizeAs, name string, lgr log.Logger) (*External, error) {
Expand Down Expand Up @@ -135,6 +138,7 @@ func NewExternal(cfg *ExternalConfig, authorizeAs platform.AuthorizeAs, name str
name: name,
serverSessionTokenSecret: cfg.ServerSessionTokenSecret,
serverSessionTokenTimeout: cfg.ServerSessionTokenTimeout,
PathPrefix: cfg.PathPrefix,
}, nil
}

Expand Down Expand Up @@ -200,7 +204,7 @@ func (e *External) ValidateSessionToken(ctx context.Context, token string) (requ
IsServer bool
UserID string
}
if err := e.client.RequestData(ctx, "GET", e.client.ConstructURL("auth", "token", token), nil, nil, &result); err != nil {
if err := e.client.RequestData(ctx, "GET", e.client.ConstructURL(e.PathPrefix, "token", token), nil, nil, &result); err != nil {
return nil, err
}

Expand Down Expand Up @@ -329,7 +333,7 @@ func (e *External) refreshServerSessionToken() error {
e.logger.Debug("Refreshing server session token")

requestMethod := "POST"
requestURL := e.client.ConstructURL("auth", "serverlogin")
requestURL := e.client.ConstructURL(e.PathPrefix, "serverlogin")
request, err := http.NewRequest(requestMethod, requestURL, nil)
if err != nil {
return errors.Wrapf(err, "unable to create new request for %s %s", requestMethod, requestURL)
Expand Down
19 changes: 13 additions & 6 deletions auth/service/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *Service) Initialize(provider application.Provider) error {
if err := s.initializePartnerSecrets(); err != nil {
return err
}
return s.initializeUserEventsHandler()
return s.initializeUserEventsHandler(provider)
}

func (s *Service) Terminate() {
Expand Down Expand Up @@ -417,13 +417,20 @@ func (s *Service) terminateAuthClient() {
}
}

func (s *Service) initializeUserEventsHandler() error {
func (s *Service) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := authEvents.NewUserDataDeletionHandler(ctx, s.authClient)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize events runner")
Expand Down
2 changes: 1 addition & 1 deletion auth/service/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ var _ = Describe("Service", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest("POST", "/auth/serverlogin"),
VerifyRequest("POST", "/serverlogin"), // by default the path prefix is empty to the auth service unless set in the env var TIDEPOOL_AUTH_CLIENT_EXTERNAL_PATH_PREFIX
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
19 changes: 13 additions & 6 deletions blob/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (s *Service) Initialize(provider application.Provider) error {
if err := s.initializeBlobClient(); err != nil {
return err
}
if err := s.initializeUserEventsHandler(); err != nil {
if err := s.initializeUserEventsHandler(provider); err != nil {
return err
}
return s.initializeRouter()
Expand Down Expand Up @@ -211,13 +211,20 @@ func (s *Service) terminateDeviceLogsUnstructuredStore() {
}
}

func (s *Service) initializeUserEventsHandler() error {
func (s *Service) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := blobEvents.NewUserDataDeletionHandler(ctx, s.blobClient)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize events runner")
Expand Down
2 changes: 1 addition & 1 deletion blob/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ = Describe("Service", func() {
server = NewServer()
server.AppendHandlers(
CombineHandlers(
VerifyRequest(http.MethodPost, "/auth/serverlogin"),
VerifyRequest(http.MethodPost, "/serverlogin"),
VerifyHeaderKV("X-Tidepool-Server-Name", *provider.NameOutput),
VerifyHeaderKV("X-Tidepool-Server-Secret", serverSecret),
VerifyBody(nil),
Expand Down
6 changes: 5 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ func New(cfg *Config) (*Client, error) {
func (c *Client) ConstructURL(paths ...string) string {
segments := []string{}
for _, path := range paths {
segments = append(segments, url.PathEscape(strings.Trim(path, "/")))
escapedPath := url.PathEscape(strings.Trim(path, "/"))
if escapedPath == "" {
continue
}
segments = append(segments, escapedPath)
}
return fmt.Sprintf("%s/%s", strings.TrimRight(c.address, "/"), strings.Join(segments, "/"))
}
Expand Down
3 changes: 2 additions & 1 deletion client/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"fmt"
"net/url"

"github.com/kelseyhightower/envconfig"
Expand Down Expand Up @@ -36,7 +37,7 @@ func (c *Config) Validate() error {
if c.Address == "" {
return errors.New("address is missing")
} else if _, err := url.Parse(c.Address); err != nil {
return errors.New("address is invalid")
return fmt.Errorf("address is invalid: %w", err)
}

return nil
Expand Down
2 changes: 1 addition & 1 deletion client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ var _ = Describe("Config", func() {

It("returns an error if the address is not a parseable URL", func() {
cfg.Address = "Not%Parseable"
Expect(cfg.Validate()).To(MatchError("address is invalid"))
Expect(cfg.Validate()).To(MatchError("address is invalid: parse \"Not%Parseable\": invalid URL escape \"%Pa\""))
})

It("returns success", func() {
Expand Down
20 changes: 14 additions & 6 deletions data/service/service/standard.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (s *Standard) Initialize(provider application.Provider) error {
if err := s.initializeDataSourceClient(); err != nil {
return err
}
if err := s.initializeUserEventsHandler(); err != nil {
if err := s.initializeUserEventsHandler(provider); err != nil {
return err
}
if err := s.initializeAPI(); err != nil {
Expand Down Expand Up @@ -404,14 +404,22 @@ func (s *Standard) initializeServer() error {
return nil
}

func (s *Standard) initializeUserEventsHandler() error {
func (s *Standard) initializeUserEventsHandler(provider application.Provider) error {
s.Logger().Debug("Initializing user events handler")
sarama.Logger = log.New(os.Stdout, "SARAMA ", log.LstdFlags|log.Lshortfile)

ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore)
handlers := []eventsCommon.EventHandler{handler}
runner := events.NewRunner(handlers)
var runner events.Runner

configReporter := provider.ConfigReporter().WithScopes("user", "events", "handler")
if configReporter.GetWithDefault("disable", "") != "true" {
ctx := logInternal.NewContextWithLogger(context.Background(), s.Logger())
handler := dataEvents.NewUserDataDeletionHandler(ctx, s.dataStore, s.dataSourceStructuredStore)
handlers := []eventsCommon.EventHandler{handler}
runner = events.NewRunner(handlers)
} else {
runner = events.NewNoopRunner()
}

if err := runner.Initialize(); err != nil {
return errors.Wrap(err, "unable to initialize user events handler runner")
}
Expand Down
4 changes: 0 additions & 4 deletions data/store/mongo/mongo_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,12 @@ func (d *DataRepository) DeleteDataSet(ctx context.Context, dataSet *upload.Uplo
selector := bson.M{
"_userId": dataSet.UserID,
"uploadId": dataSet.UploadID,
"type": bson.M{"$ne": "upload"},
}
removeInfo, err = d.DatumRepository.DeleteMany(ctx, selector)
if err == nil {
selector = bson.M{
"_userId": dataSet.UserID,
"uploadId": dataSet.UploadID,
"type": "upload",
"deletedTime": bson.M{"$exists": false},
"deletedUserId": bson.M{"$exists": false},
}
Expand Down Expand Up @@ -136,15 +134,13 @@ func (d *DataRepository) DeleteOtherDataSetData(ctx context.Context, dataSet *up
"_userId": dataSet.UserID,
"deviceId": *dataSet.DeviceID,
"uploadId": bson.M{"$ne": dataSet.UploadID},
"type": bson.M{"$ne": "upload"},
}
removeInfo, err = d.DatumRepository.DeleteMany(ctx, selector)
if err == nil {
selector = bson.M{
"_userId": dataSet.UserID,
"deviceId": *dataSet.DeviceID,
"uploadId": bson.M{"$ne": dataSet.UploadID},
"type": "upload",
"deletedTime": bson.M{"$exists": false},
"deletedUserId": bson.M{"$exists": false},
}
Expand Down
24 changes: 4 additions & 20 deletions data/store/mongo/mongo_data_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,11 @@ func (d *DataSetRepository) EnsureIndexes() error {
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "_active", Value: 1},
{Key: "type", Value: 1},
{Key: "time", Value: -1},
},
Options: options.Index().
SetName("UserIdTypeWeighted_v2"),
},
{
Keys: bson.D{
{Key: "origin.id", Value: 1},
{Key: "type", Value: 1},
{Key: "deletedTime", Value: -1},
{Key: "_active", Value: 1},
},
Options: options.Index().
SetName("OriginId"),
},
{
Keys: bson.D{
{Key: "uploadId", Value: 1},
Expand All @@ -61,7 +50,6 @@ func (d *DataSetRepository) EnsureIndexes() error {
{
Keys: bson.D{
{Key: "uploadId", Value: 1},
{Key: "type", Value: 1},
{Key: "deletedTime", Value: -1},
{Key: "_active", Value: 1},
},
Expand All @@ -72,7 +60,6 @@ func (d *DataSetRepository) EnsureIndexes() error {
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "client.name", Value: 1},
{Key: "type", Value: 1},
{Key: "createdTime", Value: -1},
},
Options: options.Index().
Expand All @@ -85,7 +72,6 @@ func (d *DataSetRepository) EnsureIndexes() error {
Keys: bson.D{
{Key: "_userId", Value: 1},
{Key: "deviceId", Value: 1},
{Key: "type", Value: 1},
{Key: "createdTime", Value: -1},
},
Options: options.Index().
Expand Down Expand Up @@ -267,7 +253,6 @@ func (d *DataSetRepository) ListUserDataSets(ctx context.Context, userID string,
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if filter.ClientName != nil {
selector["client.name"] = *filter.ClientName
Expand Down Expand Up @@ -321,7 +306,6 @@ func (d *DataSetRepository) GetDataSetsForUserByID(ctx context.Context, userID s
selector := bson.M{
"_active": true,
"_userId": userID,
"type": "upload",
}
if !filter.Deleted {
selector["deletedTime"] = bson.M{"$exists": false}
Expand All @@ -330,14 +314,14 @@ func (d *DataSetRepository) GetDataSetsForUserByID(ctx context.Context, userID s
SetSort(bson.M{"createdTime": -1})
cursor, err := d.Find(ctx, selector, opts)

loggerFields := log.Fields{"userId": userID, "dataSetsCount": len(dataSets), "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("GetDataSetsForUserByID")

if err != nil {
return nil, errors.Wrap(err, "unable to get data sets for user by id")
}

if err = cursor.All(ctx, &dataSets); err != nil {
err = cursor.All(ctx, &dataSets)
loggerFields := log.Fields{"userId": userID, "dataSetsCount": len(dataSets), "duration": time.Since(now) / time.Microsecond}
log.LoggerFromContext(ctx).WithFields(loggerFields).WithError(err).Debug("GetDataSetsForUserByID")
if err != nil {
return nil, errors.Wrap(err, "unable to decode data sets for user by id")
}

Expand Down
Loading