This commit is contained in:
Mark Qvist
2025-10-28 10:59:58 +01:00
parent 6d47b59b1e
commit 0bcb4b8573
+110 -103
View File
@@ -68,7 +68,8 @@ class WDCL():
self.should_run = True
self.receiver = None
self.frame_queue = deque()
self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8"))
if not self.as_interface:
self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8"))
if self.as_interface:
try:
@@ -97,10 +98,15 @@ class WDCL():
def open_port(self):
if self.as_interface: RNS.log(f"Opening serial port {self.port.device}...", RNS.LOG_VERBOSE)
else: self.owner.wlog(f"Opening serial port {self.port.device}...")
if self.as_interface:
RNS.log(f"Opening serial port {self.port}...", RNS.LOG_VERBOSE)
target_port = self.port
else:
self.owner.wlog(f"Opening serial port {self.port.device}...")
target_port = self.port.device
self.serial = self.pyserial.Serial(
port = self.port.device,
port = target_port,
baudrate = self.speed,
bytesize = self.databits,
parity = self.parity,
@@ -116,7 +122,7 @@ class WDCL():
self.should_run = False
if self.serial:
self.serial.close()
if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port.device)} for {str(self)}"), RNS.LOG_VERBOSE)
if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port)} for {str(self)}"), RNS.LOG_VERBOSE)
else: self.owner.wlog(f"Closed serial port {str(self.port.device)} for {str(self)}")
def configure_device(self):
@@ -124,7 +130,7 @@ class WDCL():
thread.daemon = True
thread.start()
self.online = True
if self.as_interface: RNS.log(f"Serial port {self.port.device} is now open, discovering remote device...", RNS.LOG_VERBOSE)
if self.as_interface: RNS.log(f"Serial port {self.port} is now open, discovering remote device...", RNS.LOG_VERBOSE)
else: self.owner.wlog("Serial port "+self.port.device+" is now open")
self.device.discover()
@@ -188,7 +194,7 @@ class WDCL():
while not self.online:
try:
time.sleep(5)
if self.as_interface: RNS.log("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...", RNS.LOG_INFO)
if self.as_interface: RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_INFO)
else: self.owner.wlog("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...")
self.open_port()
if self.serial.is_open: self.configure_device()
@@ -196,13 +202,16 @@ class WDCL():
if self.as_interface: RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR)
else: self.owner.wlog("Error while reconnecting port, the contained exception was: "+str(e))
if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_IN)
if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_INFO)
else: self.owner.wlog("Reconnected serial port for "+str(self))
def __str__(self):
if self.port.serial_number: sn_str = f" {self.port.serial_number}"
else: sn_str = ""
return f"{self.port.product}{sn_str} (USB)"
if self.as_interface:
return self.port
else:
if self.port.serial_number: sn_str = f" {self.port.serial_number}"
else: sn_str = ""
return f"{self.port.product}{sn_str} (USB)"
class Cmd():
WDCL_CMD_ENDPOINT_PKT = 0x0001
@@ -248,6 +257,8 @@ class Evt():
ET_PROTO_WDCL_CONNECTION = 0x3002
ET_PROTO_WEAVE_INIT = 0x3100
ET_PROTO_WEAVE_RUNNING = 0x3101
ET_PROTO_WEAVE_EP_ALIVE = 0x3102
ET_PROTO_WEAVE_EP_TIMEOUT = 0x3103
ET_SRVCTL_REMOTE_DISPLAY = 0xA000
ET_INTERFACE_REGISTERED = 0xD000
ET_STAT_STATE = 0xE000
@@ -311,6 +322,8 @@ class Evt():
ET_PROTO_WDCL_CONNECTION: "WDCL host connection",
ET_PROTO_WEAVE_INIT: "Weave protocol initialization",
ET_PROTO_WEAVE_RUNNING: "Weave protocol activation",
ET_PROTO_WEAVE_EP_ALIVE: "Weave endpoint appeared",
ET_PROTO_WEAVE_EP_TIMEOUT: "Weave endpoint disappeared",
ET_SRVCTL_REMOTE_DISPLAY: "Remote display service control event",
ET_INTERFACE_REGISTERED: "Interface registration",
ET_SYSERR_MEM_EXHAUSTED: "System memory exhausted",
@@ -436,11 +449,12 @@ class WeaveDevice():
WEAVE_PRVKEY_SIZE = 64
WEAVE_SIGNATURE_LEN = 64
def __init__(self, as_interface=False):
def __init__(self, as_interface=False, rns_interface=None):
self.identity = None
self.receiver = None
self.switch_id = None
self.owner = None
self.rns_interface = rns_interface
self.as_interface = as_interface
self.endpoints = {}
self.active_tasks = {}
@@ -547,18 +561,22 @@ class WeaveDevice():
if not endpoint_id in self.endpoints: self.endpoints[endpoint_id] = WeaveEndpoint(endpoint_id)
else: self.endpoints[endpoint_id].alive = time.time()
if self.as_interface: self.rns_interface.add_peer(endpoint_id)
def deliver_packet(self, endpoint_id, data):
self.wdcl_send_command(Cmd.WDCL_CMD_ENDPOINT_PKT, endpoint_id+data)
packet_data = endpoint_id+data
self.wdcl_send_command(Cmd.WDCL_CMD_ENDPOINT_PKT, packet_data)
def received_packet(self, source, data):
self.endpoint_alive(source)
self.endpoints[source].receive(data)
if self.as_interface:
self.rns_interface.process_incoming(data, source)
def incoming_frame(self, data):
if len(data) > self.WEAVE_SWITCH_ID_LEN+2 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_ENDPOINT_PKT and data[:self.WEAVE_SWITCH_ID_LEN] == self.connection.switch_id:
payload = data[self.WEAVE_SWITCH_ID_LEN+1:-self.WEAVE_ENDPOINT_ID_LEN]
src_endpoint = data[-self.WEAVE_ENDPOINT_ID_LEN:]
self.received_packet(src_endpoint, data)
self.received_packet(src_endpoint, payload)
elif len(data) > self.WEAVE_SWITCH_ID_LEN+1 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_DISCOVER:
discovery_response_len = self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE+self.WEAVE_SIGNATURE_LEN
@@ -602,7 +620,9 @@ class WeaveDevice():
def log_handle(self, frame):
# Handle system event signalling
if frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() }
if frame.event == Evt.ET_PROTO_WEAVE_EP_ALIVE and len(frame.data) == 8:
self.endpoint_alive(frame.data)
elif frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() }
elif frame.event == Evt.ET_STAT_CPU:
self.cpu_load = frame.data[0]
self.capture_stats_cpu()
@@ -617,7 +637,7 @@ class WeaveDevice():
else:
ts = RNS.prettytime(frame.timestamp)
if frame.event == Evt.ET_MSG:
if len(frame.data): data_string = frame.data.decode("utf-8")
if len(frame.data): data_string = f"{frame.data.decode("utf-8")}"
else: data_string = ""
rendered = f"[{ts}] [{Evt.level(frame.level)}]: {data_string}"
@@ -655,20 +675,21 @@ class WeaveDevice():
rendered = f"[{ts}] [{Evt.level(frame.level)}] [{event_description}]{data_string}"
if self.receiver and self.receiver.ready:
while len(self.log_queue): self.receiver.log(self.log_queue.pop())
self.receiver.log(rendered)
else: self.log_queue.append(rendered)
if self.as_interface:
RNS.log(f"{self.rns_interface}: {rendered}", RNS.LOG_EXTREME)
else:
if self.receiver and self.receiver.ready:
while len(self.log_queue): self.receiver.log(self.log_queue.pop())
self.receiver.log(rendered)
else: self.log_queue.append(rendered)
class WeaveInterface(Interface):
HW_MTU = 1196
HW_MTU = 1024
FIXED_MTU = True
DEFAULT_IFAC_SIZE = 16
PEERING_TIMEOUT = 20.0
BITRATE_GUESS = 1*1000*1000
BITRATE_GUESS = 500*1000
MULTI_IF_DEQUE_LEN = 48
MULTI_IF_DEQUE_TTL = 0.75
@@ -683,13 +704,14 @@ class WeaveInterface(Interface):
super().__init__()
self.netinfo = netinfo
self.HW_MTU = AutoInterface.HW_MTU
self.HW_MTU = WeaveInterface.HW_MTU
self.IN = True
self.OUT = False
self.name = name
self.port = port
self.device = WeaveDevice(as_interface=True)
self.connection = WDCL(owner=None, device=self.device, port=self.port, as_interface=True)
self.switch_identity = RNS.Identity()
self.device = WeaveDevice(as_interface=True, rns_interface=self)
self.connection = WDCL(owner=self, device=self.device, port=self.port, as_interface=True)
self.owner = owner
self.online = False
self.final_init_done = False
@@ -697,8 +719,8 @@ class WeaveInterface(Interface):
self.timed_out_interfaces = {}
self.spawned_interfaces = {}
self.write_lock = threading.Lock()
self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN)
self.mif_deque = deque(maxlen=WeaveInterface.MULTI_IF_DEQUE_LEN)
self.mif_deque_times = deque(maxlen=WeaveInterface.MULTI_IF_DEQUE_LEN)
self.announce_rate_target = None
self.peer_job_interval = WeaveInterface.PEERING_TIMEOUT*1.1
@@ -715,8 +737,6 @@ class WeaveInterface(Interface):
job_thread.daemon = True
job_thread.start()
time.sleep(peering_wait)
self.online = True
self.final_init_done = True
@@ -746,74 +766,62 @@ class WeaveInterface(Interface):
def peer_count(self):
return len(self.spawned_interfaces)
def add_peer(self, addr, ifname):
if addr in self.link_local_addresses:
ifname = None
for interface_name in self.adopted_interfaces:
if self.adopted_interfaces[interface_name] == addr:
ifname = interface_name
def add_peer(self, endpoint_addr):
if not endpoint_addr in self.peers:
self.peers[endpoint_addr] = [endpoint_addr, time.time()]
if ifname != None:
self.multicast_echoes[ifname] = time.time()
else:
RNS.log(str(self)+" received multicast echo on unexpected interface "+str(ifname), RNS.LOG_WARNING)
spawned_interface = WeaveInterfacePeer(self, endpoint_addr)
spawned_interface.OUT = self.OUT
spawned_interface.IN = self.IN
spawned_interface.parent_interface = self
spawned_interface.bitrate = self.bitrate
spawned_interface.ifac_size = self.ifac_size
spawned_interface.ifac_netname = self.ifac_netname
spawned_interface.ifac_netkey = self.ifac_netkey
if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None:
ifac_origin = b""
if spawned_interface.ifac_netname != None:
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8"))
if spawned_interface.ifac_netkey != None:
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8"))
ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
spawned_interface.ifac_key = RNS.Cryptography.hkdf(
length=64,
derive_from=ifac_origin_hash,
salt=RNS.Reticulum.IFAC_SALT,
context=None
)
spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key)
spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key))
spawned_interface.announce_rate_target = self.announce_rate_target
spawned_interface.announce_rate_grace = self.announce_rate_grace
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
spawned_interface.mode = self.mode
spawned_interface.HW_MTU = self.HW_MTU
spawned_interface.online = True
RNS.Transport.interfaces.append(spawned_interface)
if endpoint_addr in self.spawned_interfaces:
self.spawned_interfaces[endpoint_addr].detach()
self.spawned_interfaces[endpoint_addr].teardown()
self.spawned_interfaces.pop(spawned_interface)
self.spawned_interfaces[endpoint_addr] = spawned_interface
RNS.log(f"{self} added peer {RNS.hexrep(endpoint_addr)}", RNS.LOG_DEBUG)
else:
if not addr in self.peers:
self.peers[addr] = [ifname, time.time()]
self.refresh_peer(endpoint_addr)
spawned_interface = AutoInterfacePeer(self, addr, ifname)
spawned_interface.OUT = self.OUT
spawned_interface.IN = self.IN
spawned_interface.parent_interface = self
spawned_interface.bitrate = self.bitrate
spawned_interface.ifac_size = self.ifac_size
spawned_interface.ifac_netname = self.ifac_netname
spawned_interface.ifac_netkey = self.ifac_netkey
if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None:
ifac_origin = b""
if spawned_interface.ifac_netname != None:
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8"))
if spawned_interface.ifac_netkey != None:
ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8"))
ifac_origin_hash = RNS.Identity.full_hash(ifac_origin)
spawned_interface.ifac_key = RNS.Cryptography.hkdf(
length=64,
derive_from=ifac_origin_hash,
salt=RNS.Reticulum.IFAC_SALT,
context=None
)
spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key)
spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key))
spawned_interface.announce_rate_target = self.announce_rate_target
spawned_interface.announce_rate_grace = self.announce_rate_grace
spawned_interface.announce_rate_penalty = self.announce_rate_penalty
spawned_interface.mode = self.mode
spawned_interface.HW_MTU = self.HW_MTU
spawned_interface.online = True
RNS.Transport.interfaces.append(spawned_interface)
if addr in self.spawned_interfaces:
self.spawned_interfaces[addr].detach()
self.spawned_interfaces[addr].teardown()
self.spawned_interfaces.pop(spawned_interface)
self.spawned_interfaces[addr] = spawned_interface
RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG)
else:
self.refresh_peer(addr)
def refresh_peer(self, addr):
def refresh_peer(self, endpoint_addr):
try:
self.peers[addr][1] = time.time()
self.peers[endpoint_addr][1] = time.time()
except Exception as e:
RNS.log(f"An error occurred while refreshing peer {RNS.hexrep(addr)} on {self}: {e}", RNS.LOG_ERROR)
RNS.log(f"An error occurred while refreshing peer {RNS.hexrep(endpoint_addr)} on {self}: {e}", RNS.LOG_ERROR)
def process_incoming(self, data, addr=None):
if self.online and addr in self.spawned_interfaces:
self.spawned_interfaces[addr].process_incoming(data, addr)
def process_incoming(self, data, endpoint_addr=None):
if self.online and endpoint_addr in self.spawned_interfaces:
self.spawned_interfaces[endpoint_addr].process_incoming(data, endpoint_addr)
def process_outgoing(self,data):
pass
@@ -829,32 +837,31 @@ class WeaveInterface(Interface):
class WeaveInterfacePeer(Interface):
def __init__(self, owner, addr, ifname):
def __init__(self, owner, endpoint_addr):
super().__init__()
self.owner = owner
self.parent_interface = owner
self.addr = addr
self.ifname = ifname
self.endpoint_addr = endpoint_addr
self.peer_addr = None
self.addr_info = None
self.HW_MTU = self.owner.HW_MTU
self.FIXED_MTU = self.owner.FIXED_MTU
def __str__(self):
return f"WeaveInterfacePeer[{self.ifname}/{self.addr}]"
return f"WeaveInterfacePeer[{RNS.hexrep(self.endpoint_addr)}]"
def process_incoming(self, data, addr=None):
def process_incoming(self, data, endpoint_addr=None):
if self.online and self.owner.online:
data_hash = RNS.Identity.full_hash(data)
deque_hit = False
if data_hash in self.owner.mif_deque:
for te in self.owner.mif_deque_times:
if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL:
if te[0] == data_hash and time.time() < te[1]+WeaveInterface.MULTI_IF_DEQUE_TTL:
deque_hit = True
break
if not deque_hit:
self.owner.refresh_peer(self.addr)
self.owner.refresh_peer(self.endpoint_addr)
self.owner.mif_deque.append(data_hash)
self.owner.mif_deque_times.append([data_hash, time.time()])
self.rxb += len(data)
@@ -865,7 +872,7 @@ class WeaveInterfacePeer(Interface):
if self.online:
with self.owner.write_lock:
try:
# TODO: Push to weave endpoint
self.owner.device.deliver_packet(self.endpoint_addr, data)
self.txb += len(data)
self.owner.txb += len(data)
except Exception as e:
@@ -888,8 +895,8 @@ class WeaveInterfacePeer(Interface):
self.OUT = False
self.IN = False
if self.addr in self.owner.spawned_interfaces:
try: self.owner.spawned_interfaces.pop(self.addr)
if self.endpoint_addr in self.owner.spawned_interfaces:
try: self.owner.spawned_interfaces.pop(self.endpoint_addr)
except Exception as e:
RNS.log(f"Could not remove {self} from parent interface on detach. The contained exception was: {e}", RNS.LOG_ERROR)