From 3fc62dd63ea396972e093912ebcdaa49454d6c85 Mon Sep 17 00:00:00 2001 From: Ege U Date: Sun, 23 May 2021 02:56:51 +0300 Subject: [PATCH 1/4] Wave implementations --- .gitignore | 1 + Ahc.py | 4 ++ Waves/DepthFirstSearch.py | 110 +++++++++++++++++++++++++++++++++++++ Waves/Tarrys.py | 101 ++++++++++++++++++++++++++++++++++ tests/Waves/test_dfs.py | 59 ++++++++++++++++++++ tests/Waves/test_tarrys.py | 60 ++++++++++++++++++++ 6 files changed, 335 insertions(+) create mode 100644 .gitignore create mode 100644 Waves/DepthFirstSearch.py create mode 100644 Waves/Tarrys.py create mode 100644 tests/Waves/test_dfs.py create mode 100644 tests/Waves/test_tarrys.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e99e36 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.pyc \ No newline at end of file diff --git a/Ahc.py b/Ahc.py index b2d5487..ec4e808 100644 --- a/Ahc.py +++ b/Ahc.py @@ -34,6 +34,7 @@ import datetime import queue from enum import Enum from threading import Thread, Lock +from random import sample import matplotlib.pyplot as plt import networkx as nx @@ -360,3 +361,6 @@ class Topology: plt.draw() print(self.nodecolors) #self.lock.release() + + def get_random_node(self): + return self.nodes[sample(self.G.nodes(), 1)[0]] diff --git a/Waves/DepthFirstSearch.py b/Waves/DepthFirstSearch.py new file mode 100644 index 0000000..d6d31ce --- /dev/null +++ b/Waves/DepthFirstSearch.py @@ -0,0 +1,110 @@ +from random import choice +import time +from enum import Enum +from typing import Any, Dict, List +import uuid + +from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, Topology, \ + MessageDestinationIdentifiers, EventTypes + +# define your own message types +class DfsMessageTypes(Enum): + FORWARD = "@tarrys/forward" + START = "@tarrys/start" + +# define your own message header structure +class DfsMessageHeader(GenericMessageHeader): + def __init__(self, *args, token, **kwargs): + super().__init__(*args, **kwargs) + self.token = token + +# define your own message payload structure +class DfsMessagePayload(GenericMessagePayload): + pass + +class DfsNeighbor: + def __init__(self, id, invoked): + self.id = id + self.invoked = invoked + +class DfsTraverse(ComponentModel): + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.token_neighbor_mapping = {} + self.token_parent_mapping = {} + self.is_parent = False + + def on_message_from_bottom(self, eventobj: Event): + msg = eventobj.eventcontent + hdr = msg.header + message_source = hdr.messagefrom + + payload:List[Any] = msg.payload.messagepayload + + if hdr.messagetype == DfsMessageTypes.FORWARD or hdr.messagetype == DfsMessageTypes.START: + token = hdr.token + if hdr.messagetype == DfsMessageTypes.START: + self.token_parent_mapping[token] = -1 + + parent_for_token = self.token_parent_mapping.get(token, None) + if parent_for_token == None: + self.token_parent_mapping[token] = message_source + parent_for_token = message_source + + message = None + next_target = None + """ + DFS's algorithm has 3 rules. + 1 - A process never forwards the token through the same channel twice. + 2 - A process only forwards the token to its parent when there is no other option. + 3 - When a process receives the token, it immediately sends it back through the same + channel if this is allowed by rules 1 and 2. + """ + uninvoked_neighbors = [n for n in self.get_neighbor_mapping_for_token(token) if n.invoked == False and n.id != parent_for_token and n.id != message_source] + # Send message back if possible + if message_source in [n.id for n in self.get_neighbor_mapping_for_token(token) if n.invoked == False] and message_source != parent_for_token: + neigh = [n for n in self.get_neighbor_mapping_for_token(token) if n.invoked == False and n.id == message_source][0] + neigh.invoked = True + next_target = message_source + elif len(uninvoked_neighbors) > 0: # If true, send to the available neighbor + neigh = choice(uninvoked_neighbors) + neigh.invoked = True + next_target = neigh.id + else: # Else, send the token back to the parent + if parent_for_token == -1: # If I am the initiator, traversing is completed + print(payload) + print("->".join(payload)) + print("TRAVERSING IS COMPLETED IN " + str(len(payload)) + " hops") + print(f"Graph had {Topology().G.number_of_edges()} edges") + print(len(set(payload))) + return + else: + next_target = parent_for_token + + payload.append(str(self.componentinstancenumber)) + message = self.prepare_message(DfsMessageTypes.FORWARD, next_target, token, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + self.is_parent = True + + def start_traverse(self): + token = self.create_token() + self.send_self(Event(self, EventTypes.MFRB, self.prepare_message(DfsMessageTypes.START, self.componentinstancenumber, token, []))) + + def create_token(self): + return str(uuid.uuid4()) + + def prepare_neighbor_map(self): + neighbor_list = Topology().get_neighbors(self.componentinstancenumber) + return [DfsNeighbor(n, False) for n in neighbor_list] + + def get_neighbor_mapping_for_token(self, token: str) -> List[DfsNeighbor]: + mapping = self.token_neighbor_mapping.get(token) + if mapping == None: + mapping = self.prepare_neighbor_map() + self.token_neighbor_mapping[token] = mapping + return mapping + + def prepare_message(self, message_type: DfsMessageTypes, neighbor: int, token: str, payload:Any = None) -> GenericMessage: + header = DfsMessageHeader(message_type, self.componentinstancenumber, neighbor, neighbor, token=token) + payload = DfsMessagePayload(payload) + return GenericMessage(header, payload) \ No newline at end of file diff --git a/Waves/Tarrys.py b/Waves/Tarrys.py new file mode 100644 index 0000000..a579f94 --- /dev/null +++ b/Waves/Tarrys.py @@ -0,0 +1,101 @@ +from random import choice +import time +from enum import Enum +from typing import Any, List +import uuid + +from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, Topology, \ + MessageDestinationIdentifiers, EventTypes + +# define your own message types +class WaveMessageTypes(Enum): + FORWARD = "@tarrys/forward" + START = "@tarrys/start" + +# define your own message header structure +class WaveMessageHeader(GenericMessageHeader): + def __init__(self, *args, token, **kwargs): + super().__init__(*args, **kwargs) + self.token = token + +# define your own message payload structure +class WaveMessagePayload(GenericMessagePayload): + pass + +class TarryNeighbor: + def __init__(self, id, invoked): + self.id = id + self.invoked = invoked + +class TarrysTraverse(ComponentModel): + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.token_neighbor_mapping = {} + self.token_parent_mapping = {} + + def on_message_from_bottom(self, eventobj: Event): + msg = eventobj.eventcontent + hdr = msg.header + message_source = hdr.messagefrom + + payload:List[Any] = msg.payload.messagepayload + + if hdr.messagetype == WaveMessageTypes.FORWARD or hdr.messagetype == WaveMessageTypes.START: + token = hdr.token + if hdr.messagetype == WaveMessageTypes.START: + self.token_parent_mapping[token] = -1 + + parent_for_token = self.token_parent_mapping.get(token, None) + if parent_for_token == None: + self.token_parent_mapping[token] = message_source + parent_for_token = message_source + + message = None + next_target = None + """ + Tarry's algorithm has 2 rules. + 1 - A process never forwards the token through the same channel twice. + 2 - A process only forwards the token to its parent when there is no other option. + """ + uninvoked_neighbors = [n for n in self.get_neighbor_mapping_for_token(token) if n.invoked == False and n.id != parent_for_token] + if len(uninvoked_neighbors) > 0: # If true, send to the available neighbor + neigh = choice(uninvoked_neighbors) + # neigh = uninvoked_neighbors[0] + neigh.invoked = True + next_target = neigh.id + else: # Else, send the token back to the parent + if parent_for_token == -1: # If I am the initiator, traversing is completed + print("->".join(payload)) + print("TRAVERSING IS COMPLETED IN " + str(len(payload)) + " hops") + print(f"Graph had {Topology().G.number_of_edges()} edges") + print(len(set(payload))) + return + else: + next_target = parent_for_token + payload.append(str(self.componentinstancenumber)) + message = self.prepare_message(WaveMessageTypes.FORWARD, next_target, token, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + def start_traverse(self): + token = self.create_token() + self.send_self(Event(self, EventTypes.MFRB, self.prepare_message(WaveMessageTypes.START, self.componentinstancenumber, token, []))) + + def create_token(self): + return str(uuid.uuid4()) + + def prepare_neighbor_map(self): + neighbor_list = Topology().get_neighbors(self.componentinstancenumber) + # return [{"neighbor": n, "invoked": False} for n in neighbor_list] + return [TarryNeighbor(n, False) for n in neighbor_list] + + def get_neighbor_mapping_for_token(self, token: str) -> List[TarryNeighbor]: + mapping = self.token_neighbor_mapping.get(token) + if mapping == None: + mapping = self.prepare_neighbor_map() + self.token_neighbor_mapping[token] = mapping + return mapping + + def prepare_message(self, message_type: WaveMessageTypes, neighbor: int, token: str, payload:str = None ) -> GenericMessage: + header = WaveMessageHeader(message_type, self.componentinstancenumber, neighbor, neighbor, token=token) + payload = WaveMessagePayload(payload) + return GenericMessage(header, payload) \ No newline at end of file diff --git a/tests/Waves/test_dfs.py b/tests/Waves/test_dfs.py new file mode 100644 index 0000000..d6f85bf --- /dev/null +++ b/tests/Waves/test_dfs.py @@ -0,0 +1,59 @@ +import os +import sys +import random +import time + +sys.path.insert(0, os.getcwd()) + +import networkx as nx +import matplotlib.pyplot as plt + +from Ahc import ComponentModel, Event, ConnectorTypes, Topology, EventTypes +from Ahc import ComponentRegistry +from Waves.DepthFirstSearch import DfsTraverse +from Channels import P2PFIFOPerfectChannel +from LinkLayers.GenericLinkLayer import LinkLayer + +registry = ComponentRegistry() + +class AdHocNode(ComponentModel): + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def __init__(self, componentname, componentid): + # SUBCOMPONENTS + self.traverse_service = DfsTraverse("DfsTraverse", componentid) + self.link_layer = LinkLayer("LinkLayer", componentid) + + # CONNECTIONS AMONG SUBCOMPONENTS + self.traverse_service.connect_me_to_component(ConnectorTypes.DOWN, self.link_layer) + self.link_layer.connect_me_to_component(ConnectorTypes.UP, self.traverse_service) + + # Connect the bottom component to the composite component.... + self.link_layer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.link_layer) + + super().__init__(componentname, componentid) + +def main(): + G = nx.random_geometric_graph(9, 0.5, seed=5) + nx.draw(G, with_labels=True, font_weight='bold') + plt.draw() + topo = Topology() + topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel) + + topo.start() + time.sleep(1) + + random.seed(10) + random_node:AdHocNode = topo.get_random_node() + random_node.traverse_service.start_traverse() + plt.show() + + while (True): pass + +if __name__ == "__main__": + main() diff --git a/tests/Waves/test_tarrys.py b/tests/Waves/test_tarrys.py new file mode 100644 index 0000000..f404adc --- /dev/null +++ b/tests/Waves/test_tarrys.py @@ -0,0 +1,60 @@ +import os +import sys +import random +import time + +sys.path.insert(0, os.getcwd()) + +import networkx as nx +import matplotlib.pyplot as plt + +from Ahc import ComponentModel, Event, ConnectorTypes, Topology, EventTypes +from Ahc import ComponentRegistry +from Waves.Tarrys import TarrysTraverse +from Channels import P2PFIFOPerfectChannel +from LinkLayers.GenericLinkLayer import LinkLayer + +registry = ComponentRegistry() + +class AdHocNode(ComponentModel): + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def __init__(self, componentname, componentid): + # SUBCOMPONENTS + self.traverse_service = TarrysTraverse("TarrysTraverse", componentid) + self.link_layer = LinkLayer("LinkLayer", componentid) + + # CONNECTIONS AMONG SUBCOMPONENTS + self.traverse_service.connect_me_to_component(ConnectorTypes.DOWN, self.link_layer) + self.link_layer.connect_me_to_component(ConnectorTypes.UP, self.traverse_service) + + # Connect the bottom component to the composite component.... + self.link_layer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.link_layer) + + super().__init__(componentname, componentid) + +def main(): + # G = nx.random_geometric_graph(19, 0.5, seed=5) + G = nx.random_geometric_graph(9, 0.5, seed=5) + nx.draw(G, with_labels=True, font_weight='bold') + plt.draw() + topo = Topology() + topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel) + + topo.start() + time.sleep(1) + + random.seed(10) + random_node:AdHocNode = topo.get_random_node() + random_node.traverse_service.start_traverse() + plt.show() + + while (True): pass + +if __name__ == "__main__": + main() -- GitLab From 11f5e3ce34fbe3d481142543f1a01a12cbcc922e Mon Sep 17 00:00:00 2001 From: Ege U Date: Sun, 23 May 2021 18:02:43 +0300 Subject: [PATCH 2/4] Added self to contributors list --- CONTRIBUTORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 6b324da..7cd672c 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,4 @@ # Contributors - [Berker Acır](https://github.com/berkeracir) +- [Ege Uçak](https://github.com/egeucak) -- GitLab From d14b425afa72da7735cb5b6073597744a945dca9 Mon Sep 17 00:00:00 2001 From: Ege U Date: Tue, 25 May 2021 18:47:47 +0300 Subject: [PATCH 3/4] Removed an unused variable --- Waves/DepthFirstSearch.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/Waves/DepthFirstSearch.py b/Waves/DepthFirstSearch.py index d6d31ce..6f98f01 100644 --- a/Waves/DepthFirstSearch.py +++ b/Waves/DepthFirstSearch.py @@ -32,7 +32,6 @@ class DfsTraverse(ComponentModel): super().__init__(componentname, componentinstancenumber) self.token_neighbor_mapping = {} self.token_parent_mapping = {} - self.is_parent = False def on_message_from_bottom(self, eventobj: Event): msg = eventobj.eventcontent @@ -84,7 +83,6 @@ class DfsTraverse(ComponentModel): payload.append(str(self.componentinstancenumber)) message = self.prepare_message(DfsMessageTypes.FORWARD, next_target, token, payload) self.send_down(Event(self, EventTypes.MFRT, message)) - self.is_parent = True def start_traverse(self): token = self.create_token() -- GitLab From 350be3f376a1536abb884064e5bb9f54f38d4584 Mon Sep 17 00:00:00 2001 From: WINS Laboratory Date: Thu, 26 Aug 2021 13:53:23 +0300 Subject: [PATCH 4/4] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 7cd672c..3388dcf 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,4 +1,9 @@ # Contributors - [Berker Acır](https://github.com/berkeracir) -- [Ege Uçak](https://github.com/egeucak) +- [Osman Ufuk Yağmur](https://github.com/VengerA) +- [Berke Tezergil](https://github.com/btezergil) +- [Saim Sunel](https://github.com/SaimSUNEL) +- [Ozan Akın](https://github.com/oznakn) +- [Ilyas Eren Yilmaz](https://github.com/ilyaserenyilmaz) +- [İbrahim Koç] -- GitLab