From c1b9dd0476c8a3b983ad628059438ef72332b7f6 Mon Sep 17 00:00:00 2001
From: antonio
Date: Wed, 15 Mar 2023 11:09:31 +0100
Subject: [PATCH] EVRAES Powerlaw model

---
 EVRAESMultiProcessingDD.py         | 202 +++++++++++++++++++++++++++++
 src/Graphs/Objects/MultipleEdge.py | 191 ++++++++++++++++++++++++++-
 src/StastModules/Snapshot.py       |  50 ++++++-
 3 files changed, 436 insertions(+), 7 deletions(-)
 create mode 100644 EVRAESMultiProcessingDD.py

diff --git a/EVRAESMultiProcessingDD.py b/EVRAESMultiProcessingDD.py
new file mode 100644
index 0000000..d81d5bd
--- /dev/null
+++ b/EVRAESMultiProcessingDD.py
@@ -0,0 +1,202 @@
+import multiprocessing
+from src.Graphs.Objects.MultipleEdge import DynamicGraph
+from src.StastModules.Snapshot import get_snapshot_dynamic_dd
+from src.FileOperations.WriteOnFile import create_file, create_folder, write_on_file_contents
+
+import math as mt
+import logging
+import pandas as pd
+
+logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.DEBUG)
+
+
+def worker(data, return_dict):
+    # Runs one EVRAES simulation (power-law degree model) and stores its
+    # per-round stats list into the shared return_dict under the sim id.
+    def check_convergence_dynamic():
+        # Drives the flooding protocol one step and reports its status.
+        flood_dictionary = {}
+        # print(G.get_converged())
+        if (G.get_converged()):
+            if (G.flooding.get_initiator() == -1):
+                # print("STARTING FLOODING")
+                logging.info("Flooding started for simulation %r ", data['sim'])
+                G.set_flooding()
+                G.flooding.set_stop_time(mt.floor(mt.log(G.get_target_n(), 2)))
+                G.flooding.set_initiator()
+                G.flooding.update_flooding(G)
+            else:
+                # Updating Flooding
+                # if (G.flooding.get_t_flood() == 1):
+                #     print("Flooding protocol STARTED %r" % (G.flooding.get_started()))
+                if (G.flooding.get_started() == True):
+                    G.flooding.update_flooding(G)
+
+                    if (not G.flooding.check_flooding_status()):
+                        G.set_converged(True)
+                        if (G.flooding.get_number_of_restart() == 0):
+                            # print("All the informed nodes left the network")
+                            # print("Flooding Protocol status: Failed")
+                            # print("----------------------------------------------------------------")
+                            G.flooding.set_converged(False)
+                            G.flooding.set_failed(True)
+                    # if (G.flooding.get_converged()):
+                    #     print("AL NODES IN THE NETWORK ARE INFORMED")
+                    #     print("Number of informed nodes %d" % (G.flooding.get_informed_nodes()))
+                    #     print("Number of uninformed nodes %d " % (G.flooding.get_uninformed_nodes()))
+                    #     print("Percentage of informed nodes %r" % (G.flooding.get_percentage()))
+                    #     print("Informed Ratio: %r" % (G.flooding.get_last_ratio()))
+                    #     print("Flooding Protocol status: Correctly Terminated")
+                    #     print("Flooding time: %d" % (G.flooding.get_t_flood()))
+                    #     print("----------------------------------------------------------------")
+
+                    # Abort if flooding runs far longer than the target size suggests.
+                    threshold = G.get_target_n()
+                    if (G.flooding.get_t_flood() > 100 * threshold):
+                        # print("Iterations > threshold")
+                        # print("The Flooding protocol is too slow, stopping the simulation")
+                        # print("Number of informed nodes %d " % (G.flooding.get_informed_nodes()))
+                        # print("Number of uninformed nodes %d " % (G.flooding.get_uninformed_nodes()))
+                        # print("Percentage of informed nodes %r" % (G.flooding.get_percentage()))
+                        # print("Informed Ratio: %r" % (G.flooding.get_last_ratio()))
+                        # print("Flooding Protocol status: Failed")
+                        # print("Number of executed steps: %d Step threshold: %d" % (
+                        #     G.flooding.get_t_flood(), threshold))
+                        # print("----------------------------------------------------------------")
+                        G.set_converged(True)
+                        G.flooding.set_converged(False)
+                        G.flooding.set_failed(True)
+
+            flood_dictionary['informed_nodes'] = G.flooding.get_informed_nodes()
+            flood_dictionary['uninformed_nodes'] = G.flooding.get_uninformed_nodes()
+            flood_dictionary['percentage_informed'] = G.flooding.get_percentage()
+            flood_dictionary['t_flood'] = G.flooding.get_t_flood()
+            flood_dictionary['process_status'] = G.get_converged()
+            flood_dictionary['flood_status'] = G.flooding.get_converged()
+            flood_dictionary['initiator'] = G.flooding.get_initiator()
+
+        else:
+            flood_dictionary['informed_nodes'] = 0
+            flood_dictionary['uninformed_nodes'] = len(G.get_list_of_nodes())
+            flood_dictionary['percentage_informed'] = 0
+            flood_dictionary['t_flood'] = 0
+            flood_dictionary['process_status'] = G.get_converged()
+            flood_dictionary['flood_status'] = G.flooding.get_converged()
+            flood_dictionary['initiator'] = G.flooding.get_initiator()
+
+        return (flood_dictionary)
+
+    """worker function"""
+    final_stats = []
+
+    c = data["c"]
+    gamma = data["gamma"]
+    inrate = data["inrate"]
+    outrate = data["outrate"]
+    edge_falling_rate = data["edge_falling_rate"]
+    # sim = data["sim"]
+    max_iter = data["max_iter"]
+    G = DynamicGraph(0, 3, c, inrate, outrate, edge_falling_rate, gamma=gamma)
+    t = 0
+    achieved = False
+    repeat = True
+    sim = {
+        "simulation": data["sim"],
+        "pl_exponent": data["gamma"]
+    }
+    while (repeat):
+
+        G.disconnect_from_network()  # NOTE(review): disconnect_from_network_dd() also purges vd_dd; as written, vd_dd keeps entries of departed nodes — confirm which is intended
+        G.connect_to_network_dd()
+        # print("NODES IN new ", G.get_nodes_t())
+
+        G.add_phase_vd_dd()
+        G.del_phase_vd_dd()
+        if (edge_falling_rate != 0):
+            G.random_fall()
+
+        if (not achieved):
+            if (G.get_target_density()):
+                # print("The Graph contains the desired number of nodes")
+                achieved = True
+                # print("CI SONO")
+                G.set_converged(True)
+                stats = get_snapshot_dynamic_dd(G, G.get_c(), G.get_vd_dd(), t)
+                flood_info = check_convergence_dynamic()
+                conv_perc = {"conv_percentage": (G.get_semiregular_percentage())}
+                final_stats.append({**sim, **conv_perc, **stats, **flood_info})
+            else:
+                stats = get_snapshot_dynamic_dd(G, G.get_c(), G.get_vd_dd(), t)
+                flood_info = check_convergence_dynamic()
+                conv_perc = {"conv_percentage": (G.get_semiregular_percentage())}
+                final_stats.append({**sim, **conv_perc, **stats, **flood_info})
+
+        else:
+            stats = get_snapshot_dynamic_dd(G, G.get_c(), G.get_vd_dd(), t)
+            flood_info = check_convergence_dynamic()
+            conv_perc = {"conv_percentage": (G.get_semiregular_percentage())}
+            final_stats.append({**sim, **conv_perc, **stats, **flood_info})
+
+        if (G.flooding.get_t_flood() == max_iter):
+            logging.info("Flooding protocol simulation %r: CONVERGED" % data["sim"])
+            repeat = False
+        if (G.flooding.get_failed()):
+            repeat = False
+            logging.info("Flooding protocol simulation %r: FAILED" % data["sim"])
+        t += 1
+
+    # print(G.flooding.get_list_of_informed_ndoes())
+    # print(str(sim) + " represent!")
+    return_dict[sim['simulation']] = final_stats
+
+
+if __name__ == "__main__":
+    c_list = [1.5]
+    n_list = [256]
+    outrate_list = [0.5]
+    inrate_list = []
+    for n in n_list:
+        for q in outrate_list:
+            inrate_list.append((n * q, q))
+
+    # probs_list = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1]
+    probs_list = [0.0]
+    exponent = [2, 2.3, 2.5, 2.7, 3]
+
+    # probs_list = [0.0]
+
+    outPath = "./tmp/"
+    for c in c_list:
+        for inrate in inrate_list:
+            # for outrate in outrate_list:
+            for probs in probs_list:
+                for ex in exponent:
+                    data = {
+                        "c": c,
+                        "inrate": inrate[0],
+                        "outrate": inrate[1],
+                        "edge_falling_rate": probs,
+                        "max_iter": 100,
+                        "gamma": ex
+                    }
+                    name = "MixedDynamic_c_" + str(c) + "_inrate_" + str(
+                        inrate[0]) + "_outrate_" + str(inrate[1]) + "_p_" + str(probs) + "_g_" + str(ex)
+                    outpath = create_folder(outPath, name)
+                    logging.info("EXECUTING: %r " % name)
+                    manager = multiprocessing.Manager()
+                    return_dict = manager.dict()
+                    jobs = []
+                    for i in range(10):
+                        data["sim"] = i
+                        p = multiprocessing.Process(target=worker, args=(data, return_dict))
+                        jobs.append(p)
+                        p.start()
+
+                    for proc in jobs:
+                        proc.join()
+
+                    reduced = []
+                    for key in return_dict:
+                        reduced.extend(return_dict[key])
+                    df = pd.DataFrame(reduced)
+
+                    df.to_csv(outpath + "results.csv")
+
+    # print(return_dict.values())
\ No newline at end of file
diff --git a/src/Graphs/Objects/MultipleEdge.py b/src/Graphs/Objects/MultipleEdge.py
index 9375153..a860c9a 100644
--- a/src/Graphs/Objects/MultipleEdge.py
+++ b/src/Graphs/Objects/MultipleEdge.py
@@ -6,13 +6,15 @@ import scipy.sparse
 import threading
 from concurrent.futures import ThreadPoolExecutor
 
+from networkx.algorithms.graphical import is_graphical
+from networkx.utils.random_sequence import powerlaw_sequence
 from src.MathModules.MathTools import flip
 from src.Protocols.FloodOBJ import Flooding
 from src.Protocols.Consensus import Consensus
 
 
 class DynamicGraph:
-    def __init__(self,n = 0,d = 4,c = 1.5,lam = 1 ,beta = 1,falling_probability = 0,model = "Multiple" ,starting_edge_list = [],edge_birth_rate = None, edge_death_rate = None,degree_sequence = []):
+    def __init__(self,n = 0,d = 4,c = 1.5,lam = 1 ,beta = 1,falling_probability = 0,model = "Multiple" ,starting_edge_list = [],edge_birth_rate = None, edge_death_rate = None,degree_sequence = [],gamma=None):
         """
 
         :param n: int, number of nodes.
@@ -53,6 +55,8 @@ def __init__(self,n = 0,d = 4,c = 1.5,lam = 1 ,beta = 1,falling_probability = 0,
         self.time_conv = 0
         self.reset_number = 0
         self.t = 0
+        self.gamma = gamma
+        self.vd_dd = {}
         self.max_label = -1
         if(starting_edge_list):
             self.G.add_edges_from(starting_edge_list)
@@ -121,6 +125,8 @@ def get_number_of_entering_nodes_at_each_round(self):
         return(self.number_of_entering_nodes_at_each_round)
     def get_target_density(self):
         return(self.target_density)
+    def get_vd_dd(self):
+        return self.vd_dd
     def get_target_n(self):
         return(self.target_n)
     def get_n(self):
@@ -523,6 +529,98 @@ def connect_to_network(self):
 
         self.set_number_of_entering_nodes_at_each_round(X_t)
 
+
+
+    # Function for nodes accessing the network following a Poisson Flow
+    def connect_to_network_dd(self):
+        # Poisson Process of parameter Lambda for the number of nodes accessing in the network
+        X_t = np.random.poisson(self.inrate)
+
+        # Getting the maximum label in the Graph
+        #nodes = self.get_list_of_nodes()
+        # if(nodes):
+        #     max_label = max(nodes)
+        # else:
+        #     max_label = -1
+        # Defining labels of the new nodes
+        entering_nodes = []
+        # for i in range(X_t):
+        #     entering_nodes.append(max_label + 1)
+        #     max_label += 1
+        for i in range(X_t):
+            #entering_nodes.append(str(self.max_label + 1))
+            entering_nodes.append(self.max_label + 1)
+
+            self.max_label += 1
+
+        # Setting new target degree distribution
+        degs = [self.vd_dd[u] for u in self.G.nodes()]
+        if (len(degs)>0 or (len(degs) == 0 and X_t >= 2)):
+            iterate = True
+            new_degrees = []
+            j= 0
+            #while iterate:
+            new_degrees = [int(round(d)) for d in powerlaw_sequence(X_t, self.gamma)]
+            dd = degs + new_degrees
+            #if is_graphical(dd):
+            i = 0
+            for u in entering_nodes:
+                self.vd_dd[u] = new_degrees[i]
+                i+=1
+            #iterate = False
+
+        else:
+            for u in entering_nodes:
+                self.vd_dd[u] = int(round(powerlaw_sequence(X_t, self.gamma)[0]))  # FIX: powerlaw_sequence returns a list; round(list) raised TypeError
+
+
+        # Adding the list of nodes in the Graph
+        self.G.add_nodes_from(entering_nodes)
+
+        self.entering_nodes = entering_nodes
+        if (self.flooding.get_started() ):
+            self.flooding.add_nodes_to_dictionary(entering_nodes)
+
+        if(self.consensus.get_started()):
+            new_nodes_colors = []
+            for i in entering_nodes:
+                if (flip(self.consensus_bias[0]) == 'H'):
+                    new_nodes_colors.append(0)
+                else:
+                    new_nodes_colors.append(1)
+            self.consensus.add_nodes_to_dictionary(entering_nodes,new_nodes_colors)
+
+        if(not self.target_density):
+
+            self.target_density = self.target_size_achieved()
+
+        self.set_max_label(X_t)
+
+        self.set_number_of_entering_nodes_at_each_round(X_t)
+
+
+    # Function that delete agents from the network with probability q
+    def disconnect_from_network_dd(self):
+        # Random variable that counts the number of nodes that exits the network
+        Z_t = 0
+        # List of exiting nodes
+        exiting_nodes = []
+        # Doing a Bernoulli experiment for each node of the network
+        # With probability q a node leave the Graph
+        nodes = self.get_list_of_nodes()
+        for u in nodes:
+            if (flip(self.outrate) == "H"):
+                self.G.remove_node(u)
+                del self.vd_dd[u]
+                exiting_nodes.append(u)
+                Z_t += 1
+        if (self.flooding.get_started()):
+            self.flooding.del_nodes_from_dictionary(exiting_nodes)
+
+        if(self.consensus.get_started()):
+            self.consensus.del_nodes_from_dictionary(exiting_nodes)
+        self.set_number_of_exiting_nodes_at_each_round(Z_t)
 
     # Function that delete agets from the network with probability q
     def disconnect_from_network(self):
         # Random variable that counts the number of nodes that exits the network
@@ -586,7 +684,8 @@ def add_phase_vd(self):
                     sample_size = self.get_sample_add_phase(neighbors)
                     v_sample = rnd.choices(app,k=sample_size)
                     # Adding the edge (i,v) to the graph
-                    edge_list.append((i, int(v_sample[0])))
+                    for x in v_sample:
+                        edge_list.append((i, int(x)))
 
         # If is not the first round and there are nodes in the network,
         # then the new nodes sample d vertices in the network
@@ -618,7 +717,61 @@ def add_phase_vd(self):
         self.G.add_edges_from(list(set(preprocessed)))
 
 
-    # Del phase where nodes with |N(u)|>c*d choose u.a.r. a list of nodes in there Neighborhood and disconnect from it
+    def add_phase_vd_dd(self):
+
+        nodes = list(set(self.G.nodes())-set(self.entering_nodes))
+        edge_list = []
+
+
+
+        for i in nodes:
+
+            neighbors = [n for n in self.G.neighbors(i)]
+            if (len(neighbors) < self.vd_dd[i]):
+                # Calculating the set of the elements over random sampling
+                if (len(neighbors) > 0):
+                    app = list(set(nodes) - set(neighbors) - set([i]))
+                else:
+                    app = list(set(nodes) - set([i]))
+                if (app):
+                    # Converting in int the element sampled over the list
+                    # Calculating the sample size
+                    sample_size = self.get_sample_add_phase_dd(neighbors,self.vd_dd[i])
+                    v_sample = rnd.choices(app,k=sample_size)
+                    # Adding the edge (i,v) to the graph
+                    for x in v_sample:
+                        edge_list.append((i, int(x)))
+        # If is not the first round and there are nodes in the network,
+        # then the new nodes sample d vertices in the network
+
+        if (self.t > 0 and len(nodes) > 0):
+
+            # New nodes are connecting to the network
+            survived_entering_nodes = list(set(self.entering_nodes).intersection(set(self.G.nodes())))
+            if (len(survived_entering_nodes) > 0):
+
+                for i in survived_entering_nodes:
+                    sample_size = self.get_sample_add_phase_dd([],self.vd_dd[i])
+
+                    v_sample = rnd.choices(nodes, k=sample_size)
+                    for x in v_sample:
+                        #edge_list.append((i, str(x)))
+                        edge_list.append((i, int(x)))
+
+
+
+        #print(edge_list)
+        # Now we have to transform the directed edge list in an undirected edge list
+        preprocessed = []
+        for i in edge_list:
+            if int(i[0] )> int(i[1]):
+                preprocessed.append((i[1], i[0]))
+            else:
+                preprocessed.append((i[0], i[1]))
+        # Adding the undirected edge list to the graph
+        self.G.add_edges_from(list(set(preprocessed)))
+
+    # Del phase where nodes with |N(u)|>c*d choose u.a.r. a list of nodes in there Neighborhood and disconnect from it
     def del_phase_vd(self):
 
         self.t += 1
@@ -651,6 +804,38 @@ def del_phase_vd(self):
 
         #self.G.remove_edges_from(list(preprocessed))
 
 
+    def del_phase_vd_dd(self):
+        self.t += 1
+        nodes = list(set(self.G.nodes())-set(self.entering_nodes))
+        edge_list = []
+        lista_rimozioni = []
+        for i in nodes:
+            neig = [n for n in self.G.neighbors(i)]
+            if (len(neig) > self.c * self.vd_dd[i]):
+                # Calculating the sample size
+                sample_size = self.get_sample_del_phase_dd(neig,int(self.c *self.vd_dd[i]))
+                # Sampling a node from the neighborhood
+                v_sample = rnd.choices(neig, k=sample_size)
+
+                lista_rimozioni.append(i)
+                # Adding the samples to the list of nodes to remove
+                for x in v_sample:
+                    #edge_list.append((str(i), str(x)))
+                    edge_list.append((int(i), int(x)))
+
+        # Now we have to transform the directed edge list in an undirected edge list
+        #preprocessed = []
+        #set_preprocessed = list(set(edge_list))
+        preprocessed = edge_list
+        # Removing the undirected edge list from the graph
+        #print(list(set(preprocessed)))
+        to_delete =list(set(preprocessed))
+
+        self.G.remove_edges_from(to_delete)
+        #self.G.remove_edges_from(list(preprocessed))
+
+
+
     def edge_markovian(self):
 
         edges = self.get_list_of_edges()
diff --git a/src/StastModules/Snapshot.py b/src/StastModules/Snapshot.py
index 1765ff9..0a03c4a 100644
--- a/src/StastModules/Snapshot.py
+++ b/src/StastModules/Snapshot.py
@@ -14,7 +14,7 @@ def get_snapshot_dynamic(G,d,c,t):
     #     spectralGap = abs(spectralGap)
     #if np.iscomplexobj(lambdaNGap):
     #     lambdaNGap = abs(lambdaNGap)
-    dict = {
+    dic = {
         "n":n,
         "target_n":G.get_target_n(),
         "d":G.get_d(),
@@ -41,7 +41,49 @@ def get_snapshot_dynamic(G,d,c,t):
 
 
 
-    return (dict)
+    return (dic)
+
+def get_snapshot_dynamic_dd(G,c,dd,t):
+
+    #IsInvertible,spectralGap, lambdaNGap = get_spectral_gap_transition_matrix(G.get_G())
+    n, avg_deg, stdv, var, semireg, underreg, overreg, vol,diameter,radius = get_graph_properties_dd(G.get_G(),c,dd)
+
+    # Creating Dictionary with all the informations
+    if (G.isregular()):  # NOTE(review): flag looks inverted (True when NOT regular) — confirm against sibling snapshot helpers
+        regularity = False
+    else:
+        regularity = True
+    #if np.iscomplexobj(spectralGap):
+    #     spectralGap = abs(spectralGap)
+    #if np.iscomplexobj(lambdaNGap):
+    #     lambdaNGap = abs(lambdaNGap)
+    dic = {
+        "n":n,
+        "target_n":G.get_target_n(),
+        "c":G.get_c(),
+        "p":G.get_p(),
+        "avg_deg":avg_deg,
+        "stdv":stdv,
+        "var":var,
+        "semireg":semireg,
+        "underreg":underreg,
+        "overreg":overreg,
+        "vol":vol,
+        "diameter":diameter,
+        "radius":radius,
+        "regularity":regularity,
+        "lambda": G.get_inrate(),
+        "beta":G.get_outrate(),
+        "model_type":G.get_type_of_dynamic_graph(),
+        "entering_nodes":G.get_number_of_entering_nodes_at_each_round()[-1],
+        "exiting_nodes":G.get_number_of_exiting_nodes_at_each_round()[-1],
+        "t":t
+
+    }
+
+
+
+    return (dic)
 
 def get_snapshot_dynamicND(G,d,c,t):
 
@@ -56,7 +98,7 @@ def get_snapshot_dynamicND(G,d,c,t):
     #     spectralGap = abs(spectralGap)
     #if np.iscomplexobj(lambdaNGap):
    #     lambdaNGap = abs(lambdaNGap)
-    dict = {
+    dic = {
         "n":n,
         "target_n":G.get_target_n(),
         "d":G.get_d(),
@@ -81,7 +123,7 @@ def get_snapshot_dynamicND(G,d,c,t):
 
 
 
-    return (dict)
+    return (dic)
 
 def get_snapshot(G,p,d,c,t,n_in=None,n_out=None):