Skip to content

Commit

Permalink
branch-3.0: [fix](cloud) Fix async mtmv job retry when meet -230 in c…
Browse files Browse the repository at this point in the history
…loud #47370 (#47482)

Cherry-picked from #47370

Co-authored-by: deardeng <[email protected]>
  • Loading branch information
github-actions[bot] and deardeng authored Feb 2, 2025
1 parent 4ec7f89 commit 35ed789
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -217,7 +219,12 @@ public void run() throws JobException {
// need get names before exec
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVPartitionUtil
.generatePartitionSnapshots(context, relation.getBaseTablesOneLevel(), execPartitionNames);
exec(execPartitionNames, tableWithPartKey);
try {
executeWithRetry(execPartitionNames, tableWithPartKey);
} catch (Exception e) {
LOG.error("Execution failed after retries: {}", e.getMessage());
throw new JobException(e.getMessage(), e);
}
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
}
Expand All @@ -232,6 +239,43 @@ public void run() throws JobException {
}
}

/**
 * Runs {@link #exec} and, in cloud mode, retries when the failure is the
 * retryable -230 ("version not found") error injected/returned by the BE.
 * Any other exception is rethrown immediately.
 *
 * <p>Total attempts = {@code Config.max_query_retry_time} + 1 (the initial
 * try plus retries); a non-positive config value means a single attempt.
 *
 * @param execPartitionNames partitions to refresh in this batch
 * @param tableWithPartKey   base tables mapped to their partition column
 * @throws Exception the non-retryable failure, or a wrapper (with the last
 *                   -230 failure as its cause) once the retry budget is spent
 */
private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
        throws Exception {
    int retryCount = 0;
    int retryTime = Config.max_query_retry_time;
    retryTime = retryTime <= 0 ? 1 : retryTime + 1;
    Exception lastException = null;
    while (retryCount < retryTime) {
        try {
            exec(execPartitionNames, tableWithPartKey);
            break; // Exit loop if execution is successful
        } catch (Exception e) {
            // Only the cloud-mode -230 error is retryable. Guard against a
            // null message so we don't mask the real failure with an NPE.
            String message = e.getMessage();
            boolean retryable = Config.isCloudMode()
                    && message != null
                    && message.contains(FeConstants.CLOUD_RETRY_E230);
            if (!retryable) {
                throw e; // Re-throw if it's not a retryable exception
            }
            lastException = e;

            // Jittered backoff: 10-20ms for the first half of the attempts,
            // 20-30ms afterwards; debug point stretches it to 1s for tests.
            int randomMillis = 10 + (int) (Math.random() * 10);
            if (retryCount > retryTime / 2) {
                randomMillis = 20 + (int) (Math.random() * 10);
            }
            if (DebugPointUtil.isEnable("MTMVTask.retry.longtime")) {
                randomMillis = 1000;
            }

            retryCount++;
            LOG.warn("Retrying execution due to exception: {}. Attempt {}/{}, "
                    + "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
                    e.getMessage(), retryCount, retryTime, getTaskId(),
                    execPartitionNames, lastQueryId, randomMillis);
            if (retryCount >= retryTime) {
                // Keep the message prefix ("Max retry attempts reached") that
                // callers/tests match on, but preserve the cause chain so the
                // original -230 stack trace is not lost.
                throw new Exception("Max retry attempts reached, original: " + lastException, lastException);
            }
            Thread.sleep(randomMillis);
        }
    }
}

private void exec(Set<String> refreshPartitionNames,
Map<TableIf, String> tableWithPartKey)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType
import org.apache.doris.regression.suite.SuiteCluster

// Docker regression test: an async MTMV refresh task that keeps hitting the
// BE-injected -230 error must be retried by the FE; it fails with
// "Max retry attempts reached" while the injection is active, and succeeds
// once the injection is cleared. Runs twice: against the master FE, then
// against the follower.
suite("test_retry_e-230_async_mtmv_job", 'p0, docker') {
// The -230 retry path only exists in cloud mode; skip otherwise.
if (!isCloudMode()) {
return
}
def options = new ClusterOptions()
options.enableDebugPoints()
// one master, one observer
options.setFeNum(2)
// Large retry budget so the second refresh can outlast the injected failure.
options.feConfigs.add('max_query_retry_time=100')
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true
// 1. connect to master
options.connectToFollower = false

// Returns the TaskId of the idx-th MV task, ordered by creation time.
def getMvTaskId = { idx ->
def ret = sql_return_maparray """
select * from tasks("type"="mv") order by CreateTime
"""
ret[idx].TaskId
}

// Returns the full task row(s) for the given TaskId.
def getMvTask = { taskId ->
def ret = sql_return_maparray """
select * from tasks("type"="mv") where TaskId=$taskId
"""
ret
}

// Iteration 0 runs connected to the master, iteration 1 to the follower
// (connectToFollower is flipped at the bottom of the loop).
for (def j = 0; j < 2; j++) {
docker(options) {
def tbl = 'async_mtmv_job_tbl'
def tbl_view = 'async_mtmv_job_tbl_view'

try {
sql """
CREATE TABLE ${tbl}
( k2 TINYINT, k3 INT not null ) COMMENT "base table"
PARTITION BY LIST(`k3`) (
PARTITION `p1` VALUES IN ('1'),
PARTITION `p2` VALUES IN ('2'),
PARTITION `p3` VALUES IN ('3') )
DISTRIBUTED BY HASH(k2) BUCKETS 2 PROPERTIES ( "replication_num" = "1" );
"""
sql """
INSERT INTO ${tbl} VALUES (1, 1), (2, 2), (3, 3);
"""

def result = sql """select * from ${tbl} order by k2;"""
log.info("insert result : {}", result)
assertEquals([[1, 1], [2, 2], [3, 3]], result)

sql """
CREATE MATERIALIZED VIEW ${tbl_view}
BUILD DEFERRED REFRESH AUTO ON MANUAL
partition by(`k3`) DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ( 'replication_num' = '1', 'refresh_partition_num' = '2' ) AS
SELECT * from ${tbl};
"""

// inject -230 in be
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])
// first refresh
sql """
REFRESH MATERIALIZED VIEW ${tbl_view} AUTO
"""
def firstTaskId = getMvTaskId(0)
def firstTask
// Poll until the task surfaces as FAILED (every attempt hits -230).
dockerAwaitUntil(100) {
firstTask = getMvTask(firstTaskId)
logger.info("firstTask = {}, Status = {}, bool = {}", firstTask, firstTask.Status, firstTask.Status[0] == "FAILED")
firstTask.Status[0] as String == "FAILED" as String
}

// due to inject -230, so after retry, task still failed
assertTrue(firstTask.ErrorMsg[0].contains("Max retry attempts reached"))


// Slow each FE retry sleep to 1s so the retry loop lasts long enough
// to still be running when the BE injection is cleared below.
cluster.injectDebugPoints(NodeType.FE, ['MTMVTask.retry.longtime' : null])
// second refresh
sql """
REFRESH MATERIALIZED VIEW ${tbl_view} AUTO
"""
// after 50s the BE debug point is cleared; the second refresh should
// then retry to success, so the total cost must exceed 50s
def futrue1 = thread {
Thread.sleep(50 * 1000)
cluster.clearBackendDebugPoints()
}

def begin = System.currentTimeMillis();
def futrue2 = thread {
def secondTaskId = getMvTaskId(1)
def secondTask
dockerAwaitUntil(100, 5) {
secondTask = getMvTask(secondTaskId)
logger.info("secondTask = {}", secondTask)
secondTask.Status[0] == "SUCCESS"
}
}

futrue2.get()
def cost = System.currentTimeMillis() - begin;
log.info("time cost: {}", cost)
futrue1.get()
// Success can only come after the injection was cleared at the 50s mark.
assertTrue(cost > 50 * 1000)

// check view succ
def ret = sql """select * from $tbl_view order by k2;"""
assertEquals([[1, 1], [2, 2], [3, 3]], ret)
} finally {
// Always clear injections so a failure doesn't poison later suites.
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
}
}
// 2. connect to follower
options.connectToFollower = true
}
}

0 comments on commit 35ed789

Please sign in to comment.