diff --git a/README.md b/README.md index 553575d..2c86280 100644 --- a/README.md +++ b/README.md @@ -51,8 +51,12 @@ Via these interfaces, the user can change the structure and parameters of the mo * Delay: * Fixed delay * Trace (compatible with [Link'Em](https://github.com/sys-uos/linkem)) - * Queues: + * Queues and Active Queue Management (AQM) algorithms: * FIFO + * RED + * CoDel + * PIE + * PI2 * Rate limiters / departure processes: * Bitrate * Fixed interval diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 4d60a56..aa0eccb 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -20,7 +20,11 @@ set(flowemu_SRCS modules/meter/DelayMeter.cpp modules/meter/ThroughputMeter.cpp modules/null/NullModule.cpp + modules/queue/CodelQueueModule.cpp modules/queue/FifoQueueModule.cpp + modules/queue/PieQueueModule.cpp + modules/queue/Pi2QueueModule.cpp + modules/queue/RedQueueModule.cpp modules/rate/BitrateRateModule.cpp modules/rate/FixedIntervalRateModule.cpp modules/rate/TraceRateModule.cpp diff --git a/src/modules/ModuleManager.cpp b/src/modules/ModuleManager.cpp index c61f244..66157ba 100644 --- a/src/modules/ModuleManager.cpp +++ b/src/modules/ModuleManager.cpp @@ -37,7 +37,11 @@ #include "meter/DelayMeter.hpp" #include "meter/ThroughputMeter.hpp" #include "null/NullModule.hpp" +#include "queue/CodelQueueModule.hpp" #include "queue/FifoQueueModule.hpp" +#include "queue/PieQueueModule.hpp" +#include "queue/Pi2QueueModule.hpp" +#include "queue/RedQueueModule.hpp" #ifdef MACHINE_LEARNING #include "queue/DQLQueueModule.hpp" #endif @@ -118,8 +122,16 @@ void ModuleManager::addModule(const string &id, const Json::Value &json_root, bo } else if(type == "dql_queue") { new_module = make_shared(io_service, 100, 0.001); #endif + } else if(type == "codel_queue") { + new_module = make_shared(io_service, 100, 5); } else if(type == "fifo_queue") { new_module = make_shared(io_service, 100); + } else if(type == "pie_queue") { + new_module = make_shared(io_service, 100, 15, 150); + } else if(type == "pi2_queue") { + new_module = make_shared(io_service, 100, 15, 100); + } else if(type == "red_queue") { + new_module = make_shared(io_service, 100, 0.002, 15, 45, 0.1, 1.0); } else if(type == "bitrate_rate") { new_module = make_shared(io_service, 1000000); } else if(type == "fixed_interval_rate") { diff --git a/src/modules/queue/CodelQueueModule.cpp b/src/modules/queue/CodelQueueModule.cpp new file mode 100644 index 0000000..894b7de --- /dev/null +++ b/src/modules/queue/CodelQueueModule.cpp @@ -0,0 +1,208 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +/* +* Implemented using: +* Nichols, Kathleen and Van Jacobson (2012) Appendix: CoDel pseudocode. https://queue.acm.org/appendices/codel.html +* Nichols, Kathleen et al. (2018) Controlled Delay Active Queue Management. RFC 8289. +*/ + +#include "CodelQueueModule.hpp" + +#include +#include + +using namespace std; + +CodelQueueModule::CodelQueueModule(boost::asio::io_service &io_service, + size_t buffer_size, size_t target_queue_time) + : timer_statistics(io_service) { + setName("CoDelQueue"); + addPort({"in", "In", PortInfo::Side::left, &input_port}); + addPort({"out", "Out", PortInfo::Side::right, &output_port}); + addParameter({"ecn_mode", "ECN Mode", "", ¶meter_ecn_mode}); + addParameter({"buffer_size", "Buffer", "packets", ¶meter_buffer_size}); + addParameter({"target_queue_time", "Target Queueing Time", "ms", + ¶meter_target_queue_time}); + addStatistic({"queue_length", "Queue", "packets", &statistic_queue_length}); + addStatistic({"count", "Count", "packets", &statistic_count}); + addStatistic( + {"sojourn_time", "Packet Sojourn Time", "ms", &statistic_sojourn_time}); + addStatistic({"packets", "Packets Received", "", &statistic_packets}); + addStatistic( + {"packets_dropped", "Packets Droppped", "", &statistic_packets_dropped}); + addStatistic( + {"packets_marked", "Packets Marked", "", &statistic_packets_marked}); + + input_port.setReceiveHandler( + bind(&CodelQueueModule::enqueue, this, placeholders::_1)); + output_port.setRequestHandler(bind(&CodelQueueModule::dequeue, this)); + + parameter_buffer_size.set(buffer_size); + + parameter_ecn_mode.addChangeHandler([&](bool value) { ecn_mode = value; }); + parameter_ecn_mode.callChangeHandlers(); + + parameter_target_queue_time.addChangeHandler( + [&](double value) { target = ms(static_cast(value)); }); + parameter_target_queue_time.callChangeHandlers(); + + packets = 0; + packets_dropped = 0; + packets_marked = 0; + + // Start statistics timer + timer_statistics.expires_from_now(ms(0)); + timer_statistics.async_wait(boost::bind(&CodelQueueModule::statistics, this, + boost::asio::placeholders::error)); +} + +void CodelQueueModule::enqueue(shared_ptr packet) { + packets++; + if (packet_queue_timestamp.size() >= parameter_buffer_size.get()) { + packets_dropped++; + return; + } + + auto now = chrono::high_resolution_clock::now(); + auto packet_timestamp = packet_with_timestamp{packet, now}; + packet_queue_timestamp.emplace(packet_timestamp); + + output_port.notify(); + + return; +} + +std::shared_ptr CodelQueueModule::dequeue() { + auto now = chrono::high_resolution_clock::now(); + dodequeue_result r = dodequeue(now); + bool ecn_capable_packet = false; + + if (r.packet == nullptr) { + return r.packet; + } + + if (ecn_mode) { + uint8_t ecn = r.packet->getECN(); + if (ecn == 1 || ecn == 2) { + ecn_capable_packet = true; + } + } + + if (dropping_) { + if (!r.ok_to_drop) { + dropping_ = false; + } + while (now >= drop_next_ && dropping_) { + count++; + if (ecn_capable_packet) { + r.packet->setECN(CONGESTION_EXPERIENCED); + drop_next_ = control_law(drop_next_, count); + packets_marked++; + goto end; + } + r = dodequeue(now); + packets_dropped++; + if (!r.ok_to_drop) { + dropping_ = false; + } else { + drop_next_ = control_law(drop_next_, count); + } + } + } else if (r.ok_to_drop) { + if (ecn_capable_packet) { + r.packet->setECN(CONGESTION_EXPERIENCED); + packets_marked++; + } else { + r = dodequeue(now); + packets_dropped++; + } + + dropping_ = true; + + if (now - drop_next_ < 16 * INTERVAL) { + count = count > 2 ? count - 2 : 1; + } else { + count = 1; + } + drop_next_ = control_law(now, count); + } +end: + return r.packet; +} + +CodelQueueModule::time_point CodelQueueModule::control_law(time_point t, + uint32_t count) { + return t + std::chrono::nanoseconds(static_cast( + std::chrono::duration_cast(INTERVAL) + .count() / + sqrt(count))); +} + +CodelQueueModule::dodequeue_result CodelQueueModule::dodequeue(time_point now) { + packet_with_timestamp packet_timestamped; + shared_ptr packet; + packet_timestamped = packet_with_timestamp{packet, now}; + + if (!packet_queue_timestamp.empty()) { + packet_timestamped = packet_queue_timestamp.front(); + packet_queue_timestamp.pop(); + } + + dodequeue_result r = {packet_timestamped.packet, false}; + if (r.packet == nullptr) { + // queue is empty - we can't be above TARGET + first_above_time_ = time_point(0ms); + return r; + } + + sojourn_time = now - packet_timestamped.arrival_time; + if ((sojourn_time < target) || (packet_queue_timestamp.size() <= MAXPACKET)) { + first_above_time_ = time_point(0ms); + } else { + if (first_above_time_ == time_point(0ms)) { + first_above_time_ = now + INTERVAL; + } else if (now >= first_above_time_) { + r.ok_to_drop = true; + } + } + return r; +} + +void CodelQueueModule::statistics(const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + + statistic_queue_length.set(packet_queue_timestamp.size()); + statistic_count.set(count); + statistic_sojourn_time.set( + std::chrono::duration_cast(sojourn_time) + .count()); + statistic_packets.set(packets); + statistic_packets_dropped.set(packets_dropped); + statistic_packets_marked.set(packets_marked); + + timer_statistics.expires_at(timer_statistics.expiry() + + chrono::milliseconds(1)); + timer_statistics.async_wait(boost::bind(&CodelQueueModule::statistics, this, + boost::asio::placeholders::error)); +} diff --git a/src/modules/queue/CodelQueueModule.hpp b/src/modules/queue/CodelQueueModule.hpp new file mode 100644 index 0000000..158bfe2 --- /dev/null +++ b/src/modules/queue/CodelQueueModule.hpp @@ -0,0 +1,105 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#ifndef CODEL_QUEUE_MODULE_HPP +#define CODEL_QUEUE_MODULE_HPP + +#define CONGESTION_EXPERIENCED 3 + +#include +#include +#include +#include +#include +#include + +#include "../../utils/Packet.hpp" +#include "../Module.hpp" + +class CodelQueueModule : public Module { + typedef std::chrono::high_resolution_clock::time_point time_point; + typedef std::chrono::milliseconds ms; + + typedef struct packet_with_timestamp { + std::shared_ptr packet; + time_point arrival_time; + } packet_with_timestamp; + + typedef struct dodequeue_result { + std::shared_ptr packet; + bool ok_to_drop; + } dodequeue_result; + +public: + CodelQueueModule(boost::asio::io_service &io_service, size_t buffer_size, + size_t target_queue_time); + + const char *getType() const { return "codel_queue"; } + +private: + const ms INTERVAL = ms(100); + const uint32_t MAXPACKET = 1; + + bool ecn_mode; + ms target = ms(15); + time_point first_above_time_ = time_point(ms(0)); + time_point drop_next_ = time_point(ms(0)); + std::chrono::nanoseconds sojourn_time = std::chrono::nanoseconds(0); + uint32_t count = 0; + uint32_t lastcount = 0; + bool dropping_ = false; + + ReceivingPort> input_port; + RespondingPort> output_port; + + ParameterBool parameter_ecn_mode = false; + ParameterDouble parameter_buffer_size = { + 100, 1, std::numeric_limits::quiet_NaN(), 1}; + + ParameterDouble parameter_target_queue_time = { + 5, 0, std::numeric_limits::quiet_NaN(), 1}; + + Statistic statistic_queue_length; + Statistic statistic_count; + Statistic statistic_sojourn_time; + Statistic statistic_packets; + Statistic statistic_packets_dropped; + Statistic statistic_packets_marked; + + uint64_t packets; + uint64_t packets_dropped; + uint64_t packets_marked; + + std::queue packet_queue_timestamp; + + void enqueue(std::shared_ptr packet); + std::shared_ptr dequeue(); + + time_point control_law(time_point t, uint32_t count); + + dodequeue_result dodequeue(time_point now); + + boost::asio::high_resolution_timer timer_statistics; + void statistics(const boost::system::error_code &error); +}; + +#endif diff --git a/src/modules/queue/Pi2QueueModule.cpp b/src/modules/queue/Pi2QueueModule.cpp new file mode 100644 index 0000000..e49efdc --- /dev/null +++ b/src/modules/queue/Pi2QueueModule.cpp @@ -0,0 +1,226 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include "Pi2QueueModule.hpp" + +#include + +using namespace std; + +Pi2QueueModule::Pi2QueueModule(boost::asio::io_service &io_service, + size_t buffer_size, + uint64_t qdelay_ref_input, + uint64_t rtt_max_input, uint32_t seed) + : timer_probability_update(io_service), timer_statistics(io_service) { + setName("PI2 Queue"); + addPort({"in", "In", PortInfo::Side::left, &input_port}); + addPort({"out", "Out", PortInfo::Side::right, &output_port}); + addParameter({"ecn_mode", "ECN Mode", "", ¶meter_ecn_mode}); + addParameter({"buffer_size", "Buffer", "packets", ¶meter_buffer_size}); + addParameter({"qdelay_ref", "Target QDelay", "ms", ¶meter_qdelay_ref}); + addParameter({"rtt_max", "Max Expected RTT", "ms", ¶meter_rtt_max}); + addParameter({"seed", "Seed", "", ¶meter_seed}); + addStatistic({"queue_length", "Queue", "packets", &statistic_queue_length}); + addStatistic({"drop_prob", "Drop Prob", "", &statistic_drop_prob}); + addStatistic({"pseudo_p", "Pseudo P", "", &statistic_pseudo_p}); + addStatistic({"queue_delay", "Queue Delay", "ms", &statistic_queue_delay}); + addStatistic({"packets", "Packets Received", "", &statistic_packets}); + addStatistic( + {"packets_dropped", "Packets Droppped", "", &statistic_packets_dropped}); + addStatistic( + {"packets_marked", "Packets Marked", "", &statistic_packets_marked}); + + input_port.setReceiveHandler( + bind(&Pi2QueueModule::enqueue, this, placeholders::_1)); + output_port.setRequestHandler(bind(&Pi2QueueModule::dequeue, this)); + + parameter_buffer_size.set(buffer_size); + + parameter_ecn_mode.addChangeHandler([&](bool value) { ecn_mode = value; }); + parameter_ecn_mode.callChangeHandlers(); + + parameter_rtt_max.addChangeHandler([&](uint64_t value) { + rtt_max = ms(value); + update_alpha_beta(); }); + parameter_rtt_max.callChangeHandlers(); + + parameter_qdelay_ref.addChangeHandler([&](uint64_t value) { + qdelay_ref = ms(value); + update_alpha_beta(); }); + parameter_qdelay_ref.callChangeHandlers(); + + parameter_seed.addChangeHandler([&](double value) { + generator.seed(value); + }); + parameter_seed.set(seed); + distribution.reset(new uniform_real_distribution(0.0, 1.0)); + + + pseudo_p = 0; + drop_prob = 0; + qdelay_current = ms(0); + qdelay_old = ms(0); + + packets = 0; + packets_dropped = 0; + packets_marked = 0; + + // Start timer to update probability periodically + timer_probability_update.expires_from_now(ms(0)); + timer_probability_update.async_wait( + boost::bind(&Pi2QueueModule::calculate_drop_prob, + this, + boost::asio::placeholders::error)); + + // Start statistics timer + timer_statistics.expires_from_now(ms(0)); + timer_statistics.async_wait(boost::bind( + &Pi2QueueModule::statistics, this, boost::asio::placeholders::error)); +} + +void Pi2QueueModule::update_alpha_beta() { + t_update = min(qdelay_ref, rtt_max / 3); + alpha = 0.1 * (static_cast(t_update.count()) / 1000 / + (static_cast(rtt_max.count()) / 1000 * + static_cast(rtt_max.count()) / 1000)); + beta = 0.3 / (static_cast(rtt_max.count()) / 1000); +} + +void Pi2QueueModule::enqueue(shared_ptr packet) { + uint8_t ecn = 0; + if (ecn_mode) { + ecn = packet->getECN(); + } + packets++; + + if (drop_early(ecn)) { + if ((ecn == ECT0) || (ecn == ECT1)) { + packet->setECN(CONGESTION_EXPERIENCED); + packet->updateIPv4HeaderChecksum(); + packets_marked++; + } else { + packets_dropped++; + return; + } + } + if (packet_queue.size() >= parameter_buffer_size.get()) { + if (packet->getECN() == CONGESTION_EXPERIENCED) { + packets_marked--; + } + packets_dropped++; + return; + } + packet_with_timestamp packet_timestamped = { + packet, chrono::high_resolution_clock::now()}; + packet_queue.emplace(packet_timestamped); + output_port.notify(); +} + +std::shared_ptr +Pi2QueueModule::dequeue() { + packet_with_timestamp packet_timestamped; + shared_ptr packet; + + if (!packet_queue.empty()) { + packet_timestamped = packet_queue.front(); + packet_queue.pop(); + packet = packet_timestamped.packet; + qdelay_current = chrono::duration_cast( + chrono::high_resolution_clock::now() - packet_timestamped.arrival_time); + } + + return packet; +} + +bool Pi2QueueModule::drop_early(uint8_t ecn) { + if (packet_queue.size() <= 2) { + return false; + } + + double probability = drop_prob; + // Use pseudo_p if packet supports scalable traffic, indicated by ECT(1); + // drop_prob otherwise + if (ecn == ECT1) { + probability = pseudo_p; + } + + double u = (*distribution)(generator); + if (u <= probability) { + return true; + } else { + return false; + } +} + +void Pi2QueueModule::calculate_drop_prob(const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + pseudo_p = + pseudo_p + + alpha * + (static_cast((qdelay_current - qdelay_ref).count()) / 1000) + + beta * (static_cast((qdelay_current - qdelay_old).count()) / 1000); + + // Bound drop probability [0, 1] + if (pseudo_p < 0) { + pseudo_p = 0.0; + } + if (pseudo_p > 1) { + pseudo_p = 1.0; + } + + drop_prob = pseudo_p * pseudo_p; + + if (drop_prob > MAX_PROB) { + drop_prob = MAX_PROB; + } + + qdelay_old = qdelay_current; + + statistic_pseudo_p.set(pseudo_p); + statistic_drop_prob.set(drop_prob); + + timer_probability_update.expires_at(timer_probability_update.expires_at() + + t_update); + timer_probability_update.async_wait( + boost::bind(&Pi2QueueModule::calculate_drop_prob, + this, + boost::asio::placeholders::error)); +} + +void Pi2QueueModule::statistics(const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + + statistic_queue_length.set(packet_queue.size()); + + statistic_queue_delay.set(qdelay_current.count()); + statistic_packets.set(packets); + statistic_packets_dropped.set(packets_dropped); + statistic_packets_marked.set(packets_marked); + + timer_statistics.expires_at(timer_statistics.expiry() + + chrono::milliseconds(1)); + timer_statistics.async_wait(boost::bind( + &Pi2QueueModule::statistics, this, boost::asio::placeholders::error)); +} diff --git a/src/modules/queue/Pi2QueueModule.hpp b/src/modules/queue/Pi2QueueModule.hpp new file mode 100644 index 0000000..d8102e0 --- /dev/null +++ b/src/modules/queue/Pi2QueueModule.hpp @@ -0,0 +1,113 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#ifndef PI2_QUEUE_MODULE_HPP +#define PI2_QUEUE_MODULE_HPP + +#define CONGESTION_EXPERIENCED 3 +#define ECT0 2 +#define ECT1 1 + +#define MAX_PROB 0.25 + +#include +#include +#include +#include +#include + +#include "../../utils/Packet.hpp" +#include "../Module.hpp" + +class Pi2QueueModule : public Module { + typedef std::chrono::high_resolution_clock::time_point time_point; + typedef std::chrono::milliseconds ms; + + typedef struct packet_with_timestamp { + std::shared_ptr packet; + time_point arrival_time; + } packet_with_timestamp; + +public: + Pi2QueueModule(boost::asio::io_service &io_service, size_t buffer_size, + uint64_t qdelay_ref_input, uint64_t rtt_max_input, uint32_t seed = 1); + + const char *getType() const { return "pi2_queue"; } + +private: + const double mark_ecnth = 0.1; + + ReceivingPort> input_port; + RespondingPort> output_port; + + ParameterBool parameter_ecn_mode = false; + ParameterDouble parameter_buffer_size = { + 100, 1, std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_qdelay_ref = { + 15, 0, std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_rtt_max = {100, 1, + std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_seed = {1, 0, std::numeric_limits::quiet_NaN(), 1}; + + bool ecn_mode; + ms qdelay_ref; + ms rtt_max; + + ms t_update; + double alpha; + double beta; + double pseudo_p; + + ms qdelay_current; + double drop_prob; + ms qdelay_old; + + uint64_t packets; + uint64_t packets_dropped; + uint64_t packets_marked; + + std::mt19937 generator; + std::unique_ptr> distribution; + + Statistic statistic_queue_length; + Statistic statistic_drop_prob; + Statistic statistic_pseudo_p; + Statistic statistic_queue_delay; + Statistic statistic_packets; + Statistic statistic_packets_dropped; + Statistic statistic_packets_marked; + + std::queue packet_queue; + + void enqueue(std::shared_ptr packet); + std::shared_ptr dequeue(); + + void update_alpha_beta(); + bool drop_early(uint8_t ecn); + boost::asio::high_resolution_timer timer_probability_update; + void calculate_drop_prob(const boost::system::error_code &error); + + boost::asio::high_resolution_timer timer_statistics; + void statistics(const boost::system::error_code &error); +}; + +#endif diff --git a/src/modules/queue/PieQueueModule.cpp b/src/modules/queue/PieQueueModule.cpp new file mode 100644 index 0000000..6595a39 --- /dev/null +++ b/src/modules/queue/PieQueueModule.cpp @@ -0,0 +1,241 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#include "PieQueueModule.hpp" + +#include + +using namespace std; + +PieQueueModule::PieQueueModule(boost::asio::io_service &io_service, + size_t buffer_size, uint64_t qdelay_ref_input, + uint64_t max_burst_input, uint32_t seed) + : timer_probability_update(io_service), timer_statistics(io_service) { + setName("PIE Queue"); + addPort({"in", "In", PortInfo::Side::left, &input_port}); + addPort({"out", "Out", PortInfo::Side::right, &output_port}); + addParameter({"ecn_mode", "ECN Mode", "", ¶meter_ecn_mode}); + addParameter({"buffer_size", "Buffer", "packets", ¶meter_buffer_size}); + addParameter({"qdelay_ref", "QDelay", "ms", ¶meter_qdelay_ref}); + addParameter({"max_burst", "Max Burst", "ms", ¶meter_max_burst}); + addParameter({"seed", "Seed", "", ¶meter_seed}); + addStatistic({"queue_length", "Queue", "packets", &statistic_queue_length}); + addStatistic({"drop_prob", "Drop Prob", "", &statistic_drop_prob}); + addStatistic( + {"qdelay_current", "Current QDelay", "ms", &statistic_qdelay_current}); + addStatistic( + {"burst_allowance", "Burst allowance", "ms", &statistic_burst_allowance}); + addStatistic({"packets", "Packets Received", "", &statistic_packets}); + addStatistic( + {"packets_dropped", "Packets Droppped", "", &statistic_packets_dropped}); + addStatistic( + {"packets_marked", "Packets Marked", "", &statistic_packets_marked}); + + input_port.setReceiveHandler( + bind(&PieQueueModule::enqueue, this, placeholders::_1)); + output_port.setRequestHandler(bind(&PieQueueModule::dequeue, this)); + + parameter_seed.addChangeHandler([&](double value) { + generator.seed(value); + }); + parameter_seed.set(seed); + distribution.reset(new uniform_real_distribution(0.0, 1.0)); + + parameter_ecn_mode.addChangeHandler([&](bool value) { ecn_mode = value; }); + parameter_ecn_mode.callChangeHandlers(); + + parameter_buffer_size.set(buffer_size); + + parameter_qdelay_ref.addChangeHandler( + [&](uint64_t value) { qdelay_ref = ms(value); }); + parameter_qdelay_ref.callChangeHandlers(); + + parameter_max_burst.addChangeHandler( + [&](uint64_t value) { max_burst = ms(value); }); + parameter_max_burst.callChangeHandlers(); + + qdelay_current = ms(0); + burst_allowance = ms(0); + qdelay_old = ms(0); + packets = 0; + packets_dropped = 0; + packets_marked = 0; + + // Start timer to update probability periodically + timer_probability_update.expires_from_now(0ms); + timer_probability_update.async_wait( + boost::bind(&PieQueueModule::calculate_drop_prob, this, + boost::asio::placeholders::error)); + + // Start statistics timer + timer_statistics.expires_from_now(0ms); + timer_statistics.async_wait(boost::bind(&PieQueueModule::statistics, this, + boost::asio::placeholders::error)); +} + +void PieQueueModule::enqueue(shared_ptr packet) { + bool ecn_capable_packet = false; + if (ecn_mode) { + uint8_t ecn = packet->getECN(); + if (ecn == 1 || ecn == 2) { + ecn_capable_packet = true; + } + } + packets++; + + if ((drop_prob == 0) && (qdelay_current < (qdelay_ref / 2)) && + (qdelay_old < (qdelay_ref / 2))) { + burst_allowance = max_burst; + } + if ((burst_allowance == ms(0)) && (drop_early() == true)) { + if (ecn_capable_packet && (drop_prob <= mark_ecnth)) { + packet->setECN(CONGESTION_EXPERIENCED); + packet->updateIPv4HeaderChecksum(); + packets_marked++; + } else { + packets_dropped++; + return; + } + } + if (packet_queue.size() >= parameter_buffer_size.get()) { + if (packet->getECN() == CONGESTION_EXPERIENCED) { + packets_marked--; + } + packets_dropped++; + return; + } + packet_with_timestamp packet_timestamped = { + packet, chrono::high_resolution_clock::now()}; + packet_queue.emplace(packet_timestamped); + output_port.notify(); +} + +std::shared_ptr PieQueueModule::dequeue() { + packet_with_timestamp packet_timestamped; + shared_ptr packet; + + if (!packet_queue.empty()) { + packet_timestamped = packet_queue.front(); + packet_queue.pop(); + packet = packet_timestamped.packet; + qdelay_current = chrono::duration_cast( + chrono::high_resolution_clock::now() - packet_timestamped.arrival_time); + } else { + qdelay_current = ms(0); + } + + return packet; +} + +bool PieQueueModule::drop_early() { + // Dont drop if queue qdel half of reference value and drop_prob is not too + // high dont drop if queue holds 2 or less packets + if (((qdelay_old < (qdelay_ref / 2)) && (drop_prob < 0.2)) || + (packet_queue.size() <= 2)) { + return false; + } + + double u = (*distribution)(generator); + if (u <= drop_prob) { + return true; + } else { + return false; + } +} + +void PieQueueModule::calculate_drop_prob( + const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + double p = + alpha * + (static_cast((qdelay_current - qdelay_ref).count()) / 1000) + + beta * + (static_cast((qdelay_current - qdelay_old).count()) / 1000); + + // Automatically adjust de-/increase of drop probability to avoid large + // relative changes + if (drop_prob < 0.000001) { + p /= 2048; + } else if (drop_prob < 0.00001) { + p /= 512; + } else if (drop_prob < 0.0001) { + p /= 128; + } else if (drop_prob < 0.001) { + p /= 32; + } else if (drop_prob < 0.01) { + p /= 8; + } else if (drop_prob < 0.1) { + p /= 2; + } else { + p = p; + } + + drop_prob += p; + + // Decay drop probability if no congestion + if (qdelay_current == ms(0) && qdelay_old == ms(0)) { + drop_prob *= 0.98; + } + + // Bound drop probability [0, 1] + if (drop_prob < 0) { + drop_prob = 0.0; + } + if (drop_prob > 1) { + drop_prob = 1.0; + } + + qdelay_old = qdelay_current; + + if (burst_allowance <= t_update) { + burst_allowance = ms(0); + } else { + burst_allowance = burst_allowance - t_update; + } + + timer_probability_update.expires_at(timer_probability_update.expires_at() + + t_update); + timer_probability_update.async_wait( + boost::bind(&PieQueueModule::calculate_drop_prob, this, + boost::asio::placeholders::error)); +} + +void PieQueueModule::statistics(const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + + statistic_queue_length.set(packet_queue.size()); + statistic_drop_prob.set(drop_prob); + statistic_qdelay_current.set(qdelay_current.count()); + statistic_burst_allowance.set(burst_allowance.count()); + + statistic_packets.set(packets); + statistic_packets_dropped.set(packets_dropped); + statistic_packets_marked.set(packets_marked); + + timer_statistics.expires_at(timer_statistics.expiry() + 1ms); + timer_statistics.async_wait(boost::bind(&PieQueueModule::statistics, this, + boost::asio::placeholders::error)); +} diff --git a/src/modules/queue/PieQueueModule.hpp b/src/modules/queue/PieQueueModule.hpp new file mode 100644 index 0000000..3bd7920 --- /dev/null +++ b/src/modules/queue/PieQueueModule.hpp @@ -0,0 +1,107 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#ifndef PIE_QUEUE_MODULE_HPP +#define PIE_QUEUE_MODULE_HPP + +#define CONGESTION_EXPERIENCED 3 + +#include +#include +#include +#include +#include + +#include "../../utils/Packet.hpp" +#include "../Module.hpp" + +class PieQueueModule : public Module { + typedef std::chrono::high_resolution_clock::time_point time_point; + typedef std::chrono::milliseconds ms; + + typedef struct packet_with_timestamp { + std::shared_ptr packet; + time_point arrival_time; + } packet_with_timestamp; + +public: + PieQueueModule(boost::asio::io_service &io_service, size_t buffer_size, + uint64_t qdelay_ref_input, uint64_t max_burst_input, uint32_t seed = 1); + + const char *getType() const { return "pie_queue"; } + +private: + const double alpha = 0.125; + const double beta = 1.25; + const ms t_update = ms(15); + const double mark_ecnth = 0.1; + + ReceivingPort> input_port; + RespondingPort> output_port; + + ParameterBool parameter_ecn_mode = false; + ParameterDouble parameter_buffer_size = { + 100, 1, std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_qdelay_ref = { + 15, 0, std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_max_burst = {150, 0, + std::numeric_limits::quiet_NaN(), 1}; + ParameterDouble parameter_seed = {1, 0, std::numeric_limits::quiet_NaN(), 1}; + + Statistic statistic_queue_length; + Statistic statistic_drop_prob; + Statistic statistic_qdelay_current; + Statistic statistic_burst_allowance; + Statistic statistic_packets; + Statistic statistic_packets_dropped; + Statistic statistic_packets_marked; + + bool ecn_mode; + ms qdelay_ref; + ms max_burst; + + ms qdelay_current; + ms burst_allowance; + double drop_prob; + ms qdelay_old; + + uint64_t packets; + uint64_t packets_dropped; + uint64_t packets_marked; + + std::mt19937 generator; + std::unique_ptr> distribution; + + std::queue packet_queue; + + void enqueue(std::shared_ptr packet); + std::shared_ptr dequeue(); + + bool drop_early(); + boost::asio::high_resolution_timer timer_probability_update; + void calculate_drop_prob(const boost::system::error_code &error); + + boost::asio::high_resolution_timer timer_statistics; + void statistics(const boost::system::error_code &error); +}; + +#endif diff --git a/src/modules/queue/RedQueueModule.cpp b/src/modules/queue/RedQueueModule.cpp new file mode 100644 index 0000000..d7233ec --- /dev/null +++ b/src/modules/queue/RedQueueModule.cpp @@ -0,0 +1,203 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#include "RedQueueModule.hpp" + +#include +#include + +using namespace std; + +RedQueueModule::RedQueueModule(boost::asio::io_service &io_service, + size_t buffer_size, double queue_weight, + uint32_t min_threshold_input, + uint32_t max_threshold_input, double max_p, + double transmission_time, uint32_t seed) + : timer_statistics(io_service) { + setName("Red Queue"); + addPort({"in", "In", PortInfo::Side::left, &input_port}); + addPort({"out", "Out", PortInfo::Side::right, &output_port}); + addParameter({"buffer_size", "Buffer", "packets", ¶meter_buffer_size}); + addParameter({"ecn_mode", "ECN Mode", "", ¶meter_ecn_mode}); + addParameter({"queue_weight", "Weight", "", ¶meter_queue_weight}); + addParameter( + {"min_threshold", "Min_th", "packets", ¶meter_min_threshold}); + addParameter( + {"max_threshold", "Max_th", "packets", ¶meter_max_threshold}); + addParameter({"max_p", "Max_p", "", ¶meter_max_p}); + addParameter({"transmission_time", "Avg. packet transmission", "ms", + ¶meter_transmission_time}); + addParameter({"seed", "Seed", "", ¶meter_seed}); + addStatistic({"drop_prob", "Drop Prob", "", &statistic_drop_prob}); + addStatistic({"avg", "Average queue length", "packets", &statistic_avg}); + addStatistic({"count", "Count not dropped", "packets", &statistic_count}); + addStatistic({"queue_length", "Queue", "packets", &statistic_queue_length}); + addStatistic({"packets", "Packets Received", "", &statistic_packets}); + addStatistic({"packets_dropped", "Packets Droppped", "", &statistic_packets_dropped}); + addStatistic({"packets_marked", "Packets Marked", "", &statistic_packets_marked}); + + input_port.setReceiveHandler( + bind(&RedQueueModule::receivePacket, this, placeholders::_1)); + output_port.setRequestHandler(bind(&RedQueueModule::dequeue, this)); + + min_threshold = min_threshold_input; + max_threshold = max_threshold_input; + + parameter_buffer_size.set(buffer_size); + parameter_queue_weight.set(queue_weight); + parameter_min_threshold.addChangeHandler([&](uint64_t value) { + if (value >= max_threshold) { + min_threshold = max_threshold - 1; + parameter_min_threshold.set(min_threshold); + } else { + min_threshold = value; + } + }); + parameter_min_threshold.callChangeHandlers(); + parameter_max_threshold.addChangeHandler([&](uint64_t value) { + if (value <= min_threshold) { + max_threshold = min_threshold + 1; + parameter_max_threshold.set(max_threshold); + } else { + max_threshold = value; + } + }); + parameter_max_threshold.callChangeHandlers(); + parameter_max_p.set(max_p); + parameter_transmission_time.set(transmission_time); + + parameter_ecn_mode.addChangeHandler([&](bool value) { ecn_mode = value; }); + parameter_ecn_mode.callChangeHandlers(); + + distribution.reset(new uniform_real_distribution(0.0, 1.0)); + + parameter_seed.addChangeHandler([&](double value) { + generator.seed(value); + }); + parameter_seed.set(seed); + + q_time = chrono::high_resolution_clock::now(); + + packets = 0; + packets_dropped = 0; + packets_marked = 0; + + // Start statistics timer + timer_statistics.expires_from_now(chrono::milliseconds(0)); + timer_statistics.async_wait(boost::bind(&RedQueueModule::statistics, this, + boost::asio::placeholders::error)); +} + +void RedQueueModule::receivePacket(shared_ptr packet) { + bool packet_marked = false; + uint8_t ecn = 0; + if (ecn_mode) { + ecn = packet->getECN(); + } + packets++; + + // Recalculate Average + if (packet_queue.size() > 0) { + avg = (1 - parameter_queue_weight.get()) * avg + + parameter_queue_weight.get() * packet_queue.size(); + } else { + auto m = chrono::high_resolution_clock::now() - q_time; + avg = pow((1 - parameter_queue_weight.get()), + (chrono::duration_cast(m).count() / + parameter_transmission_time.get())) * + avg; + } + + // calculate marking probability p_a + if ((min_threshold <= avg) && (avg < max_threshold)) { + count++; + double p_b = parameter_max_p.get() * (avg - min_threshold) / + (max_threshold - min_threshold); + p_a = p_b / (1 - count * p_b); + if (p_a < 0.0) { + p_a = 0.0; + } + + // mark packet with probability p_a + if ((*distribution)(generator) <= p_a) { + packet_marked = true; + count = 0; + } + } else if (max_threshold < avg) { + packet_marked = true; + count = 0; + } else { + count = -1; + } + + if (!packet_marked) { + if (packet_queue.size() >= parameter_buffer_size.get()) { + packets_dropped++; + return; + } + packet_queue.emplace(packet); + + output_port.notify(); + } else { + if(ecn_mode && (ecn == ECT0 || ecn == ECT1)) { + packets_marked++; + packet->setECN(CONGESTION_EXPERIENCED); + packet_queue.emplace(packet); + + output_port.notify(); + } else { + packets_dropped++; + } + } + return; +} + +std::shared_ptr RedQueueModule::dequeue() { + shared_ptr packet; + if (!packet_queue.empty()) { + packet = packet_queue.front(); + packet_queue.pop(); + + if (packet_queue.empty()) { + q_time = chrono::high_resolution_clock::now(); + } + } + return packet; +} + +void RedQueueModule::statistics(const boost::system::error_code &error) { + if (error == boost::asio::error::operation_aborted) { + return; + } + statistic_drop_prob.set(p_a); + statistic_avg.set(avg); + statistic_count.set(count); + statistic_queue_length.set(packet_queue.size()); + statistic_packets.set(packets); + statistic_packets_dropped.set(packets_dropped); + statistic_packets_marked.set(packets_marked); + + timer_statistics.expires_at(timer_statistics.expiry() + + chrono::milliseconds(100)); + timer_statistics.async_wait(boost::bind(&RedQueueModule::statistics, this, + boost::asio::placeholders::error)); +} diff --git a/src/modules/queue/RedQueueModule.hpp b/src/modules/queue/RedQueueModule.hpp new file mode 100644 index 0000000..5792b5d --- /dev/null +++ b/src/modules/queue/RedQueueModule.hpp @@ -0,0 +1,98 @@ +/* + * FlowEmu - Flow-Based Network Emulator + * Copyright (c) 2022 Institute of Communication Networks (ComNets), + * Hamburg University of Technology (TUHH), + * https://www.tuhh.de/comnets + * Copyright (c) 2022 Jesper Dell Missier + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + + +#ifndef RED_QUEUE_MODULE_HPP +#define RED_QUEUE_MODULE_HPP + +#define CONGESTION_EXPERIENCED 3 +#define ECT0 2 +#define ECT1 1 + +#include +#include +#include +#include +#include +#include + +#include "../../utils/Packet.hpp" +#include "../Module.hpp" + +class RedQueueModule : public Module { +public: + RedQueueModule(boost::asio::io_service &io_service, size_t buffer_size, + double queue_weight, uint32_t min_threshold_input, + uint32_t max_threshold_input, double max_p, + double transmission_time, uint32_t seed = 1); + + const char *getType() const { return "red_queue"; } + +private: + ReceivingPort> input_port; + RespondingPort> output_port; + + ParameterDouble parameter_buffer_size = { + 100, 1, std::numeric_limits::quiet_NaN(), 1}; + ParameterBool parameter_ecn_mode = false; + ParameterDouble parameter_queue_weight = {0.002, 0.001, 0.005, 0.001}; + ParameterDouble parameter_min_threshold = {15, 1, 100, 1}; + ParameterDouble parameter_max_threshold = {45, 1, 100, 1}; + ParameterDouble parameter_max_p = {0.1, 0.1, 0.1, 0.1}; + ParameterDouble parameter_transmission_time = { + 1.0, 0.001, std::numeric_limits::quiet_NaN(), 0.01}; + ParameterDouble parameter_seed = {1, 0, std::numeric_limits::quiet_NaN(), 1}; + + bool ecn_mode; + + uint64_t min_threshold; + uint64_t max_threshold; + + uint64_t packets; + uint64_t packets_dropped; + uint64_t packets_marked; + + Statistic statistic_drop_prob; + Statistic statistic_avg; + Statistic statistic_count; + Statistic statistic_queue_length; + Statistic statistic_packets; + Statistic statistic_packets_dropped; + Statistic statistic_packets_marked; + + double avg = 0; + int32_t count = -1; + std::chrono::time_point q_time; + double p_a = 0; + + std::mt19937 generator; + std::unique_ptr> distribution; + + std::queue> packet_queue; + + void receivePacket(std::shared_ptr packet); + std::shared_ptr dequeue(); + + boost::asio::high_resolution_timer timer_statistics; + void statistics(const boost::system::error_code &error); +}; + +#endif diff --git a/webui/index.html b/webui/index.html index 8899918..4f04d65 100644 --- a/webui/index.html +++ b/webui/index.html @@ -302,6 +302,42 @@ fifo_queue.addContentItem(flow); queue_group.addNode(fifo_queue); + var codel_queue = new Node("codel_queue"); + codel_queue.setType("codel_queue"); + codel_queue.setTitle("CoDel Queue"); + var flow = new NodeContentFlow(); + flow.addPort("left", new Port("in", "receiving", "In")); + flow.addPort("right", new Port("out", "responding", "Out")); + codel_queue.addContentItem(flow) + queue_group.addNode(codel_queue); + + var pie_queue = new Node("pie_queue"); + pie_queue.setType("pie_queue"); + pie_queue.setTitle("PIE Queue"); + var flow = new NodeContentFlow(); + flow.addPort("left", new Port("in", "receiving", "In")); + flow.addPort("right", new Port("out", "responding", "Out")); + pie_queue.addContentItem(flow) + queue_group.addNode(pie_queue); + + var pi2_queue = new Node("pi2_queue"); + pi2_queue.setType("pi2_queue"); + pi2_queue.setTitle("PI2 Queue"); + var flow = new NodeContentFlow(); + flow.addPort("left", new Port("in", "receiving", "In")); + flow.addPort("right", new Port("out", "responding", "Out")); + pi2_queue.addContentItem(flow) + queue_group.addNode(pi2_queue); + + var red_queue = new Node("red_queue"); + red_queue.setType("red_queue"); + red_queue.setTitle("RED Queue"); + var flow = new NodeContentFlow(); + flow.addPort("left", new Port("in", "receiving", "In")); + flow.addPort("right", new Port("out", "responding", "Out")); + red_queue.addContentItem(flow) + queue_group.addNode(red_queue); + node_library.addGroup(queue_group);