Skip to content

Commit

Permalink
Adding the event source method to NinjaAPI and Router
Browse files Browse the repository at this point in the history
  • Loading branch information
rroblf01 committed Jan 10, 2025
1 parent 39f40b3 commit 686b328
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 1 deletion.
5 changes: 5 additions & 0 deletions docs/docs/guides/input/operations.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ An `operation` can be one of the following [HTTP methods](https://developer.mozi
- PUT
- DELETE
- PATCH
- EVENT SOURCE

**Django Ninja** comes with a decorator for each operation:

Expand All @@ -32,6 +33,10 @@ def delete_operation(request):
@api.patch("/path")
def patch_operation(request):
...

@api.event_source("/path")
def event_source_operation(request):
...
```

See the [operations parameters](../../reference/operations-parameters.md)
Expand Down
55 changes: 54 additions & 1 deletion ninja/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Union,
)

from django.http import HttpRequest, HttpResponse
from django.http import HttpRequest, HttpResponse, StreamingHttpResponse
from django.urls import URLPattern, URLResolver, reverse
from django.utils.module_loading import import_string

Expand Down Expand Up @@ -334,6 +334,59 @@ def put(
openapi_extra=openapi_extra,
)

def event_source(
self,
path: str,
*,
auth: Any = NOT_SET,
throttle: Union[BaseThrottle, List[BaseThrottle], NOT_SET_TYPE] = NOT_SET,
response: Any = NOT_SET,
operation_id: Optional[str] = None,
summary: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
deprecated: Optional[bool] = None,
by_alias: Optional[bool] = None,
exclude_unset: Optional[bool] = None,
exclude_defaults: Optional[bool] = None,
exclude_none: Optional[bool] = None,
url_name: Optional[str] = None,
include_in_schema: bool = True,
openapi_extra: Optional[Dict[str, Any]] = None,
) -> Callable[[TCallable], TCallable]:
"""
`EventSource` operation.
"""
def decorator(view_func: TCallable) -> TCallable:
def wrapped_view(request: HttpRequest, *args: Any, **kwargs: Any) -> StreamingHttpResponse:
response = StreamingHttpResponse(view_func(request, *args, **kwargs), content_type='text/event-stream')
response['Cache-Control'] = 'no-cache'
return response

self.default_router.add_api_operation(
path,
methods=["GET"],
view_func=wrapped_view,
auth=auth,
throttle=throttle,
response=response,
operation_id=operation_id,
summary=summary,
description=description,
tags=tags,
deprecated=deprecated,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
url_name=url_name,
include_in_schema=include_in_schema,
openapi_extra=openapi_extra,
)
return view_func

return decorator

def api_operation(
self,
methods: List[str],
Expand Down
51 changes: 51 additions & 0 deletions ninja/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
Union,
)

from django.http import HttpRequest, StreamingHttpResponse
from django.urls import URLPattern
from django.urls import path as django_path
from django.utils.module_loading import import_string
Expand Down Expand Up @@ -252,6 +253,56 @@ def put(
openapi_extra=openapi_extra,
)

def event_source(
self,
path: str,
*,
auth: Any = NOT_SET,
throttle: Union[BaseThrottle, List[BaseThrottle], NOT_SET_TYPE] = NOT_SET,
response: Any = NOT_SET,
operation_id: Optional[str] = None,
summary: Optional[str] = None,
description: Optional[str] = None,
tags: Optional[List[str]] = None,
deprecated: Optional[bool] = None,
by_alias: Optional[bool] = None,
exclude_unset: Optional[bool] = None,
exclude_defaults: Optional[bool] = None,
exclude_none: Optional[bool] = None,
url_name: Optional[str] = None,
include_in_schema: bool = True,
openapi_extra: Optional[Dict[str, Any]] = None,
) -> Callable[[TCallable], TCallable]:
def decorator(view_func: TCallable) -> TCallable:
def wrapped_view(request: HttpRequest, *args: Any, **kwargs: Any) -> StreamingHttpResponse:
response = StreamingHttpResponse(view_func(request, *args, **kwargs), content_type='text/event-stream')
response['Cache-Control'] = 'no-cache'
return response

self.add_api_operation(
path,
methods=["GET"],
view_func=wrapped_view,
auth=auth,
throttle=throttle,
response=response,
operation_id=operation_id,
summary=summary,
description=description,
tags=tags,
deprecated=deprecated,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
url_name=url_name,
include_in_schema=include_in_schema,
openapi_extra=openapi_extra,
)
return view_func

return decorator

def api_operation(
self,
methods: List[str],
Expand Down
23 changes: 23 additions & 0 deletions tests/test_event_source.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from ninja import NinjaAPI
from ninja.testing import TestClient

api = NinjaAPI()

messages = ["lorem", "ipsum", "dolor", "sit", "amet", "consectetur", "adipiscing", "elit"]

@api.event_source("/event_source")
def event_source_op(request):
for message in messages:
yield f"data: {message}\n\n"


client = TestClient(api)


def test_event_source_op():
response = client.get("/event_source")

assert response.status_code == 200

expected_content = b"".join([f"data: {message}\n\n".encode() for message in messages])
assert response.content == expected_content

0 comments on commit 686b328

Please sign in to comment.