Skip to content

Commit

Permalink
Change weight priority type from Integer to Float
Browse files Browse the repository at this point in the history
  • Loading branch information
molcay committed Sep 20, 2024
1 parent a083930 commit a82c385
Show file tree
Hide file tree
Showing 14 changed files with 229 additions and 138 deletions.
10 changes: 8 additions & 2 deletions airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3952,7 +3952,10 @@ components:
type: string
nullable: true
priority_weight:
type: integer
anyOf:
- type: integer
- type: number
format: float
nullable: true
operator:
type: string
Expand Down Expand Up @@ -4340,7 +4343,10 @@ components:
type: boolean
readOnly: true
priority_weight:
type: number
anyOf:
- type: integer
- type: number
format: float
readOnly: true
weight_rule:
$ref: "#/components/schemas/WeightRule"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
Change TaskInstance priority weight type.
Revision ID: e5426c9eda78
Revises: 522625f6d606
Create Date: 2024-09-18 06:38:38.777688
"""

from __future__ import annotations

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "e5426c9eda78"
down_revision = "522625f6d606"
branch_labels = None
depends_on = None
airflow_version = "3.0.0"


def upgrade():
"""Apply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.INTEGER(), type_=sa.Float(), existing_nullable=True
)


def downgrade():
"""Unapply Change TaskInstance priority weight type."""
with op.batch_alter_table("task_instance", schema=None) as batch_op:
batch_op.alter_column(
"priority_weight", existing_type=sa.Float(), type_=sa.INTEGER(), existing_nullable=True
)
4 changes: 2 additions & 2 deletions airflow/models/abstractoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AbstractOperator(Templater, DAGNode):
operator_class: type[BaseOperator] | dict[str, Any]

weight_rule: PriorityWeightStrategy
priority_weight: int
priority_weight: float | int

# Defines the operator level extra links.
operator_extra_links: Collection[BaseOperatorLink]
Expand Down Expand Up @@ -450,7 +450,7 @@ def expand_start_trigger_args(self, *, context: Context, session: Session) -> St
raise NotImplementedError()

@property
def priority_weight_total(self) -> int:
def priority_weight_total(self) -> float | int:
"""
Total priority weight for the task. It might include all upstream or downstream tasks.
Expand Down
8 changes: 4 additions & 4 deletions airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def partial(
max_retry_delay: None | timedelta | float | ArgNotSet = NOTSET,
retry_delay: timedelta | float | ArgNotSet = NOTSET,
retry_exponential_backoff: bool | ArgNotSet = NOTSET,
priority_weight: int | ArgNotSet = NOTSET,
priority_weight: float | int | ArgNotSet = NOTSET,
weight_rule: str | PriorityWeightStrategy | ArgNotSet = NOTSET,
sla: timedelta | None | ArgNotSet = NOTSET,
map_index_template: str | None | ArgNotSet = NOTSET,
Expand Down Expand Up @@ -879,7 +879,7 @@ def __init__(
dag: DAG | None = None,
params: collections.abc.MutableMapping | None = None,
default_args: dict | None = None,
priority_weight: int = DEFAULT_PRIORITY_WEIGHT,
priority_weight: float | int = DEFAULT_PRIORITY_WEIGHT,
weight_rule: str | PriorityWeightStrategy = DEFAULT_WEIGHT_RULE,
queue: str = DEFAULT_QUEUE,
pool: str | None = None,
Expand Down Expand Up @@ -1003,9 +1003,9 @@ def __init__(

# At execution_time this becomes a normal dict
self.params: ParamsDict | dict = ParamsDict(params)
if priority_weight is not None and not isinstance(priority_weight, int):
if priority_weight is not None and not isinstance(priority_weight, (int, float)):
raise AirflowException(
f"`priority_weight` for task '{self.task_id}' only accepts integers, "
f"`priority_weight` for task '{self.task_id}' only accepts integers/floats, "
f"received '{type(priority_weight)}'."
)
self.priority_weight = priority_weight
Expand Down
4 changes: 2 additions & 2 deletions airflow/models/mappedoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ def retry_exponential_backoff(self, value: bool) -> None:
self.partial_kwargs["retry_exponential_backoff"] = value

@property
def priority_weight(self) -> int: # type: ignore[override]
def priority_weight(self) -> int | float: # type: ignore[override]
return self.partial_kwargs.get("priority_weight", DEFAULT_PRIORITY_WEIGHT)

@priority_weight.setter
def priority_weight(self, value: int) -> None:
def priority_weight(self, value: int | float) -> None:
self.partial_kwargs["priority_weight"] = value

@property
Expand Down
2 changes: 1 addition & 1 deletion airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -1830,7 +1830,7 @@ class TaskInstance(Base, LoggingMixin):
pool = Column(String(256), nullable=False)
pool_slots = Column(Integer, default=1, nullable=False)
queue = Column(String(256))
priority_weight = Column(Integer)
priority_weight = Column(Float)
operator = Column(String(1000))
custom_operator_name = Column(String(1000))
queued_dttm = Column(UtcDateTime)
Expand Down
2 changes: 1 addition & 1 deletion airflow/task/priority_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def get_weight(self, ti: TaskInstance):
class _DownstreamPriorityWeightStrategy(PriorityWeightStrategy):
"""Priority weight strategy that uses the sum of the priority weights of all downstream tasks."""

def get_weight(self, ti: TaskInstance) -> int:
def get_weight(self, ti: TaskInstance) -> int | float:
if TYPE_CHECKING:
assert ti.task
dag = ti.task.get_dag()
Expand Down
20 changes: 18 additions & 2 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1341,7 +1341,15 @@ export const $TaskInstance = {
nullable: true,
},
priority_weight: {
type: "integer",
anyOf: [
{
type: "integer",
},
{
type: "number",
format: "float",
},
],
nullable: true,
},
operator: {
Expand Down Expand Up @@ -1869,7 +1877,15 @@ export const $Task = {
readOnly: true,
},
priority_weight: {
type: "number",
anyOf: [
{
type: "integer",
},
{
type: "number",
format: "float",
},
],
readOnly: true,
},
weight_rule: {
Expand Down
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class MappedClassProtocol(Protocol):
"2.9.0": "1949afb29106",
"2.9.2": "686269002441",
"2.10.0": "22ed7efa9da2",
"3.0.0": "522625f6d606",
"3.0.0": "e5426c9eda78",
}


Expand Down
4 changes: 2 additions & 2 deletions airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1532,7 +1532,7 @@ export interface components {
pool?: string;
pool_slots?: number;
queue?: string | null;
priority_weight?: number | null;
priority_weight?: (Partial<number> & Partial<number>) | null;
/** @description *Changed in version 2.1.1*&#58; Field becomes nullable. */
operator?: string | null;
/** @description The datetime that the task enter the state QUEUE, also known as queue_at */
Expand Down Expand Up @@ -1746,7 +1746,7 @@ export interface components {
execution_timeout?: components["schemas"]["TimeDelta"];
retry_delay?: components["schemas"]["TimeDelta"];
retry_exponential_backoff?: boolean;
priority_weight?: number;
priority_weight?: Partial<number> & Partial<number>;
weight_rule?: components["schemas"]["WeightRule"];
ui_color?: components["schemas"]["Color"];
ui_fgcolor?: components["schemas"]["Color"];
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
d58a76011bd215ac7a0c42146986ad0eead5f4a9c8ffd7932b1aa1eb61e6e4af
550e06c20ab8acedc478e28281f9275d3075e39628142a4806d0fe1f04b58418
Loading

0 comments on commit a82c385

Please sign in to comment.