diff --git a/RNS/Reticulum.py b/RNS/Reticulum.py index 321ce153..720a5cae 100755 --- a/RNS/Reticulum.py +++ b/RNS/Reticulum.py @@ -48,6 +48,7 @@ else: from RNS.vendor.configobj import ConfigObj from threading import Lock +import RNS.vendor.umsgpack as mp import configparser import multiprocessing.connection import importlib.util @@ -1179,60 +1180,63 @@ class Reticulum: os.makedirs(Reticulum.configdir) self.config.write() + def rpc_return(self, connection, response): + connection.send_bytes(mp.packb(response)) + def rpc_loop(self): while RNS.Transport._should_run: try: - rpc_connection = self.rpc_listener.accept() - call = rpc_connection.recv() + conn = self.rpc_listener.accept() + call = mp.unpackb(conn.recv_bytes()) if "get" in call: path = call["get"] if path == "path_table": mh = call["max_hops"] - rpc_connection.send(self.get_path_table(max_hops=mh)) + self.rpc_return(conn, self.get_path_table(max_hops=mh)) - if path == "interface_stats": rpc_connection.send(self.get_interface_stats()) - if path == "rate_table": rpc_connection.send(self.get_rate_table()) - if path == "next_hop_if_name": rpc_connection.send(self.get_next_hop_if_name(call["destination_hash"])) - if path == "next_hop": rpc_connection.send(self.get_next_hop(call["destination_hash"])) - if path == "first_hop_timeout": rpc_connection.send(self.get_first_hop_timeout(call["destination_hash"])) - if path == "link_count": rpc_connection.send(self.get_link_count()) - if path == "packet_rssi": rpc_connection.send(self.get_packet_rssi(call["packet_hash"])) - if path == "packet_snr": rpc_connection.send(self.get_packet_snr(call["packet_hash"])) - if path == "packet_q": rpc_connection.send(self.get_packet_q(call["packet_hash"])) - if path == "blackholed_identities": rpc_connection.send(self.get_blackholed_identities()) - if path == "is_blackholed": rpc_connection.send(self.is_blackholed(call["identity_hash"])) + if path == "interface_stats": self.rpc_return(conn, self.get_interface_stats()) + if path == "rate_table": self.rpc_return(conn, self.get_rate_table()) + if path == "next_hop_if_name": self.rpc_return(conn, self.get_next_hop_if_name(call["destination_hash"])) + if path == "next_hop": self.rpc_return(conn, self.get_next_hop(call["destination_hash"])) + if path == "first_hop_timeout": self.rpc_return(conn, self.get_first_hop_timeout(call["destination_hash"])) + if path == "link_count": self.rpc_return(conn, self.get_link_count()) + if path == "packet_rssi": self.rpc_return(conn, self.get_packet_rssi(call["packet_hash"])) + if path == "packet_snr": self.rpc_return(conn, self.get_packet_snr(call["packet_hash"])) + if path == "packet_q": self.rpc_return(conn, self.get_packet_q(call["packet_hash"])) + if path == "blackholed_identities": self.rpc_return(conn, self.get_blackholed_identities()) + if path == "is_blackholed": self.rpc_return(conn, self.is_blackholed(call["identity_hash"])) if "drop" in call: path = call["drop"] - if path == "path": rpc_connection.send(self.drop_path(call["destination_hash"])) - if path == "all_via": rpc_connection.send(self.drop_all_via(call["destination_hash"])) - if path == "announce_queues": rpc_connection.send(self.drop_announce_queues()) + if path == "path": self.rpc_return(conn, self.drop_path(call["destination_hash"])) + if path == "all_via": self.rpc_return(conn, self.drop_all_via(call["destination_hash"])) + if path == "announce_queues": self.rpc_return(conn, self.drop_announce_queues()) if "blackhole_identity" in call: identity_hash = call["blackhole_identity"] until = call["until"] reason = call["reason"] - rpc_connection.send(self.blackhole_identity(identity_hash, until=until, reason=reason)) + self.rpc_return(conn, self.blackhole_identity(identity_hash, until=until, reason=reason)) if "unblackhole_identity" in call: identity_hash = call["unblackhole_identity"] - rpc_connection.send(self.unblackhole_identity(identity_hash)) + self.rpc_return(conn, self.unblackhole_identity(identity_hash)) if "destination_data" in call: operation = call["destination_data"] destination_hash = call["destination_hash"] - if operation == "used": rpc_connection.send(self._used_destination_data(destination_hash)) - elif operation == "retain": rpc_connection.send(self._retain_destination_data(destination_hash)) - elif operation == "unretain": rpc_connection.send(self._unretain_destination_data(destination_hash)) + if operation == "used": self.rpc_return(conn, self._used_destination_data(destination_hash)) + elif operation == "retain": self.rpc_return(conn, self._retain_destination_data(destination_hash)) + elif operation == "unretain": self.rpc_return(conn, self._unretain_destination_data(destination_hash)) if "identity_data" in call: operation = call["identity_data"] identity_hash = call["identity_hash"] - if operation == "retain": rpc_connection.send(self._retain_identity(identity_hash)) + if operation == "retain": self.rpc_return(conn, self._retain_identity(identity_hash)) - rpc_connection.close() + conn.close() except Exception as e: RNS.log("An error ocurred while handling RPC call from local client: "+str(e), RNS.LOG_ERROR) @@ -1243,8 +1247,8 @@ class Reticulum: if self.is_connected_to_shared_instance: try: rpc_connection = self.get_rpc_client() - rpc_connection.send({"destination_data": "used", "destination_hash": destination_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"destination_data": "used", "destination_hash": destination_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response except Exception as e: @@ -1257,8 +1261,8 @@ class Reticulum: if self.is_connected_to_shared_instance: try: rpc_connection = self.get_rpc_client() - rpc_connection.send({"destination_data": "retain", "destination_hash": destination_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"destination_data": "retain", "destination_hash": destination_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response except Exception as e: @@ -1271,8 +1275,8 @@ class Reticulum: if self.is_connected_to_shared_instance: try: rpc_connection = self.get_rpc_client() - rpc_connection.send({"destination_data": "unretain", "destination_hash": destination_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"destination_data": "unretain", "destination_hash": destination_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response except Exception as e: @@ -1288,8 +1292,8 @@ class Reticulum: if self.is_connected_to_shared_instance: try: rpc_connection = self.get_rpc_client() - rpc_connection.send({"identity_data": "retain", "identity_hash": identity_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"identity_data": "retain", "identity_hash": identity_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response except Exception as e: @@ -1301,8 +1305,8 @@ class Reticulum: def get_interface_stats(self): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "interface_stats"}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "interface_stats"})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: interfaces = [] @@ -1491,8 +1495,8 @@ class Reticulum: def get_path_table(self, max_hops=None): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "path_table", "max_hops": max_hops}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "path_table", "max_hops": max_hops})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1515,8 +1519,8 @@ class Reticulum: def get_rate_table(self): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "rate_table"}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "rate_table"})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1536,8 +1540,8 @@ class Reticulum: def drop_path(self, destination): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"drop": "path", "destination_hash": destination}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"drop": "path", "destination_hash": destination})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1546,8 +1550,8 @@ class Reticulum: def drop_all_via(self, transport_hash): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"drop": "all_via", "destination_hash": transport_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"drop": "all_via", "destination_hash": transport_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1562,8 +1566,8 @@ class Reticulum: def drop_announce_queues(self): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"drop": "announce_queues"}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"drop": "announce_queues"})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1572,8 +1576,8 @@ class Reticulum: def get_next_hop_if_name(self, destination): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "next_hop_if_name", "destination_hash": destination}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "next_hop_if_name", "destination_hash": destination})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1583,8 +1587,8 @@ class Reticulum: if self.is_connected_to_shared_instance: try: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "first_hop_timeout", "destination_hash": destination}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "first_hop_timeout", "destination_hash": destination})) + response = mp.unpackb(rpc_connection.recv_bytes()) if self.is_connected_to_shared_instance and hasattr(self, "_force_shared_instance_bitrate") and self._force_shared_instance_bitrate: simulated_latency = ((1/self._force_shared_instance_bitrate)*8)*RNS.Reticulum.MTU @@ -1602,8 +1606,8 @@ class Reticulum: def get_next_hop(self, destination): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "next_hop", "destination_hash": destination}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "next_hop", "destination_hash": destination})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response @@ -1613,8 +1617,8 @@ class Reticulum: def get_link_count(self): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "link_count"}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "link_count"})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1623,8 +1627,8 @@ class Reticulum: def get_packet_rssi(self, packet_hash): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "packet_rssi", "packet_hash": packet_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "packet_rssi", "packet_hash": packet_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1637,8 +1641,8 @@ class Reticulum: def get_packet_snr(self, packet_hash): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "packet_snr", "packet_hash": packet_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "packet_snr", "packet_hash": packet_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1651,8 +1655,8 @@ class Reticulum: def get_packet_q(self, packet_hash): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "packet_q", "packet_hash": packet_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "packet_q", "packet_hash": packet_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: @@ -1674,8 +1678,8 @@ class Reticulum: def get_blackholed_identities(self): if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "blackholed_identities"}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "blackholed_identities"})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: return RNS.Transport.blackholed_identities @@ -1688,8 +1692,8 @@ class Reticulum: if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"get": "is_blackholed", "identity_hash": identity_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"get": "is_blackholed", "identity_hash": identity_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: return identity_hash in RNS.Transport.blackholed_identities @@ -1699,8 +1703,8 @@ class Reticulum: else: if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"blackhole_identity": identity_hash, "until": until, "reason": reason}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"blackhole_identity": identity_hash, "until": until, "reason": reason})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: return RNS.Transport.blackhole_identity(identity_hash, until=until, reason=reason) @@ -1710,8 +1714,8 @@ class Reticulum: else: if self.is_connected_to_shared_instance: rpc_connection = self.get_rpc_client() - rpc_connection.send({"unblackhole_identity": identity_hash}) - response = rpc_connection.recv() + rpc_connection.send_bytes(mp.packb({"unblackhole_identity": identity_hash})) + response = mp.unpackb(rpc_connection.recv_bytes()) return response else: return RNS.Transport.unblackhole_identity(identity_hash)