Skip to content

Commit

Permalink
Merge active queue management algorithms
Browse files Browse the repository at this point in the history
  • Loading branch information
dstolpmann committed Mar 13, 2024
2 parents 4f6c102 + e09b3ae commit 40b48e7
Show file tree
Hide file tree
Showing 12 changed files with 1,358 additions and 1 deletion.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ Via these interfaces, the user can change the structure and parameters of the mo
* Delay:
* Fixed delay
* Trace (compatible with [Link'Em](https://github.com/sys-uos/linkem))
* Queues:
* Queues and Active Queue Management (AQM) algorithms:
* FIFO
* RED
* CoDel
* PIE
* PI2
* Rate limiters / departure processes:
* Bitrate
* Fixed interval
Expand Down
4 changes: 4 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ set(flowemu_SRCS
modules/meter/DelayMeter.cpp
modules/meter/ThroughputMeter.cpp
modules/null/NullModule.cpp
modules/queue/CodelQueueModule.cpp
modules/queue/FifoQueueModule.cpp
modules/queue/PieQueueModule.cpp
modules/queue/Pi2QueueModule.cpp
modules/queue/RedQueueModule.cpp
modules/rate/BitrateRateModule.cpp
modules/rate/FixedIntervalRateModule.cpp
modules/rate/TraceRateModule.cpp
Expand Down
12 changes: 12 additions & 0 deletions src/modules/ModuleManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,11 @@
#include "meter/DelayMeter.hpp"
#include "meter/ThroughputMeter.hpp"
#include "null/NullModule.hpp"
#include "queue/CodelQueueModule.hpp"
#include "queue/FifoQueueModule.hpp"
#include "queue/PieQueueModule.hpp"
#include "queue/Pi2QueueModule.hpp"
#include "queue/RedQueueModule.hpp"
#ifdef MACHINE_LEARNING
#include "queue/DQLQueueModule.hpp"
#endif
Expand Down Expand Up @@ -118,8 +122,16 @@ void ModuleManager::addModule(const string &id, const Json::Value &json_root, bo
} else if(type == "dql_queue") {
new_module = make_shared<DQLQueueModule>(io_service, 100, 0.001);
#endif
} else if(type == "codel_queue") {
new_module = make_shared<CodelQueueModule>(io_service, 100, 5);
} else if(type == "fifo_queue") {
new_module = make_shared<FifoQueueModule>(io_service, 100);
} else if(type == "pie_queue") {
new_module = make_shared<PieQueueModule>(io_service, 100, 15, 150);
} else if(type == "pi2_queue") {
new_module = make_shared<Pi2QueueModule>(io_service, 100, 15, 100);
} else if(type == "red_queue") {
new_module = make_shared<RedQueueModule>(io_service, 100, 0.002, 15, 45, 0.1, 1.0);
} else if(type == "bitrate_rate") {
new_module = make_shared<BitrateRateModule>(io_service, 1000000);
} else if(type == "fixed_interval_rate") {
Expand Down
208 changes: 208 additions & 0 deletions src/modules/queue/CodelQueueModule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* FlowEmu - Flow-Based Network Emulator
* Copyright (c) 2022 Institute of Communication Networks (ComNets),
* Hamburg University of Technology (TUHH),
* https://www.tuhh.de/comnets
* Copyright (c) 2022 Jesper Dell Missier <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

/*
* Implemented using:
* Nichols, Kathleen and Van Jacobson (2012) Appendix: CoDel pseudocode. https://queue.acm.org/appendices/codel.html
* Nichols, Kathleen et al. (2018) Controlled Delay Active Queue Management. RFC 8289.
*/

#include "CodelQueueModule.hpp"

#include <boost/bind.hpp>
#include <cmath>

using namespace std;

CodelQueueModule::CodelQueueModule(boost::asio::io_service &io_service,
size_t buffer_size, size_t target_queue_time)
: timer_statistics(io_service) {
setName("CoDelQueue");
addPort({"in", "In", PortInfo::Side::left, &input_port});
addPort({"out", "Out", PortInfo::Side::right, &output_port});
addParameter({"ecn_mode", "ECN Mode", "", &parameter_ecn_mode});
addParameter({"buffer_size", "Buffer", "packets", &parameter_buffer_size});
addParameter({"target_queue_time", "Target Queueing Time", "ms",
&parameter_target_queue_time});
addStatistic({"queue_length", "Queue", "packets", &statistic_queue_length});
addStatistic({"count", "Count", "packets", &statistic_count});
addStatistic(
{"sojourn_time", "Packet Sojourn Time", "ms", &statistic_sojourn_time});
addStatistic({"packets", "Packets Received", "", &statistic_packets});
addStatistic(
{"packets_dropped", "Packets Droppped", "", &statistic_packets_dropped});
addStatistic(
{"packets_marked", "Packets Marked", "", &statistic_packets_marked});

input_port.setReceiveHandler(
bind(&CodelQueueModule::enqueue, this, placeholders::_1));
output_port.setRequestHandler(bind(&CodelQueueModule::dequeue, this));

parameter_buffer_size.set(buffer_size);

parameter_ecn_mode.addChangeHandler([&](bool value) { ecn_mode = value; });
parameter_ecn_mode.callChangeHandlers();

parameter_target_queue_time.addChangeHandler(
[&](double value) { target = ms(static_cast<uint64_t>(value)); });
parameter_target_queue_time.callChangeHandlers();

packets = 0;
packets_dropped = 0;
packets_marked = 0;

// Start statistics timer
timer_statistics.expires_from_now(ms(0));
timer_statistics.async_wait(boost::bind(&CodelQueueModule::statistics, this,
boost::asio::placeholders::error));
}

void CodelQueueModule::enqueue(shared_ptr<Packet> packet) {
packets++;
if (packet_queue_timestamp.size() >= parameter_buffer_size.get()) {
packets_dropped++;
return;
}

auto now = chrono::high_resolution_clock::now();
auto packet_timestamp = packet_with_timestamp{packet, now};
packet_queue_timestamp.emplace(packet_timestamp);

output_port.notify();

return;
}

std::shared_ptr<Packet> CodelQueueModule::dequeue() {
auto now = chrono::high_resolution_clock::now();
dodequeue_result r = dodequeue(now);
bool ecn_capable_packet = false;

if (r.packet == nullptr) {
return r.packet;
}

if (ecn_mode) {
uint8_t ecn = r.packet->getECN();
if (ecn == 1 || ecn == 2) {
ecn_capable_packet = true;
}
}

if (dropping_) {
if (!r.ok_to_drop) {
dropping_ = false;
}
while (now >= drop_next_ && dropping_) {
count++;
if (ecn_capable_packet) {
r.packet->setECN(CONGESTION_EXPERIENCED);
drop_next_ = control_law(drop_next_, count);
packets_marked++;
goto end;
}
r = dodequeue(now);
packets_dropped++;
if (!r.ok_to_drop) {
dropping_ = false;
} else {
drop_next_ = control_law(drop_next_, count);
}
}
} else if (r.ok_to_drop) {
if (ecn_capable_packet) {
r.packet->setECN(CONGESTION_EXPERIENCED);
packets_marked++;
} else {
r = dodequeue(now);
packets_dropped++;
}

dropping_ = true;

if (now - drop_next_ < 16 * INTERVAL) {
count = count > 2 ? count - 2 : 1;
} else {
count = 1;
}
drop_next_ = control_law(now, count);
}
end:
return r.packet;
}

CodelQueueModule::time_point CodelQueueModule::control_law(time_point t,
uint32_t count) {
return t + std::chrono::nanoseconds(static_cast<int>(
std::chrono::duration_cast<std::chrono::nanoseconds>(INTERVAL)
.count() /
sqrt(count)));
}

CodelQueueModule::dodequeue_result CodelQueueModule::dodequeue(time_point now) {
packet_with_timestamp packet_timestamped;
shared_ptr<Packet> packet;
packet_timestamped = packet_with_timestamp{packet, now};

if (!packet_queue_timestamp.empty()) {
packet_timestamped = packet_queue_timestamp.front();
packet_queue_timestamp.pop();
}

dodequeue_result r = {packet_timestamped.packet, false};
if (r.packet == nullptr) {
// queue is empty - we can't be above TARGET
first_above_time_ = time_point(0ms);
return r;
}

sojourn_time = now - packet_timestamped.arrival_time;
if ((sojourn_time < target) || (packet_queue_timestamp.size() <= MAXPACKET)) {
first_above_time_ = time_point(0ms);
} else {
if (first_above_time_ == time_point(0ms)) {
first_above_time_ = now + INTERVAL;
} else if (now >= first_above_time_) {
r.ok_to_drop = true;
}
}
return r;
}

void CodelQueueModule::statistics(const boost::system::error_code &error) {
if (error == boost::asio::error::operation_aborted) {
return;
}

statistic_queue_length.set(packet_queue_timestamp.size());
statistic_count.set(count);
statistic_sojourn_time.set(
std::chrono::duration_cast<std::chrono::milliseconds>(sojourn_time)
.count());
statistic_packets.set(packets);
statistic_packets_dropped.set(packets_dropped);
statistic_packets_marked.set(packets_marked);

timer_statistics.expires_at(timer_statistics.expiry() +
chrono::milliseconds(1));
timer_statistics.async_wait(boost::bind(&CodelQueueModule::statistics, this,
boost::asio::placeholders::error));
}
105 changes: 105 additions & 0 deletions src/modules/queue/CodelQueueModule.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* FlowEmu - Flow-Based Network Emulator
* Copyright (c) 2022 Institute of Communication Networks (ComNets),
* Hamburg University of Technology (TUHH),
* https://www.tuhh.de/comnets
* Copyright (c) 2022 Jesper Dell Missier <[email protected]>
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/


#ifndef CODEL_QUEUE_MODULE_HPP
#define CODEL_QUEUE_MODULE_HPP

#define CONGESTION_EXPERIENCED 3

#include <atomic>
#include <boost/asio.hpp>
#include <chrono>
#include <memory>
#include <queue>
#include <random>

#include "../../utils/Packet.hpp"
#include "../Module.hpp"

class CodelQueueModule : public Module {
typedef std::chrono::high_resolution_clock::time_point time_point;
typedef std::chrono::milliseconds ms;

typedef struct packet_with_timestamp {
std::shared_ptr<Packet> packet;
time_point arrival_time;
} packet_with_timestamp;

typedef struct dodequeue_result {
std::shared_ptr<Packet> packet;
bool ok_to_drop;
} dodequeue_result;

public:
CodelQueueModule(boost::asio::io_service &io_service, size_t buffer_size,
size_t target_queue_time);

const char *getType() const { return "codel_queue"; }

private:
const ms INTERVAL = ms(100);
const uint32_t MAXPACKET = 1;

bool ecn_mode;
ms target = ms(15);
time_point first_above_time_ = time_point(ms(0));
time_point drop_next_ = time_point(ms(0));
std::chrono::nanoseconds sojourn_time = std::chrono::nanoseconds(0);
uint32_t count = 0;
uint32_t lastcount = 0;
bool dropping_ = false;

ReceivingPort<std::shared_ptr<Packet>> input_port;
RespondingPort<std::shared_ptr<Packet>> output_port;

ParameterBool parameter_ecn_mode = false;
ParameterDouble parameter_buffer_size = {
100, 1, std::numeric_limits<double>::quiet_NaN(), 1};

ParameterDouble parameter_target_queue_time = {
5, 0, std::numeric_limits<double>::quiet_NaN(), 1};

Statistic statistic_queue_length;
Statistic statistic_count;
Statistic statistic_sojourn_time;
Statistic statistic_packets;
Statistic statistic_packets_dropped;
Statistic statistic_packets_marked;

uint64_t packets;
uint64_t packets_dropped;
uint64_t packets_marked;

std::queue<packet_with_timestamp> packet_queue_timestamp;

void enqueue(std::shared_ptr<Packet> packet);
std::shared_ptr<Packet> dequeue();

time_point control_law(time_point t, uint32_t count);

dodequeue_result dodequeue(time_point now);

boost::asio::high_resolution_timer timer_statistics;
void statistics(const boost::system::error_code &error);
};

#endif
Loading

0 comments on commit 40b48e7

Please sign in to comment.