# Source code for galaxy.actions.library

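"""
Contains library functions: validation helpers for server-directory and
filesystem-path uploads, and the ``LibraryActions`` controller mixin.
"""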

import json
import logging
import os.path

from markupsafe import escape

from galaxy import (
    exceptions,
    util,
)
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.model import LibraryFolder
from galaxy.model.base import transaction
from galaxy.tools.actions import upload_common
from galaxy.tools.parameters import populate_state
from galaxy.util.path import (
    safe_contains,
    safe_relpath,
    unsafe_walk,
)

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


def validate_server_directory_upload(trans, server_dir):
    """
    Resolve and vet a server-side directory requested for a library import.

    Administrators import from ``library_import_dir``; everyone else imports from
    ``user_library_import_dir``, narrowed to a sub-directory named after the
    user's email unless ``server_dir`` already equals that email.

    :param trans: current transaction/request context
    :param server_dir: directory, relative to the import directory, to import
    :returns: tuple ``(full_dir, import_dir_desc)`` where ``import_dir_desc`` is
        the name of the configuration option the import directory came from
    :raises exceptions.RequestParameterInvalidException: if ``server_dir`` is
        missing or is judged unsafe
    :raises exceptions.ConfigDoesNotAllowException: if the relevant import
        directory is not configured
    """
    if server_dir in (None, "None", ""):
        raise exceptions.RequestParameterInvalidException("Invalid or unspecified server_dir parameter")

    if trans.user_is_admin:
        import_dir_desc = "library_import_dir"
        import_dir = trans.app.config.library_import_dir
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException('"library_import_dir" is not set in the Galaxy configuration')
    else:
        import_dir_desc = "user_library_import_dir"
        import_dir = trans.app.config.user_library_import_dir
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException(
                '"user_library_import_dir" is not set in the Galaxy configuration'
            )
        user_email = trans.user.email
        if server_dir != user_email:
            import_dir = os.path.join(import_dir, user_email)

    full_dir = os.path.join(import_dir, server_dir)

    # A server_dir that fails the relative-path check is rejected outright.
    if not safe_relpath(server_dir):
        log.error(
            "User attempted to import a directory path that resolves to a path outside of their import dir: %s -> %s",
            server_dir,
            os.path.realpath(full_dir),
        )
        raise exceptions.RequestParameterInvalidException("Invalid server_dir specified")

    if trans.app.config.user_library_import_check_permissions:
        username = trans.user.username
    else:
        username = None

    # Only user import directories are walked for unsafe entries.
    # NOTE(review): when safe_contains() is false for a user import dir, nothing
    # is flagged and the request is accepted - confirm this is intended.
    last_offender = None
    symlink_allowlist = trans.app.config.user_library_import_symlink_allowlist
    if import_dir_desc == "user_library_import_dir" and safe_contains(
        import_dir, full_dir, allowlist=symlink_allowlist
    ):
        for offender in unsafe_walk(
            full_dir,
            allowlist=[import_dir] + trans.app.config.user_library_import_symlink_allowlist,
            username=username,
        ):
            log.error(
                "User attempted to import a path that resolves to a path outside of their import dir: %s -> %s",
                offender,
                os.path.realpath(offender),
            )
            last_offender = offender

    if last_offender:
        raise exceptions.RequestParameterInvalidException("Invalid server_dir specified")
    return full_dir, import_dir_desc
def validate_path_upload(trans):
    """
    Ensure the current request may upload library datasets by filesystem path.

    :param trans: current transaction/request context
    :raises exceptions.ConfigDoesNotAllowException: if the
        ``allow_library_path_paste`` configuration value is falsy
    :raises exceptions.AdminRequiredException: if the user is not an administrator
    """
    path_paste_enabled = trans.app.config.allow_library_path_paste
    if not path_paste_enabled:
        raise exceptions.ConfigDoesNotAllowException(
            '"allow_path_paste" is not set to True in the Galaxy configuration file'
        )
    if trans.user_is_admin:
        return
    raise exceptions.AdminRequiredException(
        "Uploading files via filesystem paths can only be performed by administrators"
    )
class LibraryActions:
    """
    Mixin for controllers that provide library functionality.

    Besides the methods defined here, ``_create_collection`` relies on the host
    class providing ``collection_manager`` and
    ``check_user_can_add_to_library_item``.
    """

    def _upload_dataset(self, trans, folder_id: int, payload):
        """
        Create and enqueue an ``upload1`` job that adds datasets to a library folder.

        :param trans: current transaction/request context
        :param folder_id: id of the destination library folder
        :param payload: request payload; read through attributes
            (``upload_option``, ``file_type``, ``server_dir``, ``upload_files``,
            ``link_data_only``, ``uuid``) and through ``model_dump()``
        :returns: the ``output`` value returned by ``upload_common.create_job``
        :raises exceptions.InvalidFileFormatError: if the library parameters
            cannot be resolved for ``folder_id``
        :raises exceptions.RequestParameterInvalidException: if a pasted path is
            rejected, nothing was uploaded, or the job produced no output
        """
        # Set up the traditional tool state/params
        cntrller = "api"
        tool_id = "upload1"
        upload_common.validate_datatype_extension(
            datatypes_registry=trans.app.datatypes_registry, ext=payload.file_type
        )
        tool = trans.app.toolbox.get_tool(tool_id)
        state = tool.new_state(trans)
        populate_state(trans, tool.inputs, payload.model_dump(), state.inputs)
        tool_params = state.inputs
        dataset_upload_inputs = [
            tool_input for tool_input in tool.inputs.values() if tool_input.type == "upload_dataset"
        ]
        # Library-specific params
        if payload.upload_option == "upload_directory":
            full_dir, import_dir_desc = validate_server_directory_upload(trans, payload.server_dir)
        elif payload.upload_option == "upload_paths":
            # Library API already checked this - following check isn't actually needed.
            validate_path_upload(trans)
        # Some error handling should be added to this method.
        try:
            # FIXME: instead of passing params here ( which have been processed by util.Params(), the original payload
            # should be passed so that complex objects that may have been included in the initial request remain.
            library_bunch = upload_common.handle_library_params(trans, payload.model_dump(), folder_id, None)
        except Exception as exc:
            # Keep the historical exception type, but preserve the real cause for debugging.
            raise exceptions.InvalidFileFormatError("Invalid folder specified") from exc
        # Proceed with (mostly) regular upload processing if we're still errorless
        if payload.upload_option == "upload_file":
            if payload.upload_files:
                for i, upload_dataset in enumerate(tool_params["files"]):
                    upload_dataset["file_data"] = payload.upload_files[i]
            tool_params = upload_common.persist_uploads(tool_params, trans)
            uploaded_datasets = upload_common.get_uploaded_datasets(
                trans, cntrller, tool_params, dataset_upload_inputs, library_bunch=library_bunch
            )
        elif payload.upload_option == "upload_directory":
            uploaded_datasets = self._get_server_dir_uploaded_datasets(
                trans, payload, full_dir, import_dir_desc, library_bunch
            )
        elif payload.upload_option == "upload_paths":
            uploaded_datasets, response_code, message = self._get_path_paste_uploaded_datasets(
                trans, payload.model_dump(), library_bunch, 200, None
            )
            if response_code != 200:
                raise exceptions.RequestParameterInvalidException(message)
        if payload.upload_option == "upload_file" and not uploaded_datasets:
            raise exceptions.RequestParameterInvalidException("Select a file, enter a URL or enter text")
        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        job_params = {
            "link_data_only": json.dumps(payload.link_data_only),
            "uuid": json.dumps(payload.uuid),
        }
        job, output = upload_common.create_job(
            trans, tool_params, tool, json_file_path, data_list, folder=library_bunch.folder, job_params=job_params
        )
        trans.app.job_manager.enqueue(job, tool=tool)
        if not output:
            raise exceptions.RequestParameterInvalidException("Upload failed")
        return output

    def _get_server_dir_uploaded_datasets(self, trans, payload, full_dir, import_dir_desc, library_bunch):
        """
        Build one uploaded-dataset record per importable file found in ``full_dir``.

        :returns: list of records produced by ``_make_library_uploaded_dataset``
        """
        files = self._get_server_dir_files(payload, full_dir, import_dir_desc)
        # The payload does not change between files, so dump it once.
        params = payload.model_dump()
        return [
            self._make_library_uploaded_dataset(
                trans, params, os.path.basename(file), file, "server_dir", library_bunch
            )
            for file in files
        ]

    def _get_server_dir_files(self, payload, full_dir, import_dir_desc):
        """
        List the regular files directly inside ``full_dir`` (non-recursive).

        When ``payload.link_data_only`` is ``"link_to_files"``, one level of
        symlink is dereferenced: either ``full_dir`` itself or each entry in it.

        :param payload: object exposing ``link_data_only``
        :param full_dir: directory to list
        :param import_dir_desc: configuration option name, used in error messages
        :returns: list of file paths
        :raises exceptions.InternalServerError: if listing or resolving fails
        :raises exceptions.ObjectAttributeMissingException: if no regular file is found
        """
        files = []
        try:
            for entry in os.listdir(full_dir):
                # Only import regular files
                path = os.path.join(full_dir, entry)
                if os.path.islink(full_dir) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the
                    # sub-"directory" in the import dir is actually a symlink,
                    # dereference the symlink, but not any of its contents.
                    link_path = os.readlink(full_dir)
                    if os.path.isabs(link_path):
                        path = os.path.join(link_path, entry)
                    else:
                        # A relative symlink target is relative to the directory
                        # holding the symlink, not to the process working directory.
                        path = os.path.abspath(os.path.join(os.path.dirname(full_dir), link_path, entry))
                elif os.path.islink(path) and os.path.isfile(path) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the "file" in the
                    # sub-directory of the import dir is actually a symlink,
                    # dereference the symlink (one dereference only, Vasili).
                    link_path = os.readlink(path)
                    if os.path.isabs(link_path):
                        path = link_path
                    else:
                        path = os.path.abspath(os.path.join(os.path.dirname(path), link_path))
                if os.path.isfile(path):
                    files.append(path)
        except Exception as e:
            raise exceptions.InternalServerError(
                f"Unable to get file list for configured {import_dir_desc}, error: {util.unicodify(e)}"
            ) from e
        if not files:
            raise exceptions.ObjectAttributeMissingException(f"The directory '{full_dir}' contains no valid files")
        return files

    def _get_path_paste_uploaded_datasets(self, trans, params, library_bunch, response_code, message):
        """
        Build uploaded-dataset records for every file reachable from the pasted paths.

        ``response_code`` and ``message`` are accepted for interface
        compatibility but are not read.

        :param params: dict of upload parameters (``filesystem_paths``,
            ``preserve_dirs``, ...)
        :returns: tuple ``(uploaded_datasets, response_code, message)``; the code
            is ``200`` with a ``None`` message on success
        """
        preserve_dirs = util.string_as_bool(params.get("preserve_dirs", False))
        uploaded_datasets = []
        (files_and_folders, _response_code, _message) = self._get_path_files_and_folders(params, preserve_dirs)
        if _response_code:
            return (uploaded_datasets, _response_code, _message)
        for path, name, folder in files_and_folders:
            uploaded_datasets.append(
                self._make_library_uploaded_dataset(trans, params, name, path, "path_paste", library_bunch, folder)
            )
        return uploaded_datasets, 200, None

    def _get_path_files_and_folders(self, params, preserve_dirs):
        """
        Expand all pasted paths into ``(path, name, in_folder)`` tuples.

        :returns: tuple ``(files_and_folders, None, None)`` on success, or
            ``(None, response_code, message)`` when the pasted paths are invalid
        """
        if problem_response := self._check_path_paste_params(params):
            return problem_response
        files_and_folders = []
        for line, path in self._paths_list(params):
            files_and_folders.extend(self._get_single_path_files_and_folders(line, path, preserve_dirs))
        return files_and_folders, None, None

    def _get_single_path_files_and_folders(self, line, path, preserve_dirs):
        """
        Expand one pasted path into ``(path, name, in_folder)`` tuples.

        :param line: the pasted path as entered (stripped); this is what gets walked
        :param path: absolute form of ``line``
        :param preserve_dirs: when true, ``in_folder`` is the file's directory
            relative to ``path``; otherwise ``in_folder`` is ``None``
        """
        files_and_folders = []
        if os.path.isfile(path):
            name = os.path.basename(path)
            files_and_folders.append((path, name, None))
        for basedir, _dirs, files in os.walk(line):
            for file in files:
                file_path = os.path.abspath(os.path.join(basedir, file))
                if preserve_dirs:
                    in_folder = os.path.dirname(file_path.replace(path, "", 1).lstrip("/"))
                else:
                    in_folder = None
                files_and_folders.append((file_path, file, in_folder))
        return files_and_folders

    def _paths_list(self, params):
        """
        Split ``params["filesystem_paths"]`` into ``(stripped_line, absolute_path)`` pairs.

        Lines that are blank after stripping are skipped.
        """
        stripped_lines = (line.strip() for line in params.get("filesystem_paths", "").splitlines())
        return [(line, os.path.abspath(line)) for line in stripped_lines if line]

    def _check_path_paste_params(self, params):
        """
        Validate the pasted filesystem paths.

        :returns: ``None`` when every path exists, otherwise a tuple
            ``(None, 400, message)`` describing the problem
        """
        if params.get("filesystem_paths", "") == "":
            message = "No paths entered in the upload form"
            response_code = 400
            return None, response_code, message
        bad_paths = [path for _, path in self._paths_list(params) if not os.path.exists(path)]
        if bad_paths:
            message = 'Invalid paths: "{}".'.format('", "'.join(bad_paths))
            response_code = 400
            return None, response_code, message
        return None

    @staticmethod
    def _strip_compressed_extension(name):
        """Return ``name`` without a single trailing ``.gz`` or ``.zip`` extension."""
        for extension in (".gz", ".zip"):
            if name.endswith(extension):
                # Slice the suffix off; str.rstrip() would strip a *set of characters*
                # and mangle names such as "bigg.gz" or "quiz.zip".
                return name[: -len(extension)]
        return name

    def _make_library_uploaded_dataset(self, trans, params, name, path, type, library_bunch, in_folder=None):
        """
        Build the uploaded-dataset record for one file and create its library dataset.

        :param params: dict of upload parameters (``link_data_only``, ``uuid``,
            ``file_type``, ``dbkey``, ...)
        :param name: display name; a trailing ``.gz``/``.zip`` is removed when
            files are copied (``link_data_only == "copy_files"``)
        :param path: filesystem path of the source file
        :param type: upload type label stored on the record
        :param library_bunch: library parameters; its ``replace_dataset`` is reset to ``None``
        :param in_folder: optional sub-folder for the dataset
        :returns: the populated ``util.bunch.Bunch`` record
        """
        link_data_only = params.get("link_data_only", "copy_files")
        uuid_str = params.get("uuid", None)
        file_type = params.get("file_type", None)
        library_bunch.replace_dataset = None  # not valid for these types of upload
        uploaded_dataset = util.bunch.Bunch()
        new_name = name
        # Remove compressed file extensions, if any, but only if
        # we're copying files into Galaxy's file space.
        if link_data_only == "copy_files":
            new_name = self._strip_compressed_extension(new_name)
        uploaded_dataset.name = new_name
        uploaded_dataset.path = path
        uploaded_dataset.type = type
        uploaded_dataset.ext = None
        uploaded_dataset.file_type = file_type
        uploaded_dataset.dbkey = params.get("dbkey", None)
        uploaded_dataset.to_posix_lines = params.get("to_posix_lines", None)
        uploaded_dataset.space_to_tab = params.get("space_to_tab", None)
        uploaded_dataset.tag_using_filenames = params.get("tag_using_filenames", False)
        uploaded_dataset.tags = params.get("tags", None)
        uploaded_dataset.purge_source = getattr(trans.app.config, "ftp_upload_purge", True)
        if in_folder:
            uploaded_dataset.in_folder = in_folder
        uploaded_dataset.data = upload_common.new_upload(trans, "api", uploaded_dataset, library_bunch)
        uploaded_dataset.link_data_only = link_data_only
        uploaded_dataset.uuid = uuid_str
        if link_data_only == "link_to_files":
            uploaded_dataset.data.link_to(path)
            trans.sa_session.add_all((uploaded_dataset.data, uploaded_dataset.data.dataset))
            with transaction(trans.sa_session):
                trans.sa_session.commit()
        return uploaded_dataset

    def _upload_library_dataset(self, trans, payload):
        """
        Check permissions on the target folder and upload dataset(s) into it.

        :param payload: request payload exposing ``folder_id`` and ``roles``
        :returns: whatever ``_upload_dataset`` returns
        :raises exceptions.RequestParameterInvalidException: if the folder does
            not exist or the selected roles are rejected
        """
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        self._check_access(trans, is_admin, folder, current_user_roles)
        self._check_add(trans, is_admin, folder, current_user_roles)
        if payload.roles:
            # Check to see if the user selected roles to associate with the DATASET_ACCESS permission
            # on the dataset that would cause accessibility issues.
            role_params = dict(DATASET_ACCESS_in=payload.roles)
            permissions, in_roles, error, message = trans.app.security_agent.derive_roles_from_access(
                trans, folder.parent_library.id, "api", library=True, **role_params
            )
            if error:
                raise exceptions.RequestParameterInvalidException(message)
        created_outputs_dict = self._upload_dataset(trans, folder.id, payload)
        return created_outputs_dict

    def _create_folder(self, trans, payload):
        """
        Create a new library folder inside the folder identified by ``payload.folder_id``.

        :param payload: request payload exposing ``folder_id``, ``name`` and ``description``
        :returns: dict ``{"created": new_folder}``
        :raises exceptions.RequestParameterInvalidException: if the parent folder does not exist
        """
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        parent_folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not parent_folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        # Check the library which actually contains the user-supplied parent folder, not the user-supplied
        # library, which could be anything.
        self._check_access(trans, is_admin, parent_folder, current_user_roles)
        self._check_add(trans, is_admin, parent_folder, current_user_roles)
        new_folder = LibraryFolder(name=payload.name, description=payload.description)
        # We are associating the last used genome build with folders, so we will always
        # initialize a new folder with the first dbkey in genome builds list which is currently
        # ? unspecified (?)
        new_folder.genome_build = trans.app.genome_builds.default_value
        parent_folder.add_folder(new_folder)
        trans.sa_session.add(new_folder)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        # New folders default to having the same permissions as their parent folder
        trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder)
        new_folder_dict = dict(created=new_folder)
        return new_folder_dict

    def _create_collection(self, trans, payload, parent):
        """
        Create a dataset collection under ``parent`` from the request payload.

        :returns: single-element list holding the dictified collection instance
        """
        # Not delegating to library_common, so need to check access to parent folder here.
        self.check_user_can_add_to_library_item(trans, parent, check_accessible=True)
        create_params = api_payload_to_create_params(payload.model_dump())
        # collection_manager.create needs trans as one of the params
        create_params["trans"] = trans
        create_params["parent"] = parent
        dataset_collection_instance = self.collection_manager.create(**create_params)
        dataset_collection = dictify_dataset_collection_instance(
            dataset_collection_instance, security=trans.security, url_builder=trans.url_builder, parent=parent
        )
        return [dataset_collection]

    def _check_access(self, trans, is_admin, item, current_user_roles):
        """
        Raise unless the current user may access ``item``.

        :param item: a history dataset association or a library item
        :raises exceptions.ObjectNotFound: if ``item`` is falsy
        :raises exceptions.ItemAccessibilityException: if access is denied
        """
        if isinstance(item, trans.model.HistoryDatasetAssociation):
            # Make sure the user has the DATASET_ACCESS permission on the history_dataset_association.
            if not item:
                message = f"Invalid history dataset ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            elif (
                # NOTE(review): access is only denied when the dataset is inaccessible
                # AND owned by the current user - confirm "and" (rather than a check
                # that the owner differs) is the intended condition.
                not trans.app.security_agent.can_access_dataset(current_user_roles, item.dataset)
                and item.user == trans.user
            ):
                message = f"You do not have permission to access the history dataset with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)
        else:
            # Make sure the user has the LIBRARY_ACCESS permission on the library item.
            if not item:
                message = f"Invalid library item ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            elif not (
                is_admin or trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user)
            ):
                if isinstance(item, trans.model.Library):
                    item_type = "data library"
                elif isinstance(item, LibraryFolder):
                    item_type = "folder"
                else:
                    item_type = "(unknown item type)"
                message = f"You do not have permission to access the {escape(item_type)} with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)

    def _check_add(self, trans, is_admin, item, current_user_roles):
        """
        Raise unless the current user may add items to ``item``.

        :raises exceptions.ItemAccessibilityException: if the user is neither an
            administrator nor permitted to add to ``item``
        """
        # Deny access if the user is not an admin and does not have the LIBRARY_ADD permission.
        if not (is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, item)):
            message = f"You are not authorized to add an item to ({escape(item.name)})."
            raise exceptions.ItemAccessibilityException(message)