diff --git a/src/seer/grouping/grouping.py b/src/seer/grouping/grouping.py
index fe06e5e99..f08cbe1bf 100644
--- a/src/seer/grouping/grouping.py
+++ b/src/seer/grouping/grouping.py
@@ -6,8 +6,9 @@
 import sentry_sdk
 import sqlalchemy.orm
 import torch
-from pydantic import BaseModel, ValidationInfo, field_validator
+from pydantic import BaseModel, Field, ValidationInfo, field_validator
 from sentence_transformers import SentenceTransformer
+from sentry_sdk import metrics
 from sqlalchemy.dialects.postgresql import insert
 
 from seer.db import DbGroupingRecord, Session
@@ -45,7 +46,13 @@ class GroupingResponse(BaseModel):
 
 
 class SimilarityResponse(BaseModel):
-    responses: List[GroupingResponse]
+    responses: List[GroupingResponse] = Field(default_factory=list)
+
+    # Duplicates will happen naturally -- retries of in-flight messages, halting problems related
+    # to serialization in processing queues, and so on. Do not assume that duplicates are a bug to
+    # be fixed; they are the natural and healthy consequence of trade-offs in system design.
+    # This value exists mostly for metrics and awareness; it does not indicate failure.
+    had_duplicate: bool = False
 
 
 class SimilarityBenchmarkResponse(BaseModel):
@@ -163,14 +170,17 @@ def query_nearest_k_neighbors(
         distance: float,
         k: int,
     ) -> List[tuple[DbGroupingRecord, float]]:
+        cos_dist = DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding).label(
+            "distance"
+        )
         return (
             session.query(
                 DbGroupingRecord,
-                DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding).label("distance"),
+                cos_dist,
             )
             .filter(
                 DbGroupingRecord.project_id == project_id,
-                DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding) <= distance,
+                cos_dist <= distance,
                 DbGroupingRecord.hash != hash,
             )
             .order_by("distance")
@@ -188,59 +198,11 @@ def get_nearest_neighbors(self, issue: GroupingRequest) -> SimilarityResponse:
         :return: A SimilarityResponse object containing a list of GroupingResponse objects with the nearest group IDs,
         stacktrace similarity scores, message similarity scores, and grouping flags.
""" - with Session() as session: - embedding = self.encode_text(issue.stacktrace).astype("float32") - - results = self.query_nearest_k_neighbors( - session, - embedding, - issue.project_id, - issue.hash, - NN_SIMILARITY_DISTANCE if issue.read_only else issue.threshold, - issue.k, - ) - - # If no existing groups within the threshold, insert the request as a new GroupingRecord - if not (issue.read_only or any(distance <= issue.threshold for _, distance in results)): - logger.info( - "insert_new_grouping_record", - extra={ - "input_hash": issue.hash, - "project_id": issue.project_id, - "stacktrace_length": len(issue.stacktrace), - }, - ) - self.insert_new_grouping_record(session, issue, embedding) - session.commit() - - similarity_response = SimilarityResponse(responses=[]) - for record, distance in results: - message_similarity_score = difflib.SequenceMatcher( - None, issue.message, record.message - ).ratio() - should_group = distance <= issue.threshold - - if should_group: - logger.info( - "should_group", - extra={ - "input_hash": issue.hash, - "stacktrace_length": len(issue.stacktrace), - "parent_hash": record.hash, - "project_id": issue.project_id, - }, - ) + responses = self.bulk_nearest_neighbor([issue]) + if responses: + return responses[0] - similarity_response.responses.append( - GroupingResponse( - parent_hash=record.hash, - stacktrace_distance=distance, - message_distance=1.0 - message_similarity_score, - should_group=should_group, - ) - ) - - return similarity_response + raise AssertionError("bulk_nearest_neighbor() didn't return a response!") @sentry_sdk.tracing.trace def bulk_create_and_insert_grouping_records( @@ -252,9 +214,28 @@ def bulk_create_and_insert_grouping_records( if len(data.data) != len(data.stacktrace_list): return BulkCreateGroupingRecordsResponse(success=False, groups_with_neighbor={}) - groups_with_neighbor = self.insert_batch_grouping_records(data) + responses: list[SimilarityResponse] = self.bulk_nearest_neighbor( + [ + GroupingRequest( + project_id=create_group.project_id, + stacktrace=stacktrace, + message=create_group.message, + hash=create_group.hash, + exception_type=create_group.exception_type, + k=1, + ) + for create_group, stacktrace in zip(data.data, data.stacktrace_list) + ], + data.encode_stacktrace_batch_size, + ) + return BulkCreateGroupingRecordsResponse( - success=True, groups_with_neighbor=groups_with_neighbor + success=True, + groups_with_neighbor={ + str(request.group_id): response.responses[0] + for request, response in zip(data.data, responses) + if response.responses + }, ) @sentry_sdk.tracing.trace @@ -314,42 +295,86 @@ def insert_batch_grouping_records( return groups_with_neighbor @sentry_sdk.tracing.trace - def insert_new_grouping_record( - self, session, issue: GroupingRequest, embedding: np.ndarray - ) -> None: + def bulk_nearest_neighbor( + self, + requests: List[GroupingRequest], + encode_stacktrace_batch_size: int = 1, + ) -> list[SimilarityResponse]: """ - Inserts a new GroupingRecord into the database if the group_hash does not already exist. - If new grouping record was created, return the id. + Given the grouping requests, encodes and searches for nearest neighbors according + to that request. When that request is not read_only and no neighbors within the threshold + is found, a new one is created. When duplicates are encountered, that is noted in the response. + All neighbors within the given threshold are returned. - :param session: The database session. - :param issue: The issue to insert as a new GroupingRecord. 
-        :param embedding: The embedding of the stacktrace.
+        Responses are always in order of the requests, and of the same size.
         """
-        existing_record = (
-            session.query(DbGroupingRecord)
-            .filter_by(hash=issue.hash, project_id=issue.project_id)
-            .first()
+        result: list[SimilarityResponse] = []
+        embeddings = self.encode_multiple_texts(
+            [r.stacktrace for r in requests], encode_stacktrace_batch_size
         )
+        with Session() as session:
+            request: GroupingRequest
+            embedding: np.ndarray
+            for embedding, request in zip(embeddings, requests):
+                embedding = embedding.astype("float32")
+                nearest_neighbors = self.query_nearest_k_neighbors(
+                    session,
+                    embedding,
+                    request.project_id,
+                    request.hash,
+                    request.threshold if not request.read_only else NN_GROUPING_DISTANCE,
+                    request.k,
+                )
-
-        if existing_record is None:
-            new_record = GroupingRecord(
-                project_id=issue.project_id,
-                message=issue.message,
-                stacktrace_embedding=embedding,
-                hash=issue.hash,
-                error_type=issue.exception_type,
-            ).to_db_model()
-            session.add(new_record)
-        else:
-            logger.info(
-                "group_already_exists_in_seer_db",
-                extra={
-                    "existing_hash": existing_record.hash,
-                    "project_id": issue.project_id,
-                    "stacktrace_length": len(issue.stacktrace),
-                    "input_hash": issue.hash,
-                },
-            )
+                threshold_matches = any(
+                    distance <= request.threshold for _, distance in nearest_neighbors
+                )
+                response = SimilarityResponse()
+                result.append(response)
+                if not request.read_only and not threshold_matches:
+                    response.had_duplicate = self.upsert_grouping_record(
+                        embedding, request, session
+                    )
+
+                for record, distance in nearest_neighbors:
+                    message_similarity_score = difflib.SequenceMatcher(
+                        None, request.message, record.message
+                    ).ratio()
+                    response.responses.append(
+                        GroupingResponse(
+                            parent_hash=record.hash,
+                            stacktrace_distance=distance,
+                            message_distance=1.0 - message_similarity_score,
+                            should_group=distance <= request.threshold,
+                        )
+                    )
+
+            session.commit()
+        return result
+
+    def upsert_grouping_record(
+        self, embedding: np.ndarray, request: GroupingRequest, session: sqlalchemy.orm.Session
+    ) -> bool:
+        insert_stmt = insert(DbGroupingRecord).values(
+            project_id=request.project_id,
+            message=request.message,
+            error_type=request.exception_type,
+            hash=request.hash,
+            stacktrace_embedding=embedding,
+        )
+        row = session.execute(
+            insert_stmt.on_conflict_do_nothing(
+                index_elements=(DbGroupingRecord.project_id, DbGroupingRecord.hash)
+            ).returning(DbGroupingRecord.id)
+        )
+        had_duplicate = row.first() is None
+
+        if had_duplicate:
+            # This isn't necessarily a problem unless unexpected increases occur that are not
+            # correlated with volume.
+            metrics.incr("seer.grouping.grouping_duplicate")
+
+        return had_duplicate
 
     @sentry_sdk.tracing.trace
     def delete_grouping_records_for_project(self, project_id: int) -> bool:
diff --git a/tests/seer/grouping/test_grouping.py b/tests/seer/grouping/test_grouping.py
index 923d6d10f..4a5bf280e 100644
--- a/tests/seer/grouping/test_grouping.py
+++ b/tests/seer/grouping/test_grouping.py
@@ -1,5 +1,4 @@
 import unittest
-import uuid
 
 from johen import change_watcher
 from johen.pytest import parametrize
@@ -33,7 +32,7 @@ def test_get_nearest_neighbors_has_neighbor(self):
                 message="message",
                 hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
             )
-            grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
             session.commit()
 
         grouping_request = GroupingRequest(
@@ -121,10 +120,10 @@ def test_insert_new_grouping_record_group_record_exists(self):
                 hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
             )
             # Insert the grouping record
-            grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
             session.commit()
             # Re-insert the grouping record
-            grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
             session.commit()
             matching_record = (
                 session.query(DbGroupingRecord)
@@ -149,9 +148,9 @@ def test_insert_new_grouping_record_group_record_cross_project(self):
                 hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
             )
             # Insert the grouping record
-            grouping_lookup().insert_new_grouping_record(session, grouping_request1, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request1, session)
             session.commit()
-            grouping_lookup().insert_new_grouping_record(session, grouping_request2, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request2, session)
             session.commit()
             matching_record = (
                 session.query(DbGroupingRecord)
@@ -292,7 +291,7 @@ def test_bulk_create_and_insert_grouping_records_has_neighbor_in_existing_record
                 message="message",
                 hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
             )
-            grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
+            grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
             session.commit()
 
         # Create record data to attempt to be inserted, create 5 with the stacktrace "stacktrace"
@@ -414,29 +413,23 @@ def test_delete_grouping_records_for_project(self):
 
 
 @parametrize(count=1)
-def test_GroupingLookup_insert_batch_grouping_records_duplicates(
+def test_upsert_grouping_record(
     project_1_id: int,
     hash_1: str,
-    orig_record: CreateGroupingRecordData,
-    grouping_request: CreateGroupingRecordsRequest,
+    orig_record: GroupingRequest,
 ):
     orig_record.project_id = project_1_id
     orig_record.hash = hash_1
-    project_2_id = project_1_id + 1
-    hash_2 = hash_1 + "_2"
-
-    updated_duplicate = orig_record.copy(update=dict(message=orig_record.message + " updated?"))
-
-    grouping_request.data = [
+    embedding = grouping_lookup().encode_text("some text")
+    test_records = [
         orig_record,
-        orig_record.copy(update=dict(project_id=project_2_id)),
-        orig_record.copy(update=dict(hash=hash_2)),
+        orig_record.copy(update=dict(project_id=(project_1_id + 1))),
+        orig_record.copy(update=dict(hash=(hash_1 + "_2"))),
         # Duplicate of original should not actually update the original
-        updated_duplicate,
+        orig_record.copy(update=dict(message=orig_record.message + " updated?")),
     ]
-    grouping_request.stacktrace_list = [uuid.uuid4().hex for r in grouping_request.data]
 
-    def query_created(record: CreateGroupingRecordData) -> DbGroupingRecord | None:
+    def query_created(record: GroupingRequest) -> DbGroupingRecord | None:
         with Session() as session:
             return (
                 session.query(DbGroupingRecord)
@@ -451,20 +444,19 @@ def updated_message_for_orig():
             return db_record.message
         return None
 
+    duplicates = 0
     with updated_message_for_orig as changed:
-        grouping_lookup().insert_batch_grouping_records(grouping_request)
+        with Session() as session:
+            for record in test_records:
+                duplicates += int(
+                    grouping_lookup().upsert_grouping_record(embedding, record, session)
+                )
+            session.commit()
+    assert duplicates == 1
 
     assert changed
     assert changed.to_value(orig_record.message)
 
     # ensure that atleast a record was made for each item
-    for item in grouping_request.data:
+    for item in test_records:
         assert query_created(item) is not None
-
-    # Again, ensuring that duplicates are ignored
-    grouping_request.data = [updated_duplicate]
-    grouping_request.stacktrace_list = ["does not matter" for _ in grouping_request.data]
-    with updated_message_for_orig as changed:
-        grouping_lookup().insert_batch_grouping_records(grouping_request)
-
-    assert not changed
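For reviewers, a minimal usage sketch of the new bulk path (not part of this diff). The import path, field values, and hash below are assumptions for illustration only; GroupingRequest, SimilarityResponse, and grouping_lookup() are the names introduced or used in the change above, and a configured database is assumed.

# Hypothetical usage sketch -- illustrative only, not part of the change.
from seer.grouping.grouping import GroupingRequest, grouping_lookup  # assumed import path

requests = [
    GroupingRequest(
        project_id=1,
        stacktrace="ZeroDivisionError: division by zero",
        message="division by zero",
        hash="c0ffee00c0ffee00c0ffee00c0ffee00",  # illustrative hash value
        exception_type="ZeroDivisionError",
        k=1,
    ),
]

# Responses are positional: responses[i] corresponds to requests[i].
responses = grouping_lookup().bulk_nearest_neighbor(requests)
assert len(responses) == len(requests)

for response in responses:
    # had_duplicate means another writer inserted the same (project_id, hash) first;
    # per the comment on SimilarityResponse, it is informational, not an error.
    if response.had_duplicate:
        print("duplicate insert was skipped")
    for neighbor in response.responses:
        print(neighbor.parent_hash, neighbor.stacktrace_distance, neighbor.should_group)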