From 84f6fb94e3554b74c99c9657b43a4b364c2f0d8d Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 14:48:56 -0800 Subject: [PATCH 01/13] Add basic asyncio support This is meant to be sort of a hello world for asyncio support in airflow. It will be refined and extended in the future. But I think airflow ultimately really needs to go in this direction: in the new REST API, in the new AIP-72 internal API server, in triggers, and ultimately, in the scheduler. --- airflow/settings.py | 33 ++++++++++++++++++++++++++++++++- tests/utils/test_session.py | 13 +++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/airflow/settings.py b/airflow/settings.py index a3f99510adba..7f6f4d419b77 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -31,6 +31,7 @@ import pluggy from packaging.version import Version from sqlalchemy import create_engine, exc, text +from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine from sqlalchemy.orm import scoped_session, sessionmaker from sqlalchemy.pool import NullPool @@ -95,8 +96,17 @@ DONOT_MODIFY_HANDLERS: bool | None = None DAGS_FOLDER: str = os.path.expanduser(conf.get_mandatory_value("core", "DAGS_FOLDER")) +AIO_LIBS_MAPPING = {"sqlite": "aiosqlite", "postgresql": "asyncpg", "mysql": "aiomysql"} +""" +Mapping of sync scheme to async scheme. + +:meta private: +""" + engine: Engine Session: Callable[..., SASession] +async_engine: AsyncEngine +create_async_session: Callable[..., AsyncSession] # The JSON library to use for DAG Serialization and De-Serialization json = json @@ -199,13 +209,22 @@ def load_policy_plugins(pm: pluggy.PluginManager): pm.load_setuptools_entrypoints("airflow.policy") +def _get_async_conn_uri_from_sync(): + scheme, rest = SQL_ALCHEMY_CONN.split(":", maxsplit=1) + scheme = scheme.split("+", maxsplit=1)[0] + aiolib = AIO_LIBS_MAPPING[scheme] + return f"{scheme}+{aiolib}:{rest}" + + def configure_vars(): """Configure Global Variables from airflow.cfg.""" global SQL_ALCHEMY_CONN + global SQL_ALCHEMY_CONN_ASYNC global DAGS_FOLDER global PLUGINS_FOLDER global DONOT_MODIFY_HANDLERS SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN") + SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync() DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) @@ -441,6 +460,9 @@ def configure_orm(disable_connection_pool=False, pool_class=None): global Session global engine + global async_engine + global create_async_session + if os.environ.get("_AIRFLOW_SKIP_DB_TESTS") == "true": # Skip DB initialization in unit tests, if DB tests are skipped Session = SkipDBTestsSession @@ -466,7 +488,16 @@ def configure_orm(disable_connection_pool=False, pool_class=None): connect_args["check_same_thread"] = False engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) - + async_engine = create_async_engine( + SQL_ALCHEMY_CONN_ASYNC, connect_args=connect_args, **engine_args, future=True + ) + create_async_session = sessionmaker( + bind=async_engine, + autocommit=False, + autoflush=False, + class_=AsyncSession, + expire_on_commit=False, + ) mask_secret(engine.url.password) setup_event_handlers(engine) diff --git a/tests/utils/test_session.py b/tests/utils/test_session.py index 82d0a00a26d9..63db479231c6 100644 --- a/tests/utils/test_session.py +++ b/tests/utils/test_session.py @@ -18,7 +18,10 @@ from __future__ import annotations import pytest +from sqlalchemy import select +from airflow.models import Log +from airflow.settings import create_async_session from airflow.utils.session import provide_session pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -53,3 +56,13 @@ def test_provide_session_with_kwargs(self): session = object() assert wrapper(session=session) is session + + +@pytest.mark.asyncio +@pytest.mark.db_test +async def test_async_session(): + session = create_async_session() + session.add(Log(event="hihi1234")) + await session.commit() + l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741 + assert l.event == "hihi1234" From fcb930dfc110e401a23b3c51c8ec2301561547b7 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 14:58:48 -0800 Subject: [PATCH 02/13] add provider deps --- generated/provider_dependencies.json | 3 +++ providers/src/airflow/providers/mysql/provider.yaml | 1 + providers/src/airflow/providers/postgres/provider.yaml | 1 + providers/src/airflow/providers/sqlite/provider.yaml | 1 + 4 files changed, 6 insertions(+) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 31a8beb99c79..9169b7c253b4 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -906,6 +906,7 @@ }, "mysql": { "deps": [ + "aiomysql>=0.2.0", "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", "mysql-connector-python>=8.0.29", @@ -1085,6 +1086,7 @@ "deps": [ "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4" ], "devel-deps": [], @@ -1260,6 +1262,7 @@ }, "sqlite": { "deps": [ + "aiosqlite>=0.2.0", "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0" ], diff --git a/providers/src/airflow/providers/mysql/provider.yaml b/providers/src/airflow/providers/mysql/provider.yaml index 19f53bb141f8..9f75d1f3b156 100644 --- a/providers/src/airflow/providers/mysql/provider.yaml +++ b/providers/src/airflow/providers/mysql/provider.yaml @@ -75,6 +75,7 @@ dependencies: # Instead, if someone attempts to use it on MacOS, they will get explanatory error on how to install it - mysqlclient>=1.4.0; sys_platform != 'darwin' - mysql-connector-python>=8.0.29 + - aiomysql>=0.2.0 additional-extras: # only needed for backwards compatibility diff --git a/providers/src/airflow/providers/postgres/provider.yaml b/providers/src/airflow/providers/postgres/provider.yaml index 7ad546043967..fe5cea4137b4 100644 --- a/providers/src/airflow/providers/postgres/provider.yaml +++ b/providers/src/airflow/providers/postgres/provider.yaml @@ -69,6 +69,7 @@ dependencies: - apache-airflow>=2.8.0 - apache-airflow-providers-common-sql>=1.20.0 - psycopg2-binary>=2.9.4 + - asyncpg>=0.30.0 additional-extras: - name: amazon diff --git a/providers/src/airflow/providers/sqlite/provider.yaml b/providers/src/airflow/providers/sqlite/provider.yaml index 4f00d3e6f437..030063f89aab 100644 --- a/providers/src/airflow/providers/sqlite/provider.yaml +++ b/providers/src/airflow/providers/sqlite/provider.yaml @@ -56,6 +56,7 @@ versions: dependencies: - apache-airflow>=2.8.0 + - aiosqlite>=0.2.0 - apache-airflow-providers-common-sql>=1.20.0 integrations: From 5dfbaf08823ad2c545fb6e7df7a2087e8eade9e9 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:22:48 -0800 Subject: [PATCH 03/13] handle bad schema --- airflow/settings.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow/settings.py b/airflow/settings.py index 7f6f4d419b77..c2d5d783e69c 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -212,8 +212,11 @@ def load_policy_plugins(pm: pluggy.PluginManager): def _get_async_conn_uri_from_sync(): scheme, rest = SQL_ALCHEMY_CONN.split(":", maxsplit=1) scheme = scheme.split("+", maxsplit=1)[0] - aiolib = AIO_LIBS_MAPPING[scheme] - return f"{scheme}+{aiolib}:{rest}" + aiolib = AIO_LIBS_MAPPING.get(scheme) + if aiolib: + return f"{scheme}+{aiolib}:{rest}" + else: + return SQL_ALCHEMY_CONN def configure_vars(): From 3de48eb25c06f38459678dfe4a236bc5a6af634b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:27:34 -0800 Subject: [PATCH 04/13] add param --- airflow/settings.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow/settings.py b/airflow/settings.py index c2d5d783e69c..0ac51c08b013 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -209,14 +209,14 @@ def load_policy_plugins(pm: pluggy.PluginManager): pm.load_setuptools_entrypoints("airflow.policy") -def _get_async_conn_uri_from_sync(): - scheme, rest = SQL_ALCHEMY_CONN.split(":", maxsplit=1) +def _get_async_conn_uri_from_sync(sync_uri): + scheme, rest = sync_uri.split(":", maxsplit=1) scheme = scheme.split("+", maxsplit=1)[0] aiolib = AIO_LIBS_MAPPING.get(scheme) if aiolib: return f"{scheme}+{aiolib}:{rest}" else: - return SQL_ALCHEMY_CONN + return sync_uri def configure_vars(): @@ -227,7 +227,7 @@ def configure_vars(): global PLUGINS_FOLDER global DONOT_MODIFY_HANDLERS SQL_ALCHEMY_CONN = conf.get("database", "SQL_ALCHEMY_CONN") - SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync() + SQL_ALCHEMY_CONN_ASYNC = _get_async_conn_uri_from_sync(sync_uri=SQL_ALCHEMY_CONN) DAGS_FOLDER = os.path.expanduser(conf.get("core", "DAGS_FOLDER")) From 97546da25cf7c487840372dee4eec8a6e7c623d6 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 15:54:41 -0800 Subject: [PATCH 05/13] fix test --- dev/breeze/tests/test_packages.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/breeze/tests/test_packages.py b/dev/breeze/tests/test_packages.py index fe3173b2e9dd..7e641e79dc2b 100644 --- a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -221,6 +221,7 @@ def test_get_documentation_package_path(): """ "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4", """, id="No suffix postgres", From a99a39302e09e35d5f48fdb8119b7119b20a8d6d Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 16:18:42 -0800 Subject: [PATCH 06/13] remove args --- airflow/settings.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/airflow/settings.py b/airflow/settings.py index 0ac51c08b013..c3f32fa59d98 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -491,9 +491,7 @@ def configure_orm(disable_connection_pool=False, pool_class=None): connect_args["check_same_thread"] = False engine = create_engine(SQL_ALCHEMY_CONN, connect_args=connect_args, **engine_args, future=True) - async_engine = create_async_engine( - SQL_ALCHEMY_CONN_ASYNC, connect_args=connect_args, **engine_args, future=True - ) + async_engine = create_async_engine(SQL_ALCHEMY_CONN_ASYNC, future=True) create_async_session = sessionmaker( bind=async_engine, autocommit=False, From de31c5d6a91a7a0818534edd19c6338489d79d50 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Tue, 12 Nov 2024 19:35:30 -0800 Subject: [PATCH 07/13] fix test --- tests/utils/test_session.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/utils/test_session.py b/tests/utils/test_session.py index 63db479231c6..ff19c0ca6dad 100644 --- a/tests/utils/test_session.py +++ b/tests/utils/test_session.py @@ -21,7 +21,6 @@ from sqlalchemy import select from airflow.models import Log -from airflow.settings import create_async_session from airflow.utils.session import provide_session pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode] @@ -57,12 +56,12 @@ def test_provide_session_with_kwargs(self): session = object() assert wrapper(session=session) is session + @pytest.mark.asyncio + async def test_async_session(self): + from airflow.settings import create_async_session -@pytest.mark.asyncio -@pytest.mark.db_test -async def test_async_session(): - session = create_async_session() - session.add(Log(event="hihi1234")) - await session.commit() - l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741 - assert l.event == "hihi1234" + session = create_async_session() + session.add(Log(event="hihi1234")) + await session.commit() + l = await session.scalar(select(Log).where(Log.event == "hihi1234").limit(1)) # noqa: E741 + assert l.event == "hihi1234" From 00fbf35369ac7f2eb32fdbd30c9886f76add3318 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 13 Nov 2024 14:56:13 -0800 Subject: [PATCH 08/13] add sqlite to test workflow --- .github/workflows/basic-tests.yml | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 22f5d0652c9b..1e57ce9e9da6 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -196,8 +196,14 @@ jobs: working-directory: ./clients/python - name: "Install source version of required packages" run: | - breeze release-management prepare-provider-packages fab standard common.sql --package-format \ - wheel --skip-tag-check --version-suffix-for-pypi dev0 + breeze release-management prepare-provider-packages \ + fab \ + standard \ + common.sql \ + sqlite \ + --package-format wheel \ + --skip-tag-check \ + --version-suffix-for-pypi dev0 pip install . dist/apache_airflow_providers_fab-*.whl \ dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl breeze release-management prepare-task-sdk-package --package-format wheel From 24bbf30d173eed040b1060b116729be5f44e720a Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Thu, 14 Nov 2024 00:01:49 +0000 Subject: [PATCH 09/13] Update .github/workflows/basic-tests.yml --- .github/workflows/basic-tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 1e57ce9e9da6..1fcb1b292a82 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -205,7 +205,7 @@ jobs: --skip-tag-check \ --version-suffix-for-pypi dev0 pip install . dist/apache_airflow_providers_fab-*.whl \ - dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl + dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl dist/apache_airflow_providers_sqlite-*.whl breeze release-management prepare-task-sdk-package --package-format wheel pip install ./dist/apache_airflow_task_sdk-*.whl - name: "Install Python client" From 60cce2cb0106a8b032a8a1ef6df7be77e02fe04b Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 13 Nov 2024 15:43:27 -0800 Subject: [PATCH 10/13] fix breeze test --- dev/breeze/tests/test_packages.py | 1 + 1 file changed, 1 insertion(+) diff --git a/dev/breeze/tests/test_packages.py b/dev/breeze/tests/test_packages.py index 7e641e79dc2b..cccfd45e340f 100644 --- a/dev/breeze/tests/test_packages.py +++ b/dev/breeze/tests/test_packages.py @@ -211,6 +211,7 @@ def test_get_documentation_package_path(): """ "apache-airflow-providers-common-sql>=1.20.0b0", "apache-airflow>=2.8.0b0", + "asyncpg>=0.30.0", "psycopg2-binary>=2.9.4", """, id="beta0 suffix postgres", From 6791e41c626f2fdb6882e75aea3cebe383ba03d8 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 13 Nov 2024 16:03:55 -0800 Subject: [PATCH 11/13] merge --- .github/workflows/basic-tests.yml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index 1fcb1b292a82..14f0628e454b 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -204,8 +204,11 @@ jobs: --package-format wheel \ --skip-tag-check \ --version-suffix-for-pypi dev0 - pip install . dist/apache_airflow_providers_fab-*.whl \ - dist/apache_airflow_providers_standard-*.whl dist/apache_airflow_providers_common_sql-*.whl dist/apache_airflow_providers_sqlite-*.whl + pip install . \ + dist/apache_airflow_providers_fab-*.whl \ + dist/apache_airflow_providers_standard-*.whl \ + dist/apache_airflow_providers_common_sql-*.whl \ + dist/apache_airflow_providers_sqlite-*.whl breeze release-management prepare-task-sdk-package --package-format wheel pip install ./dist/apache_airflow_task_sdk-*.whl - name: "Install Python client" From ca7b9b2d0d5836193ab55390157c8468688b74b0 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Wed, 13 Nov 2024 21:42:29 -0800 Subject: [PATCH 12/13] try adding sqlite as dep for k8s tests --- scripts/ci/kubernetes/k8s_requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/ci/kubernetes/k8s_requirements.txt b/scripts/ci/kubernetes/k8s_requirements.txt index 2d00510e3826..db62ee859638 100644 --- a/scripts/ci/kubernetes/k8s_requirements.txt +++ b/scripts/ci/kubernetes/k8s_requirements.txt @@ -1,3 +1,3 @@ --e .[devel-devscripts,devel-tests,cncf.kubernetes] +-e .[devel-devscripts,devel-tests,cncf.kubernetes,sqlite] -e ./providers -e ./task_sdk From c1e5029e331e0fa597fc3f8c21d02cdc65429233 Mon Sep 17 00:00:00 2001 From: Daniel Standish <15932138+dstandish@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:51:22 -0800 Subject: [PATCH 13/13] bump aiosqlite --- generated/provider_dependencies.json | 2 +- providers/src/airflow/providers/sqlite/provider.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/generated/provider_dependencies.json b/generated/provider_dependencies.json index 9169b7c253b4..5d4ed7984004 100644 --- a/generated/provider_dependencies.json +++ b/generated/provider_dependencies.json @@ -1262,7 +1262,7 @@ }, "sqlite": { "deps": [ - "aiosqlite>=0.2.0", + "aiosqlite>=0.20.0", "apache-airflow-providers-common-sql>=1.20.0", "apache-airflow>=2.8.0" ], diff --git a/providers/src/airflow/providers/sqlite/provider.yaml b/providers/src/airflow/providers/sqlite/provider.yaml index 030063f89aab..7364863e4ce4 100644 --- a/providers/src/airflow/providers/sqlite/provider.yaml +++ b/providers/src/airflow/providers/sqlite/provider.yaml @@ -56,7 +56,7 @@ versions: dependencies: - apache-airflow>=2.8.0 - - aiosqlite>=0.2.0 + - aiosqlite>=0.20.0 - apache-airflow-providers-common-sql>=1.20.0 integrations: