
Swap CeleryExecutor over to use TaskSDK for execution. #46265

Open · wants to merge 1 commit into main from swap-celery-exec-to-tasksdk
Conversation

@ashb (Member) commented Jan 29, 2025

Closes #45426

Some points of note about this PR:

  • Logging is changed in Celery, but only for Airflow 3

    Celery does its own "capture stdout" logging, which conflicts with the one
    we do in the TaskSDK, so we disable that; but only on Airflow 3, so as not
    to change anything for Airflow 2 (see the Celery settings sketch below).

  • Simplify task SDK logging redirection

    While discovering that Celery captures stdout/stderr itself (and before
    disabling that), I found a simpler way to re-open stdin/stdout/stderr so
    that the implementation needs fewer (or no) special cases (see the fd
    redirection sketch below).

  • Make JSON task logs more readable by giving them a consistent/useful order

    We re-order (by re-creating) the event_dict so that timestamp, level, and
    then event are always the first items in the dict (see the processor
    sketch below).

  • Make the CeleryExecutor understand the concept of "workloads" instead of a
    command tuple.

    This change isn't done in the best way, but until the Kube executor is
    swapped over (and possibly the other in-tree executors, such as ECS) we
    need to support both styles concurrently (see the dispatch sketch below).

    The change is done in such a way that the provider still works with
    Airflow v2, if it's running on that version.

  • Upgrade Celery

    This turned out not to be 100% necessary, but it does fix some deprecation
    warnings when running on Python 3.12.

  • Ensure that the forked process in TaskSDK always exits, no matter what

    This shouldn't normally be possible, but when the setup step of _fork_main
    died it didn't call os._exit(); the exception was caught further up the
    stack, which meant the process stayed alive and never closed its sockets
    properly. We put an extra safety try/except block in place to catch that
    (see the exit-safety sketch below).

I have not yet included a newsfragment for changing the executor interface as
the old style is currently still supported.
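
To make the first point concrete: Celery exposes stock settings that control its stdout/stderr capture and root-logger takeover. Below is a minimal sketch of disabling them only on Airflow 3; the app name and the version gate are illustrative assumptions, not the provider's actual code.

```python
from celery import Celery

import airflow

app = Celery("airflow-worker")  # hypothetical app name

# worker_redirect_stdouts and worker_hijack_root_logger are standard Celery
# settings. Turning them off stops Celery from capturing stdout/stderr and
# from reconfiguring the root logger, so the Task SDK's redirection wins.
if airflow.__version__.startswith("3"):  # hypothetical version gate
    app.conf.worker_redirect_stdouts = False
    app.conf.worker_hijack_root_logger = False
```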
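
The "simpler way to re-open stdin/out/err" is, in essence, fd-level redirection with dup2: because the kernel-level descriptors 0-2 are replaced in place, anything that already holds them (C extensions, subprocesses) follows along, so no per-stream special casing is needed. A sketch of the general technique, with an illustrative function name and log-file handling rather than the PR's actual code:

```python
import os
import sys

def reopen_stdio(log_path: str) -> None:
    # Point fd 0 at /dev/null so the task cannot read the real stdin.
    devnull = os.open(os.devnull, os.O_RDONLY)
    os.dup2(devnull, 0)
    os.close(devnull)

    # Point fds 1 and 2 at the task log file so everything written to them,
    # even from C extensions and subprocesses, lands in the log.
    log_fd = os.open(log_path, os.O_WRONLY | os.O_CREAT | os.O_APPEND, 0o644)
    os.dup2(log_fd, 1)
    os.dup2(log_fd, 2)
    os.close(log_fd)

    # Rebuild the Python-level file objects on top of the re-opened fds.
    sys.stdin = open(0, "r", closefd=False)
    sys.stdout = open(1, "w", buffering=1, closefd=False)
    sys.stderr = open(2, "w", buffering=1, closefd=False)
```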
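
The JSON key re-ordering maps naturally onto a structlog processor (the task logs quoted later in this thread are structlog-style JSON). A sketch of the idea; the processor name and the exact chain are assumptions, not the Task SDK's real configuration:

```python
import structlog

def put_keys_first(logger, method_name, event_dict):
    # Python dicts preserve insertion order, so re-creating the mapping with
    # timestamp/level/event first fixes the key order in the rendered JSON.
    head = {k: event_dict.pop(k) for k in ("timestamp", "level", "event") if k in event_dict}
    head.update(event_dict)
    return head

structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso", key="timestamp"),
        structlog.processors.add_log_level,
        put_keys_first,
        structlog.processors.JSONRenderer(),
    ]
)

structlog.get_logger("task").info("Task started", dag_id="example")
# -> {"timestamp": "...", "level": "info", "event": "Task started", ...}
```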
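
For the dual "workload vs. command tuple" support, the transition can hinge on a simple type check: Airflow 2 schedulers hand the executor an argv-style command tuple, while Airflow 3 sends a Task SDK workload object. A hypothetical sketch; every name here is illustrative rather than the executor's real API:

```python
from typing import Sequence, Union

def run_command(argv: Sequence[str]) -> None:
    """Stand-in for the legacy Celery task that runs an argv command."""
    print("Airflow 2 command:", list(argv))

def run_workload(workload: object) -> None:
    """Stand-in for the new Celery task that runs a Task SDK workload."""
    print("Airflow 3 workload:", workload)

def dispatch(item: Union[Sequence[str], object]) -> None:
    # Both styles must be supported concurrently until the other in-tree
    # executors are migrated, so dispatch on the type of the queued item.
    if isinstance(item, (list, tuple)):
        run_command(item)
    else:
        run_workload(item)
```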
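
And for the "forked process must always exit" guarantee, the usual pattern is a try/except/finally that funnels every outcome into os._exit(), which terminates the process immediately and cannot be caught further up the stack. A minimal sketch of that safety net; the wrapper name and exit code are assumptions, not the Task SDK's actual code:

```python
import os
from typing import Callable

def fork_main_safely(child_main: Callable[[], None]) -> None:
    exit_code = 0
    try:
        child_main()
    except BaseException:
        exit_code = 124  # arbitrary non-zero code: "child died during setup"
    finally:
        # os._exit() ends the process right here: no exception can propagate
        # back into the parent's code path, so the fork can never stay alive
        # with the supervisor's sockets still open.
        os._exit(exit_code)
```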

Testing: I ran `airflow scheduler`, `airflow fastapi-api`, and `airflow celery worker`, triggered some DAGs, viewed logs, etc.

And since a picture makes all PRs better

[Screenshot 2025-01-29 at 22 49 49: Celery window showing debug logs from the supervisor itself (task output goes to file)]

[Screenshot 2025-01-29 at 22 50 56]

@boring-cyborg boring-cyborg bot added area:Executors-core LocalExecutor & SequentialExecutor area:Scheduler including HA (high availability) scheduler area:task-sdk provider:celery labels Jan 29, 2025
task_sdk/src/airflow/sdk/log.py (review thread, outdated, resolved)
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from 315d736 to 8263c94 Compare January 29, 2025 22:59
@amoghrajesh (Contributor) left a comment

Thanks for working on this @ashb!
Have a few comments, nothing serious; general comments / nits. I am mostly OK with the PR.

@amoghrajesh: this comment was marked as resolved.

@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch 3 times, most recently from 2038dc1 to 8e2d7cc Compare January 30, 2025 13:36
@ashb ashb requested a review from eladkal as a code owner January 30, 2025 13:36
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch 2 times, most recently from ef0a8b9 to fd30378 Compare January 30, 2025 15:29
@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from fd30378 to a7635c9 Compare January 30, 2025 16:34
@ashb (Member, Author) commented Jan 30, 2025

Right, I think this is good; the failing tests are fixed in main by Jarek's hard work.

One failure I wasn't happy with, so I've rebased.

@ashb ashb force-pushed the swap-celery-exec-to-tasksdk branch from a7635c9 to 15aec32 Compare January 30, 2025 17:33
@o-nikolas (Contributor) left a comment
Changes to base look good to me to support both for now. I think it's called out in a comment but we should migrate all in-tree executors (K8s, ECS, Batch, Edge, etc) and add a new min Airflow version for those packages.

Review comment on a diff (context from the changed test):

-    with _prepare_app(execute=fake_execute_command):
-        # fake_execute_command takes no arguments while execute_command takes 1,
+    with _prepare_app(execute=fake_task):
+        # fake_execute_command takes no arguments while execute_workload takes 1,

A Contributor suggested:

Suggested change
-    # fake_execute_command takes no arguments while execute_workload takes 1,
+    # fake_task takes no arguments while execute_workload takes 1,
airflow/executors/base_executor.py (review thread, resolved)
@ashb (Member, Author) commented Jan 30, 2025

> Changes to base look good to me to support both for now. I think it's called out in a comment but we should migrate all in-tree executors (K8s, ECS, Batch, Edge, etc) and add a new min Airflow version for those packages.

👍 Yup, or we can make them work with 2+3, similar to how I've done for Celery. The ECS etc. changes right now are very minimal in this PR.

@jscheffl (Contributor) left a comment

Wow I would have expected a larger PR from the challenges you faced. Looks pretty solid.

Did a regression test with the EdgeExecutor and this is still working. Just one detail: when I executed my "Integration Test" DAG from the Edge examples, the BranchOperator in the EdgeWorker is failing, which seems to work in Celery. So it might be that I need a bit of ENV fixing. The exception is:
{"timestamp":"2025-01-30T22:01:35.193579","level":"error","event":"Task failed with exception","logger":"task","error_detail":[{"exc_type":"OperationalError","exc_value":"(sqlite3.OperationalError) no such table: dag_run\n[SQL: SELECT dag_run.state AS dag_run_state, dag_run.id AS dag_run_id, dag_run.dag_id AS dag_run_dag_id, dag_run.queued_at AS dag_run_queued_at, dag_run.logical_date AS dag_run_logical_date, dag_run.start_date AS dag_run_start_date, dag_run.end_date AS dag_run_end_date, dag_run.run_id AS dag_run_run_id, dag_run.creating_job_id AS dag_run_creating_job_id, dag_run.external_trigger AS dag_run_external_trigger, dag_run.run_type AS dag_run_run_type, dag_run.triggered_by AS dag_run_triggered_by, dag_run.conf AS dag_run_conf, dag_run.data_interval_start AS dag_run_data_interval_start, dag_run.data_interval_end AS dag_run_data_interval_end, dag_run.last_scheduling_decision AS dag_run_last_scheduling_decision, dag_run.log_template_id AS dag_run_log_template_id, dag_run.updated_at AS dag_run_updated_at, dag_run.clear_number AS dag_run_clear_number, dag_run.backfill_id AS dag_run_backfill_id, dag_run.dag_version_id AS dag_run_dag_version_id, dag_run.bundle_version AS dag_run_bundle_version \nFROM dag_run \nWHERE dag_run.dag_id = ? AND dag_run.run_id = ?]\n[parameters: ('integration_test', 'manual__2025-01-30T23:01:24+01:00')]\n(Background on this error at: [https://sqlalche.me/e/14/e3q8)","syntax_error":null,"is_cause":false,"frames":%22,%22syntax_error%22:null,%22is_cause%22:false,%22frames%22:[){"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":527,"name":"run"},{"filename":"/opt/airflow/task_sdk/src/airflow/sdk/execution_time/task_runner.py","lineno":626,"name":"_execute_task"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/airflow/decorators/base.py","lineno":252,"name":"execute"},{"filename":"/opt/airflow/airflow/models/baseoperator.py","lineno":173,"name":"wrapper"},{"filename":"/opt/airflow/providers/standard/src/airflow/providers/standard/operators/python.py","lineno":240,"name":"execute"},{"filename":"/opt/airflow/airflow/operators/branch.py","lineno":41,"name":"do_branch"},{"filename":"/opt/airflow/airflow/models/skipmixin.py","lineno":212,"name":"skip_all_except"},{"filename":"/opt/airflow/airflow/utils/session.py","lineno":98,"name":"wrapper"},{"filename":"/opt/airflow/airflow/models/taskinstance.py","lineno":2480,"name":"get_dagrun"},{"filename":"/opt/airflow/airflow/models/taskinstance.py","lineno":2461,"name":"_get_dagrun"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/query.py","lineno":2870,"name":"one"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/query.py","lineno":2916,"name":"_iter"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/orm/session.py","lineno":1717,"name":"execute"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1710,"name":"_execute_20"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py","lineno":334,"name":"_execute_on_connection"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1577,"name":"_execute_clauseelement"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1953,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":2134,"name":"handle_dbapi_exception"},{"filename":"
/usr/local/lib/python3.12/site-packages/sqlalchemy/util/compat.py","lineno":211,"name":"raise"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1910,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py","lineno":736,"name":"do_execute"}]},{"exc_type":"OperationalError","exc_value":"no such table: dag_run","syntax_error":null,"is_cause":true,"frames":[{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/base.py","lineno":1910,"name":"_execute_context"},{"filename":"/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/default.py","lineno":736,"name":"do_execute"}]}]}

...but anyway, LGTM!
