Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kesus resources: restore #14335

Merged
merged 2 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/apps/ydb/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
* Include coordination nodes in local backups (`ydb tools dump` and `ydb tools restore`). Rate limiters that utilize the coordination node are saved in the coordination node's backup folder, preserving the existing path hierarchy.
* Fixed a bug where some errors could be ignored when restoring from a local backup.
* Added `ydb workload log import generator` command.
* Queries in `ydb workload run` command are now executed in random order.
Expand Down
82 changes: 82 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "restore_import_data.h"
#include "restore_compat.h"

#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
Expand All @@ -14,6 +15,7 @@

#include <library/cpp/threading/future/core/future.h>

#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
Expand All @@ -30,6 +32,7 @@ namespace NYdb::NDump {
using namespace NConsoleClient;
using namespace NImport;
using namespace NOperation;
using namespace NRateLimiter;
using namespace NScheme;
using namespace NTable;
using namespace NTopic;
Expand Down Expand Up @@ -73,6 +76,10 @@ Ydb::Coordination::CreateNodeRequest ReadCoordinationNodeCreationRequest(const T
return ReadProtoFromFile<Ydb::Coordination::CreateNodeRequest>(fsDirPath, log, NDump::NFiles::CreateCoordinationNode());
}

Ydb::RateLimiter::CreateResourceRequest ReadRateLimiterCreationRequest(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::RateLimiter::CreateResourceRequest>(fsDirPath, log, NDump::NFiles::CreateRateLimiter());
}

Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Scheme::ModifyPermissionsRequest>(fsDirPath, log, NFiles::Permissions());
}
Expand Down Expand Up @@ -194,6 +201,19 @@ TStatus CreateCoordinationNode(
return result;
}

TStatus CreateRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath,
const Ydb::RateLimiter::CreateResourceRequest& request)
{
const auto settings = TCreateResourceSettings(request);
auto result = RetryFunction([&]() {
return client.CreateResource(coordinationNodePath, rateLimiterPath, settings).ExtractValueSync();
});
return result;
}

} // anonymous

namespace NPrivate {
Expand Down Expand Up @@ -258,6 +278,7 @@ TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog
, TableClient(driver)
, TopicClient(driver)
, CoordinationNodeClient(driver)
, RateLimiterClient(driver)
, QueryClient(driver)
, Log(log)
{
Expand Down Expand Up @@ -562,6 +583,63 @@ TRestoreResult TRestoreClient::RestoreTopic(
return Result<TRestoreResult>(dbPath, std::move(result));
}

TRestoreResult TRestoreClient::RestoreRateLimiter(
const TFsPath& fsPath,
const TString& coordinationNodePath,
const TString& rateLimiterPath)
{
LOG_D("Process " << fsPath.GetPath().Quote());

if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

const auto creationRequest = ReadRateLimiterCreationRequest(fsPath, Log.get());
auto result = CreateRateLimiter(RateLimiterClient, coordinationNodePath, rateLimiterPath, creationRequest);
if (result.IsSuccess()) {
LOG_D("Created rate limiter: " << rateLimiterPath.Quote()
<< " dependent on the coordination node: " << coordinationNodePath.Quote()
);
return Result<TRestoreResult>();
}

LOG_E("Failed to create rate limiter: " << rateLimiterPath.Quote()
<< " dependent on the coordination node: " << coordinationNodePath.Quote()
);
return Result<TRestoreResult>(JoinFsPaths(coordinationNodePath, rateLimiterPath), std::move(result));
}

TRestoreResult TRestoreClient::RestoreDependentResources(
const TFsPath& coordinationNodeFsPath, const TString& coordinationNodeDbPath)
{
LOG_I("Restore coordination node's resources " << coordinationNodeFsPath.GetPath().Quote()
<< " to " << coordinationNodeDbPath.Quote()
);

TVector<TFsPath> children;
coordinationNodeFsPath.List(children);
TDeque<TFsPath> pathQueue(children.begin(), children.end());
while (!pathQueue.empty()) {
const auto path = pathQueue.front();
pathQueue.pop_front();
if (path.IsDirectory()) {
if (IsFileExists(path.Child(NFiles::CreateRateLimiter().FileName))) {
const auto result = RestoreRateLimiter(
path, coordinationNodeDbPath, path.RelativeTo(coordinationNodeFsPath).GetPath()
);
if (!result.IsSuccess()) {
return result;
}
}
children.clear();
path.List(children);
pathQueue.insert(pathQueue.end(), children.begin(), children.end());
}

}
return Result<TRestoreResult>();
}

TRestoreResult TRestoreClient::RestoreCoordinationNode(
const TFsPath& fsPath,
const TString& dbPath,
Expand All @@ -583,6 +661,10 @@ TRestoreResult TRestoreClient::RestoreCoordinationNode(
const auto creationRequest = ReadCoordinationNodeCreationRequest(fsPath, Log.get());
auto result = CreateCoordinationNode(CoordinationNodeClient, dbPath, creationRequest);
if (result.IsSuccess()) {
if (auto result = RestoreDependentResources(fsPath, dbPath); !result.IsSuccess()) {
LOG_E("Failed to create coordination node's resources " << dbPath.Quote());
jepett0 marked this conversation as resolved.
Show resolved Hide resolved
return Result<TRestoreResult>(dbPath, std::move(result));
}
LOG_D("Created " << dbPath.Quote());
return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting);
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb-cpp-sdk/client/import/import.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/rate_limiter/rate_limiter.h>
#include <ydb-cpp-sdk/client/scheme/scheme.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/topic/client.h>
Expand Down Expand Up @@ -131,6 +132,8 @@ class TRestoreClient {
TRestoreResult RestoreView(const TFsPath& fsPath, const TString& dbRestoreRoot, const TString& dbPathRelativeToRestoreRoot, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreTopic(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreCoordinationNode(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreDependentResources(const TFsPath& fsPath, const TString& dbPath);
TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath);

TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
Expand All @@ -157,6 +160,7 @@ class TRestoreClient {
NTable::TTableClient TableClient;
NTopic::TTopicClient TopicClient;
NCoordination::TClient CoordinationNodeClient;
NRateLimiter::TRateLimiterClient RateLimiterClient;
NQuery::TQueryClient QueryClient;
std::shared_ptr<TLog> Log;

Expand Down
1 change: 1 addition & 0 deletions ydb/services/ydb/backup_ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ PEERDIR(
ydb/public/sdk/cpp/src/client/export
ydb/public/sdk/cpp/src/client/import
ydb/public/sdk/cpp/src/client/operation
ydb/public/sdk/cpp/src/client/rate_limiter
ydb/public/sdk/cpp/src/client/result
ydb/public/sdk/cpp/src/client/table
ydb/public/sdk/cpp/src/client/value
Expand Down
120 changes: 118 additions & 2 deletions ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>

#include <ydb/public/api/protos/draft/ydb_view.pb.h>
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/dump/dump.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
Expand All @@ -12,6 +13,7 @@
#include <ydb-cpp-sdk/client/import/import.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/rate_limiter/rate_limiter.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/value/value.h>

Expand All @@ -26,6 +28,7 @@

using namespace NYdb;
using namespace NYdb::NOperation;
using namespace NYdb::NRateLimiter;
using namespace NYdb::NScheme;
using namespace NYdb::NTable;
using namespace NYdb::NView;
Expand All @@ -46,6 +49,34 @@ bool operator==(const TKeyRange& lhs, const TKeyRange& rhs) {

}

namespace NYdb::NRateLimiter {

bool operator==(
const Ydb::RateLimiter::HierarchicalDrrSettings& lhs,
const Ydb::RateLimiter::HierarchicalDrrSettings& rhs
) {
return google::protobuf::util::MessageDifferencer::Equals(lhs, rhs);
}

bool operator==(
const TDescribeResourceResult::THierarchicalDrrProps& lhs,
const TDescribeResourceResult::THierarchicalDrrProps& rhs
) {
Ydb::RateLimiter::HierarchicalDrrSettings left;
lhs.SerializeTo(left);
Ydb::RateLimiter::HierarchicalDrrSettings right;
rhs.SerializeTo(right);
return left == right;
}

bool operator==(const TDescribeResourceResult& lhs, const TDescribeResourceResult& rhs) {
UNIT_ASSERT_C(lhs.IsSuccess(), lhs.GetIssues().ToString());
UNIT_ASSERT_C(rhs.IsSuccess(), rhs.GetIssues().ToString());
return lhs.GetHierarchicalDrrProps() == rhs.GetHierarchicalDrrProps();
}

}

namespace NYdb {

struct TTenantsTestSettings : TKikimrTestSettings {
Expand Down Expand Up @@ -747,8 +778,8 @@ void TestTopicSettingsArePreserved(
}

void CreateCoordinationNode(
NCoordination::TClient& client, const std::string& path, const NCoordination::TCreateNodeSettings& settings)
{
NCoordination::TClient& client, const std::string& path, const NCoordination::TCreateNodeSettings& settings
) {
const auto result = client.CreateNode(path, settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
Expand Down Expand Up @@ -796,6 +827,72 @@ void TestCoordinationNodeSettingsArePreserved(
checkDescription(DescribeCoordinationNode(nodeClient, path), DEBUG_HINT);
}

void CreateRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath,
const TCreateResourceSettings& settings = {}
) {
const auto result = client.CreateResource(coordinationNodePath, rateLimiterPath, settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

TDescribeResourceResult DescribeRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath
) {
const auto result = client.DescribeResource(coordinationNodePath, rateLimiterPath).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
return result;
}

void DropRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath
) {
const auto result = client.DropResource(coordinationNodePath, rateLimiterPath).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TestCoordinationNodeResourcesArePreserved(
const std::string& coordinationNodePath,
NCoordination::TClient& nodeClient,
TRateLimiterClient& rateLimiterClient,
TBackupFunction&& backup,
TRestoreFunction&& restore
) {
constexpr std::array rateLimiters = { "root", "root/firstChild", "root/secondChild" };
CreateCoordinationNode(nodeClient, coordinationNodePath, {});
// required settings
const auto settings = TCreateResourceSettings().MaxUnitsPerSecond(5);
for (const auto& rateLimiter : rateLimiters) {
CreateRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiter, settings);
}

std::vector<TDescribeResourceResult> originalDescriptions;
for (const auto& rateLimiter : rateLimiters) {
originalDescriptions.emplace_back(DescribeRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiter));
}

backup();

for (int i = 2; i >= 0; --i) {
DropRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiters[i]);
}
DropCoordinationNode(nodeClient, coordinationNodePath);

restore();
for (int i = 0; i < 3; ++i) {
UNIT_ASSERT_EQUAL_C(
DescribeRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiters[i]),
originalDescriptions[i],
"i: " << i
);
}
}

}

Y_UNIT_TEST_SUITE(BackupRestore) {
Expand Down Expand Up @@ -1035,6 +1132,25 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
);
}

Y_UNIT_TEST(RestoreKesusResources) {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
NCoordination::TClient nodeClient(driver);
TRateLimiterClient rateLimiterClient(driver);
TTempDir tempDir;
const auto& pathToBackup = tempDir.Path();

const std::string kesus = "/Root/kesus";

TestCoordinationNodeResourcesArePreserved(
kesus,
nodeClient,
rateLimiterClient,
CreateBackupLambda(driver, pathToBackup),
CreateRestoreLambda(driver, pathToBackup)
);
}

void TestTableBackupRestore() {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
Expand Down
Loading