Source code for fortrace.utility.server_interaction.ftp_interaction

import ftplib
import io
import os
import pathlib
import random
from typing import BinaryIO

import paramiko

from fortrace.core.virsh_domain import VirshDomain
from fortrace.utility.exceptions import ServerInteractionException
from fortrace.utility.server_interaction.server_interaction import (
    GenericServerInteraction,
    ServerFeedback,
    logger,
)


class FTPServerInteraction(GenericServerInteraction):
    """Class for interaction with a domain hosting an FTP server.

    Attributes:
        _generated_files: files stored on the server during this session,
            grouped by user/owner. Each user owns an independent list.
        _ftp: handle to the FTP/FTPS connection, ``None`` while disconnected
    """

    _generated_files: dict[str, list[pathlib.Path]]
    _ftp: ftplib.FTP | ftplib.FTP_TLS | None

    def __init__(self, server: VirshDomain, config: dict, ftps: bool = False):
        """Create an object for interaction with an FTP server.

        Args:
            server: handle to the server domain
            config: configuration of the server domain (plain config, no
                subsection of it)
            ftps: whether to use FTP over TLS
        """
        super().__init__(server, config)
        self._ftp = None
        self._ftps = ftps
        # One fresh list per user. dict.fromkeys(users, []) would hand the very
        # same list object to every user, mixing up file ownership.
        self._generated_files = {
            user: [] for user in self._server_config.get("users", [])
        }
        self._extract_login_data()

    def _login(self, user: str | None):
        """Log in as a specific user.

        The password is looked up in the login data extracted from the config.

        Args:
            user: username

        Raises:
            KeyError: if no login data is known for ``user``.
            ftplib.error_perm: if the server rejects the credentials. The
                failure is logged before the exception is re-raised.
            ServerInteractionException: if the server answers the login with
                anything other than a 230 reply.
        """
        password = self._login_data[user]
        try:
            response = self._ftp.login(user, password)
        except ftplib.error_perm:
            logger.error("FTP login failed due to permission error.")
            if user is None and password is None:
                logger.error("Anonymous user is not supported on this FTP server.")
            else:
                logger.error(
                    "Provided the wrong credentials for %s (Password: %s)",
                    user,
                    password,
                )
            # Do not continue as if we were logged in: every following command
            # would fail anyway, just with a less helpful error.
            raise
        if not response.startswith("230"):
            raise ServerInteractionException("Something went wrong with the FTP login")

    def _logout(self):
        """Log out the active user and drop the connection handle.

        Safe to call when no connection was ever established, so it can be
        used unconditionally in ``finally`` blocks without masking the
        exception that is currently propagating.
        """
        if self._ftp is None:
            return
        try:
            self._ftp.quit()
        except ftplib.all_errors as err:
            # The polite QUIT failed (e.g. connection already broken); close
            # the socket unilaterally instead.
            logger.warning("FTP QUIT failed (%s), closing connection", err)
            self._ftp.close()
        finally:
            self._ftp = None

    def perform_interaction(self, action: dict) -> ServerFeedback | None:
        """Perform an interaction with the FTP server, based on the action config.

        Args:
            action: action configuration with the following structure
                type: "server_interaction"
                service: "ProFTPD"
                name: "stor" | "retr"
                source: <collection> | "ProFTPD"
                (target: <path to file>)
                user: <username>

                For a "retr" action with source "ProFTPD", ``action["target"]``
                is overwritten with the chosen path on the FTP server.

        Returns:
            ServerFeedback enum for an issue, otherwise None
            ("No news, are good news")

        Raises:
            ValueError: if ``action["name"]`` is neither "stor" nor "retr".
        """
        try:
            user = action["user"]
            host = str(self._domain.ipv4)
            self._ftp = ftplib.FTP_TLS(host) if self._ftps else ftplib.FTP(host)
            self._login(user)

            if action["name"] == "stor":
                # in any case there is a file path for the store interaction
                with open(action["target"], "rb") as file:
                    target_path = pathlib.Path(
                        "/home", user, "Documents", pathlib.Path(action["target"]).name
                    )
                    self._store_file(target_path, file)
                self._generated_files.setdefault(user, []).append(target_path)
            elif action["name"] == "retr":
                if action["source"] == "ProFTPD":
                    stored_files = self._generated_files.get(user, [])
                    if not stored_files:
                        logger.warning(
                            "Currently there are no files to retrieve for %s", user
                        )
                        return ServerFeedback.USER_HAS_NO_FILES
                    target_path = random.choice(stored_files)
                    # this is the path on the FTP server, not the one to the
                    # tmp directory
                    action["target"] = target_path
                else:
                    # user provided a file to retrieve
                    # TODO: check if file is present on the FTP server (for now trust user ;) )
                    target_path = action["target"]
                self._retrieve_file(target_path)
            else:
                raise ValueError(f"Unknown action {action} for FTP server")
        finally:
            self._logout()  # is always executed "on the way out"
        return None

    def _store_file(self, target_path: pathlib.Path, file: BinaryIO):
        """Store the provided file object on the FTP server.

        User must be logged in first.

        Args:
            target_path: path on the server to save the file to (should be
                absolute and contain the file name)
            file: handle to the file to be saved
        """
        self._build_dir_structure(target_path)
        self._ftp.storbinary(f"STOR {target_path}", file)
        logger.info(
            "Stored %s on FTP server running on %s", target_path, self._domain.ipv4
        )

    def _build_dir_structure(self, target_path: pathlib.Path):
        """Build the directory structure leading to ``target_path`` on the server.

        The last component of ``target_path`` is treated as the file name and
        is not created; every missing directory above it is. Make sure you
        have the correct access rights to create all directories along the path.

        Args:
            target_path: path of the file whose parent directories should
                exist. Must be located below the server's current working
                directory.

        Raises:
            ValueError: if ``target_path`` is not below the current working
                directory reported by the server.
        """
        # NOTE: the path lives on the *server*, so it must not be probed with
        # local filesystem checks such as Path.is_file().
        pwd = pathlib.Path(self._ftp.pwd())
        path_to_build = target_path.relative_to(pwd)

        # parents is ordered deepest first and ends with "."; drop the "." and
        # walk from the shallowest directory down so parents exist first.
        directories = list(path_to_build.parents)[:-1]
        for directory in reversed(directories):
            existing = dict(self._ftp.mlsd(str(directory.parent), ["name"]))
            # check if the current directory already exists, if yes skip creation
            if directory.name in existing:
                continue
            # Use the path relative to the working directory, not just the
            # name, so nested directories end up below their parent.
            self._ftp.mkd(str(directory))

    def _retrieve_file(
        self, path: os.PathLike, return_file: bool = False
    ) -> None | io.BytesIO:
        """Retrieve or simulate the retrieval of a file from the FTP server.

        Args:
            path: path of the file to retrieve
            return_file: if true, a BytesIO object with the file content is
                returned. Remember to call close() on it!

        Returns:
            A BytesIO object with the file contents, or None when the
            retrieval is only simulated or the file does not exist on the
            server (the latter is logged as an error).

        Raises:
            ftplib.error_perm: for permanent server errors other than a
                missing file.
        """
        try:
            if return_file:
                buffer = io.BytesIO()
                self._ftp.retrbinary(f"RETR {path}", buffer.write)
                return buffer
            # Simulated retrieval: transfer the data but throw it away.
            with open(os.devnull, "wb") as null:
                self._ftp.retrbinary(f"RETR {path}", null.write)
        except ftplib.error_perm as err:
            # str(), not repr(): the repr ends with "')" and would never match.
            if not str(err).endswith("No such file or directory"):
                raise
            logger.error("The file %s does not exist on the server", path)
        return None
class SFTPServerInteraction(GenericServerInteraction):
    """Class to interact with a domain hosting a SFTP server.

    Attributes:
        _sftp: SFTP client object, ``None`` while disconnected
        _transport: SSH transport, ``None`` while disconnected
        _generated_files: files stored on the server during this session,
            grouped by user/owner. Each user owns an independent list.
    """

    _sftp: paramiko.SFTPClient | None
    _transport: paramiko.Transport | None
    _generated_files: dict[str, list[pathlib.Path]]

    def __init__(self, server: VirshDomain, config: dict):
        """Create a SFTPServerInteraction object.

        Args:
            server: domain hosting the SFTP server
            config: plain YAML configuration of the domain
        """
        super().__init__(server, config)
        self._sftp = None
        self._transport = None
        # One fresh list per user. dict.fromkeys(users, []) would hand the very
        # same list object to every user, mixing up file ownership.
        self._generated_files = {
            user: [] for user in self._server_config.get("users", [])
        }
        self._extract_login_data()

    def _login(self, user: str):
        """Open an SSH transport to port 22 and start an SFTP session on it.

        Args:
            user: username; the password is taken from the extracted login data

        Raises:
            KeyError: if no login data is known for ``user``.
        """
        self._transport = paramiko.Transport((str(self._domain.ipv4), 22))
        # Auth
        self._transport.connect(None, user, self._login_data[user])
        self._sftp = paramiko.SFTPClient.from_transport(self._transport)

    def _logout(self):
        """Close the SFTP session and the SSH transport, if they are open.

        Safe to call at any time, including after a failed login. The handles
        are reset so a stale, closed session is never reused.
        """
        if self._sftp:
            self._sftp.close()
            self._sftp = None
        if self._transport:
            self._transport.close()
            self._transport = None

    def perform_interaction(self, action: dict) -> ServerFeedback | None:
        """Perform an interaction with the SFTP server, based on the action config.

        Args:
            action: action configuration with the following structure
                type: "server_interaction"
                service: "ProFTPD"
                name: "stor" | "retr"
                source: <collection> | "ProFTPD"
                (target: <path to file>)
                user: <username>

                For a "retr" action with source "ProFTPD", ``action["target"]``
                is overwritten with the chosen path on the server.

        Returns:
            ServerFeedback enum for an issue, otherwise None
            ("No news, are good news")

        Raises:
            ValueError: if ``action["name"]`` is neither "stor" nor "retr".
        """
        try:
            user = action["user"]
            self._login(user)

            if action["name"] == "stor":
                # in any case, there is a file path for the store interaction
                target_path = pathlib.Path(
                    "/home", user, "Documents", pathlib.Path(action["target"]).name
                )
                self._build_dir_structure(target_path, user)
                self._sftp.put(action["target"], str(target_path))  # use absolute paths
                self._generated_files.setdefault(user, []).append(target_path)
            elif action["name"] == "retr":
                if action["source"] == "ProFTPD":
                    stored_files = self._generated_files.get(user, [])
                    if not stored_files:
                        logger.warning(
                            "Currently there are no files to retrieve for %s", user
                        )
                        return ServerFeedback.USER_HAS_NO_FILES
                    target_path = random.choice(stored_files)
                    action["target"] = target_path
                else:
                    # user provided a file to retrieve
                    # TODO: check if file is present on the server (for now trust user ;) )
                    target_path = action["target"]
                try:
                    self._sftp.get(str(target_path), os.devnull)
                except FileNotFoundError:
                    # Do not let a missing remote file hide behind the
                    # expected size-mismatch error handled below.
                    logger.error(
                        "The file %s does not exist on the server", target_path
                    )
                except OSError:
                    # method compares file sizes to see if transmission was
                    # successful. For devnull they always mismatch
                    pass
            else:
                raise ValueError(f"Unknown action {action} for SFTP server")
        finally:
            self._logout()  # is always executed "on the way out"
        return None

    def _build_dir_structure(self, target_path: pathlib.Path, user: str):
        """Build the directory structure leading to ``target_path`` on the server.

        The last component of ``target_path`` is treated as the file name and
        is not created; every missing directory above it is. Make sure you
        have the correct access rights to create all directories along the path.

        Args:
            target_path: path of the file whose parent directories should
                exist. Must be located below the session's working directory,
                or below ``/home/<user>`` when the session has none set.
            user: username of logged-in user

        Raises:
            ValueError: if ``target_path`` is not below that base directory.
        """
        # NOTE: the path lives on the *server*, so it must not be probed with
        # local filesystem checks such as Path.is_file().
        cwd = self._sftp.getcwd()
        pwd = pathlib.Path(cwd) if cwd else pathlib.Path("/home", user)
        path_to_build = target_path.relative_to(pwd)

        # parents is ordered deepest first and ends with "."; drop the "." and
        # walk from the shallowest directory down so parents exist first.
        directories = list(path_to_build.parents)[:-1]
        for directory in reversed(directories):
            existing = self._sftp.listdir(str(directory.parent))
            # check if the current directory already exists, if yes skip creation
            if directory.name in existing:
                continue
            # Use the path relative to the base directory, not just the name,
            # so nested directories end up below their parent.
            self._sftp.mkdir(str(directory))