From 134e4308cadf9f05c356244260dcc69859e622f1 Mon Sep 17 00:00:00 2001 From: Brent Bovenzi Date: Fri, 31 Jan 2025 10:58:07 -0500 Subject: [PATCH] Add autorefresh to dag page (#46296) * Add autorefresh to dag page * remove areActiveRuns check from task list card * Fix dag header toggle pause * Check if dag is paused, move to custom hook * Address PR feedback --- airflow/ui/src/components/TaskTrySelect.tsx | 7 +- .../components/TriggerDag/TriggerDAGForm.tsx | 4 +- airflow/ui/src/constants/sortParams.ts | 8 +- airflow/ui/src/pages/Dag/Dag.tsx | 19 +- airflow/ui/src/pages/Dag/Header.tsx | 174 ++++++++++-------- airflow/ui/src/pages/Dag/Runs/Runs.tsx | 15 +- airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx | 109 ++++++----- .../ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx | 32 ++-- airflow/ui/src/pages/Dag/Tasks/Tasks.tsx | 45 +---- airflow/ui/src/pages/Run/Run.tsx | 29 +-- airflow/ui/src/pages/Run/TaskInstances.tsx | 13 +- airflow/ui/src/pages/TaskInstance/Details.tsx | 11 +- .../src/pages/TaskInstance/TaskInstance.tsx | 26 ++- airflow/ui/src/queries/useLogs.tsx | 11 +- airflow/ui/src/queries/useTrigger.ts | 18 +- airflow/ui/src/utils/index.ts | 1 + airflow/ui/src/utils/query.ts | 69 +++++++ airflow/ui/src/utils/refresh.ts | 29 --- 18 files changed, 336 insertions(+), 284 deletions(-) create mode 100644 airflow/ui/src/utils/query.ts delete mode 100644 airflow/ui/src/utils/refresh.ts diff --git a/airflow/ui/src/components/TaskTrySelect.tsx b/airflow/ui/src/components/TaskTrySelect.tsx index f4b4cf6c33c56..e1c5a68ec6996 100644 --- a/airflow/ui/src/components/TaskTrySelect.tsx +++ b/airflow/ui/src/components/TaskTrySelect.tsx @@ -21,8 +21,7 @@ import { Button, createListCollection, HStack, VStack, Heading } from "@chakra-u import { useTaskInstanceServiceGetMappedTaskInstanceTries } from "openapi/queries"; import type { TaskInstanceHistoryResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; import { StateBadge } from "src/components/StateBadge"; -import { useConfig } from "src/queries/useConfig"; -import { isStatePending } from "src/utils/refresh"; +import { isStatePending, useAutoRefresh } from "src/utils"; import TaskInstanceTooltip from "./TaskInstanceTooltip"; import { Select } from "./ui"; @@ -43,7 +42,7 @@ export const TaskTrySelect = ({ onSelectTryNumber, selectedTryNumber, taskInstan try_number: finalTryNumber, } = taskInstance; - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + const refetchInterval = useAutoRefresh({ dagId }); const { data: tiHistory } = useTaskInstanceServiceGetMappedTaskInstanceTries( { @@ -59,7 +58,7 @@ export const TaskTrySelect = ({ onSelectTryNumber, selectedTryNumber, taskInstan // We actually want to use || here // eslint-disable-next-line @typescript-eslint/prefer-nullish-coalescing query.state.data?.task_instances.some((ti) => isStatePending(ti.state)) || isStatePending(state) - ? autoRefreshInterval * 1000 + ? refetchInterval : false, }, ); diff --git a/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx b/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx index e8e02cc7c1f15..1fe47e8c0c95b 100644 --- a/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx +++ b/airflow/ui/src/components/TriggerDag/TriggerDAGForm.tsx @@ -55,7 +55,7 @@ const TriggerDAGForm = ({ dagId, onClose, open }: TriggerDAGFormProps) => { error: errorTrigger, isPending, triggerDagRun, - } = useTrigger({ onSuccessConfirm: onClose }); + } = useTrigger({ dagId, onSuccessConfirm: onClose }); const { conf, setConf } = useParamStore(); const { control, handleSubmit, reset, watch } = useForm({ @@ -85,7 +85,7 @@ const TriggerDAGForm = ({ dagId, onClose, open }: TriggerDAGFormProps) => { const dataIntervalEnd = watch("dataIntervalEnd"); const onSubmit = (data: DagRunTriggerParams) => { - triggerDagRun(dagId, data); + triggerDagRun(data); }; const validateAndPrettifyJson = (value: string) => { diff --git a/airflow/ui/src/constants/sortParams.ts b/airflow/ui/src/constants/sortParams.ts index 21b55730baa35..3ffc36844e82e 100644 --- a/airflow/ui/src/constants/sortParams.ts +++ b/airflow/ui/src/constants/sortParams.ts @@ -24,14 +24,14 @@ export const dagSortOptions = createListCollection({ { label: "Sort by Display Name (Z-A)", value: "-dag_display_name" }, { label: "Sort by Next DAG Run (Earliest-Latest)", value: "next_dagrun" }, { label: "Sort by Next DAG Run (Latest-Earliest)", value: "-next_dagrun" }, - { label: "Sort by Last Run State (A-Z)", value: "last_run_state" }, - { label: "Sort by Last Run State (Z-A)", value: "-last_run_state" }, + { label: "Sort by Latest Run State (A-Z)", value: "last_run_state" }, + { label: "Sort by Latest Run State (Z-A)", value: "-last_run_state" }, { - label: "Sort by Last Run Start Date (Earliest-Latest)", + label: "Sort by Latest Run Start Date (Earliest-Latest)", value: "last_run_start_date", }, { - label: "Sort by Last Run Start Date (Latest-Earliest)", + label: "Sort by Latest Run Start Date (Latest-Earliest)", value: "-last_run_start_date", }, ], diff --git a/airflow/ui/src/pages/Dag/Dag.tsx b/airflow/ui/src/pages/Dag/Dag.tsx index 89aa32eda7944..c7107f98fff71 100644 --- a/airflow/ui/src/pages/Dag/Dag.tsx +++ b/airflow/ui/src/pages/Dag/Dag.tsx @@ -20,6 +20,7 @@ import { useParams } from "react-router-dom"; import { useDagServiceGetDagDetails, useDagsServiceRecentDagRuns } from "openapi/queries"; import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; +import { isStatePending, useAutoRefresh } from "src/utils"; import { Header } from "./Header"; @@ -42,6 +43,8 @@ export const Dag = () => { dagId, }); + const refetchInterval = useAutoRefresh({ dagId }); + // TODO: replace with with a list dag runs by dag id request const { data: runsData, @@ -49,13 +52,25 @@ export const Dag = () => { isLoading: isLoadingRuns, } = useDagsServiceRecentDagRuns({ dagIds: [dagId] }, undefined, { enabled: Boolean(dagId), + refetchInterval: (query) => + query.state.data?.dags + .find((recentDag) => recentDag.dag_id === dagId) + ?.latest_dag_runs.some((run) => isStatePending(run.state)) + ? refetchInterval + : false, }); - const runs = runsData?.dags.find((dagWithRuns) => dagWithRuns.dag_id === dagId)?.latest_dag_runs ?? []; + const dagWithRuns = runsData?.dags.find((recentDag) => recentDag.dag_id === dagId); return ( -
+
isStatePending(dr.state)) && Boolean(refetchInterval), + )} + /> ); }; diff --git a/airflow/ui/src/pages/Dag/Header.tsx b/airflow/ui/src/pages/Dag/Header.tsx index 0c01513adb4a7..bc4b295d64354 100644 --- a/airflow/ui/src/pages/Dag/Header.tsx +++ b/airflow/ui/src/pages/Dag/Header.tsx @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -import { Box, Flex, Heading, HStack, SimpleGrid, Text } from "@chakra-ui/react"; +import { Box, Flex, Heading, HStack, SimpleGrid, Spinner, Text } from "@chakra-ui/react"; import { FiBookOpen, FiCalendar } from "react-icons/fi"; +import { useParams } from "react-router-dom"; -import type { DAGDetailsResponse, DAGRunResponse } from "openapi/requests/types.gen"; +import type { DAGDetailsResponse, DAGWithLatestDagRunsResponse } from "openapi/requests/types.gen"; import { DagIcon } from "src/assets/DagIcon"; import DagRunInfo from "src/components/DagRunInfo"; import DisplayMarkdownButton from "src/components/DisplayMarkdownButton"; @@ -33,87 +34,98 @@ import { DagTags } from "../DagsList/DagTags"; export const Header = ({ dag, - dagId, - latestRun, + dagWithRuns, + isRefreshing, }: { readonly dag?: DAGDetailsResponse; - readonly dagId?: string; - readonly latestRun?: DAGRunResponse; -}) => ( - - - - - - {dag?.dag_display_name ?? dagId} - {dag !== undefined && ( - - )} - - - {dag ? ( - - {dag.doc_md === null ? undefined : ( - } - mdContent={dag.doc_md} - text="Dag Docs" - /> - )} - - - - ) : undefined} + readonly dagWithRuns?: DAGWithLatestDagRunsResponse; + readonly isRefreshing?: boolean; +}) => { + // We would still like to show the dagId even if the dag object hasn't loaded yet + const { dagId } = useParams(); + const latestRun = dagWithRuns?.latest_dag_runs ? dagWithRuns.latest_dag_runs[0] : undefined; + + return ( + + + + + + {dag?.dag_display_name ?? dagId} + {dag !== undefined && ( + + )} + {isRefreshing ? :
} + + + {dag ? ( + + {dag.doc_md === null ? undefined : ( + } + mdContent={dag.doc_md} + text="Dag Docs" + /> + )} + + + + ) : undefined} + + + + {Boolean(dag?.timetable_summary) ? ( + + + {dag?.timetable_summary} + + + ) : undefined} + + + {Boolean(latestRun) && latestRun !== undefined ? ( + + ) : undefined} + + + {Boolean(dagWithRuns?.next_dagrun) ? ( + + ) : undefined} + +
+
+ + + + Owner: {dag?.owners.join(", ")} + - - - {Boolean(dag?.timetable_summary) ? ( - - - {dag?.timetable_summary} - - - ) : undefined} - - - {Boolean(latestRun) && latestRun !== undefined ? ( - - ) : undefined} - - - {Boolean(dag?.next_dagrun) && dag !== undefined ? ( - - ) : undefined} - -
-
- - - Owner: {dag?.owners.join(", ")} - - - -); + ); +}; diff --git a/airflow/ui/src/pages/Dag/Runs/Runs.tsx b/airflow/ui/src/pages/Dag/Runs/Runs.tsx index a986d5d2130ab..29a1e90fd8616 100644 --- a/airflow/ui/src/pages/Dag/Runs/Runs.tsx +++ b/airflow/ui/src/pages/Dag/Runs/Runs.tsx @@ -40,7 +40,7 @@ import { RunTypeIcon } from "src/components/RunTypeIcon"; import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { Select } from "src/components/ui"; -import { capitalize, getDuration } from "src/utils"; +import { capitalize, getDuration, useAutoRefresh, isStatePending } from "src/utils"; const columns: Array> = [ { @@ -125,11 +125,13 @@ export const Runs = () => { const { setTableURLState, tableURLState } = useTableURLState(); const { pagination, sorting } = tableURLState; const [sort] = sorting; - const orderBy = sort ? `${sort.desc ? "-" : ""}${sort.id}` : "-start_date"; + const orderBy = sort ? `${sort.desc ? "-" : ""}${sort.id}` : "-logical_date"; const filteredState = searchParams.get(STATE_PARAM); - const { data, error, isFetching, isLoading } = useDagRunServiceGetDagRuns( + const refetchInterval = useAutoRefresh({ dagId }); + + const { data, error, isLoading } = useDagRunServiceGetDagRuns( { dagId: dagId ?? "~", limit: pagination.pageSize, @@ -138,7 +140,11 @@ export const Runs = () => { state: filteredState === null ? undefined : [filteredState], }, undefined, - { enabled: !isNaN(pagination.pageSize) }, + { + enabled: !isNaN(pagination.pageSize), + refetchInterval: (query) => + query.state.data?.dag_runs.some((run) => isStatePending(run.state)) ? refetchInterval : false, + }, ); const handleStateChange = useCallback( @@ -197,7 +203,6 @@ export const Runs = () => { data={data?.dag_runs ?? []} errorMessage={} initialState={tableURLState} - isFetching={isFetching} isLoading={isLoading} modelName="Dag Run" onStateChange={setTableURLState} diff --git a/airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx b/airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx index 3476e1850ad5a..e49f563eef52d 100644 --- a/airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx +++ b/airflow/ui/src/pages/Dag/Tasks/TaskCard.tsx @@ -19,58 +19,79 @@ import { Heading, VStack, Box, SimpleGrid, Text, Link } from "@chakra-ui/react"; import { Link as RouterLink } from "react-router-dom"; -import type { TaskResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; +import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries/queries.ts"; +import type { TaskResponse } from "openapi/requests/types.gen"; import { StateBadge } from "src/components/StateBadge"; import TaskInstanceTooltip from "src/components/TaskInstanceTooltip"; import Time from "src/components/Time"; -import { getTaskInstanceLink } from "src/utils/links.ts"; +import { isStatePending, useAutoRefresh } from "src/utils"; +import { getTaskInstanceLink } from "src/utils/links"; import { TaskRecentRuns } from "./TaskRecentRuns.tsx"; type Props = { readonly dagId: string; readonly task: TaskResponse; - readonly taskInstances: Array; }; -export const TaskCard = ({ dagId, task, taskInstances }: Props) => ( - - - - {task.task_display_name ?? task.task_id} - {task.is_mapped ? "[]" : undefined} - - - - - - Operator - - {task.operator_name} - - - - Trigger Rule - - {task.trigger_rule} - - - - Last Instance - - {taskInstances[0] ? ( - - - - - - - ) : undefined} - - {/* TODO: Handled mapped tasks to not plot each map index as a task instance */} - {!task.is_mapped && } - - -); +export const TaskCard = ({ dagId, task }: Props) => { + const refetchInterval = useAutoRefresh({ dagId }); + + const { data } = useTaskInstanceServiceGetTaskInstances( + { + dagId, + dagRunId: "~", + limit: 14, + orderBy: "-logical_date", + taskId: task.task_id ?? "", + }, + undefined, + { + enabled: Boolean(dagId) && Boolean(task.task_id), + refetchInterval: (query) => + query.state.data?.task_instances.some((ti) => isStatePending(ti.state)) ? refetchInterval : false, + }, + ); + + return ( + + + + {task.task_display_name ?? task.task_id} + {task.is_mapped ? "[]" : undefined} + + + + + + Operator + + {task.operator_name} + + + + Trigger Rule + + {task.trigger_rule} + + + + Last Instance + + {data?.task_instances[0] ? ( + + + + + + + ) : undefined} + + {/* TODO: Handled mapped tasks to not plot each map index as a task instance */} + {!task.is_mapped && } + + + ); +}; diff --git a/airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx b/airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx index 3a7fc13d44949..8dd9e70ad8df3 100644 --- a/airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx +++ b/airflow/ui/src/pages/Dag/Tasks/TaskRecentRuns.tsx @@ -51,23 +51,21 @@ export const TaskRecentRuns = ({ return ( - {taskInstancesWithDuration.map((taskInstance) => - taskInstance.state === null ? undefined : ( - - - - - - - - ), - )} + {taskInstancesWithDuration.map((taskInstance) => ( + + + + + + + + ))} ); }; diff --git a/airflow/ui/src/pages/Dag/Tasks/Tasks.tsx b/airflow/ui/src/pages/Dag/Tasks/Tasks.tsx index 6e005fe82e619..ae33c69f66f00 100644 --- a/airflow/ui/src/pages/Dag/Tasks/Tasks.tsx +++ b/airflow/ui/src/pages/Dag/Tasks/Tasks.tsx @@ -19,12 +19,8 @@ import { Heading, Skeleton, Box } from "@chakra-ui/react"; import { useParams } from "react-router-dom"; -import { - useTaskServiceGetTasks, - useTaskInstanceServiceGetTaskInstances, - useDagsServiceRecentDagRuns, -} from "openapi/queries"; -import type { TaskResponse, TaskInstanceResponse } from "openapi/requests/types.gen"; +import { useTaskServiceGetTasks } from "openapi/queries"; +import type { TaskResponse } from "openapi/requests/types.gen"; import { DataTable } from "src/components/DataTable"; import type { CardDef } from "src/components/DataTable/types"; import { ErrorAlert } from "src/components/ErrorAlert"; @@ -32,18 +28,8 @@ import { pluralize } from "src/utils"; import { TaskCard } from "./TaskCard"; -const cardDef = (dagId: string, taskInstances?: Array): CardDef => ({ - card: ({ row }) => ( - instance.task_id === row.task_id) - : [] - } - /> - ), +const cardDef = (dagId: string): CardDef => ({ + card: ({ row }) => , meta: { customSkeleton: , }, @@ -60,25 +46,6 @@ export const Tasks = () => { dagId, }); - const { data: runsData } = useDagsServiceRecentDagRuns({ dagIds: [dagId], dagRunsLimit: 14 }, undefined, { - enabled: Boolean(dagId), - }); - - const runs = runsData?.dags.find((dagWithRuns) => dagWithRuns.dag_id === dagId)?.latest_dag_runs ?? []; - - // TODO: Revisit this endpoint since only 100 task instances are returned and - // only duration is calculated with other attributes unused. - const { data: taskInstancesResponse } = useTaskInstanceServiceGetTaskInstances( - { - dagId, - dagRunId: "~", - logicalDateGte: runs.at(-1)?.logical_date ?? "", - orderBy: "-start_date", - }, - undefined, - { enabled: Boolean(runs[0]?.dag_run_id) }, - ); - return ( @@ -86,14 +53,14 @@ export const Tasks = () => { {pluralize("Task", data ? data.total_entries : 0)} ); diff --git a/airflow/ui/src/pages/Run/Run.tsx b/airflow/ui/src/pages/Run/Run.tsx index 8e2d8ef6f3b81..6689ca1a7adf1 100644 --- a/airflow/ui/src/pages/Run/Run.tsx +++ b/airflow/ui/src/pages/Run/Run.tsx @@ -22,8 +22,7 @@ import { useParams, Link as RouterLink } from "react-router-dom"; import { useDagRunServiceGetDagRun, useDagServiceGetDagDetails } from "openapi/queries"; import { Breadcrumb } from "src/components/ui"; import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; -import { useConfig } from "src/queries/useConfig"; -import { isStatePending } from "src/utils/refresh"; +import { isStatePending, useAutoRefresh } from "src/utils"; import { Header } from "./Header"; @@ -37,7 +36,15 @@ const tabs = [ export const Run = () => { const { dagId = "", runId = "" } = useParams(); - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + const refetchInterval = useAutoRefresh({ dagId }); + + const { + data: dag, + error: dagError, + isLoading: isLoadinDag, + } = useDagServiceGetDagDetails({ + dagId, + }); const { data: dagRun, @@ -50,19 +57,10 @@ export const Run = () => { }, undefined, { - refetchInterval: (query) => - isStatePending(query.state.data?.state) ? autoRefreshInterval * 1000 : false, + refetchInterval: (query) => (isStatePending(query.state.data?.state) ? refetchInterval : false), }, ); - const { - data: dag, - error: dagError, - isLoading: isLoadinDag, - } = useDagServiceGetDagDetails({ - dagId, - }); - return ( }> @@ -75,7 +73,10 @@ export const Run = () => { {runId} {dagRun === undefined ? undefined : ( -
+
)} ); diff --git a/airflow/ui/src/pages/Run/TaskInstances.tsx b/airflow/ui/src/pages/Run/TaskInstances.tsx index e7e57375b9e8b..7ba8be6e5bfe1 100644 --- a/airflow/ui/src/pages/Run/TaskInstances.tsx +++ b/airflow/ui/src/pages/Run/TaskInstances.tsx @@ -40,10 +40,8 @@ import { StateBadge } from "src/components/StateBadge"; import Time from "src/components/Time"; import { Select } from "src/components/ui"; import { SearchParamsKeys, type SearchParamsKeysType } from "src/constants/searchParams"; -import { useConfig } from "src/queries/useConfig"; -import { capitalize, getDuration } from "src/utils"; +import { capitalize, getDuration, useAutoRefresh, isStatePending } from "src/utils"; import { getTaskInstanceLink } from "src/utils/links"; -import { isStatePending } from "src/utils/refresh"; const columns: Array> = [ { @@ -180,9 +178,9 @@ export const TaskInstances = () => { setSearchParams(searchParams); }; - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + const refetchInterval = useAutoRefresh({ dagId }); - const { data, error, isFetching, isLoading } = useTaskInstanceServiceGetTaskInstances( + const { data, error, isLoading } = useTaskInstanceServiceGetTaskInstances( { dagId, dagRunId: runId, @@ -196,9 +194,7 @@ export const TaskInstances = () => { { enabled: !isNaN(pagination.pageSize), refetchInterval: (query) => - query.state.data?.task_instances.some((ti) => isStatePending(ti.state)) - ? autoRefreshInterval * 1000 - : false, + query.state.data?.task_instances.some((ti) => isStatePending(ti.state)) ? refetchInterval : false, }, ); @@ -258,7 +254,6 @@ export const TaskInstances = () => { data={data?.task_instances ?? []} errorMessage={} initialState={tableURLState} - isFetching={isFetching} isLoading={isLoading} modelName="Task Instance" onStateChange={setTableURLState} diff --git a/airflow/ui/src/pages/TaskInstance/Details.tsx b/airflow/ui/src/pages/TaskInstance/Details.tsx index dc7074f16476d..9cc8a756572c7 100644 --- a/airflow/ui/src/pages/TaskInstance/Details.tsx +++ b/airflow/ui/src/pages/TaskInstance/Details.tsx @@ -27,9 +27,7 @@ import { StateBadge } from "src/components/StateBadge"; import { TaskTrySelect } from "src/components/TaskTrySelect"; import Time from "src/components/Time"; import { ClipboardRoot, ClipboardIconButton } from "src/components/ui"; -import { useConfig } from "src/queries/useConfig"; -import { getDuration } from "src/utils"; -import { isStatePending } from "src/utils/refresh"; +import { getDuration, useAutoRefresh, isStatePending } from "src/utils"; export const Details = () => { const { dagId = "", runId = "", taskId = "" } = useParams(); @@ -39,8 +37,6 @@ export const Details = () => { const tryNumberParam = searchParams.get("try_number"); const mapIndex = parseInt(mapIndexParam ?? "-1", 10); - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; - const { data: taskInstance } = useTaskInstanceServiceGetMappedTaskInstance({ dagId, dagRunId: runId, @@ -59,6 +55,8 @@ export const Details = () => { const tryNumber = tryNumberParam === null ? taskInstance?.try_number : parseInt(tryNumberParam, 10); + const refetchInterval = useAutoRefresh({ dagId }); + const { data: tryInstance } = useTaskInstanceServiceGetTaskInstanceTryDetails( { dagId, @@ -69,8 +67,7 @@ export const Details = () => { }, undefined, { - refetchInterval: (query) => - isStatePending(query.state.data?.state) ? autoRefreshInterval * 1000 : false, + refetchInterval: (query) => (isStatePending(query.state.data?.state) ? refetchInterval : false), }, ); diff --git a/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx b/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx index fc5b46883ed0f..8e0013a7dfda3 100644 --- a/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx +++ b/airflow/ui/src/pages/TaskInstance/TaskInstance.tsx @@ -22,8 +22,7 @@ import { useParams, Link as RouterLink, useSearchParams } from "react-router-dom import { useDagServiceGetDagDetails, useTaskInstanceServiceGetMappedTaskInstance } from "openapi/queries"; import { Breadcrumb } from "src/components/ui"; import { DetailsLayout } from "src/layouts/Details/DetailsLayout"; -import { useConfig } from "src/queries/useConfig"; -import { isStatePending } from "src/utils/refresh"; +import { isStatePending, useAutoRefresh } from "src/utils"; import { Header } from "./Header"; @@ -42,7 +41,15 @@ export const TaskInstance = () => { const mapIndexParam = searchParams.get("map_index"); const mapIndex = parseInt(mapIndexParam ?? "-1", 10); - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + const refetchInterval = useAutoRefresh({ dagId }); + + const { + data: dag, + error: dagError, + isLoading: isDagLoading, + } = useDagServiceGetDagDetails({ + dagId, + }); const { data: taskInstance, @@ -57,19 +64,10 @@ export const TaskInstance = () => { }, undefined, { - refetchInterval: (query) => - isStatePending(query.state.data?.state) ? autoRefreshInterval * 1000 : false, + refetchInterval: (query) => (isStatePending(query.state.data?.state) ? refetchInterval : false), }, ); - const { - data: dag, - error: dagError, - isLoading: isDagLoading, - } = useDagServiceGetDagDetails({ - dagId, - }); - const links = [ { label: "Dags", value: "/dags" }, { label: dag?.dag_display_name ?? dagId, value: `/dags/${dagId}` }, @@ -102,7 +100,7 @@ export const TaskInstance = () => { {taskInstance === undefined ? undefined : (
)} diff --git a/airflow/ui/src/queries/useLogs.tsx b/airflow/ui/src/queries/useLogs.tsx index bf7a164dfa52a..c165e13c0b893 100644 --- a/airflow/ui/src/queries/useLogs.tsx +++ b/airflow/ui/src/queries/useLogs.tsx @@ -20,9 +20,7 @@ import dayjs from "dayjs"; import { useTaskInstanceServiceGetLog } from "openapi/queries"; import type { TaskInstanceResponse } from "openapi/requests/types.gen"; -import { isStatePending } from "src/utils/refresh"; - -import { useConfig } from "./useConfig"; +import { isStatePending, useAutoRefresh } from "src/utils"; type Props = { dagId: string; @@ -62,7 +60,8 @@ const parseLogs = ({ data }: ParseLogsProps) => { }; export const useLogs = ({ dagId, taskInstance, tryNumber = 1 }: Props) => { - const autoRefreshInterval = useConfig("auto_refresh_interval") as number; + const refetchInterval = useAutoRefresh({ dagId }); + const { data, ...rest } = useTaskInstanceServiceGetLog( { dagId, @@ -77,8 +76,8 @@ export const useLogs = ({ dagId, taskInstance, tryNumber = 1 }: Props) => { refetchInterval: (query) => isStatePending(taskInstance?.state) || dayjs(query.state.dataUpdatedAt).isBefore(taskInstance?.end_date) - ? autoRefreshInterval * 1000 - : autoRefreshInterval * 10 * 1000, + ? refetchInterval + : false, }, ); diff --git a/airflow/ui/src/queries/useTrigger.ts b/airflow/ui/src/queries/useTrigger.ts index f21964cc3c141..0a6e6f492abb9 100644 --- a/airflow/ui/src/queries/useTrigger.ts +++ b/airflow/ui/src/queries/useTrigger.ts @@ -24,24 +24,28 @@ import { useDagRunServiceTriggerDagRun, useDagServiceGetDagsKey, useDagsServiceRecentDagRunsKey, + useTaskInstanceServiceGetTaskInstancesKey, } from "openapi/queries"; import type { DagRunTriggerParams } from "src/components/TriggerDag/TriggerDAGForm"; import { toaster } from "src/components/ui"; +import { doQueryKeysMatch, type PartialQueryKey } from "src/utils"; -export const useTrigger = ({ onSuccessConfirm }: { onSuccessConfirm: () => void }) => { +export const useTrigger = ({ dagId, onSuccessConfirm }: { dagId: string; onSuccessConfirm: () => void }) => { const queryClient = useQueryClient(); const [error, setError] = useState(undefined); const [dateValidationError, setDateValidationError] = useState(undefined); const onSuccess = async () => { - const queryKeys = [ - useDagServiceGetDagsKey, - useDagsServiceRecentDagRunsKey, - useDagRunServiceGetDagRunsKey, + const queryKeys: Array = [ + { baseKey: useDagServiceGetDagsKey }, + { baseKey: useDagsServiceRecentDagRunsKey }, + { baseKey: useDagRunServiceGetDagRunsKey, options: { dagIds: [dagId] } }, + { baseKey: useTaskInstanceServiceGetTaskInstancesKey, options: { dagId, dagRunId: "~" } }, ]; - await Promise.all(queryKeys.map((key) => queryClient.invalidateQueries({ queryKey: [key] }))); + await queryClient.invalidateQueries({ predicate: (query) => doQueryKeysMatch(query, queryKeys) }); + toaster.create({ description: "DAG run has been successfully triggered.", title: "DAG Run Request Submitted", @@ -59,7 +63,7 @@ export const useTrigger = ({ onSuccessConfirm }: { onSuccessConfirm: () => void onSuccess, }); - const triggerDagRun = (dagId: string, dagRunRequestBody: DagRunTriggerParams) => { + const triggerDagRun = (dagRunRequestBody: DagRunTriggerParams) => { const parsedConfig = JSON.parse(dagRunRequestBody.conf) as Record; const DataIntervalStart = dagRunRequestBody.dataIntervalStart diff --git a/airflow/ui/src/utils/index.ts b/airflow/ui/src/utils/index.ts index 60357e9470aba..632d24b3f450f 100644 --- a/airflow/ui/src/utils/index.ts +++ b/airflow/ui/src/utils/index.ts @@ -21,3 +21,4 @@ export { capitalize } from "./capitalize"; export { pluralize } from "./pluralize"; export { getDuration } from "./datetime_utils"; export { getMetaKey } from "./getMetaKey"; +export * from "./query"; diff --git a/airflow/ui/src/utils/query.ts b/airflow/ui/src/utils/query.ts new file mode 100644 index 0000000000000..0db7b895903e2 --- /dev/null +++ b/airflow/ui/src/utils/query.ts @@ -0,0 +1,69 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import type { Query } from "@tanstack/react-query"; + +import { useDagServiceGetDagDetails } from "openapi/queries"; +import type { TaskInstanceState } from "openapi/requests/types.gen"; +import { useConfig } from "src/queries/useConfig"; + +export const isStatePending = (state?: TaskInstanceState | null) => + state === "deferred" || + state === "scheduled" || + state === "running" || + state === "up_for_reschedule" || + state === "up_for_retry" || + state === "queued" || + state === "restarting" || + !Boolean(state); + +export type PartialQueryKey = { baseKey: string; options?: Record }; + +// This allows us to specify what query key values we actually care about and ignore the rest +// ex: match everything with this dagId and dagRunId but ignore anything related to pagination +export const doQueryKeysMatch = (query: Query, queryKeysToMatch: Array) => { + const [baseKey, options] = query.queryKey; + + const matchedKey = queryKeysToMatch.find((qk) => qk.baseKey === baseKey); + + if (!matchedKey) { + return false; + } + + return matchedKey.options + ? Object.entries(matchedKey.options).every( + ([key, value]) => typeof options === "object" && (options as Record)[key] === value, + ) + : true; +}; + +export const useAutoRefresh = ({ dagId }: { dagId?: string }) => { + const autoRefreshInterval = useConfig("auto_refresh_interval") as number | undefined; + const { data: dag } = useDagServiceGetDagDetails( + { + dagId: dagId ?? "", + }, + undefined, + { enabled: dagId !== undefined }, + ); + + const canRefresh = autoRefreshInterval !== undefined && (dagId === undefined ? true : !dag?.is_paused); + + // eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion + return (canRefresh ? autoRefreshInterval * 1000 : false) as number | false; +}; diff --git a/airflow/ui/src/utils/refresh.ts b/airflow/ui/src/utils/refresh.ts deleted file mode 100644 index d9ecb618c0e95..0000000000000 --- a/airflow/ui/src/utils/refresh.ts +++ /dev/null @@ -1,29 +0,0 @@ -/*! - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -import type { TaskInstanceState } from "openapi/requests/types.gen"; - -export const isStatePending = (state?: TaskInstanceState | null) => - state === "deferred" || - state === "scheduled" || - state === "running" || - state === "up_for_reschedule" || - state === "up_for_retry" || - state === "queued" || - state === "restarting" || - !Boolean(state);