diff --git a/AnonymousNetworks/IEEE1394.py b/AnonymousNetworks/IEEE1394.py new file mode 100644 index 0000000000000000000000000000000000000000..796e6347003c4728a08f2a1b8783d99bfb440688 --- /dev/null +++ b/AnonymousNetworks/IEEE1394.py @@ -0,0 +1,216 @@ +#!/usr/bin/env python + +""" +Implementation of the "IEEE1394 (FireWire)" as described in the textbook +"Fokkink, Wan. Distributed algorithms: an intuitive approach. MIT Press, +2018.", first introduced in " IEEE 1394-1995 - IEEE Standard for a High +Performance Serial Bus" +""" + +__author__ = "Yigit Sever" +__contact__ = "yigit@yigitsever.com" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Yigit Sever"] +__date__ = "2021-05-24" +__deprecated__ = False +__email__ = "yigit@yigitsever.com" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + +import datetime +import random +from enum import Enum +from time import sleep + +from Ahc import (ComponentModel, Event, EventTypes, GenericMessage, + GenericMessageHeader, GenericMessagePayload, Topology) + + +class FireWirePacketType(Enum): + """Two types of FireWire requests: + - Parent Request + - Acknowledgement""" + + PARENT_REQ = "PARENT_REQ" + ACKNOWLEDGEMENT = "ACKNOWLEDGEMENT" + START_TIMER = "START_TIMER" + CHECK_TIMER = "CHECK_TIMER" + TIMEOUT = "TIMEOUT" + ROOT_CONTENTION = "ROOT_CONTENTION" + + +class FireWireMessageHeader(GenericMessageHeader): + def __init__( + self, + messagefrom, + messageto, + messagetype="FireWire Message", + nexthop=float("inf"), + interfaceid=float("inf"), + sequencenumber=-1, + ): + super().__init__( + messagetype, messagefrom, messageto, nexthop, interfaceid, sequencenumber + ) + + +class FireWireMessagePayload(GenericMessagePayload): + def __init__(self): + super().__init__(messagepayload="FireWire Message") + + +class FireWireNode(ComponentModel): + + # For animation/plotting + callback = None + draw_delay = None + + def __init__(self, component_name, component_id): + super().__init__(component_name, component_id) + self.eventhandlers[FireWirePacketType.START_TIMER] = self.on_timer_initialize + self.eventhandlers[FireWirePacketType.CHECK_TIMER] = self.check_timer + self.eventhandlers[FireWirePacketType.TIMEOUT] = self.timeout + self.eventhandlers[FireWirePacketType.ROOT_CONTENTION] = self.root_contention + + self.parent = None + self.received = list() + self.neighbours = set() + self.is_leader = False + self.in_root_contention = False + self.is_waiting = False + self.is_terminated = False + + self.waiting_since = None + self.timeout_duration = 2 + + def on_init(self, eventobj: Event): + sleep(1) + self.neighbours = set(Topology().get_neighbors(self.componentinstancenumber)) + print(f"the neighbours of {self.componentinstancenumber} is {self.neighbours}") + + self.send_parent_req() + + def send_parent_req(self): + """Send a parent request to the only eligible neighbour node. The + neighbour node should not have sent this node a parent request and the + number of such neighbours of this node should be 1. + """ + # if there is *only* one possible parent then send them a parent request here + + result = self.neighbours - set(self.received) + + if len(result) == 1: + par = result.pop() + self.parent = par + + print( + f"🤖 {self.componentinstancenumber} picked {self.parent} as it's parent" + ) + + next_hop_interface_id = f"{self.componentinstancenumber}-{self.parent}" + + header = FireWireMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=self.parent, + nexthop=self.parent, + messagetype=FireWirePacketType.PARENT_REQ, + interfaceid=next_hop_interface_id, + ) + payload = FireWireMessagePayload() + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + else: + # Cannot send a parent request, more than one possible parent + return + + def root_contention(self, eventobj: Event): + if self.is_leader: + return + print(f"🤖 {self.componentinstancenumber} is in ROOT CONTENTION") + decision = random.choice([True, False]) + + if decision: + print(f"🤖 {self.componentinstancenumber} decides to YIELD") + self.in_root_contention = True + self.send_parent_req() + self.is_waiting = False + self.waiting_since = None + else: + print(f"🤖 {self.componentinstancenumber} decides to HOLD") + self.is_waiting = True + self.in_root_contention = False + self.send_self(Event(self, FireWirePacketType.START_TIMER, "...")) + + # self.callback.set() + # self.draw_delay.wait() + # self.draw_delay.clear() + + def on_timer_initialize(self, eventobj: Event): + start_time = eventobj.time + self.waiting_since = start_time + self.send_self(Event(self, FireWirePacketType.CHECK_TIMER, "...")) + + def check_timer(self, eventobj: Event): + current_time = datetime.datetime.now() + delta = current_time - self.waiting_since + if delta.seconds > self.timeout_duration: + self.send_self(Event(self, FireWirePacketType.TIMEOUT, "...")) + else: + sleep(0.2) + self.send_self(Event(self, FireWirePacketType.CHECK_TIMER, "...")) + + def timeout(self, eventobj: Event): + self.send_self(Event(self, FireWirePacketType.ROOT_CONTENTION, "...")) + + def on_message_from_bottom(self, eventobj: Event): + """ New message from the link layer """ + header: FireWireMessageHeader = eventobj.eventcontent.header + # paylaod is not important for FireWire + + if header.messagetype == FireWirePacketType.PARENT_REQ: + if header.messagefrom is not self.parent: + new_child = header.messagefrom + + self.received.append(new_child) + + next_hop_interface_id = f"{self.componentinstancenumber}-{new_child}" + + header = FireWireMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=new_child, + nexthop=new_child, + messagetype=FireWirePacketType.ACKNOWLEDGEMENT, + interfaceid=next_hop_interface_id, + ) + payload = FireWireMessagePayload() + + ack = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, ack)) + + self.send_parent_req() + elif not self.is_waiting: + self.send_self(Event(self, FireWirePacketType.ROOT_CONTENTION, "...")) + else: + print(f" 👑 {self.componentinstancenumber} is elected as the leader") + self.is_leader = True + self.is_terminated = True + + elif ( + header.messagetype == FireWirePacketType.ACKNOWLEDGEMENT + and header.messagefrom == self.parent + ): + # This node's parent request got acknowledged, the process can + # safely terminate + print( + f"🤖 {self.componentinstancenumber} received an ACK " + f" from {header.messagefrom}, terminating" + ) + self.is_terminated = True + self.in_root_contention = False + + self.callback.set() + self.draw_delay.wait() + self.draw_delay.clear() diff --git a/AnonymousNetworks/ItaiRodeh.py b/AnonymousNetworks/ItaiRodeh.py new file mode 100644 index 0000000000000000000000000000000000000000..856a5b0aa66c627babe74b48a3df891b99621a03 --- /dev/null +++ b/AnonymousNetworks/ItaiRodeh.py @@ -0,0 +1,269 @@ +#!/usr/bin/env python + +""" +Implementation of the "Itai-Rodeh Election Algorithm for Rings" as +described in the textbook "Fokkink, Wan. Distributed algorithms: an intuitive +approach. MIT Press, 2018." with additional help from the paper "W. Fokkink and +J. Pang, Simplifying Itai-Rodeh Leader Election for Anonymous Rings. 2004." +""" + +__author__ = "Yigit Sever" +__contact__ = "yigit@yigitsever.com" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Yigit Sever"] +__date__ = "2021-05-24" +__deprecated__ = False +__email__ = "yigit@yigitsever.com" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + +from enum import Enum +from random import randint + +from Ahc import (ComponentModel, Event, EventTypes, GenericMessage, + GenericMessageHeader, GenericMessagePayload, Topology) + + +class State(Enum): + """ + State of the nodes, one from {active, passive, leader} + - "active" nodes are initiators, attempting to become a leader + - "passive" nodes have selected smaller id's than "active" nodes and cannot + compete for leadership anymore + - "leader" node has won an election round, only one such node should be + present in the network + """ + + active = 1 + passive = 2 + leader = 3 + + +class ItaiRodehMessageHeader(GenericMessageHeader): + def __init__( + self, + messagefrom, + messageto, + messagetype="Itai-Rodeh Message", + nexthop=float("inf"), + interfaceid=float("inf"), + sequencenumber=-1, + ): + super().__init__( + messagetype, messagefrom, messageto, nexthop, interfaceid, sequencenumber + ) + + +class ItaiRodehMessagePayload(GenericMessagePayload): + """ + Itai-Rodeh Algorithm uses messages with 4 fields (using textbook's + terminology here): + + - id_p (i): The (random) id the process has chosen for itself for the + current round + - election_round (n'): The current election round, old messages are + silently dismissed + - hop_count (h): Used so that the originating node can recognize their own + message + - dirty_bit (b): For resolving ties, nodes that picked this (highest) id + will still be active next round + """ + + def __init__(self, election_round, id_p): + self.election_round = election_round + self.id_p = id_p + self.hop_count = 1 + # disparity between the paper & textbook, following textbook here + # mnemonic = bit is not dirty, we can still be the leader + self.dirty_bit = False + + +class ItaiRodehNode(ComponentModel): + """ + Node in a system that uses Itai-Rodeh algorithm + Each process has three parameters: + - id_p: 1 <= i <= N where N is the ring size + - state: from the State enum, active nodes participate in the election, + passive nodes pass messages around, leader is selected at the end of the + election cycle + - round: current election round, starts at 1 + """ + + ring_size = 0 + callback = None + draw_delay = None + global_round = 1 + + def __init__(self, component_name, component_id): + super().__init__(component_name, component_id) + + """ The anonymous id the node will select for the round """ + self.id_p = 0 + + """ Initially, all processes are active """ + self.state = State.active + + """ Initialized at round 0 """ + self.election_round = 1 + + def send_election_packet(self): + + header = ItaiRodehMessageHeader( + messagefrom=self.componentinstancenumber, + messageto=self.next_hop, + nexthop=self.next_hop, + messagetype="ItaiRodeh Message", + interfaceid=self.next_hop_interface_id, + ) + payload = ItaiRodehMessagePayload(self.election_round, self.id_p) + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + def pass_packet_along(self, message): + """For passive processes + + :message: the whole Event, with eventcontent which includes header and + payload + + """ + pass + + def on_init(self, eventobj: Event): + # Select an id for round 1 + self.id_p = randint(1, self.ring_size) + print( + f"🤖 {self.componentinstancenumber} selected {self.id_p} as their" + f" ID for round {self.election_round}" + ) + + # Calculate the neighbour, we're on a directed ring + self.neighbour_id = (int(self.componentinstancenumber) + 1) % self.ring_size + + self.next_hop = Topology().get_next_hop( + self.componentinstancenumber, self.neighbour_id + ) + + self.next_hop_interface_id = f"{self.componentinstancenumber}-{self.next_hop}" + + self.send_election_packet() + + def on_message_from_bottom(self, eventobj: Event): + """ New message from the link layer """ + payload: ItaiRodehMessagePayload = eventobj.eventcontent.payload + header: ItaiRodehMessageHeader = eventobj.eventcontent.header + + message_election_round = payload.election_round + message_assumed_id = payload.id_p + + # For the active node, we are going to follow the if/else chain given + # in the textbook + + if self.state == State.passive: + # passive node, pass the message on to the next hop, increase the + # 'hop_count' of packet by one, no other responsibility + payload.hop_count += 1 + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + elif self.state == State.active: + + if message_election_round > self.election_round or ( + message_election_round == self.election_round + and message_assumed_id > self.id_p + ): + # Another node has picked a higher id than this node for the + # current round or this node has received a message from a + # future round, going passive + + print( + f"🤖 {self.componentinstancenumber} is PASSIVE: " + f"{message_assumed_id} for round {message_election_round} " + f"encountered, this node is at {self.election_round} with " + f"{self.id_p}" + ) + + self.state = State.passive + # we clear the id_p here to have them *not* show up in the + # animation + self.id_p = " " + payload.hop_count += 1 + + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + elif message_election_round < self.election_round or ( + message_election_round == self.election_round + and message_assumed_id < self.id_p + ): + # This node has received a message from a previous round or + # from the current round but with a lower assumed id, so this + # node can dismiss the election attempt of the sender node + print( + f"🤖 {self.componentinstancenumber} is dismissing " + f"{message_assumed_id} for round {message_election_round} " + f"this node is at {self.election_round} with {self.id_p}" + ) + + elif ( + message_election_round == self.election_round + and message_assumed_id == self.id_p + ): + + if payload.hop_count < self.ring_size: + # receiver node is not the initial sender node + # another node has picked our id, dirty their bit and pass it along + payload.dirty_bit = True + payload.hop_count += 1 + + header.messageto = self.next_hop + header.next_hop = self.next_hop + header.interfaceid = self.next_hop_interface_id + + print( + f"🤖 {self.componentinstancenumber} dirtied the bit " + f"for {message_assumed_id}, passing it along" + ) + message = GenericMessage(header, payload) + self.send_down(Event(self, EventTypes.MFRT, message)) + + elif payload.hop_count == self.ring_size: + # the message that this node has sent traversed all the way + # around the ring + print( + f"🤖 {self.componentinstancenumber}'s message " + f"traversed all the way around" + ) + if payload.dirty_bit: + # Bit has been dirtied, next round + self.id_p = randint(1, self.ring_size) + self.election_round += 1 + ItaiRodehNode.global_round = self.election_round + print( + f"🤖 {self.componentinstancenumber} is moving " + f"onto round {self.election_round}" + ) + print( + f"🤖 {self.componentinstancenumber} selected " + f"{self.id_p} as their ID for round " + f"{self.election_round}" + ) + self.send_election_packet() + else: + # The bit is still false, this node is the leader + self.state = State.leader + print( + f"🤖 {self.componentinstancenumber}: I'M THE ELECTED LEADER" + ) + self.callback.set() + self.draw_delay.wait() + self.draw_delay.clear() diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 6b324dac56e47877995ad21812e37b4179d33526..378cacc5c4cd6a7b6afd870f54f0c9c9d3489864 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,11 @@ # Contributors - [Berker Acır](https://github.com/berkeracir) +- [Osman Ufuk Yağmur](https://github.com/VengerA) +- [Berke Tezergil](https://github.com/btezergil) +- [Saim Sunel](https://github.com/SaimSUNEL) +- [Ozan Akın](https://github.com/oznakn) +- [Ilyas Eren Yilmaz](https://github.com/ilyaserenyilmaz) +- [İbrahim Koç] +- [Ege Uçak](https://github.com/egeucak) +- [Mahmoud Alasmar](https://github.com/e212602) diff --git a/tests/AnonymousNetworks/__init__.py b/tests/AnonymousNetworks/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/AnonymousNetworks/testIEEE1394.py b/tests/AnonymousNetworks/testIEEE1394.py new file mode 100644 index 0000000000000000000000000000000000000000..bf4e701103b0f8d97dc759726c05256c7ee398c4 --- /dev/null +++ b/tests/AnonymousNetworks/testIEEE1394.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python + +# the project root must be in PYTHONPATH for imports +# $ export PYTHONPATH=$(pwd); python tests/AnonymousNetworks/testItaiRodeh.py + +import sys +import threading +from math import atan2, cos, radians, sin, sqrt +from time import sleep + +import matplotlib.pyplot as plt +import networkx as nx +from Ahc import Topology +from Channels import P2PFIFOPerfectChannel + +from AnonymousNetworks.IEEE1394 import FireWireNode + +ACTIVE_NODE_COLOUR = "#98971a" +PASSIVE_NODE_COLOUR = "#7c6f64" +LEADER_NODE_COLOUR = "#d79921" +CONTENDING_NODE_COLOUR = "#b16286" +EDGE_COLOUR = "#3c3836" + +FPS = 0.4 + + +def main(): + n = int(sys.argv[1]) + print(f"Creating a tree with size {n}") + G = nx.random_tree(n) + + update = threading.Event() + draw_delay = threading.Event() + plt.ion() + fig = plt.figure(num=0) + + topology = Topology() + topology.construct_from_graph(G, FireWireNode, P2PFIFOPerfectChannel) + topology.start() + + FireWireNode.callback = update + FireWireNode.draw_delay = draw_delay + + while True: + update.wait() + node_colours = list() + + G = Topology().G + pos = nx.spectral_layout(G, center=(0, 0)) + + for nodeID in Topology().nodes: + node = Topology().nodes[nodeID] + + if node.is_leader: + node_colours.append(LEADER_NODE_COLOUR) + elif node.in_root_contention: + node_colours.append(CONTENDING_NODE_COLOUR) + elif node.is_terminated: + node_colours.append(PASSIVE_NODE_COLOUR) + elif not node.is_terminated: + node_colours.append(ACTIVE_NODE_COLOUR) + + nx.draw( + G, + pos, + node_color=node_colours, + edge_color=EDGE_COLOUR, + with_labels=True, + font_weight="bold", + ) + + fig.canvas.draw() + fig.canvas.flush_events() + fig.clear() + update.clear() + draw_delay.set() + sleep(1.0 / FPS) + + +if __name__ == "__main__": + main() diff --git a/tests/AnonymousNetworks/testItaiRodeh.py b/tests/AnonymousNetworks/testItaiRodeh.py new file mode 100644 index 0000000000000000000000000000000000000000..7a6dc332f8ecfa752905bdb333eed04bbe949122 --- /dev/null +++ b/tests/AnonymousNetworks/testItaiRodeh.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python + +# the project root must be in PYTHONPATH for imports +# $ export PYTHONPATH=$(pwd); python tests/AnonymousNetworks/testItaiRodeh.py + +import sys +import threading +from math import atan2, cos, radians, sin, sqrt +from time import sleep + +import matplotlib.pyplot as plt +import networkx as nx +from Ahc import Topology +from Channels import P2PFIFOPerfectChannel + +from AnonymousNetworks.ItaiRodeh import ItaiRodehNode, State + +ACTIVE_NODE_COLOUR = "#ff0000" +PASSIVE_NODE_COLOUR = "#e0e0e0" +LEADER_NODE_COLOUR = "#ff00ff" +EDGE_COLOUR = "#1a1c20" + +FPS = 1 + + +def main(): + n = int(sys.argv[1]) + print(f"Creating a ring with size {n}") + G = nx.cycle_graph(n) + + plt.ion() + fig = plt.figure(num=0) + + topology = Topology() + topology.construct_from_graph(G, ItaiRodehNode, P2PFIFOPerfectChannel) + topology.start() + ItaiRodehNode.ring_size = n + + update = threading.Event() + draw_delay = threading.Event() + ItaiRodehNode.callback = update + ItaiRodehNode.draw_delay = draw_delay + + while True: + update.wait() + assumed_ids = list() + node_colours = list() + + G = Topology().G + pos = nx.circular_layout(G, center=(0, 0)) + + for nodeID in Topology().nodes: + node = Topology().nodes[nodeID] + G.nodes[nodeID]["id_p"] = node.id_p + assumed_ids.append(node.id_p) + + if node.state == State.active: + node_colours.append(ACTIVE_NODE_COLOUR) + elif node.state == State.passive: + node_colours.append(PASSIVE_NODE_COLOUR) + elif node.state == State.leader: + node_colours.append(LEADER_NODE_COLOUR) + + node_id_label_pos = {} + for key in pos: + x, y = pos[key] + theta = atan2(y, x) + radians(75) + d = 0.1 + node_id_label_pos[key] = (x + d * cos(theta), y + d * sin(theta)) + + node_id_labels = nx.get_node_attributes(G, "id_p") + + nx.draw( + G, + pos, + node_color=node_colours, + edge_color=EDGE_COLOUR, + with_labels=False, + font_weight="bold", + ) + + nx.draw_networkx_labels(G, node_id_label_pos, node_id_labels) + + fig.text(0.2, 0.2, f"Round: {ItaiRodehNode.global_round}") + + fig.canvas.draw() + fig.canvas.flush_events() + fig.clear() + update.clear() + draw_delay.set() + sleep(1.0 / FPS) + + +if __name__ == "__main__": + main()