diff --git a/Ahc.py b/Ahc.py index 2ff80b97b8ddc0492821b350190da6c6730a7a35..fdc3fdff8fccaab62d0179aa48d4da66332725aa 100755 --- a/Ahc.py +++ b/Ahc.py @@ -211,7 +211,8 @@ class ComponentModel: def on_message_from_peer(self, eventobj: Event): print(f"{EventTypes.MFRP} {self.componentname}.{self.componentinstancenumber}") - def __init__(self, componentname, componentinstancenumber, num_worker_threads=1): + def __init__(self, componentname, componentinstancenumber, context=None, num_worker_threads=1): + self.context = context self.eventhandlers = {EventTypes.INIT: self.on_init, EventTypes.MFRB: self.on_message_from_bottom, EventTypes.MFRT: self.on_message_from_top, EventTypes.MFRP: self.on_message_from_peer} # Add default handlers to all instantiated components. @@ -306,12 +307,12 @@ class Topology: nodes = {} channels = {} - def construct_from_graph(self, G: nx.Graph, nodetype, channeltype): + def construct_from_graph(self, G: nx.Graph, nodetype, channeltype, context): self.G = G nodes = list(G.nodes) edges = list(G.edges) for i in nodes: - cc = nodetype(nodetype.__name__, i) + cc = nodetype(nodetype.__name__, i, context) self.nodes[i] = cc for k in edges: ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) @@ -352,7 +353,7 @@ class Topology: N = len(self.G.nodes) self.compute_forwarding_table() self.nodecolors = ['b'] * N - self.nodepos = nx.drawing.spring_layout(self.G) + # self.nodepos = nx.drawing.spring_layout(self.G) self.lock = Lock() ComponentRegistry().init() @@ -407,8 +408,8 @@ class Topology: def plot(self): #self.lock.acquire() - nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold') - plt.draw() + # nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold') + # plt.draw() print(self.nodecolors) #self.lock.release() diff --git a/NetworkLayers/AllSeeingEyeNetworkLayer.py b/NetworkLayers/AllSeeingEyeNetworkLayer.py index c18259c4402cc485fbdd30add4df3f2c9bfbb4a5..dcd4bed5bb0defa6acbefc2a0b51f38edfc79711 100755 --- a/NetworkLayers/AllSeeingEyeNetworkLayer.py +++ b/NetworkLayers/AllSeeingEyeNetworkLayer.py @@ -23,14 +23,15 @@ class AllSeingEyeNetworkLayer(ComponentModel): destination = applmsg.header.messageto nexthop = Topology().get_next_hop(self.componentinstancenumber, destination) if nexthop != float('inf'): - print(f"{self.componentinstancenumber} will SEND a message to {destination} over {nexthop}") + # print(f"{self.componentinstancenumber} will SEND a message to {destination} over {nexthop}") hdr = NetworkLayerMessageHeader(NetworkLayerMessageTypes.NETMSG, self.componentinstancenumber, destination, nexthop) payload = eventobj.eventcontent msg = GenericMessage(hdr, payload) self.send_down(Event(self, EventTypes.MFRT, msg)) else: - print(f"NO PATH: {self.componentinstancenumber} will NOTSEND a message to {destination} over {nexthop}") + pass + # print(f"NO PATH: {self.componentinstancenumber} will NOTSEND a message to {destination} over {nexthop}") def on_message_from_bottom(self, eventobj: Event): msg = eventobj.eventcontent @@ -39,7 +40,7 @@ class AllSeingEyeNetworkLayer(ComponentModel): if hdr.messageto == self.componentinstancenumber or hdr.messageto == MessageDestinationIdentifiers.NETWORKLAYERBROADCAST: # Add if broadcast.... self.send_up(Event(self, EventTypes.MFRB, payload)) - print(f"I received a message to {hdr.messageto} and I am {self.componentinstancenumber}") + # print(f"I received a message to {hdr.messageto} and I am {self.componentinstancenumber}") else: destination = hdr.messageto nexthop = Topology().get_next_hop(self.componentinstancenumber, destination) @@ -49,9 +50,10 @@ class AllSeingEyeNetworkLayer(ComponentModel): newpayload = eventobj.eventcontent.payload msg = GenericMessage(newhdr, newpayload) self.send_down(Event(self, EventTypes.MFRT, msg)) - print(f"{self.componentinstancenumber} will FORWARD a message to {destination} over {nexthop}") + # print(f"{self.componentinstancenumber} will FORWARD a message to {destination} over {nexthop}") else: - print(f"NO PATH {self.componentinstancenumber} will NOT FORWARD a message to {destination} over {nexthop}") + pass + # print(f"NO PATH {self.componentinstancenumber} will NOT FORWARD a message to {destination} over {nexthop}") def __init__(self, componentname, componentinstancenumber): super().__init__(componentname, componentinstancenumber) diff --git a/Paper Notes.md b/Paper Notes.md new file mode 100644 index 0000000000000000000000000000000000000000..c9d597e4a42c3d97a08c31cf530fac314db2ca49 --- /dev/null +++ b/Paper Notes.md @@ -0,0 +1,20 @@ +# Wave algorithm + +## why echo? +- works for cyclic graphs, tree algo only works in acyclic ones. + +# evaluation metrics# possibe evaluation metrics + +- control packet drop in SF is not a big deal, there are N waves. in DS, just one controller exist (initiator). +- detection lateness in SF > DS (smaller is better) +- detection success SF > DS (bigger is better) + +- CPR : control packet ratio = num(control packages) / num(total packages) +- DL : detection latency = tick(algo done) - tick(termination) => ne kadar erken detect etti +- WPR : wave packet ratio = num(wave packages) / num(total packages) +- CPPN : control packets per node = num(control packages) / num(nodes) +- WPPN : wave packets per node = num(wave packages) / num(nodes) + +- ALSO, these metrics can be analyzed dependent on the network structure!! + +- DS/SF detection latency distribution and standard deviation \ No newline at end of file diff --git a/algorithms_orig.py b/algorithms_orig.py new file mode 100644 index 0000000000000000000000000000000000000000..9fceced51d344ce8c1a784a4056b7db07c521ecf --- /dev/null +++ b/algorithms_orig.py @@ -0,0 +1,220 @@ +import os +import queue +import sys +import time +import json +import queue +import random +import networkx as nx +from enum import Enum +import matplotlib.pyplot as plt +from datetime import datetime as dt + +from Channels import P2PFIFOPerfectChannel +from LinkLayers.GenericLinkLayer import LinkLayer +from NetworkLayers.AllSeeingEyeNetworkLayer import AllSeingEyeNetworkLayer +from Ahc import (ComponentModel, Event, ConnectorTypes, Topology, + ComponentRegistry, GenericMessagePayload, GenericMessageHeader, + GenericMessage, EventTypes) + +registry = ComponentRegistry() + +# define your own message types +class ApplicationLayerMessageType(Enum): + BASIC = "basic" + CONTROL = "control" + +class AHCNodeSimulationStatus(Enum): + ACTIVE = "active" + PASSIVE = "passive" + OUT_OF_CLOCK = "ooc" + +# define your own message header structure +class ApplicationLayerMessageHeader(GenericMessageHeader): + pass + +# define your own message payload structure +class ApplicationLayerMessagePayload(GenericMessagePayload): + pass + +class ApplicationLayerComponent(ComponentModel): + def __init__(self, componentname, componentinstancenumber, context): + super().__init__(componentname, componentinstancenumber, context=context) + + self.context = context + # self.eventhandlers[ApplicationLayerMessageType.BASIC] = self.on_basic_message + # self.eventhandlers[ApplicationLayerMessageType.CONTROL] = self.on_control_message + + self.basic_message_queue = queue.Queue(maxsize=-1) + self.control_message_queue = queue.Queue(maxsize=-1) + self.simulation_state = AHCNodeSimulationStatus.PASSIVE + + self.sleep_ms_per_tick = context["ms_per_tick"] + self.simulation_ticks_total = context["simulation_ticks"] + self.alive_for_next_ticks = context["initial_liveness"][componentinstancenumber] + self.communication_on_active_prob = context["communication_on_active_prob"] + self.min_activeness_after_receive = context["min_activeness_after_receive"] + self.max_activeness_after_receive = context["max_activeness_after_receive"] + self.package_process_per_tick = context["node_package_process_per_tick"] + self.die_passiveness_threshold = context["passiveness_death_thresh"] + + if context["hard_stop_on_tick"] is None: + self.hard_stop_on_tick = None + else: + self.hard_stop_on_tick = context["hard_stop_on_tick"][self.componentinstancenumber] + + self.friend_ids = [i for i in context["network"].G.nodes() if i != componentinstancenumber] + + self.__tick_n = 0 + self._passive_counter = 0 + + if self.alive_for_next_ticks > 0: + self.simulation_state = AHCNodeSimulationStatus.ACTIVE + + def prepare_application_layer_message(self, message_type: ApplicationLayerMessageType, destination_node_id: int, payload: object) -> GenericMessage: + hdr = ApplicationLayerMessageHeader(message_type, self.componentinstancenumber, destination_node_id) + payload = ApplicationLayerMessagePayload(payload) + + return GenericMessage(hdr, payload) + + def send_random_basic_message(self, to: int) -> None: + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.BASIC, to, str(dt.now().timestamp())))) + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_bottom(self, eventobj: Event): + applmessage = eventobj.eventcontent + hdr = applmessage.header + + # print(f"Node-{self.componentinstancenumber}: Node-{hdr.messagefrom} has sent {hdr.messagetype} message (payload: {applmessage.payload})") + + if hdr.messagetype == ApplicationLayerMessageType.BASIC: + self.basic_message_queue.put_nowait(applmessage) + elif hdr.messagetype == ApplicationLayerMessageType.CONTROL: + self.control_message_queue.put_nowait(applmessage) + + def simulation_tick(self): + next_state = None + got_packages_from = None + to_friend = None + + if self.__tick_n >= self.simulation_ticks_total: + next_state = AHCNodeSimulationStatus.OUT_OF_CLOCK + elif self._passive_counter >= self.die_passiveness_threshold: + next_state = AHCNodeSimulationStatus.OUT_OF_CLOCK + elif self.__tick_n >= self.hard_stop_on_tick: + next_state = AHCNodeSimulationStatus.OUT_OF_CLOCK + print(f" ==> N-{self.componentinstancenumber}: HARD STOP") + else: + if self.simulation_state == AHCNodeSimulationStatus.OUT_OF_CLOCK: + next_state = AHCNodeSimulationStatus.OUT_OF_CLOCK + elif self.simulation_state == AHCNodeSimulationStatus.PASSIVE: + if self.basic_message_queue.empty(): + # no incoming package, still passive. + next_state = AHCNodeSimulationStatus.PASSIVE + else: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+P+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + next_state = AHCNodeSimulationStatus.ACTIVE + elif self.simulation_state == AHCNodeSimulationStatus.ACTIVE: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+A+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + if random.random() <= self.communication_on_active_prob: + # send package to a random friend.. + to_friend = random.choice(self.friend_ids) + self.send_random_basic_message(to_friend) + + self.alive_for_next_ticks -= 1 + + if self.alive_for_next_ticks == 0: + if len(got_packages_from) > 0: + # got a package, this means immeiate activeness from passive! + next_state = AHCNodeSimulationStatus.ACTIVE + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + else: + next_state = AHCNodeSimulationStatus.PASSIVE + else: + next_state = AHCNodeSimulationStatus.ACTIVE + + assert next_state is not None + + # ST: state + # NS: next state + # GPF: got packages from + # SPF: sent package to friend + # ANT: alive next ticks + # P2P: packages to process + # print(f" ==> N-{self.componentinstancenumber}: ST: {self.simulation_state}, NS: {next_state}, GPF: {got_packages_from}, SPF: {to_friend}, ANT: {self.alive_for_next_ticks}, P2P: {self.basic_message_queue.qsize()}") + + # time.sleep(self.sleep_ms_per_tick / 1000) + self.__tick_n += 1 + self.simulation_state = next_state + + if self.simulation_state == AHCNodeSimulationStatus.PASSIVE: + self._passive_counter += 1 + elif self.simulation_state == AHCNodeSimulationStatus.ACTIVE: + self._passive_counter = 0 + + return next_state, to_friend + +class AdHocNode(ComponentModel): + def __init__(self, componentname, componentid, context): + self.context = context + # SUBCOMPONENTS + self.appllayer = ApplicationLayerComponent("ApplicationLayer", componentid, context=self.context) + self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) + self.linklayer = LinkLayer("LinkLayer", componentid) + # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) + + # CONNECTIONS AMONG SUBCOMPONENTS + self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer) + # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer) + self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer) + # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect) + self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer) + self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer) + + # Connect the bottom component to the composite component.... + self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.linklayer) + + super().__init__(componentname, componentid, context=self.context) + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def simulation_tick(self): + return self.appllayer.simulation_tick() + + @property + def waiting_packages_on_queue(self): + if self.appllayer.simulation_state == AHCNodeSimulationStatus.OUT_OF_CLOCK: + return 0 + + return self.appllayer.basic_message_queue.qsize() \ No newline at end of file diff --git a/analyze.py b/analyze.py new file mode 100644 index 0000000000000000000000000000000000000000..58befbfb7afae23846eb008b704f9c9ca6410ab7 --- /dev/null +++ b/analyze.py @@ -0,0 +1,366 @@ +import os +import sys +import time +import json +import pickle +import random +import pandas as pd +import seaborn as sns +import networkx as nx +import matplotlib.pyplot as plt +from datetime import datetime as dt + +from Ahc import Topology +from graph import ERG, Grid, Star +from shavit_francez import ShavitFrancezAdHocNode, SFAHCNodeSimulationStatus +from Channels import P2PFIFOPerfectChannel + +metrics = { + "DS": { + "erg": [], + "star": [], + "grid": [] + }, + "SF": { + "erg": [], + "star": [], + "grid": [] + } +} + +latency = { + "DS": { + "erg": [], + "star": [], + "grid": [] + }, + "SF": { + "erg": [], + "star": [], + "grid": [] + } +} + +lnr_distr = { + "DS": { + "erg": None, + "star": None, + "grid": None + }, + "SF": { + "erg": None, + "star": None, + "grid": None + } +} + +snr_distrib = { + "DS": { + "erg": {}, + "star": {}, + "grid": {} + }, + "SF": { + "erg": {}, + "star": {}, + "grid": {} + } +} + + +for fpath in os.listdir("bench_dump/simdump/"): + if fpath.endswith(".pkl"): + algo, _, topo, nodes, _ = fpath.split("_") + nodes = int(nodes) + + # print(f"++ {algo} {topo} {nodes}") + + with open(f"bench_dump/simdump/{fpath}", "rb") as fp: + data = pickle.load(fp) + + cpr = list(data["stats"]["df"]["control_total_cumulative_ratio"])[-1] + cppn = sum(list(data["stats"]["df"]["control_packets_sent"])) / nodes + + if algo == "SF": + wpr = list(data["stats"]["df"]["wave_total_cumulative_ratio"])[-1] + wppn = sum(list(data["stats"]["df"]["wave_packets_sent"])) / nodes + + metrics["SF"][topo].append((nodes, { + "cpr": cpr, + "wpr": wpr, + "cppn": cppn, + "wppn": wppn, + "cwpr": cpr + wpr + })) + elif algo == "DS": + metrics["DS"][topo].append((nodes, { + "cpr": cpr, + "cppn": cppn, + })) + else: + raise RuntimeError(algo) + +for fpath in os.listdir("bench_dump/"): + if fpath.endswith(".out"): + algo, topo, nodes, _ = fpath.split("_") + nodes = int(nodes) + + if topo == "grid": + nodes = nodes ** 2 + elif topo == "star": + nodes = nodes + 1 + elif topo == "erg": + pass + else: + raise RuntimeError(topo) + + # print(f"## {algo} {topo} {nodes} ({fpath})") + + try: + with open(f"bench_dump/{fpath}", "r") as fp: + lines = fp.readlines() + except UnicodeDecodeError as e: + print(f"[!!!] UnicodeDecodeError ({fpath}): {e}") + continue + + if algo == "SF": + reason = lines[-1].split(" ")[0] + + if reason not in ["wave", "forced"]: + raise RuntimeError(reason) + + if reason == "forced": + latency["SF"][topo].append((False, )) + + if nodes not in snr_distrib["SF"][topo]: + snr_distrib["SF"][topo][nodes] = [] + + snr_distrib["SF"][topo][nodes].append(0) + else: + reason, _, _, _, diff = lines[-1].split() + + assert diff[0] == "[" and diff[-1] == "]" + + if diff[1:-1] != "None": + dif = int(diff[1:-1]) + else: + dif = 0 + + latency["SF"][topo].append((True, dif, dif/nodes)) + + if nodes not in snr_distrib["SF"][topo]: + snr_distrib["SF"][topo][nodes] = [] + + snr_distrib["SF"][topo][nodes].append(1) + elif algo == "DS": + reason = lines[-1].split(" ")[0] + + if reason not in ["root", "forced"]: + raise RuntimeError(reason) + + if reason == "forced": + latency["DS"][topo].append((False, )) + + if nodes not in snr_distrib["DS"][topo]: + snr_distrib["DS"][topo][nodes] = [] + + snr_distrib["DS"][topo][nodes].append(0) + else: + reason, _, _, diff = lines[-1].split() + + assert diff[0] == "[" and diff[-1] == "]" + + if diff[1:-1] != "None": + dif = int(diff[1:-1]) + else: + dif = 0 + + latency["DS"][topo].append((True, dif, dif/nodes)) + + if nodes not in snr_distrib["DS"][topo]: + snr_distrib["DS"][topo][nodes] = [] + + snr_distrib["DS"][topo][nodes].append(1) + else: + raise RuntimeError(algo) + +for algo in lnr_distr: + for topo in lnr_distr[algo]: + lnr_distr[algo][topo] = [x[-1] for x in latency[algo][topo] if x[0] and x[-1] > 0] + +# print(json.dumps({ +# "metrics": metrics, +# "latency": latency, +# "lnr": lnr_distr +# }, indent=4)) + +fig, axes = plt.subplots(2, 3) +fig.set_figheight(10) +fig.set_figwidth(25) + +axes[0][0].set_title("DS/SF Grid Topology LNR Distribution") +axes[0][1].set_title("DS/SF Star Topology LNR Distribution") +axes[0][2].set_title("DS/SF ERG Topology LNR Distribution") + +axes[0][0].set_xlabel("Latency/Node Ratio (ticks/node)") +axes[0][1].set_xlabel("Latency/Node Ratio (ticks/node)") +axes[0][2].set_xlabel("Latency/Node Ratio (ticks/node)") + +sns.histplot(data={"Dijkstra-Scholten": lnr_distr["DS"]["grid"], "Shavit-Francez": lnr_distr["SF"]["grid"]}, kde=True, ax=axes[0][0], log_scale=True) +sns.histplot(data={"Dijkstra-Scholten": lnr_distr["DS"]["star"], "Shavit-Francez": lnr_distr["SF"]["star"]}, kde=True, ax=axes[0][1], log_scale=True) +sns.histplot(data={"Dijkstra-Scholten": lnr_distr["DS"]["erg"], "Shavit-Francez": lnr_distr["SF"]["erg"]}, kde=True, ax=axes[0][2], log_scale=True) + +axes[1][0].set_title("DS/SF Grid SNR Topology Plot") +axes[1][1].set_title("DS/SF Star SNR Topology Plot") +axes[1][2].set_title("DS/SF ERG SNR Topology Plot") + +axes[1][0].set_xlabel("Node Count") +axes[1][1].set_xlabel("Node Count") +axes[1][2].set_xlabel("Node Count") + +axes[1][0].set_ylabel("Successive Simulations (%)") +axes[1][1].set_ylabel("Successive Simulations (%)") +axes[1][2].set_ylabel("Successive Simulations (%)") + +topos = [ + "grid", + "star", + "erg" +] + +for i in range(3): + sns.lineplot(y=[100 * sum(snr_distrib["DS"][topos[i]][node_count]) / len(snr_distrib["DS"][topos[i]][node_count]) for node_count in snr_distrib["DS"][topos[i]]], x=[node_count for node_count in snr_distrib["DS"][topos[i]]], ax=axes[1][i], legend='brief', label="Dijkstra-Scholten", alpha=0.7) + sns.lineplot(y=[100 * sum(snr_distrib["SF"][topos[i]][node_count]) / len(snr_distrib["SF"][topos[i]][node_count]) for node_count in snr_distrib["SF"][topos[i]]], x=[node_count for node_count in snr_distrib["SF"][topos[i]]], ax=axes[1][i], legend='brief', label="Shavit-Francez", alpha=0.7) + +sns.despine() +plt.savefig("lnr_snr.png", dpi=200) + +fig, axes = plt.subplots(2, 3) +fig.set_figheight(10) +fig.set_figwidth(25) + +axes[0][0].set_title("DS/SF Grid Control Package Ratio Plot") +axes[0][1].set_title("DS/SF Star Control Package Ratio Plot") +axes[0][2].set_title("DS/SF ERG Control Package Ratio Plot") + +axes[0][0].set_xlabel("Node Count") +axes[0][1].set_xlabel("Node Count") +axes[0][2].set_xlabel("Node Count") + +axes[0][0].set_ylabel("Average CPR (%)") +axes[0][1].set_ylabel("Average CPR (%)") +axes[0][2].set_ylabel("Average CPR (%)") + +# axes[0][0].set_yscale("log") +# axes[0][1].set_yscale("log") +# axes[0][2].set_yscale("log") + +for i in range(3): + _top = topos[i] + cprs = {} + c_wprs = {} + c_prs = {} + wprs = {} + + for x in metrics["DS"][_top]: + node_count = x[0] + cpr = x[1]["cpr"] + + if node_count not in cprs: + cprs[node_count] = [] + + cprs[node_count].append(cpr) + + for x in metrics["SF"][_top]: + node_count = x[0] + + cw_pr = x[1]["cwpr"] + c_pr = x[1]["cpr"] + wpr = x[1]["wpr"] + + if node_count not in c_wprs: + c_wprs[node_count] = [] + + if node_count not in c_prs: + c_prs[node_count] = [] + + if node_count not in wprs: + wprs[node_count] = [] + + c_wprs[node_count].append(cw_pr) + c_prs[node_count].append(c_pr) + wprs[node_count].append(wpr) + + sns.lineplot(y=[sum(cprs[node_count]) / len(cprs[node_count]) for node_count in cprs], x=[node_count for node_count in cprs], ax=axes[0][i], legend='brief', label="Dijkstra-Scholten (CPR)") + sns.lineplot(y=[sum(c_prs[node_count]) / len(c_prs[node_count]) for node_count in c_prs], x=[node_count for node_count in c_prs], ax=axes[0][i], legend='brief', label="Shavit-Francez (CPR)") + sns.lineplot(y=[sum(wprs[node_count]) / len(wprs[node_count]) for node_count in wprs], x=[node_count for node_count in wprs], ax=axes[0][i], legend='brief', label="Shavit-Francez (WPR)") + sns.lineplot(y=[sum(c_wprs[node_count]) / len(c_wprs[node_count]) for node_count in c_wprs], x=[node_count for node_count in c_wprs], ax=axes[0][i], legend='brief', label="Shavit-Francez (CWPR)") + +axes[1][0].set_title("Dijkstra-Scholten Average CPPN Ratio Plot") +axes[1][1].set_title("Shavit-Francez Average CPPN Ratio Plot") +axes[1][2].set_title("Shavit-Francez Average WPPN Ratio Plot") + +axes[1][0].set_xlabel("Node Count") +axes[1][1].set_xlabel("Node Count") +axes[1][2].set_xlabel("Node Count") + +axes[1][0].set_ylabel("Average CPPN (control packets/node)") +axes[1][1].set_ylabel("Average CPPN (control packets/node)") +axes[1][2].set_ylabel("Average WPPN (wave packets/node)") + +def get_average_cppn(algo, topo): + cppns = {} + + for x in metrics[algo][topo]: + node_count = x[0] + cppn = x[1]["cppn"] + + if node_count not in cppns: + cppns[node_count] = [] + + cppns[node_count].append(cppn) + + return ([sum(cppns[node_count]) / len(cppns[node_count]) for node_count in cppns], [node_count for node_count in cppns]) + +def get_average_wppn(topo): + wppns = {} + + for x in metrics["SF"][topo]: + node_count = x[0] + wppn = x[1]["wppn"] + + if node_count not in wppns: + wppns[node_count] = [] + + wppns[node_count].append(wppn) + + return ([sum(wppns[node_count]) / len(wppns[node_count]) for node_count in wppns], [node_count for node_count in wppns]) + +ds_grid_cppns = get_average_cppn("DS", "grid") +ds_star_cppns = get_average_cppn("DS", "star") +ds_erg_cppns = get_average_cppn("DS", "erg") + +sf_grid_cppns = get_average_cppn("SF", "grid") +sf_star_cppns = get_average_cppn("SF", "star") +sf_erg_cppns = get_average_cppn("SF", "erg") + +sf_grid_wppns = get_average_wppn("grid") +sf_star_wppns = get_average_wppn("star") +sf_erg_wppns = get_average_wppn("erg") + +sns.lineplot(y=ds_grid_cppns[0], x=ds_grid_cppns[1], ax=axes[1][0], legend='brief', label="Grid") +sns.lineplot(y=ds_star_cppns[0], x=ds_star_cppns[1], ax=axes[1][0], legend='brief', label="Star") +sns.lineplot(y=ds_erg_cppns[0], x=ds_erg_cppns[1], ax=axes[1][0], legend='brief', label="ERG") + +sns.lineplot(y=sf_grid_cppns[0], x=sf_grid_cppns[1], ax=axes[1][1], legend='brief', label="Grid") +sns.lineplot(y=sf_star_cppns[0], x=sf_star_cppns[1], ax=axes[1][1], legend='brief', label="Star") +sns.lineplot(y=sf_erg_cppns[0], x=sf_erg_cppns[1], ax=axes[1][1], legend='brief', label="ERG") + +sns.lineplot(y=sf_grid_wppns[0], x=sf_grid_wppns[1], ax=axes[1][2], legend='brief', label="Grid") +sns.lineplot(y=sf_star_wppns[0], x=sf_star_wppns[1], ax=axes[1][2], legend='brief', label="Star") +sns.lineplot(y=sf_erg_wppns[0], x=sf_erg_wppns[1], ax=axes[1][2], legend='brief', label="ERG") + +sns.despine() + +plt.savefig("cpr_cppn_wpr_wppn.png", dpi=200) +# plt.show() \ No newline at end of file diff --git a/cli.py b/cli.py new file mode 100644 index 0000000000000000000000000000000000000000..2f47229e56c303d37369bdf25a64bf8fed318e9d --- /dev/null +++ b/cli.py @@ -0,0 +1,65 @@ +from argparse import ArgumentParser +from simulator_sf import run_shavit_francez_simulation +from simulator_ds import run_dijkstra_scholten_simulation + +algorithm_handlers = { + "djikstra-scholten": run_dijkstra_scholten_simulation, + "shavit-francez": run_shavit_francez_simulation, +} + + ## IMPORTANT NOTE + # possibe evaluation metrics + # num(control packages) / num(total packages) => + # tick(algo done) - tick(termination) => ne kadar erken detect etti + +parser = ArgumentParser() +parser.add_argument("algorithm", type=str, choices=list(algorithm_handlers.keys())) +parser.add_argument("simulation_ticks", type=int) +parser.add_argument("ms_per_tick", type=int) +parser.add_argument("--node_min_activeness_after_receive", type=int, default=3) # paket aldiktan sonra min bu kadar aktif kal +parser.add_argument("--node_max_activeness_after_receive", type=int, default=5) # paket aldiktan sonra max bu kadar aktif kal +parser.add_argument("--node_activeness_communication_prob", type=float, default=0.5) # alive iken baska nodelara paket gonderme olasiligi +parser.add_argument("--node_initial_activeness_prob", type=float, default=0.5) +parser.add_argument("--node_package_process_per_tick", type=int, default=5) + +parser.add_argument("--only_root_alive_initially", action="store_true", default=False) + +parser.add_argument("--run_until_termination", action="store_true", default=False) +parser.add_argument("--exit_on_termination", action="store_true", default=False) +parser.add_argument("--wait_ticks_after_termination", type=int, default=0) + +parser.add_argument("--passiveness_death_thresh", type=int, default=20) +parser.add_argument("--hard_stop_nodes", action="store_true", default=False) +parser.add_argument("--hard_stop_min_tick", type=int, default=50) +parser.add_argument("--hard_stop_max_tick", type=int, default=300) +parser.add_argument("--hard_stop_prob", type=float, default=0.5) + +parser.add_argument("--no_realtime_plot", action="store_true", default=False) +parser.add_argument("--save_tick_plots", action="store_true", default=False) +parser.add_argument("--tick_plots_save_dir", type=str, default="simdump") +parser.add_argument("--generate_gif", action="store_true", default=False) + +sp = parser.add_subparsers() + +erg_parser = sp.add_parser("erg") +erg_parser.add_argument("node_count", type=int) +erg_parser.add_argument("--node_connectivity", type=float, default=0.5) +erg_parser.set_defaults(network_type="erg") + +grid_parser = sp.add_parser("grid") +grid_parser.add_argument("node_count_on_edge", type=int) +grid_parser.set_defaults(network_type="grid") + +star_parser = sp.add_parser("star") +star_parser.add_argument("slave_count", type=int) +star_parser.add_argument("--master_is_root", type=bool, default=True) +star_parser.set_defaults(network_type="star") + +if __name__ == "__main__": + args = parser.parse_args() + print(f"[+] Network type: {args.network_type}") + + if args.generate_gif: + args.save_tick_plots = True + + algorithm_handlers[args.algorithm](args) \ No newline at end of file diff --git a/dijkstra_scholten.py b/dijkstra_scholten.py new file mode 100644 index 0000000000000000000000000000000000000000..c06ddc414184c6b52958af6be4b469f4a22aa37e --- /dev/null +++ b/dijkstra_scholten.py @@ -0,0 +1,323 @@ +import os +import queue +import sys +import time +import json +import queue +import random +import networkx as nx +from enum import Enum +import matplotlib.pyplot as plt +from datetime import datetime as dt + +from Channels import P2PFIFOPerfectChannel +from LinkLayers.GenericLinkLayer import LinkLayer +from NetworkLayers.AllSeeingEyeNetworkLayer import AllSeingEyeNetworkLayer +from Ahc import (ComponentModel, Event, ConnectorTypes, Topology, + ComponentRegistry, GenericMessagePayload, GenericMessageHeader, + GenericMessage, EventTypes) + +registry = ComponentRegistry() + +# define your own message types +class ApplicationLayerMessageType(Enum): + BASIC = "basic" + CONTROL = "control" # NOTE: Means the acknowledgement message! + +class DSAHCNodeSimulationStatus(Enum): + ACTIVE = "active" + PASSIVE = "passive" + OUT_OF_CLOCK = "ooc" + OUT_OF_TREE = "oot" + + def __str__(self): + return self.name + +# define your own message header structure +class ApplicationLayerMessageHeader(GenericMessageHeader): + pass + +# define your own message payload structure +class ApplicationLayerMessagePayload(GenericMessagePayload): + pass + +class DijkstraScholtenApplicationLayerComponent(ComponentModel): + def __init__(self, componentname, componentinstancenumber, context): + super().__init__(componentname, componentinstancenumber, context=context) + + self.context = context + # self.eventhandlers[ApplicationLayerMessageType.BASIC] = self.on_basic_message + # self.eventhandlers[ApplicationLayerMessageType.CONTROL] = self.on_control_message + + self.basic_message_queue = queue.Queue(maxsize=-1) + self.control_message_queue = queue.Queue(maxsize=-1) + self.simulation_state = DSAHCNodeSimulationStatus.PASSIVE + + self.sleep_ms_per_tick = context["ms_per_tick"] + self.simulation_ticks_total = context["simulation_ticks"] + self.alive_for_next_ticks = context["initial_liveness"][componentinstancenumber] + self.communication_on_active_prob = context["communication_on_active_prob"] + self.min_activeness_after_receive = context["min_activeness_after_receive"] + self.max_activeness_after_receive = context["max_activeness_after_receive"] + self.package_process_per_tick = context["node_package_process_per_tick"] + self.die_passiveness_threshold = context["passiveness_death_thresh"] + + if context["hard_stop_on_tick"] is None: + self.hard_stop_on_tick = None + else: + self.hard_stop_on_tick = context["hard_stop_on_tick"][self.componentinstancenumber] + + self.__tick_n = 0 + self._passive_counter = 0 + + self._child_counter = 0 + self._parent_node = None + self._in_tree = False + self._children = [] + + self._cms = 0 + + self._i_am_root = context["network"].root == self.componentinstancenumber + + if context["only_root_alive_initially"]: + if self._i_am_root: + self.alive_for_next_ticks = 20 # this may change though... + self._in_tree = True + else: + self.alive_for_next_ticks = 0 + elif self._i_am_root and self.alive_for_next_ticks == 0: + self.alive_for_next_ticks = 20 # this may change though... + + if self.alive_for_next_ticks > 0: + self.simulation_state = DSAHCNodeSimulationStatus.ACTIVE + + def prepare_application_layer_message(self, message_type: ApplicationLayerMessageType, destination_node_id: int, payload: object) -> GenericMessage: + hdr = ApplicationLayerMessageHeader(message_type, self.componentinstancenumber, destination_node_id) + payload = ApplicationLayerMessagePayload(payload) + + return GenericMessage(hdr, payload) + + def send_random_basic_message(self, to: int) -> None: + self._child_counter += 1 + self._children.append(to) + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.BASIC, to, str(dt.now().timestamp())))) + + def send_ack_control_message(self, to: int, is_dead: bool) -> None: + # print(f"send_ack_control_message: N-{self.componentinstancenumber} ==> N-{to} ({self._parent_node}) : {'DEAD' if is_dead else 'PKG_RESP'}") + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.CONTROL, to, str(dt.now().timestamp())))) + self._cms += 1 + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_bottom(self, eventobj: Event): + applmessage = eventobj.eventcontent + hdr = applmessage.header + + # print(f"Node-{self.componentinstancenumber}: Node-{hdr.messagefrom} has sent {hdr.messagetype} message (payload: {applmessage.payload})") + + if hdr.messagetype == ApplicationLayerMessageType.BASIC: + self.basic_message_queue.put_nowait(applmessage) + + if self._in_tree: + self.send_ack_control_message(hdr.messagefrom, False) + else: + self._parent_node = hdr.messagefrom + self._in_tree = True + + if self.componentinstancenumber not in self.context["alive_nodes"]: + self.context["alive_nodes"].append(self.componentinstancenumber) + + elif hdr.messagetype == ApplicationLayerMessageType.CONTROL: + # self.control_message_queue.put_nowait(applmessage) + + try: + self._children.remove(hdr.messagefrom) + self._child_counter -= 1 + except ValueError as e: + # print(f"\n\n\n{self.componentinstancenumber}: {e} {hdr.messagefrom} {self._children}\n\n\n") + pass + + def exit_tree(self): + # Exit from the tree + if self._in_tree: + self._in_tree = False + if self._parent_node is not None: + self.send_ack_control_message(self._parent_node, True) + + self._parent_node = None + + if self.componentinstancenumber in self.context["alive_nodes"]: + self.context["alive_nodes"].remove(self.componentinstancenumber) + + def simulation_tick(self): + next_state = None + got_packages_from = None + to_friend = None + + _upd_children = [] + + for c in self._children: + if c in self.context["alive_nodes"]: + _upd_children.append(c) + + self._children = _upd_children + self._child_counter = len(self._children) + + if self.simulation_state == DSAHCNodeSimulationStatus.OUT_OF_TREE: + next_state = DSAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: OOT") #NOTE: DEV + # print(f" ==> N-{self.componentinstancenumber}: OUT OF TREE") + elif self.__tick_n >= self.simulation_ticks_total: + self.exit_tree() + next_state = DSAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: OOC DEAD") #NOTE: DEV + elif not self._i_am_root and self._passive_counter >= self.die_passiveness_threshold: + self.exit_tree() + next_state = DSAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: PASSIVE DIE") #NOTE: DEV + elif not self._i_am_root and self.__tick_n >= self.hard_stop_on_tick: + self.exit_tree() + next_state = DSAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: HARD STOP") #NOTE: DEV + else: + if self.simulation_state == DSAHCNodeSimulationStatus.OUT_OF_CLOCK: + next_state = DSAHCNodeSimulationStatus.OUT_OF_CLOCK + elif self.simulation_state == DSAHCNodeSimulationStatus.PASSIVE: + if self.basic_message_queue.empty(): + if self._in_tree and self._child_counter == 0: + if self._i_am_root: + print(f" **ROOT** N-{self.componentinstancenumber}: Termination!!!") + __cms = self._cms + self._cms = 0 + return None, None, __cms + else: + self.exit_tree() + next_state = DSAHCNodeSimulationStatus.PASSIVE + print(f" ==> N-{self.componentinstancenumber}: OUT OF TREE / PASSIVE") + else: + # no incoming package, still passive. + next_state = DSAHCNodeSimulationStatus.PASSIVE + else: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+P+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + next_state = DSAHCNodeSimulationStatus.ACTIVE + elif self.simulation_state == DSAHCNodeSimulationStatus.ACTIVE: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+A+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + if random.random() <= self.communication_on_active_prob: + # send package to a random friend.. + + _alive_ones = [n for n in self.context["alive_nodes"] if n != self.componentinstancenumber] + + if len(_alive_ones) == 0: # everyone is dead!!! + # print(f" **ROOT** N-{self.componentinstancenumber}: Eveyone is dead!!!") + # return None, None # time to go! + to_friend = None + else: + to_friend = random.choice(_alive_ones) + self.send_random_basic_message(to_friend) + + self.alive_for_next_ticks -= 1 + + if self.alive_for_next_ticks == 0: + if len(got_packages_from) > 0: + # got a package, this means immeiate activeness from passive! + next_state = DSAHCNodeSimulationStatus.ACTIVE + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + else: + next_state = DSAHCNodeSimulationStatus.PASSIVE + else: + next_state = DSAHCNodeSimulationStatus.ACTIVE + + assert next_state is not None + + # ST: state + # NS: next state + # GPF: got packages from + # SPF: sent package to friend + # ANT: alive next ticks + # P2P: packages to process + # print(f" {'ROOT' if self._i_am_root else '==>'} N-{self.componentinstancenumber}: P: {self._parent_node}, CC: ({self._child_counter}) {self._children}, ST: {self.simulation_state}, NS: {next_state}, GPF: {got_packages_from}, SPF: {to_friend}, ANT: {self.alive_for_next_ticks}, P2P: {self.basic_message_queue.qsize()}") + + if self._i_am_root: + print(f" {'ROOT' if self._i_am_root else '==>'} N-{self.componentinstancenumber}: P: {self._parent_node}, CC: ({self._child_counter}) {self._children}, ST: {self.simulation_state}, NS: {next_state}, GPF: {got_packages_from}, SPF: {to_friend}, ANT: {self.alive_for_next_ticks}, P2P: {self.basic_message_queue.qsize()}") + + # time.sleep(self.sleep_ms_per_tick / 1000) + self.__tick_n += 1 + self.simulation_state = next_state + + if self.simulation_state == DSAHCNodeSimulationStatus.PASSIVE: + self._passive_counter += 1 + elif self.simulation_state == DSAHCNodeSimulationStatus.ACTIVE: + self._passive_counter = 0 + + __cms = self._cms + self._cms = 0 + + return next_state, to_friend, __cms + +class DijkstraScholtenAdHocNode(ComponentModel): + def __init__(self, componentname, componentid, context): + self.context = context + # SUBCOMPONENTS + self.appllayer = DijkstraScholtenApplicationLayerComponent("ApplicationLayer", componentid, context=self.context) + self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) + self.linklayer = LinkLayer("LinkLayer", componentid) + # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) + + # CONNECTIONS AMONG SUBCOMPONENTS + self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer) + # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer) + self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer) + # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect) + self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer) + self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer) + + # Connect the bottom component to the composite component.... + self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.linklayer) + + super().__init__(componentname, componentid, context=self.context) + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def simulation_tick(self): + return self.appllayer.simulation_tick() + + @property + def waiting_packages_on_queue(self): + if self.appllayer.simulation_state == DSAHCNodeSimulationStatus.OUT_OF_CLOCK or self.appllayer.simulation_state == DSAHCNodeSimulationStatus.OUT_OF_TREE: + return 0 + + return self.appllayer.basic_message_queue.qsize() + + @property + def parent_node(self): + return self.appllayer._parent_node \ No newline at end of file diff --git a/graph.py b/graph.py new file mode 100644 index 0000000000000000000000000000000000000000..bc7badb7dbe9adadb222b76943a8e8521fcd436d --- /dev/null +++ b/graph.py @@ -0,0 +1,71 @@ +import random +import networkx as nx +import matplotlib.pyplot as plt + +class ERG: + def __init__(self, node_count: int, connectivity: float, ax = None) -> None: + self.ax = ax + self.node_count = node_count + self.root = random.choice(list(range(self.node_count))) + self.G = nx.nx.erdos_renyi_graph(self.node_count, connectivity, seed=random.randint(100, 10000), directed=False) + + def plot(self): + node_colors = ["red" if i == self.root else "mediumslateblue" for i in range(self.node_count)] + + if self.ax is not None: + nx.draw(self.G, with_labels=True, node_color=node_colors, ax=self.ax) + else: + nx.draw(self.G, with_labels=True, node_color=node_colors) + +class Grid: + def __init__(self, node_count_on_edge: int, ax = None) -> None: + self.ax = ax + self.node_count_on_edge = node_count_on_edge + self.root = random.choice(list(range(self.node_count_on_edge ** 2))) + self.G = nx.grid_2d_graph(self.node_count_on_edge, self.node_count_on_edge) + self.positions = {self.node_count_on_edge * x[0] + x[1]: x for x in self.G.nodes()} + + self.G = nx.relabel_nodes(self.G, lambda x: self.node_count_on_edge * x[0] + x[1]) + + def plot(self): + node_colors = ["red" if i == self.root else "mediumslateblue" for i in range(self.node_count_on_edge ** 2)] + + if self.ax is not None: + nx.draw(self.G, with_labels=True, node_color=node_colors, pos=self.positions, ax=self.ax) + else: + nx.draw(self.G, with_labels=True, node_color=node_colors, pos=self.positions) + +class Star: + def __init__(self, slave_count: int, master_is_root: bool = True, ax = None) -> None: + self.ax = ax + self.slave_count = slave_count + self.root = 0 if master_is_root else random.choice(list(range(1, slave_count + 1))) + + self.G = nx.Graph() + + self.G.add_node(0) + + for i in range(1, self.slave_count + 1): + self.G.add_node(i) + self.G.add_edge(0, i) + + def plot(self): + node_colors = ["red" if i == self.root else "mediumslateblue" for i in range(self.slave_count + 1)] + + if self.ax is not None: + nx.draw(self.G, with_labels=True, node_color=node_colors, ax=self.ax) + else: + nx.draw(self.G, with_labels=True, node_color=node_colors) + +if __name__ == "__main__": + fig, axes = plt.subplots(1, 4) + fig.set_figheight(5) + fig.set_figwidth(25) + fig.tight_layout() + + ERG(10, 0.45, ax=axes[0]).plot() + Grid(4, ax=axes[1]).plot() + Star(6, master_is_root=True, ax=axes[2]).plot() + Star(6, master_is_root=False, ax=axes[3]).plot() + + plt.show() \ No newline at end of file diff --git a/run_sim.sh b/run_sim.sh new file mode 100755 index 0000000000000000000000000000000000000000..3d55fe662130c88ea7d598efd10ddd0a7a259742 --- /dev/null +++ b/run_sim.sh @@ -0,0 +1,72 @@ +#!/bin/bash + +set -e + +run_sim_ds() { + echo "++ DS ${1}(${2}) - run ${3}" + python3 ../cli.py djikstra-scholten 100 100 --run_until_termination --passiveness_death_thresh 3000 --hard_stop_nodes --hard_stop_min_tick 50 --hard_stop_max_tick 300 --node_package_process_per_tick 3 --node_initial_activeness_prob .5 --node_activeness_communication_prob .5 --wait_ticks_after_termination 200 --only_root_alive_initially --no_realtime_plot "${1}" "${2}" > "DS_${1}_${2}_run-${3}.out" <<< "\n" +} + +run_sim_sf() { + echo "++ SF ${1}(${2}) - run ${3}" + python3 ../cli.py shavit-francez 100 100 --run_until_termination --passiveness_death_thresh 3000 --hard_stop_nodes --hard_stop_min_tick 50 --hard_stop_max_tick 300 --node_package_process_per_tick 3 --node_initial_activeness_prob .5 --node_activeness_communication_prob .5 --wait_ticks_after_termination 200 --only_root_alive_initially --no_realtime_plot "${1}" "${2}" > "SF_${1}_${2}_run-${3}.out" <<< "\n" +} + +for i in {1..10}; do + run_sim_ds "grid" "3" "${i}" + run_sim_ds "grid" "4" "${i}" + run_sim_ds "grid" "5" "${i}" + run_sim_ds "grid" "6" "${i}" + run_sim_ds "grid" "7" "${i}" + run_sim_ds "grid" "8" "${i}" + run_sim_ds "grid" "9" "${i}" + run_sim_ds "grid" "10" "${i}" + run_sim_sf "grid" "3" "${i}" + run_sim_sf "grid" "4" "${i}" + run_sim_sf "grid" "5" "${i}" + run_sim_sf "grid" "6" "${i}" + run_sim_sf "grid" "7" "${i}" + run_sim_sf "grid" "8" "${i}" + run_sim_sf "grid" "9" "${i}" + run_sim_sf "grid" "10" "${i}" +done + +for i in {1..10}; do + run_sim_ds "star" "4" "${i}" + run_sim_ds "star" "9" "${i}" + run_sim_ds "star" "19" "${i}" + run_sim_ds "star" "29" "${i}" + run_sim_ds "star" "39" "${i}" + run_sim_ds "star" "49" "${i}" + run_sim_ds "star" "59" "${i}" + run_sim_sf "star" "4" "${i}" + run_sim_sf "star" "9" "${i}" + run_sim_sf "star" "19" "${i}" + run_sim_sf "star" "29" "${i}" + run_sim_sf "star" "39" "${i}" + run_sim_sf "star" "49" "${i}" + run_sim_sf "star" "59" "${i}" +done + +for i in {1..10}; do + run_sim_ds "erg" "5" "${i}" + run_sim_ds "erg" "10" "${i}" + run_sim_ds "erg" "15" "${i}" + run_sim_ds "erg" "20" "${i}" + run_sim_ds "erg" "25" "${i}" + run_sim_ds "erg" "30" "${i}" + run_sim_ds "erg" "35" "${i}" + run_sim_ds "erg" "40" "${i}" + run_sim_ds "erg" "45" "${i}" + run_sim_ds "erg" "50" "${i}" + run_sim_sf "erg" "5" "${i}" + run_sim_sf "erg" "10" "${i}" + run_sim_sf "erg" "15" "${i}" + run_sim_sf "erg" "20" "${i}" + run_sim_sf "erg" "25" "${i}" + run_sim_sf "erg" "30" "${i}" + run_sim_sf "erg" "35" "${i}" + run_sim_sf "erg" "40" "${i}" + run_sim_sf "erg" "45" "${i}" + run_sim_sf "erg" "50" "${i}" +done \ No newline at end of file diff --git a/shavit_francez.py b/shavit_francez.py new file mode 100644 index 0000000000000000000000000000000000000000..8a94390ddb89af9a61b1cf45a2c473508155ccb3 --- /dev/null +++ b/shavit_francez.py @@ -0,0 +1,405 @@ +import os +import queue +import sys +import time +import json +import queue +import random +import networkx as nx +from enum import Enum +import matplotlib.pyplot as plt +from datetime import datetime as dt + +from Channels import P2PFIFOPerfectChannel +from LinkLayers.GenericLinkLayer import LinkLayer +from NetworkLayers.AllSeeingEyeNetworkLayer import AllSeingEyeNetworkLayer +from Ahc import (ComponentModel, Event, ConnectorTypes, Topology, + ComponentRegistry, GenericMessagePayload, GenericMessageHeader, + GenericMessage, EventTypes) + +WAVE_DEBUG = False + +registry = ComponentRegistry() + +# define your own message types +class ApplicationLayerMessageType(Enum): + BASIC = "basic" + CONTROL = "control" # NOTE: Means the acknowledgement message! + WAVE = "wave" + +class SFWaveMessageType(Enum): + REQUEST = "req" + RESPONSE = "resp" + + def __str__(self): + return self.name + +class SFWaveResponse(Enum): + FINISHED = "fin" + ACTIVE = "act" + + def __str__(self): + return self.name + +class SFWaveMessagePayload: + tag: int + type: SFWaveMessageType + response: SFWaveResponse + + def __init__(self, tag: int, typ: SFWaveMessageType, resp: SFWaveResponse = None) -> None: + self.tag = tag + self.response = resp + self.type = typ + +class SFAHCNodeSimulationStatus(Enum): + ACTIVE = "active" + PASSIVE = "passive" + OUT_OF_CLOCK = "ooc" + OUT_OF_TREE = "oot" + + def __str__(self): + return self.name + +# define your own message header structure +class ApplicationLayerMessageHeader(GenericMessageHeader): + pass + +# define your own message payload structure +class ApplicationLayerMessagePayload(GenericMessagePayload): + pass + +class ShavitFrancezApplicationLayerComponent(ComponentModel): + def __init__(self, componentname, componentinstancenumber, context): + super().__init__(componentname, componentinstancenumber, context=context) + + self.context = context + # self.eventhandlers[ApplicationLayerMessageType.BASIC] = self.on_basic_message + # self.eventhandlers[ApplicationLayerMessageType.CONTROL] = self.on_control_message + + self.basic_message_queue = queue.Queue(maxsize=-1) + self.control_message_queue = queue.Queue(maxsize=-1) + self.simulation_state = SFAHCNodeSimulationStatus.PASSIVE + + self.sleep_ms_per_tick = context["ms_per_tick"] + self.simulation_ticks_total = context["simulation_ticks"] + self.alive_for_next_ticks = context["initial_liveness"][componentinstancenumber] + self.communication_on_active_prob = context["communication_on_active_prob"] + self.min_activeness_after_receive = context["min_activeness_after_receive"] + self.max_activeness_after_receive = context["max_activeness_after_receive"] + self.package_process_per_tick = context["node_package_process_per_tick"] + self.die_passiveness_threshold = context["passiveness_death_thresh"] + + if context["hard_stop_on_tick"] is None: + self.hard_stop_on_tick = None + else: + self.hard_stop_on_tick = context["hard_stop_on_tick"][self.componentinstancenumber] + + self.__tick_n = 0 + self._passive_counter = 0 + + self._child_counter = 0 + self._parent_node = None + self._in_tree = False + self._children = [] + + self._cms = 0 # control messages sent + self._wms = 0 # wave messages sent + + self._i_am_root = context["network"].root == self.componentinstancenumber + + if context["only_root_alive_initially"]: + if self._i_am_root: + self.alive_for_next_ticks = 20 # this may change though... + self._in_tree = True + else: + self.alive_for_next_ticks = 0 + elif self._i_am_root and self.alive_for_next_ticks == 0: + self.alive_for_next_ticks = 20 # this may change though... + + if self.alive_for_next_ticks > 0: + self.simulation_state = SFAHCNodeSimulationStatus.ACTIVE + + self.__exited_from_tree = False + self.my_wave_bucket = [] + self.announce_on_next_tick = False + + def prepare_application_layer_message(self, message_type: ApplicationLayerMessageType, destination_node_id: int, payload: object) -> GenericMessage: + hdr = ApplicationLayerMessageHeader(message_type, self.componentinstancenumber, destination_node_id) + payload = ApplicationLayerMessagePayload(payload) + + return GenericMessage(hdr, payload) + + def send_random_basic_message(self, to: int) -> None: + self._child_counter += 1 + self._children.append(to) + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.BASIC, to, str(dt.now().timestamp())))) + + def send_ack_control_message(self, to: int, is_dead: bool) -> None: + # print(f"send_ack_control_message: N-{self.componentinstancenumber} ==> N-{to} ({self._parent_node}) : {'DEAD' if is_dead else 'PKG_RESP'}") + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.CONTROL, to, str(dt.now().timestamp())))) + self._cms += 1 + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_bottom(self, eventobj: Event): + applmessage = eventobj.eventcontent + hdr = applmessage.header + + # print(f"Node-{self.componentinstancenumber}: Node-{hdr.messagefrom} has sent {hdr.messagetype} message (payload: {applmessage.payload})") + + if hdr.messagetype == ApplicationLayerMessageType.BASIC: + self.basic_message_queue.put_nowait(applmessage) + + if self._in_tree: + self.send_ack_control_message(hdr.messagefrom, False) + else: + self._parent_node = hdr.messagefrom + self._in_tree = True + + if self.componentinstancenumber not in self.context["alive_nodes"]: + self.context["alive_nodes"].append(self.componentinstancenumber) + elif hdr.messagetype == ApplicationLayerMessageType.CONTROL: + # self.control_message_queue.put_nowait(applmessage) + + try: + self._children.remove(hdr.messagefrom) + self._child_counter -= 1 + except ValueError as e: + # print(f"\n\n\n{self.componentinstancenumber}: {e} {hdr.messagefrom} {self._children}\n\n\n") + pass + elif hdr.messagetype == ApplicationLayerMessageType.WAVE: + if applmessage.payload.messagepayload.tag == self.componentinstancenumber: + self.my_wave_bucket.append(applmessage.payload.messagepayload.response) + + if WAVE_DEBUG: + print(f" WAVE << N-{self.componentinstancenumber} << N-{hdr.messagefrom} ({applmessage.payload.messagepayload.response})") + + if len(self.my_wave_bucket) == (len(self.context["network"].G.nodes()) - 1): + if WAVE_DEBUG: + print(f" ::: WAVE END >> N-{self.componentinstancenumber}: {' '.join(['A' if x == SFWaveResponse.ACTIVE else ('F' if x == SFWaveResponse.FINISHED else 'XXX') for x in self.my_wave_bucket])}") + + if SFWaveResponse.ACTIVE not in self.my_wave_bucket: + print(f" ::: WAVE END >> N-{self.componentinstancenumber}: ANNOUNCE!") + self.announce_on_next_tick = True + else: + print(f" ::: WAVE END >> N-{self.componentinstancenumber}: NOT DONE YET!") + self.my_wave_bucket = [] + else: + if self.__exited_from_tree: + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.WAVE, applmessage.payload.messagepayload.tag, SFWaveMessagePayload(applmessage.payload.messagepayload.tag, SFWaveMessageType.RESPONSE, SFWaveResponse.FINISHED)))) + if WAVE_DEBUG: + print(f" WAVE >> N-{self.componentinstancenumber} >> N-{applmessage.payload.messagepayload.tag} >> FINISHED") + else: + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.WAVE, applmessage.payload.messagepayload.tag, SFWaveMessagePayload(applmessage.payload.messagepayload.tag, SFWaveMessageType.RESPONSE, SFWaveResponse.ACTIVE)))) + if WAVE_DEBUG: + print(f" WAVE >> N-{self.componentinstancenumber} >> N-{applmessage.payload.messagepayload.tag} >> ACTIVE") + + self._wms += 1 + else: + print(f"\n!!! N-{self.componentinstancenumber}: GOT MSG UNIDENT: {hdr.messagetype}, FROM {hdr.messagefrom}") + sys.exit(2) + + def call_wave(self): + print(f" > CALL WAVE @@ N-{self.componentinstancenumber}") + + to_nodes = [i for i in range(len(self.context["network"].G.nodes())) if i != self.componentinstancenumber] + + for to in to_nodes: + self.send_down(Event(self, EventTypes.MFRT, self.prepare_application_layer_message(ApplicationLayerMessageType.WAVE, to, SFWaveMessagePayload(self.componentinstancenumber, SFWaveMessageType.REQUEST)))) + self._wms += 1 + + def exit_tree(self): + # Exit from the tree + + if not self.__exited_from_tree: + if self._in_tree: + self._in_tree = False + if self._parent_node is not None: + self.send_ack_control_message(self._parent_node, True) + + self._parent_node = None + + if self.componentinstancenumber in self.context["alive_nodes"]: + self.context["alive_nodes"].remove(self.componentinstancenumber) + + self.__exited_from_tree = True + print(f" > EXIT @@ N-{self.componentinstancenumber}") + + self.call_wave() + + def simulation_tick(self): + next_state = None + got_packages_from = None + to_friend = None + + _upd_children = [] + + for c in self._children: + if c in self.context["alive_nodes"]: + _upd_children.append(c) + + self._children = _upd_children + self._child_counter = len(self._children) + + if self.announce_on_next_tick: + __cms = self._cms + __wms = self._wms + self._cms = 0 + self._wms = 0 + + return None, None, __cms, __wms + + if self.simulation_state == SFAHCNodeSimulationStatus.OUT_OF_TREE: + next_state = SFAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: OOT") #NOTE: DEV + # print(f" ==> N-{self.componentinstancenumber}: OUT OF TREE") + elif self.__tick_n >= self.simulation_ticks_total: + self.exit_tree() + next_state = SFAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: OOC DEAD") #NOTE: DEV + elif self._passive_counter >= self.die_passiveness_threshold: # NOTE: initiator can also die from passiveness in Shavit-Francez. + self.exit_tree() + next_state = SFAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: PASSIVE DIE") #NOTE: DEV + elif self.__tick_n >= self.hard_stop_on_tick: # NOTE: initiator can also hard stop in Shavit-Francez. + self.exit_tree() + next_state = SFAHCNodeSimulationStatus.OUT_OF_TREE + # print(f" ==> N-{self.componentinstancenumber}: HARD STOP") #NOTE: DEV + else: + if self.simulation_state == SFAHCNodeSimulationStatus.OUT_OF_CLOCK: + next_state = SFAHCNodeSimulationStatus.OUT_OF_CLOCK + elif self.simulation_state == SFAHCNodeSimulationStatus.PASSIVE: + if self.basic_message_queue.empty(): + if self._in_tree and self._child_counter == 0: + self.exit_tree() + next_state = SFAHCNodeSimulationStatus.PASSIVE + print(f" ==> N-{self.componentinstancenumber}: OUT OF TREE / PASSIVE") + else: + # no incoming package, still passive. + next_state = SFAHCNodeSimulationStatus.PASSIVE + else: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+P+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + next_state = SFAHCNodeSimulationStatus.ACTIVE + elif self.simulation_state == SFAHCNodeSimulationStatus.ACTIVE: + got_packages_from = [] + + for _ in range(self.package_process_per_tick): + try: + package = self.basic_message_queue.get_nowait() + # print(f"+A+ N-{self.componentinstancenumber} <==BASIC== N-{package.header.messagefrom} ({package.payload.messagepayload})") + got_packages_from.append(package.header.messagefrom) + except queue.Empty: + break + + if random.random() <= self.communication_on_active_prob: + # send package to a random friend.. + + _alive_ones = [n for n in self.context["alive_nodes"] if n != self.componentinstancenumber] + + if len(_alive_ones) == 0: # everyone is dead!!! + # print(f" **ROOT** N-{self.componentinstancenumber}: Eveyone is dead!!!") + # return None, None # time to go! + to_friend = None + else: + to_friend = random.choice(_alive_ones) + self.send_random_basic_message(to_friend) + + self.alive_for_next_ticks -= 1 + + if self.alive_for_next_ticks == 0: + if len(got_packages_from) > 0: + # got a package, this means immeiate activeness from passive! + next_state = SFAHCNodeSimulationStatus.ACTIVE + self.alive_for_next_ticks = random.randint(self.min_activeness_after_receive, self.max_activeness_after_receive) + else: + next_state = SFAHCNodeSimulationStatus.PASSIVE + else: + next_state = SFAHCNodeSimulationStatus.ACTIVE + + assert next_state is not None + + # ST: state + # NS: next state + # GPF: got packages from + # SPF: sent package to friend + # ANT: alive next ticks + # P2P: packages to process + # print(f" {'ROOT' if self._i_am_root else '==>'} N-{self.componentinstancenumber}: P: {self._parent_node}, CC: ({self._child_counter}) {self._children}, ST: {self.simulation_state}, NS: {next_state}, GPF: {got_packages_from}, SPF: {to_friend}, ANT: {self.alive_for_next_ticks}, P2P: {self.basic_message_queue.qsize()}") + + if self._i_am_root: + print(f" {'INIT' if self._i_am_root else '==>'} N-{self.componentinstancenumber}: P: {self._parent_node}, CC: ({self._child_counter}) {self._children}, ST: {self.simulation_state}, NS: {next_state}, GPF: {got_packages_from}, SPF: {to_friend}, ANT: {self.alive_for_next_ticks}, P2P: {self.basic_message_queue.qsize()}") + + # time.sleep(self.sleep_ms_per_tick / 1000) + self.__tick_n += 1 + self.simulation_state = next_state + + if self.simulation_state == SFAHCNodeSimulationStatus.PASSIVE: + self._passive_counter += 1 + elif self.simulation_state == SFAHCNodeSimulationStatus.ACTIVE: + self._passive_counter = 0 + + __cms = self._cms + __wms = self._wms + self._cms = 0 + self._wms = 0 + + return next_state, to_friend, __cms, __wms + +class ShavitFrancezAdHocNode(ComponentModel): + def __init__(self, componentname, componentid, context): + self.context = context + # SUBCOMPONENTS + self.appllayer = ShavitFrancezApplicationLayerComponent("ApplicationLayer", componentid, context=self.context) + self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) + self.linklayer = LinkLayer("LinkLayer", componentid) + # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) + + # CONNECTIONS AMONG SUBCOMPONENTS + self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer) + # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer) + self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer) + # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect) + self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer) + self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer) + + # Connect the bottom component to the composite component.... + self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.linklayer) + + super().__init__(componentname, componentid, context=self.context) + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def simulation_tick(self): + return self.appllayer.simulation_tick() + + @property + def waiting_packages_on_queue(self): + if self.appllayer.simulation_state == SFAHCNodeSimulationStatus.OUT_OF_CLOCK or self.appllayer.simulation_state == SFAHCNodeSimulationStatus.OUT_OF_TREE: + return 0 + + return self.appllayer.basic_message_queue.qsize() + + @property + def parent_node(self): + return self.appllayer._parent_node \ No newline at end of file diff --git a/simulator_ds.py b/simulator_ds.py new file mode 100644 index 0000000000000000000000000000000000000000..2b7e4c2d8eeca3f8e1b8da4f6318c89024e067ac --- /dev/null +++ b/simulator_ds.py @@ -0,0 +1,289 @@ +import os +import sys +import time +import pickle +import random +from typing import Text +import pandas as pd +import seaborn as sns +import networkx as nx +import matplotlib.pyplot as plt +from datetime import datetime as dt +from networkx.drawing.nx_pydot import graphviz_layout + +from Ahc import Topology +from graph import ERG, Grid, Star +from dijkstra_scholten import DijkstraScholtenAdHocNode, DSAHCNodeSimulationStatus +from Channels import P2PFIFOPerfectChannel + +import glob +from PIL import Image + +# # filepaths +# fp_in = "/path/to/image_*.png" +# fp_out = "/path/to/image.gif" + +def make_gif(from_path: str, to_path: str, out_file: str, duration: int) -> None: + # https://pillow.readthedocs.io/en/stable/handbook/image-file-formats.html#gif + _images = sorted(glob.glob(from_path)) + print(_images) + img, *imgs = [Image.open(f) for f in _images] + img.save(fp=f"{to_path}/{out_file}", format='GIF', append_images=imgs, + save_all=True, duration=duration, loop=0) + +def run_dijkstra_scholten_simulation(args): + ts = dt.now().timestamp() + + if args.network_type == "erg": + N = ERG(args.node_count, args.node_connectivity) + total_nodes = args.node_count + elif args.network_type == "grid": + N = Grid(args.node_count_on_edge) + total_nodes = args.node_count_on_edge ** 2 + elif args.network_type == "star": + N = Star(args.slave_count, master_is_root=args.master_is_root) + total_nodes = args.slave_count + 1 + + if args.save_tick_plots: + tick_plots_save_path = f"{args.tick_plots_save_dir}/DS_{args.network_type}_{total_nodes}_{ts}/" + os.mkdir(tick_plots_save_path) + + print(f"++ Tick plots will be saved to: {tick_plots_save_path}") + + if args.run_until_termination: + args.simulation_ticks = 10**10 + + assert args.hard_stop_max_tick < args.simulation_ticks + assert args.hard_stop_min_tick > 0 + + hard_stop_on_tick = None + + if args.hard_stop_nodes: + hard_stop_on_tick = [random.randint(args.hard_stop_min_tick, args.hard_stop_max_tick) for _ in range(total_nodes)] + + node_active_ticks_initial = [random.randint(args.node_min_activeness_after_receive, args.node_max_activeness_after_receive) if random.random() <= args.node_initial_activeness_prob else 0 for _ in range(total_nodes)] + alive_nodes = list(range(total_nodes)) + + topo_context = { + "network": N, + "ms_per_tick": args.ms_per_tick, + "simulation_ticks": args.simulation_ticks, + "initial_liveness": node_active_ticks_initial, + "communication_on_active_prob": args.node_activeness_communication_prob, + "min_activeness_after_receive": args.node_min_activeness_after_receive, + "max_activeness_after_receive": args.node_max_activeness_after_receive, + "node_package_process_per_tick": args.node_package_process_per_tick, + "passiveness_death_thresh": args.passiveness_death_thresh, + "hard_stop_on_tick": hard_stop_on_tick, + "alive_nodes": alive_nodes, + "only_root_alive_initially": args.only_root_alive_initially + } + + print(topo_context) + + topo = Topology() + topo.construct_from_graph(N.G, DijkstraScholtenAdHocNode, P2PFIFOPerfectChannel, context=topo_context) + + topo.start() + + stats = { + "df": pd.DataFrame(data={ + "dead_nodes": [], + "active_nodes": [], + "packets_in_transmit": [], + "queued_packets": [], + "control_packets_sent": [], + "control_basic_instant_ratio": [], + "control_total_cumulative_ratio": [], + }), + "terminated_on_tick": None, + "announced_on_tick": None + } + + graphs = [] + + fig, axes = plt.subplots(2, 3) + fig.set_figwidth(25) + fig.set_figheight(10) + # fig.tight_layout() + + input("\n>>> Proceed ?") + + term_wait_ctr = 0 + reason = None + + try: + for t in range(1, args.simulation_ticks + 1): + print(f"[S] Tick: {t}") + + packages_sent = 0 + packages_waiting_on_queue = 0 + num_nodes_active = 0 + num_dead_nodes = 0 + break_this_tick = False + control_packets_sent = 0 + + T = nx.Graph() + node_color = [] + + for index, node in topo.nodes.items(): + new_state, pkg_sent_to_friend, cps = node.simulation_tick() + + if N.root == index: + node_color.append("red") + elif new_state == DSAHCNodeSimulationStatus.ACTIVE: + node_color.append("green") + elif new_state == DSAHCNodeSimulationStatus.PASSIVE: + node_color.append("mediumslateblue") + elif new_state == DSAHCNodeSimulationStatus.OUT_OF_TREE: + node_color.append("gray") + + if index not in T.nodes(): + T.add_node(index) + + if node.parent_node is not None: + if node.parent_node not in T.nodes(): + T.add_node(node.parent_node) + + T.add_edge(index, node.parent_node) + + if index == N.root and new_state is None: + reason = f"root terminated ({t})" + break_this_tick = True + + if new_state == DSAHCNodeSimulationStatus.ACTIVE: + num_nodes_active += 1 + elif new_state == DSAHCNodeSimulationStatus.OUT_OF_TREE: + num_dead_nodes += 1 + + if pkg_sent_to_friend is not None: + packages_sent += 1 + + control_packets_sent += cps + packages_waiting_on_queue += node.waiting_packages_on_queue + + print(f" (ACTIVE: {num_nodes_active}, PKGS-WAIT: {packages_waiting_on_queue}, PKGS-SENT: {packages_sent})") + + if (packages_waiting_on_queue == 0 and num_nodes_active == 0 and packages_sent == 0): + if stats["terminated_on_tick"] is None: + stats["terminated_on_tick"] = t + + print("!!! TERMINATED !!!") + + term_wait_ctr += 1 + + if args.wait_ticks_after_termination > 0 and term_wait_ctr > args.wait_ticks_after_termination: + print("!!! FORCE TERMINATED !!!") + reason = f"forced ({args.wait_ticks_after_termination})" + break_this_tick = True + + if args.exit_on_termination: + break_this_tick = True + + total_pkgs_sent_cum = (stats["df"]["control_packets_sent"].sum() + control_packets_sent + stats["df"]["packets_in_transmit"].sum() + packages_sent) + + stats["df"].loc[t-1] = [ + num_dead_nodes, + num_nodes_active, + packages_sent, + packages_waiting_on_queue, + control_packets_sent, + (control_packets_sent / packages_sent) if packages_sent > 0 else 0, # TODO: Fix later, find a better soln,,, + (((stats["df"]["control_packets_sent"].sum() + control_packets_sent) / total_pkgs_sent_cum) if total_pkgs_sent_cum > 0 else 0) * 100 + ] + + graphs.append({ + "edges": T.edges(), + "nodes": T.nodes(), + }) + + if not args.no_realtime_plot or args.save_tick_plots: + axes[0][0].cla() + axes[0][1].cla() + axes[0][2].cla() + axes[1][0].cla() + axes[1][1].cla() + axes[1][2].cla() + + axes[0][0].set_title("Active/Dead Nodes") + axes[0][0].set_xlabel(f"Simulation Ticks ({args.ms_per_tick}ms/tick)") + axes[0][0].set_ylabel(f"Node Count") + + axes[0][1].set_title("Packets in Transmit") + axes[0][1].set_xlabel(f"Simulation Ticks ({args.ms_per_tick}ms/tick)") + axes[0][1].set_ylabel(f"Packet Count") + + axes[1][0].set_title("Control Packet Ratio") + axes[1][0].set_xlabel(f"CPR ({args.ms_per_tick}ms/tick)") + axes[1][0].set_ylabel(f"CPR (control packets/total packets)") + + axes[1][1].set_title("Instant Control Packet Ratio") + axes[1][1].set_xlabel(f"Simulation Ticks ({args.ms_per_tick}ms/tick)") + axes[1][1].set_ylabel(f"CPR (control packets/total packets)") + + sns.lineplot(data=stats["df"]["active_nodes"], ax=axes[0][0], color="orange", legend='brief', label="Active nodes") + sns.lineplot(data=stats["df"]["dead_nodes"], ax=axes[0][0], color="blue", legend='brief', label="Dead nodes") + sns.lineplot(data=stats["df"]["packets_in_transmit"], ax=axes[0][1], color="purple", legend='brief', label="Basic packets") + sns.lineplot(data=stats["df"]["control_packets_sent"], ax=axes[0][1], color="green", legend='brief', label="Control packets") + sns.lineplot(data=stats["df"]["control_total_cumulative_ratio"], ax=axes[1][0], color="mediumslateblue") + sns.lineplot(data=stats["df"]["control_basic_instant_ratio"], ax=axes[1][1], color="red") + + nx.draw(T, ax=axes[0][2], with_labels=True, node_color=node_color) + + if args.network_type == "grid": + pos = {i: (i // args.node_count_on_edge, i % args.node_count_on_edge) for i in range(total_nodes)} + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color, pos=pos) + else: + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color) + + plt.pause(0.0005) + + if args.save_tick_plots: + plt.savefig(tick_plots_save_path + f"{str(t).zfill(3)}.png", dpi=160) + + if break_this_tick: + break + + time.sleep(args.ms_per_tick / 1000) + except KeyboardInterrupt: + pass + + axes[0][0].cla() + axes[0][1].cla() + axes[0][2].cla() + axes[1][0].cla() + axes[1][1].cla() + axes[1][2].cla() + + sns.lineplot(data=stats["df"]["active_nodes"], ax=axes[0][0], color="orange") + sns.lineplot(data=stats["df"]["dead_nodes"], ax=axes[0][0], color="blue") + sns.lineplot(data=stats["df"]["packets_in_transmit"], ax=axes[0][1], color="purple") + sns.lineplot(data=stats["df"]["control_packets_sent"], ax=axes[0][1], color="green") + sns.lineplot(data=stats["df"]["control_total_cumulative_ratio"], ax=axes[1][0], color="mediumslateblue") + sns.lineplot(data=stats["df"]["control_basic_instant_ratio"], ax=axes[1][1], color="red") + + nx.draw(T, ax=axes[0][2], with_labels=True, node_color=node_color) + + if args.network_type == "grid": + pos = {i: (i // args.node_count_on_edge, i % args.node_count_on_edge) for i in range(total_nodes)} + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color, pos=pos) + else: + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color) + + plt.savefig(f"simdump/DS_stats_{args.network_type}_{total_nodes}_{ts}.png", dpi=200) + + with open(f"simdump/DS_run_{args.network_type}_{total_nodes}_{ts}.pkl", "wb") as fp: + pickle.dump({ + "args": args, + "context": topo_context, + "stats": stats, + "graphs": graphs + }, fp) + + if not args.no_realtime_plot: + plt.show() + + print(f"\n{reason} [{t - stats['terminated_on_tick'] if stats['terminated_on_tick'] is not None else None}]") + + if args.generate_gif: + make_gif(f"{tick_plots_save_path}/*.png", tick_plots_save_path, "animation.gif", t * 20) \ No newline at end of file diff --git a/simulator_sf.py b/simulator_sf.py new file mode 100644 index 0000000000000000000000000000000000000000..163e6f4993f5f6a9c9f0c8e50d640c92ca638e10 --- /dev/null +++ b/simulator_sf.py @@ -0,0 +1,257 @@ +import os +import sys +import time +import pickle +import random +from typing import Text +import pandas as pd +import seaborn as sns +import networkx as nx +import matplotlib.pyplot as plt +from datetime import datetime as dt +from networkx.drawing.nx_pydot import graphviz_layout + +from Ahc import Topology +from graph import ERG, Grid, Star +from shavit_francez import ShavitFrancezAdHocNode, SFAHCNodeSimulationStatus +from Channels import P2PFIFOPerfectChannel + +def run_shavit_francez_simulation(args): + if args.network_type == "erg": + N = ERG(args.node_count, args.node_connectivity) + total_nodes = args.node_count + elif args.network_type == "grid": + N = Grid(args.node_count_on_edge) + total_nodes = args.node_count_on_edge ** 2 + elif args.network_type == "star": + N = Star(args.slave_count, master_is_root=args.master_is_root) + total_nodes = args.slave_count + 1 + + if args.run_until_termination: + args.simulation_ticks = 10**10 + + assert args.hard_stop_max_tick < args.simulation_ticks + assert args.hard_stop_min_tick > 0 + + hard_stop_on_tick = None + + if args.hard_stop_nodes: + hard_stop_on_tick = [random.randint(args.hard_stop_min_tick, args.hard_stop_max_tick) for _ in range(total_nodes)] + + node_active_ticks_initial = [random.randint(args.node_min_activeness_after_receive, args.node_max_activeness_after_receive) if random.random() <= args.node_initial_activeness_prob else 0 for _ in range(total_nodes)] + alive_nodes = list(range(total_nodes)) + + topo_context = { + "network": N, + "ms_per_tick": args.ms_per_tick, + "simulation_ticks": args.simulation_ticks, + "initial_liveness": node_active_ticks_initial, + "communication_on_active_prob": args.node_activeness_communication_prob, + "min_activeness_after_receive": args.node_min_activeness_after_receive, + "max_activeness_after_receive": args.node_max_activeness_after_receive, + "node_package_process_per_tick": args.node_package_process_per_tick, + "passiveness_death_thresh": args.passiveness_death_thresh, + "hard_stop_on_tick": hard_stop_on_tick, + "alive_nodes": alive_nodes, + "only_root_alive_initially": args.only_root_alive_initially + } + + print(topo_context) + # N.plot() + # plt.show() + + topo = Topology() + topo.construct_from_graph(N.G, ShavitFrancezAdHocNode, P2PFIFOPerfectChannel, context=topo_context) + + topo.start() + + stats = { + "df": pd.DataFrame(data={ + "dead_nodes": [], + "active_nodes": [], + "packets_in_transmit": [], + "queued_packets": [], + "control_packets_sent": [], + "control_basic_instant_ratio": [], + "control_total_cumulative_ratio": [], + "wave_packets_sent": [], + "wave_basic_instant_ratio": [], + "wave_total_cumulative_ratio": [], + "wave_control_cumulative_ratio": [] + }), + "terminated_on_tick": None, + "announced_on_tick": None + } + + fig, axes = plt.subplots(2, 3) + fig.set_figwidth(25) + fig.set_figheight(10) + # fig.tight_layout() + + input("\n>>> Proceed ?") + + term_wait_ctr = 0 + reason = None + wave_finisher = None + + try: + for t in range(1, args.simulation_ticks + 1): + print(f"[S] Tick: {t}") + + packages_sent = 0 + packages_waiting_on_queue = 0 + num_nodes_active = 0 + num_dead_nodes = 0 + break_this_tick = False + control_packets_sent = 0 + wave_packets_sent = 0 + + node_color = [] + + for index, node in topo.nodes.items(): + new_state, pkg_sent_to_friend, cps, wps = node.simulation_tick() + + if N.root == index and new_state != SFAHCNodeSimulationStatus.OUT_OF_TREE: + node_color.append("red") + elif N.root == index and new_state == SFAHCNodeSimulationStatus.OUT_OF_TREE: + node_color.append("orange") + elif new_state == SFAHCNodeSimulationStatus.ACTIVE: + node_color.append("green") + elif new_state == SFAHCNodeSimulationStatus.PASSIVE: + node_color.append("mediumslateblue") + elif new_state == SFAHCNodeSimulationStatus.OUT_OF_TREE: + node_color.append("gray") + else: # is None... + # It's the wave finisher!!! + node_color.append("yellow") + wave_finisher = index + + if new_state is None: + reason = f"wave terminated ({t}) ({wave_finisher})" + break_this_tick = True + + if new_state == SFAHCNodeSimulationStatus.ACTIVE: + num_nodes_active += 1 + elif new_state == SFAHCNodeSimulationStatus.OUT_OF_TREE: + num_dead_nodes += 1 + + if pkg_sent_to_friend is not None: + packages_sent += 1 + + control_packets_sent += cps + wave_packets_sent += wps + packages_waiting_on_queue += node.waiting_packages_on_queue + + print(f" (ACTIVE: {num_nodes_active}, PKGS-WAIT: {packages_waiting_on_queue}, PKGS-SENT: {packages_sent})") + + if (packages_waiting_on_queue == 0 and num_nodes_active == 0 and packages_sent == 0): + if stats["terminated_on_tick"] is None: + stats["terminated_on_tick"] = t + + print("!!! TERMINATED !!!") + + term_wait_ctr += 1 + + if args.wait_ticks_after_termination > 0 and term_wait_ctr > args.wait_ticks_after_termination: + print("!!! FORCE TERMINATED !!!") + reason = f"forced ({args.wait_ticks_after_termination})" + break_this_tick = True + + if args.exit_on_termination: + break_this_tick = True + + control_pkgs_sent_cum = stats["df"]["control_packets_sent"].sum() + control_packets_sent + total_pkgs_sent_cum = (stats["df"]["control_packets_sent"].sum() + control_packets_sent + stats["df"]["packets_in_transmit"].sum() + packages_sent + stats["df"]["wave_packets_sent"].sum() + wave_packets_sent) + + stats["df"].loc[t-1] = [ + num_dead_nodes, + num_nodes_active, + packages_sent, + packages_waiting_on_queue, + control_packets_sent, + (control_packets_sent / packages_sent) if packages_sent > 0 else 0, # TODO: Fix later, find a better soln,,, + (((stats["df"]["control_packets_sent"].sum() + control_packets_sent) / total_pkgs_sent_cum) if total_pkgs_sent_cum > 0 else 0) * 100, + wave_packets_sent, + (wave_packets_sent / packages_sent) if packages_sent > 0 else 0, # TODO: Fix later, find a better soln,,, + (((stats["df"]["wave_packets_sent"].sum() + wave_packets_sent) / total_pkgs_sent_cum) if total_pkgs_sent_cum > 0 else 0) * 100, + (((stats["df"]["wave_packets_sent"].sum() + wave_packets_sent) / control_pkgs_sent_cum) if control_pkgs_sent_cum > 0 else 0) * 100, + ] + + # axes.scatter(x=t, y=num_nodes_active) + + if not args.no_realtime_plot: + axes[0][0].cla() + axes[0][1].cla() + axes[0][2].cla() + axes[1][0].cla() + axes[1][1].cla() + axes[1][2].cla() + + sns.lineplot(data=stats["df"]["active_nodes"], ax=axes[0][0], color="orange") + sns.lineplot(data=stats["df"]["dead_nodes"], ax=axes[0][0], color="blue") + sns.lineplot(data=stats["df"]["packets_in_transmit"], ax=axes[0][1], color="purple") + sns.lineplot(data=stats["df"]["control_packets_sent"], ax=axes[0][1], color="green") + sns.lineplot(data=stats["df"]["wave_packets_sent"], ax=axes[0][1], color="blue") + sns.lineplot(data=stats["df"]["control_total_cumulative_ratio"], ax=axes[1][0], color="mediumslateblue") + sns.lineplot(data=stats["df"]["wave_total_cumulative_ratio"], ax=axes[1][0], color="yellow") + sns.lineplot(data=stats["df"]["control_basic_instant_ratio"], ax=axes[1][1], color="red") + sns.lineplot(data=stats["df"]["wave_basic_instant_ratio"], ax=axes[1][1], color="orange") + sns.lineplot(data=stats["df"]["wave_control_cumulative_ratio"], ax=axes[0][2], color="mediumslateblue") + + if args.network_type == "grid": + pos = {i: (i // args.node_count_on_edge, i % args.node_count_on_edge) for i in range(total_nodes)} + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color, pos=pos) + else: + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color) + + plt.pause(0.0005) + + if break_this_tick: + break + + time.sleep(args.ms_per_tick / 1000) + except KeyboardInterrupt: + pass + + ts = dt.now().timestamp() + + if args.no_realtime_plot: + axes[0][0].cla() + axes[0][1].cla() + axes[0][2].cla() + axes[1][0].cla() + axes[1][1].cla() + axes[1][2].cla() + + sns.lineplot(data=stats["df"]["active_nodes"], ax=axes[0][0], color="orange") + sns.lineplot(data=stats["df"]["dead_nodes"], ax=axes[0][0], color="blue") + sns.lineplot(data=stats["df"]["packets_in_transmit"], ax=axes[0][1], color="purple") + sns.lineplot(data=stats["df"]["control_packets_sent"], ax=axes[0][1], color="green") + sns.lineplot(data=stats["df"]["wave_packets_sent"], ax=axes[0][1], color="blue") + sns.lineplot(data=stats["df"]["control_total_cumulative_ratio"], ax=axes[1][0], color="mediumslateblue") + sns.lineplot(data=stats["df"]["wave_total_cumulative_ratio"], ax=axes[1][0], color="yellow") + sns.lineplot(data=stats["df"]["control_basic_instant_ratio"], ax=axes[1][1], color="red") + sns.lineplot(data=stats["df"]["wave_basic_instant_ratio"], ax=axes[1][1], color="orange") + sns.lineplot(data=stats["df"]["wave_control_cumulative_ratio"], ax=axes[0][2], color="mediumslateblue") + + if args.network_type == "grid": + pos = {i: (i // args.node_count_on_edge, i % args.node_count_on_edge) for i in range(total_nodes)} + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color, pos=pos) + else: + nx.draw(N.G, ax=axes[1][2], with_labels=True, node_color=node_color) + + plt.pause(0.0005) + + plt.savefig(f"simdump/SF_stats_{args.network_type}_{total_nodes}_{ts}.png", dpi=200) + + with open(f"simdump/SF_run_{args.network_type}_{total_nodes}_{ts}.pkl", "wb") as fp: + pickle.dump({ + "args": args, + "context": topo_context, + "stats": stats + }, fp) + + if not args.no_realtime_plot: + plt.show() + + print(f"\n{reason} [{t - stats['terminated_on_tick'] if stats['terminated_on_tick'] is not None else None}]")