Skip to content

Commit

Permalink
Change weight priority type from Integer to Float
Browse files Browse the repository at this point in the history
  • Loading branch information
molcay committed Sep 23, 2024
1 parent 268a400 commit 7385aad
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 19 deletions.
10 changes: 8 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3642,7 +3642,10 @@ components:
type: string
nullable: true
priority_weight:
type: integer
anyOf:
- type: integer
- type: number
format: float
nullable: true
operator:
type: string
Expand Down Expand Up @@ -4030,7 +4033,10 @@ components:
type: boolean
readOnly: true
priority_weight:
type: number
anyOf:
- type: integer
- type: number
format: float
readOnly: true
weight_rule:
$ref: "#/components/schemas/WeightRule"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change TaskInstance priority weight type.
Revision ID: e5426c9eda78
Revises: 16cbcb1c8c36
Create Date: 2024-09-18 06:38:38.777688
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "e5426c9eda78"
down_revision = "16cbcb1c8c36"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.INTEGER(), type_=sa.Float(), existing_nullable=True
)


def downgrade():
"""Unapply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.Float(), type_=sa.INTEGER(), existing_nullable=True
)
4 changes: 2 additions & 2 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AbstractOperator(Templater, DAGNode):
operator_class: type[BaseOperator] | dict[str, Any]

weight_rule: PriorityWeightStrategy
priority_weight: int
priority_weight: float | int

# Defines the operator level extra links.
operator_extra_links: Collection[BaseOperatorLink]
Expand Down Expand Up @@ -450,7 +450,7 @@ def expand_start_trigger_args(self, *, context: Context, session: Session) -> St
raise NotImplementedError()

@property
def priority_weight_total(self) -> int:
def priority_weight_total(self) -> float | int:
"""
Total priority weight for the task. It might include all upstream or downstream tasks.
Expand Down
8 changes: 4 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def partial(
max_retry_delay: None | timedelta | float | ArgNotSet = NOTSET,
retry_delay: timedelta | float | ArgNotSet = NOTSET,
retry_exponential_backoff: bool | ArgNotSet = NOTSET,
priority_weight: int | ArgNotSet = NOTSET,
priority_weight: float | int | ArgNotSet = NOTSET,
weight_rule: str | PriorityWeightStrategy | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
map_index_template: str | None | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -879,7 +879,7 @@ def __init__(
dag: DAG | None = None,
params: collections.abc.MutableMapping | None = None,
default_args: dict | None = None,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
priority_weight: float | int = DEFAULT_PRIORITY_WEIGHT,
weight_rule: str | PriorityWeightStrategy = DEFAULT_WEIGHT_RULE,
queue: str = DEFAULT_QUEUE,
pool: str | None = None,
Expand Down Expand Up @@ -1003,9 +1003,9 @@ def __init__(

# At execution_time this becomes a normal dict
self.params: ParamsDict | dict = ParamsDict(params)
if priority_weight is not None and not isinstance(priority_weight, int):
if priority_weight is not None and not isinstance(priority_weight, (int, float)):
raise AirflowException(
f"`priority_weight` for task '{self.task_id}' only accepts integers, "
f"`priority_weight` for task '{self.task_id}' only accepts integers/floats, "
f"received '{type(priority_weight)}'."
)
self.priority_weight = priority_weight
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ def retry_exponential_backoff(self, value: bool) -> None:
self.partial_kwargs["retry_exponential_backoff"] = value

@property
def priority_weight(self) -> int: # type: ignore[override]
def priority_weight(self) -> int | float: # type: ignore[override]
return self.partial_kwargs.get("priority_weight", DEFAULT_PRIORITY_WEIGHT)

@priority_weight.setter
def priority_weight(self, value: int) -> None:
def priority_weight(self, value: int | float) -> None:
self.partial_kwargs["priority_weight"] = value

@property
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ class TaskInstance(Base, LoggingMixin):
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
priority_weight = Column(Integer)
priority_weight = Column(Float)
operator = Column(String(1000))
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
Expand Down
2 changes: 1 addition & 1 deletion airflow/task/priority_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def get_weight(self, ti: TaskInstance):
class _DownstreamPriorityWeightStrategy(PriorityWeightStrategy):
"""Priority weight strategy that uses the sum of the priority weights of all downstream tasks."""

def get_weight(self, ti: TaskInstance) -> int:
def get_weight(self, ti: TaskInstance) -> int | float:
if TYPE_CHECKING:
assert ti.task
dag = ti.task.get_dag()
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"3.0.0": "16cbcb1c8c36",
"3.0.0": "e5426c9eda78",
}


Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1440,7 +1440,7 @@ export interface components {
pool?: string;
pool_slots?: number;
queue?: string | null;
priority_weight?: number | null;
priority_weight?: (Partial<number> & Partial<number>) | null;
/** @description *Changed in version 2.1.1*&#58; Field becomes nullable. */
operator?: string | null;
/** @description The datetime that the task enter the state QUEUE, also known as queue_at */
Expand Down Expand Up @@ -1654,7 +1654,7 @@ export interface components {
execution_timeout?: components["schemas"]["TimeDelta"];
retry_delay?: components["schemas"]["TimeDelta"];
retry_exponential_backoff?: boolean;
priority_weight?: number;
priority_weight?: Partial<number> & Partial<number>;
weight_rule?: components["schemas"]["WeightRule"];
ui_color?: components["schemas"]["Color"];
ui_fgcolor?: components["schemas"]["Color"];
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
f4379048d3f13f35aaba824c00450c17ad4deea9af82b5498d755a12f8a85a37
4d1e943e9d24879dbcb3510d9ea1b0e13e954a64dea2523a4b35fafece63c6cf
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
4 changes: 3 additions & 1 deletion docs/apache-airflow/migrations-ref.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ Here's the list of all the Database Migrations that are executed via when you ru
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| Revision ID | Revises ID | Airflow Version | Description |
+=========================+==================+===================+==============================================================+
| ``16cbcb1c8c36`` (head) | ``522625f6d606`` | ``3.0.0`` | Remove redundant index. |
| ``e5426c9eda78`` (head) | ``16cbcb1c8c36`` | ``3.0.0`` | Change TaskInstance priority weight type. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``16cbcb1c8c36`` | ``522625f6d606`` | ``3.0.0`` | Remove redundant index. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
| ``522625f6d606`` | ``1cdc775ca98f`` | ``3.0.0`` | Add tables for backfill. |
+-------------------------+------------------+-------------------+--------------------------------------------------------------+
Expand Down
4 changes: 3 additions & 1 deletion tests/models/test_baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,9 @@ def test_incorrect_default_args(self):
DummyClass(default_args=default_args)

def test_incorrect_priority_weight(self):
error_msg = "`priority_weight` for task 'test_op' only accepts integers, received '<class 'str'>'."
error_msg = (
"`priority_weight` for task 'test_op' only accepts integers/floats, received '<class 'str'>'."
)
with pytest.raises(AirflowException, match=error_msg):
BaseOperator(task_id="test_op", priority_weight="2")

Expand Down

0 comments on commit 7385aad

Please sign in to comment.