From 9d856b62a9a486b11cb6302c8a31305be96a79d8 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Tue, 25 May 2021 21:05:35 +0300 Subject: [PATCH 01/10] Add Itai Rodeh Algorithm Itai Rodeh is an election algorithm for anonymous networks that work in directed ring topologies. --- AnonymousNetworks/ItaiRodeh.py | 222 +++++++++++++++++++++++++++++++++ 1 file changed, 222 insertions(+) create mode 100644 AnonymousNetworks/ItaiRodeh.py diff --git a/AnonymousNetworks/ItaiRodeh.py b/AnonymousNetworks/ItaiRodeh.py new file mode 100644 index 0000000..de1cfc0 --- /dev/null +++ b/AnonymousNetworks/ItaiRodeh.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python + +""" +Implementation of the "Itai-Rodeh Election Algorithm for Rings" as +described in the textbook "Fokkink, Wan. Distributed algorithms: an intuitive +approach. MIT Press, 2018." with additional help from the paper "W. Fokkink and +J. Pang, Simplifying Itai-Rodeh Leader Election for Anonymous Rings. 2004." +""" + +__author__ = "Yigit Sever" +__contact__ = "yigit@yigitsever.com" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Yigit Sever"] +__date__ = "2021-05-24" +__deprecated__ = False +__email__ = "yigit@yigitsever.com" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + +from enum import Enum +from random import randint + +from Ahc import (ComponentModel, ComponentRegistry, Event, EventTypes, + GenericMessage, GenericMessageHeader, GenericMessagePayload, + Topology) + + +class State(Enum): + """ + State of the nodes, one from {active, passive, leader} + - "active" nodes are initiators, attempting to become a leader + - "passive" nodes have selected smaller id's than "active" nodes and cannot + compete for leadership anymore + - "leader" node has won an election round, only one such node should be + present in the network + """ + + active = 1 + passive = 2 + leader = 3 + + +class ItaiRodehMessageHeader(GenericMessageHeader): + def __init__( + self, + messagefrom, + messageto, + messagetype="Itai-Rodeh Message", + nexthop=float("inf"), + interfaceid=float("inf"), + sequencenumber=-1, + ): + super().__init__( + messagetype, messagefrom, messageto, nexthop, interfaceid, sequencenumber + ) + + +class ItaiRodehMessagePayload(GenericMessagePayload): + """ + Itai-Rodeh Algorithm uses messages with 4 fields (using textbook's + terminology here): + + - id_p (i): The (random) id the process has chosen for itself for the + current round + - election_round (n'): The current election round, old messages are + silently dismissed + - hop_count (h): Used so that the originating node can recognize their own + message + - dirty_bit (b): For resolving ties, nodes that picked this (highest) id + will still be active next round + """ + + def __init__(self, election_round, id_p): + self.election_round = election_round + self.id_p = id_p + self.hop_count = 1 + # disparity between the paper & textbook, following textbook here + # mnemonic = bit is not dirty, we can still be the leader + self.dirty_bit = False + + +class ItaiRodehNode(ComponentModel): + """ + Node in a system that uses Itai-Rodeh algorithm + Each process has three parameters: + - id_: 1 <= i <= N where N is the ring size + - state: from the State enum, active nodes participate in the election, + passive nodes pass messages around, leader is selected at the end of the + election cycle + - round: current election round, starts at 1 + """ + + ring_size = 0 + + def __init__(self, component_name, component_id): + super().__init__(component_name, component_id) + + """ The anonymous id the node will select for the round """ + self.id_ = 0 + + """ Initially, all processes are active """ + self.state = State.active + + """ Initialized at round 0 """ + self.election_round = 0 + + def send_election_packet(self): + + header = ItaiRodehMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=self.next_hop, + nexthop=self.next_hop, + messagetype="ItaiRodeh Message", + interfaceid=self.next_hop_interface_id, + ) + payload = ItaiRodehMessagePayload(self.election_round, self.id_) + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + def pass_packet_along(self, message): + """For passive processes + + :message: the whole Event, with eventcontent which includes header and + payload + + """ + pass + + def on_init(self, eventobj: Event): + # Select an id for round 1 + self.id_ = randint(1, self.ring_size) + print(f"{self.componentinstancenumber} selected {self.id_} as their ID") + + # Calculate the neighbour, we're on a directed ring + self.neighbour_id = (int(self.componentinstancenumber) + 1) % self.ring_size + # print(f"The neighbour of {self.componentinstancenumber} is {self.neighbour_id}") + + self.next_hop = Topology().get_next_hop( + self.componentinstancenumber, self.neighbour_id + ) + + # print(f"{self.componentinstancenumber} calculated next hop as {self.next_hop}") + + self.next_hop_interface_id = f"{self.componentinstancenumber}-{self.next_hop}" + + self.send_election_packet() + + def on_message_from_bottom(self, eventobj: Event): + """ New message from the link layer """ + payload: ItaiRodehMessagePayload = eventobj.eventcontent.payload + header: ItaiRodehMessageHeader = eventobj.eventcontent.header + print( + f"New message from bottom on {self.componentinstancenumber}: {payload.id_p} it's coming from {header.messagefrom}" + ) + + given_round_id = str(payload.election_round) + str(payload.id_p) + our_round_id = str(self.election_round) + str(self.id_) + + if self.state == State.passive: + # passive node, pass the message on to the next hop, increase the + # 'hop_count' of packet by one + payload.hop_count += 1 + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + elif self.state == State.active: + print(f"πŸ€– {self.componentinstancenumber} is ACTIVE: doing my part πŸ‘·") + # active participant, has stuff to do + if payload.hop_count == self.ring_size: + print( + f"πŸ€– {self.componentinstancenumber}'s message traversed all the way around" + ) + # the message that we sent traversed all the way around the ring + if payload.dirty_bit: + # Bit has been dirtied, next round + self.id_ = randint(1, self.ring_size) + self.election_round += 1 + self.send_election_packet() + else: + # The bit is still false, this node is the leader + self.state = State.leader + print(f"πŸ€– {self.componentinstancenumber}: I'M THE ELECTED LEADER") + # TODO: can we indicate this with a colour on the graph? + # <25-05-21, yigit> # + elif given_round_id == our_round_id: + print( + f"πŸ€– {self.componentinstancenumber}: this round/ID is the same as mine" + ) + # another node has picked our id, dirty their bit and pass it along + payload.dirty_bit = True + payload.hop_count += 1 + + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + print( + f"πŸ€– {self.componentinstancenumber} dirtied the bit, passing it along" + ) + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + elif given_round_id > our_round_id: + # Another node has picked a higher id then us, going passive + self.state = State.passive + + payload.hop_count += 1 + + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + else: # given_round_id < our_round_id + # dismiss the message + pass -- GitLab From df0cfeed882de68b0d443ef61721c90361766921 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Tue, 25 May 2021 21:50:20 +0300 Subject: [PATCH 02/10] Bugfix Using election round || id_p with lexicographic ordering was wrong (5 > 13), idea was from the paper, regular integer comparison is used now --- AnonymousNetworks/ItaiRodeh.py | 146 +++++++++++++++++++++------------ 1 file changed, 93 insertions(+), 53 deletions(-) diff --git a/AnonymousNetworks/ItaiRodeh.py b/AnonymousNetworks/ItaiRodeh.py index de1cfc0..455d200 100644 --- a/AnonymousNetworks/ItaiRodeh.py +++ b/AnonymousNetworks/ItaiRodeh.py @@ -22,9 +22,8 @@ __version__ = "0.0.1" from enum import Enum from random import randint -from Ahc import (ComponentModel, ComponentRegistry, Event, EventTypes, - GenericMessage, GenericMessageHeader, GenericMessagePayload, - Topology) +from Ahc import (ComponentModel, Event, EventTypes, GenericMessage, + GenericMessageHeader, GenericMessagePayload, Topology) class State(Enum): @@ -85,7 +84,7 @@ class ItaiRodehNode(ComponentModel): """ Node in a system that uses Itai-Rodeh algorithm Each process has three parameters: - - id_: 1 <= i <= N where N is the ring size + - id_p: 1 <= i <= N where N is the ring size - state: from the State enum, active nodes participate in the election, passive nodes pass messages around, leader is selected at the end of the election cycle @@ -98,7 +97,7 @@ class ItaiRodehNode(ComponentModel): super().__init__(component_name, component_id) """ The anonymous id the node will select for the round """ - self.id_ = 0 + self.id_p = 0 """ Initially, all processes are active """ self.state = State.active @@ -115,7 +114,7 @@ class ItaiRodehNode(ComponentModel): messagetype="ItaiRodeh Message", interfaceid=self.next_hop_interface_id, ) - payload = ItaiRodehMessagePayload(self.election_round, self.id_) + payload = ItaiRodehMessagePayload(self.election_round, self.id_p) message = GenericMessage(header, payload) self.send_down(Event(self, EventTypes.MFRT, message)) @@ -131,19 +130,19 @@ class ItaiRodehNode(ComponentModel): def on_init(self, eventobj: Event): # Select an id for round 1 - self.id_ = randint(1, self.ring_size) - print(f"{self.componentinstancenumber} selected {self.id_} as their ID") + self.id_p = randint(1, self.ring_size) + print( + f"πŸ€– {self.componentinstancenumber} selected {self.id_p} as their" + f" ID for round {self.election_round}" + ) # Calculate the neighbour, we're on a directed ring self.neighbour_id = (int(self.componentinstancenumber) + 1) % self.ring_size - # print(f"The neighbour of {self.componentinstancenumber} is {self.neighbour_id}") self.next_hop = Topology().get_next_hop( self.componentinstancenumber, self.neighbour_id ) - # print(f"{self.componentinstancenumber} calculated next hop as {self.next_hop}") - self.next_hop_interface_id = f"{self.componentinstancenumber}-{self.next_hop}" self.send_election_packet() @@ -152,16 +151,16 @@ class ItaiRodehNode(ComponentModel): """ New message from the link layer """ payload: ItaiRodehMessagePayload = eventobj.eventcontent.payload header: ItaiRodehMessageHeader = eventobj.eventcontent.header - print( - f"New message from bottom on {self.componentinstancenumber}: {payload.id_p} it's coming from {header.messagefrom}" - ) - given_round_id = str(payload.election_round) + str(payload.id_p) - our_round_id = str(self.election_round) + str(self.id_) + message_election_round = payload.election_round + message_assumed_id = payload.id_p + + # For the active node, we are going to follow the if/else chain given + # in the textbook if self.state == State.passive: # passive node, pass the message on to the next hop, increase the - # 'hop_count' of packet by one + # 'hop_count' of packet by one, no other responsibility payload.hop_count += 1 header.messageto = self.next_hop header.next_hop = self.next_hop @@ -170,43 +169,22 @@ class ItaiRodehNode(ComponentModel): message = GenericMessage(header, payload) self.send_down(Event(self, EventTypes.MFRT, message)) elif self.state == State.active: - print(f"πŸ€– {self.componentinstancenumber} is ACTIVE: doing my part πŸ‘·") - # active participant, has stuff to do - if payload.hop_count == self.ring_size: - print( - f"πŸ€– {self.componentinstancenumber}'s message traversed all the way around" - ) - # the message that we sent traversed all the way around the ring - if payload.dirty_bit: - # Bit has been dirtied, next round - self.id_ = randint(1, self.ring_size) - self.election_round += 1 - self.send_election_packet() - else: - # The bit is still false, this node is the leader - self.state = State.leader - print(f"πŸ€– {self.componentinstancenumber}: I'M THE ELECTED LEADER") - # TODO: can we indicate this with a colour on the graph? - # <25-05-21, yigit> # - elif given_round_id == our_round_id: - print( - f"πŸ€– {self.componentinstancenumber}: this round/ID is the same as mine" - ) - # another node has picked our id, dirty their bit and pass it along - payload.dirty_bit = True - payload.hop_count += 1 - header.messageto = self.next_hop - header.next_hop = self.next_hop - header.interfaceid = self.next_hop_interface_id + if message_election_round > self.election_round or ( + message_election_round == self.election_round + and message_assumed_id > self.id_p + ): + # Another node has picked a higher id than this node for the + # current round or this node has received a message from a + # future round, going passive print( - f"πŸ€– {self.componentinstancenumber} dirtied the bit, passing it along" + f"πŸ€– {self.componentinstancenumber} is PASSIVE: " + f"{message_assumed_id} for round {message_election_round} " + f"encountered, this node is at {self.election_round} with " + f"{self.id_p}" ) - message = GenericMessage(header, payload) - self.send_down(Event(self, EventTypes.MFRT, message)) - elif given_round_id > our_round_id: - # Another node has picked a higher id then us, going passive + self.state = State.passive payload.hop_count += 1 @@ -217,6 +195,68 @@ class ItaiRodehNode(ComponentModel): message = GenericMessage(header, payload) self.send_down(Event(self, EventTypes.MFRT, message)) - else: # given_round_id < our_round_id - # dismiss the message - pass + + elif message_election_round < self.election_round or ( + message_election_round == self.election_round + and message_assumed_id < self.id_p + ): + # This node has received a message from a previous round or + # from the current round but with a lower assumed id, so this + # node can dismiss the election attempt of the sender node + print( + f"πŸ€– {self.componentinstancenumber} is dismissing " + f"{message_assumed_id} for round {message_election_round} " + f"this node is at {self.election_round} with {self.id_p}" + ) + return + elif ( + message_election_round == self.election_round + and message_assumed_id == self.id_p + ): + + if payload.hop_count < self.ring_size: + # receiver node is not the initial sender node + # another node has picked our id, dirty their bit and pass it along + payload.dirty_bit = True + payload.hop_count += 1 + + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + print( + f"πŸ€– {self.componentinstancenumber} dirtied the bit " + f"for {message_assumed_id}, passing it along" + ) + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + elif payload.hop_count == self.ring_size: + # the message that this node has sent traversed all the way + # around the ring + print( + f"πŸ€– {self.componentinstancenumber}'s message " + f"traversed all the way around" + ) + if payload.dirty_bit: + # Bit has been dirtied, next round + self.id_p = randint(1, self.ring_size) + self.election_round += 1 + print( + f"πŸ€– {self.componentinstancenumber} is moving " + f"onto round {self.election_round}" + ) + print( + f"{self.componentinstancenumber} selected " + f"{self.id_p} as their ID for round " + f"{self.election_round}" + ) + self.send_election_packet() + else: + # The bit is still false, this node is the leader + self.state = State.leader + print( + f"πŸ€– {self.componentinstancenumber}: I'M THE ELECTED LEADER" + ) + # TODO: can we indicate this with a colour on the graph? + # <25-05-21, yigit> # -- GitLab From c4acb9c05aa8ea598957f38ca401be7632dead68 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 01:08:20 +0300 Subject: [PATCH 03/10] Add Itai Rodeh Tests Itai Rodeh Algorithm is tested with animations that show the election process --- AnonymousNetworks/ItaiRodeh.py | 15 +++- tests/AnonymousNetworks/__init__.py | 0 tests/AnonymousNetworks/testItaiRodeh.py | 100 +++++++++++++++++++++++ 3 files changed, 111 insertions(+), 4 deletions(-) create mode 100644 tests/AnonymousNetworks/__init__.py create mode 100644 tests/AnonymousNetworks/testItaiRodeh.py diff --git a/AnonymousNetworks/ItaiRodeh.py b/AnonymousNetworks/ItaiRodeh.py index 455d200..9ff1f86 100644 --- a/AnonymousNetworks/ItaiRodeh.py +++ b/AnonymousNetworks/ItaiRodeh.py @@ -92,6 +92,9 @@ class ItaiRodehNode(ComponentModel): """ ring_size = 0 + callback = None + draw_delay = None + global_round = 1 def __init__(self, component_name, component_id): super().__init__(component_name, component_id) @@ -103,7 +106,7 @@ class ItaiRodehNode(ComponentModel): self.state = State.active """ Initialized at round 0 """ - self.election_round = 0 + self.election_round = 1 def send_election_packet(self): @@ -186,7 +189,7 @@ class ItaiRodehNode(ComponentModel): ) self.state = State.passive - + self.id_p = " " payload.hop_count += 1 header.messageto = self.next_hop @@ -208,7 +211,7 @@ class ItaiRodehNode(ComponentModel): f"{message_assumed_id} for round {message_election_round} " f"this node is at {self.election_round} with {self.id_p}" ) - return + elif ( message_election_round == self.election_round and message_assumed_id == self.id_p @@ -242,12 +245,13 @@ class ItaiRodehNode(ComponentModel): # Bit has been dirtied, next round self.id_p = randint(1, self.ring_size) self.election_round += 1 + ItaiRodehNode.global_round = self.election_round print( f"πŸ€– {self.componentinstancenumber} is moving " f"onto round {self.election_round}" ) print( - f"{self.componentinstancenumber} selected " + f"πŸ€– {self.componentinstancenumber} selected " f"{self.id_p} as their ID for round " f"{self.election_round}" ) @@ -260,3 +264,6 @@ class ItaiRodehNode(ComponentModel): ) # TODO: can we indicate this with a colour on the graph? # <25-05-21, yigit> # + self.callback.set() + self.draw_delay.wait() + self.draw_delay.clear() diff --git a/tests/AnonymousNetworks/__init__.py b/tests/AnonymousNetworks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/AnonymousNetworks/testItaiRodeh.py b/tests/AnonymousNetworks/testItaiRodeh.py new file mode 100644 index 0000000..fb501a7 --- /dev/null +++ b/tests/AnonymousNetworks/testItaiRodeh.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python + +# the project root must be in PYTHONPATH for imports +# $ export PYTHONPATH=$(pwd); python tests/AnonymousNetworks/testItaiRodeh.py + +import sys +import threading +from math import atan2, cos, radians, sin, sqrt +from time import sleep + +import matplotlib.pyplot as plt +import networkx as nx +from Ahc import Topology +from Channels import P2PFIFOPerfectChannel + +from AnonymousNetworks.ItaiRodeh import ItaiRodehNode, State + +ACTIVE_NODE_COLOUR = "#ff0000" +PASSIVE_NODE_COLOUR = "#e0e0e0" +LEADER_NODE_COLOUR = "#ff00ff" +EDGE_COLOUR = "#1a1c20" + + +FPS = 1 + + +def main(): + n = int(sys.argv[1]) + print(f"Creating a ring with size {n}") + G = nx.cycle_graph(n) + + plt.ion() + fig = plt.figure(num=0) + + topology = Topology() + topology.construct_from_graph(G, ItaiRodehNode, P2PFIFOPerfectChannel) + topology.start() + ItaiRodehNode.ring_size = n + + update = threading.Event() + draw_delay = threading.Event() + ItaiRodehNode.callback = update + ItaiRodehNode.draw_delay = draw_delay + + while True: + update.wait() + assumed_ids = list() + node_colours = list() + font_colours = list() + + G = Topology().G + pos = nx.circular_layout(G, center=(0, 0)) + + for nodeID in Topology().nodes: + node = Topology().nodes[nodeID] + G.nodes[nodeID]["id_p"] = node.id_p + assumed_ids.append(node.id_p) + + if node.state == State.active: + node_colours.append(ACTIVE_NODE_COLOUR) + font_colours.append(ACTIVE_NODE_COLOUR) + elif node.state == State.passive: + node_colours.append(PASSIVE_NODE_COLOUR) + font_colours.append(PASSIVE_NODE_COLOUR) + elif node.state == State.leader: + node_colours.append(LEADER_NODE_COLOUR) + font_colours.append(LEADER_NODE_COLOUR) + + node_id_label_pos = {} + for key in pos: + x, y = pos[key] + theta = atan2(y, x) + radians(75) + d = 0.1 + node_id_label_pos[key] = (x + d * cos(theta), y + d * sin(theta)) + + node_id_labels = nx.get_node_attributes(G, "id_p") + + nx.draw( + G, + pos, + node_color=node_colours, + edge_color=EDGE_COLOUR, + with_labels=False, + font_weight="bold", + ) + + nx.draw_networkx_labels(G, node_id_label_pos, node_id_labels) + + fig.text(0.2, 0.2, f"Round: {ItaiRodehNode.global_round}") + + fig.canvas.draw() + fig.canvas.flush_events() + fig.clear() + update.clear() + draw_delay.set() + sleep(1.0 / FPS) + + +if __name__ == "__main__": + main() -- GitLab From 9d033aaaea4af033da530b26e1383bd1ca4e256a Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 01:11:55 +0300 Subject: [PATCH 04/10] Cleanup --- AnonymousNetworks/ItaiRodeh.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/AnonymousNetworks/ItaiRodeh.py b/AnonymousNetworks/ItaiRodeh.py index 9ff1f86..856a5b0 100644 --- a/AnonymousNetworks/ItaiRodeh.py +++ b/AnonymousNetworks/ItaiRodeh.py @@ -189,6 +189,8 @@ class ItaiRodehNode(ComponentModel): ) self.state = State.passive + # we clear the id_p here to have them *not* show up in the + # animation self.id_p = " " payload.hop_count += 1 @@ -262,8 +264,6 @@ class ItaiRodehNode(ComponentModel): print( f"πŸ€– {self.componentinstancenumber}: I'M THE ELECTED LEADER" ) - # TODO: can we indicate this with a colour on the graph? - # <25-05-21, yigit> # self.callback.set() self.draw_delay.wait() self.draw_delay.clear() -- GitLab From 5a0fcf206454c3777004f8dd4c383a41495579e7 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 07:09:27 +0300 Subject: [PATCH 05/10] Add IEEE1394 Election (FireWire) Slightly WIP, as the timeout is slightly hacky, otherwise implementation done and test requires some polish & animation --- AnonymousNetworks/IEEE1394.py | 183 ++++++++++++++++++++++++ tests/AnonymousNetworks/testIEEE1394.py | 33 +++++ 2 files changed, 216 insertions(+) create mode 100644 AnonymousNetworks/IEEE1394.py create mode 100644 tests/AnonymousNetworks/testIEEE1394.py diff --git a/AnonymousNetworks/IEEE1394.py b/AnonymousNetworks/IEEE1394.py new file mode 100644 index 0000000..285a1be --- /dev/null +++ b/AnonymousNetworks/IEEE1394.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python + +""" +Implementation of the "IEEE1394 (FireWire)" as described in the textbook +"Fokkink, Wan. Distributed algorithms: an intuitive approach. MIT Press, +2018.", first introduced in " IEEE 1394-1995 - IEEE Standard for a High +Performance Serial Bus" +""" + +__author__ = "Yigit Sever" +__contact__ = "yigit@yigitsever.com" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Yigit Sever"] +__date__ = "2021-05-24" +__deprecated__ = False +__email__ = "yigit@yigitsever.com" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + +import random +from enum import Enum +from random import randint +from time import sleep + +from Ahc import (ComponentModel, Event, EventTypes, GenericMessage, + GenericMessageHeader, GenericMessagePayload, Topology) + + +class FireWirePacketType(Enum): + """Two types of FireWire requests: + - Parent Request + - Acknowledgement""" + + PARENT_REQ = "PARENT_REQ" + ACKNOWLEDGEMENT = "ACKNOWLEDGEMENT" + + +class FireWireMessageHeader(GenericMessageHeader): + def __init__( + self, + messagefrom, + messageto, + messagetype="FireWire Message", + nexthop=float("inf"), + interfaceid=float("inf"), + sequencenumber=-1, + ): + super().__init__( + messagetype, messagefrom, messageto, nexthop, interfaceid, sequencenumber + ) + + +class FireWireMessagePayload(GenericMessagePayload): + def __init__(self): + super().__init__(messagepayload="FireWire Message") + + +class FireWireNode(ComponentModel): + + holders = 0 + + def __init__(self, component_name, component_id): + super().__init__(component_name, component_id) + + self.parent = None + self.received = list() + self.neighbours = set() + self.is_leader = False + self.in_root_contention = True + self.is_waiting = False + self.is_terminated = False + + def on_init(self, eventobj: Event): + self.neighbours = set(Topology().get_neighbors(self.componentinstancenumber)) + print(f"the neighbours of {self.componentinstancenumber} is {self.neighbours}") + + self.send_parent_req() + + def send_parent_req(self): + """Send a parent request to the only eligible neighbour node. The + neighbour node should not have sent this node a parent request and the + number of such neighbours of this node should be 1. + """ + # if there is *only* one possible parent then send them a parent request here + + result = self.neighbours - set(self.received) + + if len(result) == 1: + par = result.pop() + self.parent = par + + print( + f"πŸ€– {self.componentinstancenumber} picked {self.parent} as it's parent" + ) + + next_hop_interface_id = f"{self.componentinstancenumber}-{self.parent}" + + header = FireWireMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=self.parent, + nexthop=self.parent, + messagetype=FireWirePacketType.PARENT_REQ, + interfaceid=next_hop_interface_id, + ) + payload = FireWireMessagePayload() + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + else: + # Cannot send a parent request, more than one possible parent + return + + def root_contention(self): + print(f"πŸ€– {self.componentinstancenumber} is in ROOT CONTENTION") + decision = random.choice([True, False]) + + if decision: + print(f"πŸ€– {self.componentinstancenumber} decides to YIELD") + self.in_root_contention = True + self.send_parent_req() + self.is_waiting = False + else: + FireWireNode.holders += 1 + print( + f"πŸ€– {self.componentinstancenumber} decides to HOLD " + f" ({FireWireNode.holders} holders)" + ) + self.is_waiting = True + + # can only save one of them currently + # we need a proper timeout mechanism then we're done + if FireWireNode.holders == 2: + FireWireNode.holders = 0 + # timeout! + self.root_contention() + + def on_message_from_bottom(self, eventobj: Event): + """ New message from the link layer """ + header: FireWireMessageHeader = eventobj.eventcontent.header + # paylaod is not important for FireWire + + if header.messagetype == FireWirePacketType.PARENT_REQ: + if header.messagefrom is not self.parent: + new_child = header.messagefrom + + self.received.append(new_child) + + next_hop_interface_id = f"{self.componentinstancenumber}-{new_child}" + + header = FireWireMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=new_child, + nexthop=new_child, + messagetype=FireWirePacketType.ACKNOWLEDGEMENT, + interfaceid=next_hop_interface_id, + ) + payload = FireWireMessagePayload() + + ack = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, ack)) + + self.send_parent_req() + elif not self.is_waiting: + self.root_contention() + else: + print(f" πŸ‘‘ {self.componentinstancenumber} is elected as the leader") + self.is_leader = True + self.is_terminated = True + + elif ( + header.messagetype == FireWirePacketType.ACKNOWLEDGEMENT + and header.messagefrom == self.parent + ): + # This node's parent request got acknowledged, the process can + # safely terminate + print( + f"πŸ€– {self.componentinstancenumber} received an ACK " + f" from {header.messagefrom}, terminating" + ) + self.is_terminated = True + return diff --git a/tests/AnonymousNetworks/testIEEE1394.py b/tests/AnonymousNetworks/testIEEE1394.py new file mode 100644 index 0000000..bafb450 --- /dev/null +++ b/tests/AnonymousNetworks/testIEEE1394.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python + +# the project root must be in PYTHONPATH for imports +# $ export PYTHONPATH=$(pwd); python tests/AnonymousNetworks/testItaiRodeh.py + +import sys +import threading +from math import atan2, cos, radians, sin, sqrt +from time import sleep + +import matplotlib.pyplot as plt +import networkx as nx +from Ahc import Topology +from Channels import P2PFIFOPerfectChannel + +from AnonymousNetworks.IEEE1394 import FireWireNode + + +def main(): + n = int(sys.argv[1]) + print(f"Creating a tree with size {n}") + G = nx.random_tree(n) + + topology = Topology() + topology.construct_from_graph(G, FireWireNode, P2PFIFOPerfectChannel) + + topology.start() + topology.plot() + plt.show() + + +if __name__ == "__main__": + main() -- GitLab From cc5059b6bb4f0ce18bd68848836142c8d0aa54de Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 12:55:07 +0300 Subject: [PATCH 06/10] Fix timeout logic Time is now involved in timing out --- AnonymousNetworks/IEEE1394.py | 52 +++++++++++++++++++++++------------ 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/AnonymousNetworks/IEEE1394.py b/AnonymousNetworks/IEEE1394.py index 285a1be..5d965be 100644 --- a/AnonymousNetworks/IEEE1394.py +++ b/AnonymousNetworks/IEEE1394.py @@ -19,9 +19,9 @@ __maintainer__ = "developer" __status__ = "Production" __version__ = "0.0.1" +import datetime import random from enum import Enum -from random import randint from time import sleep from Ahc import (ComponentModel, Event, EventTypes, GenericMessage, @@ -35,6 +35,10 @@ class FireWirePacketType(Enum): PARENT_REQ = "PARENT_REQ" ACKNOWLEDGEMENT = "ACKNOWLEDGEMENT" + START_TIMER = "START_TIMER" + CHECK_TIMER = "CHECK_TIMER" + TIMEOUT = "TIMEOUT" + ROOT_CONTENTION = "ROOT_CONTENTION" class FireWireMessageHeader(GenericMessageHeader): @@ -58,11 +62,12 @@ class FireWireMessagePayload(GenericMessagePayload): class FireWireNode(ComponentModel): - - holders = 0 - def __init__(self, component_name, component_id): super().__init__(component_name, component_id) + self.eventhandlers[FireWirePacketType.START_TIMER] = self.on_timer_initialize + self.eventhandlers[FireWirePacketType.CHECK_TIMER] = self.check_timer + self.eventhandlers[FireWirePacketType.TIMEOUT] = self.timeout + self.eventhandlers[FireWirePacketType.ROOT_CONTENTION] = self.root_contention self.parent = None self.received = list() @@ -72,6 +77,9 @@ class FireWireNode(ComponentModel): self.is_waiting = False self.is_terminated = False + self.waiting_since = None + self.timeout_duration = 2 + def on_init(self, eventobj: Event): self.neighbours = set(Topology().get_neighbors(self.componentinstancenumber)) print(f"the neighbours of {self.componentinstancenumber} is {self.neighbours}") @@ -112,7 +120,9 @@ class FireWireNode(ComponentModel): # Cannot send a parent request, more than one possible parent return - def root_contention(self): + def root_contention(self, eventobj: Event): + if self.is_leader: + return print(f"πŸ€– {self.componentinstancenumber} is in ROOT CONTENTION") decision = random.choice([True, False]) @@ -121,20 +131,28 @@ class FireWireNode(ComponentModel): self.in_root_contention = True self.send_parent_req() self.is_waiting = False + self.waiting_since = None else: - FireWireNode.holders += 1 - print( - f"πŸ€– {self.componentinstancenumber} decides to HOLD " - f" ({FireWireNode.holders} holders)" - ) + print(f"πŸ€– {self.componentinstancenumber} decides to HOLD") self.is_waiting = True + self.send_self(Event(self, FireWirePacketType.START_TIMER, "...")) + + def on_timer_initialize(self, eventobj: Event): + start_time = eventobj.time + self.waiting_since = start_time + self.send_self(Event(self, FireWirePacketType.CHECK_TIMER, "...")) + + def check_timer(self, eventobj: Event): + current_time = datetime.datetime.now() + delta = current_time - self.waiting_since + if delta.seconds > self.timeout_duration: + self.send_self(Event(self, FireWirePacketType.TIMEOUT, "...")) + else: + sleep(0.2) + self.send_self(Event(self, FireWirePacketType.CHECK_TIMER, "...")) - # can only save one of them currently - # we need a proper timeout mechanism then we're done - if FireWireNode.holders == 2: - FireWireNode.holders = 0 - # timeout! - self.root_contention() + def timeout(self, eventobj: Event): + self.send_self(Event(self, FireWirePacketType.ROOT_CONTENTION, "...")) def on_message_from_bottom(self, eventobj: Event): """ New message from the link layer """ @@ -163,7 +181,7 @@ class FireWireNode(ComponentModel): self.send_parent_req() elif not self.is_waiting: - self.root_contention() + self.send_self(Event(self, FireWirePacketType.ROOT_CONTENTION, "...")) else: print(f" πŸ‘‘ {self.componentinstancenumber} is elected as the leader") self.is_leader = True -- GitLab From 80e4a39748326ac86709783f3110b618fe15b497 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 13:25:57 +0300 Subject: [PATCH 07/10] Include IEEE1394 test --- AnonymousNetworks/IEEE1394.py | 19 ++++++++- tests/AnonymousNetworks/testIEEE1394.py | 54 +++++++++++++++++++++++-- 2 files changed, 68 insertions(+), 5 deletions(-) diff --git a/AnonymousNetworks/IEEE1394.py b/AnonymousNetworks/IEEE1394.py index 5d965be..796e634 100644 --- a/AnonymousNetworks/IEEE1394.py +++ b/AnonymousNetworks/IEEE1394.py @@ -62,6 +62,11 @@ class FireWireMessagePayload(GenericMessagePayload): class FireWireNode(ComponentModel): + + # For animation/plotting + callback = None + draw_delay = None + def __init__(self, component_name, component_id): super().__init__(component_name, component_id) self.eventhandlers[FireWirePacketType.START_TIMER] = self.on_timer_initialize @@ -73,7 +78,7 @@ class FireWireNode(ComponentModel): self.received = list() self.neighbours = set() self.is_leader = False - self.in_root_contention = True + self.in_root_contention = False self.is_waiting = False self.is_terminated = False @@ -81,6 +86,7 @@ class FireWireNode(ComponentModel): self.timeout_duration = 2 def on_init(self, eventobj: Event): + sleep(1) self.neighbours = set(Topology().get_neighbors(self.componentinstancenumber)) print(f"the neighbours of {self.componentinstancenumber} is {self.neighbours}") @@ -135,8 +141,13 @@ class FireWireNode(ComponentModel): else: print(f"πŸ€– {self.componentinstancenumber} decides to HOLD") self.is_waiting = True + self.in_root_contention = False self.send_self(Event(self, FireWirePacketType.START_TIMER, "...")) + # self.callback.set() + # self.draw_delay.wait() + # self.draw_delay.clear() + def on_timer_initialize(self, eventobj: Event): start_time = eventobj.time self.waiting_since = start_time @@ -198,4 +209,8 @@ class FireWireNode(ComponentModel): f" from {header.messagefrom}, terminating" ) self.is_terminated = True - return + self.in_root_contention = False + + self.callback.set() + self.draw_delay.wait() + self.draw_delay.clear() diff --git a/tests/AnonymousNetworks/testIEEE1394.py b/tests/AnonymousNetworks/testIEEE1394.py index bafb450..bf4e701 100644 --- a/tests/AnonymousNetworks/testIEEE1394.py +++ b/tests/AnonymousNetworks/testIEEE1394.py @@ -15,18 +15,66 @@ from Channels import P2PFIFOPerfectChannel from AnonymousNetworks.IEEE1394 import FireWireNode +ACTIVE_NODE_COLOUR = "#98971a" +PASSIVE_NODE_COLOUR = "#7c6f64" +LEADER_NODE_COLOUR = "#d79921" +CONTENDING_NODE_COLOUR = "#b16286" +EDGE_COLOUR = "#3c3836" + +FPS = 0.4 + def main(): n = int(sys.argv[1]) print(f"Creating a tree with size {n}") G = nx.random_tree(n) + update = threading.Event() + draw_delay = threading.Event() + plt.ion() + fig = plt.figure(num=0) + topology = Topology() topology.construct_from_graph(G, FireWireNode, P2PFIFOPerfectChannel) - topology.start() - topology.plot() - plt.show() + + FireWireNode.callback = update + FireWireNode.draw_delay = draw_delay + + while True: + update.wait() + node_colours = list() + + G = Topology().G + pos = nx.spectral_layout(G, center=(0, 0)) + + for nodeID in Topology().nodes: + node = Topology().nodes[nodeID] + + if node.is_leader: + node_colours.append(LEADER_NODE_COLOUR) + elif node.in_root_contention: + node_colours.append(CONTENDING_NODE_COLOUR) + elif node.is_terminated: + node_colours.append(PASSIVE_NODE_COLOUR) + elif not node.is_terminated: + node_colours.append(ACTIVE_NODE_COLOUR) + + nx.draw( + G, + pos, + node_color=node_colours, + edge_color=EDGE_COLOUR, + with_labels=True, + font_weight="bold", + ) + + fig.canvas.draw() + fig.canvas.flush_events() + fig.clear() + update.clear() + draw_delay.set() + sleep(1.0 / FPS) if __name__ == "__main__": -- GitLab From cbde4233f6bd39885bbe76a17199ed43e99824d2 Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 13:26:14 +0300 Subject: [PATCH 08/10] Cleanup --- tests/AnonymousNetworks/testItaiRodeh.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/AnonymousNetworks/testItaiRodeh.py b/tests/AnonymousNetworks/testItaiRodeh.py index fb501a7..7a6dc33 100644 --- a/tests/AnonymousNetworks/testItaiRodeh.py +++ b/tests/AnonymousNetworks/testItaiRodeh.py @@ -20,7 +20,6 @@ PASSIVE_NODE_COLOUR = "#e0e0e0" LEADER_NODE_COLOUR = "#ff00ff" EDGE_COLOUR = "#1a1c20" - FPS = 1 @@ -46,7 +45,6 @@ def main(): update.wait() assumed_ids = list() node_colours = list() - font_colours = list() G = Topology().G pos = nx.circular_layout(G, center=(0, 0)) @@ -58,13 +56,10 @@ def main(): if node.state == State.active: node_colours.append(ACTIVE_NODE_COLOUR) - font_colours.append(ACTIVE_NODE_COLOUR) elif node.state == State.passive: node_colours.append(PASSIVE_NODE_COLOUR) - font_colours.append(PASSIVE_NODE_COLOUR) elif node.state == State.leader: node_colours.append(LEADER_NODE_COLOUR) - font_colours.append(LEADER_NODE_COLOUR) node_id_label_pos = {} for key in pos: -- GitLab From 0914f57c76b4c56ca9aa2524229baef7e4616c3b Mon Sep 17 00:00:00 2001 From: Yigit Sever Date: Wed, 26 May 2021 13:28:14 +0300 Subject: [PATCH 09/10] Add myself to authors --- CONTRIBUTORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 6b324da..69635ca 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,4 @@ # Contributors - [Berker AcΔ±r](https://github.com/berkeracir) +- [Yigit Sever](https://github.com/yigitsever) -- GitLab From 68ebb7852fcc6e40ff31a8e927cf2c15fcc52116 Mon Sep 17 00:00:00 2001 From: WINS Laboratory Date: Thu, 26 Aug 2021 14:19:35 +0300 Subject: [PATCH 10/10] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 69635ca..378cacc 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,4 +1,11 @@ # Contributors - [Berker AcΔ±r](https://github.com/berkeracir) -- [Yigit Sever](https://github.com/yigitsever) +- [Osman Ufuk Yağmur](https://github.com/VengerA) +- [Berke Tezergil](https://github.com/btezergil) +- [Saim Sunel](https://github.com/SaimSUNEL) +- [Ozan AkΔ±n](https://github.com/oznakn) +- [Ilyas Eren Yilmaz](https://github.com/ilyaserenyilmaz) +- [Δ°brahim KoΓ§] +- [Ege UΓ§ak](https://github.com/egeucak) +- [Mahmoud Alasmar](https://github.com/e212602) -- GitLab