Source code for octoprobe.octoprobe

from __future__ import annotations

import enum
import pathlib
import time
import typing

from .lib_tentacle import TentacleBase
from .usb_tentacle.usb_constants import UsbPlug, UsbPlugs
from .usb_tentacle.usb_tentacle import UsbTentacles
from .util_baseclasses import OctoprobeAppExitException
from .util_pyudev import UdevPoller

FULL_POWERCYCLE_ALL_TENTACLES = False


[docs] class CtxTestRun: """ The context of a test run """ def __init__(self, connected_tentacles: typing.Sequence[TentacleBase]) -> None: assert isinstance(connected_tentacles, list) self.connected_tentacles = connected_tentacles
[docs] @staticmethod def session_powercycle_tentacles() -> UsbTentacles: """ Powers all RP2 infra. Finds all tentacle by finding rp2_unique_id of the RP2 infra. """ # We have to reset the power for all rp2-infra to become visible usb_tentacles = UsbTentacles.query(require_serial=True) usb_tentacles = usb_tentacles.select(serials=None) if FULL_POWERCYCLE_ALL_TENTACLES: # Powercycling ALL hubs usb_tentacles.set_plugs(plugs=UsbPlugs.default_off()) time.sleep(0.2) # success: 0.0 usb_tentacles.set_plugs(plugs=UsbPlugs({UsbPlug.INFRA: True})) # Without hub inbetween: failed: 0.4, success: 0.5 # With hub inbetween: failed: 0.7, success: 0.8 # RSHTECH 7 port hub produced errors using 1.2s time.sleep(2.0) # else: # usb_tentacles.set_plugs( # plugs=UsbPlugs( # { # UsbPlug.INFRA: True, # UsbPlug.INFRABOOT: True, # UsbPlug.DUT: False, # UsbPlug.ERROR: False, # } # ) # ) # time.sleep(2.0) # return util_usb_serial.QueryResultTentacle.query_fast() return usb_tentacles
def session_teardown(self) -> None: pass def function_setup_infa_and_dut( self, udev_poller: UdevPoller, tentacle: TentacleBase, directory_logs: pathlib.Path, ) -> None: self.function_prepare_dut(tentacle=tentacle) self.function_setup_infra(udev_poller=udev_poller, tentacle=tentacle) self.function_setup_dut_flash( udev_poller=udev_poller, tentacle=tentacle, directory_logs=directory_logs, )
[docs] def function_setup_infra( self, udev_poller: UdevPoller, tentacle: TentacleBase, ) -> None: """ Power off all other known usb power plugs. For each active tentacle: * Power on infa * Flash firmware * Get serial numbers and assert if it does not match the config * Return tty """ # Instantiate poller BEFORE switching on power to avoid a race condition tentacle.infra.setup_infra(udev_poller) tentacle.infra.mcu_infra.active_led(on=False) if not FULL_POWERCYCLE_ALL_TENTACLES: # As the tentacle infra has NOT been powercycled, we # have to reset the relays tentacle.infra.mcu_infra.relays( relays_close=[], relays_open=[1, 2, 3, 4, 5, 6, 7], )
def function_prepare_dut(self, tentacle: TentacleBase) -> None: tentacle.power.dut = False tentacle.infra.mp_remote_close() if tentacle.is_mcu: tentacle.dut.mp_remote_close() def function_setup_dut_flash( self, udev_poller: UdevPoller, tentacle: TentacleBase, directory_logs: pathlib.Path, ) -> None: if not tentacle.is_mcu: return # Flash the MCU tentacle.flash_dut( udev_poller=udev_poller, directory_logs=directory_logs, firmware_spec=tentacle.tentacle_state.firmware_spec, ) def function_teardown( self, active_tentacles: typing.Sequence[TentacleBase] ) -> None: if False: for tentacle in active_tentacles: tentacle.power_dut_off_and_wait() for tentacle in active_tentacles: # Before we power off the DUT: free mp_remote if not tentacle.is_mcu: continue tentacle.dut.mp_remote_close() # Before we can switch the relays: Connect to infra, power off and free mp_remote tentacle.infra.connect_mpremote_if_needed() try: tentacle.infra.mcu_infra.relays( relays_open=tentacle.infra.LIST_ALL_RELAYS ) except Exception as e: raise OctoprobeAppExitException( f"{tentacle.infra.label}: Failed to control relays: {e!r}" ) from e tentacle.infra.power.dut = False tentacle.infra.mp_remote_close() def setup_relays( self, tentacles: typing.Sequence[TentacleBase], futs: tuple[enum.StrEnum] ) -> None: assert isinstance(tentacles, list | tuple) assert isinstance(futs, list | tuple) assert len(futs) == 1 fut = futs[0] for tentacle in tentacles: tentacle.set_relays_by_FUT(fut=fut)