Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic asyncio support #43944

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions .github/workflows/basic-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,10 +196,19 @@ jobs:
working-directory: ./clients/python
- name: "Install source version of required packages"
run: |
breeze release-management prepare-provider-packages fab standard common.sql --package-format \
wheel --skip-tag-check --version-suffix-for-pypi dev0
pip install . dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl
breeze release-management prepare-provider-packages \
fab \
standard \
common.sql \
sqlite \
--package-format wheel \
--skip-tag-check \
--version-suffix-for-pypi dev0
pip install . \
dist/apache_airflow_providers_fab-*.whl \
dist/apache_airflow_providers_standard-*.whl \
dist/apache_airflow_providers_common_sql-*.whl \
dist/apache_airflow_providers_sqlite-*.whl
breeze release-management prepare-task-sdk-package --package-format wheel
pip install ./dist/apache_airflow_task_sdk-*.whl
- name: "Install Python client"
Expand Down
34 changes: 33 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import pluggy
from packaging.version import Version
from sqlalchemy import create_engine, exc, text
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

Expand Down Expand Up @@ -95,8 +96,17 @@
DONOT_MODIFY_HANDLERS: bool | None = None
DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER"))

AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"}
"""
Mapping of sync scheme to async scheme.
:meta private:
"""

engine: Engine
Session: Callable[..., SASession]
async_engine: AsyncEngine
create_async_session: Callable[..., AsyncSession]

# The JSON library to use for DAG Serialization and De-Serialization
json = json
Expand Down Expand Up @@ -199,13 +209,25 @@ def load_policy_plugins(pm: pluggy.PluginManager):
pm.load_setuptools_entrypoints("airflow.policy")


def _get_async_conn_uri_from_sync(sync_uri):
scheme, rest = sync_uri.split(":", maxsplit=1)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SQLA has stuff built in to parse URLs/schemas. Maybe we should use that instead?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheme = scheme.split("+", maxsplit=1)[0]
aiolib = AIO_LIBS_MAPPING.get(scheme)
if aiolib:
return f"{scheme}+{aiolib}:{rest}"
else:
return sync_uri


def configure_vars():
"""Configure Global Variables from airflow.cfg."""
global SQL_ALCHEMY_CONN
global SQL_ALCHEMY_CONN_ASYNC
global DAGS_FOLDER
global PLUGINS_FOLDER
global DONOT_MODIFY_HANDLERS
SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN")
SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN)

DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER"))

Expand Down Expand Up @@ -441,6 +463,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None):

global Session
global engine
global async_engine
global create_async_session
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
global create_async_session

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why @ashb ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's necessary ... it doesn't get set until configure_orm is called


if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true":
# Skip DB initialization in unit tests, if DB tests are skipped
Session = SkipDBTestsSession
Expand All @@ -466,7 +491,14 @@ def configure_orm(disable_connection_pool=False, pool_class=None):
connect_args["check_same_thread"] = False

engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True)

async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True)
create_async_session = sessionmaker(
bind=async_engine,
autocommit=False,
autoflush=False,
class_=AsyncSession,
expire_on_commit=False,
)
mask_secret(engine.url.password)

setup_event_handlers(engine)
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/tests/test_packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0b0",
"apache-airflow>=2.8.0b0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4",
""",
id="beta0 suffix postgres",
Expand All @@ -221,6 +222,7 @@ def test_get_documentation_package_path():
"""
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

psycopg 3 also has async support but might conflict with psycopg 2 already installed.

https://www.psycopg.org/psycopg3/docs/advanced/async.html

https://docs.sqlalchemy.org/en/20/dialects/postgresql.html#module-sqlalchemy.dialects.postgresql.psycopg

True, yeah. We could in future look at allowing diff libs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmark, so take with a pinch of salt, but https://pypi.org/project/asyncpg/

In our testing asyncpg is, on average, 5x faster than psycopg3.

That's def a big enough of a change that we should see if we notice any difference

"psycopg2-binary>=2.9.4",
""",
id="No suffix postgres",
Expand Down
3 changes: 3 additions & 0 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@
},
"mysql": {
"deps": [
"aiomysql>=0.2.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"mysql-connector-python>=8.0.29",
Expand Down Expand Up @@ -1085,6 +1086,7 @@
"deps": [
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0",
"asyncpg>=0.30.0",
"psycopg2-binary>=2.9.4"
],
"devel-deps": [],
Expand Down Expand Up @@ -1260,6 +1262,7 @@
},
"sqlite": {
"deps": [
"aiosqlite>=0.2.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.8.0"
],
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/mysql/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ dependencies:
# Instead, if someone attempts to use it on MacOS, they will get explanatory error on how to install it
- mysqlclient>=1.4.0; sys_platform != 'darwin'
- mysql-connector-python>=8.0.29
- aiomysql>=0.2.0

additional-extras:
# only needed for backwards compatibility
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/postgres/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ dependencies:
- apache-airflow>=2.8.0
- apache-airflow-providers-common-sql>=1.20.0
- psycopg2-binary>=2.9.4
- asyncpg>=0.30.0

additional-extras:
- name: amazon
Expand Down
1 change: 1 addition & 0 deletions providers/src/airflow/providers/sqlite/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ versions:

dependencies:
- apache-airflow>=2.8.0
- aiosqlite>=0.2.0
- apache-airflow-providers-common-sql>=1.20.0

integrations:
Expand Down
2 changes: 1 addition & 1 deletion scripts/ci/kubernetes/k8s_requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
-e .[devel-devscripts,devel-tests,cncf.kubernetes]
-e .[devel-devscripts,devel-tests,cncf.kubernetes,sqlite]
-e ./providers
-e ./task_sdk
12 changes: 12 additions & 0 deletions tests/utils/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from __future__ import annotations

import pytest
from sqlalchemy import select

from airflow.models import Log
from airflow.utils.session import provide_session

pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]
Expand Down Expand Up @@ -53,3 +55,13 @@ def test_provide_session_with_kwargs(self):

session = object()
assert wrapper(session=session) is session

@pytest.mark.asyncio
async def test_async_session(self):
from airflow.settings import create_async_session

session = create_async_session()
session.add(Log(event="hihi1234"))
await session.commit()
l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741
assert l.event == "hihi1234"
Loading