Skip to content

Commit

Permalink
wip - port partition_translator into scheduler interface
Browse files Browse the repository at this point in the history
  • Loading branch information
bharathv committed Feb 18, 2025
1 parent bb3b0ed commit 82bcd17
Show file tree
Hide file tree
Showing 14 changed files with 1,159 additions and 190 deletions.
2 changes: 2 additions & 0 deletions src/v/datalake/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,15 @@ redpanda_cc_library(
"//src/v/datalake/coordinator:catalog_factory",
"//src/v/datalake/coordinator:frontend",
"//src/v/datalake/translation:partition_translator",
"//src/v/datalake/translation:scheduler",
"//src/v/features",
"//src/v/model",
"//src/v/pandaproxy",
"//src/v/raft",
"//src/v/schema:registry",
"//src/v/serde",
"//src/v/ssx:semaphore",
"//src/v/ssx:work_queue",
"//src/v/utils:directory_walker",
"@fmt",
"@seastar",
Expand Down
2 changes: 1 addition & 1 deletion src/v/datalake/coordinator/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ redpanda_cc_library(
"types.h",
],
include_prefix = "datalake/coordinator",
visibility = [":__subpackages__"],
visibility = ["//visibility:public"],
deps = [
":translated_offset_range",
"//src/v/datalake:schema_identifier",
Expand Down
270 changes: 123 additions & 147 deletions src/v/datalake/datalake_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@

#include <memory>

// TODO(iceberg): Make me configurable
static constexpr auto scheduler_block_size_default = 4_MiB;
static constexpr auto scheduler_concurrent_translations = 4;
static constexpr auto scheduler_time_slice = 30s;
static constexpr std::string_view iceberg_data_path_prefix = "data";

namespace datalake {

namespace {
Expand Down Expand Up @@ -101,34 +107,28 @@ datalake_manager::datalake_manager(
.cache_size = 50, .small_size = 10}))
, _as(as)
, _sg(sg)
, _effective_max_translator_buffered_data(
std::min(memory_limit, max_translator_buffered_data))
, _iceberg_commit_interval(
config::shard_local_cfg().iceberg_catalog_commit_interval_ms.bind())
, _iceberg_invalid_record_action(
config::shard_local_cfg().iceberg_invalid_record_action.bind())
, _writer_scratch_space(config::node().datalake_staging_path()) {
vassert(memory_limit > 0, "Memory limit must be greater than 0");
auto max_parallel = static_cast<size_t>(
std::floor(memory_limit / _effective_max_translator_buffered_data));
vlog(
datalake_log.debug,
"Creating datalake manager with memory limit: {}, effective max "
"translator buffered data: {} and max parallel translations: {}",
, _writer_scratch_space(config::node().datalake_staging_path())
, _scheduler(
memory_limit,
_effective_max_translator_buffered_data,
max_parallel);

_parallel_translations = std::make_unique<ssx::semaphore>(
size_t(max_parallel), "datalake_parallel_translations");
}
scheduler_block_size_default,
translation::scheduling::scheduling_policy::make_default(
scheduler_concurrent_translations,
std::chrono::duration_cast<translation::scheduling::clock::duration>(
scheduler_time_slice)))
, _queue(sg, [](const std::exception_ptr& ex) {
vlog(
datalake_log.error, "unexpected error in managing translator: {}", ex);
}) {}
datalake_manager::~datalake_manager() = default;

double datalake_manager::average_translation_backlog() {
size_t total_lag = 0;
size_t translators_with_backlog = 0;
for (const auto& [_, translator] : _translators) {
auto backlog_size = translator->translation_backlog();
const auto& translators = _scheduler.all_translators();
for (const auto& [_, translator] : translators) {
auto backlog_size = translator.status().translation_backlog;
// skip over translators that are not yet ready to report anything
if (!backlog_size) {
continue;
Expand Down Expand Up @@ -172,13 +172,17 @@ ss::future<> datalake_manager::start() {
= _partition_mgr->local().register_manage_notification(
model::kafka_namespace,
[this](ss::lw_shared_ptr<cluster::partition> new_partition) {
on_group_notification(new_partition->ntp());
_queue.submit([this, ntp = new_partition->ntp()]() {
return handle_translator_state_change(ntp);
});
});
auto partition_unmanaged_notification
= _partition_mgr->local().register_unmanage_notification(
model::kafka_namespace, [this](model::topic_partition_view tp) {
model::ntp ntp{model::kafka_namespace, tp.topic, tp.partition};
stop_translator(ntp);
_queue.submit([this, ntp = std::move(ntp)]() {
return handle_translator_state_change(ntp);
});
});
// Handle leadership changes
auto leadership_registration
Expand All @@ -189,7 +193,9 @@ ss::future<> datalake_manager::start() {
std::optional<::model::node_id>) {
auto partition = _partition_mgr->local().partition_for(group);
if (partition) {
on_group_notification(partition->ntp());
_queue.submit([this, ntp = partition->ntp()]() {
return handle_translator_state_change(ntp);
});
}
});

Expand All @@ -202,7 +208,9 @@ ss::future<> datalake_manager::start() {
if (
entry.type
== cluster::topic_table_ntp_delta_type::properties_updated) {
on_group_notification(entry.ntp);
_queue.submit([this, ntp = entry.ntp]() {
return handle_translator_state_change(ntp);
});
}
}
});
Expand All @@ -224,25 +232,27 @@ ss::future<> datalake_manager::start() {
_topic_table->local().unregister_ntp_delta_notification(
topic_properties_registration);
});
_iceberg_commit_interval.watch([this] {
ssx::spawn_with_gate(_gate, [this]() {
for (const auto& [group, _] : _translators) {
on_group_notification(group);
}
});
});
_iceberg_invalid_record_action.watch([this] {
// Copy the keys to avoid iterator invalidation.
chunked_vector<model::ntp> ntps;
ntps.reserve(_translators.size());
for (const auto& [ntp, _] : _translators) {
ntps.push_back(ntp);
}

for (const auto& ntp : ntps) {
on_group_notification(ntp);
for (auto& [_, entry] : _scheduler.all_translators()) {
entry.translator_ptr()->reconcile_properties();
}
});

if (!_features->local().is_active(features::feature::datalake_iceberg_ga)) {
ssx::spawn_with_gate(_gate, [this] {
return _features->local()
.await_feature(
features::feature::datalake_iceberg_ga, _as->local())
.then([this] {
for (const auto& [ntp, _] : _scheduler.all_translators()) {
_queue.submit([this, ntp]() {
return handle_translator_state_change(ntp);
});
}
});
});
}

_schema_cache->start();
_backlog_controller = std::make_unique<backlog_controller>(
[this] { return average_translation_backlog(); }, _sg);
Expand All @@ -251,126 +261,92 @@ ss::future<> datalake_manager::start() {

// Shuts the manager down in dependency order: stop accepting new work first,
// then quiesce the components that produce work, and only then drop state.
// NOTE(review): this span comes from a diff view; it appears to show BOTH the
// pre-change shutdown path (the max_concurrent_for_each over _translators)
// and the post-change one (_scheduler.stop()) — confirm against the applied
// commit which of the two survives.
ss::future<> datalake_manager::stop() {
// Close the gate first so no new background fibers can start; the returned
// future is awaited last, after everything that might hold the gate open.
auto f = _gate.close();
co_await _queue.shutdown();
co_await _backlog_controller->stop();
// Dropping the registrations stops further notifications from the partition
// manager / topic table from enqueueing work.
_deregistrations.clear();
// (pre-change path) stop all per-partition translators, at most 32 at a time.
co_await ss::max_concurrent_for_each(
_translators, 32, [](auto& entry) mutable {
return entry.second->stop();
});
// (post-change path) the scheduler owns the translators and stops them.
co_await _scheduler.stop();
// Wait for any in-flight gated fibers to finish.
co_await std::move(f);
_schema_cache->stop();
}

// Derive how often a partition should be translated from the iceberg commit
// interval: aim for several translation rounds per commit window, but never
// go below a floor — too-frequent translation produces tiny parquet files.
// This trade-off favors higher-throughput topics that accumulate enough data
// within a single commit interval.
std::chrono::milliseconds datalake_manager::translation_interval_ms() const {
static constexpr std::chrono::milliseconds floor_interval{5s};
const auto derived_interval = _iceberg_commit_interval() / 3;
return derived_interval > floor_interval ? derived_interval : floor_interval;
}

void datalake_manager::on_group_notification(const model::ntp& ntp) {
auto partition = _partition_mgr->local().get(ntp);
if (!partition || !model::is_user_topic(ntp)) {
return;
ss::future<>
datalake_manager::handle_translator_state_change(const model::ntp& ntp) {
if (_gate.is_closed() || !model::is_user_topic(ntp)) {
co_return;
}
auto partition = _partition_mgr->local().get(ntp);
auto is_leader = partition && partition->raft()->is_leader();
const auto& topic_cfg = _topic_table->local().get_topic_cfg(
model::topic_namespace_view{ntp});
if (!topic_cfg) {
return;
}
auto it = _translators.find(ntp);
// todo(iceberg) handle topic / partition disabling
auto iceberg_disabled = topic_cfg->properties.iceberg_mode
== model::iceberg_mode::disabled;
if (!partition->is_leader() || iceberg_disabled) {
if (it != _translators.end()) {
stop_translator(partition->ntp());
}
return;
}

// By now we know the partition is a leader and iceberg is enabled, so
// there has to be a translator, spin one up if it doesn't already exist.
if (it == _translators.end()) {
start_translator(
partition,
topic_cfg->properties.iceberg_mode,
topic_cfg->properties.iceberg_invalid_record_action.value_or(
config::shard_local_cfg().iceberg_invalid_record_action.value()));
} else {
// check if translation interval changed.
auto target_interval = translation_interval_ms();
if (it->second->translation_interval() != target_interval) {
it->second->reset_translation_interval(target_interval);
}

// check if invalid record action changed.
auto target_action
= topic_cfg->properties.iceberg_invalid_record_action.value_or(
config::shard_local_cfg().iceberg_invalid_record_action.value());

vlog(
datalake_log.warn,
"Invalid record action: {} vs {}",
target_action,
it->second->invalid_record_action());

if (it->second->invalid_record_action() != target_action) {
it->second->reset_invalid_record_action(target_action);
const auto& translators = _scheduler.all_translators();
auto translator_it = translators.find(ntp);
auto translator_exists = translator_it != translators.end();
auto iceberg_disabled = topic_cfg
&& topic_cfg->properties.iceberg_mode
== model::iceberg_mode::disabled;
auto requires_active_translator = partition && topic_cfg
&& !iceberg_disabled && is_leader;
if (translator_exists && !requires_active_translator) {
co_await _scheduler.remove_translator(ntp);
} else if (!translator_exists && requires_active_translator) {
auto mode = topic_cfg->properties.iceberg_mode;
auto type_resolver = make_type_resolver(
mode, *_schema_registry, *_schema_cache);
auto record_translator = make_record_translator(mode);
auto table_creator = translation::make_default_table_creator(
_coordinator_frontend->local());
auto term = partition->term();
auto path = remote_path{
fmt::format("{}/{}/{}", iceberg_data_path_prefix, ntp.path(), term)};
auto& reservations = _scheduler.reservations();
// make a new translator
auto coordinator
= translation::coordinator_api::make_default_coordinator_api(
_coordinator_frontend->local());
auto data_src = translation::data_source::make_default_data_source(
partition);
auto translation_ctx
= translation::translation_context::make_default_translation_context(
local_path{_writer_scratch_space},
partition->ntp(),
partition->get_revision_id(),
*_cloud_data_io,
*_schema_mgr,
std::move(type_resolver),
std::move(record_translator),
std::move(table_creator),
_location_provider,
std::move(path),
*reservations,
_topic_table,
_features);

auto translator = std::make_unique<translation::partition_translator>(
_sg,
std::move(coordinator),
std::move(data_src),
std::move(translation_ctx));

auto add_f = co_await ss::coroutine::as_future(
_scheduler.add_translator(std::move(translator)));

if (add_f.failed() || !add_f.get()) {
add_f.ignore_ready_future();
vlog(
datalake_log.warn, "adding translator failed, retrying in a bit");
if (!_gate.is_closed()) {
_queue.submit_delayed(10s, [this, ntp]() {
return handle_translator_state_change(ntp);
});
}
}
} else if (translator_exists) {
// TODO: add more tests that exercise this code path (to ensure new
// property updates are getting picked up correctly)
translator_it->second.translator_ptr()->reconcile_properties();
}
}

// Creates and registers a partition_translator for `partition`.
//
// Preconditions (enforced by the vassert below): no translator is already
// registered for this ntp — callers are expected to have stopped/removed any
// previous instance first.
//
// @param partition the leader partition to translate from
// @param mode iceberg mode from the topic properties; drives which
// type resolver / record translator is constructed
// @param invalid_record_action what to do when a record fails translation
void datalake_manager::start_translator(
ss::lw_shared_ptr<cluster::partition> partition,
model::iceberg_mode mode,
model::iceberg_invalid_record_action invalid_record_action) {
auto it = _translators.find(partition->ntp());
vassert(
it == _translators.end(),
"Attempt to start a translator for ntp {} in term {} while another "
"instance already exists",
partition->ntp(),
partition->term());
// The translator captures manager-owned resources (cloud IO, schema
// manager, shared parallel-translation semaphore); the manager outlives
// all translators, so raw pointers/references here are safe for the
// translator's lifetime.
auto translator = std::make_unique<translation::partition_translator>(
partition,
_coordinator_frontend,
_features,
&_cloud_data_io,
_location_provider,
_schema_mgr.get(),
make_type_resolver(mode, *_schema_registry, *_schema_cache),
make_record_translator(mode),
translation_interval_ms(),
_sg,
_effective_max_translator_buffered_data,
&_parallel_translations,
invalid_record_action,
_writer_scratch_space);
_translators.emplace(partition->ntp(), std::move(translator));
}

// Removes the translator for `ntp` (if any) from the map and stops it
// asynchronously in the background, keeping the instance alive until its
// stop() future resolves.
void datalake_manager::stop_translator(const model::ntp& ntp) {
if (_gate.is_closed()) {
// Cleanup should be deferred to stop().
return;
}
auto it = _translators.find(ntp);
if (it == _translators.end()) {
// Nothing registered for this ntp — nothing to do.
return;
}
// Take ownership out of the map before erasing so the translator outlives
// the map entry while its asynchronous stop() runs.
auto t = std::move(it->second);
_translators.erase(it);
ssx::spawn_with_gate(_gate, [t = std::move(t)]() mutable {
// Keep 't' alive by capturing it into the finally below. Use the raw
// pointer here to avoid a use-after-move.
auto* t_ptr = t.get();
return t_ptr->stop().finally([_ = std::move(t)] {});
});
}

ss::future<uint64_t> datalake_manager::disk_usage() {
const auto path = config::node().datalake_staging_path();

Expand Down
Loading

0 comments on commit 82bcd17

Please sign in to comment.