Skip to content

Commit

Permalink
Merge pull request #645 from roboflow/fix/workflows_ee_lineage_missed…
Browse files Browse the repository at this point in the history
…_when_crop_and_if

Fix problem with lineage and conditional execution
  • Loading branch information
PawelPeczek-Roboflow committed Sep 17, 2024
2 parents 6eb31af + 7831677 commit ed8a156
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 8 deletions.
2 changes: 1 addition & 1 deletion inference/core/workflows/execution_engine/v1/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
validate_runtime_input,
)

EXECUTION_ENGINE_V1_VERSION = Version("1.1.0")
EXECUTION_ENGINE_V1_VERSION = Version("1.1.1")


class ExecutionEngineV1(BaseExecutionEngine):
Expand Down
7 changes: 3 additions & 4 deletions inference/core/workflows/execution_engine/v1/executor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,9 @@ def run_simd_step_in_batch_mode(
step_input = execution_data_manager.get_simd_step_input(step_selector=step_selector)
if not step_input.indices:
# no inputs - discarded either by conditional exec or by not accepting empty
return None
outputs = step_instance.run(**step_input.parameters)
outputs = []
else:
outputs = step_instance.run(**step_input.parameters)
execution_data_manager.register_simd_step_output(
step_selector=step_selector,
indices=step_input.indices,
Expand All @@ -175,8 +176,6 @@ def run_simd_step_in_non_batch_mode(
result = step_instance.run(**input_definition.parameters)
results.append(result)
indices.append(input_definition.index)
if not indices:
return None
execution_data_manager.register_simd_step_output(
step_selector=step_selector,
indices=indices,
Expand Down
2 changes: 1 addition & 1 deletion tests/inference/hosted_platform_tests/test_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def test_get_versions_of_execution_engine(object_detection_service_url: str) ->
# then
response.raise_for_status()
response_data = response.json()
assert response_data["versions"] == ["1.1.0"]
assert response_data["versions"] == ["1.1.1"]


FUNCTION = """
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ def test_get_versions_of_execution_engine(server_url: str) -> None:
# then
response.raise_for_status()
response_data = response.json()
assert response_data["versions"] == ["1.1.0"]
assert response_data["versions"] == ["1.1.1"]


def test_getting_block_schema_using_get_endpoint(server_url) -> None:
Expand Down
3 changes: 2 additions & 1 deletion tests/inference/models_predictions_tests/test_florence2.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import numpy as np
import pytest

from inference.models.florence2 import Florence2
import numpy as np


@pytest.mark.slow
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import numpy as np

from inference.core.env import WORKFLOWS_MAX_CONCURRENT_STEPS
from inference.core.managers.base import ModelManager
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.core.workflows.execution_engine.core import ExecutionEngine

PROBLEMATIC_WORKFLOW = {
"version": "1.0",
"inputs": [{"type": "WorkflowImage", "name": "image"}],
"steps": [
{
"type": "ObjectDetectionModel",
"name": "general_detection",
"image": "$inputs.image",
"model_id": "yolov8n-640",
"class_filter": ["dog"],
},
{
"type": "ContinueIf",
"name": "continue_if",
"condition_statement": {
"type": "StatementGroup",
"statements": [
{
"type": "BinaryStatement",
"left_operand": {
"type": "DynamicOperand",
"operand_name": "prediction",
"operations": [{"type": "SequenceLength"}],
},
"comparator": {"type": "(Number) =="},
"right_operand": {
"type": "StaticOperand",
"value": 5,
},
}
],
},
"evaluation_parameters": {
"prediction": "$steps.general_detection.predictions"
},
"next_steps": ["$steps.cropping"],
},
{
"type": "Crop",
"name": "cropping",
"image": "$inputs.image",
"predictions": "$steps.general_detection.predictions",
},
{
"type": "ClassificationModel",
"name": "breds_classification",
"image": "$steps.cropping.crops",
"model_id": "dog-breed-xpaq6/1",
},
],
"outputs": [
{
"type": "JsonField",
"name": "predictions",
"selector": "$steps.breds_classification.predictions",
},
],
}


def test_workflow_with_flow_control_eliminating_step_changing_lineage(
model_manager: ModelManager,
dogs_image: np.ndarray,
) -> None:
"""
This test case covers bug that in Execution Engine versions <=1.1.0.
The bug was not registering outputs from steps given no inputs provided
(inputs may have been filtered by flow-control steps). As a result,
given that step changes dimensionality (registers new data lineage) -
registration could not happen and downstream steps was raising error:
"Lineage ['<workflow_input>', 'XXX'] not found. [...]"
"""
# given
workflow_init_parameters = {
"workflows_core.model_manager": model_manager,
"workflows_core.api_key": None,
"workflows_core.step_execution_mode": StepExecutionMode.LOCAL,
}
execution_engine = ExecutionEngine.init(
workflow_definition=PROBLEMATIC_WORKFLOW,
init_parameters=workflow_init_parameters,
max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS,
)

# when
result = execution_engine.run(
runtime_parameters={
"image": [dogs_image],
}
)

# then
assert isinstance(result, list), "Expected result to be list"
assert len(result) == 1, "1 image provided, so 1 output elements expected"
assert result[0].keys() == {
"predictions"
}, "Expected all declared outputs to be delivered for first result"
assert len([e for e in result[0]["predictions"] if e]) == 0, (
"Expected no predictions, due to conditional execution applied, effectively preventing "
"`cropping` step from running"
)

0 comments on commit ed8a156

Please sign in to comment.