Skip to content

Commit

Permalink
Add type hints to run script
Browse files Browse the repository at this point in the history
  • Loading branch information
dstolpmann committed Mar 21, 2024
1 parent dd28a68 commit f652a9e
Showing 1 changed file with 61 additions and 58 deletions.
119 changes: 61 additions & 58 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,34 @@
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

import io
import itertools
import json
import os
import re
import subprocess
import sys
import time
import toml
from copy import deepcopy
from datetime import datetime
from threading import Thread

import toml

from typing import Any, Dict, IO, List, Optional, Tuple


start_time = time.time()


class Environment:
path = ""
metadata = {}
operating_modes = {}
config = {}

def __init__(self, path):
def __init__(self, path: str) -> None:
self.path = path

self.metadata: Dict[str, str] = {}
self.operating_modes: Dict[str, Any] = {}
self.config: Dict[str, Any] = {}

# Load config file
try:
with open(self.path + "/config.toml", "r") as config_file:
Expand All @@ -64,32 +68,32 @@ def __init__(self, path):
self.config = environment_toml["config"]

# Run build script
def build(self):
def build(self) -> None:
print("\033[1;33mBuild\033[0m")
status = os.system(self.path + "/build.sh")
if os.WEXITSTATUS(status) != 0:
raise RuntimeError

# Run setup script
def setup(self):
def setup(self) -> None:
print("\033[1;33mSetup\033[0m")
status = os.system(self.path + "/setup.sh")
if os.WEXITSTATUS(status) != 0:
raise RuntimeError

# Run cleanup script
def cleanup(self):
def cleanup(self) -> None:
print("\033[1;33mCleanup\033[0m")
status = os.system(self.path + "/cleanup.sh")
if os.WEXITSTATUS(status) != 0:
raise RuntimeError


class Config:
metadata = {}
testcases = []
def __init__(self, path: str) -> None:
self.metadata: Dict[str, Any] = {}
self.testcases: List[Dict[str, Any]] = []

def __init__(self, path):
# Load test cases file
try:
with open(path, "r") as testcases_file:
Expand Down Expand Up @@ -127,16 +131,14 @@ def __init__(self, path):


class Results:
path = ""

def __init__(self, path, name):
def __init__(self, path: str, name: str) -> None:
# Create results directory
self.path = path + "/" + name + " - " + datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H-%M-%S")
os.system("mkdir -p \"" + self.path + "\"")

# Write metadata file
def writeMetadata(self, environment_metadata, testcases_metadata):
metadata = {}
def writeMetadata(self, environment_metadata: Dict[str, str], testcases_metadata: Dict[str, str]) -> None:
metadata: Dict[str, Any] = {}
metadata["environment"] = environment_metadata
metadata["testcases"] = testcases_metadata
metadata["timestamp"] = datetime.fromtimestamp(start_time).strftime("%Y-%m-%d %H-%M-%S")
Expand All @@ -147,37 +149,37 @@ def writeMetadata(self, environment_metadata, testcases_metadata):
json.dump(metadata, metadata_file)

# Write test case file
def writeTestcase(self, testcase):
def writeTestcase(self, testcase: Dict[str, Any]) -> None:
with open(self.path + "/testcase_" + testcase["name"] + ".json", "w") as testcase_file:
json.dump(testcase, testcase_file)

# Get results directory path
def getResultsDirectoryPath(self):
def getResultsDirectoryPath(self) -> str:
return self.path

class Process:
cmd = ""
process = None
thread_log = None
thread_error = None
logfile = None
verbose = True

def __init__(self, name, docker_container = None, color = 0):
def __init__(self, name: str, docker_container: Optional[str] = None, color: int = 0) -> None:
self.name = name
self.docker_container = docker_container
self.color = color

self.cmd: str = ""
self.process: Optional[subprocess.Popen[bytes]] = None
self.thread_log: Optional[Thread] = None
self.thread_error: Optional[Thread] = None
self.logfile: Optional[io.TextIOWrapper] = None
self.verbose: bool = True

# Open log file
def setLogfile(self, path):
def setLogfile(self, path: str) -> None:
self.logfile = open(path, "w")

# Set if process output should be printed to the command-line
def setVerbose(self, verbose):
def setVerbose(self, verbose: bool) -> None:
self.verbose = verbose

# Handle output stream
def outputstream(self, stream, level):
def outputstream(self, stream: IO[bytes], level: str) -> None:
while True:
# Get line
line = stream.readline()
Expand All @@ -198,7 +200,7 @@ def outputstream(self, stream, level):
break

# Run command
def run(self, cmd):
def run(self, cmd: str) -> None:
if not self.process:
self.cmd = cmd

Expand All @@ -210,7 +212,7 @@ def run(self, cmd):
self.logfile.write("[" + timestamp + "] " + "Command-line call: " + self.cmd + "\n")
print("\033[1;" + str(int(self.color)) + "m[" + timestamp + " - " + self.name + "] " + "Command-line call: " + self.cmd + "\033[0m\r")

if self.docker_container != None:
if self.docker_container:
# Run command in docker container
self.process = subprocess.Popen("docker exec " + self.docker_container + " " + self.cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, preexec_fn=os.setpgrp)
else:
Expand All @@ -224,7 +226,7 @@ def run(self, cmd):
self.thread_error.start()

# Wait for process to finish, join output stream threads and close log file
def wait(self):
def wait(self) -> None:
# Wait for process to finish
if self.process:
return_code = self.process.wait()
Expand All @@ -248,10 +250,10 @@ def wait(self):
self.logfile = None

# Send stop signal to process and wait
def stop(self):
def stop(self) -> None:
if self.process:
print("Stop " + self.name + "!")
if self.docker_container != None:
if self.docker_container:
os.system("docker top " + self.docker_container + " | grep \"" + self.cmd + "\" | awk '{print $2}' | xargs sudo kill --signal SIGTERM")
else:
self.process.terminate()
Expand All @@ -260,30 +262,31 @@ def stop(self):


class SudoLoop:
thread = None
def __init__(self) -> None:
self.thread: Optional[Thread] = None

# Reset sudo timeout or ask for sudo password
def validate(self):
def validate(self) -> None:
status = os.system("sudo -v")
if os.WEXITSTATUS(status) != 0:
raise RuntimeError("Sudo password is required to continue!")

# Sudo timeout reset loop
def loop(self):
def loop(self) -> None:
while True:
time.sleep(60)
self.validate()

# Ask for sudo password and start sudo timeout reset loop
def run(self):
def run(self) -> None:
self.validate()

self.thread = Thread(target=self.loop, daemon=True)
self.thread.start()


# Get value from nested dictionary if the key exists, otherwise return the default value
def get(dict, keys, default=None):
def get(dict: Dict[str, Any], keys: List[str], default: Any = None) -> Any:
element = dict

for key in keys:
Expand All @@ -296,7 +299,7 @@ def get(dict, keys, default=None):


# Main
def main():
def main() -> None:
# Check command-line arguments for environment config directory
if(len(sys.argv) < 2):
print("Please provide an environment config directory as command-line argument!")
Expand Down Expand Up @@ -383,10 +386,10 @@ def main():
testcase["repetition"] = repetition

# Get module parameters from test case
fixed_parameters = []
fixed_values = []
variable_parameters = []
variable_values = []
fixed_parameters: List[Tuple[str, Any]] = []
fixed_values: List[Any] = []
variable_parameters: List[Tuple[str, Any]] = []
variable_values: List[Any] = []
for module in testcase.keys():
if(isinstance(testcase[module], dict)):
for parameter, value in testcase[module].items():
Expand All @@ -409,9 +412,9 @@ def main():
fixed_values.append(value)

# Run test case for each parameter combination
for variable_values in itertools.product(*variable_values):
for variable_values_combination in itertools.product(*variable_values):
# Get variable parameters
parameters = dict(zip(variable_parameters, variable_values))
parameters = dict(zip(variable_parameters, variable_values_combination))
variable_parameters_name = "_".join([parameter[1].replace("_", "-") + "=" + str(value) for parameter, value in parameters.items()])

# Add fixed parameters
Expand All @@ -437,31 +440,31 @@ def main():
# Setup processes
process_logger = Process("Logger", color=37)
process_control = Process("Control", color=37)
process_source = Process("Source", docker_container=get(environment.config, ("docker_container", "source")), color=34)
process_channel = Process("Channel", docker_container=get(environment.config, ("docker_container", "channel")), color=35)
process_sink = Process("Sink", docker_container=get(environment.config, ("docker_container", "sink")), color=36)
process_source = Process("Source", docker_container=get(environment.config, ["docker_container", "source"]), color=34)
process_channel = Process("Channel", docker_container=get(environment.config, ["docker_container", "channel"]), color=35)
process_sink = Process("Sink", docker_container=get(environment.config, ["docker_container", "sink"]), color=36)

# Start processes
process_logger.setVerbose(False)
process_logger.setLogfile(results_path + ".log")
process_logger.run("mosquitto_sub -h " + get(environment.config, ("mqtt", "host"), "") + " -t '#' -v")
process_logger.run("mosquitto_sub -h " + get(environment.config, ["mqtt", "host"], "") + " -t '#' -v")

process_channel.setLogfile(results_path + "_channel.out")
process_channel.run(get(environment.config, ("run_prefix", "channel"), "") + " " + "flowemu --mqtt-host=" + get(environment.config, ("mqtt", "host"), "") + " --interface-source=" + get(environment.config, ("interface", "source"), "") + " --interface-sink=" + get(environment.config, ("interface", "sink"), "") + graph_file + module_parameters)
process_channel.run(get(environment.config, ["run_prefix", "channel"], "") + " " + "flowemu --mqtt-host=" + get(environment.config, ["mqtt", "host"], "") + " --interface-source=" + get(environment.config, ["interface", "source"], "") + " --interface-sink=" + get(environment.config, ["interface", "sink"], "") + graph_file + module_parameters)

if "sink-command" in testcase and testcase["sink-command"] != "":
process_sink.setLogfile(results_path + "_sink.out")
process_sink.run(get(environment.config, ("run_prefix", "sink"), "") + " " + testcase["sink-command"])
process_sink.run(get(environment.config, ["run_prefix", "sink"], "") + " " + testcase["sink-command"])

time.sleep(1)

if "control-command" in testcase and testcase["control-command"] != "":
process_control.setLogfile(results_path + "_control.out")
process_control.run(get(environment.config, ("run_prefix", "control"), "") + " " + testcase["control-command"])
process_control.run(get(environment.config, ["run_prefix", "control"], "") + " " + testcase["control-command"])

if "source-command" in testcase and testcase["source-command"] != "":
process_source.setLogfile(results_path + "_source.out")
process_source.run(get(environment.config, ("run_prefix", "source"), "") + " " + testcase["source-command"])
process_source.run(get(environment.config, ["run_prefix", "source"], "") + " " + testcase["source-command"])

# Wait for source process to finish before stopping all other processes
if "control-command" in testcase and testcase["control-command"] != "":
Expand All @@ -488,8 +491,8 @@ def main():
try:
print("\033[1;33mRun FlowEmu in interactive mode\033[0m")

process_channel = Process("Channel", docker_container=get(environment.config, ("docker_container", "channel")), color=35)
process_channel.run(get(environment.config, ("run_prefix", "channel"), "") + " " + "flowemu --mqtt-host=" + get(environment.config, ("mqtt", "host"), "") + " --interface-source=" + get(environment.config, ("interface", "source"), "") + " --interface-sink=" + get(environment.config, ("interface", "sink"), ""))
process_channel = Process("Channel", docker_container=get(environment.config, ["docker_container", "channel"]), color=35)
process_channel.run(get(environment.config, ["run_prefix", "channel"], "") + " " + "flowemu --mqtt-host=" + get(environment.config, ["mqtt", "host"], "") + " --interface-source=" + get(environment.config, ["interface", "source"], "") + " --interface-sink=" + get(environment.config, ["interface", "sink"], ""))
process_channel.wait()

# Catch keyboard interrupts
Expand Down

0 comments on commit f652a9e

Please sign in to comment.