From 8a222965359e7f24c57f4318415ebc7faec6e490 Mon Sep 17 00:00:00 2001 From: Erman Yafay Date: Tue, 18 May 2021 17:34:36 +0300 Subject: [PATCH 1/3] initial chandy-lamport snapshot algorithm --- Ahc.py | 9 ++- Snapshot.py | 143 ++++++++++++++++++++++++++++++++++++++++++ tests/testsnapshot.py | 22 +++++++ 3 files changed, 173 insertions(+), 1 deletion(-) mode change 100644 => 100755 Ahc.py create mode 100644 Snapshot.py create mode 100755 tests/testsnapshot.py diff --git a/Ahc.py b/Ahc.py old mode 100644 new mode 100755 index b2d5487..303256a --- a/Ahc.py +++ b/Ahc.py @@ -209,6 +209,13 @@ class ComponentModel: self.connectors[name] = channel connectornameforchannel = self.componentname + str(self.componentinstancenumber) channel.connect_me_to_component(connectornameforchannel, self) + self.on_connected_to_channel(name, channel) + + def on_connected_to_channel(self, name, channel): + print(f"Connected to channel: {name}:{channel.componentinstancenumber}") + + def unique_name(self): + return f"{self.componentname}.{self.componentinstancenumber}" def terminate(self): self.terminated = True @@ -315,7 +322,7 @@ class Topology: #mypath = path[i][j] # print(f"{i}to{j} path = {path[i][j]} nexthop = {path[i][j][1]}") #self.ForwardingTable[i][j] = path[i][j][1] - + # print(f"{i}to{j}path = NONE") #self.ForwardingTable[i][j] = inf # No paths #except IndexError: diff --git a/Snapshot.py b/Snapshot.py new file mode 100644 index 0000000..b5f5fda --- /dev/null +++ b/Snapshot.py @@ -0,0 +1,143 @@ +from enum import IntEnum, auto +from Ahc import ComponentModel, Event, GenericMessage, GenericMessageHeader +from Ahc import EventTypes +from collections import defaultdict + + +class ChandyLamportMessageTypes(IntEnum): + MARK = auto() + GSU = auto() + + +class ChandyLamportComponentModel(ComponentModel): + """A ComponentModel that you can take a snapshot of using the + Chandy-Lamport algorithm""" + + def channel_of(self, eventobj: Event): + from_chnl = eventobj.fromchannel + if from_chnl is None: + raise Exception(f"Received {ChandyLamportEventTypes.MARK} from a " + "non-channel component") + + return from_chnl + + def gsu_recv(self, recv_state, from_chnl): + self.global_state |= recv_state + self.gsu_chnls.add(from_chnl) + if self.gsu_chnls != self.in_chnls: + return + + self.gsu_chnls.clear() + if not self.init_snapshot: + return + + print(f"Reporting snapshot result:") + for comp, state in self.global_state.items(): + print(f"State of: {comp}:") + for i in range(len(state)): + print(f" {i}: {state[i].event}") + + self.init_snapshot = False + + + def broadcast(self, event: Event): + self.send_down(event) + self.send_peer(event) + self.send_up(event) + + def mark_send(self): + self.state = list(self.inputqueue.queue) + mark_msg = GenericMessage( + GenericMessageHeader(ChandyLamportMessageTypes.MARK, None, None), + None) + self.broadcast(Event(self, EventTypes.MFRT, mark_msg)) + + def report_snapshot(self): + """Initializes a global snapshot and a report will be printed out when + complete""" + self.init_snapshot = True + self.mark_send() + + def mark_recv(self, from_chnl): + print(f"MARK recevied from channel: {from_chnl}") + if self.state is None: + self.mark_send() + self.in_chnl_states[from_chnl] = [] + else: + self.in_chnl_states[from_chnl] = self.in_chnl_events[from_chnl] + + self.mark_recv_chnls.add(from_chnl) + if self.mark_recv_chnls == self.in_chnls: + # Local snapshot completed, broadcast and reset the local state + local_state = { self.unique_name(): self.state } + for c, s in self.in_chnl_states.items(): + local_state[str(c)] = s + + self.global_state |= local_state + gsu_msg = GenericMessage( + GenericMessageHeader(ChandyLamportMessageTypes.GSU, None, None), + self.global_state) + self.broadcast(Event(self, EventTypes.MFRT, gsu_msg)) + self.reset_state() + + + def msg_recv(self, event: Event): + from_chnl = self.channel_of(event) + # If received message is of type MARK or GSU; process them separately + if type(contnt := event.eventcontent) == GenericMessage and\ + type(header := contnt.header) == GenericMessageHeader: + if header.messagetype == ChandyLamportMessageTypes.MARK: + self.mark_recv(from_chnl) + return + + if header.messagetype == ChandyLamportMessageTypes.GSU: + self.gsu_recv(contnt.payload, from_chnl) + return + + if self.state is None: + return + + # If the state is not recorded + if from_chnl not in self.in_chnl_states: + self.in_chnl_events[from_chnl].append(event) + + # When overridden call this function with 'super' + def on_message_from_bottom(self, eventobj: Event): + return self.msg_recv(eventobj) + + # When overridden call this function with 'super' + def on_message_from_peer(self, eventobj: Event): + return self.msg_recv(eventobj) + + # When overridden call this function with 'super' + def on_message_from_top(self, eventobj: Event): + return self.msg_recv(eventobj) + + def on_connected_to_channel(self, name, channel): + super().on_connected_to_channel(name, channel) + self.in_chnls.add(channel.componentinstancenumber) + + + def connect_me_to_component(self, name, component): + raise Exception(f"Only channels are allowed for connection to" + " {self.__class__}") + + + def reset_state(self): + self.state = None + self.in_chnl_states.clear() + self.in_chnl_events.clear() + self.mark_recv_chnls.clear() + + def __init__(self, componentname, componentinstancenumber, + num_worker_threads=1): + super().__init__(componentname, componentinstancenumber, + num_worker_threads=num_worker_threads) + self.state = None + self.global_state = dict() + self.in_chnl_states = dict() + self.in_chnl_events = defaultdict(list) + self.mark_recv_chnls = set() + self.in_chnls = set() + self.init_snapshot = False + self.gsu_chnls = set() diff --git a/tests/testsnapshot.py b/tests/testsnapshot.py new file mode 100755 index 0000000..251b429 --- /dev/null +++ b/tests/testsnapshot.py @@ -0,0 +1,22 @@ +#!/usr/bin/env python3 + +from Channels import Channel +from Snapshot import ChandyLamportComponentModel +from Ahc import Topology +import matplotlib.pyplot as plt +import networkx as nx + + +def main(): + topo = Topology() + topo.construct_sender_receiver(ChandyLamportComponentModel, + ChandyLamportComponentModel, Channel) + nx.draw(topo.G, with_labels=True, font_weight='bold') + plt.draw() + topo.start() + topo.sender.report_snapshot() + plt.show() + + +if __name__ == "__main__": + exit(main()) -- GitLab From 7e6097f1171c5bcc7504cf1649dfbd56996c1dba Mon Sep 17 00:00:00 2001 From: Erman Yafay Date: Wed, 26 May 2021 12:25:13 +0300 Subject: [PATCH 2/3] lai-yang algorithm --- Ahc.py | 35 ++++- Channels.py | 13 +- Snapshot.py | 318 +++++++++++++++++++++++++++++++----------- tests/testsnapshot.py | 15 +- 4 files changed, 296 insertions(+), 85 deletions(-) diff --git a/Ahc.py b/Ahc.py index 303256a..e3ed5f9 100755 --- a/Ahc.py +++ b/Ahc.py @@ -82,10 +82,21 @@ class ConnectorTypes(Enum): UP = "UP" PEER = "PEER" +def auto_str(cls): + def __str__(self): + return '%s(%s)' % ( + type(self).__name__, + ', '.join('%s=%s' % item for item in vars(self).items()) + ) + cls.__str__ = __str__ + return cls + +@auto_str class GenericMessagePayload: def __init__(self, messagepayload): self.messagepayload = messagepayload +@auto_str class GenericMessageHeader: def __init__(self, messagetype, messagefrom, messageto, nexthop=float('inf'), interfaceid=float('inf'), sequencenumber=-1): self.messagetype = messagetype @@ -95,19 +106,37 @@ class GenericMessageHeader: self.interfaceid = interfaceid self.sequencenumber = sequencenumber +@auto_str class GenericMessage: def __init__(self, header, payload): self.header = header self.payload = payload self.uniqueid = str(header.messagefrom) + "-" + str(header.sequencenumber) +@auto_str class Event: - def __init__(self, eventsource, event, eventcontent, fromchannel=None): + curr_event_id = 0 + + def __init__(self, eventsource, event, eventcontent, fromchannel=None, + eventid=-1): self.eventsource = eventsource self.event = event self.time = datetime.datetime.now() self.eventcontent = eventcontent self.fromchannel = fromchannel + self.eventid = eventid + if self.eventid == -1: + self.eventid = self.curr_event_id + self.curr_event_id += 1 + + def __eq__(self, other) -> bool: + if type(other) is not Event: + return False + + return self.eventid == other.eventid + + def __hash__(self) -> int: + return self.eventid def singleton(cls): instance = [None] @@ -214,6 +243,9 @@ class ComponentModel: def on_connected_to_channel(self, name, channel): print(f"Connected to channel: {name}:{channel.componentinstancenumber}") + def on_pre_event(self, event): + pass + def unique_name(self): return f"{self.componentname}.{self.componentinstancenumber}" @@ -249,6 +281,7 @@ class ComponentModel: while not self.terminated: workitem = myqueue.get() if workitem.event in self.eventhandlers: + self.on_pre_event(workitem) self.eventhandlers[workitem.event](eventobj=workitem) # call the handler else: print(f"Event Handler: {workitem.event} is not implemented") diff --git a/Channels.py b/Channels.py index a6e9bbd..0e9a274 100644 --- a/Channels.py +++ b/Channels.py @@ -34,14 +34,18 @@ class Channel(ComponentModel): # Overwrite onSendToChannel if you want to do something in the first pipeline stage def on_message_from_top(self, eventobj: Event): # channel receives the input message and will process the message by the process event in the next pipeline stage - myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, eventobj.eventcontent) + # Preserve the event id through the pipeline + myevent = Event(eventobj.eventsource, ChannelEventTypes.INCH, + eventobj.eventcontent, eventid=eventobj.eventid) self.channelqueue.put_nowait(myevent) # Overwrite onProcessInChannel if you want to do something in interim pipeline stage def on_process_in_channel(self, eventobj: Event): # Add delay, drop, change order whatever.... # Finally put the message in outputqueue with event deliver - myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, eventobj.eventcontent) + # Preserve the event id through the pipeline + myevent = Event(eventobj.eventsource, ChannelEventTypes.DLVR, + eventobj.eventcontent, eventid=eventobj.eventid) self.outputqueue.put_nowait(myevent) # Overwrite onDeliverToComponent if you want to do something in the last pipeline stage @@ -56,7 +60,10 @@ class Channel(ComponentModel): if calleename == callername: pass else: - myevent = Event(eventobj.eventsource, EventTypes.MFRB, eventobj.eventcontent, self.componentinstancenumber) + # Preserve the event id through the pipeline + myevent = Event(eventobj.eventsource, EventTypes.MFRB, + eventobj.eventcontent, self.componentinstancenumber, + eventid=eventobj.eventid) callee.trigger_event(myevent) def __init__(self, componentname, componentinstancenumber): diff --git a/Snapshot.py b/Snapshot.py index b5f5fda..366c628 100644 --- a/Snapshot.py +++ b/Snapshot.py @@ -1,85 +1,172 @@ -from enum import IntEnum, auto +from enum import Enum from Ahc import ComponentModel, Event, GenericMessage, GenericMessageHeader from Ahc import EventTypes from collections import defaultdict +class SnapshotEventTypes(Enum): + # Take snapshot event + TS = "TS" -class ChandyLamportMessageTypes(IntEnum): - MARK = auto() - GSU = auto() +class SnapshotMessageTypes(Enum): + GSU = "GSU" -class ChandyLamportComponentModel(ComponentModel): - """A ComponentModel that you can take a snapshot of using the - Chandy-Lamport algorithm""" +class SnapshotComponentModel(ComponentModel): + + def __init__(self, componentname, componentinstancenumber, + num_worker_threads=1): + super().__init__(componentname, componentinstancenumber, + num_worker_threads=num_worker_threads) + self.state = None + self.gsu_redirected_comps = set() + self.recv_events = [] + self.chnls = set() + self.init_snapshot = False + self.eventhandlers[SnapshotEventTypes.TS] = self.take_snapshot + + def connect_me_to_component(self, name, component): + raise Exception(f"Only channels are allowed for connection to" + " {self.__class__}") + + def on_connected_to_channel(self, name, channel): + super().on_connected_to_channel(name, channel) + self.chnls.add(channel.componentinstancenumber) def channel_of(self, eventobj: Event): from_chnl = eventobj.fromchannel if from_chnl is None: - raise Exception(f"Received {ChandyLamportEventTypes.MARK} from a " - "non-channel component") + raise Exception(f"Received a message from a non-channel component") return from_chnl - def gsu_recv(self, recv_state, from_chnl): - self.global_state |= recv_state - self.gsu_chnls.add(from_chnl) - if self.gsu_chnls != self.in_chnls: - return + def on_pre_event(self, event): + return self.recv_events.append(event) - self.gsu_chnls.clear() - if not self.init_snapshot: - return + def msg_recv(self, event: Event): + """Generic message received function""" + pass - print(f"Reporting snapshot result:") - for comp, state in self.global_state.items(): - print(f"State of: {comp}:") - for i in range(len(state)): - print(f" {i}: {state[i].event}") + def send_msg(self, event: Event): + pass - self.init_snapshot = False + def send_gsu(self, local_state): + gsu_msg = GenericMessage( + GenericMessageHeader(SnapshotMessageTypes.GSU, None, None), + local_state) + self.send_msg(Event(self, EventTypes.MFRT, gsu_msg)) + + def gsu_recv(self, state): + # Redirect the GSU if we are not the source component of the snapshot + if state.component_id not in self.gsu_redirected_comps: + self.gsu_redirected_comps.add(state.component_id) + self.send_gsu(state) + + self.on_gsu_recv(state) + + def on_gsu_recv(self, state): + pass + + def on_take_snapshot(self): + """Generic report snapshot""" + pass + + def take_snapshot(self, eventobj: Event): + self.init_snapshot = True + self.on_take_snapshot() + + # When overridden call this function with 'super' + def on_message_from_bottom(self, eventobj: Event): + return self.msg_recv(eventobj) + + # When overridden call this function with 'super' + def on_message_from_peer(self, eventobj: Event): + return self.msg_recv(eventobj) + + # When overridden call this function with 'super' + def on_message_from_top(self, eventobj: Event): + return self.msg_recv(eventobj) + + def reset_state(self): + self.state = None + self.gsu_redirected_comps.clear() + +class ChandyLamportMessageTypes(Enum): + MARK = "MARK" + + +class ChandyLamportState: + def __init__(self, component, state, chnl_states): + self.component_id = component + self.component_state = [] + for s in state: + self.component_state.append(s) + + self.chnl_states = defaultdict(list) + for c, s in chnl_states.items(): + self.chnl_states[c].append(s) + +class ChandyLamportComponentModel(SnapshotComponentModel): + """A ComponentModel that you can take a snapshot of using the + Chandy-Lamport algorithm""" + + def __init__(self, componentname, componentinstancenumber, + num_worker_threads=1): + super().__init__(componentname, componentinstancenumber, + num_worker_threads=num_worker_threads) + self.global_state = dict() + self.in_chnl_states = defaultdict(list) + self.in_chnl_events = defaultdict(list) + self.mark_recv_chnls = set() + self.gsu_chnls = set() + + def on_gsu_recv(self, state: ChandyLamportState): + if not self.init_snapshot: + return + report=f"State of component: {state.component_id}=" + report += ", ".join(str(e) for e in state.component_state) + print(report) + for chnl, events in state.chnl_states.items(): + chnl_rep = f"State of channel: {chnl}=" + chnl_rep += ", ".join(str(e) for e in events) + print(chnl_rep) - def broadcast(self, event: Event): + def send_msg(self, event: Event): self.send_down(event) - self.send_peer(event) - self.send_up(event) def mark_send(self): - self.state = list(self.inputqueue.queue) + self.state = [] + for re in self.recv_events: + self.state.append(re) + mark_msg = GenericMessage( GenericMessageHeader(ChandyLamportMessageTypes.MARK, None, None), None) - self.broadcast(Event(self, EventTypes.MFRT, mark_msg)) + self.send_msg(Event(self, EventTypes.MFRT, mark_msg)) - def report_snapshot(self): + def on_take_snapshot(self): """Initializes a global snapshot and a report will be printed out when complete""" - self.init_snapshot = True self.mark_send() def mark_recv(self, from_chnl): - print(f"MARK recevied from channel: {from_chnl}") if self.state is None: self.mark_send() self.in_chnl_states[from_chnl] = [] else: - self.in_chnl_states[from_chnl] = self.in_chnl_events[from_chnl] + for e in self.in_chnl_events[from_chnl]: + self.in_chnl_states[e].append(e) self.mark_recv_chnls.add(from_chnl) - if self.mark_recv_chnls == self.in_chnls: + if self.mark_recv_chnls == self.chnls: # Local snapshot completed, broadcast and reset the local state - local_state = { self.unique_name(): self.state } - for c, s in self.in_chnl_states.items(): - local_state[str(c)] = s - - self.global_state |= local_state + local_state = ChandyLamportState(self.componentinstancenumber, + self.state, self.in_chnl_states) gsu_msg = GenericMessage( - GenericMessageHeader(ChandyLamportMessageTypes.GSU, None, None), - self.global_state) - self.broadcast(Event(self, EventTypes.MFRT, gsu_msg)) - self.reset_state() - + GenericMessageHeader(SnapshotMessageTypes.GSU, None, None), + local_state) + self.send_msg(Event(self, EventTypes.MFRT, gsu_msg)) + self.gsu_recv(local_state) def msg_recv(self, event: Event): from_chnl = self.channel_of(event) @@ -88,56 +175,131 @@ class ChandyLamportComponentModel(ComponentModel): type(header := contnt.header) == GenericMessageHeader: if header.messagetype == ChandyLamportMessageTypes.MARK: self.mark_recv(from_chnl) - return + elif header.messagetype == SnapshotMessageTypes.GSU: + self.gsu_recv(contnt.payload) - if header.messagetype == ChandyLamportMessageTypes.GSU: - self.gsu_recv(contnt.payload, from_chnl) - return + return event if self.state is None: - return + return event # If the state is not recorded if from_chnl not in self.in_chnl_states: self.in_chnl_events[from_chnl].append(event) - # When overridden call this function with 'super' - def on_message_from_bottom(self, eventobj: Event): - return self.msg_recv(eventobj) - - # When overridden call this function with 'super' - def on_message_from_peer(self, eventobj: Event): - return self.msg_recv(eventobj) - - # When overridden call this function with 'super' - def on_message_from_top(self, eventobj: Event): - return self.msg_recv(eventobj) - - def on_connected_to_channel(self, name, channel): - super().on_connected_to_channel(name, channel) - self.in_chnls.add(channel.componentinstancenumber) - - - def connect_me_to_component(self, name, component): - raise Exception(f"Only channels are allowed for connection to" - " {self.__class__}") - + return event def reset_state(self): - self.state = None + super().reset_state() self.in_chnl_states.clear() self.in_chnl_events.clear() self.mark_recv_chnls.clear() + +class LaiYangState: + def __init__(self, comp_id, comp_state, received, sent): + self.component_id = comp_id + self.component_state = [] + for cs in comp_state: + self.component_state.append(cs) + + self.received = defaultdict(list) + for chnl, r in received.items(): + self.received[chnl].append(r) + + self.sent = defaultdict(list) + for chnl, s in sent.items(): + self.sent[chnl].append(s) + +class LaiYangComponentModel(SnapshotComponentModel): def __init__(self, componentname, componentinstancenumber, - num_worker_threads=1): + num_worker_threads=1): super().__init__(componentname, componentinstancenumber, num_worker_threads=num_worker_threads) - self.state = None + self.chnl_recv = defaultdict(list) + self.chnl_sent = defaultdict(list) self.global_state = dict() - self.in_chnl_states = dict() - self.in_chnl_events = defaultdict(list) - self.mark_recv_chnls = set() - self.in_chnls = set() - self.init_snapshot = False - self.gsu_chnls = set() + self.sent_remaining = dict() + self.recv_remaining = dict() + + def send_msg(self, event: Event): + event.eventcontent = (event.eventcontent, self.state is not None) + for c in self.chnls: + self.chnl_sent[c].append(event) + + self.send_down(event) + + def handle_snapshot(self): + # Take a snapshot + self.state = LaiYangState(self.componentinstancenumber, + self.recv_events, self.chnl_recv, + self.chnl_sent) + self.gsu_recv(self.state) + + def on_take_snapshot(self): + self.handle_snapshot() + + # Broadcast a dummy message so that other components record + # and broadcast their snapshots + self.send_msg(Event(self, EventTypes.MFRT, "dummy")) + + def report_and_save_channel_state(self, channel, set_recv, set_sent): + if not set_recv.issubset(set_sent): + raise Exception("Not a consistent global state") + + chnl_state = list(set_sent - set_recv) + self.global_state[channel] = chnl_state + print(f"State of channel: {channel}=chnl_state") + + def on_gsu_recv(self, state: LaiYangState): + if not self.init_snapshot: + return + # Report the snapshot if we are the source component of the snapshot + self.global_state[state.component_id] = state.component_state + report = f"State of component: {state.component_id}=" + report += ", ".join(str(e) for e in state.component_state) + print(report) + + # Compute the messages in transit + for chnl, recv in state.received: + if chnl in self.sent_remaining: + self.report_and_save_channel_state( + chnl, set(recv), set(self.sent_remaining[chnl])) + else: + self.recv_remaining[chnl] = recv + + for chnl, sent in state.sent: + if chnl in self.recv_remaining: + self.report_and_save_channel_state( + chnl, set(self.recv_remaining[chnl]), set(sent)) + else: + self.sent_remaining[chnl] = sent + + def msg_recv(self, event: Event): + content = event.eventcontent + if type(content) is not tuple or len(content) != 2: + raise Exception("Malformed message received by: " + "{self.unique_name()}") + + # Unpack the event content and modify the event with the actual content + act_cntnt, post_snapshot = content + event.eventcontent = act_cntnt + + if self.state is None and post_snapshot: + self.handle_snapshot() + + from_chnl = self.channel_of(event) + self.chnl_recv[from_chnl].append(event) + + # If not a GSU message return the modified event + if type(act_cntnt) != GenericMessage or\ + type(header := act_cntnt.header) != GenericMessageHeader or\ + header.messagetype != SnapshotMessageTypes.GSU: + return event + + self.gsu_recv(act_cntnt.payload) + return event + + def reset_state(self): + super().reset_state() + self.global_state.clear() diff --git a/tests/testsnapshot.py b/tests/testsnapshot.py index 251b429..7c70584 100755 --- a/tests/testsnapshot.py +++ b/tests/testsnapshot.py @@ -1,8 +1,9 @@ #!/usr/bin/env python3 from Channels import Channel -from Snapshot import ChandyLamportComponentModel -from Ahc import Topology +from Snapshot import ChandyLamportComponentModel, LaiYangComponentModel +from Snapshot import SnapshotEventTypes +from Ahc import Event, Topology import matplotlib.pyplot as plt import networkx as nx @@ -14,7 +15,15 @@ def main(): nx.draw(topo.G, with_labels=True, font_weight='bold') plt.draw() topo.start() - topo.sender.report_snapshot() + topo.sender.send_self(Event(topo.sender, SnapshotEventTypes.TS, None)) + plt.show() + + topo.construct_sender_receiver(LaiYangComponentModel, + LaiYangComponentModel, Channel) + nx.draw(topo.G, with_labels=True, font_weight='bold') + plt.draw() + topo.start() + topo.sender.send_self(Event(topo.sender, SnapshotEventTypes.TS, None)) plt.show() -- GitLab From bd6b8e2e226d5de988ed92f76b895938a155c3bd Mon Sep 17 00:00:00 2001 From: Erman Yafay Date: Wed, 26 May 2021 17:04:56 +0300 Subject: [PATCH 3/3] Final submission --- Snapshot.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/Snapshot.py b/Snapshot.py index 366c628..e569716 100644 --- a/Snapshot.py +++ b/Snapshot.py @@ -135,10 +135,12 @@ class ChandyLamportComponentModel(SnapshotComponentModel): self.send_down(event) def mark_send(self): + # Record the state self.state = [] for re in self.recv_events: self.state.append(re) + # Broadcast the mark message mark_msg = GenericMessage( GenericMessageHeader(ChandyLamportMessageTypes.MARK, None, None), None) @@ -151,15 +153,17 @@ class ChandyLamportComponentModel(SnapshotComponentModel): def mark_recv(self, from_chnl): if self.state is None: + # First mark message, save component and channel state self.mark_send() self.in_chnl_states[from_chnl] = [] else: + # Consequent mark messages, save channel states for e in self.in_chnl_events[from_chnl]: self.in_chnl_states[e].append(e) self.mark_recv_chnls.add(from_chnl) if self.mark_recv_chnls == self.chnls: - # Local snapshot completed, broadcast and reset the local state + # Local snapshot completed, broadcast the local state local_state = ChandyLamportState(self.componentinstancenumber, self.state, self.in_chnl_states) gsu_msg = GenericMessage( @@ -285,6 +289,7 @@ class LaiYangComponentModel(SnapshotComponentModel): act_cntnt, post_snapshot = content event.eventcontent = act_cntnt + # We are white and the message is post-snapshot if self.state is None and post_snapshot: self.handle_snapshot() -- GitLab