class StreamSources:
    """Track recent OK/FAIL statuses per source and toggle force-send mode.

    Every source gets a bounded history of its latest statuses
    (``status_counts``), a counter of consecutive OKs (``ok_counts``) and a
    ``force_send_mode`` flag.

    * A source enters force-send mode when at least ``failure_mode_thres``
      of its last ``failure_mode_check_past_n`` statuses are ``"FAIL"``.
      When both numbers are equal this is exactly "N consecutive FAILs".
    * A source leaves force-send mode after ``normal_mode_thres``
      consecutive ``"OK"`` statuses.
    """

    def __init__(self, buffer_size, normal_send_interval, failure_mode_thres,
                 failure_mode_check_past_n, normal_mode_thres,
                 normal_mode_check_past_n):
        """Validate the thresholds and create an empty tracker.

        Args:
            buffer_size: Maximum number of statuses remembered per source.
            normal_send_interval: Stored as-is; not used by this class.
            failure_mode_thres: FAILs needed to enter force-send mode.
            failure_mode_check_past_n: Size of the window inspected for FAILs.
            normal_mode_thres: Consecutive OKs needed to leave force-send mode.
            normal_mode_check_past_n: Window size configured for normal mode.

        Raises:
            AssertionError: If a threshold exceeds its window, or the buffer
                is too small to ever hold enough statuses for a threshold.
        """
        # Raised explicitly (rather than via ``assert``) so the checks are
        # not stripped when Python runs with -O.
        if failure_mode_thres > failure_mode_check_past_n:
            raise AssertionError(
                f"failure_mode checker condition is invaild!, failure_mode "
                f"needs {failure_mode_thres} fails in "
                f"{failure_mode_check_past_n}, which is not possible!"
            )
        if normal_mode_thres > normal_mode_check_past_n:
            raise AssertionError(
                f"normal_mode checker condition is invaild!, normal_mode "
                f"needs {normal_mode_thres} fails in "
                f"{normal_mode_check_past_n}, which is not possible!"
            )
        if buffer_size < failure_mode_thres:
            raise AssertionError(
                f"'buffer_size' is smaller then failure_mode_thres! This is "
                f"not possible! This will cause program to never enter "
                f"failure mode!! \nPrinting relevent args and shutting down!"
                f"\n buffer_size : {buffer_size}"
                f"\n failure_mode_thres : {failure_mode_thres}"
            )
        if buffer_size < normal_mode_thres:
            raise AssertionError(
                f"'buffer_size' is smaller then normal_mode_thres! This is "
                f"will cause the program to never revert back to normal "
                f"mode!! \nPrinting relevent args and shutting down!"
                f"\n buffer_size : {buffer_size}"
                f"\n normal_mode_thres : {normal_mode_thres}"
            )

        self.sources = {}
        self.buffer_size = buffer_size
        self.normal_send_interval = normal_send_interval

        # "Consecutive" mode means the threshold equals the window size.
        self.switching_fail_consecutive_mode = (
            failure_mode_thres == failure_mode_check_past_n
        )
        self.switching_normal_consecutive_mode = (
            normal_mode_thres == normal_mode_check_past_n
        )

        self.failure_mode_thres = failure_mode_thres
        self.failure_mode_check_past_n = failure_mode_check_past_n
        self.normal_mode_thres = normal_mode_thres
        self.normal_mode_check_past_n = normal_mode_check_past_n

    def __setitem__(self, key, value=None):
        """Register a new source with a fresh, empty state.

        ``value`` is accepted so that ``tracker[key] = ...`` works, but it is
        ignored: a source always starts with no history, zero OKs and
        force-send mode off.

        Raises:
            ValueError: If ``key`` is already registered.
        """
        if key in self.sources:
            raise ValueError(
                f"Error! Source {key!r} is already registered!"
            )
        self.sources[key] = {
            "status_counts": [],
            "ok_counts": 0,
            "force_send_mode": False,
        }

    def __getitem__(self, key):
        """Return the state dict of ``key``; raises ``KeyError`` if unknown."""
        return self.sources[key]

    def add_status(self, source, status):
        """Record one status for ``source`` and update its mode.

        Unknown sources are registered on first use.

        Args:
            source: Key identifying the source.
            status: Either ``"OK"`` or ``"FAIL"``.

        Raises:
            AssertionError: If ``status`` is neither ``"OK"`` nor ``"FAIL"``.
        """
        if status not in ("OK", "FAIL"):
            raise AssertionError(
                f"Invalid status was given!, status must be one of 'OK' or "
                f"'FAIL', but given '{status}'!"
            )

        if source not in self.sources:
            self[source] = {}  # Registers the source with a fresh state.
        state = self.sources[source]

        history = state["status_counts"]
        history.append(status)
        if len(history) > self.buffer_size:
            history.pop(0)  # Drop the oldest entry to keep the buffer bounded.

        if status == "OK":
            state["ok_counts"] += 1
            if (state["force_send_mode"]
                    and state["ok_counts"] >= self.normal_mode_thres):
                state["force_send_mode"] = False
                self.send_message(source, "OK SEND")
        else:
            state["ok_counts"] = 0  # A FAIL breaks the run of consecutive OKs.
            self.check_failures(source)

    def check_failures(self, source):
        """Put ``source`` into force-send mode if enough recent FAILs exist.

        Counts the FAILs among the last ``failure_mode_check_past_n``
        recorded statuses; when the count reaches ``failure_mode_thres`` the
        source is flagged and a "FORCE SEND" message is sent. This fires on
        every call for which the condition holds, not only on the first.
        """
        state = self.sources[source]
        window_size = self.failure_mode_check_past_n
        # Guard against a zero window: ``history[-0:]`` would be the whole list.
        window = state["status_counts"][-window_size:] if window_size > 0 else []
        fail_count = window.count("FAIL")
        if fail_count < self.failure_mode_thres:
            return

        if self.switching_fail_consecutive_mode:
            print(
                f"Source {source} has {self.failure_mode_thres} "
                f"consecutive FAILs!"
            )
        else:
            print(
                f"Source {source} has {fail_count} FAILs in the last "
                f"{len(window)} statuses!"
            )
        state["force_send_mode"] = True
        self.send_message(source, "FORCE SEND")

    def send_message(self, source, message_type):
        """Print the outgoing message and reset the source's OK counter."""
        print(f"Sending message for {source} - Status: {message_type}")
        # Reset the count after sending a message.
        self.sources[source]["ok_counts"] = 0