Source code for fortrace.core.virsh_console

import logging
import time

import pexpect
from pexpect import replwrap

from fortrace.utility.console_applications.console_application import (
    GenericConsoleApplication,
)
from fortrace.utility.console_applications.console_application_factory import (
    get_console_application,
)
from fortrace.utility.distribution_constants import PackageManager, ShellType
from fortrace.utility.logger_helper import setup_logger
from fortrace.utility.string_filtering import ansi_escape, unicode_control_escape

logger = setup_logger(__name__)


[docs] class VirshConsole: """Class to handle interaction with virsh console connections. This class is only supported on Linux and macOS domains, since Windows does not implement this technique. """ def __init__( self, domain: str, user: str, password: str, shell: ShellType, hypervisor: str = "qemu:///system", timeout: int = 2, sudo_timeout: float = 300, ): """Constructs and opens a shell via virsh console. The specified user is logged in, and the prompt is changed to simplify the code of this class. Args: domain: name of the domain to connect to user: username password: password to specified user account shell: which type of shell should be expected --> determines how the prompt is changed hypervisor: the address of the used hypervisor timeout: timeout [s] for commands before a TIMEOUT exception is raised. This value sets the default timeout value for the whole console session. Can be overwritten for each command (see VirshConsole.run_command) sudo_timeout: timeout [s] for credential caching. Set it to the systems value. Will not work otherwise. """ self._user = user self._password = password self._shell_type = shell self._sudo_timeout = sudo_timeout self._last_sudo = 0 # TODO: add --devname flag to command # consoles will be named console0, console1 --> extract from domain config how many consoles there are self._virsh = pexpect.spawn( f"virsh --connect {hypervisor} console {domain} --force", echo=False, timeout=timeout, encoding="utf-8", ) self._virsh.expect("Connected to domain .*") while True: try: self._virsh.sendline() self._virsh.expect(".* login:") break except pexpect.exceptions.ExceptionPexpect: pass self._virsh.sendline(user) self._virsh.expect("Password:") self._virsh.sendline(password) # now it is a REPL shell # modify the prompt to enable directory changes without updating the prompt pattern each time if shell == ShellType.BASH: ps1 = "\\u >" ps2 = "~~" elif shell == ShellType.ZSH: ps1 = "%n >" ps2 = "~~" else: raise ValueError("ShellType not implemented") prompt_change = f" PS1='{ps1}' PS2='{ps2}' PROMPT_COMMAND=''" # open a REPL shell self._pty = replwrap.REPLWrapper( self._virsh, "\\$", prompt_change, f"{user} >", "~~ " ) self._default_prompt = self._pty.prompt self._default_continuation_prompt = self._pty.continuation_prompt # self._pty.run_command('Defaults passwd_timeout=0') # ask every time for sudo passwd --> easier
[docs] def run_command( self, command: str, elevated: bool = False, timeout: int | None = -1, new_prompt: str | None = None, ) -> str | list[str]: """Run a program in the opened console session. Run a command in pty that might be elevated or result in a new prompt. Do not use this method to call editors or start graphical programs! Args: command: the command to be run elevated: should the command run elevated, e.g. sudo timeout: overwrite the standard timeout for a command (e.g. when downloading something from the internet) new_prompt: if the command changes the prompt, provide the new expected prompt here (see tests, for example) Returns: adjusted string written to stdout """ if elevated: command = "sudo " + command # FIXME: What about Windows? if (time.time() - self._last_sudo) >= self._sudo_timeout: self._pty.prompt = f"[sudo] password for {self._user}:" self._pty.run_command(command, timeout) self._pty.prompt = ( self._default_prompt if not new_prompt else new_prompt ) self._last_sudo = ( time.time() ) # passwd_timeout is not refreshed by sudo commands stdout = self._pty.run_command(self._password, timeout) return self._adjust_stdout(stdout, command) else: if new_prompt is not None: self._pty.prompt = new_prompt stdout = self._pty.run_command(command, timeout) return self._adjust_stdout(stdout, command) else: stdout = self._pty.run_command(command, timeout) return self._adjust_stdout(stdout, command)
@staticmethod def _adjust_stdout(stdout: str, command: str) -> str | list[str]: """Formatting method for console output. This method will remove leading and trailing spaces from the output, removes ansi characters, and splits multiline outputs into a list. If the entered command is the first item of the list, it will be removed for convenience. Args: stdout: The console output to be processed command: The entered command producing the output Returns: If the output is a single line, a string is returned. Multiline outputs result in a list of strings. Examples: > echo Hello --> _adjust_stdout() --> "Hello" > whoami && echo Hello --> _adjust_stdout() --> ["user", "Hello"] """ stdout = ansi_escape(stdout).splitlines() # e.g. BASH uses ansi encoding stdout = [ unicode_control_escape(line).strip() for line in stdout ] # e.g. ZSH uses unicode if stdout[0].find(command) != -1: stdout = stdout[1:] stdout = list(filter(None, stdout)) if logger.isEnabledFor(logging.DEBUG): logger.debug("Entered command %s", command) logger.debug("\t%s", stdout) if len(stdout) == 1: return stdout[0] else: return stdout
[docs] def restore_prompt(self): """Restores the prompt of the console to the initial one. Call this method BEFORE exiting a program with a different prompt (can be called in their __del__ method) Returns: """ self._pty.prompt = self._default_prompt self._pty.continuation_prompt = self._default_continuation_prompt
[docs] def install_packages(self, packages: list[str], package_manager: PackageManager): """Install packages via a package manager through the console. Provide a list of packages that need to be installed. Make sure the packages exist before calling this method. Args: packages: packages to be installed (use the names of the package manager) package_manager: As each package installer has a different approach to install packages, provide it here Returns: """ enter_password = False if (time.time() - self._last_sudo) >= self._sudo_timeout: self._pty.prompt = f"[sudo] password for {self._user}:" self._last_sudo = ( time.time() ) # passwd_timeout is not refreshed by sudo commands enter_password = True if package_manager == PackageManager.APT: self._pty.run_command("sudo apt update", 100) if enter_password: self.restore_prompt() self._pty.run_command(self._password, 100) command = "yes | sudo apt install " + " ".join( packages ) # circumvent to set prompt self._pty.run_command( command, 300 ) # FIXME: maybe need to use virsh because of new lines... elif package_manager == PackageManager.PACMAN: pass else: raise ValueError("Unknown package manager") logger.info("Successfully installed the following packages: %s", packages)
# TODO: add more package managers, including Windows'
[docs] def remove_packages(self, packages: list[str], package_manager: PackageManager): """Provide a list of packages to be removed from the guest. Make sure that any package in the list of provided ones is really installed on the system. Args: packages: packages to be removed package_manager: As each package installer has a different approach to install packages, provide it here """ enter_password = False if (time.time() - self._last_sudo) >= self._sudo_timeout: self._pty.prompt = f"[sudo] password for {self._user}:" self._last_sudo = ( time.time() ) # passwd_timeout is not refreshed by sudo commands enter_password = True if package_manager == PackageManager.APT: command = "yes | sudo apt remove " + " ".join(packages) if enter_password: self.restore_prompt() self._pty.run_command(self._password) self._pty.run_command(command, 100) elif package_manager == PackageManager.PACMAN: pass else: raise ValueError("Unknown package manager") logger.info("Removed the following packages: %s", packages)
[docs] def open_console_application( self, command: str, elevated: bool = False, timeout: int | None = -1, new_prompt: str | None = None, ) -> GenericConsoleApplication: """Open a console application and retrieve the associated object for interaction. Opens an application on the terminal, which might render REPLWrapper useless, as sometimes there will be no prompt until the application is closed. Args: command: command (do NOT prepend sudo here) elevated: should the command run elevated? timeout: timeout in seconds for command new_prompt: if there is a new prompt expected after a command is run, specify it here (otherwise a TIMEOUT exception will be thrown) Returns: Handle the opened console application. """ name = command.split()[0] if elevated: command = "sudo " + command # FIXME: What about Windows? # TODO: Rework this whole part --> wait short period for password prompt, catch timeout error and continue? if (time.time() - self._last_sudo) >= self._sudo_timeout: self._pty.prompt = f"[sudo] password for {self._user}:" self._pty.run_command(command, timeout) if ( new_prompt ): # there will be a prompt in the application, so we can continue to use REPLwrapper self._pty.prompt = new_prompt self._pty.run_command(self._password, timeout) else: self.restore_prompt() self._virsh.sendline(self._password) self._last_sudo = ( time.time() ) # sudo commands do not refresh passwd_timeout else: if ( new_prompt ): # there will be a prompt in the application, so we can continue to use REPLwrapper self._pty.prompt = new_prompt self._pty.run_command(command, timeout) else: self.restore_prompt() self._virsh.sendline(command) else: if new_prompt: self._pty.prompt = new_prompt self._pty.run_command(command) else: self.restore_prompt() self._virsh.sendline( command ) # cannot use run_command, as it will wait indefinitely return get_console_application(name, self._pty)
[docs] def close(self): """Close the console connection.""" self._virsh.sendline("exit") # important to close pty connection properly if not ShellType.ZSH: self._virsh.expect(["logout", pexpect.EOF])
@property def virsh(self): return self._virsh @property def shell_type(self): return self._shell_type