Skip to content

Commit

Permalink
Merge pull request #651 from roboflow/visualize_line_counter_zone
Browse files Browse the repository at this point in the history
Add line counter visualization block
  • Loading branch information
PawelPeczek-Roboflow committed Sep 18, 2024
2 parents 3e50476 + 82d912b commit ae8d48e
Show file tree
Hide file tree
Showing 4 changed files with 259 additions and 0 deletions.
4 changes: 4 additions & 0 deletions inference/core/workflows/core_steps/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@
from inference.core.workflows.core_steps.visualizations.label.v1 import (
LabelVisualizationBlockV1,
)
from inference.core.workflows.core_steps.visualizations.line_zone.v1 import (
LineCounterZoneVisualizationBlockV1,
)
from inference.core.workflows.core_steps.visualizations.mask.v1 import (
MaskVisualizationBlockV1,
)
Expand Down Expand Up @@ -307,6 +310,7 @@ def load_blocks() -> List[Type[WorkflowBlock]]:
MaskVisualizationBlockV1,
PixelateVisualizationBlockV1,
PolygonVisualizationBlockV1,
LineCounterZoneVisualizationBlockV1,
TriangleVisualizationBlockV1,
RoboflowCustomMetadataBlockV1,
DetectionsStitchBlockV1,
Expand Down
Empty file.
156 changes: 156 additions & 0 deletions inference/core/workflows/core_steps/visualizations/line_zone/v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import hashlib
from typing import Dict, List, Literal, Optional, Tuple, Type, Union

import cv2 as cv
import numpy as np
import supervision as sv
from pydantic import ConfigDict, Field

from inference.core.workflows.core_steps.visualizations.common.base import (
OUTPUT_IMAGE_KEY,
VisualizationBlock,
VisualizationManifest,
)
from inference.core.workflows.core_steps.visualizations.common.utils import str_to_color
from inference.core.workflows.execution_engine.entities.base import WorkflowImageData
from inference.core.workflows.execution_engine.entities.types import (
FLOAT_ZERO_TO_ONE_KIND,
INTEGER_KIND,
LIST_OF_VALUES_KIND,
STRING_KIND,
FloatZeroToOne,
StepOutputSelector,
WorkflowParameterSelector,
)
from inference.core.workflows.prototypes.block import BlockResult, WorkflowBlockManifest

TYPE: str = "roboflow_core/line_counter_zone_visualization@v1"
SHORT_DESCRIPTION = "Paints a mask over line zone in an image."
LONG_DESCRIPTION = """
The `LineCounterZoneVisualization` block draws line
in an image with a specified color and opacity.
Please note: line zone will be drawn on top of image passed to this block,
this block should be placed before other visualization blocks in the workflow.
"""


class LineCounterZoneVisualizationManifest(VisualizationManifest):
type: Literal[f"{TYPE}"]
model_config = ConfigDict(
json_schema_extra={
"name": "Polygon Zone Visualization",
"version": "v1",
"short_description": SHORT_DESCRIPTION,
"long_description": LONG_DESCRIPTION,
"license": "Apache-2.0",
"block_type": "visualization",
}
)
zone: Union[list, StepOutputSelector(kind=[LIST_OF_VALUES_KIND]), WorkflowParameterSelector(kind=[LIST_OF_VALUES_KIND])] = Field( # type: ignore
description="Line zones (one for each batch) in a format [[(x1, y1), (x2, y2)], ...];"
" each zone must consist of exactly two points",
examples=["$inputs.zones"],
)
color: Union[str, WorkflowParameterSelector(kind=[STRING_KIND])] = Field( # type: ignore
description="Color of the zone.",
default="#5bb573",
examples=["WHITE", "#FFFFFF", "rgb(255, 255, 255)" "$inputs.background_color"],
)
thickness: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Thickness of the lines in pixels.",
default=2,
examples=[2, "$inputs.thickness"],
)
count_in: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND]), StepOutputSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Thickness of the lines in pixels.",
default=0,
examples=[2, "$inputs.thickness"],
)
count_out: Union[int, WorkflowParameterSelector(kind=[INTEGER_KIND]), StepOutputSelector(kind=[INTEGER_KIND])] = Field( # type: ignore
description="Thickness of the lines in pixels.",
default=0,
examples=[2, "$inputs.thickness"],
)
opacity: Union[FloatZeroToOne, WorkflowParameterSelector(kind=[FLOAT_ZERO_TO_ONE_KIND])] = Field( # type: ignore
description="Transparency of the Mask overlay.",
default=0.3,
examples=[0.3, "$inputs.opacity"],
)

@classmethod
def get_execution_engine_compatibility(cls) -> Optional[str]:
return ">=1.0.0,<2.0.0"


class LineCounterZoneVisualizationBlockV1(VisualizationBlock):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._cache: Dict[str, np.ndarray] = {}

@classmethod
def get_manifest(cls) -> Type[WorkflowBlockManifest]:
return LineCounterZoneVisualizationManifest

def getAnnotator(
self,
**kwargs,
) -> sv.PolygonZoneAnnotator:
pass

def run(
self,
image: WorkflowImageData,
zone: List[Tuple[int, int]],
copy_image: bool,
color: str,
thickness: int,
count_in: int,
count_out: int,
opacity: float,
) -> BlockResult:
h, w, *_ = image.numpy_image.shape
zone_fingerprint = hashlib.md5(str(zone).encode()).hexdigest()
key = f"{zone_fingerprint}_{color}_{opacity}_{w}_{h}"
x1, y1 = zone[0]
x2, y2 = zone[1]
if key not in self._cache:
mask = np.zeros(
shape=image.numpy_image.shape,
dtype=image.numpy_image.dtype,
)
mask = cv.line(
img=mask,
pt1=(x1, y1),
pt2=(x2, y2),
color=str_to_color(color).as_bgr(),
thickness=thickness,
)
self._cache[key] = mask
mask = self._cache[key].copy()

np_image = image.numpy_image
if copy_image:
np_image = np_image.copy()
annotated_image = cv.addWeighted(
src1=mask,
alpha=opacity,
src2=np_image,
beta=1,
gamma=0,
)
annotated_image = sv.draw_text(
scene=annotated_image,
text=f"in: {count_in}, out: {count_out}",
text_anchor=sv.Point(x1, y1),
text_thickness=1,
background_color=sv.Color.WHITE,
text_padding=0,
)

output = WorkflowImageData(
parent_metadata=image.parent_metadata,
workflow_root_ancestor_metadata=image.workflow_root_ancestor_metadata,
numpy_image=annotated_image,
)

return {OUTPUT_IMAGE_KEY: output}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import cv2 as cv
import numpy as np
import pytest
from pydantic import ValidationError

from inference.core.workflows.core_steps.visualizations.line_zone.v1 import (
LineCounterZoneVisualizationManifest,
LineCounterZoneVisualizationBlockV1,
)
from inference.core.workflows.execution_engine.entities.base import (
ImageParentMetadata,
WorkflowImageData,
)


@pytest.mark.parametrize(
"type_alias", ["roboflow_core/line_counter_zone_visualization@v1"]
)
@pytest.mark.parametrize("images_field_alias", ["images", "image"])
def test_line_counter_zone_validation_when_valid_manifest_is_given(
type_alias: str,
images_field_alias: str,
) -> None:
# given
data = {
"type": type_alias,
"name": "line_counter_zone_1",
"zone": "$inputs.zone",
images_field_alias: "$inputs.image",
"color": "#FFFFFF",
"opacity": 0.5,
"thickness": 3,
"count_in": 7,
"count_out": 1,
}

# when
result = LineCounterZoneVisualizationManifest.model_validate(data)

# then
assert result == LineCounterZoneVisualizationManifest(
type=type_alias,
name="line_counter_zone_1",
images="$inputs.image",
zone= "$inputs.zone",
color="#FFFFFF",
opacity=0.5,
thickness=3,
count_in=7,
count_out=1,
)


def test_line_counter_zone_validation_when_invalid_image_is_given() -> None:
# given
data = {
"type": "roboflow_core/line_counter_zone_visualization@v1",
"name": "line_counter_zone_1",
"zone": "$inputs.zone",
"images": "invalid",
"color": "#FFFFFF",
"opacity": 0.5,
"thickness": 3,
"count_in": 7,
"count_out": 1,
}

# when
with pytest.raises(ValidationError):
_ = LineCounterZoneVisualizationManifest.model_validate(data)


def test_line_counter_zone_visualization_block() -> None:
# given
block = LineCounterZoneVisualizationBlockV1()

start_image = np.random.randint(0, 255, (1000, 1000, 3), dtype=np.uint8)
output = block.run(
image=WorkflowImageData(
parent_metadata=ImageParentMetadata(parent_id="some"),
numpy_image=start_image,
),
zone=[(10, 10), (100, 100)],
copy_image=True,
color="#FF0000",
opacity=1,
thickness=3,
count_in=7,
count_out=1,
)

assert isinstance(output, dict)
assert "image" in output
assert hasattr(output["image"], "numpy_image")

# dimensions of output match input
assert output.get("image").numpy_image.shape == (1000, 1000, 3)
# check if the image is modified
assert not np.array_equal(output.get("image").numpy_image, start_image)

0 comments on commit ae8d48e

Please sign in to comment.