Refactor grouping to share logic, track duplicates #858

Open · wants to merge 2 commits into main
203 changes: 114 additions & 89 deletions src/seer/grouping/grouping.py
@@ -6,8 +6,9 @@
import sentry_sdk
import sqlalchemy.orm
import torch
from pydantic import BaseModel, ValidationInfo, field_validator
from pydantic import BaseModel, Field, ValidationInfo, field_validator
from sentence_transformers import SentenceTransformer
from sentry_sdk import metrics
from sqlalchemy.dialects.postgresql import insert

from seer.db import DbGroupingRecord, Session
@@ -45,7 +46,13 @@ class GroupingResponse(BaseModel):


class SimilarityResponse(BaseModel):
responses: List[GroupingResponse]
responses: List[GroupingResponse] = Field(default_factory=list)

# Duplicates will happen naturally -- retries on in-flight messages, serialization stalls
# in processing queues, and so on. Do not assume that duplicates are necessarily a bug to be
# fixed; they are the natural and healthy consequence of trade-offs in system design.
# This value exists mostly for metrics and awareness; it does not indicate failure.
had_duplicate: bool = False
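Since the flag is informational, a consumer would typically surface it to metrics rather than branch on it. A minimal sketch of a hypothetical call site (lookup, request, and handle_neighbor are illustrative names, not part of this PR):

response = lookup.bulk_nearest_neighbor([request])[0]
if response.had_duplicate:
    metrics.incr("grouping.duplicate_seen")  # observability only; not an error path
for neighbor in response.responses:
    handle_neighbor(neighbor)  # processing continues either way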


class SimilarityBenchmarkResponse(BaseModel):
@@ -163,14 +170,17 @@ def query_nearest_k_neighbors(
distance: float,
k: int,
) -> List[tuple[DbGroupingRecord, float]]:
cos_dist = DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding).label(
"distance"
)
return (
session.query(
DbGroupingRecord,
DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding).label("distance"),
cos_dist,
)
.filter(
DbGroupingRecord.project_id == project_id,
DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding) <= distance,
cos_dist <= distance,
DbGroupingRecord.hash != hash,
)
.order_by("distance")
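The change binds the cosine-distance expression to one labeled column and reuses it in the select list, the filter, and the ordering, so the three can never drift apart. A standalone sketch of the same SQLAlchemy/pgvector pattern; the .limit(k).all() tail is assumed from the collapsed portion of the hunk:

from seer.db import DbGroupingRecord

def nearest_neighbors_sketch(session, embedding, max_distance, k):
    # Label the cosine distance once so SELECT, WHERE, and ORDER BY share one expression.
    cos_dist = DbGroupingRecord.stacktrace_embedding.cosine_distance(embedding).label("distance")
    return (
        session.query(DbGroupingRecord, cos_dist)
        .filter(cos_dist <= max_distance)
        .order_by(cos_dist)
        .limit(k)
        .all()
    )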
@@ -188,59 +198,11 @@ def get_nearest_neighbors(self, issue: GroupingRequest) -> SimilarityResponse:
:return: A SimilarityResponse object containing a list of GroupingResponse objects with the nearest group IDs,
stacktrace similarity scores, message similarity scores, and grouping flags.
"""
with Session() as session:
embedding = self.encode_text(issue.stacktrace).astype("float32")

results = self.query_nearest_k_neighbors(
session,
embedding,
issue.project_id,
issue.hash,
NN_SIMILARITY_DISTANCE if issue.read_only else issue.threshold,
issue.k,
)

# If no existing groups within the threshold, insert the request as a new GroupingRecord
if not (issue.read_only or any(distance <= issue.threshold for _, distance in results)):
logger.info(
"insert_new_grouping_record",
extra={
"input_hash": issue.hash,
"project_id": issue.project_id,
"stacktrace_length": len(issue.stacktrace),
},
)
self.insert_new_grouping_record(session, issue, embedding)
session.commit()

similarity_response = SimilarityResponse(responses=[])
for record, distance in results:
message_similarity_score = difflib.SequenceMatcher(
None, issue.message, record.message
).ratio()
should_group = distance <= issue.threshold

if should_group:
logger.info(
"should_group",
extra={
"input_hash": issue.hash,
"stacktrace_length": len(issue.stacktrace),
"parent_hash": record.hash,
"project_id": issue.project_id,
},
)
responses = self.bulk_nearest_neighbor([issue])
if responses:
return responses[0]

similarity_response.responses.append(
GroupingResponse(
parent_hash=record.hash,
stacktrace_distance=distance,
message_distance=1.0 - message_similarity_score,
should_group=should_group,
)
)

return similarity_response
raise AssertionError("bulk_nearest_neighbor() didn't return a response!")

@sentry_sdk.tracing.trace
def bulk_create_and_insert_grouping_records(
@@ -252,9 +214,28 @@ def bulk_create_and_insert_grouping_records(
if len(data.data) != len(data.stacktrace_list):
return BulkCreateGroupingRecordsResponse(success=False, groups_with_neighbor={})

groups_with_neighbor = self.insert_batch_grouping_records(data)
responses: list[SimilarityResponse] = self.bulk_nearest_neighbor(
[
GroupingRequest(
project_id=create_group.project_id,
stacktrace=stacktrace,
message=create_group.message,
hash=create_group.hash,
exception_type=create_group.exception_type,
k=1,
)
for create_group, stacktrace in zip(data.data, data.stacktrace_list)
],
data.encode_stacktrace_batch_size,
)

return BulkCreateGroupingRecordsResponse(
success=True, groups_with_neighbor=groups_with_neighbor
success=True,
groups_with_neighbor={
str(request.group_id): response.responses[0]
for request, response in zip(data.data, responses)
if response.responses
},
)
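The comprehension above leans on bulk_nearest_neighbor returning exactly one response per request, in request order; requests with no neighbor simply drop out of the mapping rather than being mapped to an empty value. A toy illustration of that zip, with hypothetical data:

requests = [{"group_id": 1}, {"group_id": 2}, {"group_id": 3}]
responses = [["n1"], [], ["n3"]]  # one (possibly empty) response per request, same order
groups_with_neighbor = {
    str(req["group_id"]): resp[0]
    for req, resp in zip(requests, responses)
    if resp  # a request with no neighbor is omitted from the mapping
}
assert groups_with_neighbor == {"1": "n1", "3": "n3"}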

@sentry_sdk.tracing.trace
@@ -314,42 +295,86 @@ def insert_batch_grouping_records(
return groups_with_neighbor

@sentry_sdk.tracing.trace
def insert_new_grouping_record(
self, session, issue: GroupingRequest, embedding: np.ndarray
) -> None:
def bulk_nearest_neighbor(
self,
requests: List[GroupingRequest],
encode_stacktrace_batch_size: int = 1,
) -> list[SimilarityResponse]:
"""
Inserts a new GroupingRecord into the database if the group_hash does not already exist.
If new grouping record was created, return the id.
Given the grouping requests, encodes each stacktrace and searches for its nearest
neighbors. When a request is not read_only and no neighbor within the threshold is
found, a new grouping record is created. When a duplicate insert is encountered,
that is noted in the response. All neighbors within the given threshold are returned.

:param session: The database session.
:param issue: The issue to insert as a new GroupingRecord.
:param embedding: The embedding of the stacktrace.
Responses are always returned in request order, one response per request.
"""
existing_record = (
session.query(DbGroupingRecord)
.filter_by(hash=issue.hash, project_id=issue.project_id)
.first()
result: list[SimilarityResponse] = []
embeddings = self.encode_multiple_texts(
[r.stacktrace for r in requests], encode_stacktrace_batch_size
)
with Session() as session:
request: GroupingRequest
embedding: np.ndarray
for embedding, request in zip(embeddings, requests):
embedding = embedding.astype("float32")
nearest_neighbors = self.query_nearest_k_neighbors(
session,
embedding,
request.project_id,
request.hash,
request.threshold if not request.read_only else NN_GROUPING_DISTANCE,
request.k,
)

if existing_record is None:
new_record = GroupingRecord(
project_id=issue.project_id,
message=issue.message,
stacktrace_embedding=embedding,
hash=issue.hash,
error_type=issue.exception_type,
).to_db_model()
session.add(new_record)
else:
logger.info(
"group_already_exists_in_seer_db",
extra={
"existing_hash": existing_record.hash,
"project_id": issue.project_id,
"stacktrace_length": len(issue.stacktrace),
"input_hash": issue.hash,
},
)
threshold_matches = any(
distance <= request.threshold for _, distance in nearest_neighbors
)
response = SimilarityResponse()
result.append(response)
if not request.read_only and not threshold_matches:
response.had_duplicate = self.upsert_grouping_record(
embedding, request, session
)

for record, distance in nearest_neighbors:
message_similarity_score = difflib.SequenceMatcher(
None, request.message, record.message
).ratio()
response.responses.append(
GroupingResponse(
parent_hash=record.hash,
stacktrace_distance=distance,
message_distance=1.0 - message_similarity_score,
should_group=distance <= request.threshold,
)
)

session.commit()
return result
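A hypothetical call site for the shared path, showing that single and bulk lookups now go through the same method (the field values are illustrative; GroupingRequest also accepts read_only and threshold per the diff):

requests = [
    GroupingRequest(
        project_id=1,
        stacktrace="ValueError: bad input\n  File app.py, line 3, in main",
        message="ValueError: bad input",
        hash="abc123",
        exception_type="ValueError",
        k=1,
    ),
]
responses = grouping_lookup().bulk_nearest_neighbor(requests, encode_stacktrace_batch_size=8)
assert len(responses) == len(requests)  # one response per request, in order
for response in responses:
    for neighbor in response.responses:
        print(neighbor.parent_hash, neighbor.should_group)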

def upsert_grouping_record(
self, embedding: np.ndarray, request: GroupingRequest, session: sqlalchemy.orm.Session
) -> bool:
insert_stmt = insert(DbGroupingRecord).values(
project_id=request.project_id,
message=request.message,
error_type=request.exception_type,
hash=request.hash,
stacktrace_embedding=embedding,
)
row = session.execute(
insert_stmt.on_conflict_do_nothing(
index_elements=(DbGroupingRecord.project_id, DbGroupingRecord.hash)
).returning(DbGroupingRecord.id)
)
had_duplicate = row.first() is None

if had_duplicate:
# This isn't necessarily a problem unless unexpected increases occur that are not
# correlated with volume.
metrics.incr("seer.grouping.grouping_duplicate")

return had_duplicate
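The duplicate check relies on a PostgreSQL idiom: INSERT ... ON CONFLICT DO NOTHING RETURNING id yields no row when the conflicting insert is skipped. A minimal sketch of the idiom in plain SQL (the table name and DSN are hypothetical):

from sqlalchemy import create_engine, text

engine = create_engine("postgresql+psycopg2://localhost/example")  # hypothetical DSN
with engine.begin() as conn:
    row = conn.execute(
        text(
            "INSERT INTO grouping_records (project_id, hash) VALUES (:p, :h) "
            "ON CONFLICT (project_id, hash) DO NOTHING RETURNING id"
        ),
        {"p": 1, "h": "abc123"},
    ).first()
had_duplicate = row is None  # no returned row means the row already existed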

@sentry_sdk.tracing.trace
def delete_grouping_records_for_project(self, project_id: int) -> bool:
54 changes: 23 additions & 31 deletions tests/seer/grouping/test_grouping.py
@@ -1,5 +1,4 @@
import unittest
import uuid

from johen import change_watcher
from johen.pytest import parametrize
@@ -33,7 +32,7 @@ def test_get_nearest_neighbors_has_neighbor(self):
message="message",
hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
)
grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
session.commit()

grouping_request = GroupingRequest(
@@ -121,10 +120,10 @@ def test_insert_new_grouping_record_group_record_exists(self):
hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
)
# Insert the grouping record
grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
session.commit()
# Re-insert the grouping record
grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
session.commit()
matching_record = (
session.query(DbGroupingRecord)
@@ -149,9 +148,9 @@ def test_insert_new_grouping_record_group_record_cross_project(self):
hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
)
# Insert the grouping record
grouping_lookup().insert_new_grouping_record(session, grouping_request1, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request1, session)
session.commit()
grouping_lookup().insert_new_grouping_record(session, grouping_request2, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request2, session)
session.commit()
matching_record = (
session.query(DbGroupingRecord)
@@ -292,7 +291,7 @@ def test_bulk_create_and_insert_grouping_records_has_neighbor_in_existing_record
message="message",
hash="QYK7aNYNnp5FgSev9Np1soqb1SdtyahD",
)
grouping_lookup().insert_new_grouping_record(session, grouping_request, embedding)
grouping_lookup().upsert_grouping_record(embedding, grouping_request, session)
session.commit()

# Create record data to attempt to be inserted, create 5 with the stacktrace "stacktrace"
@@ -414,29 +413,23 @@ def test_delete_grouping_records_for_project(self):


@parametrize(count=1)
def test_GroupingLookup_insert_batch_grouping_records_duplicates(
def test_upsert_grouping_record(
project_1_id: int,
hash_1: str,
orig_record: CreateGroupingRecordData,
grouping_request: CreateGroupingRecordsRequest,
orig_record: GroupingRequest,
):
orig_record.project_id = project_1_id
orig_record.hash = hash_1
project_2_id = project_1_id + 1
hash_2 = hash_1 + "_2"

updated_duplicate = orig_record.copy(update=dict(message=orig_record.message + " updated?"))

grouping_request.data = [
embedding = grouping_lookup().encode_text("some text")
test_records = [
orig_record,
orig_record.copy(update=dict(project_id=project_2_id)),
orig_record.copy(update=dict(hash=hash_2)),
orig_record.copy(update=dict(project_id=(project_1_id + 1))),
orig_record.copy(update=dict(hash=(hash_1 + "_2"))),
# Duplicate of original should not actually update the original
updated_duplicate,
orig_record.copy(update=dict(message=orig_record.message + " updated?")),
]
grouping_request.stacktrace_list = [uuid.uuid4().hex for r in grouping_request.data]

def query_created(record: CreateGroupingRecordData) -> DbGroupingRecord | None:
def query_created(record: GroupingRequest) -> DbGroupingRecord | None:
with Session() as session:
return (
session.query(DbGroupingRecord)
Expand All @@ -451,20 +444,19 @@ def updated_message_for_orig():
return db_record.message
return None

duplicates = 0
with updated_message_for_orig as changed:
grouping_lookup().insert_batch_grouping_records(grouping_request)
with Session() as session:
for record in test_records:
duplicates += int(
grouping_lookup().upsert_grouping_record(embedding, record, session)
)
session.commit()

assert duplicates == 1
assert changed
assert changed.to_value(orig_record.message)

# Ensure that at least one record was created for each item
for item in grouping_request.data:
for item in test_records:
assert query_created(item) is not None

# Again, ensuring that duplicates are ignored
grouping_request.data = [updated_duplicate]
grouping_request.stacktrace_list = ["does not matter" for _ in grouping_request.data]
with updated_message_for_orig as changed:
grouping_lookup().insert_batch_grouping_records(grouping_request)

assert not changed