From 5894e480502be9d762f2fffb342b4bb9d2c89ad7 Mon Sep 17 00:00:00 2001 From: Vivek Aditya Date: Thu, 9 Jan 2025 16:37:36 +0530 Subject: [PATCH 1/4] wip for api changes --- athina/steps/api.py | 62 +++++++++++++++++++++++++++++---------------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/athina/steps/api.py b/athina/steps/api.py index b5d4011..79f14f2 100644 --- a/athina/steps/api.py +++ b/athina/steps/api.py @@ -8,14 +8,6 @@ from jinja2 import Environment -def prepare_input_data(data: Dict[str, Any]) -> Dict[str, Any]: - """Prepare input data by converting complex types to JSON strings.""" - return { - key: json.dumps(value) if isinstance(value, (list, dict)) else value - for key, value in data.items() - } - - def prepare_template_data( env: Environment, template_dict: Optional[Dict[str, str]], @@ -31,6 +23,19 @@ def prepare_template_data( return prepared_dict +def debug_json_structure(body_str: str, error: json.JSONDecodeError) -> dict: + """Analyze JSON structure and identify problematic keys.""" + lines = body_str.split("\n") + error_line_num = error.lineno - 1 + + return { + "original_body": body_str, + "problematic_line": ( + lines[error_line_num] if error_line_num < len(lines) else None + ), + } + + def prepare_body( env: Environment, body_template: Optional[str], input_data: Dict[str, Any] ) -> Optional[str]: @@ -112,31 +117,44 @@ async def execute_async(self, input_data: Any) -> Union[Dict[str, Any], None]: ) # Prepare the environment and input data self.env = self._create_jinja_env() - prepared_input_data = prepare_input_data(input_data) # Prepare request components - prepared_body = prepare_body(self.env, self.body, prepared_input_data) - prepared_headers = prepare_template_data( - self.env, self.headers, prepared_input_data - ) - prepared_params = prepare_template_data( - self.env, self.params, prepared_input_data - ) + prepared_body = prepare_body(self.env, self.body, input_data) + prepared_headers = prepare_template_data(self.env, self.headers, input_data) + prepared_params = prepare_template_data(self.env, self.params, input_data) + # Prepare the URL by rendering the template + prepared_url = self.env.from_string(self.url).render(**input_data) timeout = aiohttp.ClientTimeout(total=self.timeout) for attempt in range(self.retries): try: async with aiohttp.ClientSession(timeout=timeout) as session: - json_body = ( - json.loads(prepared_body, strict=False) - if prepared_body - else None - ) + try: + json_body = ( + json.loads(prepared_body, strict=False) + if prepared_body + else None + ) + except json.JSONDecodeError as e: + debug_info = debug_json_structure(prepared_body, e) + return self._create_step_result( + status="error", + data=json.dumps( + { + "message": f"Failed to parse request body as JSON", + "error_type": "JSONDecodeError", + "error_details": str(e), + "debug_info": debug_info, + }, + indent=2, + ), + start_time=start_time, + ) async with session.request( method=self.method, - url=self.url, + url=prepared_url, headers=prepared_headers, params=prepared_params, json=json_body, From 096a847b1020027e88b29dfc23614dbc63f3ec64 Mon Sep 17 00:00:00 2001 From: Vivek Aditya Date: Sat, 11 Jan 2025 00:43:40 +0530 Subject: [PATCH 2/4] Fixes for retrieval steps --- athina/steps/chroma_retrieval.py | 46 +++++++++++-------------- athina/steps/pinecone_retrieval.py | 48 +++++++++++++++++++------- athina/steps/qdrant_retrieval.py | 16 ++++++--- athina/steps/weaviate_retrieval.py | 55 +++++++++++++++++++++--------- pyproject.toml | 6 ++-- 5 files changed, 106 insertions(+), 65 deletions(-) diff --git a/athina/steps/chroma_retrieval.py b/athina/steps/chroma_retrieval.py index 16c691f..5fadd19 100644 --- a/athina/steps/chroma_retrieval.py +++ b/athina/steps/chroma_retrieval.py @@ -26,7 +26,7 @@ class ChromaRetrieval(Step): port (int): The port of the Chroma server. collection_name (str): The name of the Chroma collection. limit (int): The maximum number of results to fetch. - input_column (str): The column name in the input data. + user_query (str): the query which will be sent to chroma. openai_api_key (str): The OpenAI API key. auth_type (str): The authentication type for the Chroma server (e.g., "token" or "basic"). auth_credentials (str): The authentication credentials for the Chroma server. @@ -35,9 +35,8 @@ class ChromaRetrieval(Step): host: str port: int collection_name: str - key: str limit: int - input_column: str + user_query: str openai_api_key: str auth_type: Optional[AuthType] = None auth_credentials: Optional[str] = None @@ -76,12 +75,6 @@ def __init__(self, *args, **kwargs): self._collection = self._client.get_collection( name=self.collection_name, embedding_function=self._embedding_function ) - # Create a custom Jinja2 environment with double curly brace delimiters and PreserveUndefined - self.env = Environment( - variable_start_string="{{", - variable_end_string="}}", - undefined=PreserveUndefined, - ) """Makes a call to chromadb collection to fetch relevant chunks""" @@ -95,31 +88,30 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: start_time=start_time, ) - query = input_data.get(self.input_column) - if query is None: + self.env = self._create_jinja_env() + + query_text = self.env.from_string(self.user_query).render(**input_data) + + if query_text is None: return self._create_step_result( - status="error", - data="Input column not found.", - start_time=start_time, + status="error", data="Query text is Empty.", start_time=start_time ) try: - if isinstance(query, list) and isinstance(query[0], float): - response = self._collection.query( - query_embeddings=[query], - n_results=self.limit, - include=["documents", "metadatas", "distances"], - ) - else: - response = self._collection.query( - query_texts=[query], - n_results=self.limit, - include=["documents", "metadatas", "distances"], + response = self._collection.query( + query_texts=[query_text], + n_results=self.limit, + include=["documents", "metadatas", "distances"], + ) + result = [ + {"text": text, "score": distance} + for text, distance in zip( + response["documents"][0], response["distances"][0] ) - + ] return self._create_step_result( status="success", - data=response["documents"][0], + data=result, start_time=start_time, ) except Exception as e: diff --git a/athina/steps/pinecone_retrieval.py b/athina/steps/pinecone_retrieval.py index d3c0648..4ab9087 100644 --- a/athina/steps/pinecone_retrieval.py +++ b/athina/steps/pinecone_retrieval.py @@ -1,5 +1,3 @@ -# Step to make a call to pinecone index to fetch relevent chunks -import pinecone from typing import Optional, Union, Dict, Any from pydantic import Field, PrivateAttr @@ -9,6 +7,7 @@ from llama_index.core import VectorStoreIndex from llama_index.core.retrievers import VectorIndexRetriever import time +import traceback class PineconeRetrieval(Step): @@ -22,36 +21,48 @@ class PineconeRetrieval(Step): metadata_filters: filters to apply to metadata. environment: pinecone environment. api_key: api key for the pinecone server - input_column: column name in the input data + user_query: the query which will be sent to pinecone env: jinja environment """ index_name: str top_k: int api_key: str - input_column: str + user_query: str env: Environment = None metadata_filters: Optional[Dict[str, Any]] = None namespace: Optional[str] = None environment: Optional[str] = None + text_key: Optional[str] = None # Optional parameter for text key _vector_store: PineconeVectorStore = PrivateAttr() _vector_index: VectorStoreIndex = PrivateAttr() _retriever: VectorIndexRetriever = PrivateAttr() def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) + # Initialize base vector store arguments vector_store_args = {"api_key": self.api_key, "index_name": self.index_name} + # Add text_key only if specified by user + if self.text_key: + vector_store_args["text_key"] = self.text_key + # Only add environment if it's provided if self.environment is not None: vector_store_args["environment"] = self.environment - if self.namespace is not None: + # Only add namespace if it's provided and not None + if self.namespace: vector_store_args["namespace"] = self.namespace + # Initialize vector store with filtered arguments self._vector_store = PineconeVectorStore(**vector_store_args) + + # Create vector index from store self._vector_index = VectorStoreIndex.from_vector_store( vector_store=self._vector_store ) + + # Initialize retriever with specified top_k self._retriever = VectorIndexRetriever( index=self._vector_index, similarity_top_k=self.top_k ) @@ -60,9 +71,10 @@ class Config: arbitrary_types_allowed = True def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: - """makes a call to pinecone index to fetch relevent chunks""" + """Makes a call to pinecone index to fetch relevant chunks""" start_time = time.perf_counter() + # Validate input data if input_data is None: input_data = {} @@ -73,26 +85,36 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: start_time=start_time, ) - input_text = input_data.get(self.input_column, None) + # Create Jinja environment and render query + self.env = self._create_jinja_env() + query_text = self.env.from_string(self.user_query).render(**input_data) - if input_text is None: + if not query_text: return self._create_step_result( status="error", - data="Input column not found.", + data="Query text is Empty.", start_time=start_time, ) try: - response = self._retriever.retrieve(input_text) - result = [node.get_content() for node in response] + # Perform retrieval + response = self._retriever.retrieve(query_text) + result = [ + { + "text": node.get_content(), + "score": node.get_score(), + } + for node in response + ] + return self._create_step_result( + status="success", data=result, start_time=start_time + ) return self._create_step_result( status="success", data=result, start_time=start_time, ) except Exception as e: - import traceback - traceback.print_exc() print(f"Error during retrieval: {str(e)}") return self._create_step_result( diff --git a/athina/steps/qdrant_retrieval.py b/athina/steps/qdrant_retrieval.py index 430c1bf..9d0da1d 100644 --- a/athina/steps/qdrant_retrieval.py +++ b/athina/steps/qdrant_retrieval.py @@ -20,7 +20,7 @@ class QdrantRetrieval(Step): url: url of the qdrant server top_k: How many chunks to fetch. api_key: api key for the qdrant server - input_column: the query which will be sent to qdrant + user_query: the query which will be sent to qdrant env: jinja environment """ @@ -28,7 +28,7 @@ class QdrantRetrieval(Step): url: str top_k: int api_key: str - input_column: str + user_query: str env: Environment = None _qdrant_client: qdrant_client.QdrantClient = PrivateAttr() _vector_store: QdrantVectorStore = PrivateAttr() @@ -70,11 +70,11 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: self.env = self._create_jinja_env() - query_text = self.env.from_string(self.input_column).render(**input_data) + query_text = self.env.from_string(self.user_query).render(**input_data) if query_text is None: return self._create_step_result( - status="error", data="Query text not found.", start_time=start_time + status="error", data="Query text is Empty.", start_time=start_time ) try: @@ -84,7 +84,13 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: return self._create_step_result( status="success", data=[], start_time=start_time ) - result = [node.get_content() for node in response] + result = [ + { + "text": node.get_content(), + "score": node.get_score(), + } + for node in response + ] return self._create_step_result( status="success", data=result, start_time=start_time ) diff --git a/athina/steps/weaviate_retrieval.py b/athina/steps/weaviate_retrieval.py index ca585de..2ac1020 100644 --- a/athina/steps/weaviate_retrieval.py +++ b/athina/steps/weaviate_retrieval.py @@ -1,17 +1,18 @@ -# Step to make a call to weaviate collection to fetch relevant chunks import weaviate from weaviate.classes.init import Auth +from weaviate.classes.query import MetadataQuery from weaviate.client import WeaviateClient from weaviate.collections.collection import Collection -from typing import Union, Dict, Any +from typing import Union, Dict, Any, List from athina.steps import Step from jinja2 import Environment import time +import traceback class WeaviateRetrieval(Step): """ - Step that makes a call to weaviate collection to fetch relevant chunks. + Step that makes a call to weaviate collection to fetch relevant chunks with similarity scores. Attributes: url: URL of the Weaviate instance. @@ -21,7 +22,7 @@ class WeaviateRetrieval(Step): limit: Maximum number of results to fetch. api_key: API key for the Weaviate server. openai_api_key: OpenAI Api Key. - input_column: Column name in the input data. + user_query: the query which will be sent to Weaviate env: Jinja environment. """ @@ -32,7 +33,7 @@ class WeaviateRetrieval(Step): limit: int api_key: str openai_api_key: str - input_column: str + user_query: str env: Environment = None _client: WeaviateClient = None _collection: Collection = None @@ -43,6 +44,7 @@ def __init__(self, *args, **kwargs): cluster_url=self.url, auth_credentials=Auth.api_key(self.api_key), headers={"X-OpenAI-Api-Key": self.openai_api_key}, + skip_init_checks=True, ) self._collection = self._client.collections.get(self.collection_name) @@ -50,7 +52,7 @@ class Config: arbitrary_types_allowed = True def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: - """Makes a call to weaviate collection to fetch relevant chunks""" + """Makes a call to weaviate collection to fetch relevant chunks with scores""" start_time = time.perf_counter() if input_data is None: @@ -62,34 +64,54 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: data="Input data must be a dictionary.", start_time=start_time, ) - input_text = input_data.get(self.input_column, None) - if input_text is None: + self.env = self._create_jinja_env() + + query_text = self.env.from_string(self.user_query).render(**input_data) + + if query_text is None: return self._create_step_result( - status="error", - data="Input column not found.", - start_time=start_time, + status="error", data="Query text is Empty.", start_time=start_time ) try: + if self.search_type == "semantic_search": response = self._collection.query.near_text( - query=input_text, limit=self.limit + query=query_text, + limit=self.limit, + return_metadata=MetadataQuery.full(), ) elif self.search_type == "keyword_search": response = self._collection.query.bm25( - query=input_text, limit=self.limit + query=query_text, + limit=self.limit, + return_metadata=MetadataQuery.full(), ) elif self.search_type == "hybrid_search": response = self._collection.query.hybrid( - query=input_text, limit=self.limit + query=query_text, + limit=self.limit, + return_metadata=MetadataQuery.full(), ) else: raise ValueError(f"Unsupported search type: {self.search_type}") - + print(response) results = [] for obj in response.objects: - results.append(obj.properties[self.key]) + if self.search_type == "semantic_search": + score = ( + obj.metadata.certainty + if hasattr(obj.metadata, "certainty") + else None + ) + else: + score = ( + obj.metadata.score if hasattr(obj.metadata, "score") else None + ) + + result = {"text": obj.properties[self.key], "score": score} + results.append(result) return self._create_step_result( status="success", @@ -97,6 +119,7 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: start_time=start_time, ) except Exception as e: + traceback.print_exc() return self._create_step_result( status="error", data=str(e), diff --git a/pyproject.toml b/pyproject.toml index e41f439..f33ca47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "athina" -version = "1.7.6" +version = "1.7.7" description = "Python SDK to configure and run evaluations for your LLM-based application" authors = ["Shiv Sakhuja ", "Akshat Gupta ", "Vivek Aditya ", "Akhil Bisht "] readme = "README.md" @@ -20,8 +20,6 @@ python-dotenv = "^1.0.0" requests = "^2.31.0" litellm = "1.56.6" jinja2 = "^3.1.4" -pinecone-client = "^4.1.0" -qdrant-client = "^1.9.1" marvin = "^2.3.4" pydantic = "^2.6.3" pydantic-settings = "^2.2.1" @@ -37,7 +35,7 @@ textdistance = "^4.6.3" textstat = "^0.7.4" chromadb-client = "^0.5.20" llama-index = "^0.11.0" -llama-index-vector-stores-pinecone = "^0.2.0" +llama-index-vector-stores-pinecone = "^0.3.1" llama-index-vector-stores-qdrant = "^0.3.0" [tool.poetry.group.dev.dependencies] From 45a306cb8ea50b42878d93bfc431b220cedc1fcd Mon Sep 17 00:00:00 2001 From: Vivek Aditya Date: Sat, 11 Jan 2025 07:56:33 +0530 Subject: [PATCH 3/4] added back qdrant client --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index f33ca47..86483fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ python-dotenv = "^1.0.0" requests = "^2.31.0" litellm = "1.56.6" jinja2 = "^3.1.4" +qdrant-client = "^1.9.1" marvin = "^2.3.4" pydantic = "^2.6.3" pydantic-settings = "^2.2.1" From 4e1525b3690e78d296f2b718ae4dc7cf28eb16f0 Mon Sep 17 00:00:00 2001 From: Vivek Aditya Date: Sat, 11 Jan 2025 07:56:44 +0530 Subject: [PATCH 4/4] added back pinecone client --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 86483fe..4d9a167 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,6 +20,7 @@ python-dotenv = "^1.0.0" requests = "^2.31.0" litellm = "1.56.6" jinja2 = "^3.1.4" +pinecone-client = "^4.1.0" qdrant-client = "^1.9.1" marvin = "^2.3.4" pydantic = "^2.6.3"