From f81c49fbd30d0904726d187c252e59eda67f9adb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ozan=20Ak=C4=B1n?= Date: Wed, 26 May 2021 01:54:34 +0300 Subject: [PATCH 1/3] Add shared memory: self-stabilization --- SelfStabilization/AfekKuttenYang.py | 233 +++++++++++++++++++++++ SelfStabilization/AroraGouda.py | 45 +++++ SelfStabilization/RWLock.py | 65 +++++++ SelfStabilization/SharedMemory.py | 107 +++++++++++ tests/SelfStabilization/test.py | 29 +++ tests/SelfStabilization/testbenchmark.py | 41 ++++ 6 files changed, 520 insertions(+) create mode 100644 SelfStabilization/AfekKuttenYang.py create mode 100644 SelfStabilization/AroraGouda.py create mode 100644 SelfStabilization/RWLock.py create mode 100644 SelfStabilization/SharedMemory.py create mode 100644 tests/SelfStabilization/test.py create mode 100644 tests/SelfStabilization/testbenchmark.py diff --git a/SelfStabilization/AfekKuttenYang.py b/SelfStabilization/AfekKuttenYang.py new file mode 100644 index 0000000..bb01e23 --- /dev/null +++ b/SelfStabilization/AfekKuttenYang.py @@ -0,0 +1,233 @@ +from SelfStabilization.SharedMemory import * + +class RequestDirection: + ASK = 1 + GRANT = 2 + + + +class AfekKuttenYangNodeCopy: + def __init__(self, topology, node_index): + self.topology = topology + self.node_index = node_index + + def update(self): + node_instance = self.topology.nodes[self.node_index] + + self.parent_node = node_instance.parent_node.val + self.root_node = node_instance.root_node.val + self.distance_to_root = node_instance.distance_to_root.val + + self.req_node = node_instance.req_node.val + self.req_from_node = node_instance.req_from_node.val + self.req_to_node = node_instance.req_to_node.val + self.req_direction = node_instance.req_direction.val + + self.toggle = node_instance.toggle.val + + + +class AfekKuttenYangNode(SharedMemoryNode): + def __init__(self, topology, node_index, K): + super().__init__(topology, node_index, K) + + self.req_node = RWLockedVal(None) + self.req_from_node = RWLockedVal(None) + self.req_to_node = RWLockedVal(None) + self.req_direction = RWLockedVal(None) + + self.toggle = RWLockedVal(True) + + self.neighbor_copies = dict() + + for neighbor_node in self.neighbors: + self.neighbor_copies[neighbor_node] = AfekKuttenYangNodeCopy(self.topology, neighbor_node) + + def setup(self): + for neighbor_node in self.neighbors: + self.neighbor_copies[neighbor_node].update() + + @property + def am_root(self): + return self.parent_node.val is None and self.root_node.val == self.node_index and self.distance_to_root.val == 0 + + @property + def not_root(self): + if self.parent_node.val is None: + return False + + parent_obj = self.neighbor_copies[self.parent_node.val] + + return self.parent_node.val in self.neighbors and \ + self.root_node.val > self.node_index and \ + self.root_node.val == parent_obj.root_node and \ + self.distance_to_root.val == parent_obj.distance_to_root + 1 + + @property + def max_root(self): + root_node = self.root_node.val + + for neighbor_node in self.neighbors: + neighbor = self.neighbor_copies[neighbor_node] + + if root_node < neighbor.root_node: + return False + + return True + + def fetch_asking(self, q): + q_node_obj = self.neighbor_copies[q] + q_root_node = q_node_obj.root_node + + for neighbor_node in self.neighbors: + neighbor = self.neighbor_copies[neighbor_node] + + if q_root_node < neighbor.root_node: + return False + + return self.req_node.val == self.req_from_node.val == self.node_index and \ + self.req_to_node.val == q and \ + self.req_direction.val == RequestDirection.ASK + + def fetch_granted(self, q): + q_node_obj = self.neighbor_copies[q] + + # Second statement has changed + return self.req_node.val == q_node_obj.req_node and \ + self.node_index == q_node_obj.req_from_node and \ + q_node_obj.req_direction == RequestDirection.GRANT and \ + self.req_direction.val == RequestDirection.ASK + + @property + def requestor(self): + return self.req_to_node.val in self.neighbors and \ + self.neighbor_copies[self.req_to_node.val].root_node > self.node_index and \ + self.req_node.val == self.req_from_node.val == self.node_index + + def fetch_handling(self, q): + q_node_obj = self.neighbor_copies[q] + + return self.req_node.val == q_node_obj.req_node and \ + self.req_from_node.val == q and \ + q_node_obj.req_to_node == self.node_index and \ + self.req_to_node.val == self.parent_node.val and \ + q_node_obj.req_direction == RequestDirection.ASK + + def fetch_request(self, q): + q_node_obj = self.neighbor_copies[q] + q_node_am_root = q_node_obj.parent_node is None and q_node_obj.root_node == q and q_node_obj.distance_to_root == 0 + + return ((q_node_am_root and q_node_obj.req_node == q_node_obj.req_from_node == q) or \ + (q_node_obj.parent_node == self.node_index and q_node_obj.req_node != q and q_node_obj.req_node is not None)) and \ + q_node_obj.req_to_node == self.node_index + + @property + def not_handling(self): + return self.req_node.val is None and \ + self.req_from_node.val is None and \ + self.req_to_node.val is None and \ + self.req_direction.val is None + + def thread_handler(self): + while True: + if self.topology.stable.val: + break + + for neighbor_node in self.neighbors: + self.neighbor_copies[neighbor_node].update() + + is_all_toggle = True + + for neighbor_node in self.neighbors: + if self.toggle.val != self.topology.nodes[neighbor_node].neighbor_copies[self.node_index].toggle: + is_all_toggle = False + break + + if is_all_toggle: + if not (self.not_root and self.max_root) and not self.am_root: + self.parent_node.set(None) + self.root_node.set(self.node_index) + self.distance_to_root.set(0) + + elif not self.max_root: + is_all_not_asking = True + + for neighbor_node in self.neighbors: + if self.fetch_asking(neighbor_node): + is_all_not_asking = False + break + + if is_all_not_asking: + selected_q = -1 + selected_root_node = -1 + + for neighbor_node in self.neighbors: + new_root_node = self.neighbor_copies[neighbor_node].root_node + + if new_root_node > selected_root_node: + selected_root_node = new_root_node + selected_q = neighbor_node + + if selected_q != -1: + self.req_node.set(self.node_index) + self.req_from_node.set(self.node_index) + self.req_to_node.set(selected_q) + self.req_direction.set(RequestDirection.ASK) + + elif self.requestor and self.fetch_granted(self.req_to_node.val): + new_parent = self.req_to_node.val + + self.parent_node.set(new_parent) + self.root_node.set(self.neighbor_copies[new_parent].root_node) + self.distance_to_root.set(self.neighbor_copies[new_parent].distance_to_root + 1) + + self.req_node.set(None) + self.req_from_node.set(None) + self.req_to_node.set(None) + self.req_direction.set(None) + + else: + is_all_not_request_and_not_handling = True + + for neighbor_node in self.neighbors: + if self.fetch_request(neighbor_node) and self.fetch_handling(neighbor_node): + is_all_not_request_and_not_handling = False + break + + if is_all_not_request_and_not_handling: + if not self.not_handling: + self.req_node.set(None) + self.req_from_node.set(None) + self.req_to_node.set(None) + self.req_direction.set(None) + + elif self.am_root or \ + (self.parent_node.val is not None and self.neighbor_copies[self.parent_node.val].req_from_node != self.node_index): + selected_q = None + + for neighbor_node in self.neighbors: + if self.fetch_request(neighbor_node): + selected_q = neighbor_node + break + + if selected_q != None: + self.req_node.set(self.neighbor_copies[selected_q].req_node) + self.req_from_node.set(selected_q) + self.req_to_node.set(self.parent_node.val) + self.req_direction.set(RequestDirection.GRANT) + + + elif self.am_root and self.req_direction.val == RequestDirection.ASK: + self.req_direction.set(RequestDirection.GRANT) + + elif self.parent_node.val is not None and self.fetch_granted(self.parent_node.val): + self.req_direction.set(RequestDirection.GRANT) + + self.topology.stable_statuses[self.node_index].set( + self.max_root and ( + (self.node_index == self.K and self.am_root) or + (self.node_index != self.K and self.not_root) + ) + ) + + self.toggle.set(not self.toggle.val) diff --git a/SelfStabilization/AroraGouda.py b/SelfStabilization/AroraGouda.py new file mode 100644 index 0000000..99b6217 --- /dev/null +++ b/SelfStabilization/AroraGouda.py @@ -0,0 +1,45 @@ +from SelfStabilization.SharedMemory import * + +class AroraGoudaNode(SharedMemoryNode): + def thread_handler(self): + while True: + if self.topology.stable.val: + break + + any_change = False + + if self.root_node.val < self.node_index or \ + (self.parent_node.val == None and (self.root_node.val != self.node_index or self.distance_to_root.val != 0)) or \ + (self.parent_node.val != None and self.parent_node.val not in self.neighbors) or \ + self.distance_to_root.val >= self.K: + self.parent_node.set(None) + self.root_node.set(self.node_index) + self.distance_to_root.set(0) + + any_change = True + + else: + for neighbor_node__i in self.neighbors: + neighbor_node_obj = self.topology.nodes[neighbor_node__i] + + if neighbor_node_obj.distance_to_root.val < self.K: + if self.parent_node.val == neighbor_node__i: + if self.root_node.val != neighbor_node_obj.root_node.val: + self.root_node.set(neighbor_node_obj.root_node.val) + self.distance_to_root.set(neighbor_node_obj.distance_to_root.val + 1) + + any_change = True + + elif self.distance_to_root.val != neighbor_node_obj.distance_to_root.val + 1: + self.distance_to_root.set(neighbor_node_obj.distance_to_root.val + 1) + + any_change = True + + elif self.root_node.val < neighbor_node_obj.root_node.val: + self.parent_node.set(neighbor_node_obj.node_index) + self.root_node.set(neighbor_node_obj.root_node.val) + self.distance_to_root.set(neighbor_node_obj.distance_to_root.val + 1) + + any_change = True + + self.topology.stable_statuses[self.node_index].set(not any_change) diff --git a/SelfStabilization/RWLock.py b/SelfStabilization/RWLock.py new file mode 100644 index 0000000..9a3c22a --- /dev/null +++ b/SelfStabilization/RWLock.py @@ -0,0 +1,65 @@ +from contextlib import contextmanager +from threading import Lock + +class RWLock: + def __init__(self): + self.w_lock = Lock() + self.num_r_lock = Lock() + self.num_r = 0 + + def r_acquire(self): + self.num_r_lock.acquire() + self.num_r += 1 + + if self.num_r == 1: + self.w_lock.acquire() + + self.num_r_lock.release() + + def r_release(self): + assert self.num_r > 0 # TODO: remove + self.num_r_lock.acquire() + self.num_r -= 1 + + if self.num_r == 0: + self.w_lock.release() + + self.num_r_lock.release() + + @contextmanager + def r_locked(self): + try: + self.r_acquire() + yield + finally: + self.r_release() + + def w_acquire(self): + self.w_lock.acquire() + + def w_release(self): + self.w_lock.release() + + @contextmanager + def w_locked(self): + try: + self.w_acquire() + yield + finally: + self.w_release() + +class RWLockedVal: + def __init__(self, initial_val): + self._val = None + self._lock = RWLock() + + self.set(initial_val) + + @property + def val(self): + with self._lock.r_locked(): + return self._val + + def set(self, val): + with self._lock.w_locked(): + self._val = val diff --git a/SelfStabilization/SharedMemory.py b/SelfStabilization/SharedMemory.py new file mode 100644 index 0000000..4c86ea9 --- /dev/null +++ b/SelfStabilization/SharedMemory.py @@ -0,0 +1,107 @@ +import time +from threading import Thread + +import matplotlib.pyplot as plt +import networkx as nx + +from SelfStabilization.RWLock import * + +class SharedMemoryNode: + def __init__(self, topology, node_index, K): + self.topology = topology + self.node_index = node_index + self.K = K + + self.thread = None + + self.neighbors = [] + + for u, v in list(topology.G.edges): + if v == self.node_index: + self.neighbors.append(u) + elif u == self.node_index: + self.neighbors.append(v) + + self.parent_node = RWLockedVal(None) + self.root_node = RWLockedVal(node_index) + self.distance_to_root = RWLockedVal(0) + + def setup(self): + pass + + def thread_handler(self): + raise NotImplementedError(f'method thread_handler is not implemented for {self.__class__.__name__}') + + def start(self): + self.thread = Thread(target=self.thread_handler) + self.thread.start() + + def __str__(self): + return f'{self.__class__.__name__}: {self.node_index}' + + def __repr__(self): + return self.__str__() + + +class SharedMemoryTopology: + def __init__(self): + self.G = None + self.node_count = None + + self.nodes = dict() + self.stable_statuses = dict() + + self.stable = RWLockedVal(False) + + def construct_from_tree(self, G, NodeClass, args=[]): + self.G = G + self.node_count = len(list(G.nodes)) + + for i in list(G.nodes): + self.nodes[i] = NodeClass(self, i, *args) + self.stable_statuses[i] = RWLockedVal(False) + + for i in list(G.nodes): + self.nodes[i].setup() + + def plot_base_graph(self): # unused function + nx.draw(self.G, nx.drawing.spring_layout(self.G), node_color=(['b'] * self.node_count), with_labels=True, font_weight='bold') + plt.draw() + + def plot_directed_graph(self): + directed_graph = nx.DiGraph() + + directed_graph.add_nodes_from(self.G.nodes) + + for node in self.nodes.values(): + if node.parent_node.val is not None: + directed_graph.add_edge(node.node_index, node.parent_node.val) + + nx.draw(directed_graph, nx.drawing.spring_layout(directed_graph), node_color=(['b'] * self.node_count), with_labels=True, font_weight='bold') + plt.draw() + + def stable_check_handler(self): + while True: + stable = True + + for locked_v in self.stable_statuses.values(): + if not locked_v.val: + stable = False + break + + if stable: + self.stable.set(True) + break + + time.sleep(0.0001) + + def start(self): + for node in self.nodes.values(): + node.start() + + # stable_check_thread = Thread(target=self.stable_check_handler) + # stable_check_thread.start() + self.stable_check_handler() + + def plot(self): + self.plot_directed_graph() diff --git a/tests/SelfStabilization/test.py b/tests/SelfStabilization/test.py new file mode 100644 index 0000000..f5e689d --- /dev/null +++ b/tests/SelfStabilization/test.py @@ -0,0 +1,29 @@ +import os +import sys + +sys.path.insert(0, os.getcwd()) + +import matplotlib.pyplot as plt + +from SelfStabilization.AroraGouda import * +from SelfStabilization.AfekKuttenYang import * + + +def main(): + G = nx.random_geometric_graph(7, 0.8) + + topology = SharedMemoryTopology() + topology.construct_from_tree(G, AroraGoudaNode, args=[len(G.nodes)]) # K value + + topology.plot_base_graph() + plt.show() + + plt.clf() + + topology.start() + topology.plot() + + plt.show() + +if __name__ == "__main__": + main() diff --git a/tests/SelfStabilization/testbenchmark.py b/tests/SelfStabilization/testbenchmark.py new file mode 100644 index 0000000..04dc05f --- /dev/null +++ b/tests/SelfStabilization/testbenchmark.py @@ -0,0 +1,41 @@ +import os +import sys + +sys.path.insert(0, os.getcwd()) + +import time + +import matplotlib.pyplot as plt + +from SelfStabilization.AroraGouda import * +from SelfStabilization.AfekKuttenYang import * + +def eprint(*args, **kwargs): + print(*args, file=sys.stderr, **kwargs) + +def main(): + for n in range(15, 50 + 1): + G = nx.random_geometric_graph(n, 0.8) + + topology1 = SharedMemoryTopology() + topology1.construct_from_tree(G, AroraGoudaNode, args=[len(G.nodes)]) # K value + + topology2 = SharedMemoryTopology() + topology2.construct_from_tree(G, AfekKuttenYangNode, args=[max(list(G.nodes))]) + + eprint('started') + + start_time = time.monotonic() + topology1.start() + elapsed1 = time.monotonic() - start_time + eprint('skip') + + start_time = time.monotonic() + topology2.start() + elapsed2 = time.monotonic() - start_time + + print(f'{elapsed1};{elapsed2}') + eprint(f'{elapsed1};{elapsed2}') + +if __name__ == "__main__": + main() -- GitLab From f08ec41b71c2ff3869319815421313436e84b44b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ozan=20Ak=C4=B1n?= Date: Wed, 26 May 2021 01:54:50 +0300 Subject: [PATCH 2/3] Added self to contributor list --- CONTRIBUTORS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 6b324da..3804efa 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,3 +1,4 @@ # Contributors - [Berker Acır](https://github.com/berkeracir) +- [Ozan Akın](https://github.com/oznakn) -- GitLab From 31547357fee268405177fbc78f159c7c23505878 Mon Sep 17 00:00:00 2001 From: WINS Laboratory Date: Thu, 26 Aug 2021 13:31:35 +0300 Subject: [PATCH 3/3] Update CONTRIBUTORS.md --- CONTRIBUTORS.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index 3804efa..3a1d36c 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -1,4 +1,7 @@ # Contributors - [Berker Acır](https://github.com/berkeracir) +- [Osman Ufuk Yağmur](https://github.com/VengerA) +- [Berke Tezergil](https://github.com/btezergil) +- [Saim Sunel](https://github.com/SaimSUNEL) - [Ozan Akın](https://github.com/oznakn) -- GitLab