Skip to content

Commit

Permalink
restoration + tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jepett0 committed Feb 10, 2025
1 parent 8620cfb commit d47488c
Show file tree
Hide file tree
Showing 4 changed files with 205 additions and 2 deletions.
82 changes: 82 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "restore_import_data.h"
#include "restore_compat.h"

#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/public/api/protos/ydb_table.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/common/recursive_remove.h>
Expand All @@ -14,6 +15,7 @@

#include <library/cpp/threading/future/core/future.h>

#include <util/generic/deque.h>
#include <util/generic/hash.h>
#include <util/generic/hash_set.h>
#include <util/generic/maybe.h>
Expand All @@ -30,6 +32,7 @@ namespace NYdb::NDump {
using namespace NConsoleClient;
using namespace NImport;
using namespace NOperation;
using namespace NRateLimiter;
using namespace NScheme;
using namespace NTable;
using namespace NTopic;
Expand Down Expand Up @@ -73,6 +76,10 @@ Ydb::Coordination::CreateNodeRequest ReadCoordinationNodeCreationRequest(const T
return ReadProtoFromFile<Ydb::Coordination::CreateNodeRequest>(fsDirPath, log, NDump::NFiles::CreateCoordinationNode());
}

Ydb::RateLimiter::CreateResourceRequest ReadRateLimiterCreationRequest(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::RateLimiter::CreateResourceRequest>(fsDirPath, log, NDump::NFiles::CreateRateLimiter());
}

Ydb::Scheme::ModifyPermissionsRequest ReadPermissions(const TFsPath& fsDirPath, const TLog* log) {
return ReadProtoFromFile<Ydb::Scheme::ModifyPermissionsRequest>(fsDirPath, log, NFiles::Permissions());
}
Expand Down Expand Up @@ -194,6 +201,19 @@ TStatus CreateCoordinationNode(
return result;
}

TStatus CreateRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath,
const Ydb::RateLimiter::CreateResourceRequest& request)
{
const auto settings = TCreateResourceSettings(request);
auto result = RetryFunction([&]() {
return client.CreateResource(coordinationNodePath, rateLimiterPath, settings).ExtractValueSync();
});
return result;
}

} // anonymous

namespace NPrivate {
Expand Down Expand Up @@ -258,6 +278,7 @@ TRestoreClient::TRestoreClient(const TDriver& driver, const std::shared_ptr<TLog
, TableClient(driver)
, TopicClient(driver)
, CoordinationNodeClient(driver)
, RateLimiterClient(driver)
, QueryClient(driver)
, Log(log)
{
Expand Down Expand Up @@ -562,6 +583,63 @@ TRestoreResult TRestoreClient::RestoreTopic(
return Result<TRestoreResult>(dbPath, std::move(result));
}

TRestoreResult TRestoreClient::RestoreRateLimiter(
const TFsPath& fsPath,
const TString& coordinationNodePath,
const TString& rateLimiterPath)
{
LOG_D("Process " << fsPath.GetPath().Quote());

if (auto error = ErrorOnIncomplete(fsPath)) {
return *error;
}

const auto creationRequest = ReadRateLimiterCreationRequest(fsPath, Log.get());
auto result = CreateRateLimiter(RateLimiterClient, coordinationNodePath, rateLimiterPath, creationRequest);
if (result.IsSuccess()) {
LOG_D("Created rate limiter: " << rateLimiterPath.Quote()
<< " dependent on the coordination node: " << coordinationNodePath.Quote()
);
return Result<TRestoreResult>();
}

LOG_E("Failed to create rate limiter: " << rateLimiterPath.Quote()
<< " dependent on the coordination node: " << coordinationNodePath.Quote()
);
return Result<TRestoreResult>(JoinFsPaths(coordinationNodePath, rateLimiterPath), std::move(result));
}

TRestoreResult TRestoreClient::RestoreDependentResources(
const TFsPath& coordinationNodeFsPath, const TString& coordinationNodeDbPath)
{
LOG_I("Restore coordination node's resources " << coordinationNodeFsPath.GetPath().Quote()
<< " to " << coordinationNodeDbPath.Quote()
);

TVector<TFsPath> children;
coordinationNodeFsPath.List(children);
TDeque<TFsPath> pathQueue(children.begin(), children.end());
while (!pathQueue.empty()) {
const auto path = pathQueue.front();
pathQueue.pop_front();
if (path.IsDirectory()) {
if (IsFileExists(path.Child(NFiles::CreateRateLimiter().FileName))) {
const auto result = RestoreRateLimiter(
path, coordinationNodeDbPath, path.RelativeTo(coordinationNodeFsPath).GetPath()
);
if (!result.IsSuccess()) {
return result;
}
}
children.clear();
path.List(children);
pathQueue.insert(pathQueue.end(), children.begin(), children.end());
}

}
return Result<TRestoreResult>();
}

TRestoreResult TRestoreClient::RestoreCoordinationNode(
const TFsPath& fsPath,
const TString& dbPath,
Expand All @@ -583,6 +661,10 @@ TRestoreResult TRestoreClient::RestoreCoordinationNode(
const auto creationRequest = ReadCoordinationNodeCreationRequest(fsPath, Log.get());
auto result = CreateCoordinationNode(CoordinationNodeClient, dbPath, creationRequest);
if (result.IsSuccess()) {
if (auto result = RestoreDependentResources(fsPath, dbPath); !result.IsSuccess()) {
LOG_E("Failed to create coordination node's resources " << dbPath.Quote());
return Result<TRestoreResult>(dbPath, std::move(result));
}
LOG_D("Created " << dbPath.Quote());
return RestorePermissions(fsPath, dbPath, settings, isAlreadyExisting);
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/public/lib/ydb_cli/dump/restore_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <ydb-cpp-sdk/client/import/import.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/rate_limiter/rate_limiter.h>
#include <ydb-cpp-sdk/client/scheme/scheme.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/topic/client.h>
Expand Down Expand Up @@ -131,6 +132,8 @@ class TRestoreClient {
TRestoreResult RestoreView(const TFsPath& fsPath, const TString& dbRestoreRoot, const TString& dbPathRelativeToRestoreRoot, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreTopic(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreCoordinationNode(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, bool isAlreadyExisting);
TRestoreResult RestoreDependentResources(const TFsPath& fsPath, const TString& dbPath);
TRestoreResult RestoreRateLimiter(const TFsPath& fsPath, const TString& coordinationNodePath, const TString& resourcePath);

TRestoreResult CheckSchema(const TString& dbPath, const NTable::TTableDescription& desc);
TRestoreResult RestoreData(const TFsPath& fsPath, const TString& dbPath, const TRestoreSettings& settings, const NTable::TTableDescription& desc);
Expand All @@ -157,6 +160,7 @@ class TRestoreClient {
NTable::TTableClient TableClient;
NTopic::TTopicClient TopicClient;
NCoordination::TClient CoordinationNodeClient;
NRateLimiter::TRateLimiterClient RateLimiterClient;
NQuery::TQueryClient QueryClient;
std::shared_ptr<TLog> Log;

Expand Down
1 change: 1 addition & 0 deletions ydb/services/ydb/backup_ut/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ PEERDIR(
ydb/public/sdk/cpp/src/client/export
ydb/public/sdk/cpp/src/client/import
ydb/public/sdk/cpp/src/client/operation
ydb/public/sdk/cpp/src/client/rate_limiter
ydb/public/sdk/cpp/src/client/result
ydb/public/sdk/cpp/src/client/table
ydb/public/sdk/cpp/src/client/value
Expand Down
120 changes: 118 additions & 2 deletions ydb/services/ydb/backup_ut/ydb_backup_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <ydb/core/wrappers/ut_helpers/s3_mock.h>

#include <ydb/public/api/protos/draft/ydb_view.pb.h>
#include <ydb/public/api/protos/ydb_rate_limiter.pb.h>
#include <ydb/public/lib/ydb_cli/common/recursive_list.h>
#include <ydb/public/lib/ydb_cli/dump/dump.h>
#include <ydb/public/lib/yson_value/ydb_yson_value.h>
Expand All @@ -12,6 +13,7 @@
#include <ydb-cpp-sdk/client/import/import.h>
#include <ydb-cpp-sdk/client/operation/operation.h>
#include <ydb-cpp-sdk/client/query/client.h>
#include <ydb-cpp-sdk/client/rate_limiter/rate_limiter.h>
#include <ydb-cpp-sdk/client/table/table.h>
#include <ydb-cpp-sdk/client/value/value.h>

Expand All @@ -26,6 +28,7 @@

using namespace NYdb;
using namespace NYdb::NOperation;
using namespace NYdb::NRateLimiter;
using namespace NYdb::NScheme;
using namespace NYdb::NTable;
using namespace NYdb::NView;
Expand All @@ -46,6 +49,34 @@ bool operator==(const TKeyRange& lhs, const TKeyRange& rhs) {

}

namespace NYdb::NRateLimiter {

bool operator==(
const Ydb::RateLimiter::HierarchicalDrrSettings& lhs,
const Ydb::RateLimiter::HierarchicalDrrSettings& rhs
) {
return google::protobuf::util::MessageDifferencer::Equals(lhs, rhs);
}

bool operator==(
const TDescribeResourceResult::THierarchicalDrrProps& lhs,
const TDescribeResourceResult::THierarchicalDrrProps& rhs
) {
Ydb::RateLimiter::HierarchicalDrrSettings left;
lhs.SerializeTo(left);
Ydb::RateLimiter::HierarchicalDrrSettings right;
rhs.SerializeTo(right);
return left == right;
}

bool operator==(const TDescribeResourceResult& lhs, const TDescribeResourceResult& rhs) {
UNIT_ASSERT_C(lhs.IsSuccess(), lhs.GetIssues().ToString());
UNIT_ASSERT_C(rhs.IsSuccess(), rhs.GetIssues().ToString());
return lhs.GetHierarchicalDrrProps() == rhs.GetHierarchicalDrrProps();
}

}

namespace NYdb {

struct TTenantsTestSettings : TKikimrTestSettings {
Expand Down Expand Up @@ -747,8 +778,8 @@ void TestTopicSettingsArePreserved(
}

void CreateCoordinationNode(
NCoordination::TClient& client, const std::string& path, const NCoordination::TCreateNodeSettings& settings)
{
NCoordination::TClient& client, const std::string& path, const NCoordination::TCreateNodeSettings& settings
) {
const auto result = client.CreateNode(path, settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}
Expand Down Expand Up @@ -796,6 +827,72 @@ void TestCoordinationNodeSettingsArePreserved(
checkDescription(DescribeCoordinationNode(nodeClient, path), DEBUG_HINT);
}

void CreateRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath,
const TCreateResourceSettings& settings = {}
) {
const auto result = client.CreateResource(coordinationNodePath, rateLimiterPath, settings).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

TDescribeResourceResult DescribeRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath
) {
const auto result = client.DescribeResource(coordinationNodePath, rateLimiterPath).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
return result;
}

void DropRateLimiter(
TRateLimiterClient& client,
const std::string& coordinationNodePath,
const std::string& rateLimiterPath
) {
const auto result = client.DropResource(coordinationNodePath, rateLimiterPath).ExtractValueSync();
UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
}

void TestCoordinationNodeResourcesArePreserved(
const std::string& coordinationNodePath,
NCoordination::TClient& nodeClient,
TRateLimiterClient& rateLimiterClient,
TBackupFunction&& backup,
TRestoreFunction&& restore
) {
constexpr std::array rateLimiters = { "root", "root/firstChild", "root/secondChild" };
CreateCoordinationNode(nodeClient, coordinationNodePath, {});
// required settings
const auto settings = TCreateResourceSettings().MaxUnitsPerSecond(5);
for (const auto& rateLimiter : rateLimiters) {
CreateRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiter, settings);
}

std::vector<TDescribeResourceResult> originalDescriptions;
for (const auto& rateLimiter : rateLimiters) {
originalDescriptions.emplace_back(DescribeRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiter));
}

backup();

for (int i = 2; i >= 0; --i) {
DropRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiters[i]);
}
DropCoordinationNode(nodeClient, coordinationNodePath);

restore();
for (int i = 0; i < 3; ++i) {
UNIT_ASSERT_EQUAL_C(
DescribeRateLimiter(rateLimiterClient, coordinationNodePath, rateLimiters[i]),
originalDescriptions[i],
"i: " << i
);
}
}

}

Y_UNIT_TEST_SUITE(BackupRestore) {
Expand Down Expand Up @@ -1035,6 +1132,25 @@ Y_UNIT_TEST_SUITE(BackupRestore) {
);
}

Y_UNIT_TEST(RestoreKesusResources) {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
NCoordination::TClient nodeClient(driver);
TRateLimiterClient rateLimiterClient(driver);
TTempDir tempDir;
const auto& pathToBackup = tempDir.Path();

const std::string kesus = "/Root/kesus";

TestCoordinationNodeResourcesArePreserved(
kesus,
nodeClient,
rateLimiterClient,
CreateBackupLambda(driver, pathToBackup),
CreateRestoreLambda(driver, pathToBackup)
);
}

void TestTableBackupRestore() {
TKikimrWithGrpcAndRootSchema server;
auto driver = TDriver(TDriverConfig().SetEndpoint(Sprintf("localhost:%u", server.GetPort())));
Expand Down

0 comments on commit d47488c

Please sign in to comment.