diff --git a/BakeryAlgorithm/Ahc.py b/BakeryAlgorithm/Ahc.py
new file mode 100644
index 0000000000000000000000000000000000000000..a610b3aa513b1257cdf4d34fe96218e3dd285b0b
--- /dev/null
+++ b/BakeryAlgorithm/Ahc.py
@@ -0,0 +1,380 @@
+#!/usr/bin/env python
+""" Implements the AHC library.
+
+TODO: Longer description of this module to be written.
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+
+__author__ = "One solo developer"
+__authors__ = ["Ertan Onur", "Berke Tezergil", "etc"]
+__contact__ = "eonur@ceng.metu.edu.tr"
+__copyright__ = "Copyright 2021, WINSLAB"
+__credits__ = ["Ertan Onur", "Berke Tezergil", "etc"]
+__date__ = "2021/04/07"
+__deprecated__ = False
+__email__ = "eonur@ceng.metu.edu.tr"
+__license__ = "GPLv3"
+__maintainer__ = "developer"
+__status__ = "Production"
+__version__ = "0.0.1"
+
+
+import datetime
+import queue
+from enum import Enum
+from threading import Thread, Lock
+
+import matplotlib.pyplot as plt
+import networkx as nx
+
+# TIMING ASSUMPTIONS
+# TODO: Event handling time, message sending time, assumptions about clock (drift, skew, ...)
+# TODO: 1. Asynch, 2. Synch 3. Partial-synch 4. Timed asynch
+# TODO: Causal-order (happen before), total-order,
+# TODO: Causal-order algebra!!!
+# TODO: Implement logical clocks (Lamport clocks, vector clocks) in event handling loop
+
+# AUTOMATA and EXECUTIONS
+# TODO: Let the component model handle executions and checks on executions (which event, in which order, per process or per system, similarity of executions)
+
+
+# VISUALIZATION
+# TODO: Space-time diagrams for events
+
+# TOPOLOGY MANAGEMENT
+# TODO: Given a graph as input, generate the topology....
+
+inf = float('inf')
+
+# The following are the common default events for all components.
+class EventTypes(Enum):
+ INIT = "init"
+ MFRB = "msgfrombottom"
+ MFRT = "msgfromtop"
+ MFRP = "msgfrompeer"
+
+class MessageDestinationIdentifiers(Enum):
+  # Note: no trailing comma after -1, which would turn the enum value into a tuple.
+  LINKLAYERBROADCAST = -1  # single-hop broadcast: all directly connected nodes
+  NETWORKLAYERBROADCAST = -2  # flooding over multiple hops: all nodes reachable over one or more links
+
+# A dictionary that keeps a list of values per key: assigning to an existing
+# key appends, so one connector name can fan out to several components.
+class ConnectorList(dict):
+  def __setitem__(self, key, value):
+    if key not in self:
+      super().__setitem__(key, [])
+    self[key].append(value)
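+
+# Usage sketch (editor's illustration, not library code):
+#
+#   connectors = ConnectorList()
+#   connectors[ConnectorTypes.DOWN] = channel_a   # creates the list
+#   connectors[ConnectorTypes.DOWN] = channel_b   # appends to it
+#   # connectors[ConnectorTypes.DOWN] is now [channel_a, channel_b]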
+
+class ConnectorTypes(Enum):
+ DOWN = "DOWN"
+ UP = "UP"
+ PEER = "PEER"
+
+class GenericMessagePayload:
+ def __init__(self, messagepayload):
+ self.messagepayload = messagepayload
+
+class GenericMessageHeader:
+ def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1):
+ self.messagetype = messagetype
+ self.messagefrom = messagefrom
+ self.messageto = messageto
+ self.nexthop = nexthop
+ self.interfaceid = interfaceid
+ self.sequencenumber = sequencenumber
+
+class GenericMessage:
+ def __init__(self, header, payload):
+ self.header = header
+ self.payload = payload
+ self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber)
+
+class Event:
+ def __init__(self, eventsource, event, eventcontent, fromchannel=None):
+ self.eventsource = eventsource
+ self.event = event
+ self.time = datetime.datetime.now()
+ self.eventcontent = eventcontent
+ self.fromchannel = fromchannel
+
+def singleton(cls):
+ instance = [None]
+
+ def wrapper(*args, **kwargs):
+ if instance[0] is None:
+ instance[0] = cls(*args, **kwargs)
+ return instance[0]
+
+ return wrapper
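+
+# Usage sketch (editor's illustration): the decorator caches the first instance,
+# so repeated "constructor" calls all return the same object and the arguments
+# of later calls are ignored.
+#
+#   @singleton
+#   class Registry: pass
+#   assert Registry() is Registry()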
+
+@singleton
+class ComponentRegistry:
+ components = {}
+
+ def get_component_by_instance(self, instance):
+ list_of_keys = list()
+ list_of_items = self.components.items()
+ for item in list_of_items:
+ if item[1] == instance:
+ list_of_keys.append(item[0])
+ return list_of_keys
+
+ def add_component(self, component):
+ key = component.componentname + str(component.componentinstancenumber)
+ self.components[key] = component
+
+ def get_component_by_key(self, componentname, componentinstancenumber):
+ key = componentname + str(componentinstancenumber)
+ return self.components[key]
+
+ def init(self):
+ for itemkey in self.components:
+ cmp = self.components[itemkey]
+ cmp.inputqueue.put_nowait(Event(self, EventTypes.INIT, None))
+
+ def print_components(self):
+ for itemkey in self.components:
+ cmp = self.components[itemkey]
+ print(f"I am {cmp.componentname}.{cmp.componentinstancenumber}")
+ for i in cmp.connectors:
+ connectedcmp = cmp.connectors[i]
+ for p in connectedcmp:
+ print(f"\t{i} {p.componentname}.{p.componentinstancenumber}")
+
+registry = ComponentRegistry()
+
+class ComponentModel:
+ terminated = False
+
+ def on_init(self, eventobj: Event):
+ # print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+ pass
+
+ def on_message_from_bottom(self, eventobj: Event):
+ print(f"{EventTypes.MFRB} {self.componentname}.{self.componentinstancenumber}")
+
+ def on_message_from_top(self, eventobj: Event):
+ print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}")
+
+ def on_message_from_peer(self, eventobj: Event):
+ print(f"{EventTypes.MFRP} {self.componentname}.{self.componentinstancenumber}")
+
+ def __init__(self, componentname, componentinstancenumber, num_worker_threads=1):
+ self.eventhandlers = {EventTypes.INIT: self.on_init, EventTypes.MFRB: self.on_message_from_bottom,
+ EventTypes.MFRT: self.on_message_from_top, EventTypes.MFRP: self.on_message_from_peer}
+ # Add default handlers to all instantiated components.
+ # If a component overwrites the __init__ method it has to call the super().__init__ method
+ self.inputqueue = queue.Queue()
+ self.componentname = componentname
+ self.componentinstancenumber = componentinstancenumber
+ self.num_worker_threads = num_worker_threads
+    if not hasattr(self, "connectors"):
+      self.connectors = ConnectorList()
+
+ self.registry = ComponentRegistry()
+ self.registry.add_component(self)
+
+ for i in range(self.num_worker_threads):
+ t = Thread(target=self.queue_handler, args=[self.inputqueue])
+ t.daemon = True
+ t.start()
+
+ def connect_me_to_component(self, name, component):
+ try:
+ self.connectors[name] = component
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = component
+
+ def connect_me_to_channel(self, name, channel):
+ try:
+ self.connectors[name] = channel
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = channel
+ connectornameforchannel = self.componentname + str(self.componentinstancenumber)
+ channel.connect_me_to_component(connectornameforchannel, self)
+
+ def terminate(self):
+ self.terminated = True
+
+  def send_down(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.DOWN]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no DOWN connector is attached
+
+  def send_up(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.UP]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no UP connector is attached
+
+  def send_peer(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.PEER]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no PEER connector is attached
+
+ def send_self(self, event: Event):
+ self.trigger_event(event)
+
+ # noinspection PyArgumentList
+ def queue_handler(self, myqueue):
+ while not self.terminated:
+ workitem = myqueue.get()
+ if workitem.event in self.eventhandlers:
+ self.eventhandlers[workitem.event](eventobj=workitem) # call the handler
+ else:
+ print(f"Event Handler: {workitem.event} is not implemented")
+ myqueue.task_done()
+
+ def trigger_event(self, eventobj: Event):
+ self.inputqueue.put_nowait(eventobj)
+
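+# Example subclass (editor's sketch, not used elsewhere in the library): override
+# the default handlers you need and register custom event types as plain
+# dictionary entries; queue_handler dispatches on the event field of each Event.
+# Subclasses that define __init__ must call super().__init__.
+class _ExampleEchoComponent(ComponentModel):
+  def on_message_from_bottom(self, eventobj: Event):
+    # Reflect whatever arrives from below straight back down.
+    self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent))
+
+  def __init__(self, componentname, componentinstancenumber):
+    super().__init__(componentname, componentinstancenumber)
+    self.eventhandlers["echo"] = self.on_echo  # "echo" is a made-up event type
+
+  def on_echo(self, eventobj: Event):
+    print(f"{self.componentname}.{self.componentinstancenumber} got echo: {eventobj.eventcontent}")
+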
+@singleton
+class Topology:
+ nodes = {}
+ channels = {}
+
+ def construct_from_graph(self, G: nx.Graph, nodetype, channeltype):
+ self.G = G
+ nodes = list(G.nodes)
+ edges = list(G.edges)
+ for i in nodes:
+ cc = nodetype(nodetype.__name__, i)
+ self.nodes[i] = cc
+ for k in edges:
+ ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1]))
+ self.channels[k] = ch
+ self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+ def construct_from_graph_bakery(self, G: nx.Graph, nodetype, channeltype):
+ self.G = G
+ nodes = list(G.nodes)
+ edges = list(G.edges)
+
+    # Reverse the construction order, presumably so the resource node (highest id) is created first.
+    nodes = nodes[::-1]
+    edges = edges[::-1]
+
+ for i in nodes:
+ cc = nodetype(nodetype.__name__, i,i)
+ self.nodes[i] = cc
+ for k in edges:
+ ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1]))
+ self.channels[k] = ch
+ self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+
+ def construct_single_node(self, nodetype, instancenumber):
+ self.singlenode = nodetype(nodetype.__name__, instancenumber)
+ self.G = nx.Graph()
+ self.G.add_nodes_from([0])
+ self.nodes[0] = self.singlenode
+
+ def construct_sender_receiver(self, sendertype, receivertype, channeltype):
+ self.sender = sendertype(sendertype.__name__, 0)
+ self.receiver = receivertype(receivertype.__name__, 1)
+ ch = channeltype(channeltype.__name__, "0-1")
+ self.G = nx.Graph()
+ self.G.add_nodes_from([0, 1])
+ self.G.add_edges_from([(0, 1)])
+ self.nodes[self.sender.componentinstancenumber] = self.sender
+    self.nodes[self.receiver.componentinstancenumber] = self.receiver
+ self.channels[ch.componentinstancenumber] = ch
+ self.sender.connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.receiver.connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+ def allpairs_shortest_path(self):
+ return dict(nx.all_pairs_shortest_path(self.G))
+
+ def shortest_path_to_all(self, myid):
+ path = dict(nx.all_pairs_shortest_path(self.G))
+ nodecnt = len(self.G.nodes)
+ for i in range(nodecnt):
+ print(path[myid][i])
+
+ def start(self):
+ # registry.printComponents()
+ N = len(self.G.nodes)
+ self.compute_forwarding_table()
+ self.nodecolors = ['b'] * N
+ self.nodepos = nx.drawing.spring_layout(self.G)
+ self.lock = Lock()
+ ComponentRegistry().init()
+
+  def compute_forwarding_table(self):
+    # All-pairs shortest paths double as an all-seeing-eye routing table:
+    # ForwardingTable[src][dst] is the full path, whose second element is the next hop.
+    self.ForwardingTable = dict(nx.all_pairs_shortest_path(self.G))
+
+  # All-seeing-eye routing table inspection
+ def print_forwarding_table(self):
+ registry.print_components()
+ print('\n'.join([''.join(['{:4}'.format(item) for item in row])
+ for row in list(self.ForwardingTable.values())]))
+
+ # returns the all-seeing eye routing based next hop id
+ def get_next_hop(self, fromId, toId):
+ try:
+ retval = self.ForwardingTable[fromId][toId]
+ return retval[1]
+ except KeyError:
+ return inf
+ except IndexError:
+ return fromId
+
+  # Returns the sorted list of neighbors of a node
+  def get_neighbors(self, nodeId):
+    return sorted(self.G.neighbors(nodeId))
+
+  # Note: predecessors() exists only on directed graphs (nx.DiGraph), so this
+  # raises an AttributeError when the topology graph is undirected.
+  def get_predecessors(self, nodeId):
+    return sorted(self.G.predecessors(nodeId))
+
+  def get_successors(self, nodeId):
+    return sorted(self.G.neighbors(nodeId))
+
+  # Returns the number of neighbors of a node
+  def get_neighbor_count(self, nodeId):
+    return self.G.degree[nodeId]
+
+ def plot(self):
+ #self.lock.acquire()
+ nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold')
+ plt.draw()
+ print(self.nodecolors)
+ #self.lock.release()
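+
+# Typical wiring (editor's sketch; SomeNode and SomeChannel stand for any
+# ComponentModel and Channel subclasses):
+#
+#   G = nx.Graph()
+#   G.add_edges_from([(0, 1), (1, 2)])
+#   topo = Topology()
+#   topo.construct_from_graph(G, SomeNode, SomeChannel)
+#   topo.start()  # computes the forwarding table and INITs every component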
diff --git a/BakeryAlgorithm/BakeryComponent.py b/BakeryAlgorithm/BakeryComponent.py
new file mode 100644
index 0000000000000000000000000000000000000000..5984e3e349a3ea77a538f22b0ca2381562c05939
--- /dev/null
+++ b/BakeryAlgorithm/BakeryComponent.py
@@ -0,0 +1,71 @@
+from enum import Enum
+import time
+from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes
+from threading import Thread
+
+# Shared bakery state for the contending components (ids 0..node_number-1).
+node_number = 2
+waiting_ticket = [True] * node_number  # True until component i has drawn its ticket
+ticket_values = [0] * node_number      # 0 means "not competing for the critical section"
+
+class BakeryMessageTypes(Enum):
+ INC = "+"
+ DEC = "-"
+
+class ResourceComponent(ComponentModel):
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n")
+ pass
+
+ def on_message_from_bottom(self, eventobj: Event):
+ messagetype = eventobj.eventcontent.header.messagetype ## INC / DEC
+ if messagetype == BakeryMessageTypes.INC:
+ print("Increment value")
+ self.value += 1
+ elif messagetype == BakeryMessageTypes.DEC:
+ print("Decrement value")
+ self.value -= 1
+
+ def __init__(self, componentname, componentinstancenumber):
+ super().__init__(componentname, componentinstancenumber)
+ self.value = 0
+
+ def __repr__(self):
+ return f"Value:{self.value}"
+
+class BakeryComponent(ComponentModel):
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n")
+ pass
+
+  def send_message(self):
+    message_header = GenericMessageHeader(self.messageType, self.componentinstancenumber, None,
+                                          interfaceid=f"{self.componentinstancenumber}-{self.resourceComponentId}")
+    message_payload = GenericMessagePayload(None)
+    message = GenericMessage(message_header, message_payload)
+
+    # Doorway: draw a ticket one higher than every ticket currently held.
+    ticket_values[self.id] = max(ticket_values) + 1
+    waiting_ticket[self.id] = False
+    for i in range(node_number):
+      # Wait until component i has drawn its ticket...
+      while waiting_ticket[i]:
+        continue
+      # ...and until every component with a smaller ticket (ties broken by id) has left.
+      while ticket_values[i] > 0 and (ticket_values[i] < ticket_values[self.id] or
+                                      (ticket_values[i] == ticket_values[self.id] and i < self.id)):
+        continue
+    ####critical section####
+    self.send_down(Event(self, EventTypes.MFRT, message))
+    time.sleep(0.001)
+    ########################
+    ticket_values[self.id] = 0
+
+    self.done = True
+
+  def __init__(self, componentname, componentinstancenumber, type=BakeryMessageTypes.INC, resourceComponentId=2):
+ super().__init__(componentname, componentinstancenumber)
+ self.resourceComponentId = resourceComponentId
+ self.done = False
+ self.messageType = type
+ self.id = self.componentinstancenumber
+
+ def start(self):
+ t = Thread(target=self.send_message)
+ t.daemon = True
+ t.start()
diff --git a/BakeryAlgorithm/Channels.py b/BakeryAlgorithm/Channels.py
new file mode 100644
index 0000000000000000000000000000000000000000..a6e9bbd2bd74c43b2f86d21833f171255b41a6c8
--- /dev/null
+++ b/BakeryAlgorithm/Channels.py
@@ -0,0 +1,160 @@
+import queue
+import random
+from enum import Enum
+from threading import Thread
+
+from Ahc import ComponentModel, EventTypes, ConnectorList, MessageDestinationIdentifiers
+from Ahc import Event
+
+# TODO: Channel failure models: lossy-link, fair-loss, stubborn links, perfect links (WHAT ELSE?), FIFO perfect
+# TODO: Logged perfect links (tolerance to crashes), authenticated perfect links
+# TODO: Broadcast channels? and their failure models? Collisions?
+# TODO: Properties of all models separately: e.g., Fair loss, finite duplication, no fabrication in fair-loss link model
+# TODO: Packets: loss, duplication, sequence change, windowing?,
+# TODO: Eventually (unbounded time) or bounded time for message delivery?
+
+
+# Channels have three events: sendtochannel, processinchannel and delivertocomponent.
+# A component hands a message to a channel by calling send_down with an EventTypes.MFRT event.
+# The first pipeline stage forwards the message to the interim stage with the "processinchannel"
+# event for further processing; there the channel may drop, delay or duplicate it.
+# The channel then puts the message on the output queue with the "delivertocomponent" event,
+# and the output queue delivers it to the connected component(s) as an EventTypes.MFRB event,
+# which receiving components handle in on_message_from_bottom.
+
+class ChannelEventTypes(Enum):
+ INCH = "processinchannel"
+ DLVR = "delivertocomponent"
+
+class Channel(ComponentModel):
+
+ def on_init(self, eventobj: Event):
+
+ pass
+
+  # Override on_message_from_top if you want to do something in the first pipeline stage
+ def on_message_from_top(self, eventobj: Event):
+ # channel receives the input message and will process the message by the process event in the next pipeline stage
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+
+  # Override on_process_in_channel if you want to do something in the interim pipeline stage
+ def on_process_in_channel(self, eventobj: Event):
+ # Add delay, drop, change order whatever....
+ # Finally put the message in outputqueue with event deliver
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent)
+ self.outputqueue.put_nowait(myevent)
+
+  # Override on_deliver_to_component if you want to do something in the last pipeline stage.
+  # It delivers the message from the channel to the receiving component(s) as an EventTypes.MFRB event.
+ def on_deliver_to_component(self, eventobj: Event):
+ callername = eventobj.eventsource.componentinstancenumber
+ for item in self.connectors:
+ callees = self.connectors[item]
+ for callee in callees:
+ calleename = callee.componentinstancenumber
+ # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}")
+ if calleename == callername:
+ pass
+ else:
+ myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber)
+ callee.trigger_event(myevent)
+
+ def __init__(self, componentname, componentinstancenumber):
+ super().__init__(componentname, componentinstancenumber)
+ self.outputqueue = queue.Queue()
+ self.channelqueue = queue.Queue()
+ self.eventhandlers[ChannelEventTypes.INCH] = self.on_process_in_channel
+ self.eventhandlers[ChannelEventTypes.DLVR] = self.on_deliver_to_component
+
+ for i in range(self.num_worker_threads):
+ # note that the input queue is handled by the super class...
+ t = Thread(target=self.queue_handler, args=[self.channelqueue])
+ t1 = Thread(target=self.queue_handler, args=[self.outputqueue])
+ t.daemon = True
+ t1.daemon = True
+ t.start()
+ t1.start()
+
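+# Editor's sketch of the intended extension point (not part of the original code):
+# a channel variant that delays every message only has to override the interim
+# pipeline stage; the surrounding queueing machinery is inherited from Channel.
+class _ExampleDelayingChannel(Channel):
+  delay = 0.01  # seconds, an illustrative value
+
+  def on_process_in_channel(self, eventobj: Event):
+    import time  # local import keeps the sketch self-contained
+    time.sleep(self.delay)  # crude: blocks this channel's worker thread
+    myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent)
+    self.outputqueue.put_nowait(myevent)
+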
+class AHCChannelError(Exception):
+ pass
+
+class P2PFIFOPerfectChannel(Channel):
+
+  # Override on_message_from_top.
+  # Channels are broadcast, so the channel id has to be checked against hdr.interfaceid to keep the link point-to-point.
+ def on_message_from_top(self, eventobj: Event):
+ # if channelid != hdr.interfaceif then drop (should not be on this channel)
+ hdr = eventobj.eventcontent.header
+ if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST:
+ if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")):
+ #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+ else:
+ #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ pass
+
+ def on_deliver_to_component(self, eventobj: Event):
+ msg = eventobj.eventcontent
+ callername = eventobj.eventsource.componentinstancenumber
+ for item in self.connectors:
+ callees = self.connectors[item]
+ for callee in callees:
+ calleename = callee.componentinstancenumber
+ # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}")
+ if calleename == callername:
+ pass
+ else:
+ myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber)
+ callee.trigger_event(myevent)
+
+ # Overwriting to limit the number of connected components
+ def connect_me_to_component(self, name, component):
+ try:
+ self.connectors[name] = component
+ # print(f"Number of nodes connected: {len(self.ports)}")
+ if len(self.connectors) > 2:
+ raise AHCChannelError("More than two nodes cannot connect to a P2PFIFOChannel")
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = component
+ # except AHCChannelError as e:
+ # print( f"{e}" )
+
+class P2PFIFOFairLossChannel(P2PFIFOPerfectChannel):
+ prob = 1
+ duplicationprobability = 0
+  # Override on_message_from_top.
+  # Channels are broadcast, so the channel id has to be checked against hdr.interfaceid to keep the link point-to-point.
+
+ def on_message_from_top(self, eventobj: Event):
+ # if channelid != hdr.interfaceif then drop (should not be on this channel)
+ hdr = eventobj.eventcontent.header
+ if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST:
+ if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")):
+ #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+ else:
+ #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ pass
+
+
+ def on_process_in_channel(self, eventobj: Event):
+ if random.random() < self.prob:
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent)
+ self.outputqueue.put_nowait(myevent)
+ if random.random() < self.duplicationprobability:
+ self.channelqueue.put_nowait(eventobj)
+
+  def setPacketLossProbability(self, prob):
+    # self.prob is the probability that a message is delivered (see on_process_in_channel),
+    # so the requested loss probability has to be inverted here.
+    self.prob = 1 - prob
+
+  def setAverageNumberOfDuplicates(self, d):
+    # With duplication probability p, the expected number of copies is 1/(1-p);
+    # solving 1/(1-p) = d gives p = (d-1)/d.
+    if d > 0:
+      self.duplicationprobability = (d - 1) / d
+    else:
+      self.duplicationprobability = 0
+
+class FIFOBroadcastPerfectChannel(Channel):
+ pass
diff --git a/BakeryAlgorithm/bakery.py b/BakeryAlgorithm/bakery.py
new file mode 100644
index 0000000000000000000000000000000000000000..960ed4c3b35cdbe548f0a17eda9131d2b2d8956c
--- /dev/null
+++ b/BakeryAlgorithm/bakery.py
@@ -0,0 +1,46 @@
+import threading
+
+def produce(id):
+ global counter
+ counter += 1
+ #print("Thread {} counter:".format(id),counter)
+
+def threadRoutine(id):
+    global waitFlag, waitingTicket, ticketValues, threadNumber
+    # Spin until the main thread releases all workers at once.
+    while waitFlag:
+        continue
+    print("Thread {} started".format(id))
+
+    # Doorway: draw a ticket one higher than every ticket currently held.
+    ticketValues[id] = max(ticketValues) + 1
+    waitingTicket[id] = False
+
+    for i in range(threadNumber):
+        # Wait until thread i has drawn its ticket...
+        while waitingTicket[i]:
+            continue
+        # ...and until every thread with a smaller ticket (ties broken by id) has left.
+        while ticketValues[i] > 0 and (ticketValues[i] < ticketValues[id] or (ticketValues[i] == ticketValues[id] and i < id)):
+            continue
+    ####critical section####
+    produce(id)
+    print("Thread {} did its job in the critical section".format(id))
+    ########################
+    ticketValues[id] = 0
+    print("Thread {} finished".format(id))
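+
+# Note: waitingTicket starts out all True and an entry only becomes False inside
+# threadRoutine, so all threadNumber threads must actually be started; otherwise
+# the first wait loop above would spin forever.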
+
+if __name__ == "__main__":
+
+ counter = 0
+ threadNumber = 20
+ waitingTicket = [True]*threadNumber
+ ticketValues = [0]*threadNumber
+ thread_list = []
+ for i in range(threadNumber):
+ thread_list.append(threading.Thread(target=threadRoutine, args=(i,)))
+ waitFlag = True
+ for thread in thread_list:
+ thread.start()
+ waitFlag = False
+ for thread in thread_list:
+ thread.join()
+
+ print("Expected output {} , real output {}".format(threadNumber,counter))
+
\ No newline at end of file
diff --git a/BakeryAlgorithm/testBakery.py b/BakeryAlgorithm/testBakery.py
new file mode 100644
index 0000000000000000000000000000000000000000..2277971b9bb641364120c3034e97d839236c764d
--- /dev/null
+++ b/BakeryAlgorithm/testBakery.py
@@ -0,0 +1,58 @@
+import random
+import time
+from enum import Enum
+
+import matplotlib.pyplot as plt
+import networkx as nx
+
+from BakeryComponent import *
+from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes, ConnectorTypes, Topology, ComponentRegistry
+from Channels import FIFOBroadcastPerfectChannel
+
+class AdHocNode(ComponentModel):
+
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+
+  def on_message_from_top(self, eventobj: Event):
+    print(f"Outgoing message from node {self.componentinstancenumber}")
+    self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent))
+
+  def on_message_from_bottom(self, eventobj: Event):
+    print(f"Incoming message to node {self.componentinstancenumber}, from: {eventobj.eventcontent.header.messagefrom}")
+    self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent))
+
+ def __init__(self, componentname, componentid, type):
+ if type == 0:
+ self.mainComponent = BakeryComponent("BakeryProducerComponent", componentid,BakeryMessageTypes.INC)
+ self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self)
+ elif type == 1:
+ self.mainComponent = BakeryComponent("BakeryConsumerComponent", componentid,BakeryMessageTypes.DEC)
+ self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self)
+ elif type == 2:
+ self.mainComponent = ResourceComponent("ResourceComponent", componentid)
+ self.connect_me_to_component(ConnectorTypes.UP, self.mainComponent)
+ super().__init__(componentname, componentid)
+
+start = time.time()
+# Radius 1 on the unit square makes this 3-node geometric graph fully connected in practice.
+G = nx.random_geometric_graph(3, 1)
+nx.draw(G, with_labels=True, font_weight='bold')
+plt.draw()
+
+topo = Topology()
+topo.construct_from_graph_bakery(G, AdHocNode, FIFOBroadcastPerfectChannel)
+topo.start()
+
+time.sleep(1)
+topo.nodes[0].mainComponent.start()
+topo.nodes[1].mainComponent.start()
+#plt.show()
+
+while (not topo.nodes[0].mainComponent.done):
+ time.sleep(0.1)
+
+while (not topo.nodes[1].mainComponent.done):
+ time.sleep(0.1)
+
+print(f"{time.time() - start} s passed.")
+print(topo.nodes[2].mainComponent)
diff --git a/PetersonsAlgorithm/Ahc.py b/PetersonsAlgorithm/Ahc.py
new file mode 100644
index 0000000000000000000000000000000000000000..1f7ff817e3f1dbc50e6390c82a8e277d5fb61e03
--- /dev/null
+++ b/PetersonsAlgorithm/Ahc.py
@@ -0,0 +1,380 @@
+#!/usr/bin/env python
+""" Implements the AHC library.
+
+TODO: Longer description of this module to be written.
+
+This program is free software: you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation, either version 3 of the License, or (at your option) any later
+version.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+You should have received a copy of the GNU General Public License along with
+this program. If not, see <https://www.gnu.org/licenses/>.
+
+"""
+
+__author__ = "One solo developer"
+__authors__ = ["Ertan Onur", "Berke Tezergil", "etc"]
+__contact__ = "eonur@ceng.metu.edu.tr"
+__copyright__ = "Copyright 2021, WINSLAB"
+__credits__ = ["Ertan Onur", "Berke Tezergil", "etc"]
+__date__ = "2021/04/07"
+__deprecated__ = False
+__email__ = "eonur@ceng.metu.edu.tr"
+__license__ = "GPLv3"
+__maintainer__ = "developer"
+__status__ = "Production"
+__version__ = "0.0.1"
+
+
+import datetime
+import queue
+from enum import Enum
+from threading import Thread, Lock
+
+import matplotlib.pyplot as plt
+import networkx as nx
+
+# TIMING ASSUMPTIONS
+# TODO: Event handling time, message sending time, assumptions about clock (drift, skew, ...)
+# TODO: 1. Asynch, 2. Synch 3. Partial-synch 4. Timed asynch
+# TODO: Causal-order (happen before), total-order,
+# TODO: Causal-order algebra!!!
+# TODO: Implement logical clocks (Lamport clocks, vector clocks) in event handling loop
+
+# AUTOMATA and EXECUTIONS
+# TODO: Let the component model handle executions and checks on executions (which event, in which order, per process or per system, similarity of executions)
+
+
+# VISUALIZATION
+# TODO: Space-time diagrams for events
+
+# TOPOLOGY MANAGEMENT
+# TODO: Given a graph as input, generate the topology....
+
+inf = float('inf')
+
+# The following are the common default events for all components.
+class EventTypes(Enum):
+ INIT = "init"
+ MFRB = "msgfrombottom"
+ MFRT = "msgfromtop"
+ MFRP = "msgfrompeer"
+
+class MessageDestinationIdentifiers(Enum):
+  # Note: no trailing comma after -1, which would turn the enum value into a tuple.
+  LINKLAYERBROADCAST = -1  # single-hop broadcast: all directly connected nodes
+  NETWORKLAYERBROADCAST = -2  # flooding over multiple hops: all nodes reachable over one or more links
+
+# A dictionary that keeps a list of values per key: assigning to an existing
+# key appends, so one connector name can fan out to several components.
+class ConnectorList(dict):
+  def __setitem__(self, key, value):
+    if key not in self:
+      super().__setitem__(key, [])
+    self[key].append(value)
+
+class ConnectorTypes(Enum):
+ DOWN = "DOWN"
+ UP = "UP"
+ PEER = "PEER"
+
+class GenericMessagePayload:
+ def __init__(self, messagepayload):
+ self.messagepayload = messagepayload
+
+class GenericMessageHeader:
+ def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1):
+ self.messagetype = messagetype
+ self.messagefrom = messagefrom
+ self.messageto = messageto
+ self.nexthop = nexthop
+ self.interfaceid = interfaceid
+ self.sequencenumber = sequencenumber
+
+class GenericMessage:
+ def __init__(self, header, payload):
+ self.header = header
+ self.payload = payload
+ self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber)
+
+class Event:
+ def __init__(self, eventsource, event, eventcontent, fromchannel=None):
+ self.eventsource = eventsource
+ self.event = event
+ self.time = datetime.datetime.now()
+ self.eventcontent = eventcontent
+ self.fromchannel = fromchannel
+
+def singleton(cls):
+ instance = [None]
+
+ def wrapper(*args, **kwargs):
+ if instance[0] is None:
+ instance[0] = cls(*args, **kwargs)
+ return instance[0]
+
+ return wrapper
+
+@singleton
+class ComponentRegistry:
+ components = {}
+
+ def get_component_by_instance(self, instance):
+ list_of_keys = list()
+ list_of_items = self.components.items()
+ for item in list_of_items:
+ if item[1] == instance:
+ list_of_keys.append(item[0])
+ return list_of_keys
+
+ def add_component(self, component):
+ key = component.componentname + str(component.componentinstancenumber)
+ self.components[key] = component
+
+ def get_component_by_key(self, componentname, componentinstancenumber):
+ key = componentname + str(componentinstancenumber)
+ return self.components[key]
+
+ def init(self):
+ for itemkey in self.components:
+ cmp = self.components[itemkey]
+ cmp.inputqueue.put_nowait(Event(self, EventTypes.INIT, None))
+
+ def print_components(self):
+ for itemkey in self.components:
+ cmp = self.components[itemkey]
+ print(f"I am {cmp.componentname}.{cmp.componentinstancenumber}")
+ for i in cmp.connectors:
+ connectedcmp = cmp.connectors[i]
+ for p in connectedcmp:
+ print(f"\t{i} {p.componentname}.{p.componentinstancenumber}")
+
+registry = ComponentRegistry()
+
+class ComponentModel:
+ terminated = False
+
+ def on_init(self, eventobj: Event):
+ # print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+ pass
+
+ def on_message_from_bottom(self, eventobj: Event):
+ print(f"{EventTypes.MFRB} {self.componentname}.{self.componentinstancenumber}")
+
+ def on_message_from_top(self, eventobj: Event):
+ print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}")
+
+ def on_message_from_peer(self, eventobj: Event):
+ print(f"{EventTypes.MFRP} {self.componentname}.{self.componentinstancenumber}")
+
+ def __init__(self, componentname, componentinstancenumber, num_worker_threads=1):
+ self.eventhandlers = {EventTypes.INIT: self.on_init, EventTypes.MFRB: self.on_message_from_bottom,
+ EventTypes.MFRT: self.on_message_from_top, EventTypes.MFRP: self.on_message_from_peer}
+ # Add default handlers to all instantiated components.
+ # If a component overwrites the __init__ method it has to call the super().__init__ method
+ self.inputqueue = queue.Queue()
+ self.componentname = componentname
+ self.componentinstancenumber = componentinstancenumber
+ self.num_worker_threads = num_worker_threads
+    if not hasattr(self, "connectors"):
+      self.connectors = ConnectorList()
+
+ self.registry = ComponentRegistry()
+ self.registry.add_component(self)
+
+ for i in range(self.num_worker_threads):
+ t = Thread(target=self.queue_handler, args=[self.inputqueue])
+ t.daemon = True
+ t.start()
+
+ def connect_me_to_component(self, name, component):
+ try:
+ self.connectors[name] = component
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = component
+
+ def connect_me_to_channel(self, name, channel):
+ try:
+ self.connectors[name] = channel
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = channel
+ connectornameforchannel = self.componentname + str(self.componentinstancenumber)
+ channel.connect_me_to_component(connectornameforchannel, self)
+
+ def terminate(self):
+ self.terminated = True
+
+  def send_down(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.DOWN]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no DOWN connector is attached
+
+  def send_up(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.UP]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no UP connector is attached
+
+  def send_peer(self, event: Event):
+    try:
+      for p in self.connectors[ConnectorTypes.PEER]:
+        p.trigger_event(event)
+    except KeyError:
+      pass  # no PEER connector is attached
+
+ def send_self(self, event: Event):
+ self.trigger_event(event)
+
+ # noinspection PyArgumentList
+ def queue_handler(self, myqueue):
+ while not self.terminated:
+ workitem = myqueue.get()
+ if workitem.event in self.eventhandlers:
+ self.eventhandlers[workitem.event](eventobj=workitem) # call the handler
+ else:
+ print(f"Event Handler: {workitem.event} is not implemented")
+ myqueue.task_done()
+
+ def trigger_event(self, eventobj: Event):
+ self.inputqueue.put_nowait(eventobj)
+
+@singleton
+class Topology:
+ nodes = {}
+ channels = {}
+
+ def construct_from_graph(self, G: nx.Graph, nodetype, channeltype):
+ self.G = G
+ nodes = list(G.nodes)
+ edges = list(G.edges)
+ for i in nodes:
+ cc = nodetype(nodetype.__name__, i)
+ self.nodes[i] = cc
+ for k in edges:
+ ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1]))
+ self.channels[k] = ch
+ self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+ def construct_from_graph_peterson(self, G: nx.Graph, nodetype, channeltype):
+ self.G = G
+ nodes = list(G.nodes)
+ edges = list(G.edges)
+
+    # Reverse the construction order, presumably so the resource node (highest id) is created first.
+    nodes = nodes[::-1]
+    edges = edges[::-1]
+
+ for i in nodes:
+ cc = nodetype(nodetype.__name__, i,i)
+ self.nodes[i] = cc
+ for k in edges:
+ ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1]))
+ self.channels[k] = ch
+ self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+
+ def construct_single_node(self, nodetype, instancenumber):
+ self.singlenode = nodetype(nodetype.__name__, instancenumber)
+ self.G = nx.Graph()
+ self.G.add_nodes_from([0])
+ self.nodes[0] = self.singlenode
+
+ def construct_sender_receiver(self, sendertype, receivertype, channeltype):
+ self.sender = sendertype(sendertype.__name__, 0)
+ self.receiver = receivertype(receivertype.__name__, 1)
+ ch = channeltype(channeltype.__name__, "0-1")
+ self.G = nx.Graph()
+ self.G.add_nodes_from([0, 1])
+ self.G.add_edges_from([(0, 1)])
+ self.nodes[self.sender.componentinstancenumber] = self.sender
+    self.nodes[self.receiver.componentinstancenumber] = self.receiver
+ self.channels[ch.componentinstancenumber] = ch
+ self.sender.connect_me_to_channel(ConnectorTypes.DOWN, ch)
+ self.receiver.connect_me_to_channel(ConnectorTypes.DOWN, ch)
+
+ def allpairs_shortest_path(self):
+ return dict(nx.all_pairs_shortest_path(self.G))
+
+ def shortest_path_to_all(self, myid):
+ path = dict(nx.all_pairs_shortest_path(self.G))
+ nodecnt = len(self.G.nodes)
+ for i in range(nodecnt):
+ print(path[myid][i])
+
+ def start(self):
+ # registry.printComponents()
+ N = len(self.G.nodes)
+ self.compute_forwarding_table()
+ self.nodecolors = ['b'] * N
+ self.nodepos = nx.drawing.spring_layout(self.G)
+ self.lock = Lock()
+ ComponentRegistry().init()
+
+  def compute_forwarding_table(self):
+    # All-pairs shortest paths double as an all-seeing-eye routing table:
+    # ForwardingTable[src][dst] is the full path, whose second element is the next hop.
+    self.ForwardingTable = dict(nx.all_pairs_shortest_path(self.G))
+
+  # All-seeing-eye routing table inspection
+ def print_forwarding_table(self):
+ registry.print_components()
+ print('\n'.join([''.join(['{:4}'.format(item) for item in row])
+ for row in list(self.ForwardingTable.values())]))
+
+ # returns the all-seeing eye routing based next hop id
+ def get_next_hop(self, fromId, toId):
+ try:
+ retval = self.ForwardingTable[fromId][toId]
+ return retval[1]
+ except KeyError:
+ return inf
+ except IndexError:
+ return fromId
+
+  # Returns the sorted list of neighbors of a node
+  def get_neighbors(self, nodeId):
+    return sorted(self.G.neighbors(nodeId))
+
+  # Note: predecessors() exists only on directed graphs (nx.DiGraph), so this
+  # raises an AttributeError when the topology graph is undirected.
+  def get_predecessors(self, nodeId):
+    return sorted(self.G.predecessors(nodeId))
+
+  def get_successors(self, nodeId):
+    return sorted(self.G.neighbors(nodeId))
+
+  # Returns the number of neighbors of a node
+  def get_neighbor_count(self, nodeId):
+    return self.G.degree[nodeId]
+
+ def plot(self):
+ #self.lock.acquire()
+ nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold')
+ plt.draw()
+ print(self.nodecolors)
+ #self.lock.release()
diff --git a/PetersonsAlgorithm/Channels.py b/PetersonsAlgorithm/Channels.py
new file mode 100644
index 0000000000000000000000000000000000000000..a6e9bbd2bd74c43b2f86d21833f171255b41a6c8
--- /dev/null
+++ b/PetersonsAlgorithm/Channels.py
@@ -0,0 +1,160 @@
+import queue
+import random
+from enum import Enum
+from threading import Thread
+
+from Ahc import ComponentModel, EventTypes, ConnectorList, MessageDestinationIdentifiers
+from Ahc import Event
+
+# TODO: Channel failure models: lossy-link, fair-loss, stubborn links, perfect links (WHAT ELSE?), FIFO perfect
+# TODO: Logged perfect links (tolerance to crashes), authenticated perfect links
+# TODO: Broadcast channels? and their failure models? Collisions?
+# TODO: Properties of all models separately: e.g., Fair loss, finite duplication, no fabrication in fair-loss link model
+# TODO: Packets: loss, duplication, sequence change, windowing?,
+# TODO: Eventually (unbounded time) or bounded time for message delivery?
+
+
+# Channels have three events: sendtochannel, processinchannel and delivertocomponent.
+# A component hands a message to a channel by calling send_down with an EventTypes.MFRT event.
+# The first pipeline stage forwards the message to the interim stage with the "processinchannel"
+# event for further processing; there the channel may drop, delay or duplicate it.
+# The channel then puts the message on the output queue with the "delivertocomponent" event,
+# and the output queue delivers it to the connected component(s) as an EventTypes.MFRB event,
+# which receiving components handle in on_message_from_bottom.
+
+class ChannelEventTypes(Enum):
+ INCH = "processinchannel"
+ DLVR = "delivertocomponent"
+
+class Channel(ComponentModel):
+
+ def on_init(self, eventobj: Event):
+
+ pass
+
+  # Override on_message_from_top if you want to do something in the first pipeline stage
+ def on_message_from_top(self, eventobj: Event):
+ # channel receives the input message and will process the message by the process event in the next pipeline stage
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+
+  # Override on_process_in_channel if you want to do something in the interim pipeline stage
+ def on_process_in_channel(self, eventobj: Event):
+ # Add delay, drop, change order whatever....
+ # Finally put the message in outputqueue with event deliver
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent)
+ self.outputqueue.put_nowait(myevent)
+
+  # Override on_deliver_to_component if you want to do something in the last pipeline stage.
+  # It delivers the message from the channel to the receiving component(s) as an EventTypes.MFRB event.
+ def on_deliver_to_component(self, eventobj: Event):
+ callername = eventobj.eventsource.componentinstancenumber
+ for item in self.connectors:
+ callees = self.connectors[item]
+ for callee in callees:
+ calleename = callee.componentinstancenumber
+ # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}")
+ if calleename == callername:
+ pass
+ else:
+ myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber)
+ callee.trigger_event(myevent)
+
+ def __init__(self, componentname, componentinstancenumber):
+ super().__init__(componentname, componentinstancenumber)
+ self.outputqueue = queue.Queue()
+ self.channelqueue = queue.Queue()
+ self.eventhandlers[ChannelEventTypes.INCH] = self.on_process_in_channel
+ self.eventhandlers[ChannelEventTypes.DLVR] = self.on_deliver_to_component
+
+ for i in range(self.num_worker_threads):
+ # note that the input queue is handled by the super class...
+ t = Thread(target=self.queue_handler, args=[self.channelqueue])
+ t1 = Thread(target=self.queue_handler, args=[self.outputqueue])
+ t.daemon = True
+ t1.daemon = True
+ t.start()
+ t1.start()
+
+class AHCChannelError(Exception):
+ pass
+
+class P2PFIFOPerfectChannel(Channel):
+
+  # Override on_message_from_top.
+  # Channels are broadcast, so the channel id has to be checked against hdr.interfaceid to keep the link point-to-point.
+ def on_message_from_top(self, eventobj: Event):
+ # if channelid != hdr.interfaceif then drop (should not be on this channel)
+ hdr = eventobj.eventcontent.header
+ if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST:
+ if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")):
+ #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+ else:
+ #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ pass
+
+ def on_deliver_to_component(self, eventobj: Event):
+ msg = eventobj.eventcontent
+ callername = eventobj.eventsource.componentinstancenumber
+ for item in self.connectors:
+ callees = self.connectors[item]
+ for callee in callees:
+ calleename = callee.componentinstancenumber
+ # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}")
+ if calleename == callername:
+ pass
+ else:
+ myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber)
+ callee.trigger_event(myevent)
+
+ # Overwriting to limit the number of connected components
+ def connect_me_to_component(self, name, component):
+ try:
+ self.connectors[name] = component
+ # print(f"Number of nodes connected: {len(self.ports)}")
+ if len(self.connectors) > 2:
+ raise AHCChannelError("More than two nodes cannot connect to a P2PFIFOChannel")
+ except AttributeError:
+ self.connectors = ConnectorList()
+ self.connectors[name] = component
+ # except AHCChannelError as e:
+ # print( f"{e}" )
+
+class P2PFIFOFairLossChannel(P2PFIFOPerfectChannel):
+ prob = 1
+ duplicationprobability = 0
+  # Override on_message_from_top.
+  # Channels are broadcast, so the channel id has to be checked against hdr.interfaceid to keep the link point-to-point.
+
+ def on_message_from_top(self, eventobj: Event):
+ # if channelid != hdr.interfaceif then drop (should not be on this channel)
+ hdr = eventobj.eventcontent.header
+ if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST:
+ if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")):
+ #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent)
+ self.channelqueue.put_nowait(myevent)
+ else:
+ #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}")
+ pass
+
+
+ def on_process_in_channel(self, eventobj: Event):
+ if random.random() < self.prob:
+ myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent)
+ self.outputqueue.put_nowait(myevent)
+ if random.random() < self.duplicationprobability:
+ self.channelqueue.put_nowait(eventobj)
+
+  def setPacketLossProbability(self, prob):
+    # self.prob is the probability that a message is delivered (see on_process_in_channel),
+    # so the requested loss probability has to be inverted here.
+    self.prob = 1 - prob
+
+  def setAverageNumberOfDuplicates(self, d):
+    # With duplication probability p, the expected number of copies is 1/(1-p);
+    # solving 1/(1-p) = d gives p = (d-1)/d.
+    if d > 0:
+      self.duplicationprobability = (d - 1) / d
+    else:
+      self.duplicationprobability = 0
+
+class FIFOBroadcastPerfectChannel(Channel):
+ pass
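+
+# Usage sketch (editor's addition): configuring the fair-loss channel.
+#
+#   ch = P2PFIFOFairLossChannel(P2PFIFOFairLossChannel.__name__, "0-1")
+#   ch.setPacketLossProbability(0.1)    # drop roughly 10% of messages
+#   ch.setAverageNumberOfDuplicates(2)  # about two delivered copies per message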
diff --git a/PetersonsAlgorithm/PetersonComponent.py b/PetersonsAlgorithm/PetersonComponent.py
new file mode 100644
index 0000000000000000000000000000000000000000..d44f18be40b967e5f22d67b3df8d5230a3c4557c
--- /dev/null
+++ b/PetersonsAlgorithm/PetersonComponent.py
@@ -0,0 +1,70 @@
+from enum import Enum
+import time
+from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes
+from threading import Thread
+
+# Shared Peterson state: flag[i] is True while component i wants the critical
+# section, and turn says which component has to yield.
+flag = [False, False]
+turn = 0
+
+class PetersonMessageTypes(Enum):
+ INC = "+"
+ DEC = "-"
+
+class ResourceComponent(ComponentModel):
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n")
+ pass
+
+ def on_message_from_bottom(self, eventobj: Event):
+ messagetype = eventobj.eventcontent.header.messagetype ## INC / DEC
+ if messagetype == PetersonMessageTypes.INC:
+ print("Increment value")
+ self.value += 1
+ elif messagetype == PetersonMessageTypes.DEC:
+ print("Decrement value")
+ self.value -= 1
+
+ def __init__(self, componentname, componentinstancenumber):
+ super().__init__(componentname, componentinstancenumber)
+ self.value = 0
+
+ def __repr__(self):
+ return f"Value:{self.value}"
+
+class ProducerConsumerComponent(ComponentModel):
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n")
+ pass
+
+  def send_message(self):
+    global turn  # turn is reassigned below; flag is only mutated in place
+    message_header = GenericMessageHeader(self.messageType, self.componentinstancenumber, None,
+                                          interfaceid=f"{self.componentinstancenumber}-{self.resourceComponentId}")
+    message_payload = GenericMessagePayload(None)
+    message = GenericMessage(message_header, message_payload)
+    otherThreadId = 1 - self.id
+    for i in range(self.count):
+      flag[self.id] = True  # I want to enter the critical section
+      turn = otherThreadId  # let the other component go first
+      while flag[otherThreadId] and turn == otherThreadId:
+        time.sleep(0.0000001)
+      ####critical section####
+      self.send_down(Event(self, EventTypes.MFRT, message))
+      time.sleep(0.001)
+      ########################
+      flag[self.id] = False  # my turn is finished
+
+    self.done = True
+
+  def __init__(self, componentname, componentinstancenumber, type=PetersonMessageTypes.INC, count=100, resourceComponentId=2):
+ super().__init__(componentname, componentinstancenumber)
+ self.count = count
+ self.resourceComponentId = resourceComponentId
+ self.done = False
+ self.messageType = type
+ self.id = self.componentinstancenumber
+
+ def start(self):
+ t = Thread(target=self.send_message)
+ t.daemon = True
+ t.start()
diff --git a/PetersonsAlgorithm/petersons.py b/PetersonsAlgorithm/petersons.py
new file mode 100644
index 0000000000000000000000000000000000000000..0ac22661501b2c6db6f49d2847aaa86d7e7912ab
--- /dev/null
+++ b/PetersonsAlgorithm/petersons.py
@@ -0,0 +1,172 @@
+import threading
+import time
+
+mutex = threading.Lock()
+
+def peterson(id,function):
+ global flag,turn
+ otherThreadId = 1 - id
+
+ flag[id] = True # I want to get in to critical section
+ turn = otherThreadId # Lets wait until other thread finish
+    while flag[otherThreadId] and turn == otherThreadId:
+        time.sleep(0.0000001)
+
+ ####critical section####
+ function(id)
+ ########################
+
+ flag[id] = False # My job is finished
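+
+# Note on correctness: writing turn after raising the flag is what breaks ties.
+# If both threads raise their flags at the same time, the thread whose write to
+# turn lands last is the one that waits, so at most one thread enters at a time.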
+
+def produce(id):
+ global counter
+ counter += 1
+ #print("Thread {} counter:".format(id),counter)
+
+def consume(id):
+ global counter
+ counter -= 1
+ #print("Thread {} counter:".format(id),counter)
+
+def producer(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Producer {} started".format(id))
+ for i in range(ntime):
+ produce(id)
+ print("Producer {} finished".format(id))
+
+def producerPeterson(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Producer {} started".format(id))
+ for i in range(ntime):
+ peterson(id,produce)
+ print("Producer {} finished".format(id))
+
+def producerMutex(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Producer {} started".format(id))
+ for i in range(ntime):
+ with mutex:
+ produce(id)
+ print("Producer {} finished".format(id))
+
+def consumer(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Consumer {} started".format(id))
+ for i in range(ntime):
+ consume(id)
+ print("Consumer {} finished".format(id))
+
+def consumerPeterson(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Consumer {} started".format(id))
+ for i in range(ntime):
+ peterson(id,consume)
+ print("Consumer {} finished".format(id))
+
+def consumerMutex(id,ntime):
+ global waitFlag
+ while waitFlag:
+ continue
+ print("Consumer {} started".format(id))
+ for i in range(ntime):
+ with mutex:
+ consume(id)
+ print("Consumer {} finished".format(id))
+
+if __name__ == "__main__":
+
+ ntime = 500000
+
+ waitFlag = True
+ counter = 0
+ t0 = threading.Thread(target=producer, args=(0,ntime))
+ t1 = threading.Thread(target=producer, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Increment counters without any syncronization: Expected output {} , real output {}".format(ntime*2,counter))
+ print("Time: {}".format(time.time()-startTime))
+
+ waitFlag = True
+ counter = 0
+ t0 = threading.Thread(target=producerMutex, args=(0,ntime))
+ t1 = threading.Thread(target=producerMutex, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Increment counters with mutex: Expected output {} , real output {}".format(ntime*2,counter))
+ print("Time: {}".format(time.time()-startTime))
+
+ waitFlag = True
+ counter = 0
+ t0 = threading.Thread(target=producer, args=(0,ntime))
+ t1 = threading.Thread(target=consumer, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Producer-consumer without any syncronization: Expected output 0 , real output {}".format(counter))
+ print("Time: {}".format(time.time()-startTime))
+
+ waitFlag = True
+ counter = 0
+ t0 = threading.Thread(target=producerMutex, args=(0,ntime))
+ t1 = threading.Thread(target=consumerMutex, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Producer-consumer with mutex: Expected output 0 , real output {}".format(counter))
+ print("Time: {}".format(time.time()-startTime))
+
+ waitFlag = True
+ flag = [False,False]
+ turn = 0
+ counter = 0
+ t0 = threading.Thread(target=producerPeterson, args=(0,ntime))
+ t1 = threading.Thread(target=producerPeterson, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Increment counters with Petersons Algorithm: Expected output {} , real output {}".format(ntime*2,counter))
+ print("Time: {}".format(time.time()-startTime))
+
+ waitFlag = True
+ flag = [False,False]
+ turn = 0
+ counter = 0
+ t0 = threading.Thread(target=producerPeterson, args=(0,ntime))
+ t1 = threading.Thread(target=consumerPeterson, args=(1,ntime))
+ t0.start()
+ t1.start()
+ startTime = time.time()
+ waitFlag = False
+ t0.join()
+ t1.join()
+ print("Producer-consumer with Petersons Algorithm: Expected output 0 , real output {}".format(counter))
+ print("Time: {}".format(time.time()-startTime))
\ No newline at end of file
diff --git a/PetersonsAlgorithm/testPeterson.py b/PetersonsAlgorithm/testPeterson.py
new file mode 100644
index 0000000000000000000000000000000000000000..64563e09e74f0dd6383f5aa41a53d26a711900e1
--- /dev/null
+++ b/PetersonsAlgorithm/testPeterson.py
@@ -0,0 +1,58 @@
+import random
+import time
+from enum import Enum
+
+import matplotlib.pyplot as plt
+import networkx as nx
+
+from PetersonComponent import *
+from Ahc import ComponentModel, MessageDestinationIdentifiers, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes, ConnectorTypes, Topology, ComponentRegistry
+from Channels import FIFOBroadcastPerfectChannel
+
+class AdHocNode(ComponentModel):
+
+ def on_init(self, eventobj: Event):
+ print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+
+  def on_message_from_top(self, eventobj: Event):
+    print(f"Outgoing message from node {self.componentinstancenumber}")
+    self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent))
+
+  def on_message_from_bottom(self, eventobj: Event):
+    print(f"Incoming message to node {self.componentinstancenumber}, from: {eventobj.eventcontent.header.messagefrom}")
+    self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent))
+
+ def __init__(self, componentname, componentid, type):
+ if type == 0:
+ self.mainComponent = ProducerConsumerComponent("ProducerComponent", componentid,PetersonMessageTypes.INC)
+ self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self)
+ elif type == 1:
+ self.mainComponent = ProducerConsumerComponent("ConsumerComponent", componentid,PetersonMessageTypes.DEC)
+ self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self)
+ elif type == 2:
+ self.mainComponent = ResourceComponent("ResourceComponent", componentid)
+ self.connect_me_to_component(ConnectorTypes.UP, self.mainComponent)
+ super().__init__(componentname, componentid)
+
+start = time.time()
+# Radius 1 on the unit square makes this 3-node geometric graph fully connected in practice.
+G = nx.random_geometric_graph(3, 1)
+nx.draw(G, with_labels=True, font_weight='bold')
+plt.draw()
+
+topo = Topology()
+topo.construct_from_graph_peterson(G, AdHocNode, FIFOBroadcastPerfectChannel)
+topo.start()
+
+time.sleep(1)
+topo.nodes[0].mainComponent.start()
+topo.nodes[1].mainComponent.start()
+#plt.show()  # plt.show() blocks until the window is closed; keep it disabled while timing
+
+while (not topo.nodes[0].mainComponent.done):
+ time.sleep(0.1)
+
+while (not topo.nodes[1].mainComponent.done):
+ time.sleep(0.1)
+
+print(f"{time.time() - start} s passed.")
+print(topo.nodes[2].mainComponent)