ref(eap): Add configuration for attributes meta #6162

Closed · wants to merge 12 commits
@@ -0,0 +1,56 @@
version: v1
kind: readable_storage
name: spans_attributes_meta

storage:
  key: spans_attributes_meta
  set_key: events_analytics_platform

readiness_state: partial

schema:
  columns:
    [
      { name: organization_id, type: UInt, args: { size: 64 } },
      { name: attribute_type, type: String },
      { name: attribute_key, type: String },
      { name: attribute_value, type: String },
      { name: timestamp, type: DateTime },
      { name: retention_days, type: UInt, args: { size: 16 } },
      { name: count, type: AggregateFunction, args: { func: sum, arg_types: [{ type: UInt, args: { size: 64 } }] } }
    ]
  local_table_name: spans_attributes_meta_local
  dist_table_name: spans_attributes_meta_dist
  partition_format: [date]
allocation_policies:
  - name: ConcurrentRateLimitAllocationPolicy
    args:
      required_tenant_types:
        - organization_id
        - referrer
      default_config_overrides:
        is_enforced: 0
  - name: ReferrerGuardRailPolicy
    args:
      required_tenant_types:
        - referrer
      default_config_overrides:
        is_enforced: 0
        is_active: 0
  - name: BytesScannedRejectingPolicy
    args:
      required_tenant_types:
        - organization_id
        - referrer
      default_config_overrides:
        is_active: 0
        is_enforced: 0

query_processors:
  - processor: TableRateLimit
  - processor: TupleUnaliaser

mandatory_condition_checkers:
  - condition: OrgIdEnforcer
    args:
      field_name: organization_id
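
Note that the count column in this storage holds sum() aggregation states, not plain integers, so any reader has to finalize them with sumMerge(). A minimal sketch of reading the distributed table directly; the client library, host, and org id are assumptions for illustration, not part of this change:

from clickhouse_driver import Client  # assumed client library

client = Client(host="localhost")
rows = client.execute(
    """
    SELECT attribute_key, attribute_value, sumMerge(count) AS times_seen
    FROM spans_attributes_meta_dist
    WHERE organization_id = %(org_id)s
    GROUP BY attribute_key, attribute_value
    ORDER BY times_seen DESC
    """,
    {"org_id": 1},
)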
@@ -0,0 +1,138 @@
from __future__ import annotations

from typing import Sequence

from snuba.clickhouse.columns import AggregateFunction, Column, DateTime, String, UInt
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.constants import ATTRIBUTE_BUCKETS

META_KEY_QUERY_TEMPLATE = """
SELECT
    organization_id,
    attribute_key,
    {attribute_value} AS attribute_value,
    toMonday(start_timestamp) AS timestamp,
    retention_days,
    sumState(cast(1, 'UInt64')) AS count
FROM eap_spans_local
LEFT ARRAY JOIN
    arrayConcat({key_columns}) AS attribute_key,
    arrayConcat({value_columns}) AS attr_value
GROUP BY
    organization_id,
    attribute_key,
    attribute_value,
    timestamp,
    retention_days
"""


class Migration(migration.ClickhouseNodeMigration):
    """
    This migration creates a table meant to store just the attributes seen in a particular org.
    The table is populated by a separate materialized view for each type of attribute.
    """

    blocking = False
    storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM
    granularity = "8192"

    value_types = ["str", "num"]

    meta_view_name = "spans_attributes_{attribute_type}_meta_mv"
    meta_local_table_name = "spans_attributes_meta_local"
    meta_dist_table_name = "spans_attributes_meta_dist"
    meta_table_columns: Sequence[Column[Modifiers]] = [
        Column("organization_id", UInt(64)),
        Column("attribute_type", String()),
        Column("attribute_key", String()),
        Column("attribute_value", String()),
        Column("timestamp", DateTime(modifiers=Modifiers(codecs=["DoubleDelta"]))),
        Column("retention_days", UInt(16)),
        Column("count", AggregateFunction("sum", [UInt(64)])),
    ]

    def forwards_ops(self) -> Sequence[SqlOperation]:
        create_table_ops: list[SqlOperation] = [
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.meta_local_table_name,
                engine=table_engines.AggregatingMergeTree(
                    storage_set=self.storage_set_key,
                    primary_key="(organization_id, attribute_key)",
                    order_by="(organization_id, attribute_key, attribute_value, timestamp)",
                    partition_by="toMonday(timestamp)",
                    settings={
                        "index_granularity": self.granularity,
                        # Since partitions contain multiple retention periods, we need
                        # individual rows within partitions to be dropped as they expire
                        "ttl_only_drop_parts": 0,
                    },
                    ttl="timestamp + toIntervalDay(retention_days)",
                ),
                columns=self.meta_table_columns,
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.meta_dist_table_name,
                engine=table_engines.Distributed(
                    local_table_name=self.meta_local_table_name, sharding_key=None
                ),
                columns=self.meta_table_columns,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

        materialized_view_ops: list[SqlOperation] = []
        for value_type in self.value_types:
            attribute_value = "attr_value" if value_type == "str" else "''"

            key_columns = ",".join(
                [f"mapKeys(attr_{value_type}_{i})" for i in range(ATTRIBUTE_BUCKETS)]
            )
            value_columns = ",".join(
                [f"mapValues(attr_{value_type}_{i})" for i in range(ATTRIBUTE_BUCKETS)]
            )

            materialized_view_ops.append(
                operations.CreateMaterializedView(
                    storage_set=self.storage_set_key,
                    view_name=self.meta_view_name.format(attribute_type=value_type),
                    columns=self.meta_table_columns,
                    destination_table_name=self.meta_local_table_name,
                    target=OperationTarget.LOCAL,
                    query=META_KEY_QUERY_TEMPLATE.format(
                        attribute_value=attribute_value,
                        key_columns=key_columns,
                        value_columns=value_columns,
                    ),
                ),
            )

        return create_table_ops + materialized_view_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        ops: Sequence[SqlOperation] = [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.meta_view_name.format(attribute_type=value_type),
                target=OperationTarget.LOCAL,
            )
            for value_type in self.value_types
        ] + [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.meta_local_table_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.meta_dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]
        return ops
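
To get a concrete feel for the SQL the migration ships, the bucketed column expressions can be expanded standalone. A minimal sketch mirroring the loop in forwards_ops (the constant is inlined so the snippet runs on its own):

ATTRIBUTE_BUCKETS = 20  # mirrors snuba/utils/constants.py below

# What the {key_columns} placeholder renders to for the "str" view.
key_columns = ",".join(f"mapKeys(attr_str_{i})" for i in range(ATTRIBUTE_BUCKETS))
print(key_columns)
# mapKeys(attr_str_0),mapKeys(attr_str_1), ... ,mapKeys(attr_str_19)  (abridged)

The "str" view keeps the joined value as attribute_value, while the "num" view stores an empty string, presumably because enumerating every distinct numeric value would explode the table's cardinality.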
snuba/utils/constants.py (3 additions, 0 deletions)
@@ -5,3 +5,6 @@

#: Metrics granularities for which a materialized view exists, in ascending order
GRANULARITIES_AVAILABLE = (10, 60, 60 * 60, 24 * 60 * 60)

# Number of map-column buckets that EAP span attributes are split across
ATTRIBUTE_BUCKETS = 20
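
ATTRIBUTE_BUCKETS is the number of map columns each span's attributes are spread across on eap_spans_local (attr_str_0 through attr_str_19, and likewise attr_num_*), which is why the migration concatenates twenty mapKeys()/mapValues() calls. A hedged sketch of the bucketing idea; the hash shown (32-bit FNV-1a) is an assumption for illustration and may not be what Snuba's processor actually uses:

ATTRIBUTE_BUCKETS = 20

def fnv_1a(data: bytes) -> int:
    # 32-bit FNV-1a, shown for illustration only.
    h = 0x811C9DC5
    for byte in data:
        h = ((h ^ byte) * 0x01000193) & 0xFFFFFFFF
    return h

def bucket_column(attribute_key: str) -> str:
    # Pick one of the 20 string-attribute map columns for a given key.
    return f"attr_str_{fnv_1a(attribute_key.encode()) % ATTRIBUTE_BUCKETS}"

print(bucket_column("release"))  # e.g. attr_str_5; the bucket depends on the hash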
tests/conftest.py (5 additions, 3 deletions)
@@ -48,9 +48,11 @@ def create_databases() -> None:
             storage_sets=cluster["storage_sets"],
             single_node=cluster["single_node"],
             cluster_name=cluster["cluster_name"] if "cluster_name" in cluster else None,
-            distributed_cluster_name=cluster["distributed_cluster_name"]
-            if "distributed_cluster_name" in cluster
-            else None,
+            distributed_cluster_name=(
+                cluster["distributed_cluster_name"]
+                if "distributed_cluster_name" in cluster
+                else None
+            ),
         )

         database_name = cluster["database"]
tests/test_eap_spans_api.py (125 additions, 0 deletions)
@@ -0,0 +1,125 @@
from __future__ import annotations

import uuid
from datetime import UTC, datetime, timedelta
from typing import Any

import pytest
import simplejson as json

from snuba import settings
from snuba.datasets.entities.entity_key import EntityKey
from snuba.datasets.entities.factory import get_entity
from tests.base import BaseApiTest
from tests.helpers import write_raw_unprocessed_events


@pytest.mark.clickhouse_db
@pytest.mark.redis_db
class TestEAPSpansAPI(BaseApiTest):
    def post(self, url: str, data: str) -> Any:
        return self.app.post(url, data=data, headers={"referer": "test"})

    def basic_span_data(self, start_time: datetime) -> dict[str, Any]:
        # Deliberately missing span_id, parent_span_id, and trace_id; the
        # fixture below fills those in per span.
        return {
            "duration_ms": 10,
            "exclusive_time_ms": 9,
            "is_segment": True,
            "project_id": 70156,
            "organization_id": 1,
            "received": datetime.now().timestamp(),
            "retention_days": 90,
            "segment_id": "1234567890123456",
            "sentry_tags": {
                "transaction.method": "GET",
                "user": "user1",
                "release": "release1234",
            },
            "start_timestamp": start_time.strftime(settings.PAYLOAD_DATETIME_FORMAT),
            "start_timestamp_ms": int(start_time.timestamp() * 1000),
            "start_timestamp_precise": start_time.timestamp(),
            "end_timestamp": (start_time + timedelta(milliseconds=10)).strftime(
                settings.PAYLOAD_DATETIME_FORMAT
            ),
            "end_timestamp_precise": (
                start_time + timedelta(milliseconds=10)
            ).timestamp(),
        }

    @pytest.fixture(autouse=True)
    def setup_teardown(self, clickhouse_db: None, redis_db: None) -> None:
        self.trace_id = uuid.UUID("7400045b-25c4-43b8-8591-4600aa83adaa")
        self.raw_span_id = "7400045b25c443b%d"

        spans = []
        self.start_time = datetime.now(tz=UTC) - timedelta(minutes=10)
        for i in range(10):
            span_id = self.raw_span_id % i
            parent_span_id = self.raw_span_id % (i - 1) if i > 0 else None
            start_time = self.start_time + timedelta(minutes=i)
            span_data = self.basic_span_data(start_time)
            span_data["trace_id"] = str(self.trace_id)
            span_data["span_id"] = span_id
            span_data["parent_span_id"] = parent_span_id
            spans.append(span_data)

        self.project_id = spans[0]["project_id"]
        self.organization_id = spans[0]["organization_id"]
        spans_storage = get_entity(EntityKey.EAP_SPANS).get_writable_storage()
        assert spans_storage is not None
        write_raw_unprocessed_events(spans_storage, spans)

    def test_simple_query(self) -> None:
        response = self.post(
            "/events_analytics_platform/snql",
            data=json.dumps(
                {
                    "query": f"""MATCH (eap_spans)
                        SELECT span_id
                        WHERE organization_id = {self.organization_id}
                        AND start_timestamp >= toDateTime('{(self.start_time - timedelta(minutes=20)).isoformat()}')
                        AND start_timestamp < toDateTime('{(self.start_time + timedelta(minutes=20)).isoformat()}')
                        AND trace_id = '{self.trace_id}'
                        """,
                    "referrer": "myreferrer",
                    "debug": True,
                    "tenant_ids": {
                        "referrer": "r",
                        "organization_id": self.organization_id,
                    },
                }
            ),
        )
        data = json.loads(response.data)

        assert response.status_code == 200, data
        assert len(data["data"]) == 10
        span_ids = {span["span_id"] for span in data["data"]}
        for i in range(10):
            assert self.raw_span_id % i in span_ids

    def test_attribute_meta_query(self) -> None:
        response = self.post(
            "/events_analytics_platform/snql",
            data=json.dumps(
                {
                    "query": f"""MATCH STORAGE(spans_attributes_meta)
                        SELECT attribute_value BY attribute_value
                        WHERE organization_id = {self.organization_id}
                        AND attribute_key = 'release'
                        """,
                    "referrer": "myreferrer",
                    "debug": True,
                    "tenant_ids": {
                        "referrer": "r",
                        "organization_id": self.organization_id,
                    },
                }
            ),
        )
        data = json.loads(response.data)

        assert response.status_code == 200, data
        assert len(data["data"]) == 1
        assert data["data"][0]["attribute_value"] == "release1234"
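
The meta test above only checks the distinct value; a follow-up could also verify the merged counts. A hypothetical sketch, not part of this PR, assuming sumMerge() is accepted by the SnQL layer for STORAGE(...) matches (untested assumption) and that all ten spans land in the same toMonday() week:

    def test_attribute_meta_counts(self) -> None:
        # Hypothetical follow-up: verify the merged aggregation states.
        response = self.post(
            "/events_analytics_platform/snql",
            data=json.dumps(
                {
                    "query": f"""MATCH STORAGE(spans_attributes_meta)
                        SELECT sumMerge(count) AS times_seen BY attribute_value
                        WHERE organization_id = {self.organization_id}
                        AND attribute_key = 'release'
                        """,
                    "referrer": "myreferrer",
                    "tenant_ids": {
                        "referrer": "r",
                        "organization_id": self.organization_id,
                    },
                }
            ),
        )
        data = json.loads(response.data)
        assert response.status_code == 200, data
        assert data["data"][0]["times_seen"] == 10  # one state per inserted span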