diff --git a/BakeryAlgorithm/Ahc.py b/BakeryAlgorithm/Ahc.py new file mode 100644 index 0000000000000000000000000000000000000000..a610b3aa513b1257cdf4d34fe96218e3dd285b0b --- /dev/null +++ b/BakeryAlgorithm/Ahc.py @@ -0,0 +1,380 @@ +#!/usr/bin/env python +""" Implements the AHC library. + +TODO: Longer description of this module to be written. + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see . + +""" + +__author__ = "One solo developer" +__authors__ = ["Ertan Onur", "Berke Tezergil", "etc"] +__contact__ = "eonur@ceng.metu.edu.tr" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Ertan Onur", "Berke Tezergil", "etc"] +__date__ = "2021/04/07" +__deprecated__ = False +__email__ = "eonur@ceng.metu.edu.tr" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + + +import datetime +import queue +from enum import Enum +from threading import Thread, Lock + +import matplotlib.pyplot as plt +import networkx as nx + +# TIMING ASSUMPTIONS +# TODO: Event handling time, message sending time, assumptions about clock (drift, skew, ...) +# TODO: 1. Asynch, 2. Synch 3. Partial-synch 4. Timed asynch +# TODO: Causal-order (happen before), total-order, +# TODO: Causal-order algebra!!! +# TODO: Implement logical clocks (Lamport clocks, vector clocks) in event handling loop + +# AUTOMATA and EXECUTIONS +# TODO: Let component model hande executions and chekcs on executions (which event, in which order, per process or per system, similarity of executions) + + +# VISUALIZATION +# TODO: Space-time diagrams for events + +# TOPOLOGY MANAGEMENT +# TODO: Given a graph as input, generate the topology.... + +inf = float('inf') + +# The following are the common default events for all components. +class EventTypes(Enum): + INIT = "init" + MFRB = "msgfrombottom" + MFRT = "msgfromtop" + MFRP = "msgfrompeer" + +class MessageDestinationIdentifiers(Enum): + LINKLAYERBROADCAST = -1, # sinngle-hop broadcast, means all directly connected nodes + NETWORKLAYERBROADCAST = -2 # For flooding over multiple-hops means all connected nodes to me over one or more links + +# A Dictionary that holds a list for the same key +class ConnectorList(dict): + def __setitem__(self, key, value): + try: + self[key] + except KeyError: + super(ConnectorList, self).__setitem__(key, []) + self[key].append(value) + +class ConnectorTypes(Enum): + DOWN = "DOWN" + UP = "UP" + PEER = "PEER" + +class GenericMessagePayload: + def __init__(self, messagepayload): + self.messagepayload = messagepayload + +class GenericMessageHeader: + def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1): + self.messagetype = messagetype + self.messagefrom = messagefrom + self.messageto = messageto + self.nexthop = nexthop + self.interfaceid = interfaceid + self.sequencenumber = sequencenumber + +class GenericMessage: + def __init__(self, header, payload): + self.header = header + self.payload = payload + self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber) + +class Event: + def __init__(self, eventsource, event, eventcontent, fromchannel=None): + self.eventsource = eventsource + self.event = event + self.time = datetime.datetime.now() + self.eventcontent = eventcontent + self.fromchannel = fromchannel + +def singleton(cls): + instance = [None] + + def wrapper(*args, **kwargs): + if instance[0] is None: + instance[0] = cls(*args, **kwargs) + return instance[0] + + return wrapper + +@singleton +class ComponentRegistry: + components = {} + + def get_component_by_instance(self, instance): + list_of_keys = list() + list_of_items = self.components.items() + for item in list_of_items: + if item[1] == instance: + list_of_keys.append(item[0]) + return list_of_keys + + def add_component(self, component): + key = component.componentname + str(component.componentinstancenumber) + self.components[key] = component + + def get_component_by_key(self, componentname, componentinstancenumber): + key = componentname + str(componentinstancenumber) + return self.components[key] + + def init(self): + for itemkey in self.components: + cmp = self.components[itemkey] + cmp.inputqueue.put_nowait(Event(self, EventTypes.INIT, None)) + + def print_components(self): + for itemkey in self.components: + cmp = self.components[itemkey] + print(f"I am {cmp.componentname}.{cmp.componentinstancenumber}") + for i in cmp.connectors: + connectedcmp = cmp.connectors[i] + for p in connectedcmp: + print(f"\t{i} {p.componentname}.{p.componentinstancenumber}") + +registry = ComponentRegistry() + +class ComponentModel: + terminated = False + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_bottom(self, eventobj: Event): + print(f"{EventTypes.MFRB} {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_top(self, eventobj: Event): + print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_peer(self, eventobj: Event): + print(f"{EventTypes.MFRP} {self.componentname}.{self.componentinstancenumber}") + + def __init__(self, componentname, componentinstancenumber, num_worker_threads=1): + self.eventhandlers = {EventTypes.INIT: self.on_init, EventTypes.MFRB: self.on_message_from_bottom, + EventTypes.MFRT: self.on_message_from_top, EventTypes.MFRP: self.on_message_from_peer} + # Add default handlers to all instantiated components. + # If a component overwrites the __init__ method it has to call the super().__init__ method + self.inputqueue = queue.Queue() + self.componentname = componentname + self.componentinstancenumber = componentinstancenumber + self.num_worker_threads = num_worker_threads + try: + if self.connectors: + pass + except AttributeError: + self.connectors = ConnectorList() + + self.registry = ComponentRegistry() + self.registry.add_component(self) + + for i in range(self.num_worker_threads): + t = Thread(target=self.queue_handler, args=[self.inputqueue]) + t.daemon = True + t.start() + + def connect_me_to_component(self, name, component): + try: + self.connectors[name] = component + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = component + + def connect_me_to_channel(self, name, channel): + try: + self.connectors[name] = channel + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = channel + connectornameforchannel = self.componentname + str(self.componentinstancenumber) + channel.connect_me_to_component(connectornameforchannel, self) + + def terminate(self): + self.terminated = True + + def send_down(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.DOWN]: + p.trigger_event(event) + except: + pass + + def send_up(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.UP]: + p.trigger_event(event) + except: + pass + + def send_peer(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.PEER]: + p.trigger_event(event) + except: + pass + + def send_self(self, event: Event): + self.trigger_event(event) + + # noinspection PyArgumentList + def queue_handler(self, myqueue): + while not self.terminated: + workitem = myqueue.get() + if workitem.event in self.eventhandlers: + self.eventhandlers[workitem.event](eventobj=workitem) # call the handler + else: + print(f"Event Handler: {workitem.event} is not implemented") + myqueue.task_done() + + def trigger_event(self, eventobj: Event): + self.inputqueue.put_nowait(eventobj) + +@singleton +class Topology: + nodes = {} + channels = {} + + def construct_from_graph(self, G: nx.Graph, nodetype, channeltype): + self.G = G + nodes = list(G.nodes) + edges = list(G.edges) + for i in nodes: + cc = nodetype(nodetype.__name__, i) + self.nodes[i] = cc + for k in edges: + ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) + self.channels[k] = ch + self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + + def construct_from_graph_bakery(self, G: nx.Graph, nodetype, channeltype): + self.G = G + nodes = list(G.nodes) + edges = list(G.edges) + + nodes = nodes[::-1] + edges = edges[::-1] + + for i in nodes: + cc = nodetype(nodetype.__name__, i,i) + self.nodes[i] = cc + for k in edges: + ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) + self.channels[k] = ch + self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + + + def construct_single_node(self, nodetype, instancenumber): + self.singlenode = nodetype(nodetype.__name__, instancenumber) + self.G = nx.Graph() + self.G.add_nodes_from([0]) + self.nodes[0] = self.singlenode + + def construct_sender_receiver(self, sendertype, receivertype, channeltype): + self.sender = sendertype(sendertype.__name__, 0) + self.receiver = receivertype(receivertype.__name__, 1) + ch = channeltype(channeltype.__name__, "0-1") + self.G = nx.Graph() + self.G.add_nodes_from([0, 1]) + self.G.add_edges_from([(0, 1)]) + self.nodes[self.sender.componentinstancenumber] = self.sender + self.nodes[self.sender.componentinstancenumber] = self.receiver + self.channels[ch.componentinstancenumber] = ch + self.sender.connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.receiver.connect_me_to_channel(ConnectorTypes.DOWN, ch) + + def allpairs_shortest_path(self): + return dict(nx.all_pairs_shortest_path(self.G)) + + def shortest_path_to_all(self, myid): + path = dict(nx.all_pairs_shortest_path(self.G)) + nodecnt = len(self.G.nodes) + for i in range(nodecnt): + print(path[myid][i]) + + def start(self): + # registry.printComponents() + N = len(self.G.nodes) + self.compute_forwarding_table() + self.nodecolors = ['b'] * N + self.nodepos = nx.drawing.spring_layout(self.G) + self.lock = Lock() + ComponentRegistry().init() + + def compute_forwarding_table(self): + #N = len(self.G.nodes) + self.ForwardingTable = dict(nx.all_pairs_shortest_path(self.G)) + # print(f"There are {N} nodes") + #for i in range(N): + #for j in range(N): + #try: + #mypath = path[i][j] + # print(f"{i}to{j} path = {path[i][j]} nexthop = {path[i][j][1]}") + #self.ForwardingTable[i][j] = path[i][j][1] + + # print(f"{i}to{j}path = NONE") + #self.ForwardingTable[i][j] = inf # No paths + #except IndexError: + # print(f"{i}to{j} nexthop = NONE") + #self.ForwardingTable[i][j] = i # There is a path but length = 1 (self) + + # all-seeing eye routing table contruction + def print_forwarding_table(self): + registry.print_components() + print('\n'.join([''.join(['{:4}'.format(item) for item in row]) + for row in list(self.ForwardingTable.values())])) + + # returns the all-seeing eye routing based next hop id + def get_next_hop(self, fromId, toId): + try: + retval = self.ForwardingTable[fromId][toId] + return retval[1] + except KeyError: + return inf + except IndexError: + return fromId + + # Returns the list of neighbors of a node + def get_neighbors(self, nodeId): + return sorted([neighbor for neighbor in self.G.neighbors(nodeId)]) + + def get_predecessors(self, nodeId): + return sorted([neighbor for neighbor in self.G.predecessors(nodeId)]) + + def get_successors(self, nodeId): + return sorted([neighbor for neighbor in self.G.neighbors(nodeId)]) + + + # Returns the list of neighbors of a node + def get_neighbor_count(self, nodeId): + # return len([neighbor for neighbor in self.G.neighbors(nodeId)]) + return self.G.degree[nodeId] + + def plot(self): + #self.lock.acquire() + nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold') + plt.draw() + print(self.nodecolors) + #self.lock.release() diff --git a/BakeryAlgorithm/BakeryComponent.py b/BakeryAlgorithm/BakeryComponent.py new file mode 100644 index 0000000000000000000000000000000000000000..5984e3e349a3ea77a538f22b0ca2381562c05939 --- /dev/null +++ b/BakeryAlgorithm/BakeryComponent.py @@ -0,0 +1,71 @@ +from enum import Enum +import time +from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes +from threading import Thread + +node_number = 2 +waiting_ticket = [True]*node_number +ticket_values = [0]*node_number + +class BakeryMessageTypes(Enum): + INC = "+" + DEC = "-" + +class ResourceComponent(ComponentModel): + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n") + pass + + def on_message_from_bottom(self, eventobj: Event): + messagetype = eventobj.eventcontent.header.messagetype ## INC / DEC + if messagetype == BakeryMessageTypes.INC: + print("Increment value") + self.value += 1 + elif messagetype == BakeryMessageTypes.DEC: + print("Decrement value") + self.value -= 1 + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.value = 0 + + def __repr__(self): + return f"Value:{self.value}" + +class BakeryComponent(ComponentModel): + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n") + pass + + def send_message(self): + global flag,turn + message_header = GenericMessageHeader(self.messageType, self.componentinstancenumber, None,interfaceid=f"{self.componentinstancenumber}-{self.resourceComponentId}") + message_payload = GenericMessagePayload(None) + message = GenericMessage(message_header, message_payload) + + ticket_values[self.id] = max(ticket_values)+1 + waiting_ticket[self.id] = False + for i in range(node_number): + while waiting_ticket[i] == True: + continue + while ticket_values[i] > 0 and (ticket_values[i] < ticket_values[self.id] or(ticket_values[i] == ticket_values[self.id] and i < self.id)): + continue + ####critical section#### + self.send_down(Event(self, EventTypes.MFRT, message)) + time.sleep(0.001) + ######################## + ticket_values[self.id] = 0 + + self.done = True + + def __init__(self, componentname, componentinstancenumber,type = BakeryMessageTypes.INC , resourceComponentId = 2): + super().__init__(componentname, componentinstancenumber) + self.resourceComponentId = resourceComponentId + self.done = False + self.messageType = type + self.id = self.componentinstancenumber + + def start(self): + t = Thread(target=self.send_message) + t.daemon = True + t.start() diff --git a/BakeryAlgorithm/Channels.py b/BakeryAlgorithm/Channels.py new file mode 100644 index 0000000000000000000000000000000000000000..a6e9bbd2bd74c43b2f86d21833f171255b41a6c8 --- /dev/null +++ b/BakeryAlgorithm/Channels.py @@ -0,0 +1,160 @@ +import queue +import random +from enum import Enum +from threading import Thread + +from Ahc import ComponentModel, EventTypes, ConnectorList, MessageDestinationIdentifiers +from Ahc import Event + +# TODO: Channel failure models: lossy-link, fair-loss, stubborn links, perfect links (WHAT ELSE?), FIFO perfect +# TODO: Logged perfect links (tolerance to crashes), authenticated perfect links +# TODO: Broadcast channels? and their failure models? Collisions? +# TODO: Properties of all models separately: e.g., Fair loss, finite duplication, no fabrication in fair-loss link model +# TODO: Packets: loss, duplication, sequence change, windowing?, +# TODO: Eventually (unbounded time) or bounded time for message delivery? + + +# Channels have three events: sendtochannel, processinchannel and delivertocomponent +# Components tell channels to handle a message by the EventTypes.MFRT event, the component calls senddown with the event EventTypes.MFRT +# First pipeline stage moves the message to the interim pipeline stage with the "processinchannel" event for further processing, such as the channel may drop it, delay it, or whatever +# Channels deliver the message to output queue by the "delivertocomponent" event +# The output queue then will send the message up to the connected component(s) using the "messagefromchannel" event +# The components that will use the channel directly, will have to handle "messagefromchannel" event + +class ChannelEventTypes(Enum): + INCH = "processinchannel" + DLVR = "delivertocomponent" + +class Channel(ComponentModel): + + def on_init(self, eventobj: Event): + + pass + + # Overwrite onSendToChannel if you want to do something in the first pipeline stage + def on_message_from_top(self, eventobj: Event): + # channel receives the input message and will process the message by the process event in the next pipeline stage + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + + # Overwrite onProcessInChannel if you want to do something in interim pipeline stage + def on_process_in_channel(self, eventobj: Event): + # Add delay, drop, change order whatever.... + # Finally put the message in outputqueue with event deliver + myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent) + self.outputqueue.put_nowait(myevent) + + # Overwrite onDeliverToComponent if you want to do something in the last pipeline stage + # onDeliver will deliver the message from the channel to the receiver component using messagefromchannel event + def on_deliver_to_component(self, eventobj: Event): + callername = eventobj.eventsource.componentinstancenumber + for item in self.connectors: + callees = self.connectors[item] + for callee in callees: + calleename = callee.componentinstancenumber + # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}") + if calleename == callername: + pass + else: + myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber) + callee.trigger_event(myevent) + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.outputqueue = queue.Queue() + self.channelqueue = queue.Queue() + self.eventhandlers[ChannelEventTypes.INCH] = self.on_process_in_channel + self.eventhandlers[ChannelEventTypes.DLVR] = self.on_deliver_to_component + + for i in range(self.num_worker_threads): + # note that the input queue is handled by the super class... + t = Thread(target=self.queue_handler, args=[self.channelqueue]) + t1 = Thread(target=self.queue_handler, args=[self.outputqueue]) + t.daemon = True + t1.daemon = True + t.start() + t1.start() + +class AHCChannelError(Exception): + pass + +class P2PFIFOPerfectChannel(Channel): + + # Overwrite onSendToChannel + # Channels are broadcast, that is why we have to check channel id's using hdr.interfaceid for P2P + def on_message_from_top(self, eventobj: Event): + # if channelid != hdr.interfaceif then drop (should not be on this channel) + hdr = eventobj.eventcontent.header + if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST: + if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")): + #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}") + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + else: + #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}") + pass + + def on_deliver_to_component(self, eventobj: Event): + msg = eventobj.eventcontent + callername = eventobj.eventsource.componentinstancenumber + for item in self.connectors: + callees = self.connectors[item] + for callee in callees: + calleename = callee.componentinstancenumber + # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}") + if calleename == callername: + pass + else: + myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber) + callee.trigger_event(myevent) + + # Overwriting to limit the number of connected components + def connect_me_to_component(self, name, component): + try: + self.connectors[name] = component + # print(f"Number of nodes connected: {len(self.ports)}") + if len(self.connectors) > 2: + raise AHCChannelError("More than two nodes cannot connect to a P2PFIFOChannel") + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = component + # except AHCChannelError as e: + # print( f"{e}" ) + +class P2PFIFOFairLossChannel(P2PFIFOPerfectChannel): + prob = 1 + duplicationprobability = 0 + # Overwrite onSendToChannel + # Channels are broadcast, that is why we have to check channel id's using hdr.interfaceid for P2P + + def on_message_from_top(self, eventobj: Event): + # if channelid != hdr.interfaceif then drop (should not be on this channel) + hdr = eventobj.eventcontent.header + if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST: + if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")): + #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}") + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + else: + #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}") + pass + + + def on_process_in_channel(self, eventobj: Event): + if random.random() < self.prob: + myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent) + self.outputqueue.put_nowait(myevent) + if random.random() < self.duplicationprobability: + self.channelqueue.put_nowait(eventobj) + + def setPacketLossProbability(self, prob): + self.prob = prob + + def setAverageNumberOfDuplicates(self, d): + if d > 0: + self.duplicationprobability = (d - 1) / d + else: + self.duplicationprobability = 0 + +class FIFOBroadcastPerfectChannel(Channel): + pass diff --git a/BakeryAlgorithm/bakery.py b/BakeryAlgorithm/bakery.py new file mode 100644 index 0000000000000000000000000000000000000000..960ed4c3b35cdbe548f0a17eda9131d2b2d8956c --- /dev/null +++ b/BakeryAlgorithm/bakery.py @@ -0,0 +1,46 @@ +import threading + +def produce(id): + global counter + counter += 1 + #print("Thread {} counter:".format(id),counter) + +def threadRoutine(id): + global waitFlag,waitingTicket,ticket_values,threadNumber + while waitFlag: + continue + print("Thread {} started".format(id)) + + ticketValues[id] = max(ticketValues)+1 + waitingTicket[id] = False + + for i in range(threadNumber): + while waitingTicket[i] == True: + continue + while ticketValues[i] > 0 and (ticketValues[i] < ticketValues[id] or(ticketValues[i] == ticketValues[id] and i < id)): + continue + ####critical section#### + produce(id) + print("Thread {} done its job in critical section".format(id)) + ######################## + ticketValues[id] = 0 + print("Thread {} finished".format(id)) + +if __name__ == "__main__": + + counter = 0 + threadNumber = 20 + waitingTicket = [True]*threadNumber + ticketValues = [0]*threadNumber + thread_list = [] + for i in range(threadNumber): + thread_list.append(threading.Thread(target=threadRoutine, args=(i,))) + waitFlag = True + for thread in thread_list: + thread.start() + waitFlag = False + for thread in thread_list: + thread.join() + + print("Expected output {} , real output {}".format(threadNumber,counter)) + \ No newline at end of file diff --git a/BakeryAlgorithm/testBakery.py b/BakeryAlgorithm/testBakery.py new file mode 100644 index 0000000000000000000000000000000000000000..2277971b9bb641364120c3034e97d839236c764d --- /dev/null +++ b/BakeryAlgorithm/testBakery.py @@ -0,0 +1,58 @@ +import random +import time +from enum import Enum + +import matplotlib.pyplot as plt +import networkx as nx + +from BakeryComponent import * +from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes, ConnectorTypes, Topology, ComponentRegistry +from Channels import FIFOBroadcastPerfectChannel + +class AdHocNode(ComponentModel): + + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_top(self, eventobj: Event): + print("Outgoing Message from ", self.componentinstancenumber, ". Node") + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + print("Incoming Message to ", self.componentinstancenumber, ". Node, From: ", eventobj.eventcontent.header.messagefrom) + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def __init__(self, componentname, componentid, type): + if type == 0: + self.mainComponent = BakeryComponent("BakeryProducerComponent", componentid,BakeryMessageTypes.INC) + self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + elif type == 1: + self.mainComponent = BakeryComponent("BakeryConsumerComponent", componentid,BakeryMessageTypes.DEC) + self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + elif type == 2: + self.mainComponent = ResourceComponent("ResourceComponent", componentid) + self.connect_me_to_component(ConnectorTypes.UP, self.mainComponent) + super().__init__(componentname, componentid) + +start = time.time() +G = nx.random_geometric_graph(3,1) +nx.draw(G, with_labels=True, font_weight='bold') +plt.draw() + +topo = Topology() +topo.construct_from_graph_bakery(G, AdHocNode, FIFOBroadcastPerfectChannel) +topo.start() + +time.sleep(1) +topo.nodes[0].mainComponent.start() +topo.nodes[1].mainComponent.start() +#plt.show() + +while (not topo.nodes[0].mainComponent.done): + time.sleep(0.1) + +while (not topo.nodes[1].mainComponent.done): + time.sleep(0.1) + +print(f"{time.time() - start} s passed.") +print(topo.nodes[2].mainComponent) diff --git a/PetersonsAlgorithm/Ahc.py b/PetersonsAlgorithm/Ahc.py new file mode 100644 index 0000000000000000000000000000000000000000..1f7ff817e3f1dbc50e6390c82a8e277d5fb61e03 --- /dev/null +++ b/PetersonsAlgorithm/Ahc.py @@ -0,0 +1,380 @@ +#!/usr/bin/env python +""" Implements the AHC library. + +TODO: Longer description of this module to be written. + +This program is free software: you can redistribute it and/or modify it under +the terms of the GNU General Public License as published by the Free Software +Foundation, either version 3 of the License, or (at your option) any later +version. + +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. +You should have received a copy of the GNU General Public License along with +this program. If not, see . + +""" + +__author__ = "One solo developer" +__authors__ = ["Ertan Onur", "Berke Tezergil", "etc"] +__contact__ = "eonur@ceng.metu.edu.tr" +__copyright__ = "Copyright 2021, WINSLAB" +__credits__ = ["Ertan Onur", "Berke Tezergil", "etc"] +__date__ = "2021/04/07" +__deprecated__ = False +__email__ = "eonur@ceng.metu.edu.tr" +__license__ = "GPLv3" +__maintainer__ = "developer" +__status__ = "Production" +__version__ = "0.0.1" + + +import datetime +import queue +from enum import Enum +from threading import Thread, Lock + +import matplotlib.pyplot as plt +import networkx as nx + +# TIMING ASSUMPTIONS +# TODO: Event handling time, message sending time, assumptions about clock (drift, skew, ...) +# TODO: 1. Asynch, 2. Synch 3. Partial-synch 4. Timed asynch +# TODO: Causal-order (happen before), total-order, +# TODO: Causal-order algebra!!! +# TODO: Implement logical clocks (Lamport clocks, vector clocks) in event handling loop + +# AUTOMATA and EXECUTIONS +# TODO: Let component model hande executions and chekcs on executions (which event, in which order, per process or per system, similarity of executions) + + +# VISUALIZATION +# TODO: Space-time diagrams for events + +# TOPOLOGY MANAGEMENT +# TODO: Given a graph as input, generate the topology.... + +inf = float('inf') + +# The following are the common default events for all components. +class EventTypes(Enum): + INIT = "init" + MFRB = "msgfrombottom" + MFRT = "msgfromtop" + MFRP = "msgfrompeer" + +class MessageDestinationIdentifiers(Enum): + LINKLAYERBROADCAST = -1, # sinngle-hop broadcast, means all directly connected nodes + NETWORKLAYERBROADCAST = -2 # For flooding over multiple-hops means all connected nodes to me over one or more links + +# A Dictionary that holds a list for the same key +class ConnectorList(dict): + def __setitem__(self, key, value): + try: + self[key] + except KeyError: + super(ConnectorList, self).__setitem__(key, []) + self[key].append(value) + +class ConnectorTypes(Enum): + DOWN = "DOWN" + UP = "UP" + PEER = "PEER" + +class GenericMessagePayload: + def __init__(self, messagepayload): + self.messagepayload = messagepayload + +class GenericMessageHeader: + def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1): + self.messagetype = messagetype + self.messagefrom = messagefrom + self.messageto = messageto + self.nexthop = nexthop + self.interfaceid = interfaceid + self.sequencenumber = sequencenumber + +class GenericMessage: + def __init__(self, header, payload): + self.header = header + self.payload = payload + self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber) + +class Event: + def __init__(self, eventsource, event, eventcontent, fromchannel=None): + self.eventsource = eventsource + self.event = event + self.time = datetime.datetime.now() + self.eventcontent = eventcontent + self.fromchannel = fromchannel + +def singleton(cls): + instance = [None] + + def wrapper(*args, **kwargs): + if instance[0] is None: + instance[0] = cls(*args, **kwargs) + return instance[0] + + return wrapper + +@singleton +class ComponentRegistry: + components = {} + + def get_component_by_instance(self, instance): + list_of_keys = list() + list_of_items = self.components.items() + for item in list_of_items: + if item[1] == instance: + list_of_keys.append(item[0]) + return list_of_keys + + def add_component(self, component): + key = component.componentname + str(component.componentinstancenumber) + self.components[key] = component + + def get_component_by_key(self, componentname, componentinstancenumber): + key = componentname + str(componentinstancenumber) + return self.components[key] + + def init(self): + for itemkey in self.components: + cmp = self.components[itemkey] + cmp.inputqueue.put_nowait(Event(self, EventTypes.INIT, None)) + + def print_components(self): + for itemkey in self.components: + cmp = self.components[itemkey] + print(f"I am {cmp.componentname}.{cmp.componentinstancenumber}") + for i in cmp.connectors: + connectedcmp = cmp.connectors[i] + for p in connectedcmp: + print(f"\t{i} {p.componentname}.{p.componentinstancenumber}") + +registry = ComponentRegistry() + +class ComponentModel: + terminated = False + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_bottom(self, eventobj: Event): + print(f"{EventTypes.MFRB} {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_top(self, eventobj: Event): + print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_peer(self, eventobj: Event): + print(f"{EventTypes.MFRP} {self.componentname}.{self.componentinstancenumber}") + + def __init__(self, componentname, componentinstancenumber, num_worker_threads=1): + self.eventhandlers = {EventTypes.INIT: self.on_init, EventTypes.MFRB: self.on_message_from_bottom, + EventTypes.MFRT: self.on_message_from_top, EventTypes.MFRP: self.on_message_from_peer} + # Add default handlers to all instantiated components. + # If a component overwrites the __init__ method it has to call the super().__init__ method + self.inputqueue = queue.Queue() + self.componentname = componentname + self.componentinstancenumber = componentinstancenumber + self.num_worker_threads = num_worker_threads + try: + if self.connectors: + pass + except AttributeError: + self.connectors = ConnectorList() + + self.registry = ComponentRegistry() + self.registry.add_component(self) + + for i in range(self.num_worker_threads): + t = Thread(target=self.queue_handler, args=[self.inputqueue]) + t.daemon = True + t.start() + + def connect_me_to_component(self, name, component): + try: + self.connectors[name] = component + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = component + + def connect_me_to_channel(self, name, channel): + try: + self.connectors[name] = channel + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = channel + connectornameforchannel = self.componentname + str(self.componentinstancenumber) + channel.connect_me_to_component(connectornameforchannel, self) + + def terminate(self): + self.terminated = True + + def send_down(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.DOWN]: + p.trigger_event(event) + except: + pass + + def send_up(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.UP]: + p.trigger_event(event) + except: + pass + + def send_peer(self, event: Event): + try: + for p in self.connectors[ConnectorTypes.PEER]: + p.trigger_event(event) + except: + pass + + def send_self(self, event: Event): + self.trigger_event(event) + + # noinspection PyArgumentList + def queue_handler(self, myqueue): + while not self.terminated: + workitem = myqueue.get() + if workitem.event in self.eventhandlers: + self.eventhandlers[workitem.event](eventobj=workitem) # call the handler + else: + print(f"Event Handler: {workitem.event} is not implemented") + myqueue.task_done() + + def trigger_event(self, eventobj: Event): + self.inputqueue.put_nowait(eventobj) + +@singleton +class Topology: + nodes = {} + channels = {} + + def construct_from_graph(self, G: nx.Graph, nodetype, channeltype): + self.G = G + nodes = list(G.nodes) + edges = list(G.edges) + for i in nodes: + cc = nodetype(nodetype.__name__, i) + self.nodes[i] = cc + for k in edges: + ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) + self.channels[k] = ch + self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + + def construct_from_graph_peterson(self, G: nx.Graph, nodetype, channeltype): + self.G = G + nodes = list(G.nodes) + edges = list(G.edges) + + nodes = nodes[::-1] + edges = edges[::-1] + + for i in nodes: + cc = nodetype(nodetype.__name__, i,i) + self.nodes[i] = cc + for k in edges: + ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) + self.channels[k] = ch + self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + + + def construct_single_node(self, nodetype, instancenumber): + self.singlenode = nodetype(nodetype.__name__, instancenumber) + self.G = nx.Graph() + self.G.add_nodes_from([0]) + self.nodes[0] = self.singlenode + + def construct_sender_receiver(self, sendertype, receivertype, channeltype): + self.sender = sendertype(sendertype.__name__, 0) + self.receiver = receivertype(receivertype.__name__, 1) + ch = channeltype(channeltype.__name__, "0-1") + self.G = nx.Graph() + self.G.add_nodes_from([0, 1]) + self.G.add_edges_from([(0, 1)]) + self.nodes[self.sender.componentinstancenumber] = self.sender + self.nodes[self.sender.componentinstancenumber] = self.receiver + self.channels[ch.componentinstancenumber] = ch + self.sender.connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.receiver.connect_me_to_channel(ConnectorTypes.DOWN, ch) + + def allpairs_shortest_path(self): + return dict(nx.all_pairs_shortest_path(self.G)) + + def shortest_path_to_all(self, myid): + path = dict(nx.all_pairs_shortest_path(self.G)) + nodecnt = len(self.G.nodes) + for i in range(nodecnt): + print(path[myid][i]) + + def start(self): + # registry.printComponents() + N = len(self.G.nodes) + self.compute_forwarding_table() + self.nodecolors = ['b'] * N + self.nodepos = nx.drawing.spring_layout(self.G) + self.lock = Lock() + ComponentRegistry().init() + + def compute_forwarding_table(self): + #N = len(self.G.nodes) + self.ForwardingTable = dict(nx.all_pairs_shortest_path(self.G)) + # print(f"There are {N} nodes") + #for i in range(N): + #for j in range(N): + #try: + #mypath = path[i][j] + # print(f"{i}to{j} path = {path[i][j]} nexthop = {path[i][j][1]}") + #self.ForwardingTable[i][j] = path[i][j][1] + + # print(f"{i}to{j}path = NONE") + #self.ForwardingTable[i][j] = inf # No paths + #except IndexError: + # print(f"{i}to{j} nexthop = NONE") + #self.ForwardingTable[i][j] = i # There is a path but length = 1 (self) + + # all-seeing eye routing table contruction + def print_forwarding_table(self): + registry.print_components() + print('\n'.join([''.join(['{:4}'.format(item) for item in row]) + for row in list(self.ForwardingTable.values())])) + + # returns the all-seeing eye routing based next hop id + def get_next_hop(self, fromId, toId): + try: + retval = self.ForwardingTable[fromId][toId] + return retval[1] + except KeyError: + return inf + except IndexError: + return fromId + + # Returns the list of neighbors of a node + def get_neighbors(self, nodeId): + return sorted([neighbor for neighbor in self.G.neighbors(nodeId)]) + + def get_predecessors(self, nodeId): + return sorted([neighbor for neighbor in self.G.predecessors(nodeId)]) + + def get_successors(self, nodeId): + return sorted([neighbor for neighbor in self.G.neighbors(nodeId)]) + + + # Returns the list of neighbors of a node + def get_neighbor_count(self, nodeId): + # return len([neighbor for neighbor in self.G.neighbors(nodeId)]) + return self.G.degree[nodeId] + + def plot(self): + #self.lock.acquire() + nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold') + plt.draw() + print(self.nodecolors) + #self.lock.release() diff --git a/PetersonsAlgorithm/Channels.py b/PetersonsAlgorithm/Channels.py new file mode 100644 index 0000000000000000000000000000000000000000..a6e9bbd2bd74c43b2f86d21833f171255b41a6c8 --- /dev/null +++ b/PetersonsAlgorithm/Channels.py @@ -0,0 +1,160 @@ +import queue +import random +from enum import Enum +from threading import Thread + +from Ahc import ComponentModel, EventTypes, ConnectorList, MessageDestinationIdentifiers +from Ahc import Event + +# TODO: Channel failure models: lossy-link, fair-loss, stubborn links, perfect links (WHAT ELSE?), FIFO perfect +# TODO: Logged perfect links (tolerance to crashes), authenticated perfect links +# TODO: Broadcast channels? and their failure models? Collisions? +# TODO: Properties of all models separately: e.g., Fair loss, finite duplication, no fabrication in fair-loss link model +# TODO: Packets: loss, duplication, sequence change, windowing?, +# TODO: Eventually (unbounded time) or bounded time for message delivery? + + +# Channels have three events: sendtochannel, processinchannel and delivertocomponent +# Components tell channels to handle a message by the EventTypes.MFRT event, the component calls senddown with the event EventTypes.MFRT +# First pipeline stage moves the message to the interim pipeline stage with the "processinchannel" event for further processing, such as the channel may drop it, delay it, or whatever +# Channels deliver the message to output queue by the "delivertocomponent" event +# The output queue then will send the message up to the connected component(s) using the "messagefromchannel" event +# The components that will use the channel directly, will have to handle "messagefromchannel" event + +class ChannelEventTypes(Enum): + INCH = "processinchannel" + DLVR = "delivertocomponent" + +class Channel(ComponentModel): + + def on_init(self, eventobj: Event): + + pass + + # Overwrite onSendToChannel if you want to do something in the first pipeline stage + def on_message_from_top(self, eventobj: Event): + # channel receives the input message and will process the message by the process event in the next pipeline stage + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + + # Overwrite onProcessInChannel if you want to do something in interim pipeline stage + def on_process_in_channel(self, eventobj: Event): + # Add delay, drop, change order whatever.... + # Finally put the message in outputqueue with event deliver + myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent) + self.outputqueue.put_nowait(myevent) + + # Overwrite onDeliverToComponent if you want to do something in the last pipeline stage + # onDeliver will deliver the message from the channel to the receiver component using messagefromchannel event + def on_deliver_to_component(self, eventobj: Event): + callername = eventobj.eventsource.componentinstancenumber + for item in self.connectors: + callees = self.connectors[item] + for callee in callees: + calleename = callee.componentinstancenumber + # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}") + if calleename == callername: + pass + else: + myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber) + callee.trigger_event(myevent) + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.outputqueue = queue.Queue() + self.channelqueue = queue.Queue() + self.eventhandlers[ChannelEventTypes.INCH] = self.on_process_in_channel + self.eventhandlers[ChannelEventTypes.DLVR] = self.on_deliver_to_component + + for i in range(self.num_worker_threads): + # note that the input queue is handled by the super class... + t = Thread(target=self.queue_handler, args=[self.channelqueue]) + t1 = Thread(target=self.queue_handler, args=[self.outputqueue]) + t.daemon = True + t1.daemon = True + t.start() + t1.start() + +class AHCChannelError(Exception): + pass + +class P2PFIFOPerfectChannel(Channel): + + # Overwrite onSendToChannel + # Channels are broadcast, that is why we have to check channel id's using hdr.interfaceid for P2P + def on_message_from_top(self, eventobj: Event): + # if channelid != hdr.interfaceif then drop (should not be on this channel) + hdr = eventobj.eventcontent.header + if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST: + if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")): + #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}") + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + else: + #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}") + pass + + def on_deliver_to_component(self, eventobj: Event): + msg = eventobj.eventcontent + callername = eventobj.eventsource.componentinstancenumber + for item in self.connectors: + callees = self.connectors[item] + for callee in callees: + calleename = callee.componentinstancenumber + # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}") + if calleename == callername: + pass + else: + myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber) + callee.trigger_event(myevent) + + # Overwriting to limit the number of connected components + def connect_me_to_component(self, name, component): + try: + self.connectors[name] = component + # print(f"Number of nodes connected: {len(self.ports)}") + if len(self.connectors) > 2: + raise AHCChannelError("More than two nodes cannot connect to a P2PFIFOChannel") + except AttributeError: + self.connectors = ConnectorList() + self.connectors[name] = component + # except AHCChannelError as e: + # print( f"{e}" ) + +class P2PFIFOFairLossChannel(P2PFIFOPerfectChannel): + prob = 1 + duplicationprobability = 0 + # Overwrite onSendToChannel + # Channels are broadcast, that is why we have to check channel id's using hdr.interfaceid for P2P + + def on_message_from_top(self, eventobj: Event): + # if channelid != hdr.interfaceif then drop (should not be on this channel) + hdr = eventobj.eventcontent.header + if hdr.nexthop != MessageDestinationIdentifiers.LINKLAYERBROADCAST: + if set(hdr.interfaceid.split("-")) == set(self.componentinstancenumber.split("-")): + #print(f"Will forward message since {hdr.interfaceid} and {self.componentinstancenumber}") + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + self.channelqueue.put_nowait(myevent) + else: + #print(f"Will drop message since {hdr.interfaceid} and {self.componentinstancenumber}") + pass + + + def on_process_in_channel(self, eventobj: Event): + if random.random() < self.prob: + myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent) + self.outputqueue.put_nowait(myevent) + if random.random() < self.duplicationprobability: + self.channelqueue.put_nowait(eventobj) + + def setPacketLossProbability(self, prob): + self.prob = prob + + def setAverageNumberOfDuplicates(self, d): + if d > 0: + self.duplicationprobability = (d - 1) / d + else: + self.duplicationprobability = 0 + +class FIFOBroadcastPerfectChannel(Channel): + pass diff --git a/PetersonsAlgorithm/PetersonComponent.py b/PetersonsAlgorithm/PetersonComponent.py new file mode 100644 index 0000000000000000000000000000000000000000..d44f18be40b967e5f22d67b3df8d5230a3c4557c --- /dev/null +++ b/PetersonsAlgorithm/PetersonComponent.py @@ -0,0 +1,70 @@ +from enum import Enum +import time +from Ahc import ComponentModel, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes +from threading import Thread + +flag = [False,False] +turn = 0 + +class PetersonMessageTypes(Enum): + INC = "+" + DEC = "-" + +class ResourceComponent(ComponentModel): + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n") + pass + + def on_message_from_bottom(self, eventobj: Event): + messagetype = eventobj.eventcontent.header.messagetype ## INC / DEC + if messagetype == PetersonMessageTypes.INC: + print("Increment value") + self.value += 1 + elif messagetype == PetersonMessageTypes.DEC: + print("Decrement value") + self.value -= 1 + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.value = 0 + + def __repr__(self): + return f"Value:{self.value}" + +class ProducerConsumerComponent(ComponentModel): + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}\n") + pass + + def send_message(self): + global flag,turn + message_header = GenericMessageHeader(self.messageType, self.componentinstancenumber, None,interfaceid=f"{self.componentinstancenumber}-{self.resourceComponentId}") + message_payload = GenericMessagePayload(None) + message = GenericMessage(message_header, message_payload) + otherThreadId = 1 - self.id + for i in range(self.count): + flag[self.id] = True # I want to get in to critical section + turn = otherThreadId # Lets wait until other thread finish + while flag[otherThreadId] and turn == otherThreadId: + time.sleep(0.0000001) + continue + ####critical section#### + self.send_down(Event(self, EventTypes.MFRT, message)) + time.sleep(0.001) + ######################## + flag[self.id] = False # My job is finished + + self.done = True + + def __init__(self, componentname, componentinstancenumber,type = PetersonMessageTypes.INC ,count = 100, resourceComponentId = 2): + super().__init__(componentname, componentinstancenumber) + self.count = count + self.resourceComponentId = resourceComponentId + self.done = False + self.messageType = type + self.id = self.componentinstancenumber + + def start(self): + t = Thread(target=self.send_message) + t.daemon = True + t.start() diff --git a/PetersonsAlgorithm/petersons.py b/PetersonsAlgorithm/petersons.py new file mode 100644 index 0000000000000000000000000000000000000000..0ac22661501b2c6db6f49d2847aaa86d7e7912ab --- /dev/null +++ b/PetersonsAlgorithm/petersons.py @@ -0,0 +1,172 @@ +import threading +import time + +mutex = threading.Lock() + +def peterson(id,function): + global flag,turn + otherThreadId = 1 - id + + flag[id] = True # I want to get in to critical section + turn = otherThreadId # Lets wait until other thread finish + while flag[otherThreadId] and turn == otherThreadId: + time.sleep(0.0000001) + continue + + ####critical section#### + function(id) + ######################## + + flag[id] = False # My job is finished + +def produce(id): + global counter + counter += 1 + #print("Thread {} counter:".format(id),counter) + +def consume(id): + global counter + counter -= 1 + #print("Thread {} counter:".format(id),counter) + +def producer(id,ntime): + global waitFlag + while waitFlag: + continue + print("Producer {} started".format(id)) + for i in range(ntime): + produce(id) + print("Producer {} finished".format(id)) + +def producerPeterson(id,ntime): + global waitFlag + while waitFlag: + continue + print("Producer {} started".format(id)) + for i in range(ntime): + peterson(id,produce) + print("Producer {} finished".format(id)) + +def producerMutex(id,ntime): + global waitFlag + while waitFlag: + continue + print("Producer {} started".format(id)) + for i in range(ntime): + with mutex: + produce(id) + print("Producer {} finished".format(id)) + +def consumer(id,ntime): + global waitFlag + while waitFlag: + continue + print("Consumer {} started".format(id)) + for i in range(ntime): + consume(id) + print("Consumer {} finished".format(id)) + +def consumerPeterson(id,ntime): + global waitFlag + while waitFlag: + continue + print("Consumer {} started".format(id)) + for i in range(ntime): + peterson(id,consume) + print("Consumer {} finished".format(id)) + +def consumerMutex(id,ntime): + global waitFlag + while waitFlag: + continue + print("Consumer {} started".format(id)) + for i in range(ntime): + with mutex: + consume(id) + print("Consumer {} finished".format(id)) + +if __name__ == "__main__": + + ntime = 500000 + + waitFlag = True + counter = 0 + t0 = threading.Thread(target=producer, args=(0,ntime)) + t1 = threading.Thread(target=producer, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Increment counters without any syncronization: Expected output {} , real output {}".format(ntime*2,counter)) + print("Time: {}".format(time.time()-startTime)) + + waitFlag = True + counter = 0 + t0 = threading.Thread(target=producerMutex, args=(0,ntime)) + t1 = threading.Thread(target=producerMutex, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Increment counters with mutex: Expected output {} , real output {}".format(ntime*2,counter)) + print("Time: {}".format(time.time()-startTime)) + + waitFlag = True + counter = 0 + t0 = threading.Thread(target=producer, args=(0,ntime)) + t1 = threading.Thread(target=consumer, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Producer-consumer without any syncronization: Expected output 0 , real output {}".format(counter)) + print("Time: {}".format(time.time()-startTime)) + + waitFlag = True + counter = 0 + t0 = threading.Thread(target=producerMutex, args=(0,ntime)) + t1 = threading.Thread(target=consumerMutex, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Producer-consumer with mutex: Expected output 0 , real output {}".format(counter)) + print("Time: {}".format(time.time()-startTime)) + + waitFlag = True + flag = [False,False] + turn = 0 + counter = 0 + t0 = threading.Thread(target=producerPeterson, args=(0,ntime)) + t1 = threading.Thread(target=producerPeterson, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Increment counters with Petersons Algorithm: Expected output {} , real output {}".format(ntime*2,counter)) + print("Time: {}".format(time.time()-startTime)) + + waitFlag = True + flag = [False,False] + turn = 0 + counter = 0 + t0 = threading.Thread(target=producerPeterson, args=(0,ntime)) + t1 = threading.Thread(target=consumerPeterson, args=(1,ntime)) + t0.start() + t1.start() + startTime = time.time() + waitFlag = False + t0.join() + t1.join() + print("Producer-consumer with Petersons Algorithm: Expected output 0 , real output {}".format(counter)) + print("Time: {}".format(time.time()-startTime)) \ No newline at end of file diff --git a/PetersonsAlgorithm/testPeterson.py b/PetersonsAlgorithm/testPeterson.py new file mode 100644 index 0000000000000000000000000000000000000000..64563e09e74f0dd6383f5aa41a53d26a711900e1 --- /dev/null +++ b/PetersonsAlgorithm/testPeterson.py @@ -0,0 +1,58 @@ +import random +import time +from enum import Enum + +import matplotlib.pyplot as plt +import networkx as nx + +from PetersonComponent import * +from Ahc import ComponentModel, MessageDestinationIdentifiers, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes, ConnectorTypes, Topology, ComponentRegistry +from Channels import FIFOBroadcastPerfectChannel + +class AdHocNode(ComponentModel): + + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + + def on_message_from_top(self, eventobj: Event): + print("Outgoing Message from ", self.componentinstancenumber, ". Node") + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + print("Incoming Message to ", self.componentinstancenumber, ". Node, From: ", eventobj.eventcontent.header.messagefrom) + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def __init__(self, componentname, componentid, type): + if type == 0: + self.mainComponent = ProducerConsumerComponent("ProducerComponent", componentid,PetersonMessageTypes.INC) + self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + elif type == 1: + self.mainComponent = ProducerConsumerComponent("ConsumerComponent", componentid,PetersonMessageTypes.DEC) + self.mainComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + elif type == 2: + self.mainComponent = ResourceComponent("ResourceComponent", componentid) + self.connect_me_to_component(ConnectorTypes.UP, self.mainComponent) + super().__init__(componentname, componentid) + +start = time.time() +G = nx.random_geometric_graph(3,1) +nx.draw(G, with_labels=True, font_weight='bold') +plt.draw() + +topo = Topology() +topo.construct_from_graph_peterson(G, AdHocNode, FIFOBroadcastPerfectChannel) +topo.start() + +time.sleep(1) +topo.nodes[0].mainComponent.start() +topo.nodes[1].mainComponent.start() +plt.show() + +while (not topo.nodes[0].mainComponent.done): + time.sleep(0.1) + +while (not topo.nodes[1].mainComponent.done): + time.sleep(0.1) + +print(f"{time.time() - start} s passed.") +print(topo.nodes[2].mainComponent)