Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change weight priority type from Integer to Float #42410

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3642,7 +3642,10 @@ components:
type: string
nullable: true
priority_weight:
type: integer
anyOf:
- type: integer
- type: number
format: float
nullable: true
operator:
type: string
Expand Down Expand Up @@ -4030,7 +4033,10 @@ components:
type: boolean
readOnly: true
priority_weight:
type: number
anyOf:
- type: integer
- type: number
format: float
readOnly: true
weight_rule:
$ref: "#/components/schemas/WeightRule"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change TaskInstance priority weight type.

Revision ID: e5426c9eda78
Revises: 16cbcb1c8c36
Create Date: 2024-09-18 06:38:38.777688

"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "e5426c9eda78"
down_revision = "16cbcb1c8c36"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.INTEGER(), type_=sa.Float(), existing_nullable=True
)


def downgrade():
"""Unapply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.Float(), type_=sa.INTEGER(), existing_nullable=True
)
4 changes: 2 additions & 2 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AbstractOperator(Templater, DAGNode):
operator_class: type[BaseOperator] | dict[str, Any]

weight_rule: PriorityWeightStrategy
priority_weight: int
priority_weight: float | int

# Defines the operator level extra links.
operator_extra_links: Collection[BaseOperatorLink]
Expand Down Expand Up @@ -450,7 +450,7 @@ def expand_start_trigger_args(self, *, context: Context, session: Session) -> St
raise NotImplementedError()

@property
def priority_weight_total(self) -> int:
def priority_weight_total(self) -> float | int:
"""
Total priority weight for the task. It might include all upstream or downstream tasks.

Expand Down
8 changes: 4 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def partial(
max_retry_delay: None | timedelta | float | ArgNotSet = NOTSET,
retry_delay: timedelta | float | ArgNotSet = NOTSET,
retry_exponential_backoff: bool | ArgNotSet = NOTSET,
priority_weight: int | ArgNotSet = NOTSET,
priority_weight: float | int | ArgNotSet = NOTSET,
weight_rule: str | PriorityWeightStrategy | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
map_index_template: str | None | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -879,7 +879,7 @@ def __init__(
dag: DAG | None = None,
params: collections.abc.MutableMapping | None = None,
default_args: dict | None = None,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
priority_weight: float | int = DEFAULT_PRIORITY_WEIGHT,
weight_rule: str | PriorityWeightStrategy = DEFAULT_WEIGHT_RULE,
queue: str = DEFAULT_QUEUE,
pool: str | None = None,
Expand Down Expand Up @@ -1003,9 +1003,9 @@ def __init__(

# At execution_time this becomes a normal dict
self.params: ParamsDict | dict = ParamsDict(params)
if priority_weight is not None and not isinstance(priority_weight, int):
if priority_weight is not None and not isinstance(priority_weight, (int, float)):
raise AirflowException(
f"`priority_weight` for task '{self.task_id}' only accepts integers, "
f"`priority_weight` for task '{self.task_id}' only accepts integers/floats, "
f"received '{type(priority_weight)}'."
)
self.priority_weight = priority_weight
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ def retry_exponential_backoff(self, value: bool) -> None:
self.partial_kwargs["retry_exponential_backoff"] = value

@property
def priority_weight(self) -> int: # type: ignore[override]
def priority_weight(self) -> int | float: # type: ignore[override]
return self.partial_kwargs.get("priority_weight", DEFAULT_PRIORITY_WEIGHT)

@priority_weight.setter
def priority_weight(self, value: int) -> None:
def priority_weight(self, value: int | float) -> None:
self.partial_kwargs["priority_weight"] = value

@property
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ class TaskInstance(Base, LoggingMixin):
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
priority_weight = Column(Integer)
priority_weight = Column(Float)
operator = Column(String(1000))
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
Expand Down
2 changes: 1 addition & 1 deletion airflow/task/priority_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def get_weight(self, ti: TaskInstance):
class _DownstreamPriorityWeightStrategy(PriorityWeightStrategy):
"""Priority weight strategy that uses the sum of the priority weights of all downstream tasks."""

def get_weight(self, ti: TaskInstance) -> int:
def get_weight(self, ti: TaskInstance) -> int | float:
if TYPE_CHECKING:
assert ti.task
dag = ti.task.get_dag()
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"3.0.0": "16cbcb1c8c36",
"3.0.0": "e5426c9eda78",
}


Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ export interface components {
pool?: string;
pool_slots?: number;
queue?: string | null;
priority_weight?: number | null;
priority_weight?: (Partial<number> & Partial<number>) | null;
/** @description *Changed in version 2.1.1*&#58; Field becomes nullable. */
operator?: string | null;
/** @description The datetime that the task enter the state QUEUE, also known as queue_at */
Expand Down Expand Up @@ -1654,7 +1654,7 @@ export interface components {
execution_timeout?: components["schemas"]["TimeDelta"];
retry_delay?: components["schemas"]["TimeDelta"];
retry_exponential_backoff?: boolean;
priority_weight?: number;
priority_weight?: Partial<number> & Partial<number>;
weight_rule?: components["schemas"]["WeightRule"];
ui_color?: components["schemas"]["Color"];
ui_fgcolor?: components["schemas"]["Color"];
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f4379048d3f13f35aaba824c00450c17ad4deea9af82b5498d755a12f8a85a37
4d1e943e9d24879dbcb3510d9ea1b0e13e954a64dea2523a4b35fafece63c6cf
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``16cbcb1c8c36`` (head) | ``522625f6d606`` | ``3.0.0`` | Remove redundant index. |
| ``e5426c9eda78`` (head) | ``16cbcb1c8c36`` | ``3.0.0`` | Change TaskInstance priority weight type. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``16cbcb1c8c36`` | ``522625f6d606`` | ``3.0.0`` | Remove redundant index. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
4 changes: 3 additions & 1 deletion tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ def test_incorrect_default_args(self):
DummyClass(default_args=default_args)

def test_incorrect_priority_weight(self):
error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
error_msg = (
"`priority_weight` for task 'test_op' only accepts integers/floats, received '<class 'str'>'."
)
with pytest.raises(AirflowException, match=error_msg):
BaseOperator(task_id="test_op", priority_weight="2")

Expand Down