Use msgpack for shared instance RPC

This commit is contained in:
Mark Qvist
2026-05-29 07:30:41 +02:00
parent 5b3bb050e7
commit a2ef978208
+70 -66
View File
@@ -48,6 +48,7 @@ else:
from RNS.vendor.configobj import ConfigObj
from threading import Lock
import RNS.vendor.umsgpack as mp
import configparser
import multiprocessing.connection
import importlib.util
@@ -1179,60 +1180,63 @@ class Reticulum:
os.makedirs(Reticulum.configdir)
self.config.write()
def rpc_return(self, connection, response):
connection.send_bytes(mp.packb(response))
def rpc_loop(self):
while RNS.Transport._should_run:
try:
rpc_connection = self.rpc_listener.accept()
call = rpc_connection.recv()
conn = self.rpc_listener.accept()
call = mp.unpackb(conn.recv_bytes())
if "get" in call:
path = call["get"]
if path == "path_table":
mh = call["max_hops"]
rpc_connection.send(self.get_path_table(max_hops=mh))
self.rpc_return(conn, self.get_path_table(max_hops=mh))
if path == "interface_stats": rpc_connection.send(self.get_interface_stats())
if path == "rate_table": rpc_connection.send(self.get_rate_table())
if path == "next_hop_if_name": rpc_connection.send(self.get_next_hop_if_name(call["destination_hash"]))
if path == "next_hop": rpc_connection.send(self.get_next_hop(call["destination_hash"]))
if path == "first_hop_timeout": rpc_connection.send(self.get_first_hop_timeout(call["destination_hash"]))
if path == "link_count": rpc_connection.send(self.get_link_count())
if path == "packet_rssi": rpc_connection.send(self.get_packet_rssi(call["packet_hash"]))
if path == "packet_snr": rpc_connection.send(self.get_packet_snr(call["packet_hash"]))
if path == "packet_q": rpc_connection.send(self.get_packet_q(call["packet_hash"]))
if path == "blackholed_identities": rpc_connection.send(self.get_blackholed_identities())
if path == "is_blackholed": rpc_connection.send(self.is_blackholed(call["identity_hash"]))
if path == "interface_stats": self.rpc_return(conn, self.get_interface_stats())
if path == "rate_table": self.rpc_return(conn, self.get_rate_table())
if path == "next_hop_if_name": self.rpc_return(conn, self.get_next_hop_if_name(call["destination_hash"]))
if path == "next_hop": self.rpc_return(conn, self.get_next_hop(call["destination_hash"]))
if path == "first_hop_timeout": self.rpc_return(conn, self.get_first_hop_timeout(call["destination_hash"]))
if path == "link_count": self.rpc_return(conn, self.get_link_count())
if path == "packet_rssi": self.rpc_return(conn, self.get_packet_rssi(call["packet_hash"]))
if path == "packet_snr": self.rpc_return(conn, self.get_packet_snr(call["packet_hash"]))
if path == "packet_q": self.rpc_return(conn, self.get_packet_q(call["packet_hash"]))
if path == "blackholed_identities": self.rpc_return(conn, self.get_blackholed_identities())
if path == "is_blackholed": self.rpc_return(conn, self.is_blackholed(call["identity_hash"]))
if "drop" in call:
path = call["drop"]
if path == "path": rpc_connection.send(self.drop_path(call["destination_hash"]))
if path == "all_via": rpc_connection.send(self.drop_all_via(call["destination_hash"]))
if path == "announce_queues": rpc_connection.send(self.drop_announce_queues())
if path == "path": self.rpc_return(conn, self.drop_path(call["destination_hash"]))
if path == "all_via": self.rpc_return(conn, self.drop_all_via(call["destination_hash"]))
if path == "announce_queues": self.rpc_return(conn, self.drop_announce_queues())
if "blackhole_identity" in call:
identity_hash = call["blackhole_identity"]
until = call["until"]
reason = call["reason"]
rpc_connection.send(self.blackhole_identity(identity_hash, until=until, reason=reason))
self.rpc_return(conn, self.blackhole_identity(identity_hash, until=until, reason=reason))
if "unblackhole_identity" in call:
identity_hash = call["unblackhole_identity"]
rpc_connection.send(self.unblackhole_identity(identity_hash))
self.rpc_return(conn, self.unblackhole_identity(identity_hash))
if "destination_data" in call:
operation = call["destination_data"]
destination_hash = call["destination_hash"]
if operation == "used": rpc_connection.send(self._used_destination_data(destination_hash))
elif operation == "retain": rpc_connection.send(self._retain_destination_data(destination_hash))
elif operation == "unretain": rpc_connection.send(self._unretain_destination_data(destination_hash))
if operation == "used": self.rpc_return(conn, self._used_destination_data(destination_hash))
elif operation == "retain": self.rpc_return(conn, self._retain_destination_data(destination_hash))
elif operation == "unretain": self.rpc_return(conn, self._unretain_destination_data(destination_hash))
if "identity_data" in call:
operation = call["identity_data"]
identity_hash = call["identity_hash"]
if operation == "retain": rpc_connection.send(self._retain_identity(identity_hash))
if operation == "retain": self.rpc_return(conn, self._retain_identity(identity_hash))
rpc_connection.close()
conn.close()
except Exception as e:
RNS.log("An error ocurred while handling RPC call from local client: "+str(e), RNS.LOG_ERROR)
@@ -1243,8 +1247,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
try:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "used", "destination_hash": destination_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"destination_data": "used", "destination_hash": destination_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
except Exception as e:
@@ -1257,8 +1261,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
try:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "retain", "destination_hash": destination_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"destination_data": "retain", "destination_hash": destination_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
except Exception as e:
@@ -1271,8 +1275,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
try:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"destination_data": "unretain", "destination_hash": destination_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"destination_data": "unretain", "destination_hash": destination_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
except Exception as e:
@@ -1288,8 +1292,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
try:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"identity_data": "retain", "identity_hash": identity_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"identity_data": "retain", "identity_hash": identity_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
except Exception as e:
@@ -1301,8 +1305,8 @@ class Reticulum:
def get_interface_stats(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "interface_stats"})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "interface_stats"}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
interfaces = []
@@ -1491,8 +1495,8 @@ class Reticulum:
def get_path_table(self, max_hops=None):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "path_table", "max_hops": max_hops})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "path_table", "max_hops": max_hops}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1515,8 +1519,8 @@ class Reticulum:
def get_rate_table(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "rate_table"})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "rate_table"}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1536,8 +1540,8 @@ class Reticulum:
def drop_path(self, destination):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"drop": "path", "destination_hash": destination})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"drop": "path", "destination_hash": destination}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1546,8 +1550,8 @@ class Reticulum:
def drop_all_via(self, transport_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"drop": "all_via", "destination_hash": transport_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"drop": "all_via", "destination_hash": transport_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1562,8 +1566,8 @@ class Reticulum:
def drop_announce_queues(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"drop": "announce_queues"})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"drop": "announce_queues"}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1572,8 +1576,8 @@ class Reticulum:
def get_next_hop_if_name(self, destination):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "next_hop_if_name", "destination_hash": destination})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "next_hop_if_name", "destination_hash": destination}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1583,8 +1587,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
try:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "first_hop_timeout", "destination_hash": destination})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "first_hop_timeout", "destination_hash": destination}))
response = mp.unpackb(rpc_connection.recv_bytes())
if self.is_connected_to_shared_instance and hasattr(self, "_force_shared_instance_bitrate") and self._force_shared_instance_bitrate:
simulated_latency = ((1/self._force_shared_instance_bitrate)*8)*RNS.Reticulum.MTU
@@ -1602,8 +1606,8 @@ class Reticulum:
def get_next_hop(self, destination):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "next_hop", "destination_hash": destination})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "next_hop", "destination_hash": destination}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
@@ -1613,8 +1617,8 @@ class Reticulum:
def get_link_count(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "link_count"})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "link_count"}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1623,8 +1627,8 @@ class Reticulum:
def get_packet_rssi(self, packet_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "packet_rssi", "packet_hash": packet_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "packet_rssi", "packet_hash": packet_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1637,8 +1641,8 @@ class Reticulum:
def get_packet_snr(self, packet_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "packet_snr", "packet_hash": packet_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "packet_snr", "packet_hash": packet_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1651,8 +1655,8 @@ class Reticulum:
def get_packet_q(self, packet_hash):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "packet_q", "packet_hash": packet_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "packet_q", "packet_hash": packet_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else:
@@ -1674,8 +1678,8 @@ class Reticulum:
def get_blackholed_identities(self):
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "blackholed_identities"})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "blackholed_identities"}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else: return RNS.Transport.blackholed_identities
@@ -1688,8 +1692,8 @@ class Reticulum:
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"get": "is_blackholed", "identity_hash": identity_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"get": "is_blackholed", "identity_hash": identity_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else: return identity_hash in RNS.Transport.blackholed_identities
@@ -1699,8 +1703,8 @@ class Reticulum:
else:
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"blackhole_identity": identity_hash, "until": until, "reason": reason})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"blackhole_identity": identity_hash, "until": until, "reason": reason}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else: return RNS.Transport.blackhole_identity(identity_hash, until=until, reason=reason)
@@ -1710,8 +1714,8 @@ class Reticulum:
else:
if self.is_connected_to_shared_instance:
rpc_connection = self.get_rpc_client()
rpc_connection.send({"unblackhole_identity": identity_hash})
response = rpc_connection.recv()
rpc_connection.send_bytes(mp.packb({"unblackhole_identity": identity_hash}))
response = mp.unpackb(rpc_connection.recv_bytes())
return response
else: return RNS.Transport.unblackhole_identity(identity_hash)