Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the enclave dispatcher thread to wait on a condition variable for incoming writes #6863

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
### Fixed

- `ccf.ledger`/`read_ledger.py` previously enforced too strict a condition on node membership when validating ledger files (#6849).
- Restore low CPU usage on idle nodes, which had increased in dev20 (#6816).

## [6.0.0-dev20]

Expand Down
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,7 @@ if(BUILD_TESTS)
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/unit_strings.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/dl_list.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/nonstd.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/ds/test/work_beacon.cpp
)
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

Expand Down
101 changes: 101 additions & 0 deletions src/ds/notifying.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ring_buffer.h"
#include "work_beacon.h"

namespace ringbuffer
{
class NotifyingWriter : public AbstractWriter
{
private:
WriterPtr underlying_writer;
ccf::ds::WorkBeaconPtr work_beacon;

public:
NotifyingWriter(const WriterPtr& writer, const ccf::ds::WorkBeaconPtr& wb) :
underlying_writer(writer),
work_beacon(wb)
{}

// After the underlying writer finishes writing a message, notify any
// waiting receivers
void finish(const WriteMarker& marker) override
{
underlying_writer->finish(marker);
work_beacon->notify_work_available();
}

// For all other overrides, defer directly to the underlying writer
WriteMarker prepare(
Message m,
size_t size,
bool wait = true,
size_t* identifier = nullptr) override
{
return underlying_writer->prepare(m, size, wait, identifier);
}

WriteMarker write_bytes(
const WriteMarker& marker, const uint8_t* bytes, size_t size) override
{
return underlying_writer->write_bytes(marker, bytes, size);
}

size_t get_max_message_size() override
{
return underlying_writer->get_max_message_size();
}
};

class NotifyingWriterFactory : public AbstractWriterFactory
{
private:
AbstractWriterFactory& factory_impl;

ccf::ds::WorkBeaconPtr outbound_work_beacon;
ccf::ds::WorkBeaconPtr inbound_work_beacon;

public:
NotifyingWriterFactory(AbstractWriterFactory& impl) :
factory_impl(impl),
outbound_work_beacon(std::make_shared<ccf::ds::WorkBeacon>()),
inbound_work_beacon(std::make_shared<ccf::ds::WorkBeacon>())
{}

ccf::ds::WorkBeaconPtr get_outbound_work_beacon()
{
return outbound_work_beacon;
}

ccf::ds::WorkBeaconPtr get_inbound_work_beacon()
{
return inbound_work_beacon;
}

std::shared_ptr<NotifyingWriter> create_notifying_writer_to_outside()
{
return std::make_shared<NotifyingWriter>(
factory_impl.create_writer_to_outside(), outbound_work_beacon);
}

std::shared_ptr<NotifyingWriter> create_notifying_writer_to_inside()
{
return std::make_shared<NotifyingWriter>(
factory_impl.create_writer_to_inside(), inbound_work_beacon);
}

std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_outside()
override
{
return create_notifying_writer_to_outside();
}

std::shared_ptr<ringbuffer::AbstractWriter> create_writer_to_inside()
override
{
return create_notifying_writer_to_inside();
}
};
}
206 changes: 206 additions & 0 deletions src/ds/test/work_beacon.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.

#include "../work_beacon.h"

#include "ccf/ds/logger.h"

#include <doctest/doctest.h>
#include <functional>
#include <iostream>
#include <optional>
#include <queue>
#include <thread>

using WorkItem = std::function<bool()>;

struct WorkQueue
{
std::mutex mutex;
std::queue<WorkItem> work;

void add_work(WorkItem&& item)
{
std::unique_lock<std::mutex> lock(mutex);
work.push(std::move(item));
}

std::optional<WorkItem> get_work()
{
std::unique_lock<std::mutex> lock(mutex);

std::optional<WorkItem> result = std::nullopt;
if (!work.empty())
{
result = work.front();
work.pop();
}

return result;
}
};

// Do nothing, so that callers simply spin-loop
struct NopBeacon
{
void wait_for_work() {}
void notify_work_available() {}
};

// Simulate what should generally be done in production. Always wait with a
// timeout, rather than indefinitely, to handle senders who forget/fail to
// notify.
struct WaitBeacon
{
ccf::ds::WorkBeacon internal;

void wait_for_work()
{
internal.wait_for_work_with_timeout(std::chrono::milliseconds(100));
}

void notify_work_available()
{
// Sometimes, just forget to notify the receivers
if (rand() % 4 != 0)
{
internal.notify_work_available();
}
else
{
// Instead, waste so much time that the receivers wake up
std::this_thread::sleep_for(std::chrono::milliseconds(110));
}
}
};

// Run a simple task simulation, with some sender and receiver threads passing
// work items around. Return how often the receivers checked the work queue and
// found it empty.
template <typename TBeacon>
size_t run_jobs(size_t n_senders, size_t n_receivers)
{
std::vector<std::thread> senders;
std::vector<std::thread> receivers;

WorkQueue work_queue;
TBeacon beacon;

std::atomic<size_t> workless_wakes = 0;

for (auto i = 0; i < n_senders; ++i)
{
senders.push_back(std::thread(
[&](size_t sender_idx) {
for (auto x = 0; x < 10; ++x)
{
work_queue.add_work([=]() {
std::this_thread::sleep_for(std::chrono::nanoseconds(x * x));
return false;
});
beacon.notify_work_available();
std::this_thread::sleep_for(std::chrono::milliseconds(2));
}

// Add tasks that tells the receiving worker to terminate. Each sender
// is responsible for sending some fraction of terminate tasks, such
// that each receiver receives exactly one.
size_t quota = n_receivers / n_senders;
if (sender_idx == 0)
{
quota += n_receivers % n_senders;
}
for (auto j = 0; j < quota; ++j)
{
work_queue.add_work([&]() { return true; });
beacon.notify_work_available();
}
},
i));
}

for (auto i = 0; i < n_receivers; ++i)
{
receivers.push_back(std::thread([&]() {
while (true)
{
beacon.wait_for_work();

auto task = work_queue.get_work();
if (!task.has_value())
{
++workless_wakes;
}
else
{
const auto task_ret = task.value()();
if (task_ret == true)
{
return;
}
}
}
}));
}

for (auto& sender : senders)
{
sender.join();
}

for (auto& receiver : receivers)
{
receiver.join();
}

return workless_wakes;
}

TEST_CASE("WorkBeacon" * doctest::test_suite("workbeacon"))
{
std::vector<size_t> test_vals{1, 5, 8};
for (auto n_senders : test_vals)
{
for (auto n_receivers : test_vals)
{
{
LOG_INFO_FMT(
"Testing {} senders and {} receivers", n_senders, n_receivers);
// Allow test to be repeated multiple times locally to improve
// confidence in the given bounds
static constexpr auto n_repeats = 1;
for (size_t i = 0; i < n_repeats; ++i)
{
const auto wakes_with_spinloop =
run_jobs<NopBeacon>(n_senders, n_receivers);
const auto wakes_with_waits =
run_jobs<WaitBeacon>(n_senders, n_receivers);
const auto wakes_with_beacon =
run_jobs<ccf::ds::WorkBeacon>(n_senders, n_receivers);

LOG_INFO_FMT(
" {} vs {} vs {}",
wakes_with_beacon,
wakes_with_waits,
wakes_with_spinloop);

// Spin loop should have hundreds of thousands of idle wake-ups.
// Beacon should have 0 idle wake-ups.
// Beacon with timer should have a handful of idle wake-ups, roughly
// proportional to the ratio of receivers / senders, and the total
// test run-time, and the various ad-hoc random parameters in this
// test. But crucially, drastically fewer than the spin-loop

// First assert the order
REQUIRE(wakes_with_beacon == 0);
REQUIRE(wakes_with_beacon <= wakes_with_waits);
REQUIRE(wakes_with_waits < wakes_with_spinloop);

// Then try to be a little more precise, allowing head-room for
// different build configs and cosmic rays
REQUIRE(wakes_with_waits * 10 < wakes_with_spinloop);
}
}
}
}
}
61 changes: 61 additions & 0 deletions src/ds/work_beacon.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#define FMT_HEADER_ONLY
#include <condition_variable>
#include <fmt/format.h>
#include <iostream>
#include <memory>
#include <mutex>

namespace ccf::ds
{
class WorkBeacon
{
protected:
std::mutex mutex;
std::condition_variable condition_variable;
size_t work_available = 0;

public:
void wait_for_work()
{
std::unique_lock<std::mutex> lock(mutex);
condition_variable.wait(lock, [this] { return work_available > 0; });
--work_available;
}

// Returns true if condition variable indicated work is available, false if
// timeout was reached
template <typename DurationRep, typename DurationPeriod>
bool wait_for_work_with_timeout(
const std::chrono::duration<DurationRep, DurationPeriod>& timeout)
{
std::unique_lock<std::mutex> lock(mutex);
const auto woke_for_work = condition_variable.wait_for(
lock, timeout, [this] { return work_available > 0; });
if (woke_for_work)
{
if (work_available > 0)
{
--work_available;
}
}

return woke_for_work;
}

void notify_work_available()
{
{
std::lock_guard<std::mutex> lock(mutex);
++work_available;
}

condition_variable.notify_all();
}
};

using WorkBeaconPtr = std::shared_ptr<WorkBeacon>;
}
Loading