Skip to content

Commit

Permalink
Merge pull request #21 from upstash/DX-1568
Browse files Browse the repository at this point in the history
DX-1568: raise errors on qstash token missing
  • Loading branch information
CahidArda authored Jan 16, 2025
2 parents 94ae49e + d748309 commit b100700
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 22 deletions.
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "upstash-workflow"
version = "0.0.1-rc.1"
version = "0.0.1-rc.2"
description = "Python SDK for Upstash Workflow"
license = "MIT"
authors = ["Upstash <[email protected]>"]
Expand Down Expand Up @@ -29,8 +29,7 @@ packages = [{ include = "upstash_workflow" }]

[tool.poetry.dependencies]
python = "^3.8"
httpx = ">=0.23.0, <1"
qstash = "^2.0.2"
qstash = "^2.0.3"

[tool.poetry.group.fastapi.dependencies]
fastapi = "^0.115.0"
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.0.1-rc.1"
__version__ = "0.0.1-rc.2"

from upstash_workflow.context.context import WorkflowContext
from upstash_workflow.serve.serve import serve
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/asyncio/serve/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
return ServeBaseOptions[TInitialPayload, TResponse](
qstash_client=qstash_client
or AsyncQStash(
cast(str, environment.get("QSTASH_TOKEN", "")),
cast(str, environment.get("QSTASH_TOKEN")),
),
on_step_finish=on_step_finish or _on_step_finish,
initial_payload_parser=initial_payload_parser or _initial_payload_parser,
Expand Down
13 changes: 5 additions & 8 deletions upstash_workflow/asyncio/workflow_requests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import httpx
import json
import base64
import logging
Expand Down Expand Up @@ -68,13 +67,11 @@ async def _trigger_workflow_delete(
workflow_context: AsyncWorkflowContext[TInitialPayload],
cancel: Optional[bool] = False,
) -> None:
async with httpx.AsyncClient() as client:
await client.delete(
f"https://qstash.upstash.io/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
headers={
"Authorization": f"Bearer {workflow_context.env.get('QSTASH_TOKEN', '')}"
},
)
await workflow_context.qstash_client.http.request(
path=f"/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
method="DELETE",
parse_response=False,
)


async def _handle_third_party_call_result(
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/fastapi.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from inspect import iscoroutinefunction
import json
import os
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from typing import Callable, Awaitable, cast, TypeVar, Optional, Dict
Expand Down Expand Up @@ -45,6 +46,15 @@ def post(
:return:
"""

if not (
qstash_client
or (env is not None and env.get("QSTASH_TOKEN"))
or (env is None and os.getenv("QSTASH_TOKEN"))
):
raise ValueError(
"QSTASH_TOKEN is missing. Make sure to set it in the environment variables or pass qstash_client or env as an argument."
)

def decorator(
route_function: AsyncRouteFunction[TInitialPayload],
) -> AsyncRouteFunction[TInitialPayload]:
Expand Down
10 changes: 10 additions & 0 deletions upstash_workflow/flask.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from inspect import iscoroutinefunction
import os
from flask import Flask, request
from werkzeug.wrappers import Response
from typing import Callable, cast, TypeVar, Optional, Dict
Expand Down Expand Up @@ -49,6 +50,15 @@ def route(
:return:
"""

if not (
qstash_client
or (env is not None and env.get("QSTASH_TOKEN"))
or (env is None and os.getenv("QSTASH_TOKEN"))
):
raise ValueError(
"QSTASH_TOKEN is missing. Make sure to set it in the environment variables or pass qstash_client or env as an argument."
)

def decorator(
route_function: RouteFunction[TInitialPayload],
) -> RouteFunction[TInitialPayload]:
Expand Down
2 changes: 1 addition & 1 deletion upstash_workflow/serve/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def _initial_payload_parser(initial_request: str) -> TInitialPayload:
return ServeBaseOptions[TInitialPayload, TResponse](
qstash_client=qstash_client
or QStash(
cast(str, environment.get("QSTASH_TOKEN", "")),
cast(str, environment.get("QSTASH_TOKEN")),
),
on_step_finish=on_step_finish or _on_step_finish,
initial_payload_parser=initial_payload_parser or _initial_payload_parser,
Expand Down
13 changes: 5 additions & 8 deletions upstash_workflow/workflow_requests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from __future__ import annotations
import httpx
import json
import base64
import logging
Expand Down Expand Up @@ -74,13 +73,11 @@ def _trigger_workflow_delete(
workflow_context: WorkflowContext[TInitialPayload],
cancel: Optional[bool] = False,
) -> None:
with httpx.Client() as client:
client.delete(
f"https://qstash.upstash.io/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
headers={
"Authorization": f"Bearer {workflow_context.env.get('QSTASH_TOKEN', '')}"
},
)
workflow_context.qstash_client.http.request(
path=f"/v2/workflows/runs/{workflow_context.workflow_run_id}?cancel={str(cancel).lower()}",
method="DELETE",
parse_response=False,
)


def _recreate_user_headers(headers: Dict[str, str]) -> Dict[str, str]:
Expand Down

0 comments on commit b100700

Please sign in to comment.