"""Chandra-Toueg consensus on top of the Ahc framework.

A set of ``ChandraNode`` components run rounds of the rotating-coordinator
consensus protocol:

1. every proposer sends its ``(value, timestamp)`` estimate to the
   coordinator (PRP),
2. the coordinator adopts the freshest estimate and broadcasts it (EST),
3. proposers vote ACK (or NACK when their failure detector suspects the
   coordinator),
4. on a majority of ACKs the coordinator broadcasts the decision (DCD);
   otherwise a new round starts with the next coordinator.

NOTE(review): this module assumes the Ahc framework's ``ComponentModel`` /
``ComponentRegistry`` semantics — confirm against the Ahc package version
in use.
"""
from enum import Enum
from threading import Timer
import random

from Ahc import Topology, ComponentRegistry, ComponentModel, ConnectorTypes, Event, \
    GenericMessageHeader, GenericMessage, EventTypes
from Channels import Channel

registry = ComponentRegistry()


class ChandraMessageTypes(Enum):
    """Message types exchanged by the Chandra-Toueg protocol."""
    PRP = "propose"
    EST = "estimate"
    ACK = "acknowledge"
    NACK = "negacknowledge"
    DCD = "decide"
    IS_ALIVE = "iscoordinatoralive"
    # FIX: FDI was referenced in on_message_from_bottom but never declared,
    # which raised AttributeError as soon as that branch was evaluated.
    FDI = "failuredetector"


class ChandraPhases(Enum):
    """Round phases a node steps through, driven by its RepeatTimer."""
    IDLE = "idle"
    PRP_PHASE = "proposalphase"
    EST_PHASE = "estimatephase"
    ACK_PHASE = "ackphase"
    DCD_PHASE = "decisionphase"


class RepeatTimer(Timer):
    """A threading.Timer that re-fires every ``interval`` until cancelled."""

    def run(self):
        while not self.finished.wait(self.interval):
            self.function(*self.args, **self.kwargs)


def selectCoordinator():
    """Rotate coordinatorship to the next node (round-robin by instance number)."""
    cmp = get_coordinator()
    crdname = cmp.componentname
    crdinstancenumber = (cmp.componentinstancenumber + 1) % cmp.numberofNodes
    registry.get_component_by_key(crdname, crdinstancenumber).set_as_coordinator()
    cmp.set_as_proposer()


def start_next_round():
    """Reset every ChandraNode for a fresh round under a new coordinator.

    Called when the current round failed (NACK received, or the coordinator
    did not collect a majority).  Each node gets a fresh timer, a fresh
    random "faulty" flag (~5% chance), and is restarted in the IDLE phase.
    """
    print(f"No agreement. Next round coming up..")
    selectCoordinator()
    print(f"New coordinator is {get_coordinator().componentname}-{get_coordinator().componentinstancenumber}")
    cnodelist = list()
    for itemkey in registry.components:
        cmp = registry.components[itemkey]
        if cmp.componentname == "ChandraNode":
            cnodelist.append(cmp)
    for cmp in cnodelist:
        cmp.phase = ChandraPhases.IDLE
        cmp.faulty = random.randrange(100) > 95
        # The old timer must be cancelled before a new one is started:
        # a Timer thread can only run once its full cancel/recreate cycle.
        cmp.timer.cancel()
        cmp.timer = RepeatTimer(0.5, cmp.phase_handler)
        cmp.timer.start()
        cmp.start = True


def get_coordinator():
    """Return the ChandraNode currently flagged as coordinator (None if unset)."""
    cnodelist = list()
    for itemkey in registry.components:
        cmp = registry.components[itemkey]
        if cmp.componentname == "ChandraNode":
            cnodelist.append(cmp)
    for n in range(0, len(cnodelist)):
        if cnodelist[n].is_coordinator():
            return cnodelist[n]


class ChandraComponents(ComponentModel):
    """One consensus participant; acts as proposer or coordinator per round."""

    def __init__(self, componentname, componentinstancenumber, numberofNodes):
        super().__init__(componentname, componentinstancenumber)
        # Dispatch table: current phase -> handler invoked by the timer.
        self.phasehandlers = {ChandraPhases.IDLE: self.on_idle_phase,
                              ChandraPhases.PRP_PHASE: self.on_proposal_phase,
                              ChandraPhases.EST_PHASE: self.on_estimate_phase,
                              ChandraPhases.ACK_PHASE: self.on_ACK_phase,
                              ChandraPhases.DCD_PHASE: self.on_decide_phase}
        self.numberofNodes = numberofNodes
        self.isCoordinator = False

        self.start = False
        self.phase = ChandraPhases.IDLE
        self.timer = RepeatTimer(0.5, self.phase_handler)
        self.count_ACK = 0        # ACKs collected while coordinator
        self.count_proposal = 0   # proposals collected while coordinator

        self.value = random.randint(0, 3)
        self.timestamp = random.randint(0, 5)
        self.estimate = (self.value, self.timestamp)

        # Randomised failure detection: ~5% of nodes suspect the coordinator.
        self.faulty = random.randrange(100) > 95

        print(f"Initial values for {self.componentname}-{self.componentinstancenumber} : "
              f"(Value,Timestamp)=({self.value},{self.timestamp})")

    def on_init(self, eventobj: Event):
        """Start the phase-driving timer when the component comes up."""
        self.start = True
        self.timer.start()

    def on_message_from_bottom(self, eventobj: Event):
        """Handle a protocol message delivered by the channel.

        The payload is always a ``"value-timestamp"`` string; the header's
        messagetype selects the protocol action.
        """
        message = eventobj.eventcontent
        msgtype = message.header.messagetype
        msgfrom = message.header.messagefrom

        messagePayload = message.payload
        value, timestamp = [int(n) for n in messagePayload.split("-")]

        if msgtype is ChandraMessageTypes.PRP:
            # FIX: log text said "an proposal".
            print(f"{msgfrom.componentname}-{msgfrom.componentinstancenumber} sent a proposal message to "
                  f"{self.componentname}-{self.componentinstancenumber} = ({value},{timestamp})")
            self.count_proposal = self.count_proposal + 1
            # Coordinator keeps the estimate with the freshest timestamp.
            if timestamp > self.estimate[1]:
                self.estimate = (value, timestamp)

        elif msgtype is ChandraMessageTypes.EST:
            self.estimate = (value, timestamp)
            print(f"{msgfrom.componentname}-{msgfrom.componentinstancenumber} sent an estimate message to "
                  f"{self.componentname}-{self.componentinstancenumber} = ({value},{timestamp})")

        elif msgtype is ChandraMessageTypes.ACK:
            print(f"{msgfrom.componentname}-{msgfrom.componentinstancenumber} sent an ACK message to "
                  f"{self.componentname}-{self.componentinstancenumber}")
            self.count_ACK = self.count_ACK + 1

        elif msgtype is ChandraMessageTypes.NACK:
            # FIX: log text said "an NACK".
            print(f"{msgfrom.componentname}-{msgfrom.componentinstancenumber} sent a NACK message to "
                  f"{self.componentname}-{self.componentinstancenumber}")
            # A single NACK aborts the round (coordinator is suspected).
            self.phase = ChandraPhases.IDLE
            start_next_round()

        elif msgtype is ChandraMessageTypes.DCD:
            # Adopt the coordinator's decision as our own value.
            self.value = value
            self.timestamp = timestamp
            print(f"Decision message is arrived. The new values and time stamps of the component "
                  f"{self.componentname}-{self.componentinstancenumber} = ({value},{timestamp})")

        elif msgtype is ChandraMessageTypes.FDI:
            # Failure-detector message: not acted upon in this implementation.
            pass

    def phase_handler(self):
        """Timer callback: run the handler registered for the current phase."""
        self.phasehandlers[self.phase]()

    def on_idle_phase(self):
        # Stay idle until a round is (re)started; stop the timer otherwise.
        if self.start:
            self.phase = ChandraPhases.PRP_PHASE
        else:
            self.timer.cancel()

    def on_proposal_phase(self):
        self.phase = ChandraPhases.EST_PHASE
        if not self.is_coordinator():
            self.send_propose_message()
        else:
            pass

    def on_estimate_phase(self):
        self.phase = ChandraPhases.ACK_PHASE
        if self.is_coordinator():
            # Majority check over all registered components.
            if self.count_proposal >= (len(registry.components) + 1) / 2:
                print(f"The estimate of coordinator is: {self.estimate}")
                self.send_estimate_message()
            else:
                start_next_round()
        else:
            pass

    def on_ACK_phase(self):
        self.phase = ChandraPhases.DCD_PHASE
        if not self.is_coordinator():
            # A "faulty" node's failure detector suspects the coordinator.
            if not self.faulty:
                self.send_ACK()
            else:
                self.send_NACK()
        else:
            pass

    def on_decide_phase(self):
        self.phase = ChandraPhases.IDLE
        self.start = False
        if self.is_coordinator():
            if self.count_ACK >= (len(registry.components) + 1) / 2:
                self.send_decide_message()
            else:
                start_next_round()
        else:
            pass

    def send_propose_message(self):
        """Proposer -> coordinator: send own (value, timestamp) estimate."""
        coordinator = get_coordinator()
        if self is not coordinator:
            cHeader = GenericMessageHeader(ChandraMessageTypes.PRP, self, coordinator)
            cPayload = f"{self.value}-{self.timestamp}"
            cEventContent = GenericMessage(cHeader, cPayload)
            cEvent = Event(self, EventTypes.MFRT, cEventContent)
            self.send_down(cEvent)
            self.send_self(cEvent)
        else:
            pass

    def send_estimate_message(self):
        """Coordinator -> all: broadcast the adopted estimate."""
        coordinator = get_coordinator()
        if self is coordinator:
            # FIX: the original passed (type, self, value, timestamp) — four
            # positional arguments, inconsistent with every other header
            # construction in this file which uses (type, from, to).
            # The estimate itself travels in the payload below.
            cHeader = GenericMessageHeader(ChandraMessageTypes.EST, self, None)
            cPayload = f"{self.estimate[0]}-{self.estimate[1]}"
            cEventContent = GenericMessage(cHeader, cPayload)
            cEvent = Event(self, EventTypes.MFRT, cEventContent)
            self.send_down(cEvent)
        else:
            pass

    def send_ACK(self):
        """Proposer -> coordinator: accept the broadcast estimate."""
        coordinator = get_coordinator()
        if self is not coordinator:
            cHeader = GenericMessageHeader(ChandraMessageTypes.ACK, self, coordinator)
            cPayload = f"{self.estimate[0]}-{self.estimate[1]}"
            cEventContent = GenericMessage(cHeader, cPayload)
            cEvent = Event(self, EventTypes.MFRT, cEventContent)
            self.send_down(cEvent)
        else:
            pass

    def send_NACK(self):
        """Proposer -> coordinator: reject (coordinator suspected faulty)."""
        coordinator = get_coordinator()
        if self is not coordinator:
            cHeader = GenericMessageHeader(ChandraMessageTypes.NACK, self, coordinator)
            cPayload = f"{self.estimate[0]}-{self.estimate[1]}"
            cEventContent = GenericMessage(cHeader, cPayload)
            cEvent = Event(self, EventTypes.MFRT, cEventContent)
            self.send_down(cEvent)
        else:
            pass

    def send_decide_message(self):
        """Coordinator -> all: broadcast the final decision and adopt it locally."""
        coordinator = get_coordinator()
        if self is coordinator:
            cHeader = GenericMessageHeader(ChandraMessageTypes.DCD, self, None)
            cPayload = f"{self.estimate[0]}-{self.estimate[1]}"
            cEventContent = GenericMessage(cHeader, cPayload)
            cEvent = Event(self, EventTypes.MFRT, cEventContent)
            self.send_down(cEvent)
            self.value = self.estimate[0]
            self.timestamp = self.estimate[1]
            print(f"Agreement done. Change your values with decision: {cPayload}")
        else:
            pass

    def on_message_from_top(self, eventobj: Event):
        pass

    def set_as_coordinator(self):
        self.isCoordinator = True

    def set_as_proposer(self):
        """Demote to proposer and reset per-round coordinator counters."""
        self.isCoordinator = False
        self.count_ACK = 0
        self.count_proposal = 0

    def is_coordinator(self):
        return self.isCoordinator


class ChandraChannel(Channel):
    """Channel that only delivers between the coordinator and proposers.

    Messages between two non-coordinator nodes (and loopback to the sender)
    are dropped, modelling the star topology of the protocol.
    """

    def on_deliver_to_component(self, eventobj: Event):
        coordinator = get_coordinator()
        crdname = coordinator.componentinstancenumber
        callername = eventobj.eventsource.componentinstancenumber
        for item in self.connectors:
            callees = self.connectors[item]
            for callee in callees:
                calleename = callee.componentinstancenumber
                # Skip self-delivery and proposer-to-proposer traffic.
                if calleename == callername or (calleename != crdname and callername != crdname):
                    pass
                else:
                    myevent = Event(eventobj.eventsource, EventTypes.MFRB,
                                    eventobj.eventcontent, self.componentinstancenumber)
                    callee.trigger_event(myevent)


class ChandraConsensusNode(ComponentModel):
    """Top-level component: wires numberofNodes ChandraNodes to one channel."""

    def __init__(self, componentname, componentid, numberofNodes=10):
        self.channel = ChandraChannel("ChandraChannel", 0)
        for n in range(0, numberofNodes):
            tComp = ChandraComponents("ChandraNode", n, numberofNodes)
            tComp.connect_me_to_channel(ConnectorTypes.DOWN, self.channel)
            registry.add_component(tComp)
        # Node 0 starts as the first round's coordinator.
        registry.get_component_by_key("ChandraNode", 0).set_as_coordinator()
        super().__init__(componentname, componentid)


def main():
    topo = Topology()
    topo.construct_single_node(ChandraConsensusNode, 0)
    topo.start()
    while True: pass


if __name__ == '__main__':
    main()
"""Nakamoto (proof-of-work) consensus simulation on top of the Ahc framework.

Each ``NkComponent`` is a miner: it pulls pending transactions from its
pool, computes a Merkle root, grinds a nonce until the block-header hash
has ``difficulty`` leading zeros, and broadcasts the block for validation.
Nodes resolve forks by querying neighbours for a longer chain.

NOTE(review): assumes the Ahc framework's ComponentModel/Topology semantics
and that a ``data/`` directory exists for transaction files — confirm.
"""
import binascii
import hashlib
import sys
import threading
import time
from copy import deepcopy
from enum import Enum

import networkx as nx
from random import choice

import matplotlib.pyplot as plt

from Channels import Channel
from Ahc import ComponentRegistry, Topology, GenericMessageHeader, GenericMessage, EventTypes, Event, ComponentModel

PATH_OF_DATA = "data"
# merkle_root_calculator recurses once per tree level; raise the limit for
# safety with large transaction batches.
sys.setrecursionlimit(1500)


def transaction_generator(numberofNodes, txn_count):
    """Write ``txn_count`` random 64-char hex "transactions" per node.

    Produces ``data/transaction<n>.txt`` for each node n.
    """
    for n in range(0, numberofNodes):
        filename = f"{PATH_OF_DATA}/transaction{n}.txt"
        # FIX: use a context manager so the handle is closed even on error
        # (the original relied on a manual close()).
        with open(filename, "w") as file:
            for i in range(0, txn_count):
                rnd_hex = ''.join(choice('0123456789abcdef') for _ in range(64))
                file.write(rnd_hex + "\n")


def hash_double(firstTxHash, secondTxHash):
    """Bitcoin-style double SHA-256 of two hex digests.

    Inputs are hex strings (or bytes); each is unhexlified and
    byte-reversed, the pair is concatenated, hashed twice, and the result
    is returned as a byte-reversed hex *bytes* object.
    """
    unhexReverseFirst = binascii.unhexlify(firstTxHash)[::-1]
    unhexReverseSecond = binascii.unhexlify(secondTxHash)[::-1]
    concatInputs = unhexReverseFirst + unhexReverseSecond
    firstHashInputs = hashlib.sha256(concatInputs).digest()
    finalHashInputs = hashlib.sha256(firstHashInputs).digest()
    return binascii.hexlify(finalHashInputs[::-1])


def merkle_root_calculator(hashList):
    """Reduce a list of transaction hashes to a single Merkle root (str).

    Pairs are combined with hash_double; an odd trailing element is carried
    up unchanged. Recurses until one hash remains.
    """
    if len(hashList) == 1:
        root = hashList[0]
        # FIX: first-level entries are str (read from the txn file) while
        # hash_double returns bytes; the original unconditionally called
        # .decode(), which raised AttributeError for a single-element list
        # (and for an odd first level whose last element was carried up).
        return root.decode('utf-8') if isinstance(root, bytes) else root
    hashes = []
    for i in range(0, len(hashList) - 1, 2):
        hashes.append(hash_double(hashList[i], hashList[i + 1]))
    if len(hashList) % 2 == 1:
        hashes.append(hashList[-1])
    return merkle_root_calculator(hashes)


class Block:
    """A single block: header fields plus the transaction list."""

    def __init__(self, hashPrevHeader, merkleroot, timestamp, transactions, blockhash, nonce, height=0):
        self.hashPrevHeader = hashPrevHeader    # hex digest of previous block header
        self.merkleRoot = merkleroot
        self.timestamp = timestamp              # creation time (seconds since epoch)
        self.transactions = transactions
        self.nonce = nonce
        self.hashBlockHeader = blockhash        # this block's own header digest
        self.height = height                    # distance from genesis


class Blockchain:
    """A linear chain of Blocks, seeded with a genesis block."""

    def __init__(self, difficulty=4):
        self.blocks = []
        self.difficulty = difficulty
        self.generate_genesis_block()

    def generate_genesis_block(self):
        genesis_block = Block("0", "0", time.time(), [], "0", 0, 0)
        self.blocks.append(genesis_block)

    def get_last_block(self):
        return self.blocks[-1]


class NkMessageTypes(Enum):
    """Message types exchanged between Nakamoto nodes."""
    TXN = "transaction"
    VLD = "validation"
    QUERY_LC = "querylongestchain"
    RW_LC = "responsewithlongestchain"


class NkComponent(ComponentModel):
    """A mining node: maintains a pool, mines blocks, validates peers' blocks."""

    def __init__(self, componentname, componentinstancenumber, difficulty=5):
        super().__init__(componentname, componentinstancenumber)
        self.blockchain = Blockchain()
        self.txnPool = []   # pending transactions
        self.txnLog = []    # transactions already mined locally (for restore)
        self.difficulty = difficulty
        self.isMiner = True

        # Artificial per-node slowdown so nodes don't mine in lockstep.
        self.speedDown = 1

        # Protects txnPool/txnLog/blockchain shared between the miner thread
        # and the message-handling thread.
        self.lock = threading.Lock()
        minerThread = threading.Thread(target=self.mine_next_block)
        minerThread.daemon = True
        minerThread.start()

    def on_init(self, eventobj: Event):
        """Load this node's pre-generated transactions into the pool."""
        with open(f"data/transaction{self.componentinstancenumber}.txt", "r") as txns:
            txn_list = txns.read()
        self.txnPool = txn_list.splitlines()
        self.passNextBlock = True

    def print_chain(self):
        print(f"{self.componentname}-{self.componentinstancenumber} blockchain is : ")
        for b in self.blockchain.blocks:
            print(f"{b.hashPrevHeader}-{b.hashBlockHeader}\n")

    def on_message_from_bottom(self, eventobj: Event):
        """Dispatch an incoming network message.

        Handles four message types:
        1) TXN  — add a new transaction to the pool and re-broadcast it.
        2) VLD  — validate a freshly mined block; append it and forward it,
           or ask for a longer chain if the block is ahead of us.
        3) QUERY_LC — reply with a deep copy of our blockchain.
        4) RW_LC — adopt a received chain if it is longer, restoring any
           locally mined transactions that the new chain does not include.
        """
        msg = eventobj.eventcontent
        msgType = msg.header.messagetype
        msgPayload = msg.payload
        msgfrom = msg.header.messagefrom
        msgto = msg.header.messageto
        # FIX: the original used bare lock.acquire()/release(); an exception
        # in any handler would leave the lock held forever and deadlock the
        # miner thread. `with` releases it on every exit path.
        with self.lock:
            if msgType == NkMessageTypes.TXN:
                if msgPayload in self.txnPool:
                    pass
                else:
                    self.txnPool.append(msgPayload)
                    nk_header = GenericMessageHeader(NkMessageTypes.TXN, self, None)
                    nk_payload = msgPayload
                    nk_eventContent = GenericMessage(nk_header, nk_payload)
                    nk_event = Event(self, EventTypes.MFRT, nk_eventContent)
                    self.send_down(nk_event)

            elif msgType == NkMessageTypes.VLD:
                if self.is_valid(msgPayload) and self.is_prev_correct(msgPayload):
                    # Drop mined transactions from our pool and forward the block.
                    self.txnPool = [txn for txn in self.txnPool if not txn in msgPayload.transactions]
                    self.blockchain.blocks.append(msgPayload)
                    nk_header = GenericMessageHeader(NkMessageTypes.VLD, self, None)
                    nk_payload = msgPayload
                    nk_eventContent = GenericMessage(nk_header, nk_payload)
                    nk_event = Event(self, EventTypes.MFRT, nk_eventContent)
                    self.send_down(nk_event)
                    print(f"Component {self.componentinstancenumber} validated a new block: "
                          f"txn count= {len(self.txnPool)} - block count= {len(self.blockchain.blocks)}\n"
                          f"{self.blockchain.get_last_block().hashPrevHeader}")
                    self.passNextBlock = True
                elif msgPayload.height > self.blockchain.get_last_block().height:
                    # The peer is ahead of us: our chain is stale.
                    print(f"Component {self.componentinstancenumber} the message block height is higher ")
                    self.search_longest_chain()

            elif msgType == NkMessageTypes.QUERY_LC:
                # Reply with a deep copy so the peer cannot mutate our chain.
                nk_header = GenericMessageHeader(NkMessageTypes.RW_LC, self, msgfrom)
                nk_payload = deepcopy(self.blockchain)
                nk_eventContent = GenericMessage(nk_header, nk_payload)
                nk_event = Event(self, EventTypes.MFRT, nk_eventContent)
                self.send_down(nk_event)

            elif msgType == NkMessageTypes.RW_LC and msgto is self:
                if msgPayload.get_last_block().height > self.blockchain.get_last_block().height:
                    nTxn = []
                    self.txnPool.extend(self.txnLog)  # restore locally mined txns to the pool
                    self.txnLog.clear()
                    for block in msgPayload.blocks:
                        nTxn.extend(block.transactions)
                    # Keep only transactions the adopted chain does not already contain.
                    self.txnPool = [txn for txn in self.txnPool if txn not in nTxn]
                    self.blockchain = msgPayload
                    self.passNextBlock = False
                    print(f"Component {self.componentinstancenumber} received a longer chain: "
                          f"txn count= {len(self.txnPool)} - block count= {len(self.blockchain.blocks)}\n"
                          f"{self.blockchain.get_last_block().hashPrevHeader}")
            else:
                pass

    def is_valid(self, inBlock: Block):
        """Recompute the block-header hash and check the difficulty target."""
        prvHeader = inBlock.hashPrevHeader
        mrklRoot = inBlock.merkleRoot
        nonce = inBlock.nonce
        hdr = (mrklRoot + str(nonce) + prvHeader).encode('utf-8')
        blockHash = hashlib.sha256(hdr).hexdigest()
        return blockHash.startswith("0" * self.difficulty)

    def is_prev_correct(self, inBlock: Block):
        """Check that the block extends our current chain tip."""
        currBlockHash = self.blockchain.blocks[-1].hashBlockHeader
        inBlockPrvHash = inBlock.hashPrevHeader
        return currBlockHash == inBlockPrvHash

    def mine_next_block(self, minTxnPackage=10):
        """Miner-thread loop: grind nonces over batches of pending transactions.

        Aborts the current attempt when ``passNextBlock`` is set (a peer's
        block or a longer chain arrived).
        """
        while self.isMiner:
            if minTxnPackage <= len(self.txnPool):
                txnPending = self.txnPool[:minTxnPackage]
                lastBlock = self.blockchain.get_last_block()
                hashPrevHeader = lastBlock.hashBlockHeader
                height = lastBlock.height + 1
                merkleRoot = merkle_root_calculator(txnPending)
                nonce = 0
                isMined = False
                self.passNextBlock = False
                time.sleep(self.speedDown)
                while not self.passNextBlock and not isMined:
                    hdr = (merkleRoot + str(nonce) + hashPrevHeader).encode('utf-8')
                    blockHash = hashlib.sha256(hdr).hexdigest()
                    if blockHash.startswith("0" * self.difficulty):
                        # FIX: with-statement instead of bare acquire/release —
                        # an exception here previously left the lock held.
                        with self.lock:
                            self.txnPool = [txn for txn in self.txnPool if txn not in txnPending]
                            self.txnLog.extend(txnPending)
                            # FIX: the original stored time.localtime() (a
                            # struct_time) while the genesis block stores
                            # time.time() (a float); use the float epoch time
                            # consistently.
                            newBlock = Block(hashPrevHeader, merkleRoot, time.time(),
                                             txnPending, blockHash, nonce, height)
                            self.blockchain.blocks.append(newBlock)

                            nkHeader = GenericMessageHeader(NkMessageTypes.VLD, self, None)
                            nkPayload = newBlock
                            nkEventContent = GenericMessage(nkHeader, nkPayload)
                            nkEvent = Event(self, EventTypes.MFRT, nkEventContent)
                            self.send_down(nkEvent)

                            isMined = True
                            print(f"Component {self.componentinstancenumber} mined a new block: "
                                  f"txn count= {len(self.txnPool)} - block count= {len(self.blockchain.blocks)}\n"
                                  f"{self.blockchain.get_last_block().hashPrevHeader}")
                    else:
                        nonce = nonce + 1

    def search_longest_chain(self):
        """Broadcast a query asking neighbours for their (longer) chains."""
        self.passNextBlock = True
        print(f"Component {self.componentinstancenumber} is asking for longer chain")
        nk_payload = "asking for longer chain"
        nk_header = GenericMessageHeader(NkMessageTypes.QUERY_LC, self, None)
        nk_eventContent = GenericMessage(nk_header, nk_payload)
        nk_event = Event(self, EventTypes.MFRT, nk_eventContent)
        self.send_down(nk_event)


def main():
    number_of_nodes = 5
    number_of_txn = 50
    G = nx.erdos_renyi_graph(number_of_nodes, 0.4)
    transaction_generator(number_of_nodes, number_of_txn)
    topo = Topology()
    topo.construct_from_graph(G, NkComponent, Channel)

    ComponentRegistry().print_components()
    topo.start()
    nx.draw(G, with_labels=True, font_weight='bold')
    plt.show()


if __name__ == '__main__':
    main()
    while True: pass