From 13ec978454f5098f14eb91d4881a5203cbb69b88 Mon Sep 17 00:00:00 2001
From: Akhil
Date: Tue, 28 Jan 2025 18:02:34 +0530
Subject: [PATCH 1/4] Added loop step

---
 athina/steps/__init__.py |   2 +
 athina/steps/loop.py     | 188 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 190 insertions(+)
 create mode 100644 athina/steps/loop.py

diff --git a/athina/steps/__init__.py b/athina/steps/__init__.py
index dd69950..36615fa 100644
--- a/athina/steps/__init__.py
+++ b/athina/steps/__init__.py
@@ -2,6 +2,7 @@
 from athina.steps.conditional import ConditionalStep
 from athina.steps.chain import Chain
 from athina.steps.iterator import Map
+from athina.steps.loop import LoopStep
 from athina.steps.llm import PromptExecution
 from athina.steps.api import ApiCall
 from athina.steps.extract_entities import ExtractEntities
@@ -42,4 +43,5 @@
     "SpiderCrawl",
     "ParseDocument",
     "ConditionalStep",
+    "LoopStep",
 ]
diff --git a/athina/steps/loop.py b/athina/steps/loop.py
new file mode 100644
index 0000000..d9d5eaf
--- /dev/null
+++ b/athina/steps/loop.py
@@ -0,0 +1,188 @@
+import asyncio
+from typing import Dict, List, Any, Optional
+from athina.steps.base import Step
+from pydantic import ConfigDict
+from athina.steps.code_execution_v2 import CodeExecutionV2, EXECUTION_E2B
+from concurrent.futures import ThreadPoolExecutor
+
+
+class LoopStep(Step):
+    """Step that evaluates conditions and executes appropriate branch steps."""
+
+    loop_type: str
+    loop_input: Optional[str]
+    loop_count: Optional[int]
+    sequence: List[Step]
+    execution_mode: Optional[str]
+    max_workers: int = 5  # Default number of workers
+
+    async def _execute_single_step(self, step: Step, context: Dict) -> Dict:
+        """Execute a single step asynchronously using ThreadPoolExecutor."""
+        loop = asyncio.get_event_loop()
+        with ThreadPoolExecutor(max_workers=1) as executor:
+            return await loop.run_in_executor(
+                executor,
+                step.execute,
+                context
+            )
+
+    async def _execute_steps_async(self, steps: List[Step], inputs: Dict, semaphore: asyncio.Semaphore) -> Dict:
+        """Execute a sequence of steps with given inputs asynchronously."""
+        async with semaphore:
+            cumulative_context = inputs.copy()
+            final_output = None
+            executed_steps = []
+
+            for step in steps:
+                # Execute each step asynchronously
+                step_result = await self._execute_single_step(step, cumulative_context)
+                executed_steps.append(step_result)
+                cumulative_context = {
+                    **cumulative_context,
+                    f"{step.name}": step_result.get("data", {}),
+                }
+                final_output = step_result.get("data")
+
+            return {
+                "status": "success",
+                "data": final_output,
+                "metadata": {"executed_steps": executed_steps},
+            }
+
+    async def _process_batch(self, batch_items: List[tuple], inputs: Dict, semaphore: asyncio.Semaphore) -> List[Dict]:
+        """Process a batch of items in parallel."""
+        tasks = []
+        for index, item in batch_items:
+            context = {**inputs, "item": item, "index": index, "count": len(batch_items)}
+            tasks.append(self._execute_steps_async(self.sequence, context, semaphore))
+        return await asyncio.gather(*tasks)
+
+    async def _process_count(self, count: int, inputs: Dict, semaphore: asyncio.Semaphore) -> List[Dict]:
+        """Process a count of items in parallel."""
+        tasks = []
+        for index in range(count):
+            context = {**inputs, "index": index, "count": count}
+            tasks.append(self._execute_steps_async(self.sequence, context, semaphore))
+        return await asyncio.gather(*tasks)
+
+    def execute(self, inputs: Dict) -> Dict:
+        """Execute the loop step by running input list items on appropriate steps one by one."""
+        try:
+            if self.loop_type == "map":
+                loop_input = inputs.get(self.loop_input, [])
+
+                if not isinstance(loop_input, list):
+                    return {
+                        "status": "error",
+                        "data": "Input not of type list",
+                        "metadata": {},
+                    }
+
+                current = []
+                executed_steps = []
+
+                if self.execution_mode == "parallel":
+                    async def run_parallel():
+                        semaphore = asyncio.Semaphore(self.max_workers)
+                        # Create a list of (index, item) tuples
+                        indexed_items = list(enumerate(loop_input))
+                        results = await self._process_batch(indexed_items, inputs, semaphore)
+                        return results
+
+                    loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(loop)
+                    try:
+                        results = loop.run_until_complete(run_parallel())
+                        for result in results:
+                            current.append(result.get("data"))
+                            executed_steps.append(result.get("metadata"))
+                    finally:
+                        loop.close()
+                else:
+                    # Sequential execution
+                    for index, item in enumerate(loop_input):
+                        loop = asyncio.new_event_loop()
+                        asyncio.set_event_loop(loop)
+                        try:
+                            semaphore = asyncio.Semaphore(1)
+                            result = loop.run_until_complete(
+                                self._execute_steps_async(
+                                    self.sequence,
+                                    {**inputs, "item": item, "index": index, "count": len(loop_input)},
+                                    semaphore
+                                )
+                            )
+                            current.append(result.get("data"))
+                            executed_steps.append(result.get("metadata"))
+                        finally:
+                            loop.close()
+
+                return {
+                    "status": "success",
+                    "data": current,
+                    "metadata": {
+                        "executed_steps": executed_steps,
+                    },
+                }
+
+            else:
+                count = self.loop_count
+
+                if not isinstance(count, int) or count <= 0:
+                    return {
+                        "status": "error",
+                        "data": "Invalid loop count",
+                        "metadata": {},
+                    }
+
+                current = []
+                executed_steps = []
+
+                if self.execution_mode == "parallel":
+                    async def run_parallel():
+                        semaphore = asyncio.Semaphore(self.max_workers)
+                        results = await self._process_count(count=count, inputs = inputs, semaphore = semaphore)
+                        return results
+
+                    loop = asyncio.new_event_loop()
+                    asyncio.set_event_loop(loop)
+                    try:
+                        results = loop.run_until_complete(run_parallel())
+                        for result in results:
+                            current.append(result.get("data"))
+                            executed_steps.append(result.get("metadata"))
+                    finally:
+                        loop.close()
+                else:
+                    # Sequential execution
+                    for index in range(count):
+                        loop = asyncio.new_event_loop()
+                        asyncio.set_event_loop(loop)
+                        try:
+                            semaphore = asyncio.Semaphore(1)
+                            result = loop.run_until_complete(
+                                self._execute_steps_async(
+                                    self.sequence,
+                                    {**inputs, "index": index, "count": count},
+                                    semaphore
+                                )
+                            )
+                            current.append(result.get("data"))
+                            executed_steps.append(result.get("metadata"))
+                        finally:
+                            loop.close()
+
+                return {
+                    "status": "success",
+                    "data": current,
+                    "metadata": {
+                        "executed_steps": executed_steps,
+                    },
+                }
+
+        except Exception as e:
+            return {
+                "status": "error",
+                "data": f"Loop step execution failed: {str(e)}",
+                "metadata": {},
+            }
\ No newline at end of file
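
The parallel branch of this first version reduces to a common asyncio pattern: fan out one coroutine per item, cap concurrency with a semaphore sized to max_workers, and push each synchronous Step.execute call onto a worker thread via run_in_executor. A minimal standalone sketch of that pattern follows; the names (blocking_step, run_one, run_all) are illustrative only and not part of the SDK.

    import asyncio
    import time
    from concurrent.futures import ThreadPoolExecutor

    def blocking_step(item):
        # Stand-in for a synchronous Step.execute call.
        time.sleep(0.1)
        return item * 2

    async def run_one(item, semaphore):
        # The semaphore caps how many items are in flight at once.
        async with semaphore:
            loop = asyncio.get_running_loop()
            with ThreadPoolExecutor(max_workers=1) as executor:
                return await loop.run_in_executor(executor, blocking_step, item)

    async def run_all(items, max_workers=5):
        semaphore = asyncio.Semaphore(max_workers)
        return await asyncio.gather(*(run_one(i, semaphore) for i in items))

    print(asyncio.run(run_all([1, 2, 3, 4, 5])))  # -> [2, 4, 6, 8, 10]
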
From 73bb32ea09b802cb4722a837df946a940b2fd928 Mon Sep 17 00:00:00 2001
From: Akhil
Date: Wed, 29 Jan 2025 10:16:23 +0530
Subject: [PATCH 2/4] Fix loop step comments

---
 athina/steps/__init__.py |   4 +-
 athina/steps/loop.py     | 211 +++++++++++----------------------------
 2 files changed, 61 insertions(+), 154 deletions(-)

diff --git a/athina/steps/__init__.py b/athina/steps/__init__.py
index 36615fa..cf4661d 100644
--- a/athina/steps/__init__.py
+++ b/athina/steps/__init__.py
@@ -2,7 +2,7 @@
 from athina.steps.conditional import ConditionalStep
 from athina.steps.chain import Chain
 from athina.steps.iterator import Map
-from athina.steps.loop import LoopStep
+from athina.steps.loop import Loop
 from athina.steps.llm import PromptExecution
 from athina.steps.api import ApiCall
 from athina.steps.extract_entities import ExtractEntities
@@ -43,5 +43,5 @@
     "SpiderCrawl",
     "ParseDocument",
     "ConditionalStep",
-    "LoopStep",
+    "Loop",
 ]
diff --git a/athina/steps/loop.py b/athina/steps/loop.py
index d9d5eaf..869c903 100644
--- a/athina/steps/loop.py
+++ b/athina/steps/loop.py
@@ -1,24 +1,20 @@
 import asyncio
 from typing import Dict, List, Any, Optional
 from athina.steps.base import Step
-from pydantic import ConfigDict
-from athina.steps.code_execution_v2 import CodeExecutionV2, EXECUTION_E2B
 from concurrent.futures import ThreadPoolExecutor
 
 
-class LoopStep(Step):
-    """Step that evaluates conditions and executes appropriate branch steps."""
-
+class Loop(Step):
     loop_type: str
     loop_input: Optional[str]
     loop_count: Optional[int]
     sequence: List[Step]
     execution_mode: Optional[str]
-    max_workers: int = 5  # Default number of workers
+    max_workers: int = 5
 
     async def _execute_single_step(self, step: Step, context: Dict) -> Dict:
         """Execute a single step asynchronously using ThreadPoolExecutor."""
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         with ThreadPoolExecutor(max_workers=1) as executor:
             return await loop.run_in_executor(
                 executor,
@@ -26,163 +22,74 @@ async def _execute_single_step(self, step: Step, context: Dict) -> Dict:
                 context
             )
 
-    async def _execute_steps_async(self, steps: List[Step], inputs: Dict, semaphore: asyncio.Semaphore) -> Dict:
-        """Execute a sequence of steps with given inputs asynchronously."""
+    async def _execute_sequence(self, inputs: Dict, semaphore: asyncio.Semaphore) -> Dict:
+        """Execute a sequence of steps asynchronously with proper context handling."""
         async with semaphore:
-            cumulative_context = inputs.copy()
-            final_output = None
+            context = inputs.copy()
             executed_steps = []
+            final_output = None
 
-            for step in steps:
-                # Execute each step asynchronously
-                step_result = await self._execute_single_step(step, cumulative_context)
-                executed_steps.append(step_result)
-                cumulative_context = {
-                    **cumulative_context,
-                    f"{step.name}": step_result.get("data", {}),
+            for step in self.sequence:
+                result = await self._execute_single_step(step, context)
+                executed_steps.append(result)
+                context = {
+                    **context,
+                    f"{step.name}": result.get("data", {}),
                 }
-                final_output = step_result.get("data")
+                final_output = result.get("data")  # Ensure final output is correctly captured
 
             return {
                 "status": "success",
-                "data": final_output,
-                "metadata": {"executed_steps": executed_steps},
+                "data": final_output,  # Ensure only final result is returned
+                "metadata": {"executed_steps": executed_steps}
             }
 
-    async def _process_batch(self, batch_items: List[tuple], inputs: Dict, semaphore: asyncio.Semaphore) -> List[Dict]:
-        """Process a batch of items in parallel."""
-        tasks = []
-        for index, item in batch_items:
-            context = {**inputs, "item": item, "index": index, "count": len(batch_items)}
-            tasks.append(self._execute_steps_async(self.sequence, context, semaphore))
-        return await asyncio.gather(*tasks)
-
-    async def _process_count(self, count: int, inputs: Dict, semaphore: asyncio.Semaphore) -> List[Dict]:
-        """Process a count of items in parallel."""
-        tasks = []
-        for index in range(count):
-            context = {**inputs, "index": index, "count": count}
-            tasks.append(self._execute_steps_async(self.sequence, context, semaphore))
-        return await asyncio.gather(*tasks)
+    async def _execute_loop(self, inputs: Dict) -> Dict:
+        """Handles loop execution, managing parallelism properly."""
+        semaphore = asyncio.Semaphore(self.max_workers if self.execution_mode == "parallel" else 1)
+        results = []
 
-    def execute(self, inputs: Dict) -> Dict:
-        """Execute the loop step by running input list items on appropriate steps one by one."""
-        try:
-            if self.loop_type == "map":
-                loop_input = inputs.get(self.loop_input, [])
-
-                if not isinstance(loop_input, list):
-                    return {
-                        "status": "error",
-                        "data": "Input not of type list",
-                        "metadata": {},
-                    }
-
-                current = []
-                executed_steps = []
-
-                if self.execution_mode == "parallel":
-                    async def run_parallel():
-                        semaphore = asyncio.Semaphore(self.max_workers)
-                        # Create a list of (index, item) tuples
-                        indexed_items = list(enumerate(loop_input))
-                        results = await self._process_batch(indexed_items, inputs, semaphore)
-                        return results
+        if self.loop_type == "map":
+            items = inputs.get(self.loop_input, [])
+            if not isinstance(items, list):
+                return {"status": "error", "data": "Input not of type list", "metadata": {}}
 
-                    loop = asyncio.new_event_loop()
-                    asyncio.set_event_loop(loop)
-                    try:
-                        results = loop.run_until_complete(run_parallel())
-                        for result in results:
-                            current.append(result.get("data"))
-                            executed_steps.append(result.get("metadata"))
-                    finally:
-                        loop.close()
-                else:
-                    # Sequential execution
-                    for index, item in enumerate(loop_input):
-                        loop = asyncio.new_event_loop()
-                        asyncio.set_event_loop(loop)
-                        try:
-                            semaphore = asyncio.Semaphore(1)
-                            result = loop.run_until_complete(
-                                self._execute_steps_async(
-                                    self.sequence,
-                                    {**inputs, "item": item, "index": index, "count": len(loop_input)},
-                                    semaphore
-                                )
-                            )
-                            current.append(result.get("data"))
-                            executed_steps.append(result.get("metadata"))
-                        finally:
-                            loop.close()
+            tasks = [
+                self._execute_sequence(
+                    {**inputs, "item": item, "index": idx, "count": len(items)},
+                    semaphore
+                )
+                for idx, item in enumerate(items)
+            ]
+        else:
+            if not isinstance(self.loop_count, int) or self.loop_count <= 0:
+                return {"status": "error", "data": "Invalid loop count", "metadata": {}}
 
-                return {
-                    "status": "success",
-                    "data": current,
-                    "metadata": {
-                        "executed_steps": executed_steps,
-                    },
-                }
-
-            else:
-                count = self.loop_count
-
-                if not isinstance(count, int) or count <= 0:
-                    return {
-                        "status": "error",
-                        "data": "Invalid loop count",
-                        "metadata": {},
-                    }
-
-                current = []
-                executed_steps = []
-
-                if self.execution_mode == "parallel":
-                    async def run_parallel():
-                        semaphore = asyncio.Semaphore(self.max_workers)
-                        results = await self._process_count(count=count, inputs = inputs, semaphore = semaphore)
-                        return results
+            tasks = [
+                self._execute_sequence(
+                    {**inputs, "index": i, "count": self.loop_count},
+                    semaphore
+                )
+                for i in range(self.loop_count)
+            ]
 
-                    loop = asyncio.new_event_loop()
-                    asyncio.set_event_loop(loop)
-                    try:
-                        results = loop.run_until_complete(run_parallel())
-                        for result in results:
-                            current.append(result.get("data"))
-                            executed_steps.append(result.get("metadata"))
-                    finally:
-                        loop.close()
-                else:
-                    # Sequential execution
-                    for index in range(count):
-                        loop = asyncio.new_event_loop()
-                        asyncio.set_event_loop(loop)
-                        try:
-                            semaphore = asyncio.Semaphore(1)
-                            result = loop.run_until_complete(
-                                self._execute_steps_async(
-                                    self.sequence,
-                                    {**inputs, "index": index, "count": count},
-                                    semaphore
-                                )
-                            )
-                            current.append(result.get("data"))
-                            executed_steps.append(result.get("metadata"))
-                        finally:
-                            loop.close()
+        results = await asyncio.gather(*tasks)  # Gather results concurrently
 
-                return {
-                    "status": "success",
-                    "data": current,
-                    "metadata": {
-                        "executed_steps": executed_steps,
-                    },
-                }
-
+        return {
+            "status": "success",
+            "data": [r["data"] for r in results],  # Ensure correct final output format
+            "metadata": {"executed_steps": [r["metadata"] for r in results]}
+        }
+
+    def execute(self, inputs: Dict) -> Dict:
+        """Handles execution, avoiding issues with already running event loops."""
+        try:
+            loop = asyncio.get_event_loop()
+            if loop.is_running():
+                future = asyncio.ensure_future(self._execute_loop(inputs))
+                loop.run_until_complete(future)
+                return future.result()
+            else:
+                return asyncio.run(self._execute_loop(inputs))
         except Exception as e:
-            return {
-                "status": "error",
-                "data": f"Loop step execution failed: {str(e)}",
-                "metadata": {},
-            }
\ No newline at end of file
+            return {"status": "error", "data": str(e), "metadata": {}}
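
After this patch the step is exposed as Loop. A minimal usage sketch follows, under stated assumptions: EchoStep is a hypothetical step invented here for illustration, and the sketch assumes Step subclasses can be constructed with keyword arguments and expose the name / execute(inputs) -> dict interface that loop.py relies on.

    from typing import Dict
    from athina.steps import Loop
    from athina.steps.base import Step

    class EchoStep(Step):
        # Hypothetical step: returns the current loop item as its data.
        name: str = "echo"

        def execute(self, inputs: Dict) -> Dict:
            # Loop injects "item", "index" and "count" into each iteration's context.
            return {"status": "success", "data": inputs.get("item")}

    loop_step = Loop(
        loop_type="map",            # iterate over a list taken from the inputs
        loop_input="documents",     # key in the inputs dict that holds the list
        loop_count=None,
        sequence=[EchoStep()],      # steps run in order for every item
        execution_mode="parallel",  # concurrency capped at max_workers (default 5)
    )

    result = loop_step.execute({"documents": ["a", "b", "c"]})
    # result["data"] -> ["a", "b", "c"]
    # result["metadata"]["executed_steps"] -> per-item execution traces

For loop_type="count", loop_count replaces loop_input and each iteration only receives "index" and "count" in its context.
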
From 393f716d31aa79c9740b59777d15ce4e6fba0bfa Mon Sep 17 00:00:00 2001
From: Akhilathina
Date: Thu, 30 Jan 2025 11:51:54 +0530
Subject: [PATCH 3/4] Update version pyproject.toml

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 4d9a167..1cfbda2 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "athina"
-version = "1.7.7"
+version = "1.7.8"
 description = "Python SDK to configure and run evaluations for your LLM-based application"
 authors = ["Shiv Sakhuja ", "Akshat Gupta ", "Vivek Aditya ", "Akhil Bisht "]
 readme = "README.md"

From cc7a2947a6b249d790b510ac643118cbe052cafe Mon Sep 17 00:00:00 2001
From: Akhilathina
Date: Thu, 30 Jan 2025 11:57:00 +0530
Subject: [PATCH 4/4] Update pyproject.toml

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 1cfbda2..5d7562b 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "athina"
-version = "1.7.8"
+version = "1.7.10"
 description = "Python SDK to configure and run evaluations for your LLM-based application"
 authors = ["Shiv Sakhuja ", "Akshat Gupta ", "Vivek Aditya ", "Akhil Bisht "]
 readme = "README.md"