"""
Contains library functions
"""
import json
import logging
import os.path
from typing import Optional
from markupsafe import escape
from galaxy import util
from galaxy.exceptions import (
    AdminRequiredException,
    ConfigDoesNotAllowException,
    ItemAccessibilityException,
    ObjectNotFound,
    RequestParameterInvalidException,
)
from galaxy.model import (
    LibraryDataset,
    LibraryFolder,
)
from galaxy.model.base import transaction
from galaxy.tools.actions import upload_common
from galaxy.tools.parameters import populate_state
from galaxy.util.path import (
    safe_contains,
    safe_relpath,
    unsafe_walk,
)
log = logging.getLogger(__name__)


def validate_server_directory_upload(trans, server_dir):
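    """
    Validate a request to import datasets from a server directory.

    Ensures the relevant import directory is configured, resolves the
    user-supplied ``server_dir`` against it, and rejects paths that escape
    the import directory (e.g. via ``..`` or symlinks). Returns a tuple of
    ``(full_dir, import_dir_desc)`` on success.
    """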
    if server_dir in [None, "None", ""]:
        raise RequestParameterInvalidException("Invalid or unspecified server_dir parameter")
    if trans.user_is_admin:
        import_dir = trans.app.config.library_import_dir
        import_dir_desc = "library_import_dir"
        if not import_dir:
            raise ConfigDoesNotAllowException('"library_import_dir" is not set in the Galaxy configuration')
    else:
        import_dir = trans.app.config.user_library_import_dir
        if not import_dir:
            raise ConfigDoesNotAllowException('"user_library_import_dir" is not set in the Galaxy configuration')
        if server_dir != trans.user.email:
            import_dir = os.path.join(import_dir, trans.user.email)
        import_dir_desc = "user_library_import_dir"
    full_dir = os.path.join(import_dir, server_dir)
    unsafe = None
    if safe_relpath(server_dir):
        username = trans.user.username if trans.app.config.user_library_import_check_permissions else None
        if import_dir_desc == "user_library_import_dir" and safe_contains(
            import_dir, full_dir, allowlist=trans.app.config.user_library_import_symlink_allowlist
        ):
            for unsafe in unsafe_walk(
                full_dir,
                allowlist=[import_dir] + trans.app.config.user_library_import_symlink_allowlist,
                username=username,
            ):
                log.error(
                    "User attempted to import a path that resolves to a path outside of their import dir: %s -> %s",
                    unsafe,
                    os.path.realpath(unsafe),
                )
    else:
        log.error(
            "User attempted to import a directory path that resolves to a path outside of their import dir: %s -> %s",
            server_dir,
            os.path.realpath(full_dir),
        )
        unsafe = True
    if unsafe:
        raise RequestParameterInvalidException("Invalid server_dir specified")
    return full_dir, import_dir_desc


def validate_path_upload(trans):
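    """
    Ensure that filesystem-path uploads are both enabled in the Galaxy
    configuration and requested by an administrator.
    """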
    # The ``allow_library_path_paste`` attribute is the legacy name backing the
    # ``allow_path_paste`` option referenced in the error message below.
    if not trans.app.config.allow_library_path_paste:
        raise ConfigDoesNotAllowException('"allow_path_paste" is not set to True in the Galaxy configuration file')
    if not trans.user_is_admin:
        raise AdminRequiredException("Uploading files via filesystem paths can only be performed by administrators")


class LibraryActions:
"""
Mixin for controllers that provide library functionality.
"""

    def _upload_dataset(self, trans, folder_id: int, replace_dataset: Optional[LibraryDataset] = None, **kwd):
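        """
        Upload datasets into the library folder identified by ``folder_id``
        using the traditional ``upload1`` tool state, supporting the
        ``upload_file``, ``upload_directory`` and ``upload_paths`` options.
        Returns the upload job's output on success, or an
        ``(error_code, message)`` tuple on failure.
        """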
        # Set up the traditional tool state/params
        cntrller = "api"
        tool_id = "upload1"
        message = None
        file_type = kwd.get("file_type")
        try:
            upload_common.validate_datatype_extension(datatypes_registry=trans.app.datatypes_registry, ext=file_type)
        except RequestParameterInvalidException as e:
            return (400, util.unicodify(e))
        tool = trans.app.toolbox.get_tool(tool_id)
        state = tool.new_state(trans)
        populate_state(trans, tool.inputs, kwd, state.inputs)
        tool_params = state.inputs
        dataset_upload_inputs = []
        for input in tool.inputs.values():
            if input.type == "upload_dataset":
                dataset_upload_inputs.append(input)
        # Library-specific params
        server_dir = kwd.get("server_dir", "")
        upload_option = kwd.get("upload_option", "upload_file")
        response_code = 200
        if upload_option == "upload_directory":
            full_dir, import_dir_desc = validate_server_directory_upload(trans, server_dir)
            message = "Select a directory"
        elif upload_option == "upload_paths":
            # The library API already checked this, so the following check isn't strictly needed.
            validate_path_upload(trans)
        # Some error handling should be added to this method.
        try:
            # FIXME: instead of passing params here (which have been processed by util.Params()), the original kwd
            # should be passed so that complex objects that may have been included in the initial request remain.
            library_bunch = upload_common.handle_library_params(trans, kwd, folder_id, replace_dataset)
        except Exception:
            response_code = 500
            message = "Unable to parse upload parameters, please report this error."
        # Proceed with (mostly) regular upload processing if we're still errorless
        if response_code == 200:
            if upload_option == "upload_file":
                tool_params = upload_common.persist_uploads(tool_params, trans)
                uploaded_datasets = upload_common.get_uploaded_datasets(
                    trans, cntrller, tool_params, dataset_upload_inputs, library_bunch=library_bunch
                )
            elif upload_option == "upload_directory":
                uploaded_datasets, response_code, message = self._get_server_dir_uploaded_datasets(
                    trans, kwd, full_dir, import_dir_desc, library_bunch, response_code, message
                )
            elif upload_option == "upload_paths":
                uploaded_datasets, response_code, message = self._get_path_paste_uploaded_datasets(
                    trans, kwd, library_bunch, response_code, message
                )
            if upload_option == "upload_file" and not uploaded_datasets:
                response_code = 400
                message = "Select a file, enter a URL or enter text"
        if response_code != 200:
            return (response_code, message)
        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        job_params = {}
        job_params["link_data_only"] = json.dumps(kwd.get("link_data_only", "copy_files"))
        job_params["uuid"] = json.dumps(kwd.get("uuid", None))
        job, output = upload_common.create_job(
            trans, tool_params, tool, json_file_path, data_list, folder=library_bunch.folder, job_params=job_params
        )
        trans.app.job_manager.enqueue(job, tool=tool)
        return output

    def _get_server_dir_uploaded_datasets(
        self, trans, params, full_dir, import_dir_desc, library_bunch, response_code, message
    ):
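        """
        Wrap each regular file found under ``full_dir`` in an uploaded-dataset
        bunch, returning ``(uploaded_datasets, response_code, message)``.
        """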
        dir_response = self._get_server_dir_files(params, full_dir, import_dir_desc)
        files = dir_response[0]
        if not files:
            return dir_response
        uploaded_datasets = []
        for file in files:
            name = os.path.basename(file)
            uploaded_datasets.append(
                self._make_library_uploaded_dataset(trans, params, name, file, "server_dir", library_bunch)
            )
        return uploaded_datasets, 200, None

    def _get_server_dir_files(self, params, full_dir, import_dir_desc):
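        """
        List the regular files directly under ``full_dir``, dereferencing
        top-level symlinks exactly once when datasets are being linked
        rather than copied. Returns ``(files, response_code, message)``.
        """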
        files = []
        try:
            for entry in os.listdir(full_dir):
                # Only import regular files
                path = os.path.join(full_dir, entry)
                link_data_only = params.get("link_data_only", "copy_files")
                if os.path.islink(full_dir) and link_data_only == "link_to_files":
                    # If we're linking instead of copying and the
                    # sub-"directory" in the import dir is actually a symlink,
                    # dereference the symlink, but not any of its contents.
                    link_path = os.readlink(full_dir)
                    if os.path.isabs(link_path):
                        path = os.path.join(link_path, entry)
                    else:
                        path = os.path.abspath(os.path.join(link_path, entry))
                elif os.path.islink(path) and os.path.isfile(path) and link_data_only == "link_to_files":
                    # If we're linking instead of copying and the "file" in the
                    # sub-directory of the import dir is actually a symlink,
                    # dereference the symlink (one dereference only, Vasili).
                    link_path = os.readlink(path)
                    if os.path.isabs(link_path):
                        path = link_path
                    else:
                        path = os.path.abspath(os.path.join(os.path.dirname(path), link_path))
                if os.path.isfile(path):
                    files.append(path)
        except Exception as e:
            message = f"Unable to get file list for configured {import_dir_desc}, error: {util.unicodify(e)}"
            response_code = 500
            return None, response_code, message
        if not files:
            message = f"The directory '{full_dir}' contains no valid files"
            response_code = 400
            return None, response_code, message
        return files, None, None

    def _get_path_paste_uploaded_datasets(self, trans, params, library_bunch, response_code, message):
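        """
        Build uploaded-dataset bunches for every file reachable from the
        filesystem paths pasted into the upload form.
        """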
        preserve_dirs = util.string_as_bool(params.get("preserve_dirs", False))
        uploaded_datasets = []
        (files_and_folders, _response_code, _message) = self._get_path_files_and_folders(params, preserve_dirs)
        if _response_code:
            return (uploaded_datasets, _response_code, _message)
        for path, name, folder in files_and_folders:
            uploaded_datasets.append(
                self._make_library_uploaded_dataset(trans, params, name, path, "path_paste", library_bunch, folder)
            )
        return uploaded_datasets, 200, None

    def _get_path_files_and_folders(self, params, preserve_dirs):
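        """
        Validate the pasted paths, then collect ``(path, name, in_folder)``
        tuples for every file they reference, returning them alongside an
        optional error code and message.
        """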
        if problem_response := self._check_path_paste_params(params):
            return problem_response
        files_and_folders = []
        for line, path in self._paths_list(params):
            line_files_and_folders = self._get_single_path_files_and_folders(line, path, preserve_dirs)
            files_and_folders.extend(line_files_and_folders)
        return files_and_folders, None, None

    def _get_single_path_files_and_folders(self, line, path, preserve_dirs):
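        """
        Return ``(file_path, name, in_folder)`` tuples for a single pasted
        path: the path itself if it is a regular file, plus every file found
        by walking it if it is a directory (``os.walk`` yields nothing for a
        regular file, so the two cases never double-count).
        """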
        files_and_folders = []
        if os.path.isfile(path):
            name = os.path.basename(path)
            files_and_folders.append((path, name, None))
        for basedir, _dirs, files in os.walk(line):
            for file in files:
                file_path = os.path.abspath(os.path.join(basedir, file))
                if preserve_dirs:
                    in_folder = os.path.dirname(file_path.replace(path, "", 1).lstrip("/"))
                else:
                    in_folder = None
                files_and_folders.append((file_path, file, in_folder))
        return files_and_folders

    def _paths_list(self, params):
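        """
        Parse the ``filesystem_paths`` form field into a list of
        ``(stripped_line, absolute_path)`` tuples, skipping blank lines.
        """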
        return [
            (line.strip(), os.path.abspath(line.strip()))
            for line in params.get("filesystem_paths", "").splitlines()
            if line.strip()
        ]

    def _check_path_paste_params(self, params):
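        """
        Validate the pasted filesystem paths, returning a
        ``(None, response_code, message)`` error tuple if no paths were
        entered or any path does not exist, and ``None`` otherwise.
        """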
        if params.get("filesystem_paths", "") == "":
            message = "No paths entered in the upload form"
            response_code = 400
            return None, response_code, message
        bad_paths = []
        for _, path in self._paths_list(params):
            if not os.path.exists(path):
                bad_paths.append(path)
        if bad_paths:
            message = 'Invalid paths: "{}".'.format('", "'.join(bad_paths))
            response_code = 400
            return None, response_code, message
        return None

    def _make_library_uploaded_dataset(self, trans, params, name, path, type, library_bunch, in_folder=None):
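        """
        Create and persist a new upload bunch for a single file at ``path``,
        linking rather than copying the data when ``link_data_only`` is set
        to ``link_to_files``.
        """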
        link_data_only = params.get("link_data_only", "copy_files")
        uuid_str = params.get("uuid", None)
        file_type = params.get("file_type", None)
        library_bunch.replace_dataset = None  # not valid for these types of upload
        uploaded_dataset = util.bunch.Bunch()
        new_name = name
        # Remove compressed file extensions, if any, but only if we're copying
        # files into Galaxy's file space. (The suffix is sliced off; str.rstrip
        # would strip a character *set* and mangle names such as "samplez.gz".)
        if link_data_only == "copy_files":
            if new_name.endswith(".gz"):
                new_name = new_name[: -len(".gz")]
            elif new_name.endswith(".zip"):
                new_name = new_name[: -len(".zip")]
        uploaded_dataset.name = new_name
        uploaded_dataset.path = path
        uploaded_dataset.type = type
        uploaded_dataset.ext = None
        uploaded_dataset.file_type = file_type
        uploaded_dataset.dbkey = params.get("dbkey", None)
        uploaded_dataset.to_posix_lines = params.get("to_posix_lines", None)
        uploaded_dataset.space_to_tab = params.get("space_to_tab", None)
        uploaded_dataset.tag_using_filenames = params.get("tag_using_filenames", False)
        uploaded_dataset.tags = params.get("tags", None)
        uploaded_dataset.purge_source = getattr(trans.app.config, "ftp_upload_purge", True)
        if in_folder:
            uploaded_dataset.in_folder = in_folder
        uploaded_dataset.data = upload_common.new_upload(trans, "api", uploaded_dataset, library_bunch)
        uploaded_dataset.link_data_only = link_data_only
        uploaded_dataset.uuid = uuid_str
        if link_data_only == "link_to_files":
            uploaded_dataset.data.link_to(path)
            trans.sa_session.add_all((uploaded_dataset.data, uploaded_dataset.data.dataset))
            with transaction(trans.sa_session):
                trans.sa_session.commit()
        return uploaded_dataset

    def _create_folder(self, trans, parent_id: int, **kwd):
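        """
        Create a new library folder under the parent folder identified by
        ``parent_id``, copying the parent's permissions onto it.
        """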
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        parent_folder = trans.sa_session.get(LibraryFolder, parent_id)
        # Check the library which actually contains the user-supplied parent folder, not the user-supplied
        # library, which could be anything.
        self._check_access(trans, is_admin, parent_folder, current_user_roles)
        self._check_add(trans, is_admin, parent_folder, current_user_roles)
        new_folder = LibraryFolder(name=kwd.get("name", ""), description=kwd.get("description", ""))
        # We are associating the last used genome build with folders, so we always initialize
        # a new folder with the first dbkey in the genome builds list, which is currently
        # "?" (unspecified).
        new_folder.genome_build = trans.app.genome_builds.default_value
        parent_folder.add_folder(new_folder)
        trans.sa_session.add(new_folder)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        # New folders default to having the same permissions as their parent folder
        trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder)
        return 200, dict(created=new_folder)

    def _check_access(self, trans, is_admin, item, current_user_roles):
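        """
        Raise ``ObjectNotFound`` if ``item`` is missing, or
        ``ItemAccessibilityException`` if the user may not access it, covering
        both history datasets and library items.
        """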
        if isinstance(item, trans.model.HistoryDatasetAssociation):
            # Make sure the user has the DATASET_ACCESS permission on the history_dataset_association.
            if not item:
                message = f"Invalid history dataset ({escape(str(item))}) specified."
                raise ObjectNotFound(message)
            elif (
                not trans.app.security_agent.can_access_dataset(current_user_roles, item.dataset)
                and item.user == trans.user
            ):
                message = f"You do not have permission to access the history dataset with id ({str(item.id)})."
                raise ItemAccessibilityException(message)
        else:
            # Make sure the user has the LIBRARY_ACCESS permission on the library item.
            if not item:
                message = f"Invalid library item ({escape(str(item))}) specified."
                raise ObjectNotFound(message)
            elif not (
                is_admin or trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user)
            ):
                if isinstance(item, trans.model.Library):
                    item_type = "data library"
                elif isinstance(item, LibraryFolder):
                    item_type = "folder"
                else:
                    item_type = "(unknown item type)"
                message = f"You do not have permission to access the {escape(item_type)} with id ({str(item.id)})."
                raise ItemAccessibilityException(message)

    def _check_add(self, trans, is_admin, item, current_user_roles):
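        """Raise ``ItemAccessibilityException`` if the user may not add items to ``item``."""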
        # Deny access if the user is not an admin and does not have the LIBRARY_ADD permission.
        if not (is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, item)):
            message = f"You are not authorized to add an item to ({escape(item.name)})."
            raise ItemAccessibilityException(message)
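

# A minimal usage sketch (an illustrative assumption, not code from this
# module): ``LibraryActions`` is a mixin, so a controller combines it with a
# base controller class and forwards request parameters to ``_upload_dataset``.
#
#     class MyLibraryController(BaseAPIController, LibraryActions):
#         def create(self, trans, folder_id: int, **payload):
#             # payload might be, e.g.:
#             #   {"upload_option": "upload_paths",
#             #    "filesystem_paths": "/data/sample.fastq",
#             #    "link_data_only": "link_to_files"}
#             return self._upload_dataset(trans, folder_id, **payload)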