From 79e6b7d5979e4abc2b44114f0cd63c6e57d39bef Mon Sep 17 00:00:00 2001
From: Ali Atli
Date: Mon, 17 May 2021 09:02:16 +0300
Subject: [PATCH 1/5] election works in the most basic case

---
 Ahc.py                      |  13 +-
 Channels.py                 |  24 +++
 Consensus/Raft/__init__.py  |   0
 Consensus/Raft/log.py       |  57 +++++++
 Consensus/Raft/states.py    | 289 ++++++++++++++++++++++++++++++++++++
 Consensus/__init__.py       |   0
 Consensus/raft_component.py |  81 ++++++++++
 tests/test.py               | 192 ++++++++++++------------
 tests/testraftconsensus.py  |  69 +++++++++
 9 files changed, 632 insertions(+), 93 deletions(-)
 create mode 100644 Consensus/Raft/__init__.py
 create mode 100644 Consensus/Raft/log.py
 create mode 100644 Consensus/Raft/states.py
 create mode 100644 Consensus/__init__.py
 create mode 100644 Consensus/raft_component.py
 create mode 100644 tests/testraftconsensus.py

diff --git a/Ahc.py b/Ahc.py
index b2d5487..70bd8f4 100644
--- a/Ahc.py
+++ b/Ahc.py
@@ -153,13 +153,22 @@ class ComponentRegistry:
       for p in connectedcmp:
         print(f"\t{i} {p.componentname}.{p.componentinstancenumber}")

+  def get_non_channel_components(self):
+    res = []
+    for itemkey in self.components:
+      cmp = self.components[itemkey]
+      if cmp.componentname.find("Channel") != -1:
+        continue
+      res.append(cmp)
+    return res
+
 registry = ComponentRegistry()

 class ComponentModel:
   terminated = False

   def on_init(self, eventobj: Event):
-    # print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+    print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
     pass

   def on_message_from_bottom(self, eventobj: Event):
@@ -359,4 +368,4 @@ class Topology:
     nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold')
     plt.draw()
     print(self.nodecolors)
-    #self.lock.release()
+    #self.lock.release()
\ No newline at end of file

diff --git a/Channels.py b/Channels.py
index a6e9bbd..49f45e6 100644
--- a/Channels.py
+++ b/Channels.py
@@ -75,6 +75,30 @@ class Channel(ComponentModel):
     t.start()
     t1.start()

+
+class BasicLossyChannel(Channel):
+  def __init__(self, componentname, componentinstancenum, loss_percentage=0):
+    super().__init__(componentname, componentinstancenum,)
+    self.loss_percentage = loss_percentage
+
+  # Override on_deliver_to_component if you want to do something in the last pipeline stage;
+  # it delivers the message from the channel to the receiver component via a message-from-channel event.
+  def on_deliver_to_component(self, eventobj: Event):
+    callername = eventobj.eventsource.componentinstancenumber
+    for item in self.connectors:
+      callees = self.connectors[item]
+      for callee in callees:
+        calleename = callee.componentinstancenumber
+        # print(f"I am connected to {calleename}. Will check if I have to distribute it to {item}")
+        if calleename == callername:
+          pass
+        else:
+          randomnum = random.uniform(0, 1)  # assumes `random` is already imported at the top of Channels.py
+          # print("random number here is ", randomnum, " for ", self.componentinstancenumber)
+          if randomnum >= self.loss_percentage:
+            myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber)
+            callee.trigger_event(myevent)
+
+
 class AHCChannelError(Exception):
   pass

diff --git a/Consensus/Raft/__init__.py b/Consensus/Raft/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/Consensus/Raft/log.py b/Consensus/Raft/log.py
new file mode 100644
index 0000000..65c64b0
--- /dev/null
+++ b/Consensus/Raft/log.py
@@ -0,0 +1,57 @@
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class LogManager:
+    """Manage the "Log" subsystem: the list of entries and the commit index."""
+
+    def __init__(self):
+        self.log = []
+        self.commitIndex = len(self.log)
+
+    def __getitem__(self, index):
+        """Get an item or slice from the log, based on absolute log indexes.
+        Items that have already been compacted cannot be requested."""
+        if type(index) is slice:
+            start = index.start
+            stop = index.stop
+            return self.log[start:stop:index.step]
+        elif type(index) is int:
+            return self.log[index]
+
+    @property
+    def index(self):
+        """Log tip index."""
+        return len(self.log)
+
+    def term(self, index=None):
+        """Return the term for a given log index. If no index is passed,
+        return the term at the log tip."""
+        if index is None:
+            return self.term(self.index)
+        elif index == -1:
+            return 0
+        elif not len(self.log):
+            return 0
+        else:
+            return self.log[index]['term']
+
+    def append_entries(self, entries, prevLogIndex):
+        start = prevLogIndex
+        if len(self.log) >= start:
+            del self.log[start:]  # truncate conflicting entries from prevLogIndex on, keep the prefix
+            self.log.extend(entries)
+        else:
+            self.log.extend(entries)
+        if entries:
+            logger.debug('Appending. New log: %s', self.log)
+
+    def commit(self, leaderCommit):
+        if leaderCommit <= self.commitIndex:
+            return
+
+        self.commitIndex = min(leaderCommit, self.index)  # no overshoots
+        logger.debug('Advancing commit to %s', self.commitIndex)
+        # the actual commit operation is just advancing the counter
diff --git a/Consensus/Raft/states.py b/Consensus/Raft/states.py
new file mode 100644
index 0000000..671c9d9
--- /dev/null
+++ b/Consensus/Raft/states.py
@@ -0,0 +1,289 @@
+import asyncio
+import logging
+import statistics
+import sys
+import threading
+from random import randrange
+from os.path import join
+from threading import Thread
+
+from Ahc import ComponentRegistry
+from Consensus.Raft.log import LogManager
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+
+logger = logging.getLogger(__name__)
+
+class State:
+    """Abstract state for subclassing."""
+
+    def __init__(self, old_state=None, server=None):
+        """State is initialized passing an orchestrator instance when first
+        deployed. Subsequent state changes use the old_state parameter to
+        preserve the environment.
+        """
+        if old_state:
+            self.server = old_state.server
+            self.votedFor = old_state.votedFor
+            self.currentTerm = old_state.currentTerm
+            self.leaderId = old_state.leaderId
+            self.log = old_state.log
+        else:
+            self.server = server
+            self.votedFor = None
+            self.currentTerm = 0
+            self.leaderId = None
+            self.log = LogManager()
+
+    def data_received_peer(self, peer, msg):
+        """Receive peer messages from orchestrator and pass them to the
+        appropriate method."""
+        logger.debug('For component %s Received %s from %s', self.server.componentinstancenumber, msg['type'], peer.componentinstancenumber)
+
+        if self.currentTerm < msg['term']:
+            self.currentTerm = msg['term']
+            if not type(self) is Follower:
+                logger.info('For component %s Remote term is higher, converting to Follower', self.server.componentinstancenumber)
+                self.server.change_state(Follower)
+                self.server.state.data_received_peer(peer, msg)
+                return
+        method = getattr(self, 'on_peer_' + msg['type'], None)
+        if method:
+            method(peer, msg)
+        else:
+            logger.info('For component %s Unrecognized message from %s: %s', self.server.componentinstancenumber, peer.componentinstancenumber, msg)
+
+    def data_received_client(self, client, msg):
+        """Receive client messages from orchestrator and pass them to the
+        appropriate method."""
+        method = getattr(self, 'on_client_' + msg['type'], None)
+        if method:
+            method(client, msg)
+
+    def on_client_append(self, client, msg):
+        """Redirect the client to the leader upon receiving a client_append message."""
+        msg = {'type': 'redirect',
+               'leader': self.leaderId}
+        client.send(msg)
+        logger.debug('Redirected client to leader %s', self.leaderId)
+
+    def on_client_get(self, client, msg):
+        """Return the state machine (here: the log) to the client."""
+        client.send(self.log)
+
+
+class Follower(State):
+    """Follower state."""
+
+    def __init__(self, old_state=None, server=None):
+        """Initialize parent and start election timer."""
+        super().__init__(old_state, server)
+        self.votedFor = None
+        self.restart_election_timer()
+
+    def teardown(self):
+        """Stop timers before changing state."""
+        self.election_timer.cancel()
+
+    def restart_election_timer(self):
+        """Delays transition to the Candidate state by timer."""
+        if hasattr(self, 'election_timer'):
+            self.election_timer.cancel()
+
+        timeout = randrange(1, 4)
+        # loop = get_or_create_eventloop()
+        # self.election_timer = loop. \
+        #     call_later(timeout, self.server.change_state, Candidate)
+        self.election_timer = threading.Timer(timeout, self.server.change_state, (Candidate,))
+        self.election_timer.start()
+        logger.debug('For component %s Election timer restarted: %s s', self.server.componentinstancenumber, timeout)
+
+    def on_peer_request_vote(self, peer, msg):
+        """Grant this node's vote to Candidates."""
+        term_is_current = msg['term'] >= self.currentTerm
+        can_vote = self.votedFor is None
+        index_is_current = (msg['lastLogTerm'] > self.log.term() or
+                            (msg['lastLogTerm'] == self.log.term() and
+                             msg['lastLogIndex'] >= self.log.index))
+        granted = term_is_current and can_vote and index_is_current
+
+        if granted:
+            self.votedFor = msg['candidateId']
+            self.restart_election_timer()
+
+        logger.debug('For component %s Voting for %s. Term:%s Vote:%s Index:%s', self.server.componentinstancenumber,
+                     peer.componentinstancenumber, term_is_current, can_vote, index_is_current)
+
+        response = {'type': 'response_vote', 'voteGranted': granted,
+                    'term': self.currentTerm}
+        self.server.send_to_component(peer, response)
+
+    def on_peer_append_entries(self, peer, msg):
+
+        self.restart_election_timer()
+
+        term_is_current = msg['term'] >= self.currentTerm
+        prev_log_term_match = msg['prevLogTerm'] is None or \
+            self.log.term(msg['prevLogIndex']) == msg['prevLogTerm']
+        success = term_is_current and prev_log_term_match
+
+        if success:
+            self.log.append_entries(msg['entries'], msg['prevLogIndex'])
+            self.log.commit(msg['leaderCommit'])
+            self.leaderId = msg['leaderId']
+            logger.debug('For component %s Log index is now %s', self.server.componentinstancenumber, self.log.index)
+        else:
+            logger.warning('For component %s Could not append entries. cause: %s', self.server.componentinstancenumber,
+                           'wrong term' if not term_is_current else 'prev log term mismatch')
+
+        resp = {'type': 'response_append', 'success': success,
+                'term': self.currentTerm,
+                'matchIndex': self.log.index}
+        self.server.send_to_component(peer, resp)
+
+
+class Candidate(Follower):
+    """Candidate state. Notice that this state subclasses Follower."""
+
+    def __init__(self, old_state=None, server=None):
+        """Initialize parent, increase term, vote for self, ask for votes."""
+        super().__init__(old_state, server)
+        self.currentTerm += 1
+        self.votes_count = 0
+        logger.info('New Election. Term: %s', self.currentTerm)
+
+        # TODO: put it in a separate thread
+        def vote_self():
+            self.votedFor = self.server.componentinstancenumber
+            self.on_peer_response_vote(
+                self.votedFor, {'voteGranted': True})
+
+        vote_self()
+        self.send_vote_requests()
+
+
+    def send_vote_requests(self):
+        """Ask peers for votes."""
+        logger.info('For component %s Broadcasting request_vote', self.server.componentinstancenumber)
+        msg = {'type': 'request_vote', 'term': self.currentTerm,
+               'candidateId': self.votedFor,
+               'lastLogIndex': self.log.index,
+               'lastLogTerm': self.log.term()}
+        self.server.broadcast_peers(msg)
+
+    def on_peer_append_entries(self, peer, msg):
+        """Transition back to Follower upon receiving an append_entries."""
+        logger.debug('For component %s Converting to Follower', self.server.componentinstancenumber)
+        self.server.change_state(Follower)
+        self.server.state.on_peer_append_entries(peer, msg)
+
+    def on_peer_response_vote(self, peer, msg):
+        """Register peer votes; transition to Leader upon majority vote."""
+        self.votes_count += msg['voteGranted']
+        logger.info('For component %s Vote count: %s', self.server.componentinstancenumber, self.votes_count)
+        if self.votes_count > len(self.server.registry.get_non_channel_components()) / 2:
+            self.server.change_state(Leader)
+
+
+class Leader(State):
+    """Leader state."""
+
+    def __init__(self, old_state=None, server=None):
+        """Initialize parent, set leader variables, start periodic
+        append_entries."""
+        super().__init__(old_state, server)
+        logger.info('For component %s Leader of term: %s', self.server.componentinstancenumber, self.currentTerm)
+        self.leaderId = self.server.componentinstancenumber
+        cluster = self.server.registry.get_non_channel_components()
+        cluster_names = [c.componentinstancenumber for c in cluster]
+        self.matchIndex = {p: 0 for p in cluster_names}
+        self.nextIndex = {p: self.log.commitIndex + 1 for p in self.matchIndex}
+        self.waiting_clients = {}
+        self.send_append_entries()
+
+        self.log.append_entries([{'term': self.currentTerm,
+                                  'data': {
+                                      'key': 'leaderId',
+                                      'value': self.server.componentinstancenumber,
+                                  }}],
+                                self.log.index)
+        self.log.commit(self.log.index)
+
+    def teardown(self):
+        """Stop timers before changing state."""
+        self.append_timer.cancel()
+
+        for clients in self.waiting_clients.values():
+            for client in clients:
+                client.send({'type': 'result', 'success': False})
+                logger.error('Sent unsuccessful response to client')
+
+    def send_append_entries(self):
+        """Send append_entries to the cluster, containing:
+        - nothing: if the remote node is up to date.
+        - the compacted log: if the remote node has to catch up.
+        - log entries: if available.
+        Finally, it schedules itself for later execution."""
+        cluster = self.server.registry.get_non_channel_components()
+        for peer in cluster:
+            if peer.componentinstancenumber == self.server.componentinstancenumber:
+                continue
+            msg = {'type': 'append_entries',
+                   'term': self.currentTerm,
+                   'leaderCommit': self.log.commitIndex,
+                   'leaderId': self.server.componentinstancenumber,
+                   'prevLogIndex': self.nextIndex[peer.componentinstancenumber] - 1,
+                   'entries': self.log.log
+                   }
+            msg.update({'prevLogTerm': self.log.term(msg['prevLogIndex'])})
+
+            logger.debug('For component %s Sending %s entries to %s. Start index %s', self.server.componentinstancenumber,
+                         len(msg['entries']), peer.componentinstancenumber, self.nextIndex[peer.componentinstancenumber])
+            self.server.send_to_component(peer, msg)
+
+        timeout = randrange(1, 4) * 10 ** -1
+        # loop = get_or_create_eventloop()
+        # self.append_timer = loop.call_later(timeout, self.send_append_entries)
+        self.append_timer = threading.Timer(timeout, self.send_append_entries)
+        self.append_timer.start()
+    def on_peer_response_append(self, peer, msg):
+        """Handle peer response to append_entries.
+        If successful RPC, try to commit new entries.
+        If RPC unsuccessful, backtrack."""
+        if msg['success']:
+            self.matchIndex[peer] = msg['matchIndex']
+            self.nextIndex[peer] = msg['matchIndex'] + 1
+
+            self.matchIndex[self.server.componentinstancenumber] = self.log.index
+            self.nextIndex[self.server.componentinstancenumber] = self.log.index + 1
+            index = statistics.median_low(self.matchIndex.values())
+            self.log.commit(index)
+            self.send_client_append_response()
+        else:
+            self.nextIndex[peer.componentinstancenumber] = max(0, self.nextIndex[peer.componentinstancenumber] - 1)
+
+    def on_client_append(self, client, msg):
+        """Append new entries to the Leader log."""
+        entry = {'term': self.currentTerm, 'data': msg['data']}
+
+        self.log.append_entries([entry], self.log.index)
+        if self.log.index in self.waiting_clients:
+            self.waiting_clients[self.log.index].append(client)
+        else:
+            self.waiting_clients[self.log.index] = [client]
+        self.on_peer_response_append(
+            self.server.componentinstancenumber, {'success': True,
+                                                  'matchIndex': self.log.commitIndex})
+
+    def send_client_append_response(self):
+        """Respond to the client upon commitment of log entries."""
+        to_delete = []
+        for client_index, clients in self.waiting_clients.items():
+            if client_index <= self.log.commitIndex:
+                for client in clients:
+                    client.send({'type': 'result', 'success': True})  # TODO
+                    logger.debug('Sent successful response to client')
+                to_delete.append(client_index)
+        for index in to_delete:
+            del self.waiting_clients[index]
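A note on why the Leader's on_peer_response_append above commits statistics.median_low(self.matchIndex.values()): with an odd cluster size, the low median of the match indexes is the highest log index replicated on a majority of nodes, which is exactly Raft's commit rule. A quick sanity check in plain Python (the values are hypothetical):

    import statistics

    matchIndex = {'A': 5, 'B': 3, 'C': 5, 'D': 2, 'E': 5}
    # sorted values: [2, 3, 5, 5, 5]; the low median is 5,
    # and indeed three of the five nodes have matchIndex >= 5
    assert statistics.median_low(matchIndex.values()) == 5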
diff --git a/Consensus/__init__.py b/Consensus/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/Consensus/raft_component.py b/Consensus/raft_component.py
new file mode 100644
index 0000000..fb3242a
--- /dev/null
+++ b/Consensus/raft_component.py
@@ -0,0 +1,81 @@
+import sys
+
+from Ahc import ComponentModel, EventTypes, ConnectorTypes
+from Ahc import Event
+
+from Consensus.Raft.states import Follower
+import logging
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+
+logger = logging.getLogger(__name__)
+
+
+class RaftConsensusComponent(ComponentModel):
+
+    def __init__(self, componentname, componentinstancenumber):
+        super().__init__(componentname, componentinstancenumber, 1)
+        self.state = None
+
+    def on_message_from_bottom(self, eventobj: Event):
+        self.data_received_peer(eventobj.eventsource, eventobj.eventcontent)
+
+    def on_message_from_top(self, eventobj: Event):
+        print("client asking for trouble...")
+        self.data_received_client(eventobj.eventsource, eventobj.eventcontent)
+
+    def change_state(self, new_state):
+        self.state.teardown()
+        logger.info('For component %s State change: ' + new_state.__name__, self.componentinstancenumber)
+        self.state = new_state(old_state=self.state)
+
+    def data_received_peer(self, sender, message):
+        self.state.data_received_peer(sender, message)
+
+    def data_received_client(self, client, message):
+        self.state.data_received_client(client, message)
+
+    def send(self, client, message):
+        client.send(message)
+
+    def send_to_component(self, recipient, message):
+        if recipient != self:
+            for conn in self.connectors[ConnectorTypes.DOWN]:
+                if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1:
+                    conn.trigger_event(Event(self, EventTypes.MFRT, message))
+
+    def broadcast_peers(self, message):
+        self.send_down(Event(self, EventTypes.MFRT, message))
+
+    def on_init(self, eventobj: Event):
+        self.state = Follower(server=self)
+
+
+class ConsensusComponent(ComponentModel):
+
+    def __init__(self, component_name, component_id):
+        super().__init__(component_name, component_id, 1)
+        self._commitIndex = 0
+        self._currentTerm = 0
+        self._lastApplied = 0
+        self._lastLogIndex = 0
+        self._lastLogTerm = None
+        self._state = Follower()
+        self._log = []
+
+    def set_log(self, log):
+        self._log = log
+
+    def on_message(self, message):
+        state, response = self._state.on_message(message)
+        self._state = state
+
+    def on_message_from_bottom(self, eventobj: Event):
+        print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}")
+        self.on_message(eventobj)
+
+    def send_message(self, message):
+        self.send_down(Event(self, EventTypes.MFRT, message))
+
+    # also means start election
+    def on_init(self, eventobj: Event):
+        self._state.set_server(self)
diff --git a/tests/test.py b/tests/test.py
index 7b217f8..91f96db 100644
--- a/tests/test.py
+++ b/tests/test.py
@@ -14,120 +14,130 @@ from NetworkLayers.AllSeeingEyeNetworkLayer import AllSeingEyeNetworkLayer

 registry = ComponentRegistry()

+
 # define your own message types
 class ApplicationLayerMessageTypes(Enum):
-  PROPOSE = "PROPOSE"
-  ACCEPT = "ACCEPT"
+    PROPOSE = "PROPOSE"
+    ACCEPT = "ACCEPT"
+

 # define your own message header structure
 class ApplicationLayerMessageHeader(GenericMessageHeader):
-  pass
+    pass
+

 # define your own message payload structure
 class ApplicationLayerMessagePayload(GenericMessagePayload):
-  pass
+    pass
+

 class ApplicationLayerComponent(ComponentModel):
-  def on_init(self, eventobj: Event):
-    print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
-
-    if self.componentinstancenumber == 0:
-      # destination = random.randint(len(Topology.G.nodes))
-      destination = 1
-      hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.PROPOSE, self.componentinstancenumber,
-                                          destination)
-      payload = ApplicationLayerMessagePayload("23")
-      proposalmessage = GenericMessage(hdr, payload)
-      randdelay = random.randint(0, 5)
-      time.sleep(randdelay)
-      self.send_self(Event(self, "propose", proposalmessage))
-    else:
-      pass
-
-  def on_message_from_bottom(self, eventobj: Event):
-    try:
-      applmessage = eventobj.eventcontent
-      hdr = applmessage.header
-      if hdr.messagetype == ApplicationLayerMessageTypes.ACCEPT:
-        print(f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message")
-      elif hdr.messagetype == ApplicationLayerMessageTypes.PROPOSE:
-        print(f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message")
-    except AttributeError:
-      print("Attribute Error")
-
-  # print(f"{self.componentname}.{self.componentinstancenumber}: Gotton message {eventobj.content} ")
-  # value = eventobj.content.value
-  # value += 1
-  # newmsg = MessageContent( value )
-  # myevent = Event( self, "agree", newmsg )
-  # self.trigger_event(myevent)
-
-  def on_propose(self, eventobj: Event):
-    destination = 1
-    hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.ACCEPT, self.componentinstancenumber, destination)
-    payload = ApplicationLayerMessagePayload("23")
-    proposalmessage = GenericMessage(hdr, payload)
-    self.send_down(Event(self, EventTypes.MFRT, proposalmessage))
-
-  def on_agree(self, eventobj: Event):
-    print(f"Agreed on {eventobj.eventcontent}")
-
-  def on_timer_expired(self, eventobj: Event):
-    pass
+    def on_init(self, eventobj: Event):
+        print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+
+        if self.componentinstancenumber == 0:
+            # destination = random.randint(len(Topology.G.nodes))
+            destination = 1
+            hdr = 
ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.PROPOSE, self.componentinstancenumber, + destination) + payload = ApplicationLayerMessagePayload("23") + proposalmessage = GenericMessage(hdr, payload) + randdelay = random.randint(0, 5) + time.sleep(randdelay) + self.send_self(Event(self, "propose", proposalmessage)) + else: + pass + + def on_message_from_bottom(self, eventobj: Event): + try: + applmessage = eventobj.eventcontent + hdr = applmessage.header + if hdr.messagetype == ApplicationLayerMessageTypes.ACCEPT: + print( + f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") + elif hdr.messagetype == ApplicationLayerMessageTypes.PROPOSE: + print( + f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") + except AttributeError: + print("Attribute Error") + + # print(f"{self.componentname}.{self.componentinstancenumber}: Gotton message {eventobj.content} ") + # value = eventobj.content.value + # value += 1 + # newmsg = MessageContent( value ) + # myevent = Event( self, "agree", newmsg ) + # self.trigger_event(myevent) + + def on_propose(self, eventobj: Event): + destination = 1 + hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.ACCEPT, self.componentinstancenumber, + destination) + payload = ApplicationLayerMessagePayload("23") + proposalmessage = GenericMessage(hdr, payload) + self.send_down(Event(self, EventTypes.MFRT, proposalmessage)) + + def on_agree(self, eventobj: Event): + print(f"Agreed on {eventobj.eventcontent}") + + def on_timer_expired(self, eventobj: Event): + pass + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.eventhandlers["propose"] = self.on_propose + self.eventhandlers["agree"] = self.on_agree + self.eventhandlers["timerexpired"] = self.on_timer_expired - def __init__(self, componentname, componentinstancenumber): - super().__init__(componentname, componentinstancenumber) - self.eventhandlers["propose"] = self.on_propose - self.eventhandlers["agree"] = self.on_agree - self.eventhandlers["timerexpired"] = self.on_timer_expired class AdHocNode(ComponentModel): - def on_init(self, eventobj: Event): - print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}") - def on_message_from_top(self, eventobj: Event): - self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) - def on_message_from_bottom(self, eventobj: Event): - self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) - def __init__(self, componentname, componentid): - # SUBCOMPONENTS - self.appllayer = ApplicationLayerComponent("ApplicationLayer", componentid) - self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) - self.linklayer = LinkLayer("LinkLayer", componentid) - # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) + def __init__(self, componentname, componentid): + # SUBCOMPONENTS + self.appllayer = ApplicationLayerComponent("ApplicationLayer", componentid) + self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) + self.linklayer = LinkLayer("LinkLayer", 
componentid)
-    # self.failuredetect = GenericFailureDetector("FailureDetector", componentid)
-
-    # CONNECTIONS AMONG SUBCOMPONENTS
-    self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer)
-    # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer)
-    self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer)
-    # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect)
-    self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer)
-    self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer)
+        # self.failuredetect = GenericFailureDetector("FailureDetector", componentid)

-    # Connect the bottom component to the composite component....
-    self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self)
-    self.connect_me_to_component(ConnectorTypes.UP, self.linklayer)
+        # CONNECTIONS AMONG SUBCOMPONENTS
+        self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer)
+        # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer)
+        self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer)
+        # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect)
+        self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer)
+        self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer)

-    super().__init__(componentname, componentid)
+        # Connect the bottom component to the composite component....
+        self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self)
+        self.connect_me_to_component(ConnectorTypes.UP, self.linklayer)
+
+        super().__init__(componentname, componentid)

 def main():
-  # G = nx.Graph()
-  # G.add_nodes_from([1, 2])
-  # G.add_edges_from([(1, 2)])
-  # nx.draw(G, with_labels=True, font_weight='bold')
-  # plt.draw()
-  G = nx.random_geometric_graph(19, 0.5)
-  nx.draw(G, with_labels=True, font_weight='bold')
-  plt.draw()
+    # G = nx.Graph()
+    # G.add_nodes_from([1, 2])
+    # G.add_edges_from([(1, 2)])
+    # nx.draw(G, with_labels=True, font_weight='bold')
+    # plt.draw()
+    G = nx.random_geometric_graph(19, 0.5)
+    nx.draw(G, with_labels=True, font_weight='bold')
+    plt.draw()
+
+    topo = Topology()
+    topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel)
+    topo.start()

-  topo = Topology()
-  topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel)
-  topo.start()
+    plt.show()  # while (True): pass

-  plt.show()  # while (True): pass

 if __name__ == "__main__":
-  main()
+    main()
diff --git a/tests/testraftconsensus.py b/tests/testraftconsensus.py
new file mode 100644
index 0000000..ac7a146
--- /dev/null
+++ b/tests/testraftconsensus.py
@@ -0,0 +1,69 @@
+import random
+import time
+from threading import Thread
+
+import matplotlib.pyplot as plt
+import networkx as nx
+
+from Ahc import ComponentModel, Event, ConnectorTypes, Topology, EventTypes, GenericMessage, GenericMessageHeader
+from Ahc import ComponentRegistry
+from Broadcasting.Broadcasting import ControlledFlooding
+from Channels import P2PFIFOFairLossChannel, Channel, BasicLossyChannel
+from Consensus.raft_component import ConsensusComponent, RaftConsensusComponent
+from LinkLayers.GenericLinkLayer import LinkLayer
+from itertools import combinations
+
+registry = ComponentRegistry()
+
+
+class AdHocNode(ComponentModel):
+    def on_message_from_top(self, eventobj: Event):
+        time.sleep(0.1)
+        print("received event from", eventobj.eventsource.componentinstancenumber)
+        self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent))
+
+    def on_message_from_bottom(self, eventobj: Event):
+        self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent + 
self.componentinstancenumber))
+
+    def __init__(self, componentname, componentid):
+        super().__init__(componentname, componentid)
+
+
+# self.eventhandlers[EventTypes.MFRT] = self.onMessageFromTop
+# self.eventhandlers["messagefromchannel"] = self.onMessageFromChannel
+
+class Client:
+
+    def send(self, message):
+        print(tuple(message))
+
+
+def main():
+    nodes = ['A', 'B', 'C', 'D', 'E']
+    #nodes = ['A', 'B', 'C']
+    # nodes = ['A', 'B']
+
+    edges = combinations(nodes, 2)
+    G = nx.Graph()
+    G.add_nodes_from(nodes)
+    G.add_edges_from(edges)
+    topo = Topology()
+    topo.construct_from_graph(G, RaftConsensusComponent, BasicLossyChannel)
+    client = Client()
+
+    topo.start()
+    time.sleep(5)
+    a_node: RaftConsensusComponent = topo.nodes.get('A')
+    cluster = a_node.registry.get_non_channel_components()
+    a_node = topo.nodes.get(cluster[0].state.leaderId)
+    a_node.data_received_client(client, {'type':'append', 'data': {
+        'key': 'hello',
+        'value': 'hello',}})
+    a_node.data_received_client(client, {'type':'append', 'data': {
+        'key': 'hello',
+        'value': 'hello',}})
+    waitforit = input("hit something to exit...")
+
+
+if __name__ == "__main__":
+    main()
-- 
GitLab


From 6f2744b67382afb3a7aef5fed59d9e57bc3663f8 Mon Sep 17 00:00:00 2001
From: Ali Atli
Date: Mon, 24 May 2021 02:57:28 +0300
Subject: [PATCH 2/5] paxos initial impl and some refactoring for raft

---
 Consensus/Paxos/__init__.py            |   0
 Consensus/Paxos/paxos_component.py     | 444 +++++++++++++++++++++++++
 Consensus/{ => Raft}/raft_component.py |  31 --
 Consensus/Raft/states.py               |  70 ++--
 tests/testpaxosconsensus.py            |  39 +++
 tests/testraftconsensus.py             |  41 +--
 6 files changed, 518 insertions(+), 107 deletions(-)
 create mode 100644 Consensus/Paxos/__init__.py
 create mode 100644 Consensus/Paxos/paxos_component.py
 create mode 100644 tests/testpaxosconsensus.py

diff --git a/Consensus/Paxos/__init__.py b/Consensus/Paxos/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/Consensus/Paxos/paxos_component.py b/Consensus/Paxos/paxos_component.py
new file mode 100644
index 0000000..9aff715
--- /dev/null
+++ b/Consensus/Paxos/paxos_component.py
@@ -0,0 +1,444 @@
+
+import collections
+import sys
+
+from Ahc import ComponentModel, Event, ConnectorTypes, EventTypes, ComponentRegistry
+import logging
+logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
+
+logger = logging.getLogger(__name__)
+
+ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])
+
+class PaxosMessage(object):
+    '''
+    Base class for all messages defined in this module
+    '''
+    from_uid = None  # Set by subclass constructor
+
+
+class Prepare(PaxosMessage):
+    '''
+    Prepare messages should be broadcast to all Acceptors.
+    '''
+
+    def __init__(self, from_uid, proposal_id):
+        self.from_uid = from_uid
+        self.proposal_id = proposal_id
+
+
+class NackPrepare(PaxosMessage):
+    '''
+    NACKs are technically optional, though few practical applications will
+    want to omit their use. They are used to signal a proposer that its
+    current proposal number is out of date and that a new one should be
+    chosen. NACKs may be sent in response to both Prepare and Accept
+    messages.
+    '''
+
+    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
+        self.from_uid = from_uid
+        self.proposal_id = proposal_id
+        self.proposer_uid = proposer_uid
+        self.promised_proposal_id = promised_proposal_id
+
+class NackAccept(PaxosMessage):
+    '''
+    NACKs are technically optional, though few practical applications will
+    want to omit their use. They are used to signal a proposer that its
+    current proposal number is out of date and that a new one should be
+    chosen. NACKs may be sent in response to both Prepare and Accept
+    messages.
+    '''
+
+    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
+        self.from_uid = from_uid
+        self.proposal_id = proposal_id
+        self.proposer_uid = proposer_uid
+        self.promised_proposal_id = promised_proposal_id
+
+class Promise(PaxosMessage):
+    '''
+    Promise messages should be sent to at least the Proposer specified in
+    the proposer_uid field
+    '''
+
+    def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
+        self.from_uid = from_uid
+        self.proposer_uid = proposer_uid
+        self.proposal_id = proposal_id
+        self.last_accepted_id = last_accepted_id
+        self.last_accepted_value = last_accepted_value
+
+
+class Accept(PaxosMessage):
+    '''
+    Accept messages should be broadcast to all Acceptors
+    '''
+
+    def __init__(self, from_uid, proposal_id, proposal_value):
+        self.from_uid = from_uid
+        self.proposal_id = proposal_id
+        self.proposal_value = proposal_value
+
+
+class Accepted(PaxosMessage):
+    '''
+    Accepted messages should be sent to all Learners
+    '''
+
+    def __init__(self, from_uid, proposal_id, proposal_value):
+        self.from_uid = from_uid
+        self.proposal_id = proposal_id
+        self.proposal_value = proposal_value
+
+
+class Resolution(PaxosMessage):
+    '''
+    Optional message used to indicate that the final value has been selected
+    '''
+
+    def __init__(self, from_uid, value):
+        self.from_uid = from_uid
+        self.value = value
+
+
+class InvalidMessageError(Exception):
+    '''
+    Thrown if a PaxosMessage subclass is passed to a class that does not
+    support it
+    '''
+
+
+class MessageHandler(object):
+
+    def receive(self, msg):
+        '''
+        Message dispatching function. This function accepts any PaxosMessage
+        subclass and calls the appropriate handler function.
+        '''
+        handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None)
+        if handler is None:
+            raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__)
+        return handler(msg)
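The receive() dispatcher above routes messages purely by class name: it lowercases the message class and looks up a receive_<name> method, raising InvalidMessageError when the receiving role does not implement one. A tiny illustration, with hypothetical string uids:

    msg = Prepare(from_uid='A', proposal_id=ProposalID(1, 'A'))
    handler_name = 'receive_' + msg.__class__.__name__.lower()
    assert handler_name == 'receive_prepare'   # an Acceptor implements this; a pure Learner does not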
+
+
+class Proposer(MessageHandler):
+    '''
+    The 'leader' attribute is a boolean value indicating the Proposer's
+    belief in whether or not it is the current leader. This is not a reliable
+    value, as multiple nodes may simultaneously believe themselves to be the
+    leader.
+    '''
+
+    leader = False
+    proposed_value = None
+    proposal_id = ProposalID(0, ' ')
+    highest_accepted_id = ProposalID(0, ' ')
+    promises_received = None
+    nacks_received = None
+    current_prepare_msg = None
+    current_accept_msg = None
+
+    def __init__(self, network_uid, quorum_size):
+        self.network_uid = network_uid
+        self.quorum_size = quorum_size
+        self.proposal_id = ProposalID(0, network_uid)
+        self.highest_proposal_id = ProposalID(0, network_uid)
+
+    def propose_value(self, value):
+        '''
+        Sets the proposal value for this node iff this node is not already aware of
+        a previous proposal value. If the node additionally believes itself to be
+        the current leader, an Accept message will be returned.
+        '''
+        if self.proposed_value is None:
+            self.proposed_value = value
+
+            if self.leader:
+                self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
+                return self.current_accept_msg
+            else:
+                return None
+        else:
+            return None
+
+    def prepare(self):
+        '''
+        Returns a new Prepare message with a proposal id higher than
+        that of any observed proposals. A side effect of this method is
+        to clear the leader flag if it is currently set.
+        '''
+        self.leader = False
+        self.promises_received = set()
+        self.nacks_received = set()
+        self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid)
+        self.highest_proposal_id = self.proposal_id
+        self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)
+        return self.current_prepare_msg
+
+    def observe_proposal(self, proposal_id):
+        '''
+        Optional method used to update the proposal counter as proposals are
+        seen on the network. When co-located with Acceptors and/or Learners,
+        this method may be used to avoid a message delay when attempting to
+        assume leadership (guaranteed NACK if the proposal number is too low).
+        This method is automatically called for all received Promise and Nack
+        messages.
+        '''
+        if proposal_id > self.highest_proposal_id:
+            self.highest_proposal_id = proposal_id
+
+    def receive_nack(self, msg):
+        '''
+        Returns a new Prepare message if the number of Nacks received reaches
+        a quorum.
+        '''
+        self.observe_proposal(msg.promised_proposal_id)
+
+        if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
+            self.nacks_received.add(msg.from_uid)
+
+            if len(self.nacks_received) == self.quorum_size:
+                return self.prepare()  # Lost leadership or failed to acquire it
+
+    def receive_promise(self, msg):
+        '''
+        Returns an Accept message if a quorum of Promise messages is achieved
+        '''
+        self.observe_proposal(msg.proposal_id)
+        logger.info("For component %s Promise received from %s", self.network_uid.componentinstancenumber,
+                    msg.from_uid.componentinstancenumber)
+        if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:
+
+            self.promises_received.add(msg.from_uid)
+
+            if msg.last_accepted_id is not None and msg.last_accepted_id > self.highest_accepted_id:
+                self.highest_accepted_id = msg.last_accepted_id
+                if msg.last_accepted_value is not None:
+                    self.proposed_value = msg.last_accepted_value
+            # apart from self
+            if len(self.promises_received) == self.quorum_size:
+                self.leader = True
+
+                if self.proposed_value is not None:
+                    self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
+                    return self.current_accept_msg
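A note on ProposalID ordering, which prepare() and observe_proposal() above rely on: namedtuples compare like plain tuples, so proposal numbers are compared first and the uid only breaks ties, giving a total order over all proposals in the cluster (string uids assumed here for illustration):

    assert ProposalID(2, 'A') > ProposalID(1, 'Z')   # a higher number always wins
    assert ProposalID(1, 'B') > ProposalID(1, 'A')   # equal numbers fall back to the uid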
+
+
+class Acceptor(MessageHandler):
+    '''
+    Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness
+    in the presence of failure, Acceptors must be able to remember the promises
+    they've made even in the event of power outages. Consequently, any changes
+    to the promised_id, accepted_id, and/or accepted_value must be persisted to
+    stable media prior to sending promise and accepted messages.
+
+    When an Acceptor instance is composed alongside a Proposer instance, it
+    is generally advantageous to call the proposer's observe_proposal()
+    method when methods of this class are called.
+    '''
+
+    def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
+        '''
+        promised_id, accepted_id, and accepted_value should be provided if and only if this
+        instance is recovering from persistent state.
+        '''
+        self.network_uid = network_uid
+        self.promised_id = promised_id
+        self.accepted_id = accepted_id
+        self.accepted_value = accepted_value
+
+    def receive_prepare(self, msg):
+        '''
+        Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk
+        prior to transmitting the Promise message.
+        '''
+        if msg is None or msg.proposal_id is None:
+            return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
+
+        if self.promised_id is None or msg.proposal_id >= self.promised_id:
+            self.promised_id = msg.proposal_id
+            return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
+        else:
+            return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
+
+    def receive_accept(self, msg):
+        '''
+        Returns either an Accepted or a Nack message in response. The Acceptor's state must be persisted
+        to disk prior to transmitting the Accepted message.
+        '''
+        logger.info("For component %s Accept received from %s", self.network_uid.componentinstancenumber,
+                    msg.from_uid.componentinstancenumber)
+        if msg is None or msg.proposal_id is None:
+            return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
+        if self.promised_id is None or msg.proposal_id >= self.promised_id:
+            self.promised_id = msg.proposal_id
+            self.accepted_id = msg.proposal_id
+            self.accepted_value = msg.proposal_value
+            return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
+        else:
+            return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)
+
+
+class Learner(MessageHandler):
+    '''
+    This class listens to Accepted messages, determines when the final value is
+    selected, and tracks which peers have accepted the final value.
+    '''
+
+    class ProposalStatus(object):
+        __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']
+
+        def __init__(self, value):
+            self.accept_count = 0
+            self.retain_count = 0
+            self.acceptors = set()
+            self.value = value
+
+    def __init__(self, network_uid, quorum_size):
+        self.network_uid = network_uid
+        self.quorum_size = quorum_size
+        self.proposals = dict()  # maps proposal_id => ProposalStatus
+        self.acceptors = dict()  # maps from_uid => last_accepted_proposal_id
+        self.final_value = None
+        self.final_acceptors = None  # Will be a set of acceptor UIDs once the final value is chosen
+        self.final_proposal_id = ProposalID(0, ' ')
+
+    def receive_accepted(self, msg):
+        '''
+        Called when an Accepted message is received from an acceptor. Once the final value
+        is determined, the return value of this method will be a Resolution message containing
+        the agreed-upon value. Subsequent calls after the resolution is chosen will continue to add
+        new Acceptors to the final_acceptors set and return Resolution messages.
+        '''
+        if self.final_value is not None:
+            if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
+                self.final_acceptors.add(msg.from_uid)
+            return Resolution(self.network_uid, self.final_value)
+
+        last_pn = self.acceptors.get(msg.from_uid)
+
+        if last_pn is not None and msg.proposal_id <= last_pn:
+            return  # Old message
+
+        self.acceptors[msg.from_uid] = msg.proposal_id
+
+        if last_pn is not None:
+            ps = self.proposals[last_pn]
+            ps.retain_count -= 1
+            ps.acceptors.remove(msg.from_uid)
+            if ps.retain_count == 0:
+                del self.proposals[last_pn]
+
+        if not msg.proposal_id in self.proposals:
+            self.proposals[msg.proposal_id] = Learner.ProposalStatus(msg.proposal_value)
+
+        ps = self.proposals[msg.proposal_id]
+
+        assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'
+
+        ps.accept_count += 1
+        ps.retain_count += 1
+        ps.acceptors.add(msg.from_uid)
+
+        if ps.accept_count == self.quorum_size:
+            self.final_proposal_id = msg.proposal_id
+            self.final_value = msg.proposal_value
+            self.final_acceptors = ps.acceptors
+            self.proposals = None
+            self.acceptors = None
+
+            return Resolution(self.network_uid, self.final_value)
+
+
+class PaxosInstance(Proposer, Acceptor, Learner):
+    '''
+    Aggregate Proposer, Acceptor, & Learner class.
+    '''
+
+    def __init__(self, network_uid, quorum_size, promised_id=ProposalID(0, ' '), accepted_id=ProposalID(0, ' '),
+                 accepted_value=None):
+        Proposer.__init__(self, network_uid, quorum_size)
+        Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
+        Learner.__init__(self, network_uid, quorum_size)
+
+    def receive_prepare(self, msg):
+        self.observe_proposal(msg.proposal_id)
+        return super(PaxosInstance, self).receive_prepare(msg)
+
+    def receive_accept(self, msg):
+        self.observe_proposal(msg.proposal_id)
+        return super(PaxosInstance, self).receive_accept(msg)
+
+
+class PaxosConsensusComponentModel(ComponentModel):
+
+    def __init__(self, componentname, componentinstancenumber):
+        super().__init__(componentname, componentinstancenumber)
+        self.acceptor: Acceptor = None
+        self.proposer: Proposer = None
+        self.learner: Learner = None
+        self.client = None
+        self.cluster_size = 1
+
+    def on_message_from_bottom(self, eventobj: Event):
+        self.data_received_peer(eventobj.eventsource, eventobj.eventcontent)
+
+    def on_message_from_top(self, eventobj: Event):
+        print("client asking for trouble...")
+        self.data_received_client(eventobj.eventsource, eventobj.eventcontent)
+
+    def data_received_peer(self, sender, message):
+        if isinstance(message, Prepare):
+            result_prepare = self.acceptor.receive_prepare(message)
+            self.send_to_component(sender, result_prepare)
+        elif isinstance(message, Promise):
+            result_promise = self.proposer.receive_promise(message)
+            if result_promise is not None:
+                self.broadcast_peers(result_promise)
+
+        elif isinstance(message, NackAccept) or isinstance(message, NackPrepare):
+            result = self.proposer.receive_nack(message)
+            if result is not None:
+                self.broadcast_peers(result)
+        elif isinstance(message, Accept):
+            self.proposer.observe_proposal(message.proposal_id)
+            result = self.acceptor.receive_accept(message)
+            if result is not None:
+                self.broadcast_peers(result)
+        elif isinstance(message, Accepted):
+            result = self.learner.receive_accepted(message)
+            if result is not None:
+                self.broadcast_peers(result)
+        elif isinstance(message, Resolution):
+            if self.client is not None:
+                self.send(self.client, message)
+
+    def data_received_client(self, client, message):
+        self.client = client
+        prep_message = self.proposer.prepare()
+        self.broadcast_peers(prep_message)
+        proposal = self.proposer.propose_value(message)
+        if proposal is not None:
+            self.broadcast_peers(proposal)
+
+    def send(self, client, message):
+        client.send(message)
+
+    def send_to_component(self, recipient, message):
+        if recipient != self:
+            for conn in self.connectors[ConnectorTypes.DOWN]:
+                if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1:
+                    conn.trigger_event(Event(self, EventTypes.MFRT, message))
+
+    def broadcast_peers(self, message):
+        self.send_down(Event(self, EventTypes.MFRT, message))
+
+    def on_init(self, eventobj: Event):
+        self.cluster_size = len(self.registry.get_non_channel_components())
+        self.acceptor = Acceptor(self, ProposalID(0, self.componentinstancenumber), None, None)
+        self.proposer = Proposer(self, self.cluster_size)
+        self.learner = Learner(self, self.cluster_size)
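A hedged, single-process walkthrough of one full Paxos round using the classes above, with quorum_size=1 so a single node is its own majority. The loggers in Proposer.receive_promise and Acceptor.receive_accept read .componentinstancenumber off the uid, so a tiny stand-in object is used here instead of a bare string; in the real component model the uid is the component itself:

    class FakeNode:
        def __init__(self, uid):
            self.componentinstancenumber = uid

    node = FakeNode('A')
    instance = PaxosInstance(node, quorum_size=1)

    instance.propose_value('cat')                     # value remembered; not leader yet
    prepare = instance.prepare()                      # Prepare with ProposalID(1, node)
    promise = instance.receive_prepare(prepare)       # the acceptor role promises
    accept = instance.receive_promise(promise)        # quorum of one reached -> Accept
    accepted = instance.receive_accept(accept)        # the acceptor role accepts the value
    resolution = instance.receive_accepted(accepted)  # the learner role resolves
    assert resolution.value == 'cat'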
diff --git a/Consensus/raft_component.py b/Consensus/Raft/raft_component.py
similarity index 63%
rename from Consensus/raft_component.py
rename to Consensus/Raft/raft_component.py
index fb3242a..7fcc712 100644
--- a/Consensus/raft_component.py
+++ b/Consensus/Raft/raft_component.py
@@ -48,34 +48,3 @@ class RaftConsensusComponent(ComponentModel):

     def on_init(self, eventobj: Event):
         self.state = Follower(server=self)
-
-
-class ConsensusComponent(ComponentModel):
-
-    def __init__(self, component_name, component_id):
-        super().__init__(component_name, component_id, 1)
-        self._commitIndex = 0
-        self._currentTerm = 0
-        self._lastApplied = 0
-        self._lastLogIndex = 0
-        self._lastLogTerm = None
-        self._state = Follower()
-        self._log = []
-
-    def set_log(self, log):
-        self._log = log
-
-    def on_message(self, message):
-        state, response = self._state.on_message(message)
-        self._state = state
-
-    def on_message_from_bottom(self, eventobj: Event):
-        print(f"{EventTypes.MFRT} {self.componentname}.{self.componentinstancenumber}")
-        self.on_message(eventobj)
-
-    def send_message(self, message):
-        self.send_down(Event(self, EventTypes.MFRT, message))
-
-    # also means start election
-    def on_init(self, eventobj: Event):
-        self._state.set_server(self)
diff --git a/Consensus/Raft/states.py b/Consensus/Raft/states.py
index 671c9d9..38045bb 100644
--- a/Consensus/Raft/states.py
+++ b/Consensus/Raft/states.py
@@ -1,26 +1,18 @@
-import asyncio
 import logging
 import statistics
 import sys
 import threading
-from random import randrange
-from os.path import join
-from threading import Thread
-
-from Ahc import ComponentRegistry
 from Consensus.Raft.log import LogManager
+
 logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

 logger = logging.getLogger(__name__)

 class State:
-    """Abstract state for subclassing."""
+
     def __init__(self, old_state=None, server=None):
-        """State is initialized passing an orchestrator instance when first
-        deployed. Subsequent state changes use the old_state parameter to
-        preserve the environment.
-        """
+
         if old_state:
             self.server = old_state.server
             self.votedFor = old_state.votedFor
@@ -35,8 +27,7 @@ class State:
             self.log = LogManager()

     def data_received_peer(self, peer, msg):
-        """Receive peer messages from orchestrator and pass them to the
-        appropriate method."""
+
         logger.debug('For component %s Received %s from %s', self.server.componentinstancenumber, msg['type'], peer.componentinstancenumber)

         if self.currentTerm < msg['term']:
@@ -53,14 +44,13 @@ class State:
             logger.info('For component %s Unrecognized message from %s: %s', self.server.componentinstancenumber, peer.componentinstancenumber, msg)

     def data_received_client(self, client, msg):
-        """Receive client messages from orchestrator and pass them to the
-        appropriate method."""
+
         method = getattr(self, 'on_client_' + msg['type'], None)
         if method:
             method(client, msg)

     def on_client_append(self, client, msg):
-        """Redirect the client to the leader upon receiving a client_append message."""
+
         msg = {'type': 'redirect',
                'leader': self.leaderId}
         client.send(msg)
@@ -68,29 +58,29 @@ class State:
         logger.debug('Redirected client to leader %s', self.leaderId)

     def on_client_get(self, client, msg):
-        """Return the state machine (here: the log) to the client."""
+
         client.send(self.log)


 class Follower(State):
-    """Follower state."""
+
     def __init__(self, old_state=None, server=None):
-        """Initialize parent and start election timer."""
+
         super().__init__(old_state, server)
         self.votedFor = None
         self.restart_election_timer()

     def teardown(self):
-        """Stop timers before changing state."""
+
         self.election_timer.cancel()

     def restart_election_timer(self):
-        """Delays transition to the Candidate state by timer."""
+
         if hasattr(self, 'election_timer'):
             self.election_timer.cancel()

-        timeout = randrange(1, 4)
+        timeout = ord(self.server.componentinstancenumber) - ord('A') + 1
         # loop = get_or_create_eventloop()
         # self.election_timer = loop. \
         #     call_later(timeout, self.server.change_state, Candidate)
@@ -99,7 +89,7 @@ class Follower(State):
         logger.debug('For component %s Election timer restarted: %s s', self.server.componentinstancenumber, timeout)

     def on_peer_request_vote(self, peer, msg):
-        """Grant this node's vote to Candidates."""
+
         term_is_current = msg['term'] >= self.currentTerm
         can_vote = self.votedFor is None
         index_is_current = (msg['lastLogTerm'] > self.log.term() or
@@ -144,10 +134,10 @@ class Candidate(Follower):
-    """Candidate state. Notice that this state subclasses Follower."""
+
     def __init__(self, old_state=None, server=None):
-        """Initialize parent, increase term, vote for self, ask for votes."""
+
         super().__init__(old_state, server)
         self.currentTerm += 1
         self.votes_count = 0
@@ -164,7 +154,7 @@ class Candidate(Follower):


     def send_vote_requests(self):
-        """Ask peers for votes."""
+
         logger.info('For component %s Broadcasting request_vote', self.server.componentinstancenumber)
         msg = {'type': 'request_vote', 'term': self.currentTerm,
                'candidateId': self.votedFor,
@@ -173,13 +163,13 @@ class Candidate(Follower):
         self.server.broadcast_peers(msg)

     def on_peer_append_entries(self, peer, msg):
-        """Transition back to Follower upon receiving an append_entries."""
+
         logger.debug('For component %s Converting to Follower', self.server.componentinstancenumber)
         self.server.change_state(Follower)
         self.server.state.on_peer_append_entries(peer, msg)

     def on_peer_response_vote(self, peer, msg):
-        """Register peer votes; transition to Leader upon majority vote."""
+
         self.votes_count += msg['voteGranted']
         logger.info('For component %s Vote count: %s', self.server.componentinstancenumber, self.votes_count)
         if self.votes_count > len(self.server.registry.get_non_channel_components()) / 2:
@@ -187,11 +177,10 @@ class Candidate(Follower):


 class Leader(State):
-    """Leader state."""
+
     def __init__(self, old_state=None, server=None):
-        """Initialize parent, set leader variables, start periodic
-        append_entries."""
+
         super().__init__(old_state, server)
         logger.info('For component %s Leader of term: %s', self.server.componentinstancenumber, self.currentTerm)
         self.leaderId = self.server.componentinstancenumber
@@ -211,7 +200,7 @@ class Leader(State):
         self.log.commit(self.log.index)

     def teardown(self):
-        """Stop timers before changing state."""
+
         self.append_timer.cancel()

         for clients in self.waiting_clients.values():
@@ -220,11 +209,7 @@ class Leader(State):
             logger.error('Sent unsuccessful response to client')

     def send_append_entries(self):
-        """Send append_entries to the cluster, containing:
-        - nothing: if the remote node is up to date.
-        - the compacted log: if the remote node has to catch up.
-        - log entries: if available.
-        Finally, it schedules itself for later execution."""
+
         cluster = self.server.registry.get_non_channel_components()
         for peer in cluster:
             if peer.componentinstancenumber == self.server.componentinstancenumber:
@@ -242,15 +227,14 @@ class Leader(State):
                          len(msg['entries']), peer.componentinstancenumber, self.nextIndex[peer.componentinstancenumber])
             self.server.send_to_component(peer, msg)

-        timeout = randrange(1, 4) * 10 ** -1
+        timeout = 1
         # loop = get_or_create_eventloop()
         # self.append_timer = loop.call_later(timeout, self.send_append_entries)
         self.append_timer = threading.Timer(timeout, self.send_append_entries)
         self.append_timer.start()
+
     def on_peer_response_append(self, peer, msg):
-        """Handle peer response to append_entries.
-        If successful RPC, try to commit new entries.
#nodes = ['A', 'B', 'C'] - # nodes = ['A', 'B'] edges = combinations(nodes, 2) G = nx.Graph() @@ -56,12 +32,11 @@ def main(): a_node: RaftConsensusComponent = topo.nodes.get('A') cluster = a_node.registry.get_non_channel_components() a_node = topo.nodes.get(cluster[0].state.leaderId) - a_node.data_received_client(client, {'type':'append', 'data': { - 'key': 'hello', - 'value': 'hello',}}) - a_node.data_received_client(client, {'type':'append', 'data': { - 'key': 'hello', - 'value': 'hello',}}) + for i in range(10): + a_node.data_received_client(client, {'type':'append', 'data': { + 'key': i, + 'value': 'hello + '+str(i),}}) + time.sleep(1) waitforit = input("hit something to exit...") -- GitLab From 7b3049cdbea117b86c0543ea4564216969ef92d2 Mon Sep 17 00:00:00 2001 From: Ali Atli Date: Mon, 24 May 2021 03:18:33 +0300 Subject: [PATCH 3/5] windows please kill CRLF --- Consensus/Paxos/paxos_component.py | 888 ++++++++++++++--------------- Consensus/Raft/raft_component.py | 100 ++-- 2 files changed, 494 insertions(+), 494 deletions(-) diff --git a/Consensus/Paxos/paxos_component.py b/Consensus/Paxos/paxos_component.py index 9aff715..35253d5 100644 --- a/Consensus/Paxos/paxos_component.py +++ b/Consensus/Paxos/paxos_component.py @@ -1,444 +1,444 @@ - -import collections -import sys - -from Ahc import ComponentModel, Event, ConnectorTypes, EventTypes, ComponentRegistry -import logging -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - -logger = logging.getLogger(__name__) - -ProposalID = collections.namedtuple('ProposalID', ['number', 'uid']) - -class PaxosMessage(object): - ''' - Base class for all messages defined in this module - ''' - from_uid = None # Set by subclass constructor - - -class Prepare(PaxosMessage): - ''' - Prepare messages should be broadcast to all Acceptors. - ''' - - def __init__(self, from_uid, proposal_id): - self.from_uid = from_uid - self.proposal_id = proposal_id - - -class NackPrepare(PaxosMessage): - ''' - NACKs are technically optional though few practical applications will - want to omit their use. They are used to signal a proposer that their - current proposal number is out of date and that a new one should be - chosen. NACKs may be sent in response to both Prepare and Accept - messages - ''' - - def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): - self.from_uid = from_uid - self.proposal_id = proposal_id - self.proposer_uid = proposer_uid - self.promised_proposal_id = promised_proposal_id - -class NackAccept(PaxosMessage): - ''' - NACKs are technically optional though few practical applications will - want to omit their use. They are used to signal a proposer that their - current proposal number is out of date and that a new one should be - chosen. 
NACKs may be sent in response to both Prepare and Accept - messages - ''' - - def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): - self.from_uid = from_uid - self.proposal_id = proposal_id - self.proposer_uid = proposer_uid - self.promised_proposal_id = promised_proposal_id - -class Promise(PaxosMessage): - ''' - Promise messages should be sent to at least the Proposer specified in - the proposer_uid field - ''' - - def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value): - self.from_uid = from_uid - self.proposer_uid = proposer_uid - self.proposal_id = proposal_id - self.last_accepted_id = last_accepted_id - self.last_accepted_value = last_accepted_value - - -class Accept(PaxosMessage): - ''' - Accept messages should be broadcast to all Acceptors - ''' - - def __init__(self, from_uid, proposal_id, proposal_value): - self.from_uid = from_uid - self.proposal_id = proposal_id - self.proposal_value = proposal_value - - -class Accepted(PaxosMessage): - ''' - Accepted messages should be sent to all Learners - ''' - - def __init__(self, from_uid, proposal_id, proposal_value): - self.from_uid = from_uid - self.proposal_id = proposal_id - self.proposal_value = proposal_value - - -class Resolution(PaxosMessage): - ''' - Optional message used to indicate that the final value has been selected - ''' - - def __init__(self, from_uid, value): - self.from_uid = from_uid - self.value = value - - -class InvalidMessageError(Exception): - ''' - Thrown if a PaxosMessage subclass is passed to a class that does not - support it - ''' - - -class MessageHandler(object): - - def receive(self, msg): - ''' - Message dispatching function. This function accepts any PaxosMessage subclass and calls - the appropriate handler function - ''' - handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None) - if handler is None: - raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__) - return handler(msg) - - -class Proposer(MessageHandler): - ''' - The 'leader' attribute is a boolean value indicating the Proposer's - belief in whether or not it is the current leader. This is not a reliable - value as multiple nodes may simultaneously believe themselves to be the - leader. - ''' - - leader = False - proposed_value = None - proposal_id = ProposalID(0, ' ') - highest_accepted_id = ProposalID(0, ' ') - promises_received = None - nacks_received = None - current_prepare_msg = None - current_accept_msg = None - - def __init__(self, network_uid, quorum_size): - self.network_uid = network_uid - self.quorum_size = quorum_size - self.proposal_id = ProposalID(0, network_uid) - self.highest_proposal_id = ProposalID(0, network_uid) - - def propose_value(self, value): - ''' - Sets the proposal value for this node iff this node is not already aware of - a previous proposal value. If the node additionally believes itself to be - the current leader, an Accept message will be returned - ''' - if self.proposed_value is None: - self.proposed_value = value - - if self.leader: - self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value) - return self.current_accept_msg - else: - return None - - else: - return None - - def prepare(self): - ''' - Returns a new Prepare message with a proposal id higher than - that of any observed proposals. A side effect of this method is - to clear the leader flag if it is currently set. 
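# An aside on ordering, since prepare() and observe_proposal() both compare
# proposal ids with `>`: ProposalID is a namedtuple, so the comparison is
# plain tuple ordering -- round number first, uid as the tie-breaker. A
# minimal sketch (assumes plain string uids; the module sometimes stores
# component objects in the uid slot instead):
from collections import namedtuple

ProposalID = namedtuple('ProposalID', ['number', 'uid'])
assert ProposalID(2, 'A') > ProposalID(1, 'B')  # a higher round always wins
assert ProposalID(1, 'B') > ProposalID(1, 'A')  # same round: uid breaks the tie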
- ''' - - self.leader = False - self.promises_received = set() - self.nacks_received = set() - self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid) - self.highest_proposal_id = self.proposal_id - self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id) - return self.current_prepare_msg - - def observe_proposal(self, proposal_id): - ''' - Optional method used to update the proposal counter as proposals are - seen on the network. When co-located with Acceptors and/or Learners, - this method may be used to avoid a message delay when attempting to - assume leadership (guaranteed NACK if the proposal number is too low). - This method is automatically called for all received Promise and Nack - messages. - ''' - if proposal_id > self.highest_proposal_id: - self.highest_proposal_id = proposal_id - - def receive_nack(self, msg): - ''' - Returns a new Prepare message if the number of Nacks received reaches - a quorum. - ''' - self.observe_proposal(msg.promised_proposal_id) - - if msg.proposal_id == self.proposal_id and self.nacks_received is not None: - self.nacks_received.add(msg.from_uid) - - if len(self.nacks_received) == self.quorum_size: - return self.prepare() # Lost leadership or failed to acquire it - - def receive_promise(self, msg): - ''' - Returns an Accept messages if a quorum of Promise messages is achieved - ''' - self.observe_proposal(msg.proposal_id) - logger.info("For component %s Promise received from %s", self.network_uid.componentinstancenumber, - msg.from_uid.componentinstancenumber) - if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received: - - self.promises_received.add(msg.from_uid) - - if msg.last_accepted_id is not None and msg.last_accepted_id > self.highest_accepted_id: - self.highest_accepted_id = msg.last_accepted_id - if msg.last_accepted_value is not None: - self.proposed_value = msg.last_accepted_value - # apart from self - if len(self.promises_received) == self.quorum_size: - self.leader = True - - if self.proposed_value is not None: - self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value) - return self.current_accept_msg - - -class Acceptor(MessageHandler): - ''' - Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness - in the presense of failure, Acceptors must be able to remember the promises - they've made even in the event of power outages. Consequently, any changes - to the promised_id, accepted_id, and/or accepted_value must be persisted to - stable media prior to sending promise and accepted messages. - - When an Acceptor instance is composed alongside a Proposer instance, it - is generally advantageous to call the proposer's observe_proposal() - method when methods of this class are called. - ''' - - def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None): - ''' - promised_id, accepted_id, and accepted_value should be provided if and only if this - instance is recovering from persistent state. - ''' - self.network_uid = network_uid - self.promised_id = promised_id - self.accepted_id = accepted_id - self.accepted_value = accepted_value - - def receive_prepare(self, msg): - ''' - Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk - prior to transmitting the Promise message. 
- ''' - if msg is None or msg.proposal_id is None: - return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) - - if self.promised_id is None or msg.proposal_id >= self.promised_id: - self.promised_id = msg.proposal_id - return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value) - else: - return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) - - def receive_accept(self, msg): - ''' - Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted - to disk prior to transmitting the Accepted message. - ''' - logger.info("For component %s Accept received from %s", self.network_uid.componentinstancenumber, - msg.from_uid.componentinstancenumber) - if msg is None or msg.proposal_id is None: - return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) - if self.promised_id is None or msg.proposal_id >= self.promised_id: - self.promised_id = msg.proposal_id - self.accepted_id = msg.proposal_id - self.accepted_value = msg.proposal_value - return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value) - else: - return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) - - -class Learner(MessageHandler): - ''' - This class listens to Accepted messages, determines when the final value is - selected, and tracks which peers have accepted the final value. - ''' - - class ProposalStatus(object): - __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value'] - - def __init__(self, value): - self.accept_count = 0 - self.retain_count = 0 - self.acceptors = set() - self.value = value - - def __init__(self, network_uid, quorum_size): - self.network_uid = network_uid - self.quorum_size = quorum_size - self.proposals = dict() # maps proposal_id => ProposalStatus - self.acceptors = dict() # maps from_uid => last_accepted_proposal_id - self.final_value = None - self.final_acceptors = None # Will be a set of acceptor UIDs once the final value is chosen - self.final_proposal_id = ProposalID(0, ' ') - - def receive_accepted(self, msg): - ''' - Called when an Accepted message is received from an acceptor. Once the final value - is determined, the return value of this method will be a Resolution message containing - the consentual value. Subsequent calls after the resolution is chosen will continue to add - new Acceptors to the final_acceptors set and return Resolution messages. - ''' - if self.final_value is not None: - if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value: - self.final_acceptors.add(msg.from_uid) - return Resolution(self.network_uid, self.final_value) - - last_pn = self.acceptors.get(msg.from_uid) - - if last_pn is not None and msg.proposal_id <= last_pn: - return # Old message - - self.acceptors[msg.from_uid] = msg.proposal_id - - if last_pn is not None: - ps = self.proposals[last_pn] - ps.retain_count -= 1 - ps.acceptors.remove(msg.from_uid) - if ps.retain_count == 0: - del self.proposals[last_pn] - - if not msg.proposal_id in self.proposals: - self.proposals[msg.proposal_id] = Learner.ProposalStatus(msg.proposal_value) - - ps = self.proposals[msg.proposal_id] - - assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!' 
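# The bookkeeping in receive_accepted() is easier to see end-to-end than
# line by line. A minimal sketch of a resolution, reusing Learner, Accepted
# and ProposalID from this module as-is (quorum_size=2 and the string uids
# 'L', 'A', 'B', 'C' are arbitrary test values, not part of the codebase):
learner = Learner(network_uid='L', quorum_size=2)
pid = ProposalID(1, 'A')
assert learner.receive_accepted(Accepted('B', pid, 'v')) is None  # 1 of 2 accepts
resolution = learner.receive_accepted(Accepted('C', pid, 'v'))    # quorum reached
assert resolution.value == 'v' and learner.final_acceptors == {'B', 'C'}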
- - ps.accept_count += 1 - ps.retain_count += 1 - ps.acceptors.add(msg.from_uid) - - if ps.accept_count == self.quorum_size: - self.final_proposal_id = msg.proposal_id - self.final_value = msg.proposal_value - self.final_acceptors = ps.acceptors - self.proposals = None - self.acceptors = None - - return Resolution(self.network_uid, self.final_value) - - -class PaxosInstance(Proposer, Acceptor, Learner): - ''' - Aggregate Proposer, Accepter, & Learner class. - ''' - - def __init__(self, network_uid, quorum_size, promised_id=ProposalID(0, ' '), accepted_id=ProposalID(0, ' '), - accepted_value=None): - Proposer.__init__(self, network_uid, quorum_size) - Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value) - Learner.__init__(self, network_uid, quorum_size) - - def receive_prepare(self, msg): - self.observe_proposal(msg.proposal_id) - return super(PaxosInstance, self).receive_prepare(msg) - - def receive_accept(self, msg): - self.observe_proposal(msg.proposal_id) - return super(PaxosInstance, self).receive_accept(msg) - - -class PaxosConsensusComponentModel(ComponentModel): - - def __init__(self, componentname, componentinstancenumber): - super().__init__(componentname, componentinstancenumber) - self.acceptor: Acceptor = None - self.proposer: Proposer = None - self.learner: Learner = None - self.client = None - self.cluster_size = 1 - - def on_message_from_bottom(self, eventobj: Event): - self.data_received_peer(eventobj.eventsource, eventobj.eventcontent) - - def on_message_from_top(self, eventobj: Event): - print("client asking for a trouble...") - self.data_received_client(eventobj.eventsource, eventobj.eventcontent) - - def data_received_peer(self, sender, message): - if isinstance(message, Prepare): - result_prepare = self.acceptor.receive_prepare(message) - self.send_to_component(sender, result_prepare) - elif isinstance(message, Promise): - result_promise = self.proposer.receive_promise(message) - if result_promise is not None: - self.broadcast_peers(result_promise) - - elif isinstance(message, NackAccept) or isinstance(message, NackPrepare): - result = self.proposer.receive_nack(message) - if result is not None: - self.broadcast_peers(result) - elif isinstance(message, Accept): - self.proposer.observe_proposal(message.proposal_id) - result = self.acceptor.receive_accept(message) - if result is not None: - self.broadcast_peers(result) - elif isinstance(message, Accepted): - result = self.learner.receive_accepted(message) - if result is not None: - self.broadcast_peers(result) - elif isinstance(message, Resolution): - if self.client is not None: - self.send(message) - - def data_received_client(self, client, message): - self.client = client - prep_message = self.proposer.prepare() - self.broadcast_peers(prep_message) - proposal = self.proposer.propose_value(message) - if proposal is not None: - self.broadcast_peers(proposal) - - def send(self, client, message): - client.send(message) - - def send_to_component(self, recipient, message): - if recipient != self: - for conn in self.connectors[ConnectorTypes.DOWN]: - if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1: - conn.trigger_event(Event(self, EventTypes.MFRT, message)) - - def broadcast_peers(self, message): - self.send_down(Event(self, EventTypes.MFRT, message)) - - def on_init(self, eventobj: Event): - self.cluster_size = len(self.registry.get_non_channel_components()) - self.acceptor = Acceptor(self, ProposalID(0, self.componentinstancenumber), None, None) - self.proposer = 
Proposer(self, self.cluster_size) - self.learner = Learner(self, self.cluster_size) + +import collections +import sys + +from Ahc import ComponentModel, Event, ConnectorTypes, EventTypes, ComponentRegistry +import logging +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + +logger = logging.getLogger(__name__) + +ProposalID = collections.namedtuple('ProposalID', ['number', 'uid']) + +class PaxosMessage(object): + ''' + Base class for all messages defined in this module + ''' + from_uid = None # Set by subclass constructor + + +class Prepare(PaxosMessage): + ''' + Prepare messages should be broadcast to all Acceptors. + ''' + + def __init__(self, from_uid, proposal_id): + self.from_uid = from_uid + self.proposal_id = proposal_id + + +class NackPrepare(PaxosMessage): + ''' + NACKs are technically optional though few practical applications will + want to omit their use. They are used to signal a proposer that their + current proposal number is out of date and that a new one should be + chosen. NACKs may be sent in response to both Prepare and Accept + messages + ''' + + def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): + self.from_uid = from_uid + self.proposal_id = proposal_id + self.proposer_uid = proposer_uid + self.promised_proposal_id = promised_proposal_id + +class NackAccept(PaxosMessage): + ''' + NACKs are technically optional though few practical applications will + want to omit their use. They are used to signal a proposer that their + current proposal number is out of date and that a new one should be + chosen. NACKs may be sent in response to both Prepare and Accept + messages + ''' + + def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): + self.from_uid = from_uid + self.proposal_id = proposal_id + self.proposer_uid = proposer_uid + self.promised_proposal_id = promised_proposal_id + +class Promise(PaxosMessage): + ''' + Promise messages should be sent to at least the Proposer specified in + the proposer_uid field + ''' + + def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value): + self.from_uid = from_uid + self.proposer_uid = proposer_uid + self.proposal_id = proposal_id + self.last_accepted_id = last_accepted_id + self.last_accepted_value = last_accepted_value + + +class Accept(PaxosMessage): + ''' + Accept messages should be broadcast to all Acceptors + ''' + + def __init__(self, from_uid, proposal_id, proposal_value): + self.from_uid = from_uid + self.proposal_id = proposal_id + self.proposal_value = proposal_value + + +class Accepted(PaxosMessage): + ''' + Accepted messages should be sent to all Learners + ''' + + def __init__(self, from_uid, proposal_id, proposal_value): + self.from_uid = from_uid + self.proposal_id = proposal_id + self.proposal_value = proposal_value + + +class Resolution(PaxosMessage): + ''' + Optional message used to indicate that the final value has been selected + ''' + + def __init__(self, from_uid, value): + self.from_uid = from_uid + self.value = value + + +class InvalidMessageError(Exception): + ''' + Thrown if a PaxosMessage subclass is passed to a class that does not + support it + ''' + + +class MessageHandler(object): + + def receive(self, msg): + ''' + Message dispatching function. 
This function accepts any PaxosMessage subclass and calls + the appropriate handler function + ''' + handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None) + if handler is None: + raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__) + return handler(msg) + + +class Proposer(MessageHandler): + ''' + The 'leader' attribute is a boolean value indicating the Proposer's + belief in whether or not it is the current leader. This is not a reliable + value as multiple nodes may simultaneously believe themselves to be the + leader. + ''' + + leader = False + proposed_value = None + proposal_id = ProposalID(0, ' ') + highest_accepted_id = ProposalID(0, ' ') + promises_received = None + nacks_received = None + current_prepare_msg = None + current_accept_msg = None + + def __init__(self, network_uid, quorum_size): + self.network_uid = network_uid + self.quorum_size = quorum_size + self.proposal_id = ProposalID(0, network_uid) + self.highest_proposal_id = ProposalID(0, network_uid) + + def propose_value(self, value): + ''' + Sets the proposal value for this node iff this node is not already aware of + a previous proposal value. If the node additionally believes itself to be + the current leader, an Accept message will be returned + ''' + if self.proposed_value is None: + self.proposed_value = value + + if self.leader: + self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value) + return self.current_accept_msg + else: + return None + + else: + return None + + def prepare(self): + ''' + Returns a new Prepare message with a proposal id higher than + that of any observed proposals. A side effect of this method is + to clear the leader flag if it is currently set. + ''' + + self.leader = False + self.promises_received = set() + self.nacks_received = set() + self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self.network_uid) + self.highest_proposal_id = self.proposal_id + self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id) + return self.current_prepare_msg + + def observe_proposal(self, proposal_id): + ''' + Optional method used to update the proposal counter as proposals are + seen on the network. When co-located with Acceptors and/or Learners, + this method may be used to avoid a message delay when attempting to + assume leadership (guaranteed NACK if the proposal number is too low). + This method is automatically called for all received Promise and Nack + messages. + ''' + if proposal_id > self.highest_proposal_id: + self.highest_proposal_id = proposal_id + + def receive_nack(self, msg): + ''' + Returns a new Prepare message if the number of Nacks received reaches + a quorum. 
+ ''' + self.observe_proposal(msg.promised_proposal_id) + + if msg.proposal_id == self.proposal_id and self.nacks_received is not None: + self.nacks_received.add(msg.from_uid) + + if len(self.nacks_received) == self.quorum_size: + return self.prepare() # Lost leadership or failed to acquire it + + def receive_promise(self, msg): + ''' + Returns an Accept messages if a quorum of Promise messages is achieved + ''' + self.observe_proposal(msg.proposal_id) + logger.info("For component %s Promise received from %s", self.network_uid.componentinstancenumber, + msg.from_uid.componentinstancenumber) + if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received: + + self.promises_received.add(msg.from_uid) + + if msg.last_accepted_id is not None and msg.last_accepted_id > self.highest_accepted_id: + self.highest_accepted_id = msg.last_accepted_id + if msg.last_accepted_value is not None: + self.proposed_value = msg.last_accepted_value + # apart from self + if len(self.promises_received) == self.quorum_size: + self.leader = True + + if self.proposed_value is not None: + self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value) + return self.current_accept_msg + + +class Acceptor(MessageHandler): + ''' + Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness + in the presense of failure, Acceptors must be able to remember the promises + they've made even in the event of power outages. Consequently, any changes + to the promised_id, accepted_id, and/or accepted_value must be persisted to + stable media prior to sending promise and accepted messages. + + When an Acceptor instance is composed alongside a Proposer instance, it + is generally advantageous to call the proposer's observe_proposal() + method when methods of this class are called. + ''' + + def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None): + ''' + promised_id, accepted_id, and accepted_value should be provided if and only if this + instance is recovering from persistent state. + ''' + self.network_uid = network_uid + self.promised_id = promised_id + self.accepted_id = accepted_id + self.accepted_value = accepted_value + + def receive_prepare(self, msg): + ''' + Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk + prior to transmitting the Promise message. + ''' + if msg is None or msg.proposal_id is None: + return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) + + if self.promised_id is None or msg.proposal_id >= self.promised_id: + self.promised_id = msg.proposal_id + return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value) + else: + return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) + + def receive_accept(self, msg): + ''' + Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted + to disk prior to transmitting the Accepted message. 
+ ''' + logger.info("For component %s Accept received from %s", self.network_uid.componentinstancenumber, + msg.from_uid.componentinstancenumber) + if msg is None or msg.proposal_id is None: + return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) + if self.promised_id is None or msg.proposal_id >= self.promised_id: + self.promised_id = msg.proposal_id + self.accepted_id = msg.proposal_id + self.accepted_value = msg.proposal_value + return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value) + else: + return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) + + +class Learner(MessageHandler): + ''' + This class listens to Accepted messages, determines when the final value is + selected, and tracks which peers have accepted the final value. + ''' + + class ProposalStatus(object): + __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value'] + + def __init__(self, value): + self.accept_count = 0 + self.retain_count = 0 + self.acceptors = set() + self.value = value + + def __init__(self, network_uid, quorum_size): + self.network_uid = network_uid + self.quorum_size = quorum_size + self.proposals = dict() # maps proposal_id => ProposalStatus + self.acceptors = dict() # maps from_uid => last_accepted_proposal_id + self.final_value = None + self.final_acceptors = None # Will be a set of acceptor UIDs once the final value is chosen + self.final_proposal_id = ProposalID(0, ' ') + + def receive_accepted(self, msg): + ''' + Called when an Accepted message is received from an acceptor. Once the final value + is determined, the return value of this method will be a Resolution message containing + the consentual value. Subsequent calls after the resolution is chosen will continue to add + new Acceptors to the final_acceptors set and return Resolution messages. + ''' + if self.final_value is not None: + if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value: + self.final_acceptors.add(msg.from_uid) + return Resolution(self.network_uid, self.final_value) + + last_pn = self.acceptors.get(msg.from_uid) + + if last_pn is not None and msg.proposal_id <= last_pn: + return # Old message + + self.acceptors[msg.from_uid] = msg.proposal_id + + if last_pn is not None: + ps = self.proposals[last_pn] + ps.retain_count -= 1 + ps.acceptors.remove(msg.from_uid) + if ps.retain_count == 0: + del self.proposals[last_pn] + + if not msg.proposal_id in self.proposals: + self.proposals[msg.proposal_id] = Learner.ProposalStatus(msg.proposal_value) + + ps = self.proposals[msg.proposal_id] + + assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!' + + ps.accept_count += 1 + ps.retain_count += 1 + ps.acceptors.add(msg.from_uid) + + if ps.accept_count == self.quorum_size: + self.final_proposal_id = msg.proposal_id + self.final_value = msg.proposal_value + self.final_acceptors = ps.acceptors + self.proposals = None + self.acceptors = None + + return Resolution(self.network_uid, self.final_value) + + +class PaxosInstance(Proposer, Acceptor, Learner): + ''' + Aggregate Proposer, Accepter, & Learner class. 
+ ''' + + def __init__(self, network_uid, quorum_size, promised_id=ProposalID(0, ' '), accepted_id=ProposalID(0, ' '), + accepted_value=None): + Proposer.__init__(self, network_uid, quorum_size) + Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value) + Learner.__init__(self, network_uid, quorum_size) + + def receive_prepare(self, msg): + self.observe_proposal(msg.proposal_id) + return super(PaxosInstance, self).receive_prepare(msg) + + def receive_accept(self, msg): + self.observe_proposal(msg.proposal_id) + return super(PaxosInstance, self).receive_accept(msg) + + +class PaxosConsensusComponentModel(ComponentModel): + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.acceptor: Acceptor = None + self.proposer: Proposer = None + self.learner: Learner = None + self.client = None + self.cluster_size = 1 + + def on_message_from_bottom(self, eventobj: Event): + self.data_received_peer(eventobj.eventsource, eventobj.eventcontent) + + def on_message_from_top(self, eventobj: Event): + print("client asking for a trouble...") + self.data_received_client(eventobj.eventsource, eventobj.eventcontent) + + def data_received_peer(self, sender, message): + if isinstance(message, Prepare): + result_prepare = self.acceptor.receive_prepare(message) + self.send_to_component(sender, result_prepare) + elif isinstance(message, Promise): + result_promise = self.proposer.receive_promise(message) + if result_promise is not None: + self.broadcast_peers(result_promise) + + elif isinstance(message, NackAccept) or isinstance(message, NackPrepare): + result = self.proposer.receive_nack(message) + if result is not None: + self.broadcast_peers(result) + elif isinstance(message, Accept): + self.proposer.observe_proposal(message.proposal_id) + result = self.acceptor.receive_accept(message) + if result is not None: + self.broadcast_peers(result) + elif isinstance(message, Accepted): + result = self.learner.receive_accepted(message) + if result is not None: + self.broadcast_peers(result) + elif isinstance(message, Resolution): + if self.client is not None: + self.send(message) + + def data_received_client(self, client, message): + self.client = client + prep_message = self.proposer.prepare() + self.broadcast_peers(prep_message) + proposal = self.proposer.propose_value(message) + if proposal is not None: + self.broadcast_peers(proposal) + + def send(self, client, message): + client.send(message) + + def send_to_component(self, recipient, message): + if recipient != self: + for conn in self.connectors[ConnectorTypes.DOWN]: + if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1: + conn.trigger_event(Event(self, EventTypes.MFRT, message)) + + def broadcast_peers(self, message): + self.send_down(Event(self, EventTypes.MFRT, message)) + + def on_init(self, eventobj: Event): + self.cluster_size = len(self.registry.get_non_channel_components()) + self.acceptor = Acceptor(self, ProposalID(0, self.componentinstancenumber), None, None) + self.proposer = Proposer(self, self.cluster_size) + self.learner = Learner(self, self.cluster_size) diff --git a/Consensus/Raft/raft_component.py b/Consensus/Raft/raft_component.py index 7fcc712..ab47341 100644 --- a/Consensus/Raft/raft_component.py +++ b/Consensus/Raft/raft_component.py @@ -1,50 +1,50 @@ -import sys - -from Ahc import ComponentModel, EventTypes, ConnectorTypes -from Ahc import Event - -from Consensus.Raft.states import Follower -import logging 
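# RaftConsensusComponent below is deliberately thin: it owns a single state
# object (Follower, Candidate or Leader), forwards every peer and client
# message to it, and swaps it out via change_state(), which tears the old
# state down and hands it to the successor so term and log variables survive
# the transition. A minimal sketch of that pattern with stand-in classes
# (MiniState and MiniServer are illustrative names, not part of the code):
class MiniState:
    def __init__(self, old_state=None, server=None):
        # adopt the environment of the state being replaced, if any
        self.server = old_state.server if old_state else server

    def teardown(self):
        pass  # real states cancel their timers here

class MiniServer:
    def __init__(self):
        self.state = MiniState(server=self)

    def change_state(self, new_state):
        self.state.teardown()
        self.state = new_state(old_state=self.state)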
-logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) - -logger = logging.getLogger(__name__) - - -class RaftConsensusComponent(ComponentModel): - - def __init__(self, componentname, componentinstancenumber): - super().__init__(componentname, componentinstancenumber, 1) - self.state = None - - def on_message_from_bottom(self, eventobj: Event): - self.data_received_peer(eventobj.eventsource, eventobj.eventcontent) - - def on_message_from_top(self, eventobj: Event): - print("client asking for a trouble...") - self.data_received_client(eventobj.eventsource, eventobj.eventcontent) - - def change_state(self, new_state): - self.state.teardown() - logger.info('For component %s State change:' + new_state.__name__, self.componentinstancenumber) - self.state = new_state(old_state=self.state) - - def data_received_peer(self, sender, message): - self.state.data_received_peer(sender, message) - - def data_received_client(self, client, message): - self.state.data_received_client(client, message) - - def send(self, client, message): - client.send(message) - - def send_to_component(self, recipient, message): - if recipient != self: - for conn in self.connectors[ConnectorTypes.DOWN]: - if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1: - conn.trigger_event(Event(self, EventTypes.MFRT, message)) - - def broadcast_peers(self, message): - self.send_down(Event(self, EventTypes.MFRT, message)) - - def on_init(self, eventobj: Event): - self.state = Follower(server=self) +import sys + +from Ahc import ComponentModel, EventTypes, ConnectorTypes +from Ahc import Event + +from Consensus.Raft.states import Follower +import logging +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + +logger = logging.getLogger(__name__) + + +class RaftConsensusComponent(ComponentModel): + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber, 1) + self.state = None + + def on_message_from_bottom(self, eventobj: Event): + self.data_received_peer(eventobj.eventsource, eventobj.eventcontent) + + def on_message_from_top(self, eventobj: Event): + print("client asking for a trouble...") + self.data_received_client(eventobj.eventsource, eventobj.eventcontent) + + def change_state(self, new_state): + self.state.teardown() + logger.info('For component %s State change:' + new_state.__name__, self.componentinstancenumber) + self.state = new_state(old_state=self.state) + + def data_received_peer(self, sender, message): + self.state.data_received_peer(sender, message) + + def data_received_client(self, client, message): + self.state.data_received_client(client, message) + + def send(self, client, message): + client.send(message) + + def send_to_component(self, recipient, message): + if recipient != self: + for conn in self.connectors[ConnectorTypes.DOWN]: + if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1: + conn.trigger_event(Event(self, EventTypes.MFRT, message)) + + def broadcast_peers(self, message): + self.send_down(Event(self, EventTypes.MFRT, message)) + + def on_init(self, eventobj: Event): + self.state = Follower(server=self) -- GitLab From f35778cdfc80b4877e609be3a8107444c5ac586d Mon Sep 17 00:00:00 2001 From: Ali Atli Date: Mon, 24 May 2021 23:37:00 +0300 Subject: [PATCH 4/5] paxos majority of the quorum --- Consensus/Paxos/paxos_component.py | 151 +++++------------------------ tests/testpaxosconsensus.py | 15 ++- 2 files changed, 37 insertions(+), 129 deletions(-) diff --git 
a/Consensus/Paxos/paxos_component.py b/Consensus/Paxos/paxos_component.py index 35253d5..31f530e 100644 --- a/Consensus/Paxos/paxos_component.py +++ b/Consensus/Paxos/paxos_component.py @@ -11,16 +11,12 @@ logger = logging.getLogger(__name__) ProposalID = collections.namedtuple('ProposalID', ['number', 'uid']) class PaxosMessage(object): - ''' - Base class for all messages defined in this module - ''' + from_uid = None # Set by subclass constructor class Prepare(PaxosMessage): - ''' - Prepare messages should be broadcast to all Acceptors. - ''' + def __init__(self, from_uid, proposal_id): self.from_uid = from_uid @@ -28,13 +24,7 @@ class Prepare(PaxosMessage): class NackPrepare(PaxosMessage): - ''' - NACKs are technically optional though few practical applications will - want to omit their use. They are used to signal a proposer that their - current proposal number is out of date and that a new one should be - chosen. NACKs may be sent in response to both Prepare and Accept - messages - ''' + def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): self.from_uid = from_uid @@ -43,13 +33,6 @@ class NackPrepare(PaxosMessage): self.promised_proposal_id = promised_proposal_id class NackAccept(PaxosMessage): - ''' - NACKs are technically optional though few practical applications will - want to omit their use. They are used to signal a proposer that their - current proposal number is out of date and that a new one should be - chosen. NACKs may be sent in response to both Prepare and Accept - messages - ''' def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id): self.from_uid = from_uid @@ -58,10 +41,7 @@ class NackAccept(PaxosMessage): self.promised_proposal_id = promised_proposal_id class Promise(PaxosMessage): - ''' - Promise messages should be sent to at least the Proposer specified in - the proposer_uid field - ''' + def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value): self.from_uid = from_uid @@ -72,9 +52,6 @@ class Promise(PaxosMessage): class Accept(PaxosMessage): - ''' - Accept messages should be broadcast to all Acceptors - ''' def __init__(self, from_uid, proposal_id, proposal_value): self.from_uid = from_uid @@ -83,9 +60,6 @@ class Accept(PaxosMessage): class Accepted(PaxosMessage): - ''' - Accepted messages should be sent to all Learners - ''' def __init__(self, from_uid, proposal_id, proposal_value): self.from_uid = from_uid @@ -94,9 +68,6 @@ class Accepted(PaxosMessage): class Resolution(PaxosMessage): - ''' - Optional message used to indicate that the final value has been selected - ''' def __init__(self, from_uid, value): self.from_uid = from_uid @@ -104,32 +75,11 @@ class Resolution(PaxosMessage): class InvalidMessageError(Exception): - ''' - Thrown if a PaxosMessage subclass is passed to a class that does not - support it - ''' - + pass -class MessageHandler(object): - def receive(self, msg): - ''' - Message dispatching function. This function accepts any PaxosMessage subclass and calls - the appropriate handler function - ''' - handler = getattr(self, 'receive_' + msg.__class__.__name__.lower(), None) - if handler is None: - raise InvalidMessageError('Receiving class does not support messages of type: ' + msg.__class__.__name__) - return handler(msg) - -class Proposer(MessageHandler): - ''' - The 'leader' attribute is a boolean value indicating the Proposer's - belief in whether or not it is the current leader. 
This is not a reliable - value as multiple nodes may simultaneously believe themselves to be the - leader. - ''' +class Proposer(object): leader = False proposed_value = None @@ -147,11 +97,7 @@ class Proposer(MessageHandler): self.highest_proposal_id = ProposalID(0, network_uid) def propose_value(self, value): - ''' - Sets the proposal value for this node iff this node is not already aware of - a previous proposal value. If the node additionally believes itself to be - the current leader, an Accept message will be returned - ''' + if self.proposed_value is None: self.proposed_value = value @@ -165,11 +111,6 @@ class Proposer(MessageHandler): return None def prepare(self): - ''' - Returns a new Prepare message with a proposal id higher than - that of any observed proposals. A side effect of this method is - to clear the leader flag if it is currently set. - ''' self.leader = False self.promises_received = set() @@ -180,37 +121,26 @@ class Proposer(MessageHandler): return self.current_prepare_msg def observe_proposal(self, proposal_id): - ''' - Optional method used to update the proposal counter as proposals are - seen on the network. When co-located with Acceptors and/or Learners, - this method may be used to avoid a message delay when attempting to - assume leadership (guaranteed NACK if the proposal number is too low). - This method is automatically called for all received Promise and Nack - messages. - ''' + if proposal_id > self.highest_proposal_id: self.highest_proposal_id = proposal_id def receive_nack(self, msg): - ''' - Returns a new Prepare message if the number of Nacks received reaches - a quorum. - ''' + logger.info("For component %s NACK received from %s", self.network_uid.componentinstancenumber, + msg.from_uid.componentinstancenumber) self.observe_proposal(msg.promised_proposal_id) if msg.proposal_id == self.proposal_id and self.nacks_received is not None: self.nacks_received.add(msg.from_uid) - if len(self.nacks_received) == self.quorum_size: + if len(self.nacks_received) == self.quorum_size-1: return self.prepare() # Lost leadership or failed to acquire it def receive_promise(self, msg): - ''' - Returns an Accept messages if a quorum of Promise messages is achieved - ''' - self.observe_proposal(msg.proposal_id) logger.info("For component %s Promise received from %s", self.network_uid.componentinstancenumber, msg.from_uid.componentinstancenumber) + self.observe_proposal(msg.proposal_id) + if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received: self.promises_received.add(msg.from_uid) @@ -220,7 +150,7 @@ class Proposer(MessageHandler): if msg.last_accepted_value is not None: self.proposed_value = msg.last_accepted_value # apart from self - if len(self.promises_received) == self.quorum_size: + if len(self.promises_received) == self.quorum_size-1: self.leader = True if self.proposed_value is not None: @@ -228,34 +158,18 @@ class Proposer(MessageHandler): return self.current_accept_msg -class Acceptor(MessageHandler): - ''' - Acceptors act as the fault-tolerant memory for Paxos. To ensure correctness - in the presense of failure, Acceptors must be able to remember the promises - they've made even in the event of power outages. Consequently, any changes - to the promised_id, accepted_id, and/or accepted_value must be persisted to - stable media prior to sending promise and accepted messages. 
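# A note on the thresholds this patch settles on: on_init() sets quorum_size
# to the full cluster size, and the channel never echoes a broadcast back to
# its sender, so the proposer's own vote is implicit. It therefore waits for
# quorum_size - 1 peer Promises (and gives up after quorum_size - 1 Nacks),
# while the Learner further down resolves once accept_count reaches
# quorum_size / 2. A quick sanity check for the five-node cluster used in
# the tests (the variable names here are illustrative only):
cluster_size = 5                    # == quorum_size passed in on_init
promises_needed = cluster_size - 1  # every peer; the self-vote is implicit
accepts_needed = cluster_size / 2   # Learner threshold: >= 2.5, i.e. 3 accepts
assert promises_needed == 4 and accepts_needed == 2.5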
- - When an Acceptor instance is composed alongside a Proposer instance, it - is generally advantageous to call the proposer's observe_proposal() - method when methods of this class are called. - ''' +class Acceptor(object): def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None): - ''' - promised_id, accepted_id, and accepted_value should be provided if and only if this - instance is recovering from persistent state. - ''' + self.network_uid = network_uid self.promised_id = promised_id self.accepted_id = accepted_id self.accepted_value = accepted_value def receive_prepare(self, msg): - ''' - Returns either a Promise or a Nack in response. The Acceptor's state must be persisted to disk - prior to transmitting the Promise message. - ''' + logger.info("For component %s Prepare received from %s", self.network_uid.componentinstancenumber, + msg.from_uid.componentinstancenumber) if msg is None or msg.proposal_id is None: return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) @@ -266,10 +180,7 @@ class Acceptor(MessageHandler): return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) def receive_accept(self, msg): - ''' - Returns either an Accepted or Nack message in response. The Acceptor's state must be persisted - to disk prior to transmitting the Accepted message. - ''' + logger.info("For component %s Accept received from %s", self.network_uid.componentinstancenumber, msg.from_uid.componentinstancenumber) if msg is None or msg.proposal_id is None: @@ -283,11 +194,8 @@ class Acceptor(MessageHandler): return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id) -class Learner(MessageHandler): - ''' - This class listens to Accepted messages, determines when the final value is - selected, and tracks which peers have accepted the final value. - ''' +class Learner(object): + class ProposalStatus(object): __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value'] @@ -308,12 +216,8 @@ class Learner(MessageHandler): self.final_proposal_id = ProposalID(0, ' ') def receive_accepted(self, msg): - ''' - Called when an Accepted message is received from an acceptor. Once the final value - is determined, the return value of this method will be a Resolution message containing - the consentual value. Subsequent calls after the resolution is chosen will continue to add - new Acceptors to the final_acceptors set and return Resolution messages. - ''' + logger.info("For component %s AcceptED received from %s", self.network_uid.componentinstancenumber, + msg.from_uid.componentinstancenumber) if self.final_value is not None: if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value: self.final_acceptors.add(msg.from_uid) @@ -344,7 +248,7 @@ class Learner(MessageHandler): ps.retain_count += 1 ps.acceptors.add(msg.from_uid) - if ps.accept_count == self.quorum_size: + if ps.accept_count >= self.quorum_size/2: self.final_proposal_id = msg.proposal_id self.final_value = msg.proposal_value self.final_acceptors = ps.acceptors @@ -355,9 +259,6 @@ class Learner(MessageHandler): class PaxosInstance(Proposer, Acceptor, Learner): - ''' - Aggregate Proposer, Accepter, & Learner class. 
- ''' def __init__(self, network_uid, quorum_size, promised_id=ProposalID(0, ' '), accepted_id=ProposalID(0, ' '), accepted_value=None): @@ -415,7 +316,7 @@ class PaxosConsensusComponentModel(ComponentModel): self.broadcast_peers(result) elif isinstance(message, Resolution): if self.client is not None: - self.send(message) + self.send(self.client, message) def data_received_client(self, client, message): self.client = client @@ -425,7 +326,7 @@ class PaxosConsensusComponentModel(ComponentModel): if proposal is not None: self.broadcast_peers(proposal) - def send(self, client, message): + def send(self, client, message:Resolution): client.send(message) def send_to_component(self, recipient, message): diff --git a/tests/testpaxosconsensus.py b/tests/testpaxosconsensus.py index ff199cf..eb77b89 100644 --- a/tests/testpaxosconsensus.py +++ b/tests/testpaxosconsensus.py @@ -1,3 +1,4 @@ +import sys import time import networkx as nx @@ -5,16 +6,22 @@ import networkx as nx from Ahc import Topology from Ahc import ComponentRegistry from Channels import BasicLossyChannel -from Consensus.Paxos.paxos_component import PaxosConsensusComponentModel +from Consensus.Paxos.paxos_component import PaxosConsensusComponentModel, Resolution from Consensus.Raft.raft_component import RaftConsensusComponent from itertools import combinations +import logging +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + +logger = logging.getLogger(__name__) registry = ComponentRegistry() class Client: - def send(self, message): - print(tuple(message)) + def send(self, message:Resolution): + logger.info("For client Resolution message is received from component %s", + message.from_uid.componentinstancenumber) + logger.info("Client received new set value %s", message.value) def main(): @@ -31,7 +38,7 @@ def main(): topo.start() time.sleep(2) a_node: PaxosConsensusComponentModel = topo.nodes.get('A') - a_node.data_received_client(client, "message") + a_node.data_received_client(client, "Hello World!!!") waitforit = input("hit something to exit...") -- GitLab From 93723f35913b402582160576f210ceef3b75885c Mon Sep 17 00:00:00 2001 From: WINS Laboratory Date: Thu, 26 Aug 2021 14:26:46 +0300 Subject: [PATCH 5/5] Update Ahc.py --- Ahc.py | 53 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 4 deletions(-) diff --git a/Ahc.py b/Ahc.py index 70bd8f4..2ff80b9 100644 --- a/Ahc.py +++ b/Ahc.py @@ -34,6 +34,7 @@ import datetime import queue from enum import Enum from threading import Thread, Lock +from random import sample import matplotlib.pyplot as plt import networkx as nx @@ -82,10 +83,21 @@ class ConnectorTypes(Enum): UP = "UP" PEER = "PEER" +def auto_str(cls): + def __str__(self): + return '%s(%s)' % ( + type(self).__name__, + ', '.join('%s=%s' % item for item in vars(self).items()) + ) + cls.__str__ = __str__ + return cls + +@auto_str class GenericMessagePayload: def __init__(self, messagepayload): self.messagepayload = messagepayload +@auto_str class GenericMessageHeader: def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1): self.messagetype = messagetype @@ -95,19 +107,37 @@ class GenericMessageHeader: self.interfaceid = interfaceid self.sequencenumber = sequencenumber +@auto_str class GenericMessage: def __init__(self, header, payload): self.header = header self.payload = payload self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber) +@auto_str class Event: - def __init__(self, eventsource, 
event, eventcontent, fromchannel=None):
+  curr_event_id = 0
+
+  def __init__(self, eventsource, event, eventcontent, fromchannel=None,
+               eventid=-1):
     self.eventsource = eventsource
     self.event = event
     self.time = datetime.datetime.now()
     self.eventcontent = eventcontent
     self.fromchannel = fromchannel
+    self.eventid = eventid
+    if self.eventid == -1:
+      self.eventid = Event.curr_event_id
+      Event.curr_event_id += 1
+
+  def __eq__(self, other) -> bool:
+    if type(other) is not Event:
+      return False
+
+    return self.eventid == other.eventid
+
+  def __hash__(self) -> int:
+    return self.eventid

 def singleton(cls):
   instance = [None]

@@ -153,6 +183,7 @@ class ComponentRegistry:
       for p in connectedcmp:
         print(f"\t{i} {p.componentname}.{p.componentinstancenumber}")

+
   def get_non_channel_components(self):
     res = []
     for itemkey in self.components:
@@ -168,7 +199,7 @@ class ComponentModel:
   terminated = False

   def on_init(self, eventobj: Event):
-    print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
+    # print(f"Initializing {self.componentname}.{self.componentinstancenumber}")
     pass

   def on_message_from_bottom(self, eventobj: Event):
@@ -218,6 +249,16 @@ class ComponentModel:
       self.connectors[name] = channel
     connectornameforchannel = self.componentname + str(self.componentinstancenumber)
     channel.connect_me_to_component(connectornameforchannel, self)
+    self.on_connected_to_channel(name, channel)
+
+  def on_connected_to_channel(self, name, channel):
+    print(f"Connected to channel: {name}:{channel.componentinstancenumber}")
+
+  def on_pre_event(self, event):
+    pass
+
+  def unique_name(self):
+    return f"{self.componentname}.{self.componentinstancenumber}"

   def terminate(self):
     self.terminated = True
@@ -251,6 +292,7 @@ class ComponentModel:
     while not self.terminated:
       workitem = myqueue.get()
       if workitem.event in self.eventhandlers:
+        self.on_pre_event(workitem)
         self.eventhandlers[workitem.event](eventobj=workitem)  # call the handler
       else:
         print(f"Event Handler: {workitem.event} is not implemented")
@@ -324,7 +366,7 @@ class Topology:
           #mypath = path[i][j]
           # print(f"{i}to{j} path = {path[i][j]} nexthop = {path[i][j][1]}")
           #self.ForwardingTable[i][j] = path[i][j][1]
-
+
           # print(f"{i}to{j}path = NONE")
           #self.ForwardingTable[i][j] = inf  # No paths
         #except IndexError:
@@ -368,4 +410,7 @@ class Topology:
     nx.draw(self.G, self.nodepos, node_color=self.nodecolors, with_labels=True, font_weight='bold')
     plt.draw()
     print(self.nodecolors)
-    #self.lock.release()
\ No newline at end of file
+    #self.lock.release()
+
+  def get_random_node(self):
+    return self.nodes[sample(self.G.nodes(), 1)[0]]
-- 
GitLab
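The last patch is easiest to appreciate with a short usage sketch: auto_str derives a readable __str__ from vars(self) for the decorated message classes, and Event now compares and hashes by eventid, so duplicate deliveries can be deduplicated in a set. A minimal sketch against the updated Ahc module (the Probe class and the literal eventid are illustrative only):

from Ahc import Event, auto_str

@auto_str
class Probe:
    def __init__(self, seq):
        self.seq = seq

print(Probe(3))                          # -> Probe(seq=3)
e1 = Event(None, "X", None, eventid=7)   # explicit id; auto ids come from the class counter
e2 = Event(None, "X", None, eventid=7)
assert e1 == e2 and len({e1, e2}) == 1   # identity follows eventid, not object identity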