diff --git a/Ahc.py b/Ahc.py index b2d548773c96c44c90ddf9c6564077306238592b..830b8de1e95752fb8c6b8bcf75e697ff1c3869a8 100644 --- a/Ahc.py +++ b/Ahc.py @@ -255,6 +255,19 @@ class Topology: nodes = {} channels = {} + def construct_from_graph_with_node_info(self, G: nx.Graph, nodetype, channeltype, node_info): + self.G = G + nodes = list(G.nodes) + edges = list(G.edges) + for i in nodes: + cc = nodetype(nodetype.__name__, i, node_info[i]) + self.nodes[i] = cc + for k in edges: + ch = channeltype(channeltype.__name__, str(k[0]) + "-" + str(k[1])) + self.channels[k] = ch + self.nodes[k[0]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + self.nodes[k[1]].connect_me_to_channel(ConnectorTypes.DOWN, ch) + def construct_from_graph(self, G: nx.Graph, nodetype, channeltype): self.G = G nodes = list(G.nodes) diff --git a/Channels.py b/Channels.py index a6e9bbd2bd74c43b2f86d21833f171255b41a6c8..a0cdbb5665397c06ae38c1cc66a53db295e85704 100644 --- a/Channels.py +++ b/Channels.py @@ -158,3 +158,25 @@ class P2PFIFOFairLossChannel(P2PFIFOPerfectChannel): class FIFOBroadcastPerfectChannel(Channel): pass + +class FIFOBroadcastFairLossChannel(Channel): + prob = 0.5 + duplicationprobability = 0.5 + # Overwrite onSendToChannel + + def on_process_in_channel(self, eventobj: Event): + if random.random() < self.prob: + myevent = Event(eventobj.eventsource, + ChannelEventTypes.DLVR, eventobj.eventcontent) + self.outputqueue.put_nowait(myevent) + if random.random() < self.duplicationprobability: + self.channelqueue.put_nowait(eventobj) + + def setPacketLossProbability(self, prob): + self.prob = prob + + def setAverageNumberOfDuplicates(self, d): + if d > 0: + self.duplicationprobability = (d - 1) / d + else: + self.duplicationprobability = 0 diff --git a/TagToTag/GenericTag.py b/TagToTag/GenericTag.py new file mode 100644 index 0000000000000000000000000000000000000000..d8bb67c6424fac48235ff0621bb6e89e74a39bbc --- /dev/null +++ b/TagToTag/GenericTag.py @@ -0,0 +1,194 @@ +import random +from enum import Enum +import time +from threading import Thread, Lock + +from Ahc import ComponentModel, MessageDestinationIdentifiers, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes + +class TagMessageTypes(Enum): + TTT = "TAGTOTAG" + RTI = "READERTOTAGINIT" + RTC = "READERTOTAGCOLLECT" + TRI = "TAGTOREADERINITRESPONSE" + TRC = "TAGTOREADERCOLLECTRESPONSE" + +class TagMessageHeader(GenericMessageHeader): + pass + +class TagPayload(GenericMessagePayload): + pass + +class TagMessage(GenericMessage): + pass + +class TagMessageDestinationIdentifiers(Enum): + TAGTOTAGBROADCAST = -3 + +class TagComponent(ComponentModel): + known_tags_list_max_length = 10 + min_seconds_to_repeat_same_tag = 30 + sleep_time_between_broadcast_second = 10 + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def start_tag_to_tag_broadcast_thread(self): + t = Thread(target=self.send_tag_to_tag_broadcast_message_to_down, args=[self.tag_id]) + t.daemon = True + t.start() + + def send_tag_to_tag_broadcast_message_to_down(self, tag_id): + while True: + message_header = TagMessageHeader(TagMessageTypes.TTT, tag_id, TagMessageDestinationIdentifiers.TAGTOTAGBROADCAST) + message_payload = TagPayload(None) + message = TagMessage(message_header, message_payload) + + self.send_down(Event(self, EventTypes.MFRT, message)) + + time.sleep(random.randint(1, self.sleep_time_between_broadcast_second)) + + def send_message_to_reader(self, reader_id, message, message_type): + message_header = TagMessageHeader(message_type, self.tag_id, reader_id) + message_payload = TagPayload(message) + message = TagMessage(message_header, message_payload) + + self.send_down(Event(self, EventTypes.MFRT, message)) + + def compare_tags_belong_to_same_person(self, other_tag_id): + return int(other_tag_id/10) == int(self.tag_id/10) + + def on_message_from_bottom(self, eventobj: Event): + incoming_message_time = time.perf_counter() + message_header = eventobj.eventcontent.header + message_type = message_header.messagetype + message_from = message_header.messagefrom + message_to = message_header.messageto + + if message_to == TagMessageDestinationIdentifiers.TAGTOTAGBROADCAST and message_type == TagMessageTypes.TTT and not(self.compare_tags_belong_to_same_person(message_from)): + for check_index in range(self.known_tags_list_index - self.min_seconds_index_diff, self.known_tags_list_index): + if incoming_message_time - self.known_tags_list[check_index][1] > self.min_seconds_to_repeat_same_tag: + self.min_seconds_index_diff -= 1 + else: + break + + founded = False + for check_index in range(self.known_tags_list_index - self.min_seconds_index_diff, self.known_tags_list_index): + if self.known_tags_list[check_index][0] == message_from: + founded = True + break + + if founded == False: + self.known_tags_list[self.known_tags_list_index] = (message_from, incoming_message_time) + self.known_tags_list_index = (self.known_tags_list_index + 1) % self.known_tags_list_max_length + if self.min_seconds_index_diff + 1 < self.known_tags_list_max_length: + self.min_seconds_index_diff += 1 + + elif message_to == self.tag_id and self.compare_tags_belong_to_same_person(message_from): + if message_type == TagMessageTypes.RTI: + self.send_message_to_reader(message_from, self.known_tags_list_max_length, TagMessageTypes.TRI) + elif message_type == TagMessageTypes.RTC: + known_tags_list_index = eventobj.eventcontent.payload.messagepayload + self.send_message_to_reader(message_from, (known_tags_list_index, self.known_tags_list[known_tags_list_index]), TagMessageTypes.TRC) + + def __init__(self, componentname, componentinstancenumber, tag_id): + # tag_id = TC No. + [1-9] + self.tag_id = tag_id + self.known_tags_list = [None for i in range(0, self.known_tags_list_max_length)] + self.known_tags_list_index = 0 + self.min_seconds_index_diff = 0 + self.start_tag_to_tag_broadcast_thread() + + super().__init__(componentname, componentinstancenumber) + + +class ReaderComponent(ComponentModel): + reader_retry_count = 10 + reader_retry_timeout = 0.1 + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def start_collecting(self): + self.incoming_messages = {} + self.initially_reading_tag = None + self.collector_thread.start() + self.collector_thread = Thread(target=self.collect_tag_information_thread, args=[]) + self.collector_thread.daemon = True + + def collect_tag_information_thread(self): + for tag_index in range(1, 10): + tag_id = self.reader_id + tag_index + self.incoming_messages[tag_id] = None + retry = 0 + while retry < self.reader_retry_count: + self.send_message_to_tag(tag_id, TagMessageTypes.RTI) + starting_time = time.perf_counter() + while time.perf_counter() - starting_time < self.reader_retry_timeout and self.incoming_messages[tag_id] == None: + pass + if self.incoming_messages[tag_id] != None: + break + retry += 1 + + ids_to_delete = [] + for tag_id in self.incoming_messages: + if self.incoming_messages[tag_id] == None: + ids_to_delete.append(tag_id) + else: + for known_tags_list_index in range(0, len(self.incoming_messages[tag_id])): + self.initially_reading_tag = (known_tags_list_index, tag_id) + retry_count = 0 + while retry_count < self.reader_retry_count and self.initially_reading_tag == (known_tags_list_index, tag_id) and self.incoming_messages[tag_id][known_tags_list_index] == None: + self.send_message_to_tag(tag_id, TagMessageTypes.RTC, known_tags_list_index) + starting_time = time.perf_counter() + while time.perf_counter() - starting_time < self.reader_retry_timeout and self.initially_reading_tag == (known_tags_list_index, tag_id) and self.incoming_messages[tag_id][known_tags_list_index] == None: + pass + retry_count += 1 + if self.initially_reading_tag == None: + break + for tag_id in ids_to_delete: + del self.incoming_messages[tag_id] + self.print_collected_information() + + def send_message_to_tag(self, send_to, message_type, payload=None): + message_header = TagMessageHeader(message_type, self.reader_id, send_to) + message_payload = TagPayload(payload) + message = TagMessage(message_header, message_payload) + + self.send_down(Event(self, EventTypes.MFRT, message)) + + def print_collected_information(self): + for tag_id in self.incoming_messages: + print("\n", tag_id, ": ", self.incoming_messages[tag_id], "\n") + + def compare_tags_belong_to_same_person(self, other_tag_id): + return int(other_tag_id/10) == int(self.reader_id/10) + + def on_message_from_bottom(self, eventobj: Event): + message_header = eventobj.eventcontent.header + message_type = message_header.messagetype + message_from = message_header.messagefrom + message_to = message_header.messageto + + if message_to == self.reader_id and self.compare_tags_belong_to_same_person(message_from): + if message_type == TagMessageTypes.TRI: + if message_from in self.incoming_messages and self.incoming_messages[message_from] == None: + known_tags_list_max_length = eventobj.eventcontent.payload.messagepayload + self.incoming_messages[message_from] = [None for i in range(0, known_tags_list_max_length)] + elif message_type == TagMessageTypes.TRC: + tag_information = eventobj.eventcontent.payload.messagepayload + if self.incoming_messages[message_from][tag_information[0]] == None and self.initially_reading_tag != None and message_from == self.initially_reading_tag[1] and tag_information[0] == self.initially_reading_tag[0]: + if tag_information[1] == None: + self.initially_reading_tag = None + else: + self.incoming_messages[message_from][tag_information[0]] = tag_information[1] + + def __init__(self, componentname, componentinstancenumber, reader_id): + # reader_id = TC No. + 0 + self.reader_id = reader_id + self.initially_reading_tag = None + self.incoming_messages = {} + + self.collector_thread = Thread(target=self.collect_tag_information_thread, args=[]) + self.collector_thread.daemon = True + super().__init__(componentname, componentinstancenumber) diff --git a/tests/testTag.py b/tests/testTag.py new file mode 100644 index 0000000000000000000000000000000000000000..78fd1a20807be96dd2d7d83c4d30234546fb0f8f --- /dev/null +++ b/tests/testTag.py @@ -0,0 +1,104 @@ +import matplotlib.pyplot as plt +import networkx as nx +import random + +from TagToTag.GenericTag import TagComponent, ReaderComponent, TagMessageDestinationIdentifiers, TagMessageTypes +from Ahc import ComponentModel, MessageDestinationIdentifiers, Event, GenericMessageHeader, GenericMessagePayload, GenericMessage, EventTypes, ConnectorTypes, Topology +from Channels import FIFOBroadcastFairLossChannel + +class AdHocNode(ComponentModel): + + def on_init(self, eventobj: Event): + # print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + pass + + def on_message_from_top(self, eventobj: Event): + # try: + # print("Outgoing Message From ", self.mainTagComponent.tag_id, ". Node, To: ", eventobj.eventcontent.header.messageto, ". Node") + # except: + # print("Outgoing Message From ", self.mainReaderComponent.reader_id, ". Node, To: ", eventobj.eventcontent.header.messageto, ". Node") + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + + def on_message_from_bottom(self, eventobj: Event): + # try: + # print("Incoming Message From: ", eventobj.eventcontent.header.messagefrom, ". Node, To: ", self.mainTagComponent.tag_id, ". Node") + # except: + # print("Incoming Message From: ", eventobj.eventcontent.header.messagefrom, ". Node, To: ", self.mainReaderComponent.reader_id, ". Node") + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + + def __init__(self, componentname, componentid, node_info): + if node_info["node_type"] == "tag": + self.mainTagComponent = TagComponent("MainTagComponent", componentid, node_info["tag_id"]) + self.mainTagComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.mainTagComponent) + + elif node_info["node_type"] == "reader": + self.mainReaderComponent = ReaderComponent("MainReaderComponent", componentid, node_info["reader_id"]) + self.mainReaderComponent.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.mainReaderComponent) + + super().__init__(componentname, componentid) + + +def compare_channel_and_instance_number(channel, node_id_1, node_id_2): + return set(channel.componentinstancenumber.split("-")) == {str(node_id_1), str(node_id_2)} + +def disconnect_two_nodes(graph, topology, node_id_1, node_id_2): + node_1_channels = topology.nodes[node_id_1].connectors[ConnectorTypes.DOWN] + for channel_index in range(0, len(node_1_channels)): + if compare_channel_and_instance_number(node_1_channels[channel_index], node_id_1, node_id_2): + del node_1_channels[channel_index] + break + + node_2_channels = topology.nodes[node_id_2].connectors[ConnectorTypes.DOWN] + for channel_index in range(0, len(node_2_channels)): + if compare_channel_and_instance_number(node_2_channels[channel_index], node_id_1, node_id_2): + del node_2_channels[channel_index] + break + + for channel in topology.channels: + if compare_channel_and_instance_number(topology.channels[channel], node_id_1, node_id_2): + del topology.channels[channel] + break + + graph.remove_edge(node_id_1, node_id_2) + +def connect_two_nodes(graph, topology, node_id_1, node_id_2, channeltype): + ch = channeltype(channeltype.__name__, str(node_id_1) + "-" + str(node_id_2)) + topology.channels[(node_id_1, node_id_2)] = ch + topology.nodes[node_id_1].connect_me_to_channel(ConnectorTypes.DOWN, ch) + topology.nodes[node_id_2].connect_me_to_channel(ConnectorTypes.DOWN, ch) + + graph.add_edge(node_id_1, node_id_2) + +node_count = 3 + +G = nx.random_geometric_graph(node_count, 1) +nx.draw(G, with_labels=True, font_weight='bold') + +random_tc_1 = random.randint(10000000000, 99999999999) +random_tc_2 = random.randint(10000000000, 99999999999) +# node_info = {0: {"node_type": "reader", "reader_id": random_tc_1*10}, 1: {"node_type": "tag", "tag_id": random_tc_1*10+1}, 2: {"node_type": "tag", "tag_id": random_tc_2*10+2}, 3: {"node_type": "tag", "tag_id": random_tc_1*10+3}} +node_info = {0: {"node_type": "reader", "reader_id": random_tc_1*10}, 1: {"node_type": "tag", "tag_id": random_tc_1*10+1}, 2: {"node_type": "tag", "tag_id": random_tc_2*10+2}} +topo = Topology() +topo.construct_from_graph_with_node_info(G, AdHocNode, FIFOBroadcastFairLossChannel, node_info) +topo.start() + +while 1: + print("To change the graph exit first.") + plt.show() + choice = int(input("To add an edge -> 1\nTo remove an edge -> 2\nTo start reader -> 3\nTo exit -> 0\n")) + if choice == 0: + break + if choice == 1: + node_id_1 = int(input("Node ID 1: ")) + node_id_2 = int(input("Node ID 2: ")) + connect_two_nodes(G, topo, node_id_1, node_id_2, FIFOBroadcastFairLossChannel) + elif choice == 2: + node_id_1 = int(input("Node ID 1: ")) + node_id_2 = int(input("Node ID 2: ")) + disconnect_two_nodes(G, topo, node_id_1, node_id_2) + elif choice == 3: + reader_node_id = int(input("Reader Node ID: ")) + topo.nodes[reader_node_id].mainReaderComponent.start_collecting() + nx.draw(G, with_labels=True, font_weight='bold')