"""Module for interaction with the QEMU Monitor of a domain.
From the QEMU Monitor documentation
(https://www.qemu.org/docs/master/system/monitor.html).
The QEMU monitor is used to give complex commands to the QEMU emulator.
In case of ForTrace++ it is used to control the mouse and the keyboard of a domain.
Furthermore, it can be used to mount devices on a domain.
"""
import datetime
import pathlib
from collections import deque
from enum import Enum
from time import sleep
from typing import Callable
import cv2
import libvirt
import numpy as np
from libvirt_qemu import VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP, qemuMonitorCommand
from fortrace.utility.image_processing.image_similarity import image_difference
from fortrace.utility.image_processing.opencv_utils import read_image_gray_scale
from fortrace.utility.logger_helper import setup_logger
# Module-level logger, created by the project's setup_logger helper and named
# after this module; used by the classes below for debug and warning output.
logger = setup_logger(__name__)
# [docs]  (Sphinx "viewcode" link marker left over from page extraction; not code)
class Mouse:
    """Mouse pointer of a domain, controlled via the QEMU monitor.

    All pointer actions are sent as HMP commands (``mouse_move`` and
    ``mouse_button``) through ``qemuMonitorCommand``. ``mouse_move`` is issued
    with relative offsets, so the class records every position it moved the
    pointer to in a bounded history (the "movement stack", at most 10 entries)
    and derives absolute moves from the most recent entry.
    """

    # Size in pixels of one full x/y step issued by _move_in_steps.
    _STEP_PX = 100

    def __init__(
        self, domain: libvirt.virDomain, take_screenshot: Callable[[int | None], bytes]
    ):
        """Initialize Mouse pointer class.

        Only stores the handles; no monitor command is sent and no screenshot
        is taken here. From within ForTrace call the public :meth:`init`
        method afterwards.

        Args:
            domain: handle to active domain
            take_screenshot: callable returning a screenshot of the domain as
                encoded image bytes; this class calls it without arguments
        """
        self._movement_stack = deque(maxlen=10)
        self._domain = domain
        self._take_screenshot = take_screenshot
        self._mouse_ptr = None  # template of mouse ptr, set in find_and_zero_mouse_ptr
        self._display_dimensions = (0, 0)  # (width, height) in pixels

    def init(self):
        """Initialize the mouse: read the display size, then zero the pointer."""
        self._update_display_dimension()
        self.find_and_zero_mouse_ptr()

    def _monitor_command(self, command: str):
        """Send one HMP command for this domain and return the monitor's reply."""
        return qemuMonitorCommand(
            self._domain, command, VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP
        )

    def _clear_stack(self):
        """Forget all recorded pointer positions."""
        self._movement_stack.clear()

    def get_position(self, position: int = -1) -> tuple[int, int]:
        """Return n-th position in movement stack.

        Args:
            position: index into the movement stack; use '-1' to return the
                last (most recent) position

        Returns:
            (x,y) coordinates of the mouse pointer

        Raises:
            IndexError: if the movement stack has no entry at that index
        """
        return self._movement_stack[position]

    def find_and_zero_mouse_ptr(self):
        """Locate the mouse pointer on the domain screen, then zero it.

        Two screenshots are taken, one before and one after moving the pointer
        by (100, 100). Both are binarized with the same threshold and compared;
        the differing region that lies furthest to the right is taken to be the
        pointer and its crop from the second screenshot is stored as the
        pointer template. If no differing region is found a warning is logged
        and no template is stored.

        Afterwards the pointer is moved far up and to the left, and (0, 0) is
        put on the movement stack.
        """
        screen_0 = read_image_gray_scale(self._take_screenshot())
        # Otsu picks the threshold from the first screenshot; the same value is
        # reused for the second one so that both binary images are comparable.
        threshold, screen_0_t = cv2.threshold(
            screen_0, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU
        )
        self._monitor_command("mouse_move 100 100")  # move ptr right and down
        sleep(1)  # presumably gives the guest time to redraw the pointer
        screen_1 = read_image_gray_scale(self._take_screenshot())
        _, screen_1_t = cv2.threshold(screen_1, threshold, 255, cv2.THRESH_BINARY)
        bounding_boxes, area = image_difference(screen_0_t, screen_1_t)
        logger.debug("Mouse bounding boxes %s and area %s", bounding_boxes, area)
        try:
            # take the bounding box that is more to the right
            mouse_ptr_box = max(bounding_boxes, key=lambda x: x[0])
        except ValueError:  # max() of an empty sequence: nothing changed
            logger.warning(
                "Cannot determine initial position of mouse pointer, thus no"
                " pattern of it."
            )
        else:
            logger.debug("Chose %s as mouse bounding box", mouse_ptr_box)
            # NOTE(review): the indexing assumes boxes are (x0, y0, x1, y1) --
            # confirm against image_difference.
            self._mouse_ptr = screen_1[
                mouse_ptr_box[1] : mouse_ptr_box[3], mouse_ptr_box[0] : mouse_ptr_box[2]
            ]
            top_left = mouse_ptr_box[:2]
            logger.debug("Initial mouse position %s.", top_left)
        # zero mouse pointer: a relative move far larger than any screen;
        # presumably the guest clamps the pointer at the top left corner
        self._move_in_steps(-10000, -10000, 0)
        self._movement_stack.append((0, 0))
        logger.debug("Zeroed mouse pointer in top left corner.")

    def _update_display_dimension(self):
        """Determine the correct display dimensions once a domain has started.

        Has to be called to move the mouse correctly: :meth:`move` rejects
        every target outside the stored dimensions. Nothing is updated when
        the domain is not active or the screenshot cannot be decoded.
        """
        if not self._domain.isActive():
            return
        # to get screen dimensions
        img_arr = np.frombuffer(self._take_screenshot(), dtype=np.uint8)
        decoded = cv2.imdecode(img_arr, cv2.IMREAD_GRAYSCALE)
        if decoded is None:  # imdecode signals undecodable data with None
            logger.warning(
                "Cannot decode screenshot, display dimensions stay at %s",
                self._display_dimensions,
            )
            return
        height, width = decoded.shape
        # tuple, consistent with the initial value set in __init__
        self._display_dimensions = (width, height)

    def _move_in_steps(self, dx: int, dy: int, dz: int | None):
        """Move the mouse in steps to circumvent large-jump limits of the guest.

        Operating systems seem to introduce a barrier to the maximum movement a
        mouse can make. In Windows 10, this barrier seems to be around 500 px.
        With this method it is made sure that the mouse movement is done in
        smaller steps, so it remains precise.

        Commands are sent in this order: all full x steps, all full y steps,
        all scroll steps, then one final move with the x/y remainders.

        Args:
            dx: movement in pixels in x-direction
            dy: movement in pixels in y-direction
            dz: movement in scroll-wheel steps in z-direction; ``None`` is
                treated as 0
        """
        if dz is None:
            dz = 0
        step = self._STEP_PX
        # divmod floors, so for a negative distance the full steps overshoot
        # and the (non-negative) remainder moves back to the exact target.
        q_x, r_x = divmod(dx, step)
        q_y, r_y = divmod(dy, step)
        step_x = step if dx >= 0 else -step
        step_y = step if dy >= 0 else -step
        step_z = 1 if dz >= 0 else -1
        for _ in range(abs(q_x)):
            self._monitor_command(f"mouse_move {step_x} 0 0")
        for _ in range(abs(q_y)):
            self._monitor_command(f"mouse_move 0 {step_y} 0")
        for _ in range(abs(dz)):
            self._monitor_command(f"mouse_move 0 0 {step_z}")
        self._monitor_command(f"mouse_move {r_x} {r_y}")

    def move(
        self,
        dx: int = 0,
        dy: int = 0,
        dz: int | None = 0,
        absolute: bool = False,
    ):
        """Move the mouse to specified coordinates.

        The origin is the top left corner of the screen. If the movement stack
        is empty the pointer is located and zeroed first. A target outside the
        known display dimensions is rejected with a warning; in that case
        nothing is moved, scrolled or recorded.

        Args:
            dx: x-direction [px] (positive: move to right, negative: move to left)
            dy: y-direction [px] (positive: move down, negative: move up)
            dz: z-direction [scroll wheel steps] (positive: move up, negative
                move down); ``None`` is treated as 0
            absolute: place the mouse to an absolute position specified by dx and dy
        """
        if dz is None:
            # move_and_click() and drag() default dz to None
            dz = 0
        if not self._movement_stack:
            self.find_and_zero_mouse_ptr()
        x, y = self.get_position()
        if absolute:
            target_x, target_y = dx, dy
        else:
            target_x, target_y = x + dx, y + dy
        width, height = self._display_dimensions
        if not (0 <= target_x <= width and 0 <= target_y <= height):
            logger.warning("Mouse position out of bounds")
            return
        self._move_in_steps(target_x - x, target_y - y, dz)
        self._movement_stack.append((target_x, target_y))

    def move_back(self, position: int):
        """Move mouse back to specified position.

        Make the selected position the latest in movement stack of mouse: the
        ``position`` most recent entries are removed and the last one removed
        is the target, which is then recorded again as the newest entry. With
        ``position <= 0`` nothing is removed and the target is (0, 0).

        The relative movement is computed from the position the pointer had
        before any entry was removed.

        Args:
            position: position in mouse movement stack to move back to, counted
                from the newest entry (1 is the current position)

        Raises:
            IndexError: if the movement stack holds fewer than ``position``
                entries; the stack is left untouched in that case
        """
        if position > len(self._movement_stack):
            raise IndexError("pop from an empty deque")
        if not self._movement_stack:
            self.find_and_zero_mouse_ptr()
        # Remember where the pointer really is before touching the history.
        current_x, current_y = self.get_position()
        target_x = target_y = 0
        for _ in range(position):
            target_x, target_y = self._movement_stack.pop()
        self._move_in_steps(target_x - current_x, target_y - current_y, 0)
        self._movement_stack.append((target_x, target_y))

    def click(self, button: MouseButton = MouseButton.LEFT, number_of_clicks: int = 1):
        """Perform n mouse clicks at current position.

        Each click is a press of ``button`` followed by a release of all
        buttons, with a pause of 0.125 s after each of the two commands.

        Args:
            button: which button to press
            number_of_clicks: how many clicks to perform
        """
        for _ in range(number_of_clicks):
            self._monitor_command(f"mouse_button {button.value}")
            sleep(0.125)
            self._monitor_command("mouse_button 0")
            sleep(0.125)

    def move_and_click(
        self,
        dx: int,
        dy: int,
        dz: int | None = None,
        absolute: bool = False,
        button: MouseButton = MouseButton.LEFT,
        number_of_clicks: int = 1,
    ):
        """Move the mouse to a specific position and perform a click there.

        The click is performed even when the move was rejected as out of
        bounds; it then happens at the unchanged pointer position.

        Args:
            dx: x-direction [px] (positive: move to right, negative: move to left)
            dy: y-direction [px] (positive: move down, negative: move up)
            dz: z-direction [scroll wheel steps] (positive: move up, negative:
                move down); ``None`` means no scrolling
            absolute: place the mouse to an absolute position
            button: which button to press
            number_of_clicks: how many clicks to perform
        """
        self.move(dx, dy, dz, absolute)
        sleep(0.125)
        self.click(button, number_of_clicks)

    def drag(
        self,
        dx: int,
        dy: int,
        dz: int | None = None,
        absolute: bool = False,
        button: MouseButton = MouseButton.LEFT,
    ):
        """Drag an item from current position to specified position.

        Press a mouse button at the current position, go to specified position
        and release the button. The button is released even if the move fails.

        Args:
            dx: x-direction (positive: move to right, negative: move to left)
            dy: y-direction (positive: move down, negative: move up)
            dz: z-direction=scroll-wheel; ``None`` means no scrolling
            absolute: place the mouse to an absolute position
            button: which button to press
        """
        self._monitor_command(f"mouse_button {button.value}")
        try:
            self.move(dx, dy, dz, absolute)
        finally:
            # never leave the guest with a button held down
            self._monitor_command("mouse_button 0")
# [docs]  (Sphinx "viewcode" link marker left over from page extraction; not code)
class QEMUMonitorSession:
    """Representation of the QEMU Monitor connection to one specific domain.

    This class wraps the libvirt_qemu bindings to provide handy methods for
    interaction with a domain. Every command is sent as an HMP command via
    ``qemuMonitorCommand``.

    Attributes:
        _mouse: Handle to the mouse pointer object
        _special_keys_mapping: maps characters that cannot be passed to
            ``sendkey`` verbatim to the key-codes used by qemu on a US layout
    """

    _mouse: Mouse

    # us-mapping
    # list of key-codes: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
    # TODO: think about specifying keyboard layout in qemu
    #  https://wiki.archlinux.org/title/QEMU#Keyboard_seems_broken_or_the_arrow_keys_do_not_work
    _special_keys_mapping = {
        "`": "grave_accent",
        "~": "shift-grave_accent",
        " ": "spc",
        "\n": "ret",
        "\t": "tab",
        "!": "shift-1",
        "@": "shift-2",
        "#": "shift-3",
        "$": "shift-4",
        "%": "shift-5",
        "^": "shift-6",
        "&": "shift-7",
        "*": "asterisk",
        "(": "shift-9",
        ")": "shift-0",
        "-": "minus",
        "_": "shift-minus",
        "+": "kp_add",
        "=": "equal",
        "[": "bracket_left",
        "{": "shift-bracket_left",
        "]": "bracket_right",
        "}": "shift-bracket_right",
        "\\": "backslash",
        "|": "shift-backslash",
        ";": "semicolon",
        ":": "shift-semicolon",
        "'": "apostrophe",
        '"': "shift-apostrophe",
        ",": "comma",
        "<": "shift-comma",
        ".": "dot",
        ">": "shift-dot",
        "/": "slash",
        "?": "shift-slash",
    }

    def __init__(
        self,
        domain: libvirt.virDomain,
        take_screenshot: Callable[[int | None], bytes],
        domain_output_path: pathlib.Path,
    ):
        """Create the session and its (not yet initialized) mouse object.

        Args:
            domain: handle to the domain the monitor commands are sent to
            take_screenshot: callable that takes a screenshot of the domain;
                passed in because taking a screenshot is a libvirt functionality
            domain_output_path: directory used by :meth:`save_screenshot`
        """
        self._domain = domain
        self._domain_output_path = domain_output_path
        # as screenshot is a libvirt functionality
        self._take_screenshot = take_screenshot
        self._mouse = Mouse(self._domain, take_screenshot)

    @property
    def mouse(self):
        """Handle to the mouse pointer object of this session."""
        return self._mouse

    def _monitor_command(self, command: str):
        """Send one HMP command to the domain and return the monitor's reply."""
        return qemuMonitorCommand(
            self._domain, command, VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP
        )

    def _keycode_for(self, character: str) -> str:
        """Translate one character of text into the argument for ``sendkey``.

        Upper-case characters become ``shift-<lower-case character>``,
        characters listed in ``_special_keys_mapping`` are replaced by their
        key-code, and every other character is passed through unchanged.
        """
        if character.isupper():
            return f"shift-{character.lower()}"
        return self._special_keys_mapping.get(character, character)

    def take_screenshot(self, screen: int = 0):
        """See documentation of VirshDomain.take_screenshot."""
        return self._take_screenshot(screen)

    def save_screenshot(self, name: str | None = None):
        """Save a screenshot of the domain to path using qemu monitor command.

        The file is written by the monitor's ``screendump`` command to
        ``<domain_output_path>/<name>.ppm``.

        Args:
            name: if no name is specified, the current time is used. File ending
                .ppm is automatically appended
        """
        if name is None:
            name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        self._monitor_command(
            f"screendump {pathlib.Path(self._domain_output_path, name)}.ppm"
        )

    def send_text(self, text: str, end_ret: bool = False):
        """Send the specified text to the active domain.

        The text is typed one character at a time with ``sendkey``; newline
        and tab characters in the text are sent as the ``ret`` and ``tab`` keys.

        Args:
            text: text to be sent
            end_ret: should return be pressed afterward?
        """
        for character in text:
            self._monitor_command(f"sendkey {self._keycode_for(character)}")
            sleep(0.05)  # so no keystrokes are omitted
        # press return to type command
        if end_ret:
            sleep(0.5)  # is necessary so ret is not omitted
            self._monitor_command("sendkey ret")

    def direct_command(self, command: str) -> str:
        """Directly send a command to QEMU monitor.

        Args:
            command: qemu-monitor command to be sent

        Returns:
            the output of the command
        """
        return self._monitor_command(command)

    def send_key_combination(self, key_combination: str, times: int = 1):
        """Send a key combination via the monitor.

        One second is waited before each time the combination is sent.

        Args:
            key_combination: string must consist of qemu-monitor keycodes
                separated by '-', e.g. 'alt-f4'
            times: how many times the key combination should be entered
        """
        for _ in range(times):
            sleep(1)
            self._monitor_command(f"sendkey {key_combination}")