mirror of
https://github.com/markqvist/Reticulum.git
synced 2026-05-21 07:14:48 -07:00
Implemented remote blackhole list updater
This commit is contained in:
@@ -375,3 +375,89 @@ class InterfaceDiscovery():
|
||||
return
|
||||
|
||||
if self.discovery_callback and callable(self.discovery_callback): self.discovery_callback(info)
|
||||
|
||||
class BlackholeUpdater():
|
||||
INITIAL_WAIT = 20
|
||||
JOB_INTERVAL = 60
|
||||
UPDATE_INTERVAL = 3*60*60
|
||||
SOURCE_TIMEOUT = 25
|
||||
|
||||
def __init__(self):
|
||||
self.last_updates = {}
|
||||
self.should_run = False
|
||||
self.job_interval = self.JOB_INTERVAL
|
||||
self.update_lock = threading.Lock()
|
||||
|
||||
def start(self):
|
||||
if not self.should_run:
|
||||
source_count = len(RNS.Reticulum.blackhole_sources())
|
||||
ms = "" if source_count == 1 else "s"
|
||||
RNS.log(f"Starting blackhole updater with {source_count} source{ms}", RNS.LOG_DEBUG)
|
||||
self.should_run = True
|
||||
threading.Thread(target=self.job, daemon=True).start()
|
||||
|
||||
def stop(self): self.should_run = False
|
||||
|
||||
def update_link_established(self, link):
|
||||
remote_identity = link.get_remote_identity()
|
||||
RNS.log(f"Link established for blackhole list update from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
|
||||
receipt = link.request("/list")
|
||||
while not receipt.concluded(): time.sleep(0.2)
|
||||
response = receipt.get_response()
|
||||
link.teardown()
|
||||
|
||||
if type(response) == dict: blackhole_list = response
|
||||
else: blackhole_list = None
|
||||
|
||||
if blackhole_list:
|
||||
added = 0
|
||||
for identity_hash in blackhole_list:
|
||||
entry = blackhole_list[identity_hash]
|
||||
if not identity_hash in RNS.Transport.blackholed_identities:
|
||||
RNS.Transport.blackholed_identities[identity_hash] = entry
|
||||
added += 1
|
||||
|
||||
if added > 0:
|
||||
spec = "identity" if added == 1 else "identities"
|
||||
RNS.log(f"Added {added} blackholed {spec} from {RNS.prettyhexrep(remote_identity.hash)}", RNS.LOG_DEBUG)
|
||||
|
||||
try:
|
||||
sourcelistpath = os.path.join(RNS.Reticulum.blackholepath, RNS.hexrep(remote_identity.hash, delimit=False))
|
||||
tmppath = f"{sourcelistpath}.tmp"
|
||||
with open(tmppath, "wb") as f: f.write(msgpack.packb(blackhole_list))
|
||||
if os.path.isfile(sourcelistpath): os.unlink(sourcelistpath)
|
||||
os.rename(tmppath, sourcelistpath)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while persisting blackhole list from {RNS.prettyhexrep(remote_identity.hash)}: {e}", RNS.LOG_ERROR)
|
||||
|
||||
RNS.log(f"Blackhole list update from {RNS.prettyhexrep(remote_identity.hash)} completed", RNS.LOG_DEBUG)
|
||||
|
||||
def job(self):
|
||||
time.sleep(self.INITIAL_WAIT)
|
||||
while self.should_run:
|
||||
try:
|
||||
now = time.time()
|
||||
for identity_hash in RNS.Reticulum.blackhole_sources():
|
||||
if identity_hash in self.last_updates: last_update = self.last_updates[identity_hash]
|
||||
else: last_update = 0
|
||||
|
||||
if now > last_update+self.UPDATE_INTERVAL:
|
||||
try:
|
||||
destination_hash = RNS.Destination.hash_from_name_and_identity("rnstransport.info.blackhole", identity_hash)
|
||||
RNS.log(f"Attempting blackhole list update from {RNS.prettyhexrep(identity_hash)}...", RNS.LOG_DEBUG)
|
||||
if not RNS.Transport.await_path(destination_hash): RNS.log(f"No path available for blackhole list update from {RNS.prettyhexrep(identity_hash)}, retrying later", RNS.LOG_VERBOSE)
|
||||
else:
|
||||
remote_identity = RNS.Identity.recall(destination_hash)
|
||||
destination = RNS.Destination(remote_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, "rnstransport", "info", "blackhole")
|
||||
RNS.Link(destination, established_callback=self.update_link_established)
|
||||
self.last_updates[identity_hash] = time.time()
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error while establishing link for blackhole list update from {RNS.prettyhexrep(identity_hash)}: {e}", RNS.LOG_ERROR)
|
||||
|
||||
except Exception as e:
|
||||
RNS.log(f"Error in blackhole list updater job: {e}", RNS.LOG_ERROR)
|
||||
RNS.trace_exception(e)
|
||||
|
||||
time.sleep(self.job_interval)
|
||||
Reference in New Issue
Block a user