From 2ea9686ff9f5b0278685125325e2d642703b6b28 Mon Sep 17 00:00:00 2001 From: Eric Wollesen Date: Mon, 8 Jul 2024 14:05:43 -0600 Subject: [PATCH] adds the alerts events consumer to the data service It uses the new asyncevents from go-common, as alerts processing requires different retry semantics than the existing solution. The Pusher interface is moved out of data/service into data/events to avoid a circular dependency. BACK-2554 --- data/events/alerts.go | 358 +++++++++++++++++ data/events/alerts_test.go | 639 +++++++++++++++++++++++++++++++ data/service/service/standard.go | 86 ++++- 3 files changed, 1074 insertions(+), 9 deletions(-) create mode 100644 data/events/alerts.go create mode 100644 data/events/alerts_test.go diff --git a/data/events/alerts.go b/data/events/alerts.go new file mode 100644 index 000000000..b69d3fa17 --- /dev/null +++ b/data/events/alerts.go @@ -0,0 +1,358 @@ +package events + +import ( + "cmp" + "context" + "os" + "slices" + "strings" + "time" + + "github.com/Shopify/sarama" + "go.mongodb.org/mongo-driver/bson" + + "github.com/tidepool-org/platform/alerts" + "github.com/tidepool-org/platform/auth" + "github.com/tidepool-org/platform/data/store" + "github.com/tidepool-org/platform/data/types/blood/glucose" + "github.com/tidepool-org/platform/data/types/dosingdecision" + "github.com/tidepool-org/platform/devicetokens" + "github.com/tidepool-org/platform/errors" + "github.com/tidepool-org/platform/log" + logjson "github.com/tidepool-org/platform/log/json" + lognull "github.com/tidepool-org/platform/log/null" + "github.com/tidepool-org/platform/permission" + "github.com/tidepool-org/platform/push" +) + +type Consumer struct { + Alerts AlertsClient + Data store.DataRepository + DeviceTokens auth.DeviceTokensClient + Evaluator AlertsEvaluator + Permissions permission.Client + Pusher Pusher + Tokens alerts.TokenProvider + + Logger log.Logger +} + +// DosingDecision removes a stutter to improve readability. +type DosingDecision = dosingdecision.DosingDecision + +// Glucose removes a stutter to improve readability. +type Glucose = glucose.Glucose + +func (c *Consumer) Consume(ctx context.Context, + session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) { + + if msg == nil { + c.logger(ctx).Info("UNEXPECTED: nil message; ignoring") + return nil + } + + switch { + case strings.HasSuffix(msg.Topic, ".data.alerts"): + return c.consumeAlertsConfigs(ctx, session, msg) + case strings.HasSuffix(msg.Topic, ".data.deviceData.alerts"): + return c.consumeDeviceData(ctx, session, msg) + default: + c.logger(ctx).WithField("topic", msg.Topic). + Infof("UNEXPECTED: topic; ignoring") + } + + return nil +} + +func (c *Consumer) consumeAlertsConfigs(ctx context.Context, + session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) error { + + cfg := &alerts.Config{} + if err := unmarshalMessageValue(msg.Value, cfg); err != nil { + return err + } + lgr := c.logger(ctx) + lgr.WithField("cfg", cfg).Info("consuming an alerts config message") + + ctxLog := c.logger(ctx).WithField("followedUserID", cfg.FollowedUserID) + ctx = log.NewContextWithLogger(ctx, ctxLog) + + notes, err := c.Evaluator.Evaluate(ctx, cfg.FollowedUserID) + if err != nil { + format := "Unable to evalaute alerts configs triggered event for user %s" + return errors.Wrapf(err, format, cfg.UserID) + } + ctxLog.WithField("notes", notes).Debug("notes generated from alerts config") + + c.pushNotes(ctx, notes) + + session.MarkMessage(msg, "") + lgr.WithField("message", msg).Debug("marked") + return nil +} + +func (c *Consumer) consumeDeviceData(ctx context.Context, + session sarama.ConsumerGroupSession, msg *sarama.ConsumerMessage) (err error) { + + datum := &Glucose{} + if err := unmarshalMessageValue(msg.Value, datum); err != nil { + return err + } + lgr := c.logger(ctx) + lgr.WithField("data", datum).Info("consuming a device data message") + + if datum.UserID == nil { + return errors.New("Unable to retrieve alerts configs: userID is nil") + } + ctx = log.NewContextWithLogger(ctx, lgr.WithField("followedUserID", *datum.UserID)) + notes, err := c.Evaluator.Evaluate(ctx, *datum.UserID) + if err != nil { + format := "Unable to evalaute device data triggered event for user %s" + return errors.Wrapf(err, format, *datum.UserID) + } + for idx, note := range notes { + lgr.WithField("idx", idx).WithField("note", note).Debug("notes") + } + + c.pushNotes(ctx, notes) + + session.MarkMessage(msg, "") + lgr.WithField("message", msg).Debug("marked") + return nil +} + +func (c *Consumer) pushNotes(ctx context.Context, notes []*alerts.Note) { + lgr := c.logger(ctx) + + // Notes could be pushed into a Kafka topic to have a more durable retry, + // but that can be added later. + for _, note := range notes { + lgr := lgr.WithField("recipientUserID", note.RecipientUserID) + tokens, err := c.DeviceTokens.GetDeviceTokens(ctx, note.RecipientUserID) + if err != nil { + lgr.WithError(err).Info("Unable to retrieve device tokens") + } + if len(tokens) == 0 { + lgr.Debug("no device tokens found, won't push any notifications") + } + pushNote := push.FromNote(note) + for _, token := range tokens { + if err := c.Pusher.Push(ctx, token, pushNote); err != nil { + lgr.WithError(err).Info("Unable to push notification") + } + } + } +} + +// logger produces a log.Logger. +// +// It tries a number of options before falling back to a null Logger. +func (c *Consumer) logger(ctx context.Context) log.Logger { + // A context's Logger is preferred, as it has the most... context. + if ctxLgr := log.LoggerFromContext(ctx); ctxLgr != nil { + return ctxLgr + } + if c.Logger != nil { + return c.Logger + } + fallback, err := logjson.NewLogger(os.Stderr, log.DefaultLevelRanks(), log.DefaultLevel()) + if err != nil { + fallback = lognull.NewLogger() + } + return fallback +} + +type AlertsEvaluator interface { + Evaluate(ctx context.Context, followedUserID string) ([]*alerts.Note, error) +} + +func NewAlertsEvaluator(alerts AlertsClient, data store.DataRepository, + perms permission.Client, tokens alerts.TokenProvider) *evaluator { + + return &evaluator{ + Alerts: alerts, + Data: data, + Permissions: perms, + Tokens: tokens, + } +} + +// evaluator implements AlertsEvaluator. +type evaluator struct { + Alerts AlertsClient + Data store.DataRepository + Permissions permission.Client + Tokens alerts.TokenProvider +} + +// logger produces a log.Logger. +// +// It tries a number of options before falling back to a null Logger. +func (e *evaluator) logger(ctx context.Context) log.Logger { + // A context's Logger is preferred, as it has the most... context. + if ctxLgr := log.LoggerFromContext(ctx); ctxLgr != nil { + return ctxLgr + } + fallback, err := logjson.NewLogger(os.Stderr, log.DefaultLevelRanks(), log.DefaultLevel()) + if err != nil { + fallback = lognull.NewLogger() + } + return fallback +} + +// Evaluate followers' alerts.Configs to generate alert notifications. +func (e *evaluator) Evaluate(ctx context.Context, followedUserID string) ( + []*alerts.Note, error) { + + alertsConfigs, err := e.gatherAlertsConfigs(ctx, followedUserID) + if err != nil { + return nil, err + } + e.logger(ctx).Debugf("%d alerts configs found", len(alertsConfigs)) + + alertsConfigsByUploadID := e.mapAlertsConfigsByUploadID(alertsConfigs) + + notes := []*alerts.Note{} + for uploadID, cfgs := range alertsConfigsByUploadID { + resp, err := e.gatherData(ctx, followedUserID, uploadID, cfgs) + if err != nil { + return nil, err + } + notes = slices.Concat(notes, e.generateNotes(ctx, cfgs, resp)) + } + + return notes, nil +} + +func (e *evaluator) mapAlertsConfigsByUploadID(cfgs []*alerts.Config) map[string][]*alerts.Config { + mapped := map[string][]*alerts.Config{} + for _, cfg := range cfgs { + if _, found := mapped[cfg.UploadID]; !found { + mapped[cfg.UploadID] = []*alerts.Config{} + } + mapped[cfg.UploadID] = append(mapped[cfg.UploadID], cfg) + } + return mapped +} + +func (e *evaluator) gatherAlertsConfigs(ctx context.Context, + followedUserID string) ([]*alerts.Config, error) { + + alertsConfigs, err := e.Alerts.List(ctx, followedUserID) + if err != nil { + return nil, err + } + e.logger(ctx).Debugf("after List, %d alerts configs", len(alertsConfigs)) + alertsConfigs = slices.DeleteFunc(alertsConfigs, e.authDenied(ctx)) + e.logger(ctx).Debugf("after perms check, %d alerts configs", len(alertsConfigs)) + return alertsConfigs, nil +} + +// authDenied builds functions that enable slices.DeleteFunc to remove +// unauthorized users' alerts.Configs. +// +// Via a closure it's able to inject information from the Context and the +// evaluator itself into the resulting function. +func (e *evaluator) authDenied(ctx context.Context) func(ac *alerts.Config) bool { + lgr := e.logger(ctx) + return func(ac *alerts.Config) bool { + if ac == nil { + return true + } + lgr = lgr.WithFields(log.Fields{ + "userID": ac.UserID, + "followedUserID": ac.FollowedUserID, + }) + token, err := e.Tokens.ServerSessionToken() + if err != nil { + lgr.WithError(err).Warn("Unable to confirm permissions; skipping") + return false + } + ctx = auth.NewContextWithServerSessionToken(ctx, token) + perms, err := e.Permissions.GetUserPermissions(ctx, ac.UserID, ac.FollowedUserID) + if err != nil { + lgr.WithError(err).Warn("Unable to confirm permissions; skipping") + return true + } + if _, found := perms[permission.Follow]; !found { + lgr.Debug("permission denied: skipping") + return true + } + return false + } +} + +func (e *evaluator) gatherData(ctx context.Context, followedUserID, uploadID string, + alertsConfigs []*alerts.Config) (*store.AlertableResponse, error) { + + if len(alertsConfigs) == 0 { + return nil, nil + } + + longestDelay := slices.MaxFunc(alertsConfigs, func(i, j *alerts.Config) int { + return cmp.Compare(i.LongestDelay(), j.LongestDelay()) + }).LongestDelay() + longestDelay = max(5*time.Minute, longestDelay) + e.logger(ctx).WithField("longestDelay", longestDelay).Debug("here it is") + params := store.AlertableParams{ + UserID: followedUserID, + UploadID: uploadID, + Start: time.Now().Add(-longestDelay), + } + resp, err := e.Data.GetAlertableData(ctx, params) + if err != nil { + return nil, err + } + + return resp, nil +} + +func (e *evaluator) generateNotes(ctx context.Context, + alertsConfigs []*alerts.Config, resp *store.AlertableResponse) []*alerts.Note { + + if len(alertsConfigs) == 0 { + return nil + } + + lgr := e.logger(ctx) + notes := []*alerts.Note{} + for _, alertsConfig := range alertsConfigs { + l := lgr.WithFields(log.Fields{ + "userID": alertsConfig.UserID, + "followedUserID": alertsConfig.FollowedUserID, + "uploadID": alertsConfig.UploadID, + }) + c := log.NewContextWithLogger(ctx, l) + note := alertsConfig.Evaluate(c, resp.Glucose, resp.DosingDecisions) + if note != nil { + notes = append(notes, note) + continue + } + } + + return notes +} + +func unmarshalMessageValue[A any](b []byte, payload *A) error { + wrapper := &struct { + FullDocument A `json:"fullDocument"` + }{} + if err := bson.UnmarshalExtJSON(b, false, wrapper); err != nil { + return errors.Wrap(err, "Unable to unmarshal ExtJSON") + } + *payload = wrapper.FullDocument + return nil +} + +type AlertsClient interface { + Delete(context.Context, *alerts.Config) error + Get(context.Context, *alerts.Config) (*alerts.Config, error) + List(_ context.Context, userID string) ([]*alerts.Config, error) + Upsert(context.Context, *alerts.Config) error +} + +// Pusher is a service-agnostic interface for sending push notifications. +type Pusher interface { + // Push a notification to a device. + Push(context.Context, *devicetokens.DeviceToken, *push.Notification) error +} diff --git a/data/events/alerts_test.go b/data/events/alerts_test.go new file mode 100644 index 000000000..66ab07787 --- /dev/null +++ b/data/events/alerts_test.go @@ -0,0 +1,639 @@ +package events + +import ( + "context" + "time" + + "github.com/Shopify/sarama" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + + "github.com/tidepool-org/platform/alerts" + nontypesglucose "github.com/tidepool-org/platform/data/blood/glucose" + "github.com/tidepool-org/platform/data/store" + storetest "github.com/tidepool-org/platform/data/store/test" + "github.com/tidepool-org/platform/data/types" + "github.com/tidepool-org/platform/data/types/blood" + "github.com/tidepool-org/platform/data/types/blood/glucose" + "github.com/tidepool-org/platform/devicetokens" + "github.com/tidepool-org/platform/errors" + "github.com/tidepool-org/platform/log" + logtest "github.com/tidepool-org/platform/log/test" + "github.com/tidepool-org/platform/permission" + "github.com/tidepool-org/platform/pointer" + "github.com/tidepool-org/platform/push" +) + +const ( + testUserID = "test-user-id" + testFollowedUserID = "test-followed-user-id" + testUserNoPermsID = "test-user-no-perms" + testUploadID = "test-upload-id" +) + +var ( + testMongoUrgentLowResponse = &store.AlertableResponse{ + Glucose: []*glucose.Glucose{ + newTestStaticDatumMmolL(1.0)}, + } +) + +var _ = Describe("Consumer", func() { + + Describe("Consume", func() { + It("ignores nil messages", func() { + ctx, _ := addLogger(context.Background()) + c := &Consumer{} + + Expect(c.Consume(ctx, nil, nil)).To(Succeed()) + }) + + It("processes alerts config events", func() { + cfg := &alerts.Config{ + UserID: testUserID, + FollowedUserID: testFollowedUserID, + Alerts: alerts.Alerts{ + Low: &alerts.LowAlert{ + Base: alerts.Base{ + Enabled: true}, + Threshold: alerts.Threshold{ + Value: 101.1, + Units: "mg/dL", + }, + }, + }, + } + kafkaMsg := newAlertsMockConsumerMessage(".data.alerts", cfg) + docs := []interface{}{bson.M{}} + c, deps := newConsumerTestDeps(docs) + + Expect(c.Consume(deps.Context, deps.Session, kafkaMsg)).To(Succeed()) + Expect(deps.Session.MarkCalls).To(Equal(1)) + }) + + It("processes device data events", func() { + blood := &glucose.Glucose{ + Blood: blood.Blood{ + Units: pointer.FromAny("mmol/L"), + Value: pointer.FromAny(7.2), + Base: types.Base{ + UserID: pointer.FromAny(testFollowedUserID), + }, + }, + } + kafkaMsg := newAlertsMockConsumerMessage(".data.deviceData.alerts", blood) + docs := []interface{}{bson.M{}} + c, deps := newConsumerTestDeps(docs) + + Expect(c.Consume(deps.Context, deps.Session, kafkaMsg)).To(Succeed()) + Expect(deps.Session.MarkCalls).To(Equal(1)) + }) + + }) + + Describe("Evaluator", func() { + Describe("Evaluate", func() { + It("checks that alerts config owners have permission", func() { + testLogger := logtest.NewLogger() + ctx := log.NewContextWithLogger(context.Background(), testLogger) + + eval, deps := newEvaluatorTestDeps([]*store.AlertableResponse{testMongoUrgentLowResponse}) + deps.Permissions.Allow(testUserID, permission.Follow, testFollowedUserID) + deps.Permissions.DenyAll(testUserNoPermsID, testFollowedUserID) + deps.Alerts.Configs = append(deps.Alerts.Configs, testAlertsConfigUrgentLow(testUserNoPermsID)) + deps.Alerts.Configs = append(deps.Alerts.Configs, testAlertsConfigUrgentLow(testUserID)) + + notes, err := eval.Evaluate(ctx, testFollowedUserID) + + Expect(err).To(Succeed()) + Expect(notes).To(ConsistOf(HaveField("RecipientUserID", testUserID))) + }) + + It("uses the longest delay", func() { + + }) + }) + + }) + + // Describe("evaluateUrgentLow", func() { + // It("can't function without datum units", func() { + // ctx, _ := addLogger(context.Background()) + // alert := newTestUrgentLowAlert() + // datum := newTestStaticDatumMmolL(11) + // datum.Blood.Units = nil + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // _, err := c.evaluateUrgentLow(ctx, datum, testUserID, alert) + + // Expect(err).To(MatchError("Unable to evaluate datum: Units, Value, or Time is nil")) + // }) + + // It("can't function without datum value", func() { + // ctx, _ := addLogger(context.Background()) + // alert := newTestUrgentLowAlert() + // datum := newTestStaticDatumMmolL(11) + // datum.Blood.Value = nil + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // _, err := c.evaluateUrgentLow(ctx, datum, testUserID, alert) + + // Expect(err).To(MatchError("Unable to evaluate datum: Units, Value, or Time is nil")) + // }) + + // It("can't function without datum time", func() { + // ctx, _ := addLogger(context.Background()) + // alert := newTestUrgentLowAlert() + // datum := newTestStaticDatumMmolL(11) + // datum.Blood.Time = nil + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // _, err := c.evaluateUrgentLow(ctx, datum, testUserID, alert) + // Expect(err).To(MatchError("Unable to evaluate datum: Units, Value, or Time is nil")) + // }) + + // It("is marked resolved", func() { + // ctx, _ := addLogger(context.Background()) + // datum := newTestStaticDatumMmolL(11) + // alert := newTestUrgentLowAlert() + // alert.Threshold.Value = *datum.Blood.Value - 1 + // userID := "test-user-id" + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // updated, err := c.evaluateUrgentLow(ctx, datum, userID, alert) + // Expect(err).To(Succeed()) + // Expect(updated).To(BeTrue()) + // Expect(alert.Resolved).To(BeTemporally("~", time.Now(), time.Second)) + // }) + + // It("is marked both notified and triggered", func() { + // ctx, _ := addLogger(context.Background()) + // datum := newTestStaticDatumMmolL(11) + // alert := newTestUrgentLowAlert() + // alert.Threshold.Value = *datum.Blood.Value + 1 + // userID := "test-user-id" + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // updated, err := c.evaluateUrgentLow(ctx, datum, userID, alert) + // Expect(err).To(Succeed()) + // Expect(updated).To(BeTrue()) + // Expect(alert.Sent).To(BeTemporally("~", time.Now(), time.Second)) + // Expect(alert.Triggered).To(BeTemporally("~", time.Now(), time.Second)) + // }) + + // It("sends notifications regardless of previous notification time", func() { + // ctx, _ := addLogger(context.Background()) + // datum := newTestStaticDatumMmolL(11) + // alert := newTestUrgentLowAlert() + // lastTime := time.Now().Add(-10 * time.Second) + // alert.Activity.Sent = lastTime + // alert.Threshold.Value = *datum.Blood.Value + 1 + // userID := "test-user-id" + // c := &Consumer{ + // Pusher: newMockPusher(), + // DeviceTokens: newMockDeviceTokensClient(), + // } + + // updated, err := c.evaluateUrgentLow(ctx, datum, userID, alert) + // Expect(err).To(Succeed()) + // Expect(updated).To(BeTrue()) + // Expect(alert.Sent).To(BeTemporally("~", time.Now(), time.Second)) + // }) + // }) +}) + +type consumerTestDeps struct { + Alerts *mockAlertsConfigClient + Context context.Context + Cursor *mongo.Cursor + Evaluator *mockStaticEvaluator + Logger log.Logger + Permissions *mockPermissionsClient + Repo *storetest.DataRepository + Session *mockConsumerGroupSession + Tokens alerts.TokenProvider +} + +func newConsumerTestDeps(docs []interface{}) (*Consumer, *consumerTestDeps) { + GinkgoHelper() + ctx, logger := addLogger(context.Background()) + alertsClient := newMockAlertsConfigClient([]*alerts.Config{ + { + UserID: testUserID, + FollowedUserID: testFollowedUserID, + Alerts: alerts.Alerts{}, + }, + }, nil) + dataRepo := storetest.NewDataRepository() + dataRepo.GetLastUpdatedForUserOutputs = []storetest.GetLastUpdatedForUserOutput{} + augmentedDocs := augmentMockMongoDocs(docs) + cur := newMockMongoCursor(augmentedDocs) + dataRepo.GetDataRangeOutputs = []storetest.GetDataRangeOutput{ + {Error: nil, Cursor: cur}, + } + tokens := &mockAlertsTokenProvider{Token: "test-token"} + permissions := newMockPermissionsClient() + evaluator := newMockStaticEvaluator() + + return &Consumer{ + Alerts: alertsClient, + Evaluator: evaluator, + Tokens: tokens, + Data: dataRepo, + Permissions: permissions, + }, &consumerTestDeps{ + Alerts: alertsClient, + Context: ctx, + Cursor: cur, + Evaluator: evaluator, + Repo: dataRepo, + Session: &mockConsumerGroupSession{}, + Logger: logger, + Tokens: tokens, + Permissions: permissions, + } +} + +func newEvaluatorTestDeps(responses []*store.AlertableResponse) (*evaluator, *evaluatorTestDeps) { + alertsClient := newMockAlertsConfigClient(nil, nil) + dataRepo := storetest.NewDataRepository() + dataRepo.GetLastUpdatedForUserOutputs = []storetest.GetLastUpdatedForUserOutput{} + for _, r := range responses { + out := storetest.GetAlertableDataOutput{Response: r} + dataRepo.GetAlertableDataOutputs = append(dataRepo.GetAlertableDataOutputs, out) + } + permissions := newMockPermissionsClient() + tokens := newMockTokensProvider() + return &evaluator{ + Alerts: alertsClient, + Data: dataRepo, + Permissions: permissions, + Tokens: tokens, + }, &evaluatorTestDeps{ + Alerts: alertsClient, + Permissions: permissions, + } +} + +type evaluatorTestDeps struct { + Alerts *mockAlertsConfigClient + Permissions *mockPermissionsClient +} + +// mockEvaluator implements Evaluator. +type mockEvaluator struct { + Evaluations map[string][]mockEvaluatorResponse + EvaluateCalls map[string]int +} + +type mockEvaluatorResponse struct { + Notes []*alerts.Note + Error error +} + +func newMockEvaluator() *mockEvaluator { + return &mockEvaluator{ + Evaluations: map[string][]mockEvaluatorResponse{}, + EvaluateCalls: map[string]int{}, + } +} + +func (e *mockEvaluator) Evaluate(ctx context.Context, followedUserID string) ([]*alerts.Note, error) { + if _, found := e.Evaluations[followedUserID]; !found { + return nil, nil + } + resp := e.Evaluations[followedUserID][0] + if len(e.Evaluations[followedUserID]) > 1 { + e.Evaluations[followedUserID] = e.Evaluations[followedUserID][1:] + } + e.EvaluateCalls[followedUserID] += 1 + if resp.Error != nil { + return nil, resp.Error + } + return resp.Notes, nil +} + +func (e *mockEvaluator) EvaluateCallsTotal() int { + total := 0 + for _, val := range e.EvaluateCalls { + total += val + } + return total +} + +// mockStaticEvaluator wraps mock evaluator with a static response. +// +// Useful when testing Consumer behavior, when the behavior of the Evaulator +// isn't relevant to the Consumer test. +type mockStaticEvaluator struct { + *mockEvaluator +} + +func newMockStaticEvaluator() *mockStaticEvaluator { + return &mockStaticEvaluator{newMockEvaluator()} +} + +func (e *mockStaticEvaluator) Evaluate(ctx context.Context, followedUserID string) ([]*alerts.Note, error) { + e.EvaluateCalls[followedUserID] += 1 + return nil, nil +} + +func newAlertsMockConsumerMessage(topic string, v any) *sarama.ConsumerMessage { + GinkgoHelper() + doc := &struct { + FullDocument any `json:"fullDocument" bson:"fullDocument"` + }{FullDocument: v} + vBytes, err := bson.MarshalExtJSON(doc, false, false) + Expect(err).To(Succeed()) + return &sarama.ConsumerMessage{ + Value: vBytes, + Topic: topic, + } +} + +func addLogger(ctx context.Context) (context.Context, log.Logger) { + GinkgoHelper() + if ctx == nil { + ctx = context.Background() + } + + lgr := newTestLogger() + return log.NewContextWithLogger(ctx, lgr), lgr +} + +func newTestLogger() log.Logger { + GinkgoHelper() + lgr := logtest.NewLogger() + return lgr +} + +func augmentMockMongoDocs(inDocs []interface{}) []interface{} { + defaultDoc := bson.M{ + "_userId": testFollowedUserID, + "_active": true, + "type": "upload", + "time": time.Now(), + } + outDocs := []interface{}{} + for _, inDoc := range inDocs { + newDoc := defaultDoc + switch v := (inDoc).(type) { + case map[string]interface{}: + for key, val := range v { + newDoc[key] = val + } + outDocs = append(outDocs, newDoc) + default: + outDocs = append(outDocs, inDoc) + } + } + return outDocs +} + +func newMockMongoCursor(docs []interface{}) *mongo.Cursor { + GinkgoHelper() + cur, err := mongo.NewCursorFromDocuments(docs, nil, nil) + Expect(err).To(Succeed()) + return cur +} + +func newTestStaticDatumMmolL(value float64) *glucose.Glucose { + return &glucose.Glucose{ + Blood: blood.Blood{ + Base: types.Base{ + Time: pointer.FromTime(time.Now()), + }, + Units: pointer.FromString(nontypesglucose.MmolL), + Value: pointer.FromFloat64(value), + }, + } +} + +func newTestUrgentLowAlert() *alerts.UrgentLowAlert { + return &alerts.UrgentLowAlert{ + Base: alerts.Base{ + Enabled: true, + Activity: alerts.Activity{}, + }, + Threshold: alerts.Threshold{ + Units: nontypesglucose.MmolL, + }, + } +} + +type mockDeviceTokensClient struct { + Error error + Tokens []*devicetokens.DeviceToken +} + +func newMockDeviceTokensClient() *mockDeviceTokensClient { + return &mockDeviceTokensClient{ + Tokens: []*devicetokens.DeviceToken{}, + } +} + +// // testingT is a subset of testing.TB +// type testingT interface { +// Errorf(format string, args ...any) +// Fatalf(format string, args ...any) +// } + +func (m *mockDeviceTokensClient) GetDeviceTokens(ctx context.Context, + userID string) ([]*devicetokens.DeviceToken, error) { + + if m.Error != nil { + return nil, m.Error + } + return m.Tokens, nil +} + +type mockPusher struct { + Pushes []string +} + +func newMockPusher() *mockPusher { + return &mockPusher{ + Pushes: []string{}, + } +} + +func (p *mockPusher) Push(ctx context.Context, + deviceToken *devicetokens.DeviceToken, notification *push.Notification) error { + p.Pushes = append(p.Pushes, notification.Message) + return nil +} + +type mockAlertsConfigClient struct { + Error error + Configs []*alerts.Config +} + +func newMockAlertsConfigClient(c []*alerts.Config, err error) *mockAlertsConfigClient { + if c == nil { + c = []*alerts.Config{} + } + return &mockAlertsConfigClient{ + Configs: c, + Error: err, + } +} + +func (c *mockAlertsConfigClient) Delete(_ context.Context, _ *alerts.Config) error { + return c.Error +} + +func (c *mockAlertsConfigClient) Get(_ context.Context, _ *alerts.Config) (*alerts.Config, error) { + if c.Error != nil { + return nil, c.Error + } else if len(c.Configs) > 0 { + return c.Configs[0], nil + } + return nil, nil +} + +func (c *mockAlertsConfigClient) List(_ context.Context, userID string) ([]*alerts.Config, error) { + if c.Error != nil { + return nil, c.Error + } else if len(c.Configs) > 0 { + return c.Configs, nil + } + return nil, nil +} + +func (c *mockAlertsConfigClient) Upsert(_ context.Context, _ *alerts.Config) error { + return c.Error +} + +type mockConsumerGroupSession struct { + MarkCalls int +} + +func (s *mockConsumerGroupSession) Claims() map[string][]int32 { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) MemberID() string { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) GenerationID() int32 { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) Commit() { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) { + panic("not implemented") // TODO: Implement +} + +func (s *mockConsumerGroupSession) MarkMessage(msg *sarama.ConsumerMessage, metadata string) { + s.MarkCalls++ +} + +func (s *mockConsumerGroupSession) Context() context.Context { + panic("not implemented") // TODO: Implement +} + +type mockAlertsTokenProvider struct { + Token string + Error error +} + +func (p *mockAlertsTokenProvider) ServerSessionToken() (string, error) { + if p.Error != nil { + return "", p.Error + } + return p.Token, nil +} + +type mockPermissionsClient struct { + Error error + Perms map[string]permission.Permissions +} + +func newMockPermissionsClient() *mockPermissionsClient { + return &mockPermissionsClient{ + Perms: map[string]permission.Permissions{}, + } +} + +func (c *mockPermissionsClient) Key(requesterUserID, targetUserID string) string { + return requesterUserID + targetUserID +} + +func (c *mockPermissionsClient) Allow(requestUserID, perm, targetUserID string) { + key := c.Key(requestUserID, targetUserID) + if _, found := c.Perms[key]; !found { + c.Perms[key] = permission.Permissions{} + } + c.Perms[key][perm] = permission.Permission{} +} + +func (c *mockPermissionsClient) DenyAll(requestUserID, targetUserID string) { + key := c.Key(requestUserID, targetUserID) + delete(c.Perms, key) +} + +func (c *mockPermissionsClient) GetUserPermissions(ctx context.Context, requestUserID string, targetUserID string) (permission.Permissions, error) { + if c.Error != nil { + return nil, c.Error + } + if p, ok := c.Perms[c.Key(requestUserID, targetUserID)]; ok { + return p, nil + } else { + return nil, errors.New("test error NOT FOUND") + } +} + +type mockTokensProvider struct{} + +func newMockTokensProvider() *mockTokensProvider { + return &mockTokensProvider{} +} + +func (p *mockTokensProvider) ServerSessionToken() (string, error) { + return "test-server-session-token", nil +} + +func testAlertsConfigUrgentLow(userID string) *alerts.Config { + return &alerts.Config{ + UserID: userID, + FollowedUserID: testFollowedUserID, + UploadID: testUploadID, + Alerts: alerts.Alerts{ + UrgentLow: &alerts.UrgentLowAlert{ + Base: alerts.Base{ + Enabled: true, + Activity: alerts.Activity{}, + }, + Threshold: alerts.Threshold{ + Value: 10.0, + Units: nontypesglucose.MgdL, + }, + }, + }, + } +} diff --git a/data/service/service/standard.go b/data/service/service/standard.go index 065f642ff..47a61df79 100644 --- a/data/service/service/standard.go +++ b/data/service/service/standard.go @@ -2,6 +2,7 @@ package service import ( "context" + "strings" "github.com/Shopify/sarama" "github.com/kelseyhightower/envconfig" @@ -18,7 +19,6 @@ import ( dataSourceStoreStructured "github.com/tidepool-org/platform/data/source/store/structured" dataSourceStoreStructuredMongo "github.com/tidepool-org/platform/data/source/store/structured/mongo" dataStoreMongo "github.com/tidepool-org/platform/data/store/mongo" - "github.com/tidepool-org/platform/devicetokens" "github.com/tidepool-org/platform/errors" "github.com/tidepool-org/platform/events" "github.com/tidepool-org/platform/log" @@ -44,8 +44,9 @@ type Standard struct { dataClient *Client clinicsClient *clinics.Client dataSourceClient *dataSourceServiceClient.Client - pusher Pusher + pusher dataEvents.Pusher userEventsHandler events.Runner + alertsEventsHandler events.Runner api *api.Standard server *server.Standard } @@ -97,6 +98,9 @@ func (s *Standard) Initialize(provider application.Provider) error { if err := s.initializeUserEventsHandler(); err != nil { return err } + if err := s.initializeAlertsEventsHandler(); err != nil { + return err + } if err := s.initializeAPI(); err != nil { return err } @@ -117,6 +121,13 @@ func (s *Standard) Terminate() { } s.userEventsHandler = nil } + if s.alertsEventsHandler != nil { + s.Logger().Info("Terminating the alertsEventsHandler") + if err := s.alertsEventsHandler.Terminate(); err != nil { + s.Logger().Errorf("Error while terminating the alertsEventsHandler: %v", err) + } + s.alertsEventsHandler = nil + } s.api = nil s.dataClient = nil if s.syncTaskStore != nil { @@ -147,6 +158,9 @@ func (s *Standard) Run() error { go func() { errs <- s.userEventsHandler.Run() }() + go func() { + errs <- s.alertsEventsHandler.Run() + }() go func() { errs <- s.server.Serve() }() @@ -434,12 +448,6 @@ func (s *Standard) initializeSaramaLogger() error { return nil } -// Pusher is a service-agnostic interface for sending push notifications. -type Pusher interface { - // Push a notification to a device. - Push(context.Context, *devicetokens.DeviceToken, *push.Notification) error -} - func (s *Standard) initializePusher() error { var err error @@ -453,7 +461,7 @@ func (s *Standard) initializePusher() error { return errors.Wrap(err, "Unable to process APNs pusher config") } - var pusher Pusher + var pusher dataEvents.Pusher pusher, err = push.NewAPNSPusherFromKeyData(apns2Config.SigningKey, apns2Config.KeyID, apns2Config.TeamID, apns2Config.BundleID) if err != nil { @@ -464,3 +472,63 @@ func (s *Standard) initializePusher() error { return nil } + +func (s *Standard) initializeAlertsEventsHandler() error { + s.Logger().Debug("Initializing alerts events handler") + + commonConfig := eventsCommon.NewConfig() + if err := commonConfig.LoadFromEnv(); err != nil { + return err + } + + // In addition to the CloudEventsConfig, additional specific config values + // are needed. + config := &struct { + KafkaAlertsTopics []string `envconfig:"KAFKA_ALERTS_TOPICS" default:"alerts,deviceData.alerts"` + KafkaAlertsGroupID string `envconfig:"KAFKA_ALERTS_CONSUMER_GROUP" required:"true"` + }{} + if err := envconfig.Process("", config); err != nil { + return errors.Wrap(err, "Unable to process envconfig") + } + + // Some kafka topics use a `-` as a prefix. But MongoDB CDC topics are created with + // `.`. This code is using CDC topics, so ensuring that a `.` is used for alerts events + // lines everything up as expected. + topicPrefix := strings.ReplaceAll(commonConfig.KafkaTopicPrefix, "-", ".") + prefixedTopics := make([]string, 0, len(config.KafkaAlertsTopics)) + for _, topic := range config.KafkaAlertsTopics { + prefixedTopics = append(prefixedTopics, topicPrefix+topic) + } + + alerts := s.dataStore.NewAlertsRepository() + dataRepo := s.dataStore.NewDataRepository() + s.Logger().WithField("permissionClient", s.permissionClient).Debug("yo!") + ec := &dataEvents.Consumer{ + Alerts: alerts, + Data: dataRepo, + DeviceTokens: s.AuthClient(), + Evaluator: dataEvents.NewAlertsEvaluator(alerts, dataRepo, s.permissionClient, s.AuthClient()), + Permissions: s.permissionClient, + Pusher: s.pusher, + Tokens: s.AuthClient(), + Logger: s.Logger(), + } + + runnerCfg := dataEvents.SaramaRunnerConfig{ + Brokers: commonConfig.KafkaBrokers, + GroupID: config.KafkaAlertsGroupID, + Logger: s.Logger(), + Topics: prefixedTopics, + Sarama: commonConfig.SaramaConfig, + MessageConsumer: &dataEvents.AlertsEventsConsumer{ + Consumer: ec, + }, + } + runner := &dataEvents.SaramaRunner{Config: runnerCfg} + if err := runner.Initialize(); err != nil { + return errors.Wrap(err, "Unable to initialize alerts events handler runner") + } + s.alertsEventsHandler = runner + + return nil +}