# Source code for fortrace.core.virsh_session

import os
from typing import Optional

import libvirt

from fortrace.core.virsh_domain import GraphicalVirshDomain, VirshDomain
from fortrace.utility.desktop_environments.desktop_environment import (
    DesktopEnvironmentType,
)
from fortrace.utility.distribution_constants import OSType
from fortrace.utility.logger_helper import setup_logger

# Module-level logger for this file, obtained from ForTrace's logger helper.
logger = setup_logger(__name__)


class VirshSession:
    """ForTrace's representation of the hypervisor connection.

    A session owns the libvirt connection, the domains created through it and
    the networks it had to look up (and possibly start) for those domains.
    """

    # Domains created via ``create_domain`` and still managed by this session.
    _domains: list[VirshDomain]
    # ``(network, was_active)`` pairs; ``was_active`` is the network's state
    # before this session touched it, so teardown only stops what it started.
    _networks: list[tuple[libvirt.virNetwork, bool]]

    def __init__(
        self,
        session_output: os.PathLike,
        hypervisor: str = "qemu:///system",
    ):
        """Creates a new handle to the Hypervisor.

        Creates a new ForTrace session, ready to register domains and start the
        simulation.

        Args:
            session_output: the base directory of the session. All output files
                will be created here and below
            hypervisor: the hypervisor this session should connect to

        Raises:
            SystemExit: with exit code 1 if the connection to the hypervisor
                cannot be opened (the libvirt error is logged first).
        """
        # Initialise every attribute before doing anything that can fail, so
        # that __del__ never runs against a half-constructed instance.
        self._domains = []
        self._networks = []
        self._conn = None
        self._session_output = session_output

        os.makedirs(self._session_output, exist_ok=True)

        try:
            self._conn = libvirt.open(hypervisor)
        except libvirt.libvirtError as e:
            logger.error(repr(e))
            # Same exception type the former ``exit(1)`` raised, but without
            # relying on the ``site``-provided interactive helper.
            raise SystemExit(1) from e

    def _create_network(self, network_name: str):
        """Look up a network by name or UUID string and start it if inactive.

        The network and its activity state prior to this call are recorded in
        the session's network list.

        Note:
            The network has to exist, in order to be created

        Args:
            network_name: name or UUID string of the network

        Raises:
            libvirt.libvirtError: if the network cannot be found or started.
        """
        try:
            try:
                network = self._conn.networkLookupByName(network_name)
            except libvirt.libvirtError:
                # Not a known name: interpret the argument as a UUID string.
                network = self._conn.networkLookupByUUIDString(network_name)

            if not (initial_state := network.isActive()):
                network.create()
            self._networks.append((network, initial_state))
        except libvirt.libvirtError as e:
            logger.error(repr(e))
            raise

    def __del__(self):
        """Best-effort teardown of domains, networks and the connection.

        Stops the sniffer of and destroys every managed domain, destroys the
        networks that were inactive before this session started them, and
        closes the hypervisor connection. A libvirt error in one step is
        logged and does not prevent the remaining cleanup.
        """
        # getattr guards: the finalizer also runs for instances whose
        # __init__ never completed (or was bypassed).
        for domain in getattr(self, "_domains", []):
            if domain is None:
                continue
            try:
                domain.stop_sniffer()
                domain.destroy()
            except libvirt.libvirtError as e:
                logger.error(repr(e))

        for network, initial_state in getattr(self, "_networks", []):
            if initial_state:
                # Was already running before the session; leave it running.
                continue
            try:
                network.destroy()
            except libvirt.libvirtError as e:
                logger.error(repr(e))

        conn = getattr(self, "_conn", None)
        if conn is not None:
            try:
                conn.close()
            except libvirt.libvirtError as e:
                logger.error(repr(e))

    @property
    def domains(self) -> list[VirshDomain]:
        """The list of domains currently managed by this session."""
        return self._domains

    @property
    def networks(self) -> list[libvirt.virNetwork]:
        """The libvirt network objects recorded by this session."""
        return [network for network, _ in self._networks]

    def create_domain(
        self,
        domain_name: str,
        os_type: OSType,
        domain_network: str | None = "default",
        desktop_environment: DesktopEnvironmentType | None = None,
    ) -> VirshDomain | GraphicalVirshDomain:
        """Creates a new domain with the given name.

        The domain is not started, which is done via the domain object.

        Args:
            domain_name: name or UUID of the domain to be created
            os_type: type of OS to expect
            domain_network: name/UUIDString of the network the domain should
                connect to
            desktop_environment: if domain is a graphical target, please provide
                the expected desktop environment

        Note:
            'Create' uses the meaning as it is defined by libvirt. A domain to be
            created has to be installed on the system.

        Returns:
            a handle to the domain (graphical or non-graphical)

        Raises:
            libvirt.libvirtError: if ``domain_network`` is given but cannot be
                looked up or started.
        """
        if domain_network:
            self._create_network(domain_network)

        if desktop_environment is not None:
            domain = GraphicalVirshDomain(
                domain_name,
                self._conn,
                domain_network,
                os_type,
                self._session_output,
                desktop_environment,
            )
        else:
            domain = VirshDomain(
                domain_name,
                self._conn,
                domain_network,
                os_type,
                self._session_output,
            )

        self._domains.append(domain)
        return domain

    def remove_domain(self, domain: VirshDomain):
        """Remove the provided domain from the session.

        The removed domain will be powered down (with force, if necessary).

        Args:
            domain: the domain to remove; must have been created by this session

        Raises:
            ValueError: if the domain is not managed by this session.
        """
        try:
            self._domains.remove(domain)
        except ValueError as exc:
            logger.error(
                "Cannot remove domain %s, since it is not in the list of managed "
                "domains",
                domain.domain_name,
            )
            raise ValueError(
                f"The provided domain {domain.domain_name} is not under control of this"
                f" session, thus cannot be removed."
            ) from exc

        domain.destroy()

    def get_domain_by_name(self, domain_name: str) -> Optional[VirshDomain]:
        """Get a domain based on its name of the ForTrace related domains.

        Args:
            domain_name: name of the domain to retrieve

        Returns:
            The first session domain with the given name, or None if there is
            none.
        """
        return next(
            (
                domain
                for domain in self._domains
                if domain.domain.name() == domain_name
            ),
            None,
        )

    def get_domain_by_id(self, domain_id: str) -> Optional[VirshDomain]:
        """Get the domain with the specified UUID from the list of session domains.

        Args:
            domain_id: UUIDString of the domain to be retrieved

        Returns:
            The VirshDomain with the given UUID if it is in the list of session
            domains, otherwise None.
        """
        return next(
            (
                domain
                for domain in self._domains
                if domain.domain.UUIDString() == domain_id
            ),
            None,
        )

    def get_network_by_name(self, network_name: str) -> Optional[libvirt.virNetwork]:
        """Get a network based on its name of the ForTrace networks.

        Args:
            network_name: name of the network to retrieve

        Returns:
            The libvirt virNetwork object or None
        """
        return next(
            (
                network
                for network, _ in self._networks
                if network.name() == network_name
            ),
            None,
        )

    def get_network_by_id(self, network_id: str) -> Optional[libvirt.virNetwork]:
        """Get a network based on its UUID of the ForTrace networks.

        Args:
            network_id: UUID string of the network to retrieve

        Returns:
            The libvirt virNetwork object or None
        """
        return next(
            (
                network
                for network, _ in self._networks
                if network.UUIDString() == network_id
            ),
            None,
        )

    def list_available_domains(self) -> list[tuple[str, str]]:
        """List all available domains of the hypervisor with their UUID.

        Returns:
            ``(name, UUID string)`` pairs for every domain the connection lists.
        """
        return [
            (domain.name(), domain.UUIDString())
            for domain in self._conn.listAllDomains()
        ]

    def list_available_networks(self) -> list[tuple[str, str]]:
        """List all available networks of the hypervisor with their UUID.

        Returns:
            ``(name, UUID string)`` pairs for every network the connection lists.
        """
        return [
            (network.name(), network.UUIDString())
            for network in self._conn.listAllNetworks()
        ]

    def clone(self, domain_name: str) -> str:
        """Clone the specified domain.

        Returns:
            name of the cloned domain
        """
        # TODO: not implemented yet - currently a no-op that returns None
        # despite the annotated ``str`` return type.
        pass