import logging
import time
import pexpect
from pexpect import replwrap
from fortrace.utility.console_applications.console_application import (
GenericConsoleApplication,
)
from fortrace.utility.console_applications.console_application_factory import (
get_console_application,
)
from fortrace.utility.distribution_constants import PackageManager, ShellType
from fortrace.utility.logger_helper import setup_logger
from fortrace.utility.string_filtering import ansi_escape, unicode_control_escape
logger = setup_logger(__name__)
[docs]
class VirshConsole:
"""Class to handle interaction with virsh console connections.
This class is only supported on Linux and macOS domains, since Windows does
not implement this technique.
"""
def __init__(
self,
domain: str,
user: str,
password: str,
shell: ShellType,
hypervisor: str = "qemu:///system",
timeout: int = 2,
sudo_timeout: float = 300,
):
"""Constructs and opens a shell via virsh console.
The specified user is logged in, and the prompt is changed to simplify the code of this class.
Args:
domain: name of the domain to connect to
user: username
password: password to specified user account
shell: which type of shell should be expected --> determines how the prompt is changed
hypervisor: the address of the used hypervisor
timeout: timeout [s] for commands before a TIMEOUT exception is raised. This value sets the default timeout
value for the whole console session.
Can be overwritten for each command (see VirshConsole.run_command)
sudo_timeout: timeout [s] for credential caching. Set it to the systems value. Will not work otherwise.
"""
self._user = user
self._password = password
self._shell_type = shell
self._sudo_timeout = sudo_timeout
self._last_sudo = 0
# TODO: add --devname flag to command
# consoles will be named console0, console1 --> extract from domain config how many consoles there are
self._virsh = pexpect.spawn(
f"virsh --connect {hypervisor} console {domain} --force",
echo=False,
timeout=timeout,
encoding="utf-8",
)
self._virsh.expect("Connected to domain .*")
while True:
try:
self._virsh.sendline()
self._virsh.expect(".* login:")
break
except pexpect.exceptions.ExceptionPexpect:
pass
self._virsh.sendline(user)
self._virsh.expect("Password:")
self._virsh.sendline(password)
# now it is a REPL shell
# modify the prompt to enable directory changes without updating the prompt pattern each time
if shell == ShellType.BASH:
ps1 = "\\u >"
ps2 = "~~"
elif shell == ShellType.ZSH:
ps1 = "%n >"
ps2 = "~~"
else:
raise ValueError("ShellType not implemented")
prompt_change = f" PS1='{ps1}' PS2='{ps2}' PROMPT_COMMAND=''"
# open a REPL shell
self._pty = replwrap.REPLWrapper(
self._virsh, "\\$", prompt_change, f"{user} >", "~~ "
)
self._default_prompt = self._pty.prompt
self._default_continuation_prompt = self._pty.continuation_prompt
# self._pty.run_command('Defaults passwd_timeout=0') # ask every time for sudo passwd --> easier
[docs]
def run_command(
self,
command: str,
elevated: bool = False,
timeout: int | None = -1,
new_prompt: str | None = None,
) -> str | list[str]:
"""Run a program in the opened console session.
Run a command in pty that might be elevated or result in a new prompt. Do not use this method to call editors or
start graphical programs!
Args:
command: the command to be run
elevated: should the command run elevated, e.g. sudo
timeout: overwrite the standard timeout for a command (e.g. when downloading something from the internet)
new_prompt: if the command changes the prompt, provide the new expected prompt here (see tests, for example)
Returns:
adjusted string written to stdout
"""
if elevated:
command = "sudo " + command # FIXME: What about Windows?
if (time.time() - self._last_sudo) >= self._sudo_timeout:
self._pty.prompt = f"[sudo] password for {self._user}:"
self._pty.run_command(command, timeout)
self._pty.prompt = (
self._default_prompt if not new_prompt else new_prompt
)
self._last_sudo = (
time.time()
) # passwd_timeout is not refreshed by sudo commands
stdout = self._pty.run_command(self._password, timeout)
return self._adjust_stdout(stdout, command)
else:
if new_prompt is not None:
self._pty.prompt = new_prompt
stdout = self._pty.run_command(command, timeout)
return self._adjust_stdout(stdout, command)
else:
stdout = self._pty.run_command(command, timeout)
return self._adjust_stdout(stdout, command)
@staticmethod
def _adjust_stdout(stdout: str, command: str) -> str | list[str]:
"""Formatting method for console output.
This method will remove leading and trailing spaces from the output, removes ansi characters, and splits
multiline outputs into a list. If the entered command is the first item of the list, it will be removed for
convenience.
Args:
stdout: The console output to be processed
command: The entered command producing the output
Returns:
If the output is a single line, a string is returned. Multiline outputs result in a list of strings.
Examples:
> echo Hello --> _adjust_stdout() --> "Hello"
> whoami && echo Hello --> _adjust_stdout() --> ["user", "Hello"]
"""
stdout = ansi_escape(stdout).splitlines() # e.g. BASH uses ansi encoding
stdout = [
unicode_control_escape(line).strip() for line in stdout
] # e.g. ZSH uses unicode
if stdout[0].find(command) != -1:
stdout = stdout[1:]
stdout = list(filter(None, stdout))
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Entered command %s", command)
logger.debug("\t%s", stdout)
if len(stdout) == 1:
return stdout[0]
else:
return stdout
[docs]
def restore_prompt(self):
"""Restores the prompt of the console to the initial one.
Call this method BEFORE exiting a program with a different prompt (can be called in their __del__ method)
Returns:
"""
self._pty.prompt = self._default_prompt
self._pty.continuation_prompt = self._default_continuation_prompt
[docs]
def install_packages(self, packages: list[str], package_manager: PackageManager):
"""Install packages via a package manager through the console.
Provide a list of packages that need to be installed. Make sure the packages exist before calling this method.
Args:
packages: packages to be installed (use the names of the package manager)
package_manager: As each package installer has a different approach to install packages, provide it here
Returns:
"""
enter_password = False
if (time.time() - self._last_sudo) >= self._sudo_timeout:
self._pty.prompt = f"[sudo] password for {self._user}:"
self._last_sudo = (
time.time()
) # passwd_timeout is not refreshed by sudo commands
enter_password = True
if package_manager == PackageManager.APT:
self._pty.run_command("sudo apt update", 100)
if enter_password:
self.restore_prompt()
self._pty.run_command(self._password, 100)
command = "yes | sudo apt install " + " ".join(
packages
) # circumvent to set prompt
self._pty.run_command(
command, 300
) # FIXME: maybe need to use virsh because of new lines...
elif package_manager == PackageManager.PACMAN:
pass
else:
raise ValueError("Unknown package manager")
logger.info("Successfully installed the following packages: %s", packages)
# TODO: add more package managers, including Windows'
[docs]
def remove_packages(self, packages: list[str], package_manager: PackageManager):
"""Provide a list of packages to be removed from the guest.
Make sure that any package in the list of provided ones is really installed on the system.
Args:
packages: packages to be removed
package_manager: As each package installer has a different approach to install packages, provide it here
"""
enter_password = False
if (time.time() - self._last_sudo) >= self._sudo_timeout:
self._pty.prompt = f"[sudo] password for {self._user}:"
self._last_sudo = (
time.time()
) # passwd_timeout is not refreshed by sudo commands
enter_password = True
if package_manager == PackageManager.APT:
command = "yes | sudo apt remove " + " ".join(packages)
if enter_password:
self.restore_prompt()
self._pty.run_command(self._password)
self._pty.run_command(command, 100)
elif package_manager == PackageManager.PACMAN:
pass
else:
raise ValueError("Unknown package manager")
logger.info("Removed the following packages: %s", packages)
[docs]
def open_console_application(
self,
command: str,
elevated: bool = False,
timeout: int | None = -1,
new_prompt: str | None = None,
) -> GenericConsoleApplication:
"""Open a console application and retrieve the associated object for interaction.
Opens an application on the terminal, which might render REPLWrapper useless, as sometimes there will be no
prompt until the application is closed.
Args:
command: command (do NOT prepend sudo here)
elevated: should the command run elevated?
timeout: timeout in seconds for command
new_prompt: if there is a new prompt expected after a command is run, specify it here (otherwise a TIMEOUT
exception will be thrown)
Returns:
Handle the opened console application.
"""
name = command.split()[0]
if elevated:
command = "sudo " + command # FIXME: What about Windows?
# TODO: Rework this whole part --> wait short period for password prompt, catch timeout error and continue?
if (time.time() - self._last_sudo) >= self._sudo_timeout:
self._pty.prompt = f"[sudo] password for {self._user}:"
self._pty.run_command(command, timeout)
if (
new_prompt
): # there will be a prompt in the application, so we can continue to use REPLwrapper
self._pty.prompt = new_prompt
self._pty.run_command(self._password, timeout)
else:
self.restore_prompt()
self._virsh.sendline(self._password)
self._last_sudo = (
time.time()
) # sudo commands do not refresh passwd_timeout
else:
if (
new_prompt
): # there will be a prompt in the application, so we can continue to use REPLwrapper
self._pty.prompt = new_prompt
self._pty.run_command(command, timeout)
else:
self.restore_prompt()
self._virsh.sendline(command)
else:
if new_prompt:
self._pty.prompt = new_prompt
self._pty.run_command(command)
else:
self.restore_prompt()
self._virsh.sendline(
command
) # cannot use run_command, as it will wait indefinitely
return get_console_application(name, self._pty)
[docs]
def close(self):
"""Close the console connection."""
self._virsh.sendline("exit") # important to close pty connection properly
if not ShellType.ZSH:
self._virsh.expect(["logout", pexpect.EOF])
@property
def virsh(self):
return self._virsh
@property
def shell_type(self):
return self._shell_type