#!/usr/bin/env python3 from __future__ import annotations import argparse import ipaddress import json import re import subprocess import sys import time from pathlib import Path import requests import serial ############ # Constant # ############ __version__ = "v0.1" PREFIX = "| " SERIAL = "/dev/ttyUSB1" ########### # Helpers # ########### def ip_to_network(ip: str) -> tuple[str, str]: dots = ip.split(".") assert len(dots) == 4 return (".".join(dots[:3] + ["1"]), ".".join(dots[:3] + ["0"])) def get_iface_addr(iface: str) -> str: data = subprocess.check_output(["ip", "-json", "addr", "show", "dev", iface]) ips = [ai["local"] for ai in json.loads(data)[0]["addr_info"] if ai["family"] == "inet"] assert len(ips) == 1 return ips[0] ############## # CLI parser # ############## def setup_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(prog="kv260-flasher", description="kv260 flasher") parser.add_argument( "--version", action="version", version=f"%(prog)s, {__version__}" ) parser.add_argument( "--image", "-i", choices=["A", "B", "WIC"], required=True, help="Image partition", ) parser.add_argument( "--fwuen_button", "-fb", choices=["1", "2"], default="2", help="LAA button to trigger the KV260 FWUEN button", ) parser.add_argument("filename", help="File to flash") return parser def handle_flasher(parser, options) -> int: match options.image: case "A" | "B": if options.filename.suffix != ".bin": parser.error( f"For Image{options.image}, the file must have a .bin suffix" ) case "WIC": if options.filename.suffix != ".wic": parser.error( f"For Image{options.image}, the file must have a .wic suffix" ) print(f"[1/ 8] Booting DUT in recovery with FWUEN button {options.fwuen_button}") pattern = re.compile(r"to http://(\d+\.\d+\.\d+\.\d+)") retriesLeft = 3 while retriesLeft > 0: subprocess.check_call(["laacli", "power", "12v", "off"]) subprocess.check_call(["laacli", "button", options.fwuen_button, "on"]) time.sleep(10) subprocess.check_call(["laacli", "power", "12v", "on"]) try: with serial.Serial(SERIAL, baudrate=115200) as ser: while True: line = ser.readline().decode("utf-8", errors="replace").replace("\r", "") if ip := pattern.findall(line): assert len(ip) == 1 ip = ip[0] break sys.stdout.write(PREFIX + line) sys.stdout.write(PREFIX + line) url = f"http://{ip}" print(f" * DUT IP {ip}") retriesLeft = 0 except UnicodeDecodeError as ex: retriesLeft = retriesLeft - 1 print(f"{ex} - retrying again retries left = {retriesLeft}") subprocess.check_call(["laacli", "power", "12v", "off"]) subprocess.check_call(["laacli", "button", options.fwuen_button, "off"]) if retriesLeft == 0: print("No more retries left") return 1 time.sleep(5) except serial.serialutil.SerialException as ex: subprocess.check_call(["laacli", "power", "12v", "off"]) subprocess.check_call(["laacli", "button", options.fwuen_button, "off"]) print(ex) return 1 print("[2 / 8] Setting up network") (gateway, network) = ip_to_network(ip) print(f" * gateway: {gateway}") print(f" * network: {network}") public = get_iface_addr("public") private = get_iface_addr("private") if not public: print("ERROR: unable to get LAA public ip") return 1 print(f" * LAA public ip: {public}") if ipaddress.ip_address(public) in ipaddress.ip_network(f"{network}/24"): print("ERROR: DUT in the LAA public network range") return 1 try: print(f" * LAA private ip: {private}") if ipaddress.ip_address(private) not in ipaddress.ip_network(f"{network}/24"): subprocess.run( ["ip", "address", "add", f"{gateway}/24", "dev", "private"], check=True ) subprocess.run(["ip", "route", "add", f"{network}/24", "via", gateway]) print(" => Done\n") print(f"[3 / 8] Check DUT is reachable at {url}") try: ret = requests.get(url, timeout=10) except Exception as exc: print(f"ERROR: unable to connect: {exc}") return 1 if ret.status_code != 200: print("ERROR: unable to connect to image selector") return 1 version = re.search(r'Version:\s*([0-9.]+)', ret.text).group(1) print(f"Recovery APP version: {version}") print(" => OK\n") print("[4 / 8] Erase phase") if version != "1.0": # Erase flash print(f"Erasing flash for Image{options.image}...") out = subprocess.check_output( [ "curl", "--silent", f"{url}/flash_erase_img{options.image}" ], stderr=subprocess.DEVNULL, ) print("Done.") print("Waiting for flash erase to complete ...") while True: progress = requests.get(f"{url}/flash_erase_status", timeout=2).json()["Progress"] print(f"Erase progress: {progress}") if progress in ["100", 100]: break time.sleep(1) else: print("Image erase is part of image uploading") print(" => Done\n") print(f"[5 / 8] Upload file to image{options.image}") out = subprocess.check_output( [ "curl", "--silent", "-X", "POST", f"{url}/download_img{options.image}", f"-F/download_img{options.image}=@{options.filename}", ], stderr=subprocess.DEVNULL, ) if out != b'{"Status":"Success"}': print("ERROR: unable to flash new image: {out}") return 1 print(" => Done\n") print("[6 / 8] Configure boot image") imgAboot = "true" if options.image == "A" else "false" imgBboot = "true" if options.image == "B" else "false" out = subprocess.check_output( [ "curl", "-X", "POST", f"{url}/cfg_boot_img", "-H", "Content-Type: application/json", "-d", f'{{"ImgABootable":{imgAboot},"ImgBBootable":{imgBboot},"ReqBootImg":"Image{options.image}"}}', ], stderr=subprocess.DEVNULL, ) if out != b'{"Status":"Success"}': print("ERROR: unable to configure image to boot") return 1 print("[7 / 8] Check boot configuration") ret = requests.get(f"{url}/boot_img_status", timeout=10) data = ret.json() print(data) print(f" * ImageA : {'bootable' if data['ImgABootable'] else 'NOT bootable'}") print(f" * ImageB : {'bootable' if data['ImgBBootable'] else 'NOT bootable'}") print(f" * Last Booted : {data['LastBootImg']}") print(f" * Requested Boot : {data['ReqBootImg']}") finally: print("\n[8 / 8] Removing network setup and leave recovery") subprocess.check_call(["laacli", "power", "12v", "off"]) subprocess.check_call(["laacli", "button", options.fwuen_button, "off"]) if ipaddress.ip_address(private) not in ipaddress.ip_network(f"{network}/24"): subprocess.run(["ip", "route", "del", f"{network}/24", "dev", "private"]) subprocess.run(["ip", "address", "del", f"{gateway}/24", "dev", "private"]) return 0 ############## # Entrypoint # ############## def main() -> int: parser = setup_parser() options = parser.parse_args() options.filename = Path(options.filename).resolve() if not options.filename.exists(): parser.error(f"'{options.filename}' does not exists") return handle_flasher(parser, options) if __name__ == "__main__": sys.exit(main())