Source code for fortrace.utility.usb_spoofing.pi_usb_spoofing

import contextlib
import os
import shlex
import threading
import time

import paramiko
from scp import SCPClient

from fortrace.utility.logger_helper import setup_logger
from fortrace.utility.usb_spoofing.usb_device import USBDevice

# Module-level logger used by USBSpoofer to report the output of remote commands.
logger = setup_logger(__name__)


class USBGadget:
    """A configured USB gadget together with the USB device it presents.

    Args:
        gadget_name: name under which the gadget is addressed.
        keep_backing_store: when true, the gadget is removed with the
            ``--keep-backing-store`` option.
        **kwargs: forwarded unchanged to :class:`USBDevice`.

    Attributes:
        active: lock that is held while this gadget is activated.
    """

    def __init__(self, gadget_name: str, keep_backing_store: bool = False, **kwargs):
        self.gadget_name = gadget_name
        self.keep_backing_store = keep_backing_store
        self.device = USBDevice(**kwargs)
        # One lock per gadget: a class-level lock would be shared by every
        # instance, making ``active.locked()`` report all gadgets as active
        # as soon as a single one is activated.
        self.active = threading.Lock()
class USBSpoofer:
    """Control the ``usb-spoofer`` script on a remote USB host over SSH.

    Constructing a spoofer connects to the host and creates one gadget per
    entry in the ``devices`` list of the configuration. The object can be used
    as a context manager; leaving the ``with`` block calls :meth:`close`.

    Args:
        usb_host: host configuration. The keys ``hostname``, ``username`` and
            ``devices`` are required; ``port`` (default 22) and ``password``
            are optional. Each entry of ``devices`` is passed as keyword
            arguments to :class:`USBGadget`.
    """

    def __init__(self, usb_host: dict):
        self._client = paramiko.SSHClient()
        self._devices: list[USBGadget] = []
        self._config = usb_host
        # assumes usb-spoofer script is at /home/$USER/usb-spoofer
        self._script_root = f"sudo /home/{self._config['username']}/usb-spoofer"
        try:
            self._connect_to_usb_host()
            self._create_devices()
        except BaseException:
            # __exit__ never runs when __init__ fails, so release the SSH
            # client (and any gadget created so far) here. Errors during this
            # best-effort cleanup must not hide the original failure.
            with contextlib.suppress(Exception):
                self.close()
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def close(self):
        """Cleanup all devices and close connection."""
        try:
            self._remove_devices()
        finally:
            # Close the connection even when removing a gadget failed.
            self._client.close()

    def _connect_to_usb_host(self):
        """Open the SSH connection described by the host configuration."""
        self._client.load_system_host_keys()
        # assume SSH key authentication; the password is None unless configured
        self._client.connect(
            self._config["hostname"],
            port=self._config.get("port", 22),
            username=self._config["username"],
            password=self._config.get("password"),
        )

    def _command(self, action: str, gadget_name: str) -> str:
        """Build the remote command line for *action* on *gadget_name*."""
        return f"{self._script_root} {action} {shlex.quote(str(gadget_name))}"

    def _run(self, command: str, verbose: bool = False) -> None:
        """Execute *command* on the USB host and log what it printed.

        Standard output is logged at info level when *verbose* is true and at
        debug level otherwise; anything on standard error is logged as error.
        """
        _, stdout, stderr = self._client.exec_command(command)
        log_output = logger.info if verbose else logger.debug
        log_output(stdout.readlines())
        if err := stderr.readlines():
            logger.error(err)

    def _find_device(self, gadget_name: str) -> "USBGadget":
        """Return the configured gadget called *gadget_name*.

        Raises:
            ValueError: if no gadget with that name is configured.
        """
        device = self.get_device(gadget_name)
        if device is None:
            raise ValueError(f"No gadget named {gadget_name!r} is configured")
        return device

    def _create_devices(self):
        """Create a gadget on the USB host for every configured device."""
        for device_config in self._config["devices"]:
            gadget = USBGadget(**device_config)
            self._devices.append(gadget)
            usb = gadget.device
            options = {
                "vendor-id": usb.vendor_id,
                "product-id": usb.product_id,
                "manufacturer": usb.manufacturer,
                "product": usb.product_name,
                "serial-number": usb.serial_number,
                "size": usb.size,
                "file-system": usb.file_system,
            }
            if usb.label is not None:
                options["label"] = usb.label
            # shlex.quote keeps values containing quotes or spaces intact
            # when the remote shell parses the command line.
            arguments = "".join(
                f" --{name}={shlex.quote(str(value))}"
                for name, value in options.items()
            )
            self._run(self._command("create", gadget.gadget_name) + arguments, verbose=True)

    def _remove_devices(self):
        """Remove every gadget that was created on the USB host."""
        for gadget in self._devices:
            command = self._command("remove", gadget.gadget_name)
            if gadget.keep_backing_store:
                command += " --keep-backing-store"
            self._run(command)

    def activate_device(self, gadget_name: str) -> bool:
        """Activate a pre-configured gadget.

        Only one gadget can be active at a time. If another gadget is already
        active, an error is logged and nothing is activated.

        Args:
            gadget_name: name of the gadget to be activated

        Returns:
            True if the gadget was activated, False if another gadget is
            currently active.

        Raises:
            ValueError: if no gadget with that name is configured.
        """
        device = self._find_device(gadget_name)
        active_device = next(
            (dev for dev in self._devices if dev.active.locked()), None
        )
        if active_device is not None:
            logger.error(
                f"Device {gadget_name} cannot be activated since {active_device.gadget_name} is currently activated."
            )
            return False

        device.active.acquire()
        try:
            self._run(self._command("activate", device.gadget_name))
        except BaseException:
            # The command was never delivered, so do not leave the gadget
            # marked as active.
            device.active.release()
            raise
        return True

    def deactivate_device(self, gadget_name: str):
        """Deactivate a previously activated gadget.

        Args:
            gadget_name: name of the gadget to be deactivated

        Raises:
            ValueError: if no gadget with that name is configured.
        """
        device = self._find_device(gadget_name)
        self._run(self._command("deactivate", device.gadget_name))
        # Releasing an unlocked lock raises RuntimeError; deactivating a
        # gadget that is not marked active should simply be a no-op here.
        if device.active.locked():
            device.active.release()

    @contextlib.contextmanager
    def activate(self, gadget_name: str, settle_time: float = 5):
        """Implements the contextmanager for device activation.

        Args:
            gadget_name: name of the gadget to activate.
            settle_time: seconds to wait after activation before the gadget
                is handed to the caller.

        Yields:
            The activated USBGadget.

        Raises:
            ValueError: if no gadget with that name is configured.
            RuntimeError: if the gadget could not be activated because
                another gadget is currently active.

        Example:
            with usb_spoofing.activate(gadget_name) as gadget:
                domain.attach_usb_device(gadget.device.vendor_id, gadget.device.product_id)
        """
        if not self.activate_device(gadget_name):
            raise RuntimeError(
                f"Gadget {gadget_name} could not be activated: another gadget is active"
            )
        try:
            # NOTE(review): presumably gives the gadget time to become
            # available before it is used; confirm the required delay.
            time.sleep(settle_time)
            yield self.get_device(gadget_name)
        finally:
            self.deactivate_device(gadget_name)

    @contextlib.contextmanager
    def serve_backing_store(self, device: "USBGadget"):
        """Implements the contextmanager for mounting the backing store.

        Args:
            device: USBGadget to mount (must not be activated)

        Yields:
            The mount point of the backing store on the USB host.
        """
        self._run(self._command("mount", device.gadget_name))
        try:
            yield f"/mnt/{device.gadget_name}"
        finally:
            self._run(self._command("umount", device.gadget_name))

    def transfer_to_device(self, gadget_name: str, source: os.PathLike):
        """Transfer a file/folder to a (deactivated) device.

        Args:
            gadget_name: name of the configured gadget
            source: path to file/folder to be copied

        Raises:
            ValueError: if no gadget with that name is configured.
            RuntimeError: if the gadget is currently active.
        """
        device = self._find_device(gadget_name)
        if device.active.locked():
            raise RuntimeError("Cannot upload file(s) to active device")
        with self.serve_backing_store(device) as mount_point:
            with SCPClient(self._client.get_transport()) as scp:
                scp.put(str(source), remote_path=mount_point, recursive=True)

    def transfer_from_device(self, gadget_name: str, destination: os.PathLike = ""):
        """Transfer the backing store to the host computer.

        Args:
            gadget_name: name of the configured gadget
            destination: path on the host computer

        Raises:
            ValueError: if no gadget with that name is configured.
            RuntimeError: if the gadget is currently active.
        """
        device = self._find_device(gadget_name)
        if device.active.locked():
            raise RuntimeError("Cannot download file(s) from active device")
        with self.serve_backing_store(device) as mount_point:
            with SCPClient(self._client.get_transport()) as scp:
                scp.get(mount_point, recursive=True, local_path=str(destination))

    def get_device(self, gadget_name: str) -> "USBGadget | None":
        """Get a spoofed USB device, based on its name.

        Args:
            gadget_name: name of the gadget

        Returns:
            USBGadget or None
        """
        return next(
            (dev for dev in self._devices if dev.gadget_name == gadget_name), None
        )