Skip to content

Commit

Permalink
Fix OK responses for events (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
UTXOnly authored Mar 9, 2024
1 parent 561883a commit 9fbc283
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 40 deletions.
7 changes: 4 additions & 3 deletions docker_stuff/python_stuff/event_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,12 @@ async def add_event(self, conn, cur) -> None:
)
await conn.commit()

def evt_response(self, results_json, http_status_code):
def evt_response(self, results_status, http_status_code, message=""):
response = {
"event": "OK",
"subscription_id": "n0stafarian419",
"results_json": results_json,
"subscription_id": self.event_id,
"results_json": results_status,
"message": message,
}
return JSONResponse(content=response, status_code=http_status_code)

Expand Down
42 changes: 30 additions & 12 deletions docker_stuff/python_stuff/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
redis_client: redis.Redis = redis.Redis(host="172.28.0.6", port=6379)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
log_file = "./logs/event_handler.log"
handler = RotatingFileHandler(log_file, maxBytes=1000000, backupCount=5)
formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
Expand Down Expand Up @@ -121,24 +121,42 @@ async def handle_new_event(request: Request) -> JSONResponse:
await event_obj.delete_event(
conn, cur, events_to_delete, logger
)
return event_obj.evt_response("true", 200)
return event_obj.evt_response(
results_status="true", http_status_code=200
)
else:
return event_obj.evt_response("flase", 200)
return event_obj.evt_response(
results_status="flase", http_status_code=200
)

else:
await event_obj.add_event(conn, cur)
statsd.increment("nostr.event.added.count", tags=["func:new_event"])
return event_obj.evt_response("true", 200)
try:
await event_obj.add_event(conn, cur)
statsd.increment(
"nostr.event.added.count", tags=["func:new_event"]
)
except psycopg.IntegrityError as e:
conn.rollback()
logger.info(
f"Event with ID {event_obj.event_id} already exists"
)
return event_obj.evt_response(
results_status="true",
http_status_code=409,
message="duplicate: already have this event",
)

return event_obj.evt_response(
results_status="true", http_status_code=200
)

except psycopg.IntegrityError as e:
conn.rollback()
return event_obj.evt_response(
f"Event with ID {event_obj.event_id} already exists", 409
)
except Exception as e:
logger.debug(f"Entering gen exc")
conn.rollback()
return event_obj.evt_response(
f"Error:{e} occured adding event {event_obj.event_id}", 409
results_status="false",
http_status_code=500,
message="error: could not connect to the database",
)


Expand Down
30 changes: 5 additions & 25 deletions docker_stuff/python_stuff/websocket_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
tracer.configure(hostname="172.28.0.5", port=8126)

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)

log_file: str = "./logs/websocket_handler.log"
handler = RotatingFileHandler(log_file, maxBytes=1000000, backupCount=5)
Expand Down Expand Up @@ -161,6 +161,7 @@ def __init__(self, response_data):
"""
self.event_type = response_data["event"]
self.subscription_id = response_data["subscription_id"]
self.message = response_data.get("message", "")
try:
self.results = json.loads(response_data["results_json"])
except json.JSONDecodeError as json_error:
Expand All @@ -169,21 +170,6 @@ def __init__(self, response_data):
)
self.results = ""

self.comment = ""
self.rate_limit_response: Tuple[str, Optional[str], str, Optional[str]] = (
"OK",
"nostafarian419",
"false",
"rate-limited: slow your roll nostrich",
)
self.duplicate_response: Tuple[str, Optional[str], str, Optional[str]] = (
"OK",
"nostafarian419",
"false",
"duplicate: already have this event",
)

#
async def _process_event(self, event_result):
try:
logger.debug(f"event_result var is {event_result}")
Expand All @@ -192,7 +178,6 @@ async def _process_event(self, event_result):
except Exception as exc:
logger.error(f"Process events exc is {exc}", exc_info=True)
return ""
#

async def format_response(self):
"""
Expand All @@ -215,7 +200,7 @@ async def format_response(self):
self.event_type,
self.subscription_id,
self.results,
self.comment,
self.message,
)

else:
Expand Down Expand Up @@ -395,14 +380,9 @@ async def send_event_to_handler(
f"Received response from Event Handler {response_data}, data types is {type(response_data)}"
)
response_object = ExtractedResponse(response_data)
if response.status == 200:
if response.status:
formatted_response = await response_object.format_response()
logger.debug(
f"Formatted response data from send_event_to_handler function: {formatted_response}"
)
await websocket.send(json.dumps(formatted_response))
elif response.status == 409:
return response_object.duplicate_response
except Exception as e:
logger.error(f"An error occurred while sending the event to the handler: {e}")

Expand Down Expand Up @@ -451,7 +431,7 @@ async def send_subscription_to_handler(


if __name__ == "__main__":
rate_limiter = TokenBucketRateLimiter(tokens_per_second=1, max_tokens=5000)
rate_limiter = TokenBucketRateLimiter(tokens_per_second=1, max_tokens=50000)

try:
start_server = websockets.serve(handle_websocket_connection, "0.0.0.0", 8008)
Expand Down

0 comments on commit 9fbc283

Please sign in to comment.