Skip to content

Commit

Permalink
Fix #3402: Prevent invalid timestamps to reach the database (#3425)
Browse files Browse the repository at this point in the history
  • Loading branch information
leplatrem authored Jun 20, 2024
1 parent b68cae5 commit 92bdfd2
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
8 changes: 6 additions & 2 deletions kinto/core/resource/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,10 @@ def _extract_limit(self):

def _extract_filters(self):
"""Extracts filters from QueryString parameters."""

def is_valid_timestamp(value):
return isinstance(value, int) or re.match(r'^"?\d+"?$', str(value))

queryparams = self.request.validated["querystring"]

filters = []
Expand Down Expand Up @@ -1090,7 +1094,7 @@ def _extract_filters(self):
send_alert(self.request, message, url)
operator = COMPARISON.LT

if value == "" or not isinstance(value, (int, str, type(None))):
if value is not None and not is_valid_timestamp(value):
raise_invalid(self.request, **error_details)

filters.append(Filter(self.model.modified_field, value, operator))
Expand Down Expand Up @@ -1127,7 +1131,7 @@ def _extract_filters(self):
error_details["description"] = "Invalid character 0x00"
raise_invalid(self.request, **error_details)

if field == self.model.modified_field and value == "":
if field == self.model.modified_field and not is_valid_timestamp(value):
raise_invalid(self.request, **error_details)

filters.append(Filter(field, value, operator))
Expand Down
9 changes: 9 additions & 0 deletions tests/core/resource/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,15 @@ def test_filter_raises_error_if_last_modified_value_is_empty(self):
self.validated["querystring"] = {"lt_last_modified": ""}
self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get)

def test_filter_raises_error_if_last_modified_value_is_not_int(self):
bad_value = "171103608603432920249' or '7127'='7127"
self.validated["querystring"] = {"last_modified": bad_value}
self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get)
self.validated["querystring"] = {"_since": bad_value}
self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get)
self.validated["querystring"] = {"lt_last_modified": bad_value}
self.assertRaises(httpexceptions.HTTPBadRequest, self.resource.plural_get)

def test_filter_works_with_since_none(self):
self.validated["querystring"] = {"_since": None}
result = self.resource.plural_get()
Expand Down

0 comments on commit 92bdfd2

Please sign in to comment.