Source code for fortrace.utility.server_interaction.ftp_interaction
import ftplib
import io
import os
import pathlib
import random
from typing import BinaryIO
import paramiko
from fortrace.core.virsh_domain import VirshDomain
from fortrace.utility.exceptions import ServerInteractionException
from fortrace.utility.server_interaction.server_interaction import (
GenericServerInteraction,
ServerFeedback,
logger,
)
[docs]
class FTPServerInteraction(GenericServerInteraction):
"""Class for interaction with a domain hosting an FTP server.
Attributes:
_generated_files: list of files on the server, grouped by users/owners
_ftp: handle to FTP/FTPS connection
"""
_generated_files = dict[str, list[pathlib.Path]]
_ftp: ftplib.FTP | ftplib.FTP_TLS
def __init__(self, server: VirshDomain, config: dict, ftps: bool = False):
"""Create an object for interaction with an FTP server
Args:
server: handle to the server domain
config: configuration of the server domain (plain config, no subsection of it)
ftps: whether to use FTP over TLS
"""
super().__init__(server, config)
self._ftp = None
self._ftps = ftps
self._generated_files = dict.fromkeys(self._server_config.get("users", []), [])
self._extract_login_data()
def _login(self, user: str | None):
"""Log in as specific user.
Will try to find the password for the user from config. Throws RuntimeError if login is unsuccessful.
Args:
user: username
"""
password = self._login_data[user]
try:
out = self._ftp.login(user, password)
if not out.startswith("230"):
raise ServerInteractionException(
"Something went wrong with the FTP login"
)
except ftplib.error_perm:
logger.error("FTP login failed due to permission error.")
if user is None and password is None:
logger.error("Anonymous user is not supported on this FTP server.")
else:
logger.error(
"Provided the wrong credentials for %s (Password: %s)",
user,
password,
)
def _logout(self):
"""Logs out the active user."""
self._ftp.quit()
self._ftp = None
[docs]
def perform_interaction(self, action: dict) -> ServerFeedback | None:
"""Performs an interaction with the FTP server, based on the provided action config.
Args:
action: action configuration with the following structure
type: "server_interaction"
service: "ProFTPD"
name: "stor" | "retr"
source: <collection> | "ProFTPD"
(target: <path to file>)
user: <username>
Returns:
ServerFeedback enum for an issue, otherwise None ("No news, are good news")
"""
try:
user = action["user"]
self._ftp = (
ftplib.FTP(str(self._domain.ipv4))
if not self._ftps
else ftplib.FTP_TLS(str(self._domain.ipv4))
)
self._login(user)
if action["name"] == "stor":
# in any case there is a file path for the store interaction
with open(action["target"], "rb") as file:
target_path = pathlib.Path(
"/home", user, "Documents", pathlib.Path(action["target"]).name
)
self._store_file(target_path, file)
self._generated_files[user].append(target_path)
elif action["name"] == "retr":
if action["source"] == "ProFTPD":
try:
target_path = random.choice(self._generated_files[user])
except IndexError:
logger.warning(
"Currently there are no files to retrieve for %s", user
)
return ServerFeedback.USER_HAS_NO_FILES
action["target"] = (
target_path
# this is the path on the FTP server, not the one to the tmp directory
)
else:
# user provided a file to retrieve
# TODO: check if file is present on the FTP server (for now trust user ;) )
target_path = action["target"]
self._retrieve_file(target_path)
else:
raise ValueError(f"Unknown action {action} for FTP server")
finally:
self._logout() # is always executed "on the way out"
def _store_file(self, target_path: pathlib.Path, file: BinaryIO):
"""Stores the provided file object on the FTP server. User must be logged in first.
Args:
target_path: path on the server to save file to (should be absolute and contain the file name)
file: handle to the file to be saved
"""
self._build_dir_structure(target_path)
self._ftp.storbinary(f"STOR {target_path}", file)
logger.info(
"Stored %s on FTP server running on %s", target_path, self._domain.ipv4
)
def _build_dir_structure(self, target_path: pathlib.Path):
"""Build directory structure on server.
Builds the dir structure specified by target_path (omitting a file name, if given). Make sure you have the
correct access rights to create all directories along the path.
Args:
target_path: path to create (with file name or without)
"""
target_path = target_path.parent if target_path.is_file() else target_path
pwd = pathlib.Path(self._ftp.pwd())
path_to_build = target_path.relative_to(pwd)
for idx in reversed(
range(len(path_to_build.parents) - 1)
): # -1 because of "." in relative path we want to skip
dirs = dict(self._ftp.mlsd(str(path_to_build.parents[idx + 1]), ["name"]))
# check if the current directory already exists, if yes skip creation
if path_to_build.parents[idx].name in dirs:
continue
self._ftp.mkd(str(path_to_build.parents[idx].name)) # create dir
def _retrieve_file(
self, path: os.PathLike, return_file: bool = False
) -> None | io.BytesIO:
"""Retrieve or simulate the retrival of a file from the FTP server.
Args:
path: path of the file to retrieve
return_file: if true, a BytesIO object with the file content is returned. Remember to call close() on it!
Returns:
A BytesIO object with the file contents or None (to simulate retrieval)
"""
try:
if return_file:
file = io.BytesIO()
self._ftp.retrbinary(f"RETR {path}", file.write)
return file
else:
with open(os.devnull, "wb") as null:
self._ftp.retrbinary(f"RETR {path}", null.write)
except ftplib.error_perm as e:
if repr(e).endswith("No such file or directory"):
logger.error("The file %s does not exist on the server", path)
else:
raise e
[docs]
class SFTPServerInteraction(GenericServerInteraction):
"""Class to interact with a domain hosting a SFTP server.
Attributes:
_sftp: SFTP client object
_transport: SSH Transport
_generated_files: list of files on the server, grouped by users/owners
"""
_sftp: paramiko.SFTPClient
_transport: paramiko.Transport
_generated_files = dict[str, list[pathlib.Path]]
def __init__(self, server: VirshDomain, config: dict):
"""Create a SFTPServerInteraction object.
Args:
server: domain hosting the SFTP server
config: plain YAML configuration of the domain
"""
super().__init__(server, config)
self._sftp = None
self._transport = None
self._generated_files = dict.fromkeys(self._server_config.get("users", []), [])
self._extract_login_data()
def _login(self, user: str):
self._transport = paramiko.Transport((str(self._domain.ipv4), 22))
# Auth
self._transport.connect(None, user, self._login_data[user])
self._sftp = paramiko.SFTPClient.from_transport(self._transport)
def _logout(self):
if self._sftp:
self._sftp.close()
if self._transport:
self._transport.close()
[docs]
def perform_interaction(self, action: dict) -> ServerFeedback | None:
"""Performs an interaction with the SFTP server, based on the provided action config.
Args:
action: action configuration with the following structure
type: "server_interaction"
service: "ProFTPD"
name: "stor" | "retr"
source: <collection> | "ProFTPD"
(target: <path to file>)
user: <username>
Returns:
ServerFeedback enum for an issue, otherwise None ("No news, are good news")
"""
try:
user = action["user"]
self._login(user)
if action["name"] == "stor":
# in any case, there is a file path for the store interaction
target_path = pathlib.Path(
"/home", user, "Documents", pathlib.Path(action["target"]).name
)
self._build_dir_structure(target_path, user)
self._sftp.put(action["target"], str(target_path)) # use absolute paths
self._generated_files[user].append(target_path)
elif action["name"] == "retr":
if action["source"] == "ProFTPD":
try:
target_path = random.choice(self._generated_files[user])
except IndexError:
logger.warning(
"Currently there are no files to retrieve for %s", user
)
return ServerFeedback.USER_HAS_NO_FILES
action["target"] = target_path
else:
# user provided a file to retrieve
# TODO: check if file is present on the server (for now trust user ;) )
target_path = action["target"]
try:
self._sftp.get(str(target_path), os.devnull)
except OSError:
# method compares file sizes to see if transmission was successful. For devnull they always mismatch
pass
else:
raise ValueError(f"Unknown action {action} for SFTP server")
finally:
self._logout() # is always executed "on the way out"
def _build_dir_structure(self, target_path: pathlib.Path, user: str):
"""Build directory structure on server.
Builds the dir structure specified by target_path (omitting a file name, if given). Make sure you have the
correct access rights to create all directories along the path.
Args:
target_path: path to create (with file name or without)
user: username of logged-in user
"""
target_path = target_path.parent if target_path.is_file() else target_path
pwd = (
pathlib.Path(self._sftp.getcwd())
if self._sftp.getcwd()
else pathlib.Path("/home", user)
)
path_to_build = target_path.relative_to(pwd)
for idx in reversed(
range(len(path_to_build.parents) - 1)
): # -1 because of "." in relative path we want to skip
dirs = self._sftp.listdir(str(path_to_build.parents[idx + 1]))
# check if the current directory already exists, if yes skip creation
if path_to_build.parents[idx].name in dirs:
continue
self._sftp.mkdir(str(path_to_build.parents[idx].name)) # create dir