diff --git a/.github/workflows/docker.yaml b/.github/workflows/docker.yaml index ec3fc1fd..85f9803f 100644 --- a/.github/workflows/docker.yaml +++ b/.github/workflows/docker.yaml @@ -63,7 +63,7 @@ jobs: - name: Build and push uses: docker/build-push-action@v4 with: - platforms: linux/amd64, linux/arm64 + platforms: linux/amd64 push: true build-args: | GIT_VERSION=${{ github.ref_type == 'tag' && github.ref_name || github.event.pull_request.head.sha || github.sha }} diff --git a/clients/mist_client.go b/clients/mist_client.go index 314be7f2..7fc6c135 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -393,6 +393,7 @@ func (mc *MistClient) authorize(unauthResp string) error { } func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { + glog.Infof("Sending command to Mist") c, err := commandToString(command) if err != nil { return "", err @@ -403,15 +404,18 @@ func (mc *MistClient) sendCommandToMist(command interface{}) (string, error) { return "", err } req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + glog.Infof("Prepared command to send, sedning") resp, err := metrics.MonitorRequest(metrics.Metrics.MistClient, mistRetryableClient, req) if err != nil { return "", err } + glog.Infof("Done Sending command to Mist") defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { return "", err } + glog.Infof("Sending command to Mist 2") return string(body), err } diff --git a/handlers/accesscontrol/access-control.go b/handlers/accesscontrol/access-control.go index 18619e25..dc584216 100644 --- a/handlers/accesscontrol/access-control.go +++ b/handlers/accesscontrol/access-control.go @@ -124,18 +124,23 @@ func (ac *AccessControlHandlersCollection) periodicRefreshIntervalCache(mapic mi time.Sleep(5 * time.Second) ac.mutex.Lock() refreshIntervalCache.mux.Lock() + var keysToInvalidate []string for key := range refreshIntervalCache.data { if time.Since(refreshIntervalCache.data[key].LastRefresh) > time.Duration(refreshIntervalCache.data[key].RefreshInterval)*time.Second { refreshIntervalCache.data[key].LastRefresh = time.Now() - mapic.InvalidateAllSessions(key) + keysToInvalidate = append(keysToInvalidate, key) for cachedAccessKey := range ac.cache[key] { delete(ac.cache[key], cachedAccessKey) } break } } - ac.mutex.Unlock() refreshIntervalCache.mux.Unlock() + ac.mutex.Unlock() + + for _, key := range keysToInvalidate { + mapic.InvalidateAllSessions(key) + } } }() } @@ -170,6 +175,7 @@ func NewAccessControlHandlersCollection(cli config.Cli, mapic mistapiconnector.I } func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, payload *misttriggers.UserNewPayload) (bool, error) { + glog.Infof("Handling USER_NEW trigger payload=%v", payload) playbackID := payload.StreamName[strings.Index(payload.StreamName, "+")+1:] ctx = log.WithLogValues(ctx, "playback_id", playbackID) @@ -189,6 +195,7 @@ func (ac *AccessControlHandlersCollection) HandleUserNew(ctx context.Context, pa func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (allowed bool, err error) { + glog.Infof("Handling IsAuthorized trigger playbackID=%s, payload=%v", playbackID, payload) if payload.Origin == "null" && payload.Referer == "" { // Allow redirects without caching match, _ := regexp.MatchString(`(?:prod|staging)-.*catalyst-\d+`, payload.Host) @@ -216,10 +223,12 @@ func (ac *AccessControlHandlersCollection) IsAuthorized(ctx context.Context, pla Inc() }() allowed, err = ac.isAuthorized(ctx, playbackID, payload) + glog.Infof("isAuthorized handled playbackID=%s, allowed=%v, err=%v", playbackID, allowed, err) return } func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, playbackID string, payload *misttriggers.UserNewPayload) (bool, error) { + glog.Infof("Handling AccessControlHandlersCollection.isAuthorized() trigger payload=%v", payload) webhookHeaders := make(map[string]string) webhookHeaders["User-Agent"] = payload.UserAgent @@ -263,6 +272,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla return false, err } cacheKey = "accessKey_" + hashCacheKey + glog.Infof("Produced cacheKey from accessKey: %s", cacheKey) } else if jwt != "" { for _, blocked := range ac.blockedJWTs { if jwt == blocked { @@ -284,6 +294,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla return false, err } cacheKey = "jwtPubKey_" + hashCacheKey + glog.Infof("Produced cacheKey from JWT: %s", cacheKey) } body, err := json.Marshal(acReq) @@ -302,6 +313,7 @@ func (ac *AccessControlHandlersCollection) isAuthorized(ctx context.Context, pla // checkViewerLimit is used to limit viewers per user globally (as configured with Gate API) func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) bool { + glog.Infof("checkViewerLimit playbackID=%s", playbackID) viewerLimitCache.mux.RLock() defer viewerLimitCache.mux.RUnlock() @@ -332,6 +344,7 @@ func (ac *AccessControlHandlersCollection) checkViewerLimit(playbackID string) b } func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playbackID string) { + glog.Infof("refreshConcurrentViewerCache playbackID=%s", playbackID) viewerLimitCache.mux.RLock() viewerLimit, ok := viewerLimitCache.data[playbackID] viewerLimitCache.mux.RUnlock() @@ -373,34 +386,42 @@ func (ac *AccessControlHandlersCollection) refreshConcurrentViewerCache(playback } func (ac *AccessControlHandlersCollection) GetPlaybackAccessControlInfo(ctx context.Context, playbackID, cacheKey string, requestBody []byte) (bool, error) { + glog.Infof("GetPlaybackAccessControlInfo playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody) ac.mutex.RLock() + glog.Infof("ac.mutex.RLock()") entry := ac.cache[playbackID][cacheKey] ac.mutex.RUnlock() if isExpired(entry) { log.V(7).LogCtx(ctx, "Cache expired", "cache_key", cacheKey) + glog.Infof("Cache expired, calling cachePlaybackAccessControlInfo") err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody) if err != nil { return false, err } + glog.Infof("Cache expired, cached new playback access control info") } else if isStale(entry) { log.V(7).LogCtx(ctx, "Cache stale", "cache_key", cacheKey) + glog.Infof("Cache stale") go func() { ac.mutex.RLock() stillStale := isStale(ac.cache[playbackID][cacheKey]) ac.mutex.RUnlock() if stillStale { + glog.Infof("Cache still stale, calling cachePlaybackAccessControlInfo") err := ac.cachePlaybackAccessControlInfo(playbackID, cacheKey, requestBody) if err != nil { log.LogCtx(ctx, "Error caching playback access control info", "err", err) } + glog.Infof("Cache still stale, cached new playback access control info") } }() } ac.mutex.RLock() + glog.Infof("ac.mutex.RLock() 2") entry = ac.cache[playbackID][cacheKey] ac.mutex.RUnlock() @@ -421,14 +442,17 @@ func (ac *AccessControlHandlersCollection) ProduceHashCacheKey(cachePayload Play } func isExpired(entry *PlaybackAccessControlEntry) bool { + glog.Infof("isExpired") return entry == nil || time.Now().After(entry.Stale) } func isStale(entry *PlaybackAccessControlEntry) bool { + glog.Infof("isStale") return entry != nil && time.Now().After(entry.MaxAge) && !isExpired(entry) } func (ac *AccessControlHandlersCollection) cachePlaybackAccessControlInfo(playbackID, cacheKey string, requestBody []byte) error { + glog.Infof("cachePlaybackAccessControlInfo, playbackID=%s, cacheKey=%s, requestBody=%v", playbackID, cacheKey, requestBody) allow, gateConfig, err := ac.gateClient.QueryGate(requestBody) refreshInterval := gateConfig.RefreshInterval diff --git a/handlers/misttriggers/user_new.go b/handlers/misttriggers/user_new.go index 174bce74..36bcf3ed 100644 --- a/handlers/misttriggers/user_new.go +++ b/handlers/misttriggers/user_new.go @@ -64,6 +64,8 @@ func (d *MistCallbackHandlersCollection) TriggerUserNew(ctx context.Context, w h payload, err := ParseUserNewPayload(body) cookies := req.Cookies() + glog.Infof("Got USER_NEW trigger sessionId=%q payload=%v", payload.SessionID, body) + var accessKey, jwt string for _, cookie := range cookies { switch cookie.Name { diff --git a/mapic/mistapiconnector_app.go b/mapic/mistapiconnector_app.go index a6ed8677..dbc12eb6 100644 --- a/mapic/mistapiconnector_app.go +++ b/mapic/mistapiconnector_app.go @@ -224,7 +224,9 @@ func (mc *mac) StopSessions(playbackID string) { } func (mc *mac) InvalidateAllSessions(playbackID string) { + glog.Infof("Invalidating Mist Sessions") mc.invalidateAllSessions(playbackID) + glog.Infof("Done Invalidating Mist Sessions") } func (mc *mac) handleStreamBuffer(ctx context.Context, payload *misttriggers.StreamBufferPayload) error { diff --git a/metrics/monitor_request.go b/metrics/monitor_request.go index 537a1a24..441a3a9f 100644 --- a/metrics/monitor_request.go +++ b/metrics/monitor_request.go @@ -3,6 +3,7 @@ package metrics import ( "context" "fmt" + "github.com/golang/glog" "net/http" "time" @@ -20,11 +21,14 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re req := r.WithContext(ctx) start := time.Now() + glog.Infof("client.Do(req)") res, err := client.Do(req) + glog.Infof("Done client.Do(req), err=%v", err) duration := time.Since(start) retries := ctx.Value(RetriesKey).(*Retries) if retries.lastStatusCode >= 400 { + glog.Infof("retries.lastStatusCode >= 400") clientMetrics.FailureCount.WithLabelValues(req.URL.Host, fmt.Sprint(retries.lastStatusCode)).Inc() return res, err } @@ -32,10 +36,12 @@ func MonitorRequest(clientMetrics ClientMetrics, client *http.Client, r *http.Re clientMetrics.RequestDuration.WithLabelValues(req.URL.Host).Observe(duration.Seconds()) clientMetrics.RetryCount.WithLabelValues(req.URL.Host).Set(float64(retries.count)) + glog.Infof("return res, err") return res, err } func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, error) { + glog.Infof("HttpRetryHook") retries := ctx.Value(RetriesKey).(*Retries) if res == nil { // TODO: have a better way to represent closed/refused connections and timeouts @@ -47,5 +53,6 @@ func HttpRetryHook(ctx context.Context, res *http.Response, err error) (bool, er } retries.count++ + glog.Infof("Done HttpRetryHook") return retryablehttp.DefaultRetryPolicy(ctx, res, err) }