"""
Contains library functions
"""

import json
import logging
import os.path

from markupsafe import escape

from galaxy import (
    exceptions,
    util,
)
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.model import LibraryFolder
from galaxy.model.base import transaction
from galaxy.tools.actions import upload_common
from galaxy.tools.parameters import populate_state
from galaxy.util.path import (
    safe_contains,
    safe_relpath,
    unsafe_walk,
)

log = logging.getLogger(__name__)


def validate_server_directory_upload(trans, server_dir):
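    """
    Resolve ``server_dir`` against the configured import directory and reject
    unsafe values.

    Admins import from ``library_import_dir``; other users import from their
    own subdirectory of ``user_library_import_dir``. Paths that escape the
    import directory, directly or via symlinks, are logged and rejected.
    Returns a ``(full_dir, import_dir_desc)`` tuple.
    """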
    if server_dir in [None, "None", ""]:
        raise exceptions.RequestParameterInvalidException("Invalid or unspecified server_dir parameter")
    if trans.user_is_admin:
        import_dir = trans.app.config.library_import_dir
        import_dir_desc = "library_import_dir"
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException('"library_import_dir" is not set in the Galaxy configuration')
    else:
        import_dir = trans.app.config.user_library_import_dir
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException(
                '"user_library_import_dir" is not set in the Galaxy configuration'
            )
        if server_dir != trans.user.email:
            import_dir = os.path.join(import_dir, trans.user.email)
        import_dir_desc = "user_library_import_dir"
    full_dir = os.path.join(import_dir, server_dir)
    unsafe = None
    if safe_relpath(server_dir):
        username = trans.user.username if trans.app.config.user_library_import_check_permissions else None
        if import_dir_desc == "user_library_import_dir" and safe_contains(
            import_dir, full_dir, allowlist=trans.app.config.user_library_import_symlink_allowlist
        ):
            for unsafe in unsafe_walk(
                full_dir,
                allowlist=[import_dir] + trans.app.config.user_library_import_symlink_allowlist,
                username=username,
            ):
                log.error(
                    "User attempted to import a path that resolves to a path outside of their import dir: %s -> %s",
                    unsafe,
                    os.path.realpath(unsafe),
                )
    else:
        log.error(
            "User attempted to import a directory path that resolves to a path outside of their import dir: %s -> %s",
            server_dir,
            os.path.realpath(full_dir),
        )
        unsafe = True
    if unsafe:
        raise exceptions.RequestParameterInvalidException("Invalid server_dir specified")
    return full_dir, import_dir_desc


def validate_path_upload(trans):
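    """
    Ensure that uploads via filesystem paths are enabled and that the
    requesting user is an administrator.
    """
    # Note: ``allow_library_path_paste`` appears to be the legacy spelling of the
    # ``allow_path_paste`` config option, which is why the message below names the latter.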
    if not trans.app.config.allow_library_path_paste:
        raise exceptions.ConfigDoesNotAllowException(
            '"allow_path_paste" is not set to True in the Galaxy configuration file'
        )
    if not trans.user_is_admin:
        raise exceptions.AdminRequiredException(
            "Uploading files via filesystem paths can only be performed by administrators"
        )


class LibraryActions:
    """
    Mixin for controllers that provide library functionality.
    """

    def _upload_dataset(self, trans, folder_id: int, payload):
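        """
        Run the ``upload1`` tool to load datasets into a library folder from
        uploaded files, a server directory, or filesystem paths, and return
        the job's output datasets.
        """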
        # Set up the traditional tool state/params
        cntrller = "api"
        tool_id = "upload1"
        upload_common.validate_datatype_extension(
            datatypes_registry=trans.app.datatypes_registry, ext=payload.file_type
        )
        tool = trans.app.toolbox.get_tool(tool_id)
        state = tool.new_state(trans)
        populate_state(trans, tool.inputs, payload.model_dump(), state.inputs)
        tool_params = state.inputs
        dataset_upload_inputs = []
        for input in tool.inputs.values():
            if input.type == "upload_dataset":
                dataset_upload_inputs.append(input)
        # Library-specific params
        if payload.upload_option == "upload_directory":
            full_dir, import_dir_desc = validate_server_directory_upload(trans, payload.server_dir)
        elif payload.upload_option == "upload_paths":
            # Library API already checked this - following check isn't actually needed.
            validate_path_upload(trans)
        # Some error handling should be added to this method.
        try:
            # FIXME: instead of passing params here ( which have been processed by util.Params(), the original payload
            # should be passed so that complex objects that may have been included in the initial request remain.
            library_bunch = upload_common.handle_library_params(trans, payload.model_dump(), folder_id, None)
        except Exception:
            raise exceptions.InvalidFileFormatError("Invalid folder specified")
        # Proceed with (mostly) regular upload processing if we're still errorless
        if payload.upload_option == "upload_file":
            if payload.upload_files:
                for i, upload_dataset in enumerate(tool_params["files"]):
                    upload_dataset["file_data"] = payload.upload_files[i]
            tool_params = upload_common.persist_uploads(tool_params, trans)
            uploaded_datasets = upload_common.get_uploaded_datasets(
                trans, cntrller, tool_params, dataset_upload_inputs, library_bunch=library_bunch
            )
        elif payload.upload_option == "upload_directory":
            uploaded_datasets = self._get_server_dir_uploaded_datasets(
                trans, payload, full_dir, import_dir_desc, library_bunch
            )
        elif payload.upload_option == "upload_paths":
            uploaded_datasets, response_code, message = self._get_path_paste_uploaded_datasets(
                trans, payload.model_dump(), library_bunch, 200, None
            )
            if response_code != 200:
                raise exceptions.RequestParameterInvalidException(message)
        if payload.upload_option == "upload_file" and not uploaded_datasets:
            raise exceptions.RequestParameterInvalidException("Select a file, enter a URL or enter text")
        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        job_params = {}
        job_params["link_data_only"] = json.dumps(payload.link_data_only)
        job_params["uuid"] = json.dumps(payload.uuid)
        job, output = upload_common.create_job(
            trans, tool_params, tool, json_file_path, data_list, folder=library_bunch.folder, job_params=job_params
        )
        trans.app.job_manager.enqueue(job, tool=tool)
        if not output:
            raise exceptions.RequestParameterInvalidException("Upload failed")
        return output

    def _get_server_dir_uploaded_datasets(self, trans, payload, full_dir, import_dir_desc, library_bunch):
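        """
        Create an uploaded dataset for each importable file found in ``full_dir``.
        """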
        files = self._get_server_dir_files(payload, full_dir, import_dir_desc)
        uploaded_datasets = []
        for file in files:
            name = os.path.basename(file)
            uploaded_datasets.append(
                self._make_library_uploaded_dataset(
                    trans, payload.model_dump(), name, file, "server_dir", library_bunch
                )
            )
        return uploaded_datasets

    def _get_server_dir_files(self, payload, full_dir, import_dir_desc):
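        """
        List the regular files in ``full_dir``, dereferencing symlinks one
        level when datasets are linked rather than copied.
        """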
        files = []
        try:
            for entry in os.listdir(full_dir):
                # Only import regular files
                path = os.path.join(full_dir, entry)
                if os.path.islink(full_dir) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the
                    # sub-"directory" in the import dir is actually a symlink,
                    # dereference the symlink, but not any of its contents.
                    link_path = os.readlink(full_dir)
                    if os.path.isabs(link_path):
                        path = os.path.join(link_path, entry)
                    else:
                        path = os.path.abspath(os.path.join(link_path, entry))
                elif os.path.islink(path) and os.path.isfile(path) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the "file" in the
                    # sub-directory of the import dir is actually a symlink,
                    # dereference the symlink (one dereference only, Vasili).
                    link_path = os.readlink(path)
                    if os.path.isabs(link_path):
                        path = link_path
                    else:
                        path = os.path.abspath(os.path.join(os.path.dirname(path), link_path))
                if os.path.isfile(path):
                    files.append(path)
        except Exception as e:
            raise exceptions.InternalServerError(
                f"Unable to get file list for configured {import_dir_desc}, error: {util.unicodify(e)}"
            )
        if not files:
            raise exceptions.ObjectAttributeMissingException(f"The directory '{full_dir}' contains no valid files")
        return files

    def _get_path_paste_uploaded_datasets(self, trans, params, library_bunch, response_code, message):
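        """
        Create uploaded datasets for the filesystem paths pasted into the upload
        form, returning an ``(uploaded_datasets, response_code, message)`` tuple.
        """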
        preserve_dirs = util.string_as_bool(params.get("preserve_dirs", False))
        uploaded_datasets = []
        (files_and_folders, _response_code, _message) = self._get_path_files_and_folders(params, preserve_dirs)
        if _response_code:
            return (uploaded_datasets, _response_code, _message)
        for path, name, folder in files_and_folders:
            uploaded_datasets.append(
                self._make_library_uploaded_dataset(trans, params, name, path, "path_paste", library_bunch, folder)
            )
        return uploaded_datasets, 200, None

    def _get_path_files_and_folders(self, params, preserve_dirs):
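        """
        Expand the pasted paths into ``(path, name, in_folder)`` tuples, or
        return the ``(None, response_code, message)`` error tuple produced by
        parameter validation.
        """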
        if problem_response := self._check_path_paste_params(params):
            return problem_response
        files_and_folders = []
        for line, path in self._paths_list(params):
            line_files_and_folders = self._get_single_path_files_and_folders(line, path, preserve_dirs)
            files_and_folders.extend(line_files_and_folders)
        return files_and_folders, None, None

    def _get_single_path_files_and_folders(self, line, path, preserve_dirs):
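        """
        Collect ``(file_path, name, in_folder)`` tuples for a single pasted
        path, walking it recursively if it is a directory.
        """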
        files_and_folders = []
        if os.path.isfile(path):
            name = os.path.basename(path)
            files_and_folders.append((path, name, None))
        for basedir, _dirs, files in os.walk(line):
            for file in files:
                file_path = os.path.abspath(os.path.join(basedir, file))
                if preserve_dirs:
                    in_folder = os.path.dirname(file_path.replace(path, "", 1).lstrip("/"))
                else:
                    in_folder = None
                files_and_folders.append((file_path, file, in_folder))
        return files_and_folders

    def _paths_list(self, params):
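        """
        Parse the newline-separated ``filesystem_paths`` parameter into
        ``(stripped line, absolute path)`` tuples, skipping blank lines.
        """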
        return [
            (line.strip(), os.path.abspath(line.strip()))
            for line in params.get("filesystem_paths", "").splitlines()
            if line.strip()
        ]

    def _check_path_paste_params(self, params):
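        """
        Validate the pasted paths, returning ``None`` when they all exist or a
        ``(None, response_code, message)`` tuple describing the problem.
        """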
        if params.get("filesystem_paths", "") == "":
            message = "No paths entered in the upload form"
            response_code = 400
            return None, response_code, message
        bad_paths = []
        for _, path in self._paths_list(params):
            if not os.path.exists(path):
                bad_paths.append(path)
        if bad_paths:
            message = 'Invalid paths: "{}".'.format('", "'.join(bad_paths))
            response_code = 400
            return None, response_code, message
        return None

    def _make_library_uploaded_dataset(self, trans, params, name, path, type, library_bunch, in_folder=None):
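        """
        Build the Bunch describing one uploaded dataset and create the
        corresponding library dataset via ``upload_common.new_upload``.
        """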
        link_data_only = params.get("link_data_only", "copy_files")
        uuid_str = params.get("uuid", None)
        file_type = params.get("file_type", None)
        library_bunch.replace_dataset = None  # not valid for these types of upload
        uploaded_dataset = util.bunch.Bunch()
        new_name = name
        # Remove compressed file extensions, if any, but only if
        # we're copying files into Galaxy's file space. Use slicing rather than
        # str.rstrip(): rstrip() strips any of the given characters, so e.g.
        # "log.gz".rstrip(".gz") would yield "lo".
        if link_data_only == "copy_files":
            if new_name.endswith(".gz"):
                new_name = new_name[: -len(".gz")]
            elif new_name.endswith(".zip"):
                new_name = new_name[: -len(".zip")]
        uploaded_dataset.name = new_name
        uploaded_dataset.path = path
        uploaded_dataset.type = type
        uploaded_dataset.ext = None
        uploaded_dataset.file_type = file_type
        uploaded_dataset.dbkey = params.get("dbkey", None)
        uploaded_dataset.to_posix_lines = params.get("to_posix_lines", None)
        uploaded_dataset.space_to_tab = params.get("space_to_tab", None)
        uploaded_dataset.tag_using_filenames = params.get("tag_using_filenames", False)
        uploaded_dataset.tags = params.get("tags", None)
        uploaded_dataset.purge_source = getattr(trans.app.config, "ftp_upload_purge", True)
        if in_folder:
            uploaded_dataset.in_folder = in_folder
        uploaded_dataset.data = upload_common.new_upload(trans, "api", uploaded_dataset, library_bunch)
        uploaded_dataset.link_data_only = link_data_only
        uploaded_dataset.uuid = uuid_str
        if link_data_only == "link_to_files":
            uploaded_dataset.data.link_to(path)
            trans.sa_session.add_all((uploaded_dataset.data, uploaded_dataset.data.dataset))
            with transaction(trans.sa_session):
                trans.sa_session.commit()
        return uploaded_dataset

    def _upload_library_dataset(self, trans, payload):
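        """
        Check folder access/add permissions (and any requested access roles)
        before delegating to ``_upload_dataset``.
        """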
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        self._check_access(trans, is_admin, folder, current_user_roles)
        self._check_add(trans, is_admin, folder, current_user_roles)
        if payload.roles:
            # Check to see if the user selected roles to associate with the DATASET_ACCESS permission
            # on the dataset that would cause accessibility issues.
            vars = dict(DATASET_ACCESS_in=payload.roles)
            permissions, in_roles, error, message = trans.app.security_agent.derive_roles_from_access(
                trans, folder.parent_library.id, "api", library=True, **vars
            )
            if error:
                raise exceptions.RequestParameterInvalidException(message)
        created_outputs_dict = self._upload_dataset(trans, folder.id, payload)
        return created_outputs_dict

    def _create_folder(self, trans, payload):
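        """
        Create a new subfolder of ``payload.folder_id``, copying the parent
        folder's permissions onto it.
        """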
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        parent_folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not parent_folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        # Check the library which actually contains the user-supplied parent folder, not the user-supplied
        # library, which could be anything.
        self._check_access(trans, is_admin, parent_folder, current_user_roles)
        self._check_add(trans, is_admin, parent_folder, current_user_roles)
        new_folder = LibraryFolder(name=payload.name, description=payload.description)
        # We are associating the last used genome build with folders, so we will always
        # initialize a new folder with the first dbkey in the genome builds list, which
        # is currently "?" (unspecified).
        new_folder.genome_build = trans.app.genome_builds.default_value
        parent_folder.add_folder(new_folder)
        trans.sa_session.add(new_folder)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        # New folders default to having the same permissions as their parent folder
        trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder)
        new_folder_dict = dict(created=new_folder)
        return new_folder_dict

    def _create_collection(self, trans, payload, parent):
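        """
        Create a dataset collection in the given parent folder and return its
        dictified representation as a single-element list.
        """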
        # Not delegating to library_common, so need to check access to parent folder here.
        self.check_user_can_add_to_library_item(trans, parent, check_accessible=True)
        create_params = api_payload_to_create_params(payload.model_dump())
        # collection_manager.create needs trans as one of the params
        create_params["trans"] = trans
        create_params["parent"] = parent
        dataset_collection_instance = self.collection_manager.create(**create_params)
        dataset_collection = dictify_dataset_collection_instance(
            dataset_collection_instance, security=trans.security, url_builder=trans.url_builder, parent=parent
        )
        return [dataset_collection]

    def _check_access(self, trans, is_admin, item, current_user_roles):
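        """
        Raise ``ObjectNotFound`` or ``ItemAccessibilityException`` unless the
        user may access the given dataset or library item.
        """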
        if isinstance(item, trans.model.HistoryDatasetAssociation):
            # Make sure the user has the DATASET_ACCESS permission on the history_dataset_association.
            if not item:
                message = f"Invalid history dataset ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            elif (
                not trans.app.security_agent.can_access_dataset(current_user_roles, item.dataset)
                and item.user == trans.user
            ):
                message = f"You do not have permission to access the history dataset with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)
        else:
            # Make sure the user has the LIBRARY_ACCESS permission on the library item.
            if not item:
                message = f"Invalid library item ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            elif not (
                is_admin or trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user)
            ):
                if isinstance(item, trans.model.Library):
                    item_type = "data library"
                elif isinstance(item, LibraryFolder):
                    item_type = "folder"
                else:
                    item_type = "(unknown item type)"
                message = f"You do not have permission to access the {escape(item_type)} with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)

    def _check_add(self, trans, is_admin, item, current_user_roles):
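        """
        Raise ``ItemAccessibilityException`` unless the user may add items to
        the given library item.
        """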
        # Deny access if the user is not an admin and does not have the LIBRARY_ADD permission.
        if not (is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, item)):
            message = f"You are not authorized to add an item to ({escape(item.name)})."
            raise exceptions.ItemAccessibilityException(message)