def auto_str(cls):
    """Class decorator: add a __str__ that lists every instance attribute.

    Produces e.g. ``Event(eventsource=None, event=EV, ...)`` from vars(self).
    """
    def __str__(self):
        return '%s(%s)' % (
            type(self).__name__,
            ', '.join('%s=%s' % item for item in vars(self).items())
        )
    cls.__str__ = __str__
    return cls


@auto_str
class GenericMessagePayload:
    """Opaque application payload carried inside a GenericMessage."""

    def __init__(self, messagepayload):
        self.messagepayload = messagepayload


@auto_str
class GenericMessageHeader:
    """Routing/typing metadata for a GenericMessage."""

    def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'),
                 interfaceid=float('inf'), sequencenumber=-1):
        self.messagetype = messagetype
        self.messagefrom = messagefrom
        self.messageto = messageto
        self.nexthop = nexthop
        self.interfaceid = interfaceid
        self.sequencenumber = sequencenumber


@auto_str
class GenericMessage:
    """Header + payload pair exchanged between components."""

    def __init__(self, header, payload):
        self.header = header
        self.payload = payload
        # "<sender>-<seqno>": used by receivers for duplicate suppression.
        self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber)


@auto_str
class Event:
    """A unit of work queued between components.

    Events are identified by ``eventid``: equality and hashing use only that
    field, so two events with distinct ids never compare equal.
    """

    # Class-level monotonically increasing generator for auto-assigned ids.
    curr_event_id = 0

    def __init__(self, eventsource, event, eventcontent, fromchannel=None,
                 eventid=-1):
        self.eventsource = eventsource
        self.event = event
        self.time = datetime.datetime.now()
        self.eventcontent = eventcontent
        self.fromchannel = fromchannel
        self.eventid = eventid
        if self.eventid == -1:
            # BUG FIX: the original did `self.curr_event_id += 1`, which
            # creates an *instance* attribute and never advances the class
            # counter — every auto-id event got id 0 and therefore compared
            # and hashed equal.  Bump the shared class counter instead.
            self.eventid = Event.curr_event_id
            Event.curr_event_id += 1

    def __eq__(self, other) -> bool:
        # Identity of an event is its id; non-Event objects are never equal.
        if type(other) is not Event:
            return False
        return self.eventid == other.eventid

    def __hash__(self) -> int:
        # Must stay consistent with __eq__ (same id => same hash).
        return self.eventid
class BasicLossyChannel(Channel):
    """Channel that independently drops each delivery with probability
    ``loss_percentage`` (a fraction in [0, 1]; 0 means lossless)."""

    def __init__(self, componentname, componentinstancenum, loss_percentage=0):
        super().__init__(componentname, componentinstancenum)
        self.loss_percentage = loss_percentage

    # Overwrite on_deliver_to_component to act in the last pipeline stage:
    # it delivers the message from the channel to the receiver component
    # using the MFRB (message-from-below) event.
    def on_deliver_to_component(self, eventobj: Event):
        callername = eventobj.eventsource.componentinstancenumber
        for item in self.connectors:
            callees = self.connectors[item]
            for callee in callees:
                calleename = callee.componentinstancenumber
                if calleename == callername:
                    continue  # never echo a message back to its sender
                # assumes `random` is imported at the top of Channels.py -- TODO confirm
                if random.uniform(0, 1) >= self.loss_percentage:
                    myevent = Event(eventobj.eventsource, EventTypes.MFRB,
                                    eventobj.eventcontent, self.componentinstancenumber)
                    callee.trigger_event(myevent)


class AHCChannelError(Exception):
    pass


# =====================================================================
# Consensus/Paxos/paxos_component.py  (new file introduced by this patch)
# =====================================================================

import collections
import sys

from Ahc import ComponentModel, Event, ConnectorTypes, EventTypes, ComponentRegistry
import logging
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

logger = logging.getLogger(__name__)

# Proposal ids order first by round number; ties fall through to the uid.
ProposalID = collections.namedtuple('ProposalID', ['number', 'uid'])


class PaxosMessage(object):
    """Base class for all Paxos wire messages."""

    from_uid = None  # Set by subclass constructor


class Prepare(PaxosMessage):
    """Phase-1a: a proposer asks acceptors to promise proposal_id."""

    def __init__(self, from_uid, proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id


class NackPrepare(PaxosMessage):
    """Negative reply to a Prepare, carrying the id already promised."""

    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposer_uid = proposer_uid
        self.promised_proposal_id = promised_proposal_id


class NackAccept(PaxosMessage):
    """Negative reply to an Accept, carrying the id already promised."""

    def __init__(self, from_uid, proposer_uid, proposal_id, promised_proposal_id):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposer_uid = proposer_uid
        self.promised_proposal_id = promised_proposal_id


class Promise(PaxosMessage):
    """Phase-1b: an acceptor promises proposal_id and reports what, if
    anything, it accepted before."""

    def __init__(self, from_uid, proposer_uid, proposal_id, last_accepted_id, last_accepted_value):
        self.from_uid = from_uid
        self.proposer_uid = proposer_uid
        self.proposal_id = proposal_id
        self.last_accepted_id = last_accepted_id
        self.last_accepted_value = last_accepted_value


class Accept(PaxosMessage):
    """Phase-2a: the leader asks acceptors to accept proposal_value."""

    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value


class Accepted(PaxosMessage):
    """Phase-2b: an acceptor announces it accepted proposal_id/value."""

    def __init__(self, from_uid, proposal_id, proposal_value):
        self.from_uid = from_uid
        self.proposal_id = proposal_id
        self.proposal_value = proposal_value


class Resolution(PaxosMessage):
    """Consensus has been reached on ``value``."""

    def __init__(self, from_uid, value):
        self.from_uid = from_uid
        self.value = value


class InvalidMessageError(Exception):
    pass


class Proposer(object):
    """Paxos proposer role (phase 1 driver, phase 2 once leader)."""

    leader = False
    proposed_value = None
    proposal_id = ProposalID(0, ' ')
    highest_accepted_id = ProposalID(0, ' ')
    promises_received = None
    nacks_received = None
    current_prepare_msg = None
    current_accept_msg = None

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        # BUG FIX: ProposalIDs are compared with >/>= and fall through to
        # `uid` when numbers tie.  The original stored the component object
        # itself as uid, which is not orderable and raised TypeError on a
        # tie; use its instance number when available.
        self._uid = getattr(network_uid, 'componentinstancenumber', network_uid)
        self.proposal_id = ProposalID(0, self._uid)
        self.highest_proposal_id = ProposalID(0, self._uid)

    def propose_value(self, value):
        """Fix the value to drive to consensus.

        Returns the Accept message to broadcast if this node is already
        leader, else None.  A value can only be set once.
        """
        if self.proposed_value is None:
            self.proposed_value = value
            if self.leader:
                self.current_accept_msg = Accept(self.network_uid, self.proposal_id, value)
                return self.current_accept_msg
            return None
        return None

    def prepare(self):
        """Start a new round: drop leadership, bump the round number and
        return a fresh Prepare message to broadcast."""
        self.leader = False
        self.promises_received = set()
        self.nacks_received = set()
        self.proposal_id = ProposalID(self.highest_proposal_id.number + 1, self._uid)
        self.highest_proposal_id = self.proposal_id
        self.current_prepare_msg = Prepare(self.network_uid, self.proposal_id)
        return self.current_prepare_msg

    def observe_proposal(self, proposal_id):
        # Track the highest proposal seen anywhere so our next round outbids it.
        if proposal_id > self.highest_proposal_id:
            self.highest_proposal_id = proposal_id

    def receive_nack(self, msg):
        logger.info("For component %s NACK received from %s", self.network_uid.componentinstancenumber,
                    msg.from_uid.componentinstancenumber)
        self.observe_proposal(msg.promised_proposal_id)

        # BUG FIX: the nack counter is only touched under the same guard that
        # allocates it; the original could call len(None) before prepare().
        if msg.proposal_id == self.proposal_id and self.nacks_received is not None:
            self.nacks_received.add(msg.from_uid)
            # Everybody else nacked this round: it is lost, start a new one.
            if len(self.nacks_received) == self.quorum_size - 1:
                return self.prepare()

    def receive_promise(self, msg):
        logger.info("For component %s Promise received from %s", self.network_uid.componentinstancenumber,
                    msg.from_uid.componentinstancenumber)
        self.observe_proposal(msg.proposal_id)

        # Promises only match self.proposal_id after our own prepare(), so
        # promises_received is guaranteed to be a set here.
        if not self.leader and msg.proposal_id == self.proposal_id and msg.from_uid not in self.promises_received:
            self.promises_received.add(msg.from_uid)

            # Paxos rule: adopt the value of the highest previously accepted
            # proposal reported by any promiser.
            if msg.last_accepted_id is not None and msg.last_accepted_id > self.highest_accepted_id:
                self.highest_accepted_id = msg.last_accepted_id
                if msg.last_accepted_value is not None:
                    self.proposed_value = msg.last_accepted_value

            # Promises from everyone apart from self: we are leader.
            if len(self.promises_received) == self.quorum_size - 1:
                self.leader = True
                if self.proposed_value is not None:
                    self.current_accept_msg = Accept(self.network_uid, self.proposal_id, self.proposed_value)
                    return self.current_accept_msg


class Acceptor(object):
    """Paxos acceptor role: promises and accepts proposals."""

    def __init__(self, network_uid, promised_id=None, accepted_id=None, accepted_value=None):
        self.network_uid = network_uid
        self.promised_id = promised_id
        self.accepted_id = accepted_id
        self.accepted_value = accepted_value

    def receive_prepare(self, msg):
        # BUG FIX: the original tested `msg is None` but then dereferenced
        # msg.from_uid anyway (AttributeError).  Bail out first.
        if msg is None:
            return None
        logger.info("For component %s Prepare received from %s", self.network_uid.componentinstancenumber,
                    msg.from_uid.componentinstancenumber)
        if msg.proposal_id is None:
            return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            return Promise(self.network_uid, msg.from_uid, self.promised_id, self.accepted_id, self.accepted_value)
        return NackPrepare(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

    def receive_accept(self, msg):
        if msg is None:  # see receive_prepare
            return None
        logger.info("For component %s Accept received from %s", self.network_uid.componentinstancenumber,
                    msg.from_uid.componentinstancenumber)
        if msg.proposal_id is None:
            return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)

        if self.promised_id is None or msg.proposal_id >= self.promised_id:
            self.promised_id = msg.proposal_id
            self.accepted_id = msg.proposal_id
            self.accepted_value = msg.proposal_value
            return Accepted(self.network_uid, msg.proposal_id, msg.proposal_value)
        return NackAccept(self.network_uid, msg.from_uid, msg.proposal_id, self.promised_id)


class Learner(object):
    """Paxos learner role: counts Accepted messages until a value wins."""

    class ProposalStatus(object):
        __slots__ = ['accept_count', 'retain_count', 'acceptors', 'value']

        def __init__(self, value):
            self.accept_count = 0
            self.retain_count = 0
            self.acceptors = set()
            self.value = value

    def __init__(self, network_uid, quorum_size):
        self.network_uid = network_uid
        self.quorum_size = quorum_size
        self.proposals = dict()      # maps proposal_id => ProposalStatus
        self.acceptors = dict()      # maps from_uid => last_accepted_proposal_id
        self.final_value = None
        self.final_acceptors = None  # Will be a set of acceptor UIDs once the final value is chosen
        self.final_proposal_id = ProposalID(0, ' ')

    def receive_accepted(self, msg):
        logger.info("For component %s AcceptED received from %s", self.network_uid.componentinstancenumber,
                    msg.from_uid.componentinstancenumber)
        if self.final_value is not None:
            # Already resolved: only track further confirming acceptors.  The
            # unconditional early return also protects the bookkeeping dicts
            # below, which are dropped (set to None) once consensus is won.
            if msg.proposal_id >= self.final_proposal_id and msg.proposal_value == self.final_value:
                self.final_acceptors.add(msg.from_uid)
            return Resolution(self.network_uid, self.final_value)

        last_pn = self.acceptors.get(msg.from_uid)
        if last_pn is not None and msg.proposal_id <= last_pn:
            return  # Old message

        self.acceptors[msg.from_uid] = msg.proposal_id

        if last_pn is not None:
            # The acceptor moved to a newer proposal: release the old one.
            ps = self.proposals[last_pn]
            ps.retain_count -= 1
            ps.acceptors.remove(msg.from_uid)
            if ps.retain_count == 0:
                del self.proposals[last_pn]

        if msg.proposal_id not in self.proposals:
            self.proposals[msg.proposal_id] = Learner.ProposalStatus(msg.proposal_value)

        ps = self.proposals[msg.proposal_id]

        assert msg.proposal_value == ps.value, 'Value mismatch for single proposal!'

        ps.accept_count += 1
        ps.retain_count += 1
        ps.acceptors.add(msg.from_uid)

        # quorum_size is the full cluster size here (see on_init below), so
        # this is a majority check.
        if ps.accept_count >= self.quorum_size / 2:
            self.final_proposal_id = msg.proposal_id
            self.final_value = msg.proposal_value
            self.final_acceptors = ps.acceptors
            self.proposals = None
            self.acceptors = None
            return Resolution(self.network_uid, self.final_value)


class PaxosInstance(Proposer, Acceptor, Learner):
    """All three Paxos roles combined on a single node."""

    def __init__(self, network_uid, quorum_size, promised_id=ProposalID(0, ' '), accepted_id=ProposalID(0, ' '),
                 accepted_value=None):
        Proposer.__init__(self, network_uid, quorum_size)
        Acceptor.__init__(self, network_uid, promised_id, accepted_id, accepted_value)
        Learner.__init__(self, network_uid, quorum_size)

    def receive_prepare(self, msg):
        self.observe_proposal(msg.proposal_id)
        return super(PaxosInstance, self).receive_prepare(msg)

    def receive_accept(self, msg):
        self.observe_proposal(msg.proposal_id)
        return super(PaxosInstance, self).receive_accept(msg)


class PaxosConsensusComponentModel(ComponentModel):
    """AHC component gluing the Paxos roles to the channel layer."""

    def __init__(self, componentname, componentinstancenumber):
        super().__init__(componentname, componentinstancenumber)
        self.acceptor: Acceptor = None
        self.proposer: Proposer = None
        self.learner: Learner = None
        self.client = None
        self.cluster_size = 1

    def on_message_from_bottom(self, eventobj: Event):
        self.data_received_peer(eventobj.eventsource, eventobj.eventcontent)

    def on_message_from_top(self, eventobj: Event):
        print("client asking for a trouble...")
        self.data_received_client(eventobj.eventsource, eventobj.eventcontent)

    def data_received_peer(self, sender, message):
        """Route an incoming Paxos message to the right role and forward the
        role's reply (unicast for Prepare replies, broadcast otherwise)."""
        if isinstance(message, Prepare):
            result_prepare = self.acceptor.receive_prepare(message)
            self.send_to_component(sender, result_prepare)
        elif isinstance(message, Promise):
            result_promise = self.proposer.receive_promise(message)
            if result_promise is not None:
                self.broadcast_peers(result_promise)
        elif isinstance(message, (NackAccept, NackPrepare)):
            result = self.proposer.receive_nack(message)
            if result is not None:
                self.broadcast_peers(result)
        elif isinstance(message, Accept):
            self.proposer.observe_proposal(message.proposal_id)
            result = self.acceptor.receive_accept(message)
            if result is not None:
                self.broadcast_peers(result)
        elif isinstance(message, Accepted):
            result = self.learner.receive_accepted(message)
            if result is not None:
                self.broadcast_peers(result)
        elif isinstance(message, Resolution):
            if self.client is not None:
                self.send(self.client, message)

    def data_received_client(self, client, message):
        """A client proposes ``message`` as the consensus value."""
        self.client = client
        prep_message = self.proposer.prepare()
        self.broadcast_peers(prep_message)
        proposal = self.proposer.propose_value(message)
        if proposal is not None:  # only when we already are the leader
            self.broadcast_peers(proposal)

    def send(self, client, message: Resolution):
        # assumes `client` exposes a send() method -- TODO confirm
        client.send(message)

    def send_to_component(self, recipient, message):
        """Unicast via the DOWN channel whose name embeds the recipient id."""
        if recipient != self:
            for conn in self.connectors[ConnectorTypes.DOWN]:
                if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1:
                    conn.trigger_event(Event(self, EventTypes.MFRT, message))

    def broadcast_peers(self, message):
        self.send_down(Event(self, EventTypes.MFRT, message))

    def on_init(self, eventobj: Event):
        # Quorum is every non-channel component currently in the registry.
        self.cluster_size = len(self.registry.get_non_channel_components())
        self.acceptor = Acceptor(self, ProposalID(0, self.componentinstancenumber), None, None)
        self.proposer = Proposer(self, self.cluster_size)
        self.learner = Learner(self, self.cluster_size)
import logging

logger = logging.getLogger(__name__)


class LogManager:
    """Instantiate and manage the components of the "Log" subsystem.
    That is: the log, the compactor and the state machine.

    Entries are dicts carrying at least a ``'term'`` key.  ``index`` is the
    number of stored entries (the log "tip"); explicit entry indexes passed
    to term()/__getitem__ are 0-based positions in ``self.log``.
    """

    def __init__(self):
        self.log = []
        self.commitIndex = len(self.log)  # nothing committed yet

    def __getitem__(self, index):
        """Get item or slice from the log, based on absolute log indexes.
        Item(s) already compacted cannot be requested."""
        if type(index) is slice:
            return self.log[index.start:index.stop:index.step]
        elif type(index) is int:
            return self.log[index]

    @property
    def index(self):
        """Log tip index (== number of stored entries)."""
        return len(self.log)

    def term(self, index=None):
        """Return the term of entry ``index``; with no argument, the term of
        the log tip.

        BUG FIX: the original recursed with ``index == len(self.log)`` for
        the tip and raised IndexError on every non-empty log.  The tip entry
        lives at ``len - 1``; indexes past the end are clamped to the tip.
        """
        if index is None:
            index = self.index - 1
        if index < 0 or not self.log:
            return 0
        if index >= len(self.log):
            return self.log[-1]['term']
        return self.log[index]['term']

    def append_entries(self, entries, prevLogIndex):
        """Replace everything after ``prevLogIndex`` with ``entries``.

        BUG FIX: the original deleted the log *prefix* (``del log[:start]``),
        discarding the already-agreed entries and keeping the conflicting
        suffix.  Raft requires truncating the suffix after prevLogIndex.
        """
        del self.log[prevLogIndex:]  # no-op when prevLogIndex >= len(log)
        self.log.extend(entries)
        if entries:
            logger.debug('Appending. New log: %s', self.log)

    def commit(self, leaderCommit):
        """Advance commitIndex toward ``leaderCommit`` (monotonic, capped at
        the log tip)."""
        if leaderCommit <= self.commitIndex:
            return

        self.commitIndex = min(leaderCommit, self.index)  # no overshoots
        logger.debug('Advancing commit to %s', self.commitIndex)
        # above is the actual commit operation, just incrementing the counter!
import sys

from Ahc import ComponentModel, EventTypes, ConnectorTypes
from Ahc import Event

from Consensus.Raft.states import Follower
import logging
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

logger = logging.getLogger(__name__)


class RaftConsensusComponent(ComponentModel):
    """AHC component wrapping one Raft node.

    All consensus logic lives in the role objects (Follower / Candidate /
    Leader, see Consensus.Raft.states); this component only routes AHC
    events to the current role and puts the role's messages back onto the
    channel below.
    """

    def __init__(self, componentname, componentinstancenumber):
        # NOTE(review): the extra positional argument `1` is forwarded to
        # ComponentModel.__init__ -- presumably a worker/queue count; confirm
        # against Ahc.ComponentModel's signature.
        super().__init__(componentname, componentinstancenumber, 1)
        self.state = None  # set to a Follower in on_init

    def on_message_from_bottom(self, eventobj: Event):
        # A message arriving from the channel below is peer (Raft) traffic.
        self.data_received_peer(eventobj.eventsource, eventobj.eventcontent)

    def on_message_from_top(self, eventobj: Event):
        # A message from the component above is treated as a client request.
        print("client asking for a trouble...")
        self.data_received_client(eventobj.eventsource, eventobj.eventcontent)

    def change_state(self, new_state):
        # Cancel the old role's timers before handing the state over; the new
        # role copies term/vote/log from the old one (old_state constructor).
        self.state.teardown()
        logger.info('For component %s State change:' + new_state.__name__, self.componentinstancenumber)
        self.state = new_state(old_state=self.state)

    def data_received_peer(self, sender, message):
        # Delegate to the current role's dispatcher.
        self.state.data_received_peer(sender, message)

    def data_received_client(self, client, message):
        # Delegate to the current role's client dispatcher.
        self.state.data_received_client(client, message)

    def send(self, client, message):
        # assumes `client` exposes a send() method -- TODO confirm
        client.send(message)

    def send_to_component(self, recipient, message):
        """Unicast: push `message` onto the DOWN connector whose instance
        name embeds the recipient's instance number."""
        if recipient != self:
            for conn in self.connectors[ConnectorTypes.DOWN]:
                if conn.componentinstancenumber.find(recipient.componentinstancenumber) != -1:
                    conn.trigger_event(Event(self, EventTypes.MFRT, message))

    def broadcast_peers(self, message):
        # Broadcast: send the message down every channel at once.
        self.send_down(Event(self, EventTypes.MFRT, message))

    def on_init(self, eventobj: Event):
        # Every node starts its life as a Follower.
        self.state = Follower(server=self)
import logging
import statistics
import sys
import threading
from Consensus.Raft.log import LogManager

logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)

logger = logging.getLogger(__name__)


class State:
    """Base Raft role; carries the per-node state shared by all roles."""

    def __init__(self, old_state=None, server=None):
        # A role transition hands the persistent state over via old_state;
        # the very first role is built directly from the server.
        if old_state:
            self.server = old_state.server
            self.votedFor = old_state.votedFor
            self.currentTerm = old_state.currentTerm
            self.leaderId = old_state.leaderId
            self.log = old_state.log
        else:
            self.server = server
            self.votedFor = None
            self.currentTerm = 0
            self.leaderId = None
            self.log = LogManager()

    def data_received_peer(self, peer, msg):
        """Apply the Raft term rule, then dispatch to on_peer_<type>."""
        # BUG FIX: the format arguments were in the wrong order (the message
        # type was logged as the component id and vice versa).
        logger.debug('For component %s Received %s from %s',
                     self.server.componentinstancenumber, msg['type'], peer.componentinstancenumber)

        if self.currentTerm < msg['term']:
            # Raft rule: a higher remote term always demotes us to Follower.
            self.currentTerm = msg['term']
            if not type(self) is Follower:
                logger.info('For component %s Remote term is higher, converting to Follower',
                            self.server.componentinstancenumber)
                self.server.change_state(Follower)
                self.server.state.data_received_peer(peer, msg)
                return
        method = getattr(self, 'on_peer_' + msg['type'], None)
        if method:
            method(peer, msg)
        else:
            logger.info('For component %s Unrecognized message from %s: %s',
                        self.server.componentinstancenumber, peer.componentinstancenumber, msg)

    def data_received_client(self, client, msg):
        """Dispatch a client message to on_client_<type> (ignore unknowns)."""
        method = getattr(self, 'on_client_' + msg['type'], None)
        if method:
            method(client, msg)

    def on_client_append(self, client, msg):
        """Non-leaders redirect append requests to the known leader."""
        msg = {'type': 'redirect',
               'leader': self.leaderId}
        client.send(msg)
        logger.debug('Redirect client %s to leader',
                     self.leaderId)

    def on_client_get(self, client, msg):
        """Return the whole log to the client."""
        client.send(self.log)


class Follower(State):
    """Passive role: answers votes/appends and waits out its election timer."""

    def __init__(self, old_state=None, server=None):
        super().__init__(old_state, server)
        self.votedFor = None
        self.restart_election_timer()

    def teardown(self):
        self.election_timer.cancel()

    def restart_election_timer(self):
        if hasattr(self, 'election_timer'):
            self.election_timer.cancel()

        # Deterministic per-node timeout derived from the instance name;
        # assumes single-character ids 'A', 'B', ... -- TODO confirm.
        timeout = ord(self.server.componentinstancenumber) - ord('A') + 1
        self.election_timer = threading.Timer(timeout, self.server.change_state, (Candidate,))
        self.election_timer.start()
        logger.debug(' For component %s Election timer restarted: %s s',
                     self.server.componentinstancenumber, timeout)

    def on_peer_request_vote(self, peer, msg):
        """Grant the vote iff term, vote availability and log freshness allow."""
        term_is_current = msg['term'] >= self.currentTerm
        can_vote = self.votedFor is None
        index_is_current = (msg['lastLogTerm'] > self.log.term() or
                            (msg['lastLogTerm'] == self.log.term() and
                             msg['lastLogIndex'] >= self.log.index))
        granted = term_is_current and can_vote and index_is_current

        if granted:
            self.votedFor = msg['candidateId']
            self.restart_election_timer()

        logger.debug('For component %s Voting for %s. Term:%s Vote:%s Index:%s',
                     self.server.componentinstancenumber,
                     peer.componentinstancenumber, term_is_current, can_vote, index_is_current)

        response = {'type': 'response_vote', 'voteGranted': granted,
                    'term': self.currentTerm}
        self.server.send_to_component(peer, response)

    def on_peer_append_entries(self, peer, msg):
        """Append the leader's entries when the terms line up; always reply."""
        self.restart_election_timer()

        term_is_current = msg['term'] >= self.currentTerm
        prev_log_term_match = msg['prevLogTerm'] is None or \
            self.log.term(msg['prevLogIndex']) == msg['prevLogTerm']
        success = term_is_current and prev_log_term_match

        if success:
            self.log.append_entries(msg['entries'], msg['prevLogIndex'])
            self.log.commit(msg['leaderCommit'])
            self.leaderId = msg['leaderId']
            logger.debug('For component %s Log index is now %s',
                         self.server.componentinstancenumber, self.log.index)
        else:
            # BUG FIX: the original string literal used a backslash line
            # continuation that embedded a run of indentation spaces into the
            # logged cause text.
            logger.warning('For component %s Could not append entries. cause: %s',
                           self.server.componentinstancenumber,
                           'wrong term' if not term_is_current else 'prev log term mismatch')

        resp = {'type': 'response_append', 'success': success,
                'term': self.currentTerm,
                'matchIndex': self.log.index}
        self.server.send_to_component(peer, resp)


class Candidate(Follower):
    """Runs an election: votes for itself and requests votes from peers."""

    def __init__(self, old_state=None, server=None):
        super().__init__(old_state, server)
        self.currentTerm += 1
        self.votes_count = 0
        logger.info('New Election. Term: %s', self.currentTerm)

        # TODO: put it in seperate thread
        def vote_self():
            self.votedFor = self.server.componentinstancenumber
            # NOTE: our own id (a plain string) is passed as `peer`; the vote
            # handler never touches it.
            self.on_peer_response_vote(
                self.votedFor, {'voteGranted': True})

        vote_self()
        self.send_vote_requests()

    def send_vote_requests(self):
        logger.info(' For component %s Broadcasting request_vote', self.server.componentinstancenumber)
        msg = {'type': 'request_vote', 'term': self.currentTerm,
               'candidateId': self.votedFor,
               'lastLogIndex': self.log.index,
               'lastLogTerm': self.log.term()}
        self.server.broadcast_peers(msg)

    def on_peer_append_entries(self, peer, msg):
        """A live leader exists: step down and let the Follower handle it."""
        logger.debug('For component %s Converting to Follower', self.server.componentinstancenumber)
        self.server.change_state(Follower)
        self.server.state.on_peer_append_entries(peer, msg)

    def on_peer_response_vote(self, peer, msg):
        self.votes_count += msg['voteGranted']  # bool counts as 0/1
        logger.info(' For component %s Vote count: %s', self.server.componentinstancenumber, self.votes_count)
        # Strict majority of all non-channel components wins the election.
        if self.votes_count > len(self.server.registry.get_non_channel_components()) / 2:
            self.server.change_state(Leader)


class Leader(State):
    """Active role: replicates the log and answers committed clients."""

    def __init__(self, old_state=None, server=None):
        super().__init__(old_state, server)
        logger.info('For component %s Leader of term: %s', self.server.componentinstancenumber, self.currentTerm)
        self.leaderId = self.server.componentinstancenumber
        cluster = self.server.registry.get_non_channel_components()
        cluster_names = [c.componentinstancenumber for c in cluster]
        self.matchIndex = {p: 0 for p in cluster_names}
        self.nextIndex = {p: self.log.commitIndex + 1 for p in self.matchIndex}
        self.waiting_clients = {}
        self.send_append_entries()

        # Announce the new leadership through the replicated log itself.
        self.log.append_entries([{'term': self.currentTerm,
                                  'data': {
                                      'key': 'leaderId',
                                      'value': self.server.componentinstancenumber,
                                  }}],
                                self.log.index)
        self.log.commit(self.log.index)

    def teardown(self):
        self.append_timer.cancel()

        # Fail every client still waiting for its entry to commit.
        for clients in self.waiting_clients.values():
            for client in clients:
                client.send({'type': 'result', 'success': False})
                logger.error('Sent unsuccessful response to client')

    def send_append_entries(self):
        """Broadcast entries/heartbeats, then re-arm the heartbeat timer."""
        cluster = self.server.registry.get_non_channel_components()
        for peer in cluster:
            if peer.componentinstancenumber == self.server.componentinstancenumber:
                continue
            # NOTE(review): the whole log is sent although prevLogIndex is
            # per-peer; entries should arguably start at nextIndex - 1.
            # Confirm against LogManager.append_entries semantics.
            msg = {'type': 'append_entries',
                   'term': self.currentTerm,
                   'leaderCommit': self.log.commitIndex,
                   'leaderId': self.server.componentinstancenumber,
                   'prevLogIndex': self.nextIndex[peer.componentinstancenumber] - 1,
                   'entries': self.log.log
                   }
            msg.update({'prevLogTerm': self.log.term(msg['prevLogIndex'])})

            logger.debug(' For component %s Sending %s entries to %s. Start index %s',
                         self.server.componentinstancenumber,
                         len(msg['entries']), peer.componentinstancenumber,
                         self.nextIndex[peer.componentinstancenumber])
            self.server.send_to_component(peer, msg)

        timeout = 1  # heartbeat period in seconds
        self.append_timer = threading.Timer(timeout, self.send_append_entries)
        self.append_timer.start()

    def on_peer_response_append(self, peer, msg):
        # BUG FIX: `peer` is a component object for network replies but a
        # plain instance-number string when the leader calls this for itself
        # (see on_client_append); the original mixed both as dict keys, so
        # the success path wrote object keys while the failure path used
        # strings.  Normalize to the instance number.
        peer_id = peer if isinstance(peer, str) else peer.componentinstancenumber

        if msg['success']:
            self.matchIndex[peer_id] = msg['matchIndex']
            self.nextIndex[peer_id] = msg['matchIndex'] + 1

            self.matchIndex[self.server.componentinstancenumber] = self.log.index
            self.nextIndex[self.server.componentinstancenumber] = self.log.index + 1
            # Commit whatever a majority of the cluster has replicated.
            index = statistics.median_low(self.matchIndex.values())
            self.log.commit(index)
            self.send_client_append_response()
        else:
            # Walk nextIndex back and retry on the next heartbeat.
            self.nextIndex[peer_id] = max(0, self.nextIndex[peer_id] - 1)

    def on_client_append(self, client, msg):
        """Append a client entry and remember whom to answer once committed."""
        entry = {'term': self.currentTerm, 'data': msg['data']}

        self.log.append_entries([entry], self.log.index)
        if self.log.index in self.waiting_clients:
            self.waiting_clients[self.log.index].append(client)
        else:
            self.waiting_clients[self.log.index] = [client]
        self.on_peer_response_append(
            self.server.componentinstancenumber, {'success': True,
                                                  'matchIndex': self.log.commitIndex})

    def send_client_append_response(self):
        """Answer every client whose entry index has reached commitIndex."""
        to_delete = []
        for client_index, clients in self.waiting_clients.items():
            if client_index <= self.log.commitIndex:
                for client in clients:
                    client.send({'type': 'result', 'success': True})  # TODO
                    logger.debug('Sent successful response to client')
                to_delete.append(client_index)
        for index in to_delete:
            del self.waiting_clients[index]
time.sleep(randdelay) - self.send_self(Event(self, "propose", proposalmessage)) - else: - pass - - def on_message_from_bottom(self, eventobj: Event): - try: - applmessage = eventobj.eventcontent - hdr = applmessage.header - if hdr.messagetype == ApplicationLayerMessageTypes.ACCEPT: - print(f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") - elif hdr.messagetype == ApplicationLayerMessageTypes.PROPOSE: - print(f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") - except AttributeError: - print("Attribute Error") - - # print(f"{self.componentname}.{self.componentinstancenumber}: Gotton message {eventobj.content} ") - # value = eventobj.content.value - # value += 1 - # newmsg = MessageContent( value ) - # myevent = Event( self, "agree", newmsg ) - # self.trigger_event(myevent) - - def on_propose(self, eventobj: Event): - destination = 1 - hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.ACCEPT, self.componentinstancenumber, destination) - payload = ApplicationLayerMessagePayload("23") - proposalmessage = GenericMessage(hdr, payload) - self.send_down(Event(self, EventTypes.MFRT, proposalmessage)) - - def on_agree(self, eventobj: Event): - print(f"Agreed on {eventobj.eventcontent}") - - def on_timer_expired(self, eventobj: Event): - pass + def on_init(self, eventobj: Event): + print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + + if self.componentinstancenumber == 0: + # destination = random.randint(len(Topology.G.nodes)) + destination = 1 + hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.PROPOSE, self.componentinstancenumber, + destination) + payload = ApplicationLayerMessagePayload("23") + proposalmessage = GenericMessage(hdr, payload) + randdelay = random.randint(0, 5) + time.sleep(randdelay) + self.send_self(Event(self, "propose", proposalmessage)) + else: + pass + + def on_message_from_bottom(self, 
eventobj: Event): + try: + applmessage = eventobj.eventcontent + hdr = applmessage.header + if hdr.messagetype == ApplicationLayerMessageTypes.ACCEPT: + print( + f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") + elif hdr.messagetype == ApplicationLayerMessageTypes.PROPOSE: + print( + f"Node-{self.componentinstancenumber} says Node-{hdr.messagefrom} has sent {hdr.messagetype} message") + except AttributeError: + print("Attribute Error") + + # print(f"{self.componentname}.{self.componentinstancenumber}: Gotton message {eventobj.content} ") + # value = eventobj.content.value + # value += 1 + # newmsg = MessageContent( value ) + # myevent = Event( self, "agree", newmsg ) + # self.trigger_event(myevent) + + def on_propose(self, eventobj: Event): + destination = 1 + hdr = ApplicationLayerMessageHeader(ApplicationLayerMessageTypes.ACCEPT, self.componentinstancenumber, + destination) + payload = ApplicationLayerMessagePayload("23") + proposalmessage = GenericMessage(hdr, payload) + self.send_down(Event(self, EventTypes.MFRT, proposalmessage)) + + def on_agree(self, eventobj: Event): + print(f"Agreed on {eventobj.eventcontent}") + + def on_timer_expired(self, eventobj: Event): + pass + + def __init__(self, componentname, componentinstancenumber): + super().__init__(componentname, componentinstancenumber) + self.eventhandlers["propose"] = self.on_propose + self.eventhandlers["agree"] = self.on_agree + self.eventhandlers["timerexpired"] = self.on_timer_expired - def __init__(self, componentname, componentinstancenumber): - super().__init__(componentname, componentinstancenumber) - self.eventhandlers["propose"] = self.on_propose - self.eventhandlers["agree"] = self.on_agree - self.eventhandlers["timerexpired"] = self.on_timer_expired class AdHocNode(ComponentModel): - def on_init(self, eventobj: Event): - print(f"Initializing {self.componentname}.{self.componentinstancenumber}") + def on_init(self, eventobj: Event): + 
print(f"Initializing {self.componentname}.{self.componentinstancenumber}") - def on_message_from_top(self, eventobj: Event): - self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) + def on_message_from_top(self, eventobj: Event): + self.send_down(Event(self, EventTypes.MFRT, eventobj.eventcontent)) - def on_message_from_bottom(self, eventobj: Event): - self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) + def on_message_from_bottom(self, eventobj: Event): + self.send_up(Event(self, EventTypes.MFRB, eventobj.eventcontent)) - def __init__(self, componentname, componentid): - # SUBCOMPONENTS - self.appllayer = ApplicationLayerComponent("ApplicationLayer", componentid) - self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) - self.linklayer = LinkLayer("LinkLayer", componentid) - # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) + def __init__(self, componentname, componentid): + # SUBCOMPONENTS + self.appllayer = ApplicationLayerComponent("ApplicationLayer", componentid) + self.netlayer = AllSeingEyeNetworkLayer("NetworkLayer", componentid) + self.linklayer = LinkLayer("LinkLayer", componentid) + # self.failuredetect = GenericFailureDetector("FailureDetector", componentid) - # CONNECTIONS AMONG SUBCOMPONENTS - self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer) - # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer) - self.netlayer.connect_me_to_component(ConnectorTypes.UP, self.appllayer) - # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect) - self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer) - self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer) + # CONNECTIONS AMONG SUBCOMPONENTS + self.appllayer.connect_me_to_component(ConnectorTypes.DOWN, self.netlayer) + # self.failuredetect.connectMeToComponent(PortNames.DOWN, self.netlayer) + self.netlayer.connect_me_to_component(ConnectorTypes.UP, 
self.appllayer) + # self.netlayer.connectMeToComponent(PortNames.UP, self.failuredetect) + self.netlayer.connect_me_to_component(ConnectorTypes.DOWN, self.linklayer) + self.linklayer.connect_me_to_component(ConnectorTypes.UP, self.netlayer) - # Connect the bottom component to the composite component.... - self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self) - self.connect_me_to_component(ConnectorTypes.UP, self.linklayer) + # Connect the bottom component to the composite component.... + self.linklayer.connect_me_to_component(ConnectorTypes.DOWN, self) + self.connect_me_to_component(ConnectorTypes.UP, self.linklayer) + + super().__init__(componentname, componentid) - super().__init__(componentname, componentid) def main(): - # G = nx.Graph() - # G.add_nodes_from([1, 2]) - # G.add_edges_from([(1, 2)]) - # nx.draw(G, with_labels=True, font_weight='bold') - # plt.draw() - G = nx.random_geometric_graph(19, 0.5) - nx.draw(G, with_labels=True, font_weight='bold') - plt.draw() + # G = nx.Graph() + # G.add_nodes_from([1, 2]) + # G.add_edges_from([(1, 2)]) + # nx.draw(G, with_labels=True, font_weight='bold') + # plt.draw() + G = nx.random_geometric_graph(19, 0.5) + nx.draw(G, with_labels=True, font_weight='bold') + plt.draw() + + topo = Topology() + topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel) + topo.start() - topo = Topology() - topo.construct_from_graph(G, AdHocNode, P2PFIFOPerfectChannel) - topo.start() + plt.show() # while (True): pass - plt.show() # while (True): pass if __name__ == "__main__": - main() + main() diff --git a/tests/testpaxosconsensus.py b/tests/testpaxosconsensus.py new file mode 100644 index 0000000000000000000000000000000000000000..eb77b894980e89c2cc5ba692a990b445cade5dc3 --- /dev/null +++ b/tests/testpaxosconsensus.py @@ -0,0 +1,46 @@ +import sys +import time + +import networkx as nx + +from Ahc import Topology +from Ahc import ComponentRegistry +from Channels import BasicLossyChannel +from 
Consensus.Paxos.paxos_component import PaxosConsensusComponentModel, Resolution +from Consensus.Raft.raft_component import RaftConsensusComponent +from itertools import combinations +import logging +logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) + +logger = logging.getLogger(__name__) + +registry = ComponentRegistry() + +class Client: + + def send(self, message:Resolution): + logger.info("For client Resolution message is received from component %s", + message.from_uid.componentinstancenumber) + logger.info("Client received new set value %s", message.value) + + +def main(): + nodes = ['A', 'B', 'C', 'D', 'E'] + + edges = combinations(nodes, 2) + G = nx.Graph() + G.add_nodes_from(nodes) + G.add_edges_from(edges) + topo = Topology() + topo.construct_from_graph(G, PaxosConsensusComponentModel, BasicLossyChannel) + client = Client() + + topo.start() + time.sleep(2) + a_node: PaxosConsensusComponentModel = topo.nodes.get('A') + a_node.data_received_client(client, "Hello World!!!") + waitforit = input("hit something to exit...") + + +if __name__ == "__main__": + main() diff --git a/tests/testraftconsensus.py b/tests/testraftconsensus.py new file mode 100644 index 0000000000000000000000000000000000000000..53ab62661b1fbfb91140a02f20322d7c6c64b2da --- /dev/null +++ b/tests/testraftconsensus.py @@ -0,0 +1,44 @@ +import time + +import networkx as nx + +from Ahc import Topology +from Ahc import ComponentRegistry +from Channels import BasicLossyChannel +from Consensus.Raft.raft_component import RaftConsensusComponent +from itertools import combinations + +registry = ComponentRegistry() + +class Client: + + def send(self, message): + print(tuple(message)) + + +def main(): + nodes = ['A', 'B', 'C', 'D', 'E'] + + edges = combinations(nodes, 2) + G = nx.Graph() + G.add_nodes_from(nodes) + G.add_edges_from(edges) + topo = Topology() + topo.construct_from_graph(G, RaftConsensusComponent, BasicLossyChannel) + client = Client() + + topo.start() + time.sleep(5) + a_node: 
RaftConsensusComponent = topo.nodes.get('A') + cluster = a_node.registry.get_non_channel_components() + a_node = topo.nodes.get(cluster[0].state.leaderId) + for i in range(10): + a_node.data_received_client(client, {'type':'append', 'data': { + 'key': i, + 'value': 'hello ' + str(i),}}) + time.sleep(1) + waitforit = input("hit something to exit...") + + +if __name__ == "__main__": + main()