diff --git a/athina/steps/api.py b/athina/steps/api.py
index d8e94f2..b5d4011 100644
--- a/athina/steps/api.py
+++ b/athina/steps/api.py
@@ -3,10 +3,9 @@
 import time
 from typing import Union, Dict, Any, Optional
 import aiohttp
-from jinja2 import Environment
-from athina.helpers.jinja_helper import PreserveUndefined
 from athina.steps.base import Step
 import asyncio
+from jinja2 import Environment


 def prepare_input_data(data: Dict[str, Any]) -> Dict[str, Any]:
@@ -17,15 +16,6 @@ def prepare_input_data(data: Dict[str, Any]) -> Dict[str, Any]:
     }


-def create_jinja_env() -> Environment:
-    """Create a Jinja2 environment with custom settings."""
-    return Environment(
-        variable_start_string="{{",
-        variable_end_string="}}",
-        undefined=PreserveUndefined,
-    )
-
-
 def prepare_template_data(
     env: Environment,
     template_dict: Optional[Dict[str, str]],
@@ -51,33 +41,6 @@ def prepare_body(
     return env.from_string(body_template).render(**input_data)


-def process_response(
-    status_code: int,
-    response_text: str,
-) -> Dict[str, Any]:
-    """Process the API response and return formatted result."""
-    if status_code >= 400:
-        # If the status code is an error, return the error message
-        return {
-            "status": "error",
-            "data": f"Failed to make the API call.\nStatus code: {status_code}\nError:\n{response_text}",
-        }
-
-    try:
-        json_response = json.loads(response_text)
-        # If the response is JSON, return the JSON data
-        return {
-            "status": "success",
-            "data": json_response,
-        }
-    except json.JSONDecodeError:
-        # If the response is not JSON, return the text
-        return {
-            "status": "success",
-            "data": response_text,
-        }
-
-
 class ApiCall(Step):
     """
     Step that makes an external API call.
@@ -103,17 +66,52 @@ class ApiCall(Step):
     class Config:
         arbitrary_types_allowed = True

+    def process_response(
+        self,
+        status_code: int,
+        response_text: str,
+        start_time: float,
+    ) -> Dict[str, Any]:
+        """Process the API response and return formatted result."""
+        if status_code >= 400:
+            # If the status code is an error, return the error message
+            return self._create_step_result(
+                status="error",
+                data=f"Failed to make the API call.\nStatus code: {status_code}\nError:\n{response_text}",
+                start_time=start_time,
+            )
+
+        try:
+            json_response = json.loads(response_text)
+            # If the response is JSON, return the JSON data
+            return self._create_step_result(
+                status="success",
+                data=json_response,
+                start_time=start_time,
+            )
+        except json.JSONDecodeError:
+            # If the response is not JSON, return the text
+            return self._create_step_result(
+                status="success",
+                data=response_text,
+                start_time=start_time,
+            )
+
     async def execute_async(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Make an async API call and return the response."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
-
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )
         # Prepare the environment and input data
-        self.env = create_jinja_env()
+        self.env = self._create_jinja_env()
         prepared_input_data = prepare_input_data(input_data)

         # Prepare request components
@@ -144,23 +142,27 @@ async def execute_async(self, input_data: Any) -> Union[Dict[str, Any], None]:
                         json=json_body,
                     ) as response:
                         response_text = await response.text()
-                        return process_response(response.status, response_text)
+                        return self.process_response(
+                            response.status, response_text, start_time
+                        )
             except asyncio.TimeoutError:
                if attempt < self.retries - 1:
                    await asyncio.sleep(2)
                    continue
                # If the request times out after multiple attempts, return an error message
-                return {
-                    "status": "error",
-                    "data": "Failed to make the API call.\nRequest timed out after multiple attempts.",
-                }
+                return self._create_step_result(
+                    status="error",
+                    data="Failed to make the API call.\nRequest timed out after multiple attempts.",
+                    start_time=start_time,
+                )
            except Exception as e:
                # If an exception occurs, return the error message
-                return {
-                    "status": "error",
-                    "data": f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
-                }
+                return self._create_step_result(
+                    status="error",
+                    data=f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
+                    start_time=start_time,
+                )

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Synchronous execute api call that runs the async method in an event loop."""
diff --git a/athina/steps/base.py b/athina/steps/base.py
index 7b38fd7..943414a 100644
--- a/athina/steps/base.py
+++ b/athina/steps/base.py
@@ -1,13 +1,16 @@
 import os
 import json
 import logging
-from typing import Dict, Any, List, Iterable, Optional, Callable
+from typing import Dict, Any, List, Iterable, Optional, Callable, TypedDict, Literal
 from pydantic import BaseModel
+from jinja2 import Environment
+from athina.helpers.jinja_helper import PreserveUndefined
 from athina.helpers.json import JsonHelper, JsonExtractor
 from athina.llms.abstract_llm_service import AbstractLlmService
 from athina.llms.openai_service import OpenAiService
 from athina.keys import OpenAiApiKey
 import functools
+import time


 # Configure logging
@@ -22,6 +25,12 @@ class StepError(Exception):
     pass


+class StepResult(TypedDict):
+    status: Literal["success", "error"]
+    data: str
+    metadata: Dict[str, Any]
+
+
 def step(func: Callable) -> Callable:
     @functools.wraps(func)
     def wrapper(self, *args, **kwargs):
@@ -81,6 +90,45 @@ def extract_input_data(self, context: Dict[str, Any]) -> Any:
             input_data = context
         return input_data

+    def _create_step_result(
+        self,
+        status: Literal["success", "error"],
+        data: str,
+        start_time: float,
+        metadata: Dict[str, Any] = {},
+        exported_vars: Optional[Dict] = None,
+    ) -> StepResult:
+        """
+        Create a standardized result object for step execution.
+
+        Args:
+            status: Step execution status ("success" or "error")
+            data: Output data or error message
+            start_time: Time when step started execution (from perf_counter)
+            metadata: Optional dictionary of metadata
+            exported_vars: Optional dictionary of exported variables
+        """
+        if "response_time" not in metadata:
+            execution_time_ms = round((time.perf_counter() - start_time) * 1000)
+            metadata = {**metadata, "response_time": execution_time_ms}
+
+        if exported_vars is not None:
+            metadata["exported_vars"] = exported_vars
+
+        return {"status": status, "data": data, "metadata": metadata}
+
+    def _create_jinja_env(
+        self,
+        variable_start_string: str = "{{",
+        variable_end_string: str = "}}",
+    ) -> Environment:
+        """Create a Jinja2 environment with custom settings."""
+        return Environment(
+            variable_start_string=variable_start_string,
+            variable_end_string=variable_end_string,
+            undefined=PreserveUndefined,
+        )
+
     @step
     def run(
         self,
diff --git a/athina/steps/chroma_retrieval.py b/athina/steps/chroma_retrieval.py
index 64cbbbe..16c691f 100644
--- a/athina/steps/chroma_retrieval.py
+++ b/athina/steps/chroma_retrieval.py
@@ -8,6 +8,7 @@
 from jinja2 import Environment
 from athina.helpers.jinja_helper import PreserveUndefined
 from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
+import time


 class AuthType(str, Enum):
@@ -85,12 +86,22 @@ def __init__(self, *args, **kwargs):
     """Makes a call to chromadb collection to fetch relevant chunks"""

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
+        start_time = time.perf_counter()
+
         if input_data is None or not isinstance(input_data, dict):
-            return {"status": "error", "data": "Input data must be a dictionary."}
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

         query = input_data.get(self.input_column)
         if query is None:
-            return {"status": "error", "data": "Input column not found."}
+            return self._create_step_result(
+                status="error",
+                data="Input column not found.",
+                start_time=start_time,
+            )

         try:
             if isinstance(query, list) and isinstance(query[0], float):
@@ -106,9 +117,17 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
                 include=["documents", "metadatas", "distances"],
             )

-            return {"status": "success", "data": response["documents"][0]}
+            return self._create_step_result(
+                status="success",
+                data=response["documents"][0],
+                start_time=start_time,
+            )
         except Exception as e:
-            return {"status": "error", "data": str(e)}
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )

     def close(self):
         if self._client:
diff --git a/athina/steps/classify_text.py b/athina/steps/classify_text.py
index b469203..089386c 100644
--- a/athina/steps/classify_text.py
+++ b/athina/steps/classify_text.py
@@ -2,6 +2,7 @@
 from typing import Union, Dict, Any
 from athina.steps import Step
 import marvin
+import time


 class ClassifyText(Step):
@@ -22,17 +23,25 @@ class ClassifyText(Step):

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Classify the text and return the label."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
-
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )
         input_text = input_data.get(self.input_column, None)

         if input_text is None:
-            return None
+            return self._create_step_result(
status="error", + data="Input column not found.", + start_time=start_time, + ) marvin.settings.openai.api_key = self.llm_api_key marvin.settings.openai.chat.completions.model = self.language_model_id @@ -42,12 +51,14 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: input_text, labels=self.labels, ) - return { - "status": "success", - "data": result, - } + return self._create_step_result( + status="success", + data=result, + start_time=start_time, + ) except Exception as e: - return { - "status": "error", - "data": str(e), - } + return self._create_step_result( + status="error", + data=str(e), + start_time=start_time, + ) diff --git a/athina/steps/code_execution.py b/athina/steps/code_execution.py index 98930c3..2e8d855 100644 --- a/athina/steps/code_execution.py +++ b/athina/steps/code_execution.py @@ -5,6 +5,7 @@ import json import re import tempfile +import time class CodeExecution(Step): @@ -18,7 +19,53 @@ class CodeExecution(Step): code: str name: Optional[str] = None - def bandit_check(self, code: str) -> None: + def format_bandit_result(self, stdout: str) -> str: + """ + Format the Bandit output into a more readable string. + """ + try: + data = json.loads(stdout) + output = [] + # Add header + output.append("Security Check Results") + output.append("=" * 20) + # Add results + if data["results"]: + for result in data["results"]: + output.append(f"\nIssue Found:") + output.append(f" Severity: {result['issue_severity']}") + output.append(f" Confidence: {result['issue_confidence']}") + output.append(f" Description: {result['issue_text']}") + output.append("\n Problematic Code:") + output.append(" " + "-" * 16) + for line in result["code"].splitlines(): + output.append(f" {line}") + + if "issue_cwe" in result: + output.append(f"\n CWE: {result['issue_cwe']['id']}") + output.append(f" CWE Link: {result['issue_cwe']['link']}") + + output.append(f" More Info: {result['more_info']}") + else: + output.append("\nNo security issues found.") + # Add metrics summary + output.append("\nMetrics Summary") + output.append("-" * 15) + metrics = data["metrics"]["_totals"] + output.append(f"Total lines of code: {metrics['loc']}") + output.append(f"High severity issues: {metrics['SEVERITY.HIGH']}") + output.append(f"Medium severity issues: {metrics['SEVERITY.MEDIUM']}") + output.append(f"Low severity issues: {metrics['SEVERITY.LOW']}") + return "\n".join(output) + + except json.JSONDecodeError: + return f"Error parsing Bandit output: {stdout}" + except KeyError as e: + return f"Error processing Bandit output: Missing key {e}" + except Exception as e: + return f"Error processing Bandit output: {e}" + + def bandit_check(self, code: str) -> Optional[str]: """ Run Bandit security check on the provided code. 
""" @@ -27,29 +74,40 @@ def bandit_check(self, code: str) -> None: temp_file_path = temp_file.name try: result = subprocess.run( - ["bandit", "-r", temp_file_path, "-f", "json", "-c", "bandit.yml"], + ["bandit", "-r", temp_file_path, "-f", "json"], capture_output=True, text=True, ) if result.returncode != 0: - return json.dumps(result.stdout) + return self.format_bandit_result(result.stdout) + except Exception as e: + return str(e) finally: os.remove(temp_file_path) return None def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: """Execute the code with the input data.""" + start_time = time.perf_counter() if input_data is None: input_data = {} if not isinstance(input_data, dict): - raise TypeError("Input data must be a dictionary.") + return self._create_step_result( + status="error", + data="Input data must be a dictionary.", + start_time=start_time, + ) try: issues = self.bandit_check(self.code) - if not issues: - return {"error": "Security check failed. " + issues} + if issues: + return self._create_step_result( + status="error", + data="Security check failed. Issues:\n" + issues, + start_time=start_time, + ) from RestrictedPython import compile_restricted from RestrictedPython import safe_globals from RestrictedPython.Guards import safe_builtins @@ -60,7 +118,6 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: import editdistance import textdistance from datetime import datetime - import time import textstat custom_builtins = safe_builtins.copy() @@ -151,12 +208,14 @@ def wrap_non_serializable(obj): return str(obj) wrapped_result = wrap_non_serializable(result) - return { - "status": "success", - "data": wrapped_result, - } + return self._create_step_result( + status="success", + data=wrapped_result, + start_time=start_time, + ) except Exception as e: - return { - "status": "error", - "data": f"Failed to execute the code.\nDetails:\n{str(e)}", - } + return self._create_step_result( + status="error", + data=f"Failed to execute the code.\nDetails:\n{str(e)}", + start_time=start_time, + ) diff --git a/athina/steps/extract_entities.py b/athina/steps/extract_entities.py index 85d712e..dca2207 100644 --- a/athina/steps/extract_entities.py +++ b/athina/steps/extract_entities.py @@ -2,6 +2,7 @@ from typing import Union, Dict, Any from athina.steps import Step import marvin +import time class ExtractEntities(Step): @@ -22,17 +23,24 @@ class ExtractEntities(Step): def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: """Extract entities from the text and return the entities.""" - + start_time = time.perf_counter() if input_data is None: input_data = {} if not isinstance(input_data, dict): - raise TypeError("Input data must be a dictionary.") - + return self._create_step_result( + status="error", + data="Input data must be a dictionary.", + start_time=start_time, + ) input_text = input_data.get(self.input_column, None) if input_text is None: - return None + return self._create_step_result( + status="error", + data="Input column not found.", + start_time=start_time, + ) marvin.settings.openai.api_key = self.llm_api_key marvin.settings.openai.chat.completions.model = self.language_model_id @@ -42,12 +50,14 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]: input_text, instructions=self.instructions, ) - return { - "status": "success", - "data": result, - } + return self._create_step_result( + status="success", + data=result, + start_time=start_time, + ) except Exception as e: - return { - "status": "error", - "data": str(e), - } + 
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )
diff --git a/athina/steps/extract_json_path.py b/athina/steps/extract_json_path.py
index ab9fcb5..5b1c44c 100644
--- a/athina/steps/extract_json_path.py
+++ b/athina/steps/extract_json_path.py
@@ -2,6 +2,7 @@
 from athina.steps import Step
 import json
 from jsonpath_ng import parse
+import time


 class ExtractJsonPath(Step):
@@ -18,17 +19,25 @@ class ExtractJsonPath(Step):

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Extract the JsonPath from the input data."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
-
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )
         input_text = input_data.get(self.input_column, None)

         if input_text is None:
-            return None
+            return self._create_step_result(
+                status="error",
+                data="Input column not found.",
+                start_time=start_time,
+            )

         try:
             if isinstance(input_text, dict) or isinstance(input_text, list):
@@ -36,7 +45,11 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
             elif isinstance(input_text, str):
                 input_json = json.loads(input_text)
             else:
-                return None
+                return self._create_step_result(
+                    status="error",
+                    data="Input column must be a dictionary or a string.",
+                    start_time=start_time,
+                )

             result = parse(self.json_path).find(input_json)
             if not result or len(result) == 0:
@@ -46,12 +59,14 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
             else:
                 result = [match.value for match in result]

-            return {
-                "status": "success",
-                "data": result,
-            }
+            return self._create_step_result(
+                status="success",
+                data=result,
+                start_time=start_time,
+            )
         except Exception as e:
-            return {
-                "status": "error",
-                "data": str(e),
-            }
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )
diff --git a/athina/steps/llm.py b/athina/steps/llm.py
index 8a22b3c..aa163d8 100644
--- a/athina/steps/llm.py
+++ b/athina/steps/llm.py
@@ -12,28 +12,27 @@
 from athina.steps.transform import ExtractJsonFromString, ExtractNumberFromString
 import traceback
 import json
+import time
+

 class TextContent(BaseModel):
     type: str
     text: str

+
 class ImageContent(BaseModel):
     type: str = "image_url"
     image_url: Union[str, Dict[str, str]]

     def to_api_format(self):
         if isinstance(self.image_url, dict):
-            return {
-                "type": "image_url",
-                "image_url": self.image_url
-            }
-        return {
-            "type": "image_url",
-            "image_url": {"url": self.image_url}
-        }
+            return {"type": "image_url", "image_url": self.image_url}
+        return {"type": "image_url", "image_url": {"url": self.image_url}}
+

 Content = Union[str, List[Union[TextContent, ImageContent]]]

+
 class PromptMessage(BaseModel):
     role: str
     content: Optional[Content] = None
@@ -43,22 +42,20 @@ def to_api_format(self) -> dict:
         """Convert the message to the format expected by the OpenAI API"""
         if self.content is None:
             return {"role": self.role}
-
+
         if isinstance(self.content, str):
             return {"role": self.role, "content": self.content}
-
+
         if isinstance(self.content, list):
             formatted_content = []
             for item in self.content:
                 if isinstance(item, TextContent):
-                    formatted_content.append({
-                        "type": "text",
-                        "text": item.text
-                    })
+                    formatted_content.append({"type": "text", "text": item.text})
                 elif isinstance(item, ImageContent):
                     formatted_content.append(item.to_api_format())
             return {"role": self.role, "content": formatted_content}
"content": formatted_content} + class ModelOptions(BaseModel): max_tokens: Optional[int] = None temperature: Optional[float] = None @@ -143,19 +140,27 @@ def resolve(self, **kwargs) -> List[PromptMessage]: if isinstance(item, TextContent): content_template = self.env.from_string(item.text) resolved_text = content_template.render(**kwargs) - resolved_content.append(TextContent(text=resolved_text, type='text')) + resolved_content.append( + TextContent(text=resolved_text, type="text") + ) elif isinstance(item, ImageContent): if isinstance(item.image_url, str): url_template = self.env.from_string(item.image_url) resolved_url = url_template.render(**kwargs) - resolved_content.append(ImageContent(image_url=resolved_url)) + resolved_content.append( + ImageContent(image_url=resolved_url) + ) elif isinstance(item.image_url, dict): resolved_url_dict = {} for key, value in item.image_url.items(): url_template = self.env.from_string(value) resolved_url_dict[key] = url_template.render(**kwargs) - resolved_content.append(ImageContent(image_url=resolved_url_dict)) - resolved_message = PromptMessage(role=message.role, content=resolved_content) + resolved_content.append( + ImageContent(image_url=resolved_url_dict) + ) + resolved_message = PromptMessage( + role=message.role, content=resolved_content + ) resolved_messages.append(resolved_message) return resolved_messages @@ -207,6 +212,7 @@ def simple( def execute(self, input_data: dict, **kwargs) -> str: """Execute a prompt with the LLM service.""" + start_time = time.perf_counter() if input_data is None: input_data = {} @@ -217,7 +223,7 @@ def execute(self, input_data: dict, **kwargs) -> str: messages = self.template.resolve(**input_data) # Convert messages to API format api_formatted_messages = [msg.to_api_format() for msg in messages] - + llm_service_response = self.llm_service.chat_completion( api_formatted_messages, model=self.model, @@ -264,23 +270,29 @@ def execute(self, input_data: dict, **kwargs) -> str: response = llmresponse if error: - return {"status": "error", "data": error} + return self._create_step_result( + status="error", start_time=start_time, data=error + ) else: - return { - "status": "success", - "data": response, - "metadata": ( + return self._create_step_result( + status="success", + data=response, + start_time=start_time, + metadata=( json.loads(llm_service_response.get("metadata", "{}")) if llm_service_response.get("metadata") else {} ), - } + ) except Exception as e: traceback.print_exc() - return {"status": "error", "data": str(e)} + return self._create_step_result( + status="error", start_time=start_time, data=str(e) + ) async def execute_async(self, input_data: dict, **kwargs) -> dict: """Execute a prompt with the LLM service asynchronously.""" + start_time = time.perf_counter() if input_data is None: input_data = {} @@ -339,17 +351,22 @@ async def execute_async(self, input_data: dict, **kwargs) -> dict: response = llmresponse if error: - return {"status": "error", "data": error} + return self._create_step_result( + status="error", start_time=start_time, data=error + ) else: - return { - "status": "success", - "data": response, - "metadata": ( + return self._create_step_result( + status="success", + data=response, + start_time=start_time, + metadata=( json.loads(llm_service_response.get("metadata", "{}")) if llm_service_response.get("metadata") else {} ), - } + ) except Exception as e: traceback.print_exc() - return {"status": "error", "data": str(e)} + return self._create_step_result( + status="error", start_time=start_time, 
+            )
diff --git a/athina/steps/open_ai_assistant.py b/athina/steps/open_ai_assistant.py
index dc85448..8260ec8 100644
--- a/athina/steps/open_ai_assistant.py
+++ b/athina/steps/open_ai_assistant.py
@@ -3,6 +3,7 @@
 from athina.steps import Step
 from openai import OpenAI
 import os
+import time


 class OpenAiAssistant(Step):
@@ -33,17 +34,26 @@ def __init__(self, assistant_id: str, openai_api_key: str, input_column: str):

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Calls OpenAI's Assistant API and returns the response."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

         input_text = input_data.get(self.input_column, None)

         if input_text is None:
-            return None
+            return self._create_step_result(
+                status="error",
+                data="Input column not found.",
+                start_time=start_time,
+            )

         try:
             # Create a thread
             thread = self.client.beta.threads.create()
@@ -66,7 +76,11 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:

             # Handle failed case
             if run.status == "failed":
-                return {"status": "error", "data": "The assistant run failed."}
+                return self._create_step_result(
+                    status="error",
+                    data="The assistant run failed.",
+                    start_time=start_time,
+                )

             # Retrieve the assistant's response
             messages = self.client.beta.threads.messages.list(thread_id=thread.id)
@@ -76,15 +90,26 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
                 if message.role == "assistant":
                     for content in message.content:
                         if content.type == "text":
-                            return {"status": "success", "data": content.text.value}
+                            return self._create_step_result(
+                                status="success",
+                                data=content.text.value,
+                                start_time=start_time,
+                            )
                         elif content.type == "json":
-                            return {"status": "success", "data": content.json.value}
-            return {
-                "status": "success",
-                "data": None,
-            }
+                            return self._create_step_result(
+                                status="success",
+                                data=content.json.value,
+                                start_time=start_time,
+                            )
+
+            return self._create_step_result(
+                status="success",
+                data=None,
+                start_time=start_time,
+            )
         except Exception as e:
-            return {
-                "status": "error",
-                "data": str(e),
-            }
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )
diff --git a/athina/steps/parse_document.py b/athina/steps/parse_document.py
index 0c296bb..6ace655 100644
--- a/athina/steps/parse_document.py
+++ b/athina/steps/parse_document.py
@@ -2,8 +2,10 @@
 from athina.steps import Step
 from llama_parse import LlamaParse
 import nest_asyncio
+import time
+
+nest_asyncio.apply()  # LlamaParse can cause nested asyncio exceptions so we need this line of code

-nest_asyncio.apply()  # LlamaParse can cause nested asyncio exceptions so we need this line of code

 class ParseDocument(Step):
     """
@@ -23,37 +25,47 @@ class ParseDocument(Step):

     def execute(self, input_data) -> Union[Dict[str, Any], None]:
         """Parse a document using LlamaParse and return the result."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

         try:
             # Initialize LlamaParse client
-            llama_parse = LlamaParse(api_key=self.llama_parse_key, verbose=self.verbose, result_type=self.output_format)
+            llama_parse = LlamaParse(
+                api_key=self.llama_parse_key,
+                verbose=self.verbose,
+                result_type=self.output_format,
+            )

             # Parse the document
-            documents = llama_parse.load_data(
-                file_path=self.file_url
-            )
-
+            documents = llama_parse.load_data(file_path=self.file_url)
+
             if not documents:
-                return {
-                    "status": "error",
-                    "data": "No documents were parsed."
-                }
+                return self._create_step_result(
+                    status="error",
+                    data="No documents were parsed.",
+                    start_time=start_time,
+                )

             parsed_content = documents[0].text

-            return {
-                "status": "success",
-                "data": parsed_content,
-            }
+            return self._create_step_result(
+                status="success",
+                data=parsed_content,
+                start_time=start_time,
+            )

         except Exception as e:
-            return {
-                "status": "error",
-                "data": f"LlamaParse error: {str(e)}",
-            }
\ No newline at end of file
+            return self._create_step_result(
+                status="error",
+                data=f"LlamaParse error: {str(e)}",
+                start_time=start_time,
+            )
diff --git a/athina/steps/pinecone_retrieval.py b/athina/steps/pinecone_retrieval.py
index f2bcb1c..d3c0648 100644
--- a/athina/steps/pinecone_retrieval.py
+++ b/athina/steps/pinecone_retrieval.py
@@ -8,6 +8,7 @@
 from llama_index.vector_stores.pinecone import PineconeVectorStore
 from llama_index.core import VectorStoreIndex
 from llama_index.core.retrievers import VectorIndexRetriever
+import time


 class PineconeRetrieval(Step):
@@ -60,30 +61,42 @@ class Config:

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """makes a call to pinecone index to fetch relevent chunks"""
+        start_time = time.perf_counter()
+
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

         input_text = input_data.get(self.input_column, None)

         if input_text is None:
-            return None
+            return self._create_step_result(
+                status="error",
+                data="Input column not found.",
+                start_time=start_time,
+            )

         try:
             response = self._retriever.retrieve(input_text)
             result = [node.get_content() for node in response]
-            return {
-                "status": "success",
-                "data": result,
-            }
+            return self._create_step_result(
+                status="success",
+                data=result,
+                start_time=start_time,
+            )
         except Exception as e:
             import traceback

             traceback.print_exc()
             print(f"Error during retrieval: {str(e)}")
-            return {
-                "status": "error",
-                "data": str(e),
-            }
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )
diff --git a/athina/steps/qdrant_retrieval.py b/athina/steps/qdrant_retrieval.py
index df39257..430c1bf 100644
--- a/athina/steps/qdrant_retrieval.py
+++ b/athina/steps/qdrant_retrieval.py
@@ -8,6 +8,7 @@
 from llama_index.core import VectorStoreIndex
 from llama_index.core.retrievers import VectorIndexRetriever
 import qdrant_client
+import time


 class QdrantRetrieval(Step):
@@ -19,7 +20,7 @@ class QdrantRetrieval(Step):
         url: url of the qdrant server
         top_k: How many chunks to fetch.
         api_key: api key for the qdrant server
-        input_column: column name in the input data
+        input_column: the query which will be sent to qdrant
         env: jinja environment
     """

@@ -55,37 +56,43 @@ class Config:

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """makes a call to pinecone index to fetch relevent chunks"""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

-        input_text = input_data.get(self.input_column, None)
+        self.env = self._create_jinja_env()

-        if input_text is None:
-            return None
+        query_text = self.env.from_string(self.input_column).render(**input_data)
+
+        if query_text is None:
+            return self._create_step_result(
+                status="error", data="Query text not found.", start_time=start_time
+            )

         try:
-            response = self._retriever.retrieve(input_text)
+            response = self._retriever.retrieve(query_text)
             if not response:
-                print("No chunks retrieved")
-                return {
-                    "status": "success",
-                    "data": [],
-                }
+                print("No chunks retrieved for query text")
+                return self._create_step_result(
+                    status="success", data=[], start_time=start_time
+                )
             result = [node.get_content() for node in response]
-            return {
-                "status": "success",
-                "data": result,
-            }
+            return self._create_step_result(
+                status="success", data=result, start_time=start_time
+            )
         except Exception as e:
             import traceback

             traceback.print_exc()
             print(f"Error during retrieval: {str(e)}")
-            return {
-                "status": "error",
-                "data": str(e),
-            }
+            return self._create_step_result(
+                status="error", data=str(e), start_time=start_time
+            )
diff --git a/athina/steps/search.py b/athina/steps/search.py
index 109dc2f..f20de2a 100644
--- a/athina/steps/search.py
+++ b/athina/steps/search.py
@@ -54,8 +54,11 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
-
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )
         # Create a custom Jinja2 environment with double curly brace delimiters and PreserveUndefined
         self.env = Environment(
             variable_start_string="{{",
@@ -104,47 +107,40 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
                 )
                 if response.status_code >= 400:
                     # If the status code is an error, return the error message
-                    return {
-                        "status": "error",
-                        "data": f"Failed to make the API call.\nStatus code: {response.status_code}\nError:\n{response.text}",
-                    }
+                    return self._create_step_result(
+                        status="error",
+                        data=f"Failed to make the API call.\nStatus code: {response.status_code}\nError:\n{response.text}",
+                        start_time=start_time,
+                    )
                 try:
                     json_response = response.json()
                     # If the response is JSON, return the JSON data
-                    return {
-                        "status": "success",
-                        "data": json_response,
-                        "metadata": {
-                            "response_time": (time.time() - start_time) * 1000,
-                        },
-                    }
+                    return self._create_step_result(
+                        status="success",
+                        data=json_response,
+                        start_time=start_time,
+                    )
                 except json.JSONDecodeError:
                     # If the response is not JSON, return the text
-                    return {
-                        "status": "success",
-                        "data": response.text,
-                        "metadata": {
-                            "response_time": (time.time() - start_time) * 1000,
-                        },
-                    }
+                    return self._create_step_result(
+                        status="success",
+                        data=response.text,
+                        start_time=start_time,
+                    )
             except requests.Timeout:
                 if attempt < retries - 1:
                     time.sleep(2)
                     continue
                 # If the request times out after multiple attempts, return an error message
-                return {
-                    "status": "error",
-                    "data": "Failed to make the API call.\nRequest timed out after multiple attempts.",
-                    "metadata": {
-                        "response_time": (time.time() - start_time) * 1000,
-                    },
-                }
+                return self._create_step_result(
+                    status="error",
+                    data="Failed to make the API call.\nRequest timed out after multiple attempts.",
+                    start_time=start_time,
+                )
             except Exception as e:
                 # If an exception occurs, return the error message
-                return {
-                    "status": "error",
-                    "data": f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
-                    "metadata": {
-                        "response_time": (time.time() - start_time) * 1000,
-                    },
-                }
+                return self._create_step_result(
+                    status="error",
+                    data=f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
+                    start_time=start_time,
+                )
diff --git a/athina/steps/spider_crawl.py b/athina/steps/spider_crawl.py
index 1d7bfee..76de65c 100644
--- a/athina/steps/spider_crawl.py
+++ b/athina/steps/spider_crawl.py
@@ -5,12 +5,13 @@
 import requests
 from athina.steps import Step
 from jinja2 import Environment
-from athina.helpers.jinja_helper import PreserveUndefined


 def prepare_input_data(data):
-    return {key: json.dumps(value) if isinstance(value, (list, dict)) else value
-            for key, value in data.items()}
+    return {
+        key: json.dumps(value) if isinstance(value, (list, dict)) else value
+        for key, value in data.items()
+    }


 class SpiderCrawl(Step):
@@ -34,56 +35,60 @@ class SpiderCrawl(Step):
     class Config:
         arbitrary_types_allowed = True

-
     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Make an Search API call and return the response."""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )

-        # Create a custom Jinja2 environment with double curly brace delimiters and PreserveUndefined
-        self.env = Environment(
-            variable_start_string='{{',
-            variable_end_string='}}',
-            undefined=PreserveUndefined
-        )
+        # Create a custom Jinja2 environment
+        self.env = self._create_jinja_env()

-        body={
+        body = {
             "url": self.url,
             "limit": self.limit,
             "metadata": self.metadata,
-            "return_format": self.return_format
+            "return_format": self.return_format,
         }

         prepared_body = None
         # Add a filter to the Jinja2 environment to convert the input data to JSON
         body_template = self.env.from_string(json.dumps(body))
         prepared_input_data = prepare_input_data(input_data)
         prepared_body = body_template.render(**prepared_input_data)
-
         retries = 2  # number of retries
         timeout = 300  # seconds
         for attempt in range(retries):
             try:
                 response = requests.post(
-                    url= "https://api.spider.cloud/crawl",
+                    url="https://api.spider.cloud/crawl",
                     headers={
                         "Content-Type": "application/json",
-                        'Authorization': f'Bearer {self.spider_key}',
+                        "Authorization": f"Bearer {self.spider_key}",
                     },
-                    json=json.loads(prepared_body, strict=False) if prepared_body else None,
+                    json=(
+                        json.loads(prepared_body, strict=False)
+                        if prepared_body
+                        else None
+                    ),
                     timeout=timeout,
                 )
                 if response.status_code >= 400:
                     # If the status code is an error, return the error message
-                    return {
-                        "status": "error",
-                        "data": f"Failed to make the API call.\nStatus code: {response.status_code}\nError:\n{response.text}",
-                    }
+                    return self._create_step_result(
+                        status="error",
+                        data=f"Failed to make the API call.\nStatus code: {response.status_code}\nError:\n{response.text}",
+                        start_time=start_time,
+                    )
                 try:
                     json_response = response.json()
                     # If the response is JSON, return the JSON data
@@ -92,36 +97,40 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
                     content = []
                     for item in json_response:
                         value = {
-                            "content": item.get('content'),
-                            "url": item.get('url'),
-                            "error": item.get('error'),
+                            "content": item.get("content"),
+                            "url": item.get("url"),
+                            "error": item.get("error"),
                         }
                         content.append(value)
-
-                    return {
-                        "status": "success",
-                        "data": content,
-                    }
-
+                    return self._create_step_result(
+                        status="success",
+                        data=content,
+                        start_time=start_time,
+                    )
+
                 except json.JSONDecodeError:
                     # If the response is not JSON, return the text
-                    return {
-                        "status": "success",
-                        "data": response.text,
-                    }
+                    return self._create_step_result(
+                        status="success",
+                        data=response.text,
+                        start_time=start_time,
+                    )
             except requests.Timeout:
                 if attempt < retries - 1:
                     time.sleep(2)
                     continue
+
                 # If the request times out after multiple attempts, return an error message
-                return {
-                    "status": "error",
-                    "data": "Failed to make the API call.\nRequest timed out after multiple attempts.",
-                }
+                return self._create_step_result(
+                    status="error",
+                    data="Failed to make the API call.\nRequest timed out after multiple attempts.",
+                    start_time=start_time,
+                )
             except Exception as e:
                 # If an exception occurs, return the error message
-                return {
-                    "status": "error",
-                    "data": f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
-                }
+                return self._create_step_result(
+                    status="error",
+                    data=f"Failed to make the API call.\nError: {e.__class__.__name__}\nDetails:\n{str(e)}",
+                    start_time=start_time,
+                )
diff --git a/athina/steps/transcribe_speech_to_text.py b/athina/steps/transcribe_speech_to_text.py
index d8a251d..b457676 100644
--- a/athina/steps/transcribe_speech_to_text.py
+++ b/athina/steps/transcribe_speech_to_text.py
@@ -1,6 +1,7 @@
 from typing import Any, Dict, Optional
 import requests
 from athina.steps import Step
+import time


 class TranscribeSpeechToText(Step):
@@ -39,6 +40,8 @@ class TranscribeSpeechToText(Step):

     def execute(self, input_data: Any) -> Dict[str, Any]:
         """Transcribe audio file and return the text."""
+
+        start_time = time.perf_counter()
         try:
             # Prepare the request to Deepgram API
             headers = {
@@ -107,19 +110,23 @@ def execute(self, input_data: Any) -> Dict[str, Any]:
                 "language": result.get("metadata", {}).get("language"),
             }

-            return {
-                "status": "success",
-                "data": transcribed_text,
-                "metadata": metadata,  # Only include serializable metadata
-            }
+            return self._create_step_result(
+                status="success",
+                data=transcribed_text,
+                metadata=metadata,
+                start_time=start_time,
+            )
         except requests.RequestException as e:
-            return {
-                "status": "error",
-                "data": f"Failed to download audio file: {str(e)}",
-            }
+            return self._create_step_result(
+                status="error",
+                data=f"Failed to download audio file: {str(e)}",
+                start_time=start_time,
+            )
+
         except Exception as e:
-            return {
-                "status": "error",
-                "data": f"Transcription failed: {str(e)}",
-            }
+            return self._create_step_result(
+                status="error",
+                data=f"Transcription failed: {str(e)}",
+                start_time=start_time,
+            )
diff --git a/athina/steps/weaviate_retrieval.py b/athina/steps/weaviate_retrieval.py
index a661e8c..ca585de 100644
--- a/athina/steps/weaviate_retrieval.py
+++ b/athina/steps/weaviate_retrieval.py
@@ -6,6 +6,7 @@
 from typing import Union, Dict, Any
 from athina.steps import Step
 from jinja2 import Environment
+import time


 class WeaviateRetrieval(Step):
@@ -50,17 +51,25 @@ class Config:

     def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
         """Makes a call to weaviate collection to fetch relevant chunks"""
+        start_time = time.perf_counter()
         if input_data is None:
             input_data = {}

         if not isinstance(input_data, dict):
-            raise TypeError("Input data must be a dictionary.")
-
+            return self._create_step_result(
+                status="error",
+                data="Input data must be a dictionary.",
+                start_time=start_time,
+            )
         input_text = input_data.get(self.input_column, None)

         if input_text is None:
-            return {"status": "error", "data": "Input column not found."}
+            return self._create_step_result(
+                status="error",
+                data="Input column not found.",
+                start_time=start_time,
+            )

         try:
             if self.search_type == "semantic_search":
@@ -82,15 +91,17 @@ def execute(self, input_data: Any) -> Union[Dict[str, Any], None]:
             for obj in response.objects:
                 results.append(obj.properties[self.key])

-            return {
-                "status": "success",
-                "data": results,
-            }
+            return self._create_step_result(
+                status="success",
+                data=results,
+                start_time=start_time,
+            )
         except Exception as e:
-            return {
-                "status": "error",
-                "data": str(e),
-            }
+            return self._create_step_result(
+                status="error",
+                data=str(e),
+                start_time=start_time,
+            )

     def close(self):
         """Closes the connection to the Weaviate client."""
diff --git a/pyproject.toml b/pyproject.toml
index 36ff6e5..d1974ec 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "athina"
-version = "1.7.2"
+version = "1.7.3"
 description = "Python SDK to configure and run evaluations for your LLM-based application"
 authors = ["Shiv Sakhuja ", "Akshat Gupta ", "Vivek Aditya ", "Akhil Bisht "]
 readme = "README.md"
@@ -18,7 +18,7 @@ pandas = "*"
 datasets = "^2.16.0"
 python-dotenv = "^1.0.0"
 requests = "^2.31.0"
-litellm = "1.55.10"
+litellm = "1.56.6"
 jinja2 = "^3.1.4"
 pinecone-client = "^4.1.0"
 qdrant-client = "^1.9.1"
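
Usage note (illustrative sketch, not part of the patch): the refactor above routes every step's return value through Step._create_step_result(), so callers always receive {"status", "data", "metadata"} with metadata["response_time"] filled in. The ToyUpperCase class, its input_column field, and the sample input below are hypothetical; only Step, execute(), and _create_step_result() come from this diff, and the exact Step base-class requirements may differ.

    import time

    from athina.steps import Step


    class ToyUpperCase(Step):
        """Hypothetical step that upper-cases one column of the input dict."""

        input_column: str = "text"

        def execute(self, input_data):
            # Record the start so _create_step_result can compute response_time.
            start_time = time.perf_counter()
            if not isinstance(input_data, dict) or self.input_column not in input_data:
                return self._create_step_result(
                    status="error",
                    data="Input column not found.",
                    start_time=start_time,
                )
            return self._create_step_result(
                status="success",
                data=str(input_data[self.input_column]).upper(),
                start_time=start_time,
            )


    # ToyUpperCase().execute({"text": "hello"}) is expected to return something like:
    # {"status": "success", "data": "HELLO", "metadata": {"response_time": <elapsed ms>}}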