import contextlib
import os
import threading
import time
import paramiko
from scp import SCPClient
from fortrace.utility.logger_helper import setup_logger
from fortrace.utility.usb_spoofing.usb_device import USBDevice
logger = setup_logger(__name__)
[docs]
class USBGadget:
active = threading.Lock()
def __init__(self, gadget_name: str, keep_backing_store: bool = False, **kwargs):
self.gadget_name = gadget_name
self.keep_backing_store = keep_backing_store
self.device = USBDevice(**kwargs)
[docs]
class USBSpoofer:
def __init__(self, usb_host: dict):
self._client = paramiko.SSHClient()
self._devices: list[USBGadget] = []
self._config = usb_host
self._script_root = f"sudo /home/{self._config['username']}/usb-spoofer"
self._connect_to_usb_host()
self._create_devices()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
[docs]
def close(self):
"""Cleanup all devices and close connection."""
self._remove_devices()
self._client.close()
def _connect_to_usb_host(self):
self._client.load_system_host_keys() # assume SSH key authentication
self._client.connect(
self._config["hostname"],
port=self._config.get("port", 22),
username=self._config["username"],
password=self._config.get("password"),
)
def _create_devices(self):
for device in self._config["devices"]:
self._devices.append(USBGadget(**device))
usb_device = self._devices[-1]
# assumes usb-spoofer script is at /home/$USER/usb-spoofer
command = self._script_root + f" create {usb_device.gadget_name}"
command += f" --vendor-id='{usb_device.device.vendor_id}'"
command += f" --product-id='{usb_device.device.product_id}'"
command += f" --manufacturer='{usb_device.device.manufacturer}'"
command += f" --product='{usb_device.device.product_name}'"
command += f" --serial-number='{usb_device.device.serial_number}'"
command += f" --size='{usb_device.device.size}'"
command += f" --file-system='{usb_device.device.file_system}'"
if usb_device.device.label is not None:
command += f" --label='{usb_device.device.label}'"
_, stdout, stderr = self._client.exec_command(command)
logger.info(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
def _remove_devices(self):
for device in self._devices:
command = self._script_root + f" remove {device.gadget_name}"
if device.keep_backing_store:
command += f" --keep-backing-store"
_, stdout, stderr = self._client.exec_command(command)
logger.debug(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
[docs]
def activate_device(self, gadget_name: str):
"""Activate a pre-configured gadget.
Args:
gadget_name: name of the gadget to be activated
"""
device = next((dev for dev in self._devices if dev.gadget_name == gadget_name))
active_device = next(
(dev for dev in self._devices if dev.active.locked()), None
)
if active_device is None:
device.active.acquire()
_, stdout, stderr = self._client.exec_command(
self._script_root + f" activate {device.gadget_name}"
)
logger.debug(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
else:
logger.error(
f"Device {gadget_name} cannot be activated since {active_device.gadget_name} is currently activated."
)
[docs]
def deactivate_device(self, gadget_name: str):
"""Deactivate a previously activated gadget.
Args:
gadget_name: name of the gadget to be activated
"""
device = next((dev for dev in self._devices if dev.gadget_name == gadget_name))
_, stdout, stderr = self._client.exec_command(
self._script_root + f" deactivate {device.gadget_name}"
)
logger.debug(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
device.active.release()
[docs]
@contextlib.contextmanager
def activate(self, gadget_name: str):
"""Implements the contextmanager for device activation.
Args:
gadget_name: name of the gadget to activate.
Example:
with usb_spoofing.activate(gadget_name) as gadget:
domain.attach_usb_device(gadget.device.vendor_id, gadget.device.product_id)
"""
self.activate_device(gadget_name)
time.sleep(5)
try:
yield self.get_device(gadget_name)
finally:
self.deactivate_device(gadget_name)
[docs]
@contextlib.contextmanager
def serve_backing_store(self, device: USBGadget):
"""Implements the contextmanager for mounting the backing store.
Args:
device: USBGaget to mount (must not be activated)
"""
command = self._script_root + f" mount {device.gadget_name}"
_, stdout, stderr = self._client.exec_command(command)
logger.debug(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
try:
yield f"/mnt/{device.gadget_name}"
finally:
command = self._script_root + f" umount {device.gadget_name}"
_, stdout, stderr = self._client.exec_command(command)
logger.debug(stdout.readlines())
if err := stderr.readlines():
logger.error(err)
[docs]
def transfer_to_device(self, gadget_name: str, source: os.PathLike):
"""Transfer a file/folder to a (deactivated) device.
Args:
gadget_name: name of the configured gadget
source: path to file/folder to be copied
"""
device = next((dev for dev in self._devices if dev.gadget_name == gadget_name))
if device.active.locked():
raise RuntimeError("Cannot upload file(s) to active device")
with self.serve_backing_store(device):
with SCPClient(self._client.get_transport()) as scp:
scp.put(str(source), remote_path=f"/mnt/{gadget_name}", recursive=True)
[docs]
def transfer_from_device(self, gadget_name: str, destination: os.PathLike = ""):
"""Transfer the backing store to the host computer.
Args:
gadget_name: name of the configured gadget
destination: path on the host computer
Returns:
"""
device = next((dev for dev in self._devices if dev.gadget_name == gadget_name))
if device.active.locked():
raise RuntimeError("Cannot upload file(s) to active device")
with self.serve_backing_store(device):
with SCPClient(self._client.get_transport()) as scp:
scp.get(
f"/mnt/{gadget_name}", recursive=True, local_path=str(destination)
)
[docs]
def get_device(self, gadget_name: str) -> USBGadget | None:
"""Get a spoofed USB device, based on its name.
Args:
gadget_name: name of the gadget
Returns:
USBGadget or None
"""
return next(
(dev for dev in self._devices if dev.gadget_name == gadget_name), None
)