# Code Examples A number of examples are included in the source distribution of Reticulum. You can use these examples to learn how to write your own programs. ## Minimal The *Minimal* example demonstrates the bare-minimum setup required to connect to a Reticulum network from your program. In about five lines of code, you will have the Reticulum Network Stack initialised, and ready to pass traffic in your program. ```default ########################################################## # This RNS example demonstrates a minimal setup, that # # will start up the Reticulum Network Stack, generate a # # new destination, and let the user send an announce. # ########################################################## import argparse import sys import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this basic example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" # This initialisation is executed when the program is started def program_setup(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our example identity = RNS.Identity() # Using the identity we just created, we create a destination. # Destinations are endpoints in Reticulum, that can be addressed # and communicated with. Destinations can also announce their # existence, which will let the network know they are reachable # and automatically create paths to them, from anywhere else # in the network. destination = RNS.Destination( identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "minimalsample" ) # We configure the destination to automatically prove all # packets addressed to it. By doing this, RNS will automatically # generate a proof for each incoming packet and transmit it # back to the sender of that packet. This will let anyone that # tries to communicate with the destination know whether their # communication was received correctly. destination.set_proof_strategy(RNS.Destination.PROVE_ALL) # Everything's ready! # Let's hand over control to the announce loop announceLoop(destination) def announceLoop(destination): # Let the user know that everything is ready RNS.log( "Minimal example "+ RNS.prettyhexrep(destination.hash)+ " running, hit enter to manually send an announce (Ctrl-C to quit)" ) # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program gets run at startup, # and parses input from the user, and then starts # the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser( description="Minimal example to start Reticulum and create a destination" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None program_setup(configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Minimal.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Minimal.py). ## Announce The *Announce* example builds upon the previous example by exploring how to announce a destination on the network, and how to let your program receive notifications about announces from relevant destinations. ```default ########################################################## # This RNS example demonstrates setting up announce # # callbacks, which will let an application receive a # # notification when an announce relevant for it arrives # ########################################################## import argparse import random import sys import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this basic example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" # We initialise two lists of strings to use as app_data fruits = ["Peach", "Quince", "Date", "Tangerine", "Pomelo", "Carambola", "Grape"] noble_gases = ["Helium", "Neon", "Argon", "Krypton", "Xenon", "Radon", "Oganesson"] # This initialisation is executed when the program is started def program_setup(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our example identity = RNS.Identity() # Using the identity we just created, we create two destinations # in the "example_utilities.announcesample" application space. # # Destinations are endpoints in Reticulum, that can be addressed # and communicated with. Destinations can also announce their # existence, which will let the network know they are reachable # and automatically create paths to them, from anywhere else # in the network. destination_1 = RNS.Destination( identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "announcesample", "fruits" ) destination_2 = RNS.Destination( identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "announcesample", "noble_gases" ) # We configure the destinations to automatically prove all # packets addressed to it. By doing this, RNS will automatically # generate a proof for each incoming packet and transmit it # back to the sender of that packet. This will let anyone that # tries to communicate with the destination know whether their # communication was received correctly. destination_1.set_proof_strategy(RNS.Destination.PROVE_ALL) destination_2.set_proof_strategy(RNS.Destination.PROVE_ALL) # We create an announce handler and configure it to only ask for # announces from "example_utilities.announcesample.fruits". # Try changing the filter and see what happens. announce_handler = ExampleAnnounceHandler( aspect_filter="example_utilities.announcesample.fruits" ) # We register the announce handler with Reticulum RNS.Transport.register_announce_handler(announce_handler) # Everything's ready! # Let's hand over control to the announce loop announceLoop(destination_1, destination_2) def announceLoop(destination_1, destination_2): # Let the user know that everything is ready RNS.log("Announce example running, hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() # Randomly select a fruit fruit = fruits[random.randint(0,len(fruits)-1)] # Send the announce including the app data destination_1.announce(app_data=fruit.encode("utf-8")) RNS.log( "Sent announce from "+ RNS.prettyhexrep(destination_1.hash)+ " ("+destination_1.name+")" ) # Randomly select a noble gas noble_gas = noble_gases[random.randint(0,len(noble_gases)-1)] # Send the announce including the app data destination_2.announce(app_data=noble_gas.encode("utf-8")) RNS.log( "Sent announce from "+ RNS.prettyhexrep(destination_2.hash)+ " ("+destination_2.name+")" ) # We will need to define an announce handler class that # Reticulum can message when an announce arrives. class ExampleAnnounceHandler: # The initialisation method takes the optional # aspect_filter argument. If aspect_filter is set to # None, all announces will be passed to the instance. # If only some announces are wanted, it can be set to # an aspect string. def __init__(self, aspect_filter=None): self.aspect_filter = aspect_filter # This method will be called by Reticulums Transport # system when an announce arrives that matches the # configured aspect filter. Filters must be specific, # and cannot use wildcards. def received_announce(self, destination_hash, announced_identity, app_data): RNS.log( "Received an announce from "+ RNS.prettyhexrep(destination_hash) ) if app_data: RNS.log( "The announce contained the following app data: "+ app_data.decode("utf-8") ) ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program gets run at startup, # and parses input from the user, and then starts # the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser( description="Reticulum example that demonstrates announces and announce handlers" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None program_setup(configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Announce.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Announce.py). ## Broadcast The *Broadcast* example explores how to transmit plaintext broadcast messages over the network. ```default ########################################################## # This RNS example demonstrates broadcasting unencrypted # # information to any listening destinations. # ########################################################## import sys import argparse import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this basic example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" # This initialisation is executed when the program is started def program_setup(configpath, channel=None): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # If the user did not select a "channel" we use # a default one called "public_information". # This "channel" is added to the destination name- # space, so the user can select different broadcast # channels. if channel == None: channel = "public_information" # We create a PLAIN destination. This is an uncencrypted endpoint # that anyone can listen to and send information to. broadcast_destination = RNS.Destination( None, RNS.Destination.IN, RNS.Destination.PLAIN, APP_NAME, "broadcast", channel ) # We specify a callback that will get called every time # the destination receives data. broadcast_destination.set_packet_callback(packet_callback) # Everything's ready! # Let's hand over control to the main loop broadcastLoop(broadcast_destination) def packet_callback(data, packet): # Simply print out the received data print("") print("Received data: "+data.decode("utf-8")+"\r\n> ", end="") sys.stdout.flush() def broadcastLoop(destination): # Let the user know that everything is ready RNS.log( "Broadcast example "+ RNS.prettyhexrep(destination.hash)+ " running, enter text and hit enter to broadcast (Ctrl-C to quit)" ) # We enter a loop that runs until the users exits. # If the user hits enter, we will send the information # that the user entered into the prompt. while True: print("> ", end="") entered = input() if entered != "": data = entered.encode("utf-8") packet = RNS.Packet(destination, data) packet.send() ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program gets run at startup, # and parses input from the user, and then starts # the program. if __name__ == "__main__": try: parser = argparse.ArgumentParser( description="Reticulum example demonstrating sending and receiving broadcasts" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "--channel", action="store", default=None, help="broadcast channel name", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.channel: channelarg = args.channel else: channelarg = None program_setup(configarg, channelarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Broadcast.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Broadcast.py). ## Echo The *Echo* example demonstrates communication between two destinations using the Packet interface. ```default ########################################################## # This RNS example demonstrates a simple client/server # # echo utility. A client can send an echo request to the # # server, and the server will respond by proving receipt # # of the packet. # ########################################################## import argparse import sys import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Server Part ######################################### ########################################################## # This initialisation is executed when the users chooses # to run as a server def server(configpath): global reticulum # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our echo server server_identity = RNS.Identity() # We create a destination that clients can query. We want # to be able to verify echo replies to our clients, so we # create a "single" destination that can receive encrypted # messages. This way the client can send a request and be # certain that no-one else than this destination was able # to read it. echo_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "echo", "request" ) # We configure the destination to automatically prove all # packets addressed to it. By doing this, RNS will automatically # generate a proof for each incoming packet and transmit it # back to the sender of that packet. echo_destination.set_proof_strategy(RNS.Destination.PROVE_ALL) # Tell the destination which function in our program to # run when a packet is received. We do this so we can # print a log message when the server receives a request echo_destination.set_packet_callback(server_callback) # Everything's ready! # Let's Wait for client requests or user input announceLoop(echo_destination) def announceLoop(destination): # Let the user know that everything is ready RNS.log( "Echo server "+ RNS.prettyhexrep(destination.hash)+ " running, hit enter to manually send an announce (Ctrl-C to quit)" ) # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) def server_callback(message, packet): global reticulum # Tell the user that we received an echo request, and # that we are going to send a reply to the requester. # Sending the proof is handled automatically, since we # set up the destination to prove all incoming packets. reception_stats = "" if reticulum.is_connected_to_shared_instance: reception_rssi = reticulum.get_packet_rssi(packet.packet_hash) reception_snr = reticulum.get_packet_snr(packet.packet_hash) if reception_rssi != None: reception_stats += " [RSSI "+str(reception_rssi)+" dBm]" if reception_snr != None: reception_stats += " [SNR "+str(reception_snr)+" dBm]" else: if packet.rssi != None: reception_stats += " [RSSI "+str(packet.rssi)+" dBm]" if packet.snr != None: reception_stats += " [SNR "+str(packet.snr)+" dB]" RNS.log("Received packet from echo client, proof sent"+reception_stats) ########################################################## #### Client Part ######################################### ########################################################## # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath, timeout=None): global reticulum # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except Exception as e: RNS.log("Invalid destination entered. Check your input!") RNS.log(str(e)+"\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # We override the loglevel to provide feedback when # an announce is received if RNS.loglevel < RNS.LOG_INFO: RNS.loglevel = RNS.LOG_INFO # Tell the user that the client is ready! RNS.log( "Echo client ready, hit enter to send echo request to "+ destination_hexhash+ " (Ctrl-C to quit)" ) # We enter a loop that runs until the user exits. # If the user hits enter, we will try to send an # echo request to the destination specified on the # command line. while True: input() # Let's first check if RNS knows a path to the destination. # If it does, we'll load the server identity and create a packet if RNS.Transport.has_path(destination_hash): # To address the server, we need to know it's public # key, so we check if Reticulum knows this destination. # This is done by calling the "recall" method of the # Identity module. If the destination is known, it will # return an Identity instance that can be used in # outgoing destinations. server_identity = RNS.Identity.recall(destination_hash) # We got the correct identity instance from the # recall method, so let's create an outgoing # destination. We use the naming convention: # example_utilities.echo.request # This matches the naming we specified in the # server part of the code. request_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "echo", "request" ) # The destination is ready, so let's create a packet. # We set the destination to the request_destination # that was just created, and the only data we add # is a random hash. echo_request = RNS.Packet(request_destination, RNS.Identity.get_random_hash()) # Send the packet! If the packet is successfully # sent, it will return a PacketReceipt instance. packet_receipt = echo_request.send() # If the user specified a timeout, we set this # timeout on the packet receipt, and configure # a callback function, that will get called if # the packet times out. if timeout != None: packet_receipt.set_timeout(timeout) packet_receipt.set_timeout_callback(packet_timed_out) # We can then set a delivery callback on the receipt. # This will get automatically called when a proof for # this specific packet is received from the destination. packet_receipt.set_delivery_callback(packet_delivered) # Tell the user that the echo request was sent RNS.log("Sent echo request to "+RNS.prettyhexrep(request_destination.hash)) else: # If we do not know this destination, tell the # user to wait for an announce to arrive. RNS.log("Destination is not yet known. Requesting path...") RNS.log("Hit enter to manually retry once an announce is received.") RNS.Transport.request_path(destination_hash) # This function is called when our reply destination # receives a proof packet. def packet_delivered(receipt): global reticulum if receipt.status == RNS.PacketReceipt.DELIVERED: rtt = receipt.get_rtt() if (rtt >= 1): rtt = round(rtt, 3) rttstring = str(rtt)+" seconds" else: rtt = round(rtt*1000, 3) rttstring = str(rtt)+" milliseconds" reception_stats = "" if reticulum.is_connected_to_shared_instance: reception_rssi = reticulum.get_packet_rssi(receipt.proof_packet.packet_hash) reception_snr = reticulum.get_packet_snr(receipt.proof_packet.packet_hash) if reception_rssi != None: reception_stats += " [RSSI "+str(reception_rssi)+" dBm]" if reception_snr != None: reception_stats += " [SNR "+str(reception_snr)+" dB]" else: if receipt.proof_packet != None: if receipt.proof_packet.rssi != None: reception_stats += " [RSSI "+str(receipt.proof_packet.rssi)+" dBm]" if receipt.proof_packet.snr != None: reception_stats += " [SNR "+str(receipt.proof_packet.snr)+" dB]" RNS.log( "Valid reply received from "+ RNS.prettyhexrep(receipt.destination.hash)+ ", round-trip time is "+rttstring+ reception_stats ) # This function is called if a packet times out. def packet_timed_out(receipt): if receipt.status == RNS.PacketReceipt.FAILED: RNS.log("Packet "+RNS.prettyhexrep(receipt.hash)+" timed out") ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program gets run at startup, # and parses input from the user, and then starts # the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple echo server and client utility") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming packets from clients" ) parser.add_argument( "-t", "--timeout", action="store", metavar="s", default=None, help="set a reply timeout in seconds", type=float ) parser.add_argument("--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.server: configarg=None if args.config: configarg = args.config server(configarg) else: if args.config: configarg = args.config else: configarg = None if args.timeout: timeoutarg = float(args.timeout) else: timeoutarg = None if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg, timeout=timeoutarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Echo.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Echo.py). ## Link The *Link* example explores establishing an encrypted link to a remote destination, and passing traffic back and forth over the link. ```default ########################################################## # This RNS example demonstrates how to set up a link to # # a destination, and pass data back and forth over it. # ########################################################## import os import sys import time import argparse import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Server Part ######################################### ########################################################## # A reference to the latest client link that connected latest_client_link = None # This initialisation is executed when the users chooses # to run as a server def server(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our link example server_identity = RNS.Identity() # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "linkexample" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # Everything's ready! # Let's Wait for client requests or user input server_loop(server_destination) def server_loop(destination): # Let the user know that everything is ready RNS.log( "Link example "+ RNS.prettyhexrep(destination.hash)+ " running, waiting for a connection." ) RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. def client_connected(link): global latest_client_link RNS.log("Client connected") link.set_link_closed_callback(client_disconnected) link.set_packet_callback(server_packet_received) latest_client_link = link def client_disconnected(link): RNS.log("Client disconnected") def server_packet_received(message, packet): global latest_client_link # When data is received over any active link, # it will all be directed to the last client # that connected. text = message.decode("utf-8") RNS.log("Received data on the link: "+text) reply_text = "I received \""+text+"\" over the link" reply_data = reply_text.encode("utf-8") RNS.Packet(latest_client_link, reply_data).send() ########################################################## #### Client Part ######################################### ########################################################## # A reference to the server link server_link = None # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "linkexample" ) # And create a link link = RNS.Link(server_destination) # We set a callback that will get executed # every time a packet is received over the # link link.set_packet_callback(client_packet_received) # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # Everything is set up, so let's enter a loop # for the user to interact with the example client_loop() def client_loop(): global server_link # Wait for the link to become active while not server_link: time.sleep(0.1) should_quit = False while not should_quit: try: print("> ", end=" ") text = input() # Check if we should quit the example if text == "quit" or text == "q" or text == "exit": should_quit = True server_link.teardown() # If not, send the entered text over the link if text != "": data = text.encode("utf-8") if len(data) <= RNS.Link.MDU: RNS.Packet(server_link, data).send() else: RNS.log( "Cannot send this packet, the data size of "+ str(len(data))+" bytes exceeds the link packet MDU of "+ str(RNS.Link.MDU)+" bytes", RNS.LOG_ERROR ) except Exception as e: RNS.log("Error while sending data over the link: "+str(e)) should_quit = True server_link.teardown() # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link server_link = link # Inform the user that the server is # connected RNS.log("Link established with server, enter some text to send, or \"quit\" to quit") # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) # When a packet is received over the link, we # simply print out the data. def client_packet_received(message, packet): text = message.decode("utf-8") RNS.log("Received data on the link: "+text) print("> ", end=" ") sys.stdout.flush() ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple link example") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming link requests from clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.server: server(configarg) else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Link.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Link.py). ## Identification The *Identify* example explores identifying an intiator of a link, once the link has been established. ```default ########################################################## # This RNS example demonstrates how to set up a link to # # a destination, and identify the initiator to it's peer # ########################################################## import os import sys import time import argparse import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Server Part ######################################### ########################################################## # A reference to the latest client link that connected latest_client_link = None # This initialisation is executed when the users chooses # to run as a server def server(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our link example server_identity = RNS.Identity() # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "identifyexample" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # Everything's ready! # Let's Wait for client requests or user input server_loop(server_destination) def server_loop(destination): # Let the user know that everything is ready RNS.log( "Link identification example "+ RNS.prettyhexrep(destination.hash)+ " running, waiting for a connection." ) RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. def client_connected(link): global latest_client_link RNS.log("Client connected") link.set_link_closed_callback(client_disconnected) link.set_packet_callback(server_packet_received) link.set_remote_identified_callback(remote_identified) latest_client_link = link def client_disconnected(link): RNS.log("Client disconnected") def remote_identified(link, identity): RNS.log("Remote identified as: "+str(identity)) def server_packet_received(message, packet): global latest_client_link # Get the originating identity for display remote_peer = "unidentified peer" if packet.link.get_remote_identity() != None: remote_peer = str(packet.link.get_remote_identity()) # When data is received over any active link, # it will all be directed to the last client # that connected. text = message.decode("utf-8") RNS.log("Received data from "+remote_peer+": "+text) reply_text = "I received \""+text+"\" over the link from "+remote_peer reply_data = reply_text.encode("utf-8") RNS.Packet(latest_client_link, reply_data).send() ########################################################## #### Client Part ######################################### ########################################################## # A reference to the server link server_link = None # A reference to the client identity client_identity = None # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): global client_identity # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Create a new client identity client_identity = RNS.Identity() RNS.log( "Client created new identity "+ str(client_identity) ) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "identifyexample" ) # And create a link link = RNS.Link(server_destination) # We set a callback that will get executed # every time a packet is received over the # link link.set_packet_callback(client_packet_received) # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # Everything is set up, so let's enter a loop # for the user to interact with the example client_loop() def client_loop(): global server_link # Wait for the link to become active while not server_link: time.sleep(0.1) should_quit = False while not should_quit: try: print("> ", end=" ") text = input() # Check if we should quit the example if text == "quit" or text == "q" or text == "exit": should_quit = True server_link.teardown() # If not, send the entered text over the link if text != "": data = text.encode("utf-8") if len(data) <= RNS.Link.MDU: RNS.Packet(server_link, data).send() else: RNS.log( "Cannot send this packet, the data size of "+ str(len(data))+" bytes exceeds the link packet MDU of "+ str(RNS.Link.MDU)+" bytes", RNS.LOG_ERROR ) except Exception as e: RNS.log("Error while sending data over the link: "+str(e)) should_quit = True server_link.teardown() # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link, client_identity server_link = link # Inform the user that the server is # connected RNS.log("Link established with server, identifying to remote peer...") link.identify(client_identity) # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) # When a packet is received over the link, we # simply print out the data. def client_packet_received(message, packet): text = message.decode("utf-8") RNS.log("Received data on the link: "+text) print("> ", end=" ") sys.stdout.flush() ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple link example") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming link requests from clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.server: server(configarg) else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Identify.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Identify.py). ## Requests & Responses The *Request* example explores sending requests and receiving responses. ```default ########################################################## # This RNS example demonstrates how to perform requests # # and receive responses over a link. # ########################################################## import os import sys import time import random import argparse import RNS # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Server Part ######################################### ########################################################## # A reference to the latest client link that connected latest_client_link = None def random_text_generator(path, data, request_id, link_id, remote_identity, requested_at): RNS.log("Generating response to request "+RNS.prettyhexrep(request_id)+" on link "+RNS.prettyhexrep(link_id)) texts = ["They looked up", "On each full moon", "Becky was upset", "I’ll stay away from it", "The pet shop stocks everything"] return texts[random.randint(0, len(texts)-1)] # This initialisation is executed when the users chooses # to run as a server def server(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our link example server_identity = RNS.Identity() # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "requestexample" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # We register a request handler for handling incoming # requests over any established links. server_destination.register_request_handler( "/random/text", response_generator = random_text_generator, allow = RNS.Destination.ALLOW_ALL ) # Everything's ready! # Let's Wait for client requests or user input server_loop(server_destination) def server_loop(destination): # Let the user know that everything is ready RNS.log( "Request example "+ RNS.prettyhexrep(destination.hash)+ " running, waiting for a connection." ) RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. def client_connected(link): global latest_client_link RNS.log("Client connected") link.set_link_closed_callback(client_disconnected) latest_client_link = link def client_disconnected(link): RNS.log("Client disconnected") ########################################################## #### Client Part ######################################### ########################################################## # A reference to the server link server_link = None # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "requestexample" ) # And create a link link = RNS.Link(server_destination) # We'll set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # Everything is set up, so let's enter a loop # for the user to interact with the example client_loop() def client_loop(): global server_link # Wait for the link to become active while not server_link: time.sleep(0.1) should_quit = False while not should_quit: try: print("> ", end=" ") text = input() # Check if we should quit the example if text == "quit" or text == "q" or text == "exit": should_quit = True server_link.teardown() else: server_link.request( "/random/text", data = None, response_callback = got_response, failed_callback = request_failed ) except Exception as e: RNS.log("Error while sending request over the link: "+str(e)) should_quit = True server_link.teardown() def got_response(request_receipt): request_id = request_receipt.request_id response = request_receipt.response RNS.log("Got response for request "+RNS.prettyhexrep(request_id)+": "+str(response)) def request_received(request_receipt): RNS.log("The request "+RNS.prettyhexrep(request_receipt.request_id)+" was received by the remote peer.") def request_failed(request_receipt): RNS.log("The request "+RNS.prettyhexrep(request_receipt.request_id)+" failed.") # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link server_link = link # Inform the user that the server is # connected RNS.log("Link established with server, hit enter to perform a request, or type in \"quit\" to quit") # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple request/response example") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming requests from clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.server: server(configarg) else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Request.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Request.py). ## Channel The *Channel* example explores using a `Channel` to send structured data between peers of a `Link`. ```default ########################################################## # This RNS example demonstrates how to set up a link to # # a destination, and pass structured messages over it # # using a channel. # ########################################################## import os import sys import time import argparse from datetime import datetime import RNS from RNS.vendor import umsgpack # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Shared Objects ###################################### ########################################################## # Channel data must be structured in a subclass of # MessageBase. This ensures that the channel will be able # to serialize and deserialize the object and multiplex it # with other objects. Both ends of a link will need the # same object definitions to be able to communicate over # a channel. # # Note: The objects we wish to use over the channel must # be registered with the channel, and each link has a # different channel instance. See the client_connected # and link_established functions in this example to see # how message types are registered. # Let's make a simple message class called StringMessage # that will convey a string with a timestamp. class StringMessage(RNS.MessageBase): # The MSGTYPE class variable needs to be assigned a # 2 byte integer value. This identifier allows the # channel to look up your message's constructor when a # message arrives over the channel. # # MSGTYPE must be unique across all message types we # register with the channel. MSGTYPEs >= 0xf000 are # reserved for the system. MSGTYPE = 0x0101 # The constructor of our object must be callable with # no arguments. We can have parameters, but they must # have a default assignment. # # This is needed so the channel can create an empty # version of our message into which the incoming # message can be unpacked. def __init__(self, data=None): self.data = data self.timestamp = datetime.now() # Finally, our message needs to implement functions # the channel can call to pack and unpack our message # to/from the raw packet payload. We'll use the # umsgpack package bundled with RNS. We could also use # the struct package bundled with Python if we wanted # more control over the structure of the packed bytes. # # Also note that packed message objects must fit # entirely in one packet. The number of bytes # available for message payloads can be queried from # the channel using the Channel.MDU property. The # channel MDU is slightly less than the link MDU due # to encoding the message header. # The pack function encodes the message contents into # a byte stream. def pack(self) -> bytes: return umsgpack.packb((self.data, self.timestamp)) # And the unpack function decodes a byte stream into # the message contents. def unpack(self, raw): self.data, self.timestamp = umsgpack.unpackb(raw) ########################################################## #### Server Part ######################################### ########################################################## # A reference to the latest client link that connected latest_client_link = None # This initialisation is executed when the users chooses # to run as a server def server(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our link example server_identity = RNS.Identity() # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "channelexample" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # Everything's ready! # Let's Wait for client requests or user input server_loop(server_destination) def server_loop(destination): # Let the user know that everything is ready RNS.log( "Channel example "+ RNS.prettyhexrep(destination.hash)+ " running, waiting for a connection." ) RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. def client_connected(link): global latest_client_link latest_client_link = link RNS.log("Client connected") link.set_link_closed_callback(client_disconnected) # Register message types and add callback to channel channel = link.get_channel() channel.register_message_type(StringMessage) channel.add_message_handler(server_message_received) def client_disconnected(link): RNS.log("Client disconnected") def server_message_received(message): """ A message handler @param message: An instance of a subclass of MessageBase @return: True if message was handled """ global latest_client_link # When a message is received over any active link, # the replies will all be directed to the last client # that connected. # In a message handler, any deserializable message # that arrives over the link's channel will be passed # to all message handlers, unless a preceding handler indicates it # has handled the message. # # if isinstance(message, StringMessage): RNS.log("Received data on the link: " + message.data + " (message created at " + str(message.timestamp) + ")") reply_message = StringMessage("I received \""+message.data+"\" over the link") latest_client_link.get_channel().send(reply_message) # Incoming messages are sent to each message # handler added to the channel, in the order they # were added. # If any message handler returns True, the message # is considered handled and any subsequent # handlers are skipped. return True ########################################################## #### Client Part ######################################### ########################################################## # A reference to the server link server_link = None # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "channelexample" ) # And create a link link = RNS.Link(server_destination) # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # Everything is set up, so let's enter a loop # for the user to interact with the example client_loop() def client_loop(): global server_link # Wait for the link to become active while not server_link: time.sleep(0.1) should_quit = False while not should_quit: try: print("> ", end=" ") text = input() # Check if we should quit the example if text == "quit" or text == "q" or text == "exit": should_quit = True server_link.teardown() # If not, send the entered text over the link if text != "": message = StringMessage(text) packed_size = len(message.pack()) channel = server_link.get_channel() if channel.is_ready_to_send(): if packed_size <= channel.mdu: channel.send(message) else: RNS.log( "Cannot send this packet, the data size of "+ str(packed_size)+" bytes exceeds the link packet MDU of "+ str(channel.MDU)+" bytes", RNS.LOG_ERROR ) else: RNS.log("Channel is not ready to send, please wait for " + "pending messages to complete.", RNS.LOG_ERROR) except Exception as e: RNS.log("Error while sending data over the link: "+str(e)) should_quit = True server_link.teardown() # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link server_link = link # Register messages and add handler to channel channel = link.get_channel() channel.register_message_type(StringMessage) channel.add_message_handler(client_message_received) # Inform the user that the server is # connected RNS.log("Link established with server, enter some text to send, or \"quit\" to quit") # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) # When a packet is received over the channel, we # simply print out the data. def client_message_received(message): if isinstance(message, StringMessage): RNS.log("Received data on the link: " + message.data + " (message created at " + str(message.timestamp) + ")") print("> ", end=" ") sys.stdout.flush() ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple channel example") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming link requests from clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.server: server(configarg) else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Channel.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Channel.py). ## Buffer The *Buffer* example explores using buffered readers and writers to send binary data between peers of a `Link`. ```default ########################################################## # This RNS example demonstrates how to set up a link to # # a destination, and pass binary data over it using a # # channel buffer. # ########################################################## from __future__ import annotations import os import sys import time import argparse from datetime import datetime import RNS from RNS.vendor import umsgpack # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" ########################################################## #### Server Part ######################################### ########################################################## # A reference to the latest client link that connected latest_client_link = None # A reference to the latest buffer object latest_buffer = None # This initialisation is executed when the users chooses # to run as a server def server(configpath): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our example server_identity = RNS.Identity() # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "bufferexample" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # Everything's ready! # Let's Wait for client requests or user input server_loop(server_destination) def server_loop(destination): # Let the user know that everything is ready RNS.log( "Link buffer example "+ RNS.prettyhexrep(destination.hash)+ " running, waiting for a connection." ) RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. def client_connected(link): global latest_client_link, latest_buffer latest_client_link = link RNS.log("Client connected") link.set_link_closed_callback(client_disconnected) # If a new connection is received, the old reader # needs to be disconnected. if latest_buffer: latest_buffer.close() # Create buffer objects. # The stream_id parameter to these functions is # a bit like a file descriptor, except that it # is unique to the *receiver*. # # In this example, both the reader and the writer # use stream_id = 0, but there are actually two # separate unidirectional streams flowing in # opposite directions. # channel = link.get_channel() latest_buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, server_buffer_ready) def client_disconnected(link): RNS.log("Client disconnected") def server_buffer_ready(ready_bytes: int): """ Callback from buffer when buffer has data available :param ready_bytes: The number of bytes ready to read """ global latest_buffer data = latest_buffer.read(ready_bytes) data = data.decode("utf-8") RNS.log("Received data over the buffer: " + data) reply_message = "I received \""+data+"\" over the buffer" reply_message = reply_message.encode("utf-8") latest_buffer.write(reply_message) latest_buffer.flush() ########################################################## #### Client Part ######################################### ########################################################## # A reference to the server link server_link = None # A reference to the buffer object, needed to share the # object from the link connected callback to the client # loop. buffer = None # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "bufferexample" ) # And create a link link = RNS.Link(server_destination) # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # Everything is set up, so let's enter a loop # for the user to interact with the example client_loop() def client_loop(): global server_link # Wait for the link to become active while not server_link: time.sleep(0.1) should_quit = False while not should_quit: try: print("> ", end=" ") text = input() # Check if we should quit the example if text == "quit" or text == "q" or text == "exit": should_quit = True server_link.teardown() else: # Otherwise, encode the text and write it to the buffer. text = text.encode("utf-8") buffer.write(text) # Flush the buffer to force the data to be sent. buffer.flush() except Exception as e: RNS.log("Error while sending data over the link buffer: "+str(e)) should_quit = True server_link.teardown() # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link, buffer server_link = link # Create buffer, see server_client_connected() for # more detail about setting up the buffer. channel = link.get_channel() buffer = RNS.Buffer.create_bidirectional_buffer(0, 0, channel, client_buffer_ready) # Inform the user that the server is # connected RNS.log("Link established with server, enter some text to send, or \"quit\" to quit") # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) # When the buffer has new data, read it and write it to the terminal. def client_buffer_ready(ready_bytes: int): global buffer data = buffer.read(ready_bytes) RNS.log("Received data over the link buffer: " + data.decode("utf-8")) print("> ", end=" ") sys.stdout.flush() ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser(description="Simple buffer example") parser.add_argument( "-s", "--server", action="store_true", help="wait for incoming link requests from clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.server: server(configarg) else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Buffer.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Buffer.py). ## Filetransfer The *Filetransfer* example implements a basic file-server program that allow clients to connect and download files. The program uses the Resource interface to efficiently pass files of any size over a Reticulum [Link](reference.md#api-link). ```default ########################################################## # This RNS example demonstrates a simple filetransfer # # server and client program. The server will serve a # # directory of files, and the clients can list and # # download files from the server. # # # # Please note that using RNS Resources for large file # # transfers is not recommended, since compression, # # encryption and hashmap sequencing can take a long time # # on systems with slow CPUs, which will probably result # # in the client timing out before the resource sender # # can complete preparing the resource. # # # # If you need to transfer large files, use the Bundle # # class instead, which will automatically slice the data # # into chunks suitable for packing as a Resource. # ########################################################## import os import sys import time import threading import argparse import RNS import RNS.vendor.umsgpack as umsgpack # Let's define an app name. We'll use this for all # destinations we create. Since this echo example # is part of a range of example utilities, we'll put # them all within the app namespace "example_utilities" APP_NAME = "example_utilities" # We'll also define a default timeout, in seconds APP_TIMEOUT = 45.0 ########################################################## #### Server Part ######################################### ########################################################## serve_path = None # This initialisation is executed when the users chooses # to run as a server def server(configpath, path): # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Randomly create a new identity for our file server server_identity = RNS.Identity() global serve_path serve_path = path # We create a destination that clients can connect to. We # want clients to create links to this destination, so we # need to create a "single" destination type. server_destination = RNS.Destination( server_identity, RNS.Destination.IN, RNS.Destination.SINGLE, APP_NAME, "filetransfer", "server" ) # We configure a function that will get called every time # a new client creates a link to this destination. server_destination.set_link_established_callback(client_connected) # Everything's ready! # Let's Wait for client requests or user input announceLoop(server_destination) def announceLoop(destination): # Let the user know that everything is ready RNS.log("File server "+RNS.prettyhexrep(destination.hash)+" running") RNS.log("Hit enter to manually send an announce (Ctrl-C to quit)") # We enter a loop that runs until the users exits. # If the user hits enter, we will announce our server # destination on the network, which will let clients # know how to create messages directed towards it. while True: entered = input() destination.announce() RNS.log("Sent announce from "+RNS.prettyhexrep(destination.hash)) # Here's a convenience function for listing all files # in our served directory def list_files(): # We add all entries from the directory that are # actual files, and does not start with "." global serve_path return [file for file in os.listdir(serve_path) if os.path.isfile(os.path.join(serve_path, file)) and file[:1] != "."] # When a client establishes a link to our server # destination, this function will be called with # a reference to the link. We then send the client # a list of files hosted on the server. def client_connected(link): # Check if the served directory still exists if os.path.isdir(serve_path): RNS.log("Client connected, sending file list...") link.set_link_closed_callback(client_disconnected) # We pack a list of files for sending in a packet data = umsgpack.packb(list_files()) # Check the size of the packed data if len(data) <= RNS.Link.MDU: # If it fits in one packet, we will just # send it as a single packet over the link. list_packet = RNS.Packet(link, data) list_receipt = list_packet.send() list_receipt.set_timeout(APP_TIMEOUT) list_receipt.set_delivery_callback(list_delivered) list_receipt.set_timeout_callback(list_timeout) else: RNS.log("Too many files in served directory!", RNS.LOG_ERROR) RNS.log("You should implement a function to split the filelist over multiple packets.", RNS.LOG_ERROR) RNS.log("Hint: The client already supports it :)", RNS.LOG_ERROR) # After this, we're just going to keep the link # open until the client requests a file. We'll # configure a function that get's called when # the client sends a packet with a file request. link.set_packet_callback(client_request) else: RNS.log("Client connected, but served path no longer exists!", RNS.LOG_ERROR) link.teardown() def client_disconnected(link): RNS.log("Client disconnected") def client_request(message, packet): global serve_path try: filename = message.decode("utf-8") except Exception as e: filename = None if filename in list_files(): try: # If we have the requested file, we'll # read it and pack it as a resource RNS.log("Client requested \""+filename+"\"") file = open(os.path.join(serve_path, filename), "rb") file_resource = RNS.Resource( file, packet.link, callback=resource_sending_concluded ) file_resource.filename = filename except Exception as e: # If somethign went wrong, we close # the link RNS.log("Error while reading file \""+filename+"\"", RNS.LOG_ERROR) packet.link.teardown() raise e else: # If we don't have it, we close the link RNS.log("Client requested an unknown file") packet.link.teardown() # This function is called on the server when a # resource transfer concludes. def resource_sending_concluded(resource): if hasattr(resource, "filename"): name = resource.filename else: name = "resource" if resource.status == RNS.Resource.COMPLETE: RNS.log("Done sending \""+name+"\" to client") elif resource.status == RNS.Resource.FAILED: RNS.log("Sending \""+name+"\" to client failed") def list_delivered(receipt): RNS.log("The file list was received by the client") def list_timeout(receipt): RNS.log("Sending list to client timed out, closing this link") link = receipt.destination link.teardown() ########################################################## #### Client Part ######################################### ########################################################## # We store a global list of files available on the server server_files = [] # A reference to the server link server_link = None # And a reference to the current download current_download = None current_filename = None # Variables to store download statistics download_started = 0 download_finished = 0 download_time = 0 transfer_size = 0 file_size = 0 # This initialisation is executed when the users chooses # to run as a client def client(destination_hexhash, configpath): # We need a binary representation of the destination # hash that was entered on the command line try: dest_len = (RNS.Reticulum.TRUNCATED_HASHLENGTH//8)*2 if len(destination_hexhash) != dest_len: raise ValueError( "Destination length is invalid, must be {hex} hexadecimal characters ({byte} bytes).".format(hex=dest_len, byte=dest_len//2) ) destination_hash = bytes.fromhex(destination_hexhash) except: RNS.log("Invalid destination entered. Check your input!\n") sys.exit(0) # We must first initialise Reticulum reticulum = RNS.Reticulum(configpath) # Check if we know a path to the destination if not RNS.Transport.has_path(destination_hash): RNS.log("Destination is not yet known. Requesting path and waiting for announce to arrive...") RNS.Transport.request_path(destination_hash) while not RNS.Transport.has_path(destination_hash): time.sleep(0.1) # Recall the server identity server_identity = RNS.Identity.recall(destination_hash) # Inform the user that we'll begin connecting RNS.log("Establishing link with server...") # When the server identity is known, we set # up a destination server_destination = RNS.Destination( server_identity, RNS.Destination.OUT, RNS.Destination.SINGLE, APP_NAME, "filetransfer", "server" ) # We also want to automatically prove incoming packets server_destination.set_proof_strategy(RNS.Destination.PROVE_ALL) # And create a link link = RNS.Link(server_destination) # We expect any normal data packets on the link # to contain a list of served files, so we set # a callback accordingly link.set_packet_callback(filelist_received) # We'll also set up functions to inform the # user when the link is established or closed link.set_link_established_callback(link_established) link.set_link_closed_callback(link_closed) # And set the link to automatically begin # downloading advertised resources link.set_resource_strategy(RNS.Link.ACCEPT_ALL) link.set_resource_started_callback(download_began) link.set_resource_concluded_callback(download_concluded) menu() # Requests the specified file from the server def download(filename): global server_link, menu_mode, current_filename, transfer_size, download_started current_filename = filename download_started = 0 transfer_size = 0 # We just create a packet containing the # requested filename, and send it down the # link. We also specify we don't need a # packet receipt. request_packet = RNS.Packet(server_link, filename.encode("utf-8"), create_receipt=False) request_packet.send() print("") print(("Requested \""+filename+"\" from server, waiting for download to begin...")) menu_mode = "download_started" # This function runs a simple menu for the user # to select which files to download, or quit menu_mode = None def menu(): global server_files, server_link # Wait until we have a filelist while len(server_files) == 0: time.sleep(0.1) RNS.log("Ready!") time.sleep(0.5) global menu_mode menu_mode = "main" should_quit = False while (not should_quit): print_menu() while not menu_mode == "main": # Wait time.sleep(0.25) user_input = input() if user_input == "q" or user_input == "quit" or user_input == "exit": should_quit = True print("") else: if user_input in server_files: download(user_input) else: try: if 0 <= int(user_input) < len(server_files): download(server_files[int(user_input)]) except: pass if should_quit: server_link.teardown() # Prints out menus or screens for the # various states of the client program. # It's simple and quite uninteresting. # I won't go into detail here. Just # strings basically. def print_menu(): global menu_mode, download_time, download_started, download_finished, transfer_size, file_size if menu_mode == "main": clear_screen() print_filelist() print("") print("Select a file to download by entering name or number, or q to quit") print(("> "), end=' ') elif menu_mode == "download_started": download_began = time.time() while menu_mode == "download_started": time.sleep(0.1) if time.time() > download_began+APP_TIMEOUT: print("The download timed out") time.sleep(1) server_link.teardown() if menu_mode == "downloading": print("Download started") print("") while menu_mode == "downloading": global current_download percent = round(current_download.get_progress() * 100.0, 1) print(("\rProgress: "+str(percent)+" % "), end=' ') sys.stdout.flush() time.sleep(0.1) if menu_mode == "save_error": print(("\rProgress: 100.0 %"), end=' ') sys.stdout.flush() print("") print("Could not write downloaded file to disk") current_download.status = RNS.Resource.FAILED menu_mode = "download_concluded" if menu_mode == "download_concluded": if current_download.status == RNS.Resource.COMPLETE: print(("\rProgress: 100.0 %"), end=' ') sys.stdout.flush() # Print statistics hours, rem = divmod(download_time, 3600) minutes, seconds = divmod(rem, 60) timestring = "{:0>2}:{:0>2}:{:05.2f}".format(int(hours),int(minutes),seconds) print("") print("") print("--- Statistics -----") print("\tTime taken : "+timestring) print("\tFile size : "+size_str(file_size)) print("\tData transferred : "+size_str(transfer_size)) print("\tEffective rate : "+size_str(file_size/download_time, suffix='b')+"/s") print("\tTransfer rate : "+size_str(transfer_size/download_time, suffix='b')+"/s") print("") print("The download completed! Press enter to return to the menu.") print("") input() else: print("") print("The download failed! Press enter to return to the menu.") input() current_download = None menu_mode = "main" print_menu() # This function prints out a list of files # on the connected server. def print_filelist(): global server_files print("Files on server:") for index,file in enumerate(server_files): print("\t("+str(index)+")\t"+file) def filelist_received(filelist_data, packet): global server_files, menu_mode try: # Unpack the list and extend our # local list of available files filelist = umsgpack.unpackb(filelist_data) for file in filelist: if not file in server_files: server_files.append(file) # If the menu is already visible, # we'll update it with what was # just received if menu_mode == "main": print_menu() except: RNS.log("Invalid file list data received, closing link") packet.link.teardown() # This function is called when a link # has been established with the server def link_established(link): # We store a reference to the link # instance for later use global server_link server_link = link # Inform the user that the server is # connected RNS.log("Link established with server") RNS.log("Waiting for filelist...") # And set up a small job to check for # a potential timeout in receiving the # file list thread = threading.Thread(target=filelist_timeout_job, daemon=True) thread.start() # This job just sleeps for the specified # time, and then checks if the file list # was received. If not, the program will # exit. def filelist_timeout_job(): time.sleep(APP_TIMEOUT) global server_files if len(server_files) == 0: RNS.log("Timed out waiting for filelist, exiting") sys.exit(0) # When a link is closed, we'll inform the # user, and exit the program def link_closed(link): if link.teardown_reason == RNS.Link.TIMEOUT: RNS.log("The link timed out, exiting now") elif link.teardown_reason == RNS.Link.DESTINATION_CLOSED: RNS.log("The link was closed by the server, exiting now") else: RNS.log("Link closed, exiting now") time.sleep(1.5) sys.exit(0) # When RNS detects that the download has # started, we'll update our menu state # so the user can be shown a progress of # the download. def download_began(resource): global menu_mode, current_download, download_started, transfer_size, file_size current_download = resource if download_started == 0: download_started = time.time() transfer_size += resource.size file_size = resource.total_size menu_mode = "downloading" # When the download concludes, successfully # or not, we'll update our menu state and # inform the user about how it all went. def download_concluded(resource): global menu_mode, current_filename, download_started, download_finished, download_time download_finished = time.time() download_time = download_finished - download_started saved_filename = current_filename if resource.status == RNS.Resource.COMPLETE: counter = 0 while os.path.isfile(saved_filename): counter += 1 saved_filename = current_filename+"."+str(counter) try: file = open(saved_filename, "wb") file.write(resource.data.read()) file.close() menu_mode = "download_concluded" except: menu_mode = "save_error" else: menu_mode = "download_concluded" # A convenience function for printing a human- # readable file size def size_str(num, suffix='B'): units = ['','Ki','Mi','Gi','Ti','Pi','Ei','Zi'] last_unit = 'Yi' if suffix == 'b': num *= 8 units = ['','K','M','G','T','P','E','Z'] last_unit = 'Y' for unit in units: if abs(num) < 1024.0: return "%3.2f %s%s" % (num, unit, suffix) num /= 1024.0 return "%.2f %s%s" % (num, last_unit, suffix) # A convenience function for clearing the screen def clear_screen(): os.system('cls' if os.name=='nt' else 'clear') ########################################################## #### Program Startup ##################################### ########################################################## # This part of the program runs at startup, # and parses input of from the user, and then # starts up the desired program mode. if __name__ == "__main__": try: parser = argparse.ArgumentParser( description="Simple file transfer server and client utility" ) parser.add_argument( "-s", "--serve", action="store", metavar="dir", help="serve a directory of files to clients" ) parser.add_argument( "--config", action="store", default=None, help="path to alternative Reticulum config directory", type=str ) parser.add_argument( "destination", nargs="?", default=None, help="hexadecimal hash of the server destination", type=str ) args = parser.parse_args() if args.config: configarg = args.config else: configarg = None if args.serve: if os.path.isdir(args.serve): server(configarg, args.serve) else: RNS.log("The specified directory does not exist") else: if (args.destination == None): print("") parser.print_help() print("") else: client(args.destination, configarg) except KeyboardInterrupt: print("") sys.exit(0) ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/Filetransfer.py](https://github.com/markqvist/Reticulum/blob/master/Examples/Filetransfer.py). ## Custom Interfaces The *ExampleInterface* demonstrates creating custom interfaces for Reticulum. Any number of custom interfaces can be loaded and utilised by Reticulum, and will be fully on-par with natively included interfaces, including all supported [interface modes](interfaces.md#interfaces-modes) and [common configuration options](interfaces.md#interfaces-options). ```default # This example illustrates creating a custom interface # definition, that can be loaded and used by Reticulum at # runtime. Any number of custom interfaces can be created # and loaded. To use the interface place it in the folder # ~/.reticulum/interfaces, and add an interface entry to # your Reticulum configuration file similar to this: # [[Example Custom Interface]] # type = ExampleInterface # enabled = no # mode = gateway # port = /dev/ttyUSB0 # speed = 115200 # databits = 8 # parity = none # stopbits = 1 from time import sleep import sys import threading import time # This HDLC helper class is used by the interface # to delimit and packetize data over the physical # medium - in this case a serial connection. class HDLC(): # This example interface packetizes data using # simplified HDLC framing, similar to PPP FLAG = 0x7E ESC = 0x7D ESC_MASK = 0x20 @staticmethod def escape(data): data = data.replace(bytes([HDLC.ESC]), bytes([HDLC.ESC, HDLC.ESC^HDLC.ESC_MASK])) data = data.replace(bytes([HDLC.FLAG]), bytes([HDLC.ESC, HDLC.FLAG^HDLC.ESC_MASK])) return data # Let's define our custom interface class. It must # be a sub-class of the RNS "Interface" class. class ExampleInterface(Interface): # All interface classes must define a default # IFAC size, used in IFAC setup when the user # has not specified a custom IFAC size. This # option is specified in bytes. DEFAULT_IFAC_SIZE = 8 # The following properties are local to this # particular interface implementation. owner = None port = None speed = None databits = None parity = None stopbits = None serial = None # All Reticulum interfaces must have an __init__ # method that takes 2 positional arguments: # The owner RNS Transport instance, and a dict # of configuration values. def __init__(self, owner, configuration): # The following lines demonstrate handling # potential dependencies required for the # interface to function correctly. import importlib if importlib.util.find_spec('serial') != None: import serial else: RNS.log("Using this interface requires a serial communication module to be installed.", RNS.LOG_CRITICAL) RNS.log("You can install one with the command: python3 -m pip install pyserial", RNS.LOG_CRITICAL) RNS.panic() # We start out by initialising the super-class super().__init__() # To make sure the configuration data is in the # correct format, we parse it through the following # method on the generic Interface class. This step # is required to ensure compatibility on all the # platforms that Reticulum supports. ifconf = Interface.get_config_obj(configuration) # Read the interface name from the configuration # and set it on our interface instance. name = ifconf["name"] self.name = name # We read configuration parameters from the supplied # configuration data, and provide default values in # case any are missing. port = ifconf["port"] if "port" in ifconf else None speed = int(ifconf["speed"]) if "speed" in ifconf else 9600 databits = int(ifconf["databits"]) if "databits" in ifconf else 8 parity = ifconf["parity"] if "parity" in ifconf else "N" stopbits = int(ifconf["stopbits"]) if "stopbits" in ifconf else 1 # In case no port is specified, we abort setup by # raising an exception. if port == None: raise ValueError(f"No port specified for {self}") # All interfaces must supply a hardware MTU value # to the RNS Transport instance. This value should # be the maximum data packet payload size that the # underlying medium is capable of handling in all # cases without any segmentation. self.HW_MTU = 564 # We initially set the "online" property to false, # since the interface has not actually been fully # initialised and connected yet. self.online = False # In this case, we can also set the indicated bit- # rate of the interface to the serial port speed. self.bitrate = speed # Configure internal properties on the interface # according to the supplied configuration. self.pyserial = serial self.serial = None self.owner = owner self.port = port self.speed = speed self.databits = databits self.parity = serial.PARITY_NONE self.stopbits = stopbits self.timeout = 100 if parity.lower() == "e" or parity.lower() == "even": self.parity = serial.PARITY_EVEN if parity.lower() == "o" or parity.lower() == "odd": self.parity = serial.PARITY_ODD # Since all required parameters are now configured, # we will try opening the serial port. try: self.open_port() except Exception as e: RNS.log("Could not open serial port for interface "+str(self), RNS.LOG_ERROR) raise e # If opening the port succeeded, run any post-open # configuration required. if self.serial.is_open: self.configure_device() else: raise IOError("Could not open serial port") # Open the serial port with supplied configuration # parameters and store a reference to the open port. def open_port(self): RNS.log("Opening serial port "+self.port+"...", RNS.LOG_VERBOSE) self.serial = self.pyserial.Serial( port = self.port, baudrate = self.speed, bytesize = self.databits, parity = self.parity, stopbits = self.stopbits, xonxoff = False, rtscts = False, timeout = 0, inter_byte_timeout = None, write_timeout = None, dsrdtr = False, ) # The only thing required after opening the port # is to wait a small amount of time for the # hardware to initialise and then start a thread # that reads any incoming data from the device. def configure_device(self): sleep(0.5) thread = threading.Thread(target=self.read_loop) thread.daemon = True thread.start() self.online = True RNS.log("Serial port "+self.port+" is now open", RNS.LOG_VERBOSE) # This method will be called from our read-loop # whenever a full packet has been received over # the underlying medium. def process_incoming(self, data): # Update our received bytes counter self.rxb += len(data) # And send the data packet to the Transport # instance for processing. self.owner.inbound(data, self) # The running Reticulum Transport instance will # call this method on the interface whenever the # interface must transmit a packet. def process_outgoing(self,data): if self.online: # First, escape and packetize the data # according to HDLC framing. data = bytes([HDLC.FLAG])+HDLC.escape(data)+bytes([HDLC.FLAG]) # Then write the framed data to the port written = self.serial.write(data) # Update the transmitted bytes counter # and ensure that all data was written self.txb += len(data) if written != len(data): raise IOError("Serial interface only wrote "+str(written)+" bytes of "+str(len(data))) # This read loop runs in a thread and continously # receives bytes from the underlying serial port. # When a full packet has been received, it will # be sent to the process_incoming methed, which # will in turn pass it to the Transport instance. def read_loop(self): try: in_frame = False escape = False data_buffer = b"" last_read_ms = int(time.time()*1000) while self.serial.is_open: if self.serial.in_waiting: byte = ord(self.serial.read(1)) last_read_ms = int(time.time()*1000) if (in_frame and byte == HDLC.FLAG): in_frame = False self.process_incoming(data_buffer) elif (byte == HDLC.FLAG): in_frame = True data_buffer = b"" elif (in_frame and len(data_buffer) < self.HW_MTU): if (byte == HDLC.ESC): escape = True else: if (escape): if (byte == HDLC.FLAG ^ HDLC.ESC_MASK): byte = HDLC.FLAG if (byte == HDLC.ESC ^ HDLC.ESC_MASK): byte = HDLC.ESC escape = False data_buffer = data_buffer+bytes([byte]) else: time_since_last = int(time.time()*1000) - last_read_ms if len(data_buffer) > 0 and time_since_last > self.timeout: data_buffer = b"" in_frame = False escape = False sleep(0.08) except Exception as e: self.online = False RNS.log("A serial port error occurred, the contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("The interface "+str(self)+" experienced an unrecoverable error and is now offline.", RNS.LOG_ERROR) if RNS.Reticulum.panic_on_interface_error: RNS.panic() RNS.log("Reticulum will attempt to reconnect the interface periodically.", RNS.LOG_ERROR) self.online = False self.serial.close() self.reconnect_port() # This method handles serial port disconnects. def reconnect_port(self): while not self.online: try: time.sleep(5) RNS.log("Attempting to reconnect serial port "+str(self.port)+" for "+str(self)+"...", RNS.LOG_VERBOSE) self.open_port() if self.serial.is_open: self.configure_device() except Exception as e: RNS.log("Error while reconnecting port, the contained exception was: "+str(e), RNS.LOG_ERROR) RNS.log("Reconnected serial port for "+str(self)) # Signal to Reticulum that this interface should # not perform any ingress limiting. def should_ingress_limit(self): return False # We must provide a string representation of this # interface, that is used whenever the interface # is printed in logs or external programs. def __str__(self): return "ExampleInterface["+self.name+"]" # Finally, register the defined interface class as the # target class for Reticulum to use as an interface interface_class = ExampleInterface ``` This example can also be found at [https://github.com/markqvist/Reticulum/blob/master/Examples/ExampleInterface.py](https://github.com/markqvist/Reticulum/blob/master/Examples/ExampleInterface.py).