Source code for fortrace.core.qemu_monitor

"""Module for interaction with the QEMU Monitor of a domain.

From the QEMU Monitor documentation
(https://www.qemu.org/docs/master/system/monitor.html).
The QEMU monitor is used to give complex commands to the QEMU emulator.

In the case of ForTrace++, it is used to control the mouse and the keyboard of a domain.
Furthermore, it can be used to mount devices on a domain.
"""

import datetime
import pathlib
from collections import deque
from enum import Enum
from time import sleep
from typing import Callable

import cv2
import libvirt
import numpy as np
from libvirt_qemu import VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP, qemuMonitorCommand

from fortrace.utility.image_processing.image_similarity import image_difference
from fortrace.utility.image_processing.opencv_utils import read_image_gray_scale
from fortrace.utility.logger_helper import setup_logger

logger = setup_logger(__name__)


class MouseButton(Enum):
    """Mouse buttons mapped to the codes QEMU expects (taken from documentation).

    The member value is inserted verbatim into the ``mouse_button`` monitor
    command when a button is pressed.
    """

    # Declaration order is kept as-is because it defines the iteration order.
    LEFT = 1
    RIGHT = 4
    MIDDLE = 2
class Mouse:
    """Mouse pointer, controlled via QEMUMonitor.

    The class keeps a bounded history (the "movement stack") of the positions
    it moved the pointer to. The newest entry is treated as the current
    pointer position when relative and absolute movements are computed.
    """

    # Largest distance (in px) sent with a single ``mouse_move`` command.
    # Larger movements are split up, see ``_move_in_steps``.
    _MAX_STEP = 100

    def __init__(
        self, domain: libvirt.virDomain, take_screenshot: Callable[[int | None], bytes]
    ):
        """Initialize Mouse pointer class.

        From within ForTrace call the public init method.

        Args:
            domain: handle to active domain
            take_screenshot: callable returning an encoded screenshot of the
                domain as bytes; it is used to locate the pointer and to
                determine the display dimensions
        """
        self._movement_stack = deque(maxlen=10)
        self._domain = domain
        self._take_screenshot = take_screenshot
        self._mouse_ptr = None  # template of mouse ptr, set in find_and_zero_mouse_ptr
        self._display_dimensions = (0, 0)  # (width, height)

    def init(self):
        """Initialize the mouse class.

        Determines the display dimensions and moves the pointer to the top
        left corner of the screen.
        """
        self._update_display_dimension()
        self.find_and_zero_mouse_ptr()

    def _send(self, command: str):
        """Send a single HMP command to the QEMU monitor of the domain."""
        return qemuMonitorCommand(
            self._domain, command, VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP
        )

    def _clear_stack(self):
        """Forget all recorded pointer positions."""
        self._movement_stack.clear()

    def get_position(self, position: int = -1) -> tuple[int, int]:
        """Return n-th position in movement stack.

        Args:
            position: index into the movement stack, use '-1' to return the
                last position

        Returns:
            (x,y) coordinates of the mouse pointer

        Raises:
            IndexError: if the movement stack has no entry at that index
        """
        return self._movement_stack[position]

    def find_and_zero_mouse_ptr(self):
        """Method to find the mouse pointer on the domain screen.

        Two screenshots are compared, one before and one after the pointer
        has been moved. The region that changed is stored as template of the
        mouse pointer. Afterwards the pointer is pushed into the top left
        corner and (0, 0) is put on the movement stack.
        """
        screen_0 = read_image_gray_scale(self._take_screenshot())
        threshold, screen_0_t = cv2.threshold(
            screen_0, 0, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU
        )

        self._send("mouse_move 100 100")  # move mouse ptr to the right
        sleep(1)

        screen_1 = read_image_gray_scale(self._take_screenshot())
        # reuse the threshold of the first screenshot so both binary images
        # are comparable
        _, screen_1_t = cv2.threshold(screen_1, threshold, 255, cv2.THRESH_BINARY)

        bounding_boxes, area = image_difference(screen_0_t, screen_1_t)
        logger.debug("Mouse bounding boxes %s and area %s", bounding_boxes, area)

        try:
            # take the bounding box that is more to the right
            mouse_ptr_box = max(bounding_boxes, key=lambda box: box[0])
        except ValueError:  # no difference between the screenshots was found
            logger.warning(
                "Cannot determine initial position of mouse pointer, thus no"
                " pattern of it."
            )
        else:
            logger.debug("Chose %s as mouse bounding box", mouse_ptr_box)
            # NOTE(review): boxes are presumably (x1, y1, x2, y2) - confirm
            # against image_difference
            self._mouse_ptr = screen_1[
                mouse_ptr_box[1] : mouse_ptr_box[3], mouse_ptr_box[0] : mouse_ptr_box[2]
            ]
            top_left = mouse_ptr_box[:2]
            logger.debug("Initial mouse position %s.", top_left)

        # zero mouse pointer
        self._move_in_steps(-10000, -10000, 0)
        self._movement_stack.append((0, 0))
        logger.debug("Zeroed mouse pointer in top left corner.")

    def _update_display_dimension(self):
        """Determine the correct display dimensions once a domain has started.

        Has to be called to move the mouse correctly. Nothing is changed if
        the domain is not active or the screenshot cannot be decoded.
        """
        if not self._domain.isActive():
            return

        # to get screen dimensions
        img_arr = np.frombuffer(self._take_screenshot(), dtype=np.uint8)
        image = cv2.imdecode(img_arr, cv2.IMREAD_GRAYSCALE)
        if image is None:  # imdecode signals failure by returning None
            logger.warning(
                "Cannot decode screenshot, display dimensions remain %s.",
                self._display_dimensions,
            )
            return

        height, width = image.shape
        self._display_dimensions = (width, height)

    def _move_in_steps(self, dx: int, dy: int, dz: int | None):
        """Move the mouse in steps to circumvent movement limits.

        Operating systems seem to introduce a barrier to the maximum movement
        a mouse can make. In Windows 10, this barrier seems to be around 500 px.
        With this method it is made sure that the mouse movement is done in
        smaller steps, so it remains precise.

        All steps of one axis point into the direction of the requested
        movement, so the pointer never travels further than requested.

        Args:
            dx: movement in pixels in x-direction
            dy: movement in pixels in y-direction
            dz: movement in scroll-wheel steps in z-direction, None means no
                scrolling
        """
        step = self._MAX_STEP
        sign_x = 1 if dx >= 0 else -1
        sign_y = 1 if dy >= 0 else -1
        # split the magnitude, not the signed value: floor division of a
        # negative delta would overshoot and need a correction step back
        full_x, rest_x = divmod(abs(dx), step)
        full_y, rest_y = divmod(abs(dy), step)
        dz = dz or 0

        for _ in range(full_x):
            self._send(f"mouse_move {sign_x * step} 0 0")
        for _ in range(full_y):
            self._send(f"mouse_move 0 {sign_y * step} 0")
        for _ in range(abs(dz)):
            self._send(f"mouse_move 0 0 {1 if dz >= 0 else -1}")
        if rest_x or rest_y:
            self._send(f"mouse_move {sign_x * rest_x} {sign_y * rest_y}")

    def move(
        self, dx: int = 0, dy: int = 0, dz: int | None = 0, absolute: bool = False
    ):
        """Move the mouse to specified coordinates.

        The origin is the top left corner of the screen. If the target lies
        outside of the display a warning is logged and nothing is moved
        (including the scroll wheel).

        Args:
            dx: x-direction [px] (positive: move to right, negative: move to left)
            dy: y-direction [px] (positive: move down, negative: move up)
            dz: z-direction [scroll wheel steps] (positive: move up, negative
                move down), None means no scrolling
            absolute: place the mouse to an absolute position specified by dx and dy
        """
        if not self._movement_stack:
            self.find_and_zero_mouse_ptr()

        x, y = self.get_position()
        target_x, target_y = (dx, dy) if absolute else (x + dx, y + dy)

        width, height = self._display_dimensions
        if not (0 <= target_x <= width and 0 <= target_y <= height):
            logger.warning("Mouse position out of bounds")
            return

        self._move_in_steps(target_x - x, target_y - y, dz)
        self._movement_stack.append((target_x, target_y))

    def move_back(self, position: int):
        """Move mouse back to specified position.

        Make the selected position the latest in movement stack of mouse.
        The entries above the selected one are dropped. For a position
        smaller than 1 nothing is dropped and the pointer is moved to the
        origin (0, 0).

        Args:
            position: position in mouse movement stack to move back to,
                counted from the newest entry (1 is the current position)

        Raises:
            IndexError: if the movement stack holds fewer than ``position``
                entries; the stack is left untouched in that case
        """
        if position < 1:
            self.move(0, 0, absolute=True)
            return
        if position > len(self._movement_stack):
            raise IndexError(
                f"Cannot move back {position} positions, movement stack only"
                f" holds {len(self._movement_stack)}"
            )

        # the pointer physically rests at the newest entry, so the delta has
        # to be computed from it and not from what remains after popping
        current_x, current_y = self._movement_stack[-1]
        for _ in range(position):
            target_x, target_y = self._movement_stack.pop()

        self._move_in_steps(target_x - current_x, target_y - current_y, 0)
        self._movement_stack.append((target_x, target_y))

    def click(self, button: MouseButton = MouseButton.LEFT, number_of_clicks: int = 1):
        """Perform n mouse clicks at current position.

        Args:
            button: which button to press
            number_of_clicks: how many clicks to perform
        """
        for _ in range(number_of_clicks):
            self._send(f"mouse_button {button.value}")
            sleep(0.125)
            self._send("mouse_button 0")  # release all buttons
            sleep(0.125)

    def move_and_click(
        self,
        dx: int,
        dy: int,
        dz: int | None = None,
        absolute: bool = False,
        button: MouseButton = MouseButton.LEFT,
        number_of_clicks: int = 1,
    ):
        """Move the mouse to a specific position and perform a click there.

        Args:
            dx: x-direction [px] (positive: move to right, negative: move to left)
            dy: y-direction [px] (positive: move down, negative: move up)
            dz: z-direction [scroll wheel steps] (positive: move up, negative: move down),
                None means no scrolling
            absolute: place the mouse to an absolute position
            button: which button to press
            number_of_clicks: how many clicks to perform
        """
        self.move(dx, dy, dz, absolute)
        sleep(0.125)
        self.click(button, number_of_clicks)

    def drag(
        self,
        dx: int,
        dy: int,
        dz: int | None = None,
        absolute: bool = False,
        button: MouseButton = MouseButton.LEFT,
    ):
        """Drag an item from current position to specified position.

        Press a mouse button at the current position, go to specified position and
        release the button. The button is released even if the movement fails.

        Args:
            dx: x-direction (positive: move to right, negative: move to left)
            dy: y-direction (positive: move down, negative: move up)
            dz: z-direction=scroll-wheel, None means no scrolling
            absolute: place the mouse to an absolute position
            button: which button to press
        """
        self._send(f"mouse_button {button.value}")
        try:
            self.move(dx, dy, dz, absolute)
        finally:
            # never leave the button pressed in the guest
            self._send("mouse_button 0")
class QEMUMonitorSession:
    """Representation of the QEMU Monitor connection to one specific domain.

    This class wraps the libvirt_qemu bindings to provide handy methods for interaction
    with a domain.

    Attributes:
        _mouse: Handle to the mouse pointer object
        _special_keys_mapping: maps characters to the key-codes used by qemu
            on a US layout
    """

    _mouse: Mouse

    # us-mapping
    # list of key-codes: https://en.wikibooks.org/wiki/QEMU/Monitor#sendkey_keys
    # TODO: think about specifying keyboard layout in qemu
    #  https://wiki.archlinux.org/title/QEMU#Keyboard_seems_broken_or_the_arrow_keys_do_not_work
    _special_keys_mapping = {
        "`": "grave_accent",
        "~": "shift-grave_accent",
        " ": "spc",
        "!": "shift-1",
        "@": "shift-2",
        "#": "shift-3",
        "$": "shift-4",
        "%": "shift-5",
        "^": "shift-6",
        "&": "shift-7",
        "*": "asterisk",
        "(": "shift-9",
        ")": "shift-0",
        "-": "minus",
        "_": "shift-minus",
        "+": "kp_add",
        "=": "equal",
        "[": "bracket_left",
        "{": "shift-bracket_left",
        "]": "bracket_right",
        "}": "shift-bracket_right",
        "\\": "backslash",
        "|": "shift-backslash",
        ";": "semicolon",
        ":": "shift-semicolon",
        "'": "apostrophe",
        '"': "shift-apostrophe",
        ",": "comma",
        "<": "shift-comma",
        ".": "dot",
        ">": "shift-dot",
        "/": "slash",
        "?": "shift-slash",
        # whitespace control characters have no literal key name; without
        # these entries they would end up verbatim in the sendkey command
        "\n": "ret",
        "\t": "tab",
    }

    def __init__(
        self,
        domain: libvirt.virDomain,
        take_screenshot: Callable[[int | None], bytes],
        domain_output_path: pathlib.Path,
    ):
        """Create a monitor session for a domain.

        Args:
            domain: handle to the domain the commands are sent to
            take_screenshot: callable returning a screenshot of the domain as
                bytes
            domain_output_path: directory in which screenshots taken with
                ``save_screenshot`` are stored
        """
        self._domain = domain
        self._domain_output_path = domain_output_path
        self._take_screenshot = (
            take_screenshot  # as screenshot is a libvirt functionality
        )
        self._mouse = Mouse(self._domain, take_screenshot)

    def _hmp(self, command: str) -> str:
        """Send one HMP command to the monitor of the domain and return its output."""
        return qemuMonitorCommand(
            self._domain, command, VIR_DOMAIN_QEMU_MONITOR_COMMAND_HMP
        )

    @property
    def mouse(self) -> Mouse:
        """Handle to the mouse pointer of the domain."""
        return self._mouse

    def take_screenshot(self, screen: int = 0):
        """See documentation of VirshDomain.take_screenshot."""
        return self._take_screenshot(screen)

    def save_screenshot(self, name: str | None = None):
        """Save a screenshot of the domain to path using qemu monitor command.

        The file is written into the output path of the domain.

        Args:
            name: if no name is specified, the current time is used. File ending
                .ppm is automatically appended
        """
        if name is None:
            name = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

        self._hmp(f"screendump {pathlib.Path(self._domain_output_path, name)}.ppm")

    def send_text(self, text: str, end_ret: bool = False):
        """Send the specified text to the active domain.

        The text is typed character by character. Upper case characters are
        sent together with shift, characters listed in the special keys
        mapping (including newline and tab) are translated to their key-code
        and all remaining characters are sent as they are.

        Args:
            text: text to be sent
            end_ret: should return be pressed afterward?
        """
        for character in text:
            if character.isupper():
                key = f"shift-{character.lower()}"
            else:
                key = self._special_keys_mapping.get(character, character)

            self._hmp(f"sendkey {key}")
            sleep(0.05)  # so no keystrokes are omitted

        # press return to type command
        if end_ret:
            sleep(0.5)  # is necessary so ret is not omitted
            self._hmp("sendkey ret")

    def direct_command(self, command: str) -> str:
        """Directly send a command to QEMU monitor.

        Args:
            command: qemu-monitor command to be sent

        Returns:
            the output of the command
        """
        return self._hmp(command)

    def send_key_combination(self, key_combination: str, times: int = 1):
        """Send a key combination via the monitor.

        Before every repetition one second is waited.

        Args:
            key_combination: string must consist of qemu-monitor keycodes separated by
                '-', e.g. 'alt-f4'
            times: how many times the key combination should be entered
        """
        for _ in range(times):
            sleep(1)
            self._hmp(f"sendkey {key_combination}")