mirror of
https://github.com/Next-Flip/Momentum-Firmware.git
synced 2026-07-02 22:18:56 -07:00
Merge branch 'dev' of https://github.com/ClaraCrazy/Flipper-Xtreme into badusb-ble
This commit is contained in:
+51
-23
@@ -1,5 +1,5 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import List, Optional, Tuple
|
||||
from typing import List, Optional, Tuple, Callable
|
||||
from enum import Enum
|
||||
import os
|
||||
|
||||
@@ -72,6 +72,9 @@ class FlipperApplication:
|
||||
_appdir: Optional[object] = None
|
||||
_apppath: Optional[str] = None
|
||||
|
||||
def supports_hardware_target(self, target: str):
|
||||
return target in self.targets or "all" in self.targets
|
||||
|
||||
|
||||
class AppManager:
|
||||
def __init__(self):
|
||||
@@ -159,15 +162,26 @@ class AppBuildset:
|
||||
FlipperAppType.STARTUP,
|
||||
)
|
||||
|
||||
def __init__(self, appmgr: AppManager, appnames: List[str], hw_target: str):
|
||||
@staticmethod
|
||||
def print_writer(message):
|
||||
print(message)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
appmgr: AppManager,
|
||||
appnames: List[str],
|
||||
hw_target: str,
|
||||
message_writer: Callable = None,
|
||||
):
|
||||
self.appmgr = appmgr
|
||||
self.appnames = set(appnames)
|
||||
self.hw_target = hw_target
|
||||
self._orig_appnames = appnames
|
||||
self.hw_target = hw_target
|
||||
self._writer = message_writer if message_writer else self.print_writer
|
||||
self._process_deps()
|
||||
self._filter_by_target()
|
||||
self._check_conflicts()
|
||||
self._check_unsatisfied() # unneeded?
|
||||
self._check_target_match()
|
||||
self.apps = sorted(
|
||||
list(map(self.appmgr.get, self.appnames)),
|
||||
key=lambda app: app.appid,
|
||||
@@ -176,28 +190,32 @@ class AppBuildset:
|
||||
def _is_missing_dep(self, dep_name: str):
|
||||
return dep_name not in self.appnames
|
||||
|
||||
def _filter_by_target(self):
|
||||
for appname in self.appnames.copy():
|
||||
app = self.appmgr.get(appname)
|
||||
# if app.apptype not in self.BUILTIN_APP_TYPES:
|
||||
if not any(map(lambda t: t in app.targets, ["all", self.hw_target])):
|
||||
print(
|
||||
f"Removing {appname} due to target mismatch (building for {self.hw_target}, app supports {app.targets}"
|
||||
)
|
||||
self.appnames.remove(appname)
|
||||
def _check_if_app_target_supported(self, app_name: str):
|
||||
return self.appmgr.get(app_name).supports_hardware_target(self.hw_target)
|
||||
|
||||
def _get_app_depends(self, app_name: str) -> List[str]:
|
||||
# Skip app if its target is not supported by the target we are building for
|
||||
if not self._check_if_app_target_supported(app_name):
|
||||
self._writer(
|
||||
f"Skipping {app_name} due to target mismatch (building for {self.hw_target}, app supports {app_def.targets}"
|
||||
)
|
||||
return []
|
||||
|
||||
app_def = self.appmgr.get(app_name)
|
||||
return list(
|
||||
filter(
|
||||
self._check_if_app_target_supported,
|
||||
filter(self._is_missing_dep, app_def.provides + app_def.requires),
|
||||
)
|
||||
)
|
||||
|
||||
def _process_deps(self):
|
||||
while True:
|
||||
provided = []
|
||||
for app in self.appnames:
|
||||
# print(app)
|
||||
provided.extend(
|
||||
filter(
|
||||
self._is_missing_dep,
|
||||
self.appmgr.get(app).provides + self.appmgr.get(app).requires,
|
||||
)
|
||||
)
|
||||
# print("provides round", provided)
|
||||
for app_name in self.appnames:
|
||||
provided.extend(self._get_app_depends(app_name))
|
||||
|
||||
# print("provides round: ", provided)
|
||||
if len(provided) == 0:
|
||||
break
|
||||
self.appnames.update(provided)
|
||||
@@ -205,7 +223,6 @@ class AppBuildset:
|
||||
def _check_conflicts(self):
|
||||
conflicts = []
|
||||
for app in self.appnames:
|
||||
# print(app)
|
||||
if conflict_app_name := list(
|
||||
filter(
|
||||
lambda dep_name: dep_name in self.appnames,
|
||||
@@ -232,6 +249,17 @@ class AppBuildset:
|
||||
f"Unsatisfied dependencies for {', '.join(f'{missing_dep[0]}: {missing_dep[1]}' for missing_dep in unsatisfied)}"
|
||||
)
|
||||
|
||||
def _check_target_match(self):
|
||||
incompatible = []
|
||||
for app in self.appnames:
|
||||
if not self.appmgr.get(app).supports_hardware_target(self.hw_target):
|
||||
incompatible.append(app)
|
||||
|
||||
if len(incompatible):
|
||||
raise AppBuilderException(
|
||||
f"Apps incompatible with target {self.hw_target}: {', '.join(incompatible)}"
|
||||
)
|
||||
|
||||
def get_apps_cdefs(self):
|
||||
cdefs = set()
|
||||
for app in self.apps:
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
from SCons.Builder import Builder
|
||||
from SCons.Action import Action
|
||||
from SCons.Errors import StopError
|
||||
from SCons.Warnings import warn, WarningOnByDefault
|
||||
from ansi.color import fg
|
||||
|
||||
@@ -32,9 +33,13 @@ def LoadAppManifest(env, entry):
|
||||
|
||||
|
||||
def PrepareApplicationsBuild(env):
|
||||
appbuild = env["APPBUILD"] = env["APPMGR"].filter_apps(
|
||||
env["APPS"], env.subst("f${TARGET_HW}")
|
||||
)
|
||||
try:
|
||||
appbuild = env["APPBUILD"] = env["APPMGR"].filter_apps(
|
||||
env["APPS"], env.subst("f${TARGET_HW}")
|
||||
)
|
||||
except Exception as e:
|
||||
raise StopError(e)
|
||||
|
||||
env.Append(
|
||||
SDK_HEADERS=appbuild.get_sdk_headers(),
|
||||
)
|
||||
|
||||
@@ -0,0 +1,134 @@
|
||||
from SCons.Builder import Builder
|
||||
from SCons.Action import Action
|
||||
import json
|
||||
|
||||
|
||||
class HardwareTargetLoader:
|
||||
def __init__(self, env, target_scons_dir, target_id):
|
||||
self.env = env
|
||||
self.target_scons_dir = target_scons_dir
|
||||
self.target_dir = self._getTargetDir(target_id)
|
||||
# self.target_id = target_id
|
||||
self.layered_target_dirs = []
|
||||
|
||||
self.include_paths = []
|
||||
self.sdk_header_paths = []
|
||||
self.startup_script = None
|
||||
self.linker_script_flash = None
|
||||
self.linker_script_ram = None
|
||||
self.linker_script_app = None
|
||||
self.sdk_symbols = None
|
||||
self.linker_dependencies = []
|
||||
self.excluded_sources = []
|
||||
self.excluded_headers = []
|
||||
self.excluded_modules = []
|
||||
self._processTargetDefinitions(target_id)
|
||||
|
||||
def _getTargetDir(self, target_id):
|
||||
return self.target_scons_dir.Dir(f"f{target_id}")
|
||||
|
||||
def _loadDescription(self, target_id):
|
||||
target_json_file = self._getTargetDir(target_id).File("target.json")
|
||||
if not target_json_file.exists():
|
||||
raise Exception(f"Target file {target_json_file} does not exist")
|
||||
with open(target_json_file.get_abspath(), "r") as f:
|
||||
vals = json.load(f)
|
||||
return vals
|
||||
|
||||
def _processTargetDefinitions(self, target_id):
|
||||
self.layered_target_dirs.append(f"targets/f{target_id}")
|
||||
|
||||
config = self._loadDescription(target_id)
|
||||
|
||||
for path_list in ("include_paths", "sdk_header_paths"):
|
||||
getattr(self, path_list).extend(
|
||||
f"#/firmware/targets/f{target_id}/{p}"
|
||||
for p in config.get(path_list, [])
|
||||
)
|
||||
|
||||
self.excluded_sources.extend(config.get("excluded_sources", []))
|
||||
self.excluded_headers.extend(config.get("excluded_headers", []))
|
||||
self.excluded_modules.extend(config.get("excluded_modules", []))
|
||||
|
||||
file_attrs = (
|
||||
# (name, use_src_node)
|
||||
("startup_script", False),
|
||||
("linker_script_flash", True),
|
||||
("linker_script_ram", True),
|
||||
("linker_script_app", True),
|
||||
("sdk_symbols", True),
|
||||
)
|
||||
|
||||
for attr_name, use_src_node in file_attrs:
|
||||
if (val := config.get(attr_name)) and not getattr(self, attr_name):
|
||||
node = self.env.File(f"firmware/targets/f{target_id}/{val}")
|
||||
if use_src_node:
|
||||
node = node.srcnode()
|
||||
setattr(self, attr_name, node)
|
||||
|
||||
for attr_name in ("linker_dependencies",):
|
||||
if (val := config.get(attr_name)) and not getattr(self, attr_name):
|
||||
setattr(self, attr_name, val)
|
||||
|
||||
if inherited_target := config.get("inherit", None):
|
||||
self._processTargetDefinitions(inherited_target)
|
||||
|
||||
def gatherSources(self):
|
||||
sources = [self.startup_script]
|
||||
seen_filenames = set(self.excluded_sources)
|
||||
# print("Layers: ", self.layered_target_dirs)
|
||||
for target_dir in self.layered_target_dirs:
|
||||
accepted_sources = list(
|
||||
filter(
|
||||
lambda f: f.name not in seen_filenames,
|
||||
self.env.GlobRecursive("*.c", target_dir),
|
||||
)
|
||||
)
|
||||
seen_filenames.update(f.name for f in accepted_sources)
|
||||
sources.extend(accepted_sources)
|
||||
# print(f"Found {len(sources)} sources: {list(f.name for f in sources)}")
|
||||
return sources
|
||||
|
||||
def gatherSdkHeaders(self):
|
||||
sdk_headers = []
|
||||
seen_sdk_headers = set(self.excluded_headers)
|
||||
for sdk_path in self.sdk_header_paths:
|
||||
# dirty, but fast - exclude headers from overlayed targets by name
|
||||
# proper way would be to use relative paths, but names will do for now
|
||||
for header in self.env.GlobRecursive("*.h", sdk_path, "*_i.h"):
|
||||
if header.name not in seen_sdk_headers:
|
||||
seen_sdk_headers.add(header.name)
|
||||
sdk_headers.append(header)
|
||||
return sdk_headers
|
||||
|
||||
|
||||
def ConfigureForTarget(env, target_id):
|
||||
target_loader = HardwareTargetLoader(env, env.Dir("#/firmware/targets"), target_id)
|
||||
env.Replace(
|
||||
TARGET_CFG=target_loader,
|
||||
SDK_DEFINITION=target_loader.sdk_symbols,
|
||||
SKIP_MODULES=target_loader.excluded_modules,
|
||||
)
|
||||
|
||||
env.Append(
|
||||
CPPPATH=target_loader.include_paths,
|
||||
SDK_HEADERS=target_loader.gatherSdkHeaders(),
|
||||
)
|
||||
|
||||
|
||||
def ApplyLibFlags(env):
|
||||
flags_to_apply = env["FW_LIB_OPTS"].get(
|
||||
env.get("FW_LIB_NAME"),
|
||||
env["FW_LIB_OPTS"]["Default"],
|
||||
)
|
||||
# print("Flags for ", env.get("FW_LIB_NAME", "Default"), flags_to_apply)
|
||||
env.MergeFlags(flags_to_apply)
|
||||
|
||||
|
||||
def generate(env):
|
||||
env.AddMethod(ConfigureForTarget)
|
||||
env.AddMethod(ApplyLibFlags)
|
||||
|
||||
|
||||
def exists(env):
|
||||
return True
|
||||
@@ -22,6 +22,8 @@ def BuildModule(env, module):
|
||||
def BuildModules(env, modules):
|
||||
result = []
|
||||
for module in modules:
|
||||
if module in env.get("SKIP_MODULES", []):
|
||||
continue
|
||||
build_res = env.BuildModule(module)
|
||||
# print("module ", module, build_res)
|
||||
if build_res is None:
|
||||
|
||||
Executable
+459
@@ -0,0 +1,459 @@
|
||||
#!/usr/bin/env python3
|
||||
import typing
|
||||
import subprocess
|
||||
import logging
|
||||
import time
|
||||
import os
|
||||
import socket
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
from flipper.app import App
|
||||
|
||||
|
||||
class Programmer(ABC):
|
||||
@abstractmethod
|
||||
def flash(self, bin: str) -> bool:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def probe(self) -> bool:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def get_name(self) -> str:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def set_serial(self, serial: str):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass
|
||||
class OpenOCDInterface:
|
||||
name: str
|
||||
file: str
|
||||
serial_cmd: str
|
||||
additional_args: typing.Optional[list[str]] = None
|
||||
|
||||
|
||||
class OpenOCDProgrammer(Programmer):
|
||||
def __init__(self, interface: OpenOCDInterface):
|
||||
self.interface = interface
|
||||
self.logger = logging.getLogger("OpenOCD")
|
||||
self.serial: typing.Optional[str] = None
|
||||
|
||||
def _add_file(self, params: list[str], file: str):
|
||||
params.append("-f")
|
||||
params.append(file)
|
||||
|
||||
def _add_command(self, params: list[str], command: str):
|
||||
params.append("-c")
|
||||
params.append(command)
|
||||
|
||||
def _add_serial(self, params: list[str], serial: str):
|
||||
self._add_command(params, f"{self.interface.serial_cmd} {serial}")
|
||||
|
||||
def set_serial(self, serial: str):
|
||||
self.serial = serial
|
||||
|
||||
def flash(self, bin: str) -> bool:
|
||||
i = self.interface
|
||||
|
||||
if os.altsep:
|
||||
bin = bin.replace(os.sep, os.altsep)
|
||||
|
||||
openocd_launch_params = ["openocd"]
|
||||
self._add_file(openocd_launch_params, i.file)
|
||||
if self.serial:
|
||||
self._add_serial(openocd_launch_params, self.serial)
|
||||
if i.additional_args:
|
||||
for a in i.additional_args:
|
||||
self._add_command(openocd_launch_params, a)
|
||||
self._add_file(openocd_launch_params, "target/stm32wbx.cfg")
|
||||
self._add_command(openocd_launch_params, "init")
|
||||
self._add_command(openocd_launch_params, f"program {bin} reset exit 0x8000000")
|
||||
|
||||
# join the list of parameters into a string, but add quote if there are spaces
|
||||
openocd_launch_params_string = " ".join(
|
||||
[f'"{p}"' if " " in p else p for p in openocd_launch_params]
|
||||
)
|
||||
|
||||
self.logger.debug(f"Launching: {openocd_launch_params_string}")
|
||||
|
||||
process = subprocess.Popen(
|
||||
openocd_launch_params,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
while process.poll() is None:
|
||||
time.sleep(0.25)
|
||||
print(".", end="", flush=True)
|
||||
print()
|
||||
|
||||
success = process.returncode == 0
|
||||
|
||||
if not success:
|
||||
self.logger.error("OpenOCD failed to flash")
|
||||
if process.stdout:
|
||||
self.logger.error(process.stdout.read().decode("utf-8").strip())
|
||||
|
||||
return success
|
||||
|
||||
def probe(self) -> bool:
|
||||
i = self.interface
|
||||
|
||||
openocd_launch_params = ["openocd"]
|
||||
self._add_file(openocd_launch_params, i.file)
|
||||
if self.serial:
|
||||
self._add_serial(openocd_launch_params, self.serial)
|
||||
if i.additional_args:
|
||||
for a in i.additional_args:
|
||||
self._add_command(openocd_launch_params, a)
|
||||
self._add_file(openocd_launch_params, "target/stm32wbx.cfg")
|
||||
self._add_command(openocd_launch_params, "init")
|
||||
self._add_command(openocd_launch_params, "exit")
|
||||
|
||||
self.logger.debug(f"Launching: {' '.join(openocd_launch_params)}")
|
||||
|
||||
process = subprocess.Popen(
|
||||
openocd_launch_params,
|
||||
stderr=subprocess.STDOUT,
|
||||
stdout=subprocess.PIPE,
|
||||
)
|
||||
|
||||
# Wait for OpenOCD to end and get the return code
|
||||
process.wait()
|
||||
found = process.returncode == 0
|
||||
|
||||
if process.stdout:
|
||||
self.logger.debug(process.stdout.read().decode("utf-8").strip())
|
||||
|
||||
return found
|
||||
|
||||
def get_name(self) -> str:
|
||||
return self.interface.name
|
||||
|
||||
|
||||
def blackmagic_find_serial(serial: str):
|
||||
import serial.tools.list_ports as list_ports
|
||||
|
||||
if serial and os.name == "nt":
|
||||
if not serial.startswith("\\\\.\\"):
|
||||
serial = f"\\\\.\\{serial}"
|
||||
|
||||
ports = list(list_ports.grep("blackmagic"))
|
||||
if len(ports) == 0:
|
||||
return None
|
||||
elif len(ports) > 2:
|
||||
if serial:
|
||||
ports = list(
|
||||
filter(
|
||||
lambda p: p.serial_number == serial
|
||||
or p.name == serial
|
||||
or p.device == serial,
|
||||
ports,
|
||||
)
|
||||
)
|
||||
if len(ports) == 0:
|
||||
return None
|
||||
|
||||
if len(ports) > 2:
|
||||
raise Exception("More than one Blackmagic probe found")
|
||||
|
||||
# If you're getting any issues with auto lookup, uncomment this
|
||||
# print("\n".join([f"{p.device} {vars(p)}" for p in ports]))
|
||||
port = sorted(ports, key=lambda p: f"{p.location}_{p.name}")[0]
|
||||
|
||||
if serial:
|
||||
if (
|
||||
serial != port.serial_number
|
||||
and serial != port.name
|
||||
and serial != port.device
|
||||
):
|
||||
return None
|
||||
|
||||
if os.name == "nt":
|
||||
port.device = f"\\\\.\\{port.device}"
|
||||
return port.device
|
||||
|
||||
|
||||
def _resolve_hostname(hostname):
|
||||
try:
|
||||
return socket.gethostbyname(hostname)
|
||||
except socket.gaierror:
|
||||
return None
|
||||
|
||||
|
||||
def blackmagic_find_networked(serial: str):
|
||||
if not serial:
|
||||
serial = "blackmagic.local"
|
||||
|
||||
# remove the tcp: prefix if it's there
|
||||
if serial.startswith("tcp:"):
|
||||
serial = serial[4:]
|
||||
|
||||
# remove the port if it's there
|
||||
if ":" in serial:
|
||||
serial = serial.split(":")[0]
|
||||
|
||||
if not (probe := _resolve_hostname(serial)):
|
||||
return None
|
||||
|
||||
return f"tcp:{probe}:2345"
|
||||
|
||||
|
||||
class BlackmagicProgrammer(Programmer):
|
||||
def __init__(
|
||||
self,
|
||||
port_resolver, # typing.Callable[typing.Union[str, None], typing.Optional[str]]
|
||||
name: str,
|
||||
):
|
||||
self.port_resolver = port_resolver
|
||||
self.name = name
|
||||
self.logger = logging.getLogger("BlackmagicUSB")
|
||||
self.port: typing.Optional[str] = None
|
||||
|
||||
def _add_command(self, params: list[str], command: str):
|
||||
params.append("-ex")
|
||||
params.append(command)
|
||||
|
||||
def _valid_ip(self, address):
|
||||
try:
|
||||
socket.inet_aton(address)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def set_serial(self, serial: str):
|
||||
if self._valid_ip(serial):
|
||||
self.port = f"{serial}:2345"
|
||||
elif ip := _resolve_hostname(serial):
|
||||
self.port = f"{ip}:2345"
|
||||
else:
|
||||
self.port = serial
|
||||
|
||||
def flash(self, bin: str) -> bool:
|
||||
if not self.port:
|
||||
if not self.probe():
|
||||
return False
|
||||
|
||||
# We can convert .bin to .elf with objcopy:
|
||||
# arm-none-eabi-objcopy -I binary -O elf32-littlearm --change-section-address=.data=0x8000000 -B arm -S app.bin app.elf
|
||||
# But I choose to use the .elf file directly because we are flashing our own firmware and it always has an elf predecessor.
|
||||
elf = bin.replace(".bin", ".elf")
|
||||
if not os.path.exists(elf):
|
||||
self.logger.error(
|
||||
f"Sorry, but Blackmagic can't flash .bin file, and {elf} doesn't exist"
|
||||
)
|
||||
return False
|
||||
|
||||
# arm-none-eabi-gdb build/f7-firmware-D/firmware.bin
|
||||
# -ex 'set pagination off'
|
||||
# -ex 'target extended-remote /dev/cu.usbmodem21201'
|
||||
# -ex 'set confirm off'
|
||||
# -ex 'monitor swdp_scan'
|
||||
# -ex 'attach 1'
|
||||
# -ex 'set mem inaccessible-by-default off'
|
||||
# -ex 'load'
|
||||
# -ex 'compare-sections'
|
||||
# -ex 'quit'
|
||||
|
||||
gdb_launch_params = ["arm-none-eabi-gdb", elf]
|
||||
self._add_command(gdb_launch_params, f"target extended-remote {self.port}")
|
||||
self._add_command(gdb_launch_params, "set pagination off")
|
||||
self._add_command(gdb_launch_params, "set confirm off")
|
||||
self._add_command(gdb_launch_params, "monitor swdp_scan")
|
||||
self._add_command(gdb_launch_params, "attach 1")
|
||||
self._add_command(gdb_launch_params, "set mem inaccessible-by-default off")
|
||||
self._add_command(gdb_launch_params, "load")
|
||||
self._add_command(gdb_launch_params, "compare-sections")
|
||||
self._add_command(gdb_launch_params, "quit")
|
||||
|
||||
self.logger.debug(f"Launching: {' '.join(gdb_launch_params)}")
|
||||
|
||||
process = subprocess.Popen(
|
||||
gdb_launch_params,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
)
|
||||
|
||||
while process.poll() is None:
|
||||
time.sleep(0.5)
|
||||
print(".", end="", flush=True)
|
||||
print()
|
||||
|
||||
if not process.stdout:
|
||||
return False
|
||||
|
||||
output = process.stdout.read().decode("utf-8").strip()
|
||||
flashed = "Loading section .text," in output
|
||||
|
||||
# Check flash verification
|
||||
if "MIS-MATCHED!" in output:
|
||||
flashed = False
|
||||
|
||||
if "target image does not match the loaded file" in output:
|
||||
flashed = False
|
||||
|
||||
if not flashed:
|
||||
self.logger.error("Blackmagic failed to flash")
|
||||
self.logger.error(output)
|
||||
|
||||
return flashed
|
||||
|
||||
def probe(self) -> bool:
|
||||
if not (port := self.port_resolver(self.port)):
|
||||
return False
|
||||
|
||||
self.port = port
|
||||
return True
|
||||
|
||||
def get_name(self) -> str:
|
||||
return self.name
|
||||
|
||||
|
||||
programmers: list[Programmer] = [
|
||||
OpenOCDProgrammer(
|
||||
OpenOCDInterface(
|
||||
"cmsis-dap",
|
||||
"interface/cmsis-dap.cfg",
|
||||
"cmsis_dap_serial",
|
||||
["transport select swd"],
|
||||
),
|
||||
),
|
||||
OpenOCDProgrammer(
|
||||
OpenOCDInterface(
|
||||
"stlink", "interface/stlink.cfg", "hla_serial", ["transport select hla_swd"]
|
||||
),
|
||||
),
|
||||
BlackmagicProgrammer(blackmagic_find_serial, "blackmagic_usb"),
|
||||
]
|
||||
|
||||
network_programmers = [
|
||||
BlackmagicProgrammer(blackmagic_find_networked, "blackmagic_wifi")
|
||||
]
|
||||
|
||||
|
||||
class Main(App):
|
||||
def init(self):
|
||||
self.subparsers = self.parser.add_subparsers(help="sub-command help")
|
||||
self.parser_flash = self.subparsers.add_parser("flash", help="Flash a binary")
|
||||
self.parser_flash.add_argument(
|
||||
"bin",
|
||||
type=str,
|
||||
help="Binary to flash",
|
||||
)
|
||||
interfaces = [i.get_name() for i in programmers]
|
||||
interfaces.extend([i.get_name() for i in network_programmers])
|
||||
self.parser_flash.add_argument(
|
||||
"--interface",
|
||||
choices=interfaces,
|
||||
type=str,
|
||||
help="Interface to use",
|
||||
)
|
||||
self.parser_flash.add_argument(
|
||||
"--serial",
|
||||
type=str,
|
||||
help="Serial number or port of the programmer",
|
||||
)
|
||||
self.parser_flash.set_defaults(func=self.flash)
|
||||
|
||||
def _search_interface(self, serial: typing.Optional[str]) -> list[Programmer]:
|
||||
found_programmers = []
|
||||
|
||||
for p in programmers:
|
||||
name = p.get_name()
|
||||
if serial:
|
||||
p.set_serial(serial)
|
||||
self.logger.debug(f"Trying {name} with {serial}")
|
||||
else:
|
||||
self.logger.debug(f"Trying {name}")
|
||||
|
||||
if p.probe():
|
||||
self.logger.debug(f"Found {name}")
|
||||
found_programmers += [p]
|
||||
else:
|
||||
self.logger.debug(f"Failed to probe {name}")
|
||||
|
||||
return found_programmers
|
||||
|
||||
def _search_network_interface(
|
||||
self, serial: typing.Optional[str]
|
||||
) -> list[Programmer]:
|
||||
found_programmers = []
|
||||
|
||||
for p in network_programmers:
|
||||
name = p.get_name()
|
||||
|
||||
if serial:
|
||||
p.set_serial(serial)
|
||||
self.logger.debug(f"Trying {name} with {serial}")
|
||||
else:
|
||||
self.logger.debug(f"Trying {name}")
|
||||
|
||||
if p.probe():
|
||||
self.logger.debug(f"Found {name}")
|
||||
found_programmers += [p]
|
||||
else:
|
||||
self.logger.debug(f"Failed to probe {name}")
|
||||
|
||||
return found_programmers
|
||||
|
||||
def flash(self):
|
||||
start_time = time.time()
|
||||
bin_path = os.path.abspath(self.args.bin)
|
||||
|
||||
if not os.path.exists(bin_path):
|
||||
self.logger.error(f"Binary file not found: {bin_path}")
|
||||
return 1
|
||||
|
||||
if self.args.interface:
|
||||
i_name = self.args.interface
|
||||
interfaces = [p for p in programmers if p.get_name() == i_name]
|
||||
if len(interfaces) == 0:
|
||||
interfaces = [p for p in network_programmers if p.get_name() == i_name]
|
||||
else:
|
||||
self.logger.info(f"Probing for interfaces...")
|
||||
interfaces = self._search_interface(self.args.serial)
|
||||
|
||||
if len(interfaces) == 0:
|
||||
# Probe network blackmagic
|
||||
self.logger.info(f"Probing for network interfaces...")
|
||||
interfaces = self._search_network_interface(self.args.serial)
|
||||
|
||||
if len(interfaces) == 0:
|
||||
self.logger.error("No interface found")
|
||||
return 1
|
||||
|
||||
if len(interfaces) > 1:
|
||||
self.logger.error("Multiple interfaces found: ")
|
||||
self.logger.error(
|
||||
f"Please specify '--interface={[i.get_name() for i in interfaces]}'"
|
||||
)
|
||||
return 1
|
||||
|
||||
interface = interfaces[0]
|
||||
|
||||
if self.args.serial:
|
||||
interface.set_serial(self.args.serial)
|
||||
self.logger.info(
|
||||
f"Flashing {bin_path} via {interface.get_name()} with {self.args.serial}"
|
||||
)
|
||||
else:
|
||||
self.logger.info(f"Flashing {bin_path} via {interface.get_name()}")
|
||||
|
||||
if not interface.flash(bin_path):
|
||||
self.logger.error(f"Failed to flash via {interface.get_name()}")
|
||||
return 1
|
||||
|
||||
flash_time = time.time() - start_time
|
||||
bin_size = os.path.getsize(bin_path)
|
||||
self.logger.info(f"Flashed successfully in {flash_time:.2f}s")
|
||||
self.logger.info(f"Effective speed: {bin_size / flash_time / 1024:.2f} KiB/s")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
Main()()
|
||||
Reference in New Issue
Block a user