Skip to content

Commit

Permalink
[Fix](recycler) Further fix for apache#47475 (apache#47486)
Browse files Browse the repository at this point in the history
Related PR: apache#47475
在 PR #47475 中,我们通过在 delete_rowset_data 函数中添加删除逻辑来修复潜在的数据漏删问题,即删除 recycle tablet 阶段漏删的文件。
但是那个 PR 忽略了该函数中已有的一个判断条件(rowset 所属的 tablet 已记录在 recycled_tablets_ 中时直接跳过),导致 recycle tablet 阶段漏删的文件仍然被跳过,没有被实际删除。

在此 PR 中,tmp rowset 不再受上述判断条件限制,会尽可能删除每一个 rowset 的数据,包括倒排索引 v2 的数据;
普通 rowset 仍然保留这个判断条件,因为普通 rowset 数量过大,如果对它们也取消该判断,可能会影响删除效率。
  • Loading branch information
Yukang-Lian committed Feb 5, 2025
1 parent 3c594a5 commit 5bd342b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 12 deletions.
24 changes: 15 additions & 9 deletions cloud/src/recycler/recycler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1464,15 +1464,18 @@ int InstanceRecycler::delete_rowset_data(const doris::RowsetMetaCloudPB& rs_meta
return accessor->delete_files(file_paths);
}

int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets) {
int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets,
RowsetRecyclingState type) {
int ret = 0;
// resource_id -> file_paths
std::map<std::string, std::vector<std::string>> resource_file_paths;
// (resource_id, tablet_id, rowset_id)
std::vector<std::tuple<std::string, int64_t, std::string>> rowsets_delete_by_prefix;

for (const auto& rs : rowsets) {
{
// we have to treat tmp rowset as "orphans" that may not related to any existing tablets
// due to aborted schema change.
if (type == RowsetRecyclingState::FORMAL_ROWSET) {
std::lock_guard lock(recycled_tablets_mtx_);
if (recycled_tablets_.count(rs.tablet_id())) {
continue; // Rowset data has already been deleted
Expand All @@ -1499,7 +1502,7 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
std::vector<std::pair<int64_t, std::string>> index_ids;
// default format as v1.
InvertedIndexStorageFormatPB index_format = InvertedIndexStorageFormatPB::V1;

int inverted_index_get_ret = 0;
if (rs.has_tablet_schema()) {
for (const auto& index : rs.tablet_schema().index()) {
if (index.has_index_type() && index.index_type() == IndexType::INVERTED) {
Expand All @@ -1519,12 +1522,12 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
continue;
}
InvertedIndexInfo index_info;
int get_ret =
inverted_index_get_ret =
inverted_index_id_cache_->get(rs.index_id(), rs.schema_version(), index_info);
if (get_ret == 0) {
if (inverted_index_get_ret == 0) {
index_format = index_info.first;
index_ids = index_info.second;
} else if (get_ret == 1) {
} else if (inverted_index_get_ret == 1) {
// 1. Schema kv not found means tablet has been recycled
// Maybe some tablet recycle failed by some bugs
// We need to delete again to double check
Expand Down Expand Up @@ -1562,7 +1565,10 @@ int InstanceRecycler::delete_rowset_data(const std::vector<doris::RowsetMetaClou
file_paths.push_back(inverted_index_path_v1(tablet_id, rowset_id, i,
index_id.first, index_id.second));
}
} else if (!index_ids.empty()) {
} else if (!index_ids.empty() || inverted_index_get_ret == 1) {
// try to recycle inverted index v2 when get_ret == 1
// we treat schema not found as if it has a v2 format inverted index
// to reduce chance of data leakage
file_paths.push_back(inverted_index_path_v2(tablet_id, rowset_id, i));
}
}
Expand Down Expand Up @@ -2028,7 +2034,7 @@ int InstanceRecycler::recycle_rowsets() {
rowsets_to_delete.swap(rowsets);
worker_pool->submit([&, rowset_keys_to_delete = std::move(rowset_keys_to_delete),
rowsets_to_delete = std::move(rowsets_to_delete)]() {
if (delete_rowset_data(rowsets_to_delete) != 0) {
if (delete_rowset_data(rowsets_to_delete, RowsetRecyclingState::FORMAL_ROWSET) != 0) {
LOG(WARNING) << "failed to delete rowset data, instance_id=" << instance_id_;
return;
}
Expand Down Expand Up @@ -2225,7 +2231,7 @@ int InstanceRecycler::recycle_tmp_rowsets() {
tmp_rowset_keys.clear();
tmp_rowsets.clear();
});
if (delete_rowset_data(tmp_rowsets) != 0) {
if (delete_rowset_data(tmp_rowsets, RowsetRecyclingState::TMP_ROWSET) != 0) {
LOG(WARNING) << "failed to delete tmp rowset data, instance_id=" << instance_id_;
return -1;
}
Expand Down
8 changes: 7 additions & 1 deletion cloud/src/recycler/recycler.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class Recycler {
std::shared_ptr<TxnLazyCommitter> txn_lazy_committer_;
};

// Tells delete_rowset_data() which recycling path the given rowsets come from,
// which decides whether the recycled_tablets_ short-circuit is applied.
enum class RowsetRecyclingState {
// Passed by recycle_rowsets(). Rowsets whose tablet id is already present in
// recycled_tablets_ are skipped, as their data is considered already deleted.
FORMAL_ROWSET,
// Passed by recycle_tmp_rowsets(). The recycled_tablets_ check is not applied:
// tmp rowsets are treated as possible orphans that may not relate to any
// existing tablet (e.g. after an aborted schema change).
TMP_ROWSET,
};

class InstanceRecycler {
public:
explicit InstanceRecycler(std::shared_ptr<TxnKv> txn_kv, const InstanceInfoPB& instance,
Expand Down Expand Up @@ -222,7 +227,8 @@ class InstanceRecycler {
const std::string& rowset_id);

// return 0 for success otherwise error
int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets);
int delete_rowset_data(const std::vector<doris::RowsetMetaCloudPB>& rowsets,
RowsetRecyclingState type);

/**
* Get stage storage info from instance and init StorageVaultAccessor
Expand Down
5 changes: 3 additions & 2 deletions cloud/test/recycler_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1129,6 +1129,7 @@ TEST(RecyclerTest, recycle_indexes) {
j & 1);
auto tmp_rowset = create_rowset("recycle_tmp_rowsets", tablet_id, index_id, 5,
schemas[j % 5], txn_id_base + j);
tmp_rowset.set_resource_id("recycle_indexes");
create_tmp_rowset(txn_kv.get(), accessor.get(), tmp_rowset, j & 1);
}
for (int j = 0; j < 10; ++j) {
Expand Down Expand Up @@ -3132,7 +3133,7 @@ TEST(RecyclerTest, delete_rowset_data) {

rowset_pbs.emplace_back(std::move(rowset));
}
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
Expand Down Expand Up @@ -3237,7 +3238,7 @@ TEST(RecyclerTest, delete_rowset_data_without_inverted_index_storage_format) {

rowset_pbs.emplace_back(std::move(rowset));
}
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs));
ASSERT_EQ(0, recycler.delete_rowset_data(rowset_pbs, RowsetRecyclingState::FORMAL_ROWSET));
std::unique_ptr<ListIterator> list_iter;
ASSERT_EQ(0, accessor->list_all(&list_iter));
ASSERT_FALSE(list_iter->has_next());
Expand Down

0 comments on commit 5bd342b

Please sign in to comment.