import os
from typing import Optional
import libvirt
from fortrace.core.virsh_domain import GraphicalVirshDomain, VirshDomain
from fortrace.utility.desktop_environments.desktop_environment import (
DesktopEnvironmentType,
)
from fortrace.utility.distribution_constants import OSType
from fortrace.utility.logger_helper import setup_logger
logger = setup_logger(__name__)
[docs]
class VirshSession:
"""ForTrace's representation of the hypervisor connection."""
_domains: list[VirshDomain]
_networks: list[tuple[libvirt.virNetwork, bool]]
def __init__(
self,
session_output: os.PathLike,
hypervisor: str = "qemu:///system",
):
"""Creates a new handle to the Hypervisor.
Creates a new ForTrace session, ready to register domains and start the
simulation.
Args:
session_output: the base directory of the session. All output files will be
created here and below
hypervisor: the hypervisor this session should connect to
"""
self._session_output = session_output
os.makedirs(self._session_output, exist_ok=True)
self._domains = []
self._networks = []
try:
self._conn = libvirt.open(hypervisor)
except libvirt.libvirtError as e:
logger.error(repr(e))
exit(1)
def _create_network(self, network_name: str):
"""Create a network with the given name.
Note:
The network has to exist, in order to be created
"""
try:
try:
network = self._conn.networkLookupByName(network_name)
except libvirt.libvirtError:
network = self._conn.networkLookupByUUIDString(network_name)
if not (initial_state := network.isActive()):
network.create()
self._networks.append((network, initial_state))
except libvirt.libvirtError as e:
logger.error(repr(e))
raise e
def __del__(self):
for domain in self._domains:
if domain is not None:
domain.stop_sniffer()
domain.destroy()
for network, initial_state in self._networks:
if not initial_state:
network.destroy()
self._conn.close()
@property
def domains(self):
return self._domains
@property
def networks(self):
return [network[0] for network in self._networks]
[docs]
def create_domain(
self,
domain_name: str,
os_type: OSType,
domain_network: str | None = "default",
desktop_environment: DesktopEnvironmentType | None = None,
) -> VirshDomain | GraphicalVirshDomain:
"""Creates a new domain with the given name.
The domain is not started, which is done via the domain object.
Args:
domain_name: name or UUID of the domain to be created
os_type: type of OS to expect
domain_network: name/UUIDString of the network the domain should connect to
desktop_environment: if domain is a graphical target, please provide the
expected desktop environment
Note:
'Create' uses the meaning as it is defined by libvirt. A domain to be
created has to be installed on the system.
Returns:
a handle to the domain (graphical or non-graphical)
"""
if domain_network:
self._create_network(domain_network)
if desktop_environment is not None:
self._domains.append(
GraphicalVirshDomain(
domain_name,
self._conn,
domain_network,
os_type,
self._session_output,
desktop_environment,
)
)
else:
self._domains.append(
VirshDomain(
domain_name,
self._conn,
domain_network,
os_type,
self._session_output,
)
)
return self._domains[-1]
[docs]
def remove_domain(self, domain: VirshDomain):
"""Remove the provided domain from the session.
The removed domain will be powered down (with force, if necessary).
"""
try:
self._domains.remove(domain)
except ValueError as exc:
logger.error(
"Cannot remove domain %s, since it is not in the list of managed "
"domains",
domain.domain_name,
)
raise ValueError(
f"The provided domain {domain.domain_name} is not under control of this"
f" session, thus cannot be removed."
) from exc
domain.destroy()
[docs]
def get_domain_by_name(self, domain_name: str) -> VirshDomain:
"""Get a domain based on its name of the ForTrace related domains.
Args:
domain_name: name of the domain to retrieve
Returns:
Domain, if there is one with the given name.
"""
return next(
iter(
[
domain
for domain in self._domains
if domain.domain.name() == domain_name
]
),
None,
)
[docs]
def get_domain_by_id(self, domain_id: str) -> Optional[VirshDomain]:
"""Get the domain with the specified domain from the list of session domains.
Args:
domain_id: UUIDString of the domain to be retrieved
Returns:
The VirshDomain with the given UUID if it is in the list of session domains.
"""
return next(
iter(
[
domain
for domain in self._domains
if domain.domain.UUIDString() == domain_id
]
),
None,
)
[docs]
def get_network_by_name(self, network_name: str) -> Optional[libvirt.virNetwork]:
"""Get a network based on its name of the ForTrace networks.
Args:
network_name: name of the network to retrieve
Returns:
The libvirt virNetwork object or None
"""
return next(
iter(
[
network[0]
for network in self._networks
if network[0].name() == network_name
]
),
None,
)
[docs]
def get_network_by_id(self, network_id: str) -> Optional[libvirt.virNetwork]:
"""Get a network based on its UUID of the ForTrace networks.
Args:
network_id: UUID string of the network to retrieve
Returns:
The libvirt virNetwork object or None
"""
return next(
iter(
[
network[0]
for network in self._networks
if network[0].UUIDString() == network_id
]
),
None,
)
[docs]
def list_available_domains(self) -> list[tuple[str, str]]:
"""List all available domains of the hypervisor with their UUID."""
return [
(domain.name(), domain.UUIDString())
for domain in self._conn.listAllDomains()
]
[docs]
def list_available_networks(self) -> list[tuple[str, str]]:
"""List all available networks of the hypervisor with their UUID."""
return [
(network.name(), network.UUIDString())
for network in self._conn.listAllNetworks()
]
[docs]
def clone(self, domain_name: str) -> str:
"""Clone the specified domain.
Returns:
name of the cloned domain
"""
pass