diff --git a/RNS/Interfaces/WeaveInterface.py b/RNS/Interfaces/WeaveInterface.py index a896a7b7..37511e72 100644 --- a/RNS/Interfaces/WeaveInterface.py +++ b/RNS/Interfaces/WeaveInterface.py @@ -68,7 +68,8 @@ class WDCL(): self.should_run = True self.receiver = None self.frame_queue = deque() - self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8")) + if not self.as_interface: + self.id = RNS.Identity.full_hash(port.hwid.encode("utf-8")) if self.as_interface: try: @@ -97,10 +98,15 @@ class WDCL(): def open_port(self): - if self.as_interface: RNS.log(f"Opening serial port {self.port.device}...", RNS.LOG_VERBOSE) - else: self.owner.wlog(f"Opening serial port {self.port.device}...") + if self.as_interface: + RNS.log(f"Opening serial port {self.port}...", RNS.LOG_VERBOSE) + target_port = self.port + else: + self.owner.wlog(f"Opening serial port {self.port.device}...") + target_port = self.port.device + self.serial = self.pyserial.Serial( - port = self.port.device, + port = target_port, baudrate = self.speed, bytesize = self.databits, parity = self.parity, @@ -116,7 +122,7 @@ class WDCL(): self.should_run = False if self.serial: self.serial.close() - if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port.device)} for {str(self)}"), RNS.LOG_VERBOSE) + if self.as_interface: RNS.LOG((f"Closed serial port {str(self.port)} for {str(self)}"), RNS.LOG_VERBOSE) else: self.owner.wlog(f"Closed serial port {str(self.port.device)} for {str(self)}") def configure_device(self): @@ -124,7 +130,7 @@ class WDCL(): thread.daemon = True thread.start() self.online = True - if self.as_interface: RNS.log(f"Serial port {self.port.device} is now open, discovering remote device...", RNS.LOG_VERBOSE) + if self.as_interface: RNS.log(f"Serial port {self.port} is now open, discovering remote device...", RNS.LOG_VERBOSE) else: self.owner.wlog("Serial port "+self.port.device+" is now open") self.device.discover() @@ -188,7 +194,7 @@ class WDCL(): while not self.online: try: time.sleep(5) - if self.as_interface: RNS.log("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...", RNS.LOG_INFO) + if self.as_interface: RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_INFO) else: self.owner.wlog("Attempting to reconnect serial port "+str(self.port.device)+" for "+str(self)+"...") self.open_port() if self.serial.is_open: self.configure_device() @@ -196,13 +202,16 @@ class WDCL(): if self.as_interface: RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR) else: self.owner.wlog("Error while reconnecting port, the contained exception was: "+str(e)) - if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_IN) + if self.as_interface: RNS.log("Reconnected serial port for "+str(self), RNS.LOG_INFO) else: self.owner.wlog("Reconnected serial port for "+str(self)) def __str__(self): - if self.port.serial_number: sn_str = f" {self.port.serial_number}" - else: sn_str = "" - return f"{self.port.product}{sn_str} (USB)" + if self.as_interface: + return self.port + else: + if self.port.serial_number: sn_str = f" {self.port.serial_number}" + else: sn_str = "" + return f"{self.port.product}{sn_str} (USB)" class Cmd(): WDCL_CMD_ENDPOINT_PKT = 0x0001 @@ -248,6 +257,8 @@ class Evt(): ET_PROTO_WDCL_CONNECTION = 0x3002 ET_PROTO_WEAVE_INIT = 0x3100 ET_PROTO_WEAVE_RUNNING = 0x3101 + ET_PROTO_WEAVE_EP_ALIVE = 0x3102 + ET_PROTO_WEAVE_EP_TIMEOUT = 0x3103 ET_SRVCTL_REMOTE_DISPLAY = 0xA000 ET_INTERFACE_REGISTERED = 0xD000 ET_STAT_STATE = 0xE000 @@ -311,6 +322,8 @@ class Evt(): ET_PROTO_WDCL_CONNECTION: "WDCL host connection", ET_PROTO_WEAVE_INIT: "Weave protocol initialization", ET_PROTO_WEAVE_RUNNING: "Weave protocol activation", + ET_PROTO_WEAVE_EP_ALIVE: "Weave endpoint appeared", + ET_PROTO_WEAVE_EP_TIMEOUT: "Weave endpoint disappeared", ET_SRVCTL_REMOTE_DISPLAY: "Remote display service control event", ET_INTERFACE_REGISTERED: "Interface registration", ET_SYSERR_MEM_EXHAUSTED: "System memory exhausted", @@ -436,11 +449,12 @@ class WeaveDevice(): WEAVE_PRVKEY_SIZE = 64 WEAVE_SIGNATURE_LEN = 64 - def __init__(self, as_interface=False): + def __init__(self, as_interface=False, rns_interface=None): self.identity = None self.receiver = None self.switch_id = None self.owner = None + self.rns_interface = rns_interface self.as_interface = as_interface self.endpoints = {} self.active_tasks = {} @@ -547,18 +561,22 @@ class WeaveDevice(): if not endpoint_id in self.endpoints: self.endpoints[endpoint_id] = WeaveEndpoint(endpoint_id) else: self.endpoints[endpoint_id].alive = time.time() + if self.as_interface: self.rns_interface.add_peer(endpoint_id) + def deliver_packet(self, endpoint_id, data): - self.wdcl_send_command(Cmd.WDCL_CMD_ENDPOINT_PKT, endpoint_id+data) + packet_data = endpoint_id+data + self.wdcl_send_command(Cmd.WDCL_CMD_ENDPOINT_PKT, packet_data) def received_packet(self, source, data): self.endpoint_alive(source) - self.endpoints[source].receive(data) + if self.as_interface: + self.rns_interface.process_incoming(data, source) def incoming_frame(self, data): if len(data) > self.WEAVE_SWITCH_ID_LEN+2 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_ENDPOINT_PKT and data[:self.WEAVE_SWITCH_ID_LEN] == self.connection.switch_id: payload = data[self.WEAVE_SWITCH_ID_LEN+1:-self.WEAVE_ENDPOINT_ID_LEN] src_endpoint = data[-self.WEAVE_ENDPOINT_ID_LEN:] - self.received_packet(src_endpoint, data) + self.received_packet(src_endpoint, payload) elif len(data) > self.WEAVE_SWITCH_ID_LEN+1 and data[self.WEAVE_SWITCH_ID_LEN] == WDCL.WDCL_T_DISCOVER: discovery_response_len = self.WEAVE_SWITCH_ID_LEN+1+self.WEAVE_PUBKEY_SIZE+self.WEAVE_SIGNATURE_LEN @@ -602,7 +620,9 @@ class WeaveDevice(): def log_handle(self, frame): # Handle system event signalling - if frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() } + if frame.event == Evt.ET_PROTO_WEAVE_EP_ALIVE and len(frame.data) == 8: + self.endpoint_alive(frame.data) + elif frame.event == Evt.ET_STAT_TASK_CPU: self.active_tasks[frame.data[1:].decode("utf-8")] = { "cpu_load": frame.data[0], "timestamp": time.time() } elif frame.event == Evt.ET_STAT_CPU: self.cpu_load = frame.data[0] self.capture_stats_cpu() @@ -617,7 +637,7 @@ class WeaveDevice(): else: ts = RNS.prettytime(frame.timestamp) if frame.event == Evt.ET_MSG: - if len(frame.data): data_string = frame.data.decode("utf-8") + if len(frame.data): data_string = f"{frame.data.decode("utf-8")}" else: data_string = "" rendered = f"[{ts}] [{Evt.level(frame.level)}]: {data_string}" @@ -655,20 +675,21 @@ class WeaveDevice(): rendered = f"[{ts}] [{Evt.level(frame.level)}] [{event_description}]{data_string}" - if self.receiver and self.receiver.ready: - while len(self.log_queue): self.receiver.log(self.log_queue.pop()) - self.receiver.log(rendered) - else: self.log_queue.append(rendered) + if self.as_interface: + RNS.log(f"{self.rns_interface}: {rendered}", RNS.LOG_EXTREME) + else: + if self.receiver and self.receiver.ready: + while len(self.log_queue): self.receiver.log(self.log_queue.pop()) + self.receiver.log(rendered) + else: self.log_queue.append(rendered) class WeaveInterface(Interface): - HW_MTU = 1196 + HW_MTU = 1024 FIXED_MTU = True DEFAULT_IFAC_SIZE = 16 - PEERING_TIMEOUT = 20.0 - - BITRATE_GUESS = 1*1000*1000 + BITRATE_GUESS = 500*1000 MULTI_IF_DEQUE_LEN = 48 MULTI_IF_DEQUE_TTL = 0.75 @@ -683,13 +704,14 @@ class WeaveInterface(Interface): super().__init__() self.netinfo = netinfo - self.HW_MTU = AutoInterface.HW_MTU + self.HW_MTU = WeaveInterface.HW_MTU self.IN = True self.OUT = False self.name = name self.port = port - self.device = WeaveDevice(as_interface=True) - self.connection = WDCL(owner=None, device=self.device, port=self.port, as_interface=True) + self.switch_identity = RNS.Identity() + self.device = WeaveDevice(as_interface=True, rns_interface=self) + self.connection = WDCL(owner=self, device=self.device, port=self.port, as_interface=True) self.owner = owner self.online = False self.final_init_done = False @@ -697,8 +719,8 @@ class WeaveInterface(Interface): self.timed_out_interfaces = {} self.spawned_interfaces = {} self.write_lock = threading.Lock() - self.mif_deque = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) - self.mif_deque_times = deque(maxlen=AutoInterface.MULTI_IF_DEQUE_LEN) + self.mif_deque = deque(maxlen=WeaveInterface.MULTI_IF_DEQUE_LEN) + self.mif_deque_times = deque(maxlen=WeaveInterface.MULTI_IF_DEQUE_LEN) self.announce_rate_target = None self.peer_job_interval = WeaveInterface.PEERING_TIMEOUT*1.1 @@ -715,8 +737,6 @@ class WeaveInterface(Interface): job_thread.daemon = True job_thread.start() - time.sleep(peering_wait) - self.online = True self.final_init_done = True @@ -746,74 +766,62 @@ class WeaveInterface(Interface): def peer_count(self): return len(self.spawned_interfaces) - def add_peer(self, addr, ifname): - if addr in self.link_local_addresses: - ifname = None - for interface_name in self.adopted_interfaces: - if self.adopted_interfaces[interface_name] == addr: - ifname = interface_name + def add_peer(self, endpoint_addr): + if not endpoint_addr in self.peers: + self.peers[endpoint_addr] = [endpoint_addr, time.time()] - if ifname != None: - self.multicast_echoes[ifname] = time.time() - else: - RNS.log(str(self)+" received multicast echo on unexpected interface "+str(ifname), RNS.LOG_WARNING) + spawned_interface = WeaveInterfacePeer(self, endpoint_addr) + spawned_interface.OUT = self.OUT + spawned_interface.IN = self.IN + spawned_interface.parent_interface = self + spawned_interface.bitrate = self.bitrate + + spawned_interface.ifac_size = self.ifac_size + spawned_interface.ifac_netname = self.ifac_netname + spawned_interface.ifac_netkey = self.ifac_netkey + if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None: + ifac_origin = b"" + if spawned_interface.ifac_netname != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8")) + if spawned_interface.ifac_netkey != None: + ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8")) + ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) + spawned_interface.ifac_key = RNS.Cryptography.hkdf( + length=64, + derive_from=ifac_origin_hash, + salt=RNS.Reticulum.IFAC_SALT, + context=None + ) + spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key) + spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key)) + + spawned_interface.announce_rate_target = self.announce_rate_target + spawned_interface.announce_rate_grace = self.announce_rate_grace + spawned_interface.announce_rate_penalty = self.announce_rate_penalty + spawned_interface.mode = self.mode + spawned_interface.HW_MTU = self.HW_MTU + spawned_interface.online = True + RNS.Transport.interfaces.append(spawned_interface) + if endpoint_addr in self.spawned_interfaces: + self.spawned_interfaces[endpoint_addr].detach() + self.spawned_interfaces[endpoint_addr].teardown() + self.spawned_interfaces.pop(spawned_interface) + self.spawned_interfaces[endpoint_addr] = spawned_interface + + RNS.log(f"{self} added peer {RNS.hexrep(endpoint_addr)}", RNS.LOG_DEBUG) else: - if not addr in self.peers: - self.peers[addr] = [ifname, time.time()] + self.refresh_peer(endpoint_addr) - spawned_interface = AutoInterfacePeer(self, addr, ifname) - spawned_interface.OUT = self.OUT - spawned_interface.IN = self.IN - spawned_interface.parent_interface = self - spawned_interface.bitrate = self.bitrate - - spawned_interface.ifac_size = self.ifac_size - spawned_interface.ifac_netname = self.ifac_netname - spawned_interface.ifac_netkey = self.ifac_netkey - if spawned_interface.ifac_netname != None or spawned_interface.ifac_netkey != None: - ifac_origin = b"" - if spawned_interface.ifac_netname != None: - ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netname.encode("utf-8")) - if spawned_interface.ifac_netkey != None: - ifac_origin += RNS.Identity.full_hash(spawned_interface.ifac_netkey.encode("utf-8")) - - ifac_origin_hash = RNS.Identity.full_hash(ifac_origin) - spawned_interface.ifac_key = RNS.Cryptography.hkdf( - length=64, - derive_from=ifac_origin_hash, - salt=RNS.Reticulum.IFAC_SALT, - context=None - ) - spawned_interface.ifac_identity = RNS.Identity.from_bytes(spawned_interface.ifac_key) - spawned_interface.ifac_signature = spawned_interface.ifac_identity.sign(RNS.Identity.full_hash(spawned_interface.ifac_key)) - - spawned_interface.announce_rate_target = self.announce_rate_target - spawned_interface.announce_rate_grace = self.announce_rate_grace - spawned_interface.announce_rate_penalty = self.announce_rate_penalty - spawned_interface.mode = self.mode - spawned_interface.HW_MTU = self.HW_MTU - spawned_interface.online = True - RNS.Transport.interfaces.append(spawned_interface) - if addr in self.spawned_interfaces: - self.spawned_interfaces[addr].detach() - self.spawned_interfaces[addr].teardown() - self.spawned_interfaces.pop(spawned_interface) - self.spawned_interfaces[addr] = spawned_interface - - RNS.log(str(self)+" added peer "+str(addr)+" on "+str(ifname), RNS.LOG_DEBUG) - else: - self.refresh_peer(addr) - - def refresh_peer(self, addr): + def refresh_peer(self, endpoint_addr): try: - self.peers[addr][1] = time.time() + self.peers[endpoint_addr][1] = time.time() except Exception as e: - RNS.log(f"An error occurred while refreshing peer {RNS.hexrep(addr)} on {self}: {e}", RNS.LOG_ERROR) + RNS.log(f"An error occurred while refreshing peer {RNS.hexrep(endpoint_addr)} on {self}: {e}", RNS.LOG_ERROR) - def process_incoming(self, data, addr=None): - if self.online and addr in self.spawned_interfaces: - self.spawned_interfaces[addr].process_incoming(data, addr) + def process_incoming(self, data, endpoint_addr=None): + if self.online and endpoint_addr in self.spawned_interfaces: + self.spawned_interfaces[endpoint_addr].process_incoming(data, endpoint_addr) def process_outgoing(self,data): pass @@ -829,32 +837,31 @@ class WeaveInterface(Interface): class WeaveInterfacePeer(Interface): - def __init__(self, owner, addr, ifname): + def __init__(self, owner, endpoint_addr): super().__init__() self.owner = owner self.parent_interface = owner - self.addr = addr - self.ifname = ifname + self.endpoint_addr = endpoint_addr self.peer_addr = None self.addr_info = None self.HW_MTU = self.owner.HW_MTU self.FIXED_MTU = self.owner.FIXED_MTU def __str__(self): - return f"WeaveInterfacePeer[{self.ifname}/{self.addr}]" + return f"WeaveInterfacePeer[{RNS.hexrep(self.endpoint_addr)}]" - def process_incoming(self, data, addr=None): + def process_incoming(self, data, endpoint_addr=None): if self.online and self.owner.online: data_hash = RNS.Identity.full_hash(data) deque_hit = False if data_hash in self.owner.mif_deque: for te in self.owner.mif_deque_times: - if te[0] == data_hash and time.time() < te[1]+AutoInterface.MULTI_IF_DEQUE_TTL: + if te[0] == data_hash and time.time() < te[1]+WeaveInterface.MULTI_IF_DEQUE_TTL: deque_hit = True break if not deque_hit: - self.owner.refresh_peer(self.addr) + self.owner.refresh_peer(self.endpoint_addr) self.owner.mif_deque.append(data_hash) self.owner.mif_deque_times.append([data_hash, time.time()]) self.rxb += len(data) @@ -865,7 +872,7 @@ class WeaveInterfacePeer(Interface): if self.online: with self.owner.write_lock: try: - # TODO: Push to weave endpoint + self.owner.device.deliver_packet(self.endpoint_addr, data) self.txb += len(data) self.owner.txb += len(data) except Exception as e: @@ -888,8 +895,8 @@ class WeaveInterfacePeer(Interface): self.OUT = False self.IN = False - if self.addr in self.owner.spawned_interfaces: - try: self.owner.spawned_interfaces.pop(self.addr) + if self.endpoint_addr in self.owner.spawned_interfaces: + try: self.owner.spawned_interfaces.pop(self.endpoint_addr) except Exception as e: RNS.log(f"Could not remove {self} from parent interface on detach. The contained exception was: {e}", RNS.LOG_ERROR)