Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.actions.library

"""
Contains library functions
"""

import json
import logging
import os.path

from markupsafe import escape

from galaxy import (
    exceptions,
    util,
)
from galaxy.managers.collections_util import (
    api_payload_to_create_params,
    dictify_dataset_collection_instance,
)
from galaxy.model import LibraryFolder
from galaxy.model.base import transaction
from galaxy.tools.actions import upload_common
from galaxy.tools.parameters import populate_state
from galaxy.util.path import (
    safe_contains,
    safe_relpath,
    unsafe_walk,
)

log = logging.getLogger(__name__)


def validate_server_directory_upload(trans, server_dir):
    """
    Resolve and vet a server-side directory requested for library import.

    Admins import from ``library_import_dir``; everyone else imports from
    ``user_library_import_dir``, joined with the user's email unless
    ``server_dir`` already equals that email.

    :param trans: transaction object; ``trans.user_is_admin``, ``trans.user``
        and ``trans.app.config`` are read.
    :param server_dir: directory name, relative to the import directory.
    :returns: tuple ``(full_dir, import_dir_desc)`` where ``import_dir_desc``
        is the name of the configuration option that was used.
    :raises RequestParameterInvalidException: if ``server_dir`` is missing, is
        not a safe relative path, or the walk of the user's directory reports
        an unsafe path.
    :raises ConfigDoesNotAllowException: if the relevant import directory is
        not configured.
    """
    if server_dir in (None, "None", ""):
        raise exceptions.RequestParameterInvalidException("Invalid or unspecified server_dir parameter")

    if trans.user_is_admin:
        import_dir = trans.app.config.library_import_dir
        import_dir_desc = "library_import_dir"
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException('"library_import_dir" is not set in the Galaxy configuration')
    else:
        import_dir = trans.app.config.user_library_import_dir
        if not import_dir:
            raise exceptions.ConfigDoesNotAllowException(
                '"user_library_import_dir" is not set in the Galaxy configuration'
            )
        # Each user gets a sub-directory named after their email address,
        # unless the requested directory already is that email address.
        if server_dir != trans.user.email:
            import_dir = os.path.join(import_dir, trans.user.email)
        import_dir_desc = "user_library_import_dir"

    full_dir = os.path.join(import_dir, server_dir)

    # Reject anything that is not a safe relative path straight away.
    if not safe_relpath(server_dir):
        log.error(
            "User attempted to import a directory path that resolves to a path outside of their import dir: %s -> %s",
            server_dir,
            os.path.realpath(full_dir),
        )
        raise exceptions.RequestParameterInvalidException("Invalid server_dir specified")

    username = trans.user.username if trans.app.config.user_library_import_check_permissions else None
    last_unsafe_path = None
    # NOTE(review): nothing is flagged when safe_contains() is false for a
    # user import directory -- confirm that this is intended.
    if import_dir_desc == "user_library_import_dir" and safe_contains(
        import_dir, full_dir, allowlist=trans.app.config.user_library_import_symlink_allowlist
    ):
        walk_allowlist = [import_dir] + trans.app.config.user_library_import_symlink_allowlist
        for offending_path in unsafe_walk(full_dir, allowlist=walk_allowlist, username=username):
            log.error(
                "User attempted to import a path that resolves to a path outside of their import dir: %s -> %s",
                offending_path,
                os.path.realpath(offending_path),
            )
            last_unsafe_path = offending_path

    if last_unsafe_path:
        raise exceptions.RequestParameterInvalidException("Invalid server_dir specified")
    return full_dir, import_dir_desc
def validate_path_upload(trans):
    """
    Check that the current request may upload from filesystem paths.

    :param trans: transaction object; ``trans.app.config`` and
        ``trans.user_is_admin`` are read.
    :raises ConfigDoesNotAllowException: if
        ``trans.app.config.allow_library_path_paste`` is falsy.
    :raises AdminRequiredException: if the current user is not an admin.
    """
    path_paste_enabled = trans.app.config.allow_library_path_paste
    if not path_paste_enabled:
        raise exceptions.ConfigDoesNotAllowException(
            '"allow_path_paste" is not set to True in the Galaxy configuration file'
        )
    if trans.user_is_admin:
        return
    raise exceptions.AdminRequiredException(
        "Uploading files via filesystem paths can only be performed by administrators"
    )
class LibraryActions:
    """
    Mixin for controllers that provide library functionality.
    """

    def _upload_dataset(self, trans, folder_id: int, payload):
        """
        Create and enqueue an ``upload1`` job for a library upload request.

        ``payload.upload_option`` selects the source: ``upload_file``,
        ``upload_directory`` or ``upload_paths``.

        :returns: the ``output`` value returned by ``upload_common.create_job``.
        :raises InvalidFileFormatError: if the library parameters cannot be handled.
        :raises RequestParameterInvalidException: if the path-paste request is
            rejected, no file was supplied for ``upload_file``, or job creation
            yields no output.
        """
        # Set up the traditional tool state/params
        cntrller = "api"
        tool_id = "upload1"
        upload_common.validate_datatype_extension(
            datatypes_registry=trans.app.datatypes_registry, ext=payload.file_type
        )
        tool = trans.app.toolbox.get_tool(tool_id)
        state = tool.new_state(trans)
        populate_state(trans, tool.inputs, payload.model_dump(), state.inputs)
        tool_params = state.inputs
        dataset_upload_inputs = [
            tool_input for tool_input in tool.inputs.values() if tool_input.type == "upload_dataset"
        ]
        # Library-specific params
        if payload.upload_option == "upload_directory":
            full_dir, import_dir_desc = validate_server_directory_upload(trans, payload.server_dir)
        elif payload.upload_option == "upload_paths":
            # Library API already checked this - following check isn't actually needed.
            validate_path_upload(trans)
        # Some error handling should be added to this method.
        try:
            # FIXME: instead of passing params here ( which have been processed by util.Params(), the original payload
            # should be passed so that complex objects that may have been included in the initial request remain.
            library_bunch = upload_common.handle_library_params(trans, payload.model_dump(), folder_id, None)
        except Exception as e:
            raise exceptions.InvalidFileFormatError("Invalid folder specified") from e
        # Proceed with (mostly) regular upload processing if we're still errorless
        if payload.upload_option == "upload_file":
            for i, upload_dataset in enumerate(tool_params["files"]):
                upload_dataset["file_data"] = payload.upload_files[i]
            tool_params = upload_common.persist_uploads(tool_params, trans)
            uploaded_datasets = upload_common.get_uploaded_datasets(
                trans, cntrller, tool_params, dataset_upload_inputs, library_bunch=library_bunch
            )
        elif payload.upload_option == "upload_directory":
            uploaded_datasets = self._get_server_dir_uploaded_datasets(
                trans, payload, full_dir, import_dir_desc, library_bunch
            )
        elif payload.upload_option == "upload_paths":
            uploaded_datasets, response_code, message = self._get_path_paste_uploaded_datasets(
                trans, payload.model_dump(), library_bunch, 200, None
            )
            if response_code != 200:
                raise exceptions.RequestParameterInvalidException(message)
        if payload.upload_option == "upload_file" and not uploaded_datasets:
            raise exceptions.RequestParameterInvalidException("Select a file, enter a URL or enter text")
        json_file_path = upload_common.create_paramfile(trans, uploaded_datasets)
        data_list = [ud.data for ud in uploaded_datasets]
        job_params = {
            "link_data_only": json.dumps(payload.link_data_only),
            "uuid": json.dumps(payload.uuid),
        }
        job, output = upload_common.create_job(
            trans, tool_params, tool, json_file_path, data_list, folder=library_bunch.folder, job_params=job_params
        )
        trans.app.job_manager.enqueue(job, tool=tool)
        if not output:
            raise exceptions.RequestParameterInvalidException("Upload failed")
        return output

    def _get_server_dir_uploaded_datasets(self, trans, payload, full_dir, import_dir_desc, library_bunch):
        """Build one uploaded-dataset record per regular file found in ``full_dir``."""
        files = self._get_server_dir_files(payload, full_dir, import_dir_desc)
        return [
            self._make_library_uploaded_dataset(
                trans, payload.model_dump(), os.path.basename(file), file, "server_dir", library_bunch
            )
            for file in files
        ]

    def _get_server_dir_files(self, payload, full_dir, import_dir_desc):
        """
        List the regular files directly inside ``full_dir`` (not recursive).

        When ``payload.link_data_only`` is ``"link_to_files"``, one level of
        symlink is dereferenced, either for the directory itself or for an
        individual file.

        :returns: list of file paths.
        :raises InternalServerError: if listing or inspecting the directory fails.
        :raises ObjectAttributeMissingException: if no regular file is found.
        """
        files = []
        try:
            for entry in os.listdir(full_dir):
                # Only import regular files
                path = os.path.join(full_dir, entry)
                if os.path.islink(full_dir) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the
                    # sub-"directory" in the import dir is actually a symlink,
                    # dereference the symlink, but not any of its contents.
                    link_path = os.readlink(full_dir)
                    if os.path.isabs(link_path):
                        path = os.path.join(link_path, entry)
                    else:
                        # A relative symlink target is relative to the directory
                        # that holds the symlink, not to the working directory.
                        path = os.path.abspath(os.path.join(os.path.dirname(full_dir), link_path, entry))
                elif os.path.islink(path) and os.path.isfile(path) and payload.link_data_only == "link_to_files":
                    # If we're linking instead of copying and the "file" in the
                    # sub-directory of the import dir is actually a symlink,
                    # dereference the symlink (one dereference only, Vasili).
                    link_path = os.readlink(path)
                    if os.path.isabs(link_path):
                        path = link_path
                    else:
                        path = os.path.abspath(os.path.join(os.path.dirname(path), link_path))
                if os.path.isfile(path):
                    files.append(path)
        except Exception as e:
            raise exceptions.InternalServerError(
                f"Unable to get file list for configured {import_dir_desc}, error: {util.unicodify(e)}"
            ) from e
        if not files:
            raise exceptions.ObjectAttributeMissingException(f"The directory '{full_dir}' contains no valid files")
        return files

    def _get_path_paste_uploaded_datasets(self, trans, params, library_bunch, response_code, message):
        """
        Build uploaded-dataset records for every file named by ``filesystem_paths``.

        The ``response_code`` and ``message`` arguments are accepted but not read.

        :returns: tuple ``(uploaded_datasets, response_code, message)``; the code
            is ``200`` on success, otherwise the code and message reported by
            the parameter check, with an empty dataset list.
        """
        preserve_dirs = util.string_as_bool(params.get("preserve_dirs", False))
        files_and_folders, problem_code, problem_message = self._get_path_files_and_folders(params, preserve_dirs)
        if problem_code:
            return ([], problem_code, problem_message)
        uploaded_datasets = [
            self._make_library_uploaded_dataset(trans, params, name, path, "path_paste", library_bunch, folder)
            for path, name, folder in files_and_folders
        ]
        return uploaded_datasets, 200, None

    def _get_path_files_and_folders(self, params, preserve_dirs):
        """
        Expand every pasted path into ``(path, name, folder)`` tuples.

        :returns: ``(files_and_folders, None, None)`` on success, or the
            ``(None, response_code, message)`` triple from the parameter check.
        """
        if problem_response := self._check_path_paste_params(params):
            return problem_response
        files_and_folders = []
        for line, path in self._paths_list(params):
            files_and_folders.extend(self._get_single_path_files_and_folders(line, path, preserve_dirs))
        return files_and_folders, None, None

    def _get_single_path_files_and_folders(self, line, path, preserve_dirs):
        """
        Expand one pasted path into ``(file_path, file_name, in_folder)`` tuples.

        A regular file yields a single tuple with ``in_folder`` set to ``None``.
        A directory is walked recursively; ``in_folder`` is the file's directory
        relative to ``path`` when ``preserve_dirs`` is true, else ``None``.
        """
        files_and_folders = []
        if os.path.isfile(path):
            name = os.path.basename(path)
            files_and_folders.append((path, name, None))
        for basedir, _dirs, files in os.walk(line):
            for file in files:
                file_path = os.path.abspath(os.path.join(basedir, file))
                if preserve_dirs:
                    in_folder = os.path.dirname(file_path.replace(path, "", 1).lstrip("/"))
                else:
                    in_folder = None
                files_and_folders.append((file_path, file, in_folder))
        return files_and_folders

    def _paths_list(self, params):
        """
        Return ``(stripped_line, absolute_path)`` for each non-blank line of
        ``params["filesystem_paths"]``; a missing or ``None`` value gives ``[]``.
        """
        raw_paths = params.get("filesystem_paths") or ""
        stripped_lines = (line.strip() for line in raw_paths.splitlines())
        return [(line, os.path.abspath(line)) for line in stripped_lines if line]

    def _check_path_paste_params(self, params):
        """
        Validate the ``filesystem_paths`` parameter.

        :returns: ``None`` when every listed path exists, otherwise the triple
            ``(None, 400, message)`` describing the problem.
        """
        # A key that is present with a None value is treated as "no paths entered".
        if not params.get("filesystem_paths"):
            message = "No paths entered in the upload form"
            response_code = 400
            return None, response_code, message
        bad_paths = [path for _, path in self._paths_list(params) if not os.path.exists(path)]
        if bad_paths:
            message = 'Invalid paths: "{}".'.format('", "'.join(bad_paths))
            response_code = 400
            return None, response_code, message
        return None

    @staticmethod
    def _strip_compressed_extension(name):
        """
        Return ``name`` without one trailing ``.gz`` or ``.zip`` suffix.

        The suffix is sliced off rather than passed to ``str.rstrip``, which
        removes a *set of characters* and would turn ``"bigz.gz"`` into ``"bi"``.
        """
        for suffix in (".gz", ".zip"):
            if name.endswith(suffix):
                return name[: -len(suffix)]
        return name

    def _make_library_uploaded_dataset(self, trans, params, name, path, type, library_bunch, in_folder=None):
        """
        Create the ``Bunch`` describing one dataset to upload into a library.

        Sets ``library_bunch.replace_dataset`` to ``None``, creates the dataset
        via ``upload_common.new_upload`` and, for ``link_to_files`` uploads,
        links it to ``path`` and commits the session.

        :returns: the populated ``util.bunch.Bunch``.
        """
        link_data_only = params.get("link_data_only", "copy_files")
        uuid_str = params.get("uuid", None)
        file_type = params.get("file_type", None)
        library_bunch.replace_dataset = None  # not valid for these types of upload
        uploaded_dataset = util.bunch.Bunch()
        new_name = name
        # Remove compressed file extensions, if any, but only if
        # we're copying files into Galaxy's file space.
        if link_data_only == "copy_files":
            new_name = self._strip_compressed_extension(new_name)
        uploaded_dataset.name = new_name
        uploaded_dataset.path = path
        uploaded_dataset.type = type
        uploaded_dataset.ext = None
        uploaded_dataset.file_type = file_type
        uploaded_dataset.dbkey = params.get("dbkey", None)
        uploaded_dataset.to_posix_lines = params.get("to_posix_lines", None)
        uploaded_dataset.space_to_tab = params.get("space_to_tab", None)
        uploaded_dataset.tag_using_filenames = params.get("tag_using_filenames", False)
        uploaded_dataset.tags = params.get("tags", None)
        uploaded_dataset.purge_source = getattr(trans.app.config, "ftp_upload_purge", True)
        if in_folder:
            uploaded_dataset.in_folder = in_folder
        uploaded_dataset.data = upload_common.new_upload(trans, "api", uploaded_dataset, library_bunch)
        uploaded_dataset.link_data_only = link_data_only
        uploaded_dataset.uuid = uuid_str
        if link_data_only == "link_to_files":
            uploaded_dataset.data.link_to(path)
            trans.sa_session.add_all((uploaded_dataset.data, uploaded_dataset.data.dataset))
            with transaction(trans.sa_session):
                trans.sa_session.commit()
        return uploaded_dataset

    def _upload_library_dataset(self, trans, payload):
        """
        Check permissions on the target folder, then upload into it.

        :returns: the value returned by ``_upload_dataset``.
        :raises RequestParameterInvalidException: if the folder does not exist or
            role derivation reports an error.
        """
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        self._check_access(trans, is_admin, folder, current_user_roles)
        self._check_add(trans, is_admin, folder, current_user_roles)
        if payload.roles:
            # Check to see if the user selected roles to associate with the DATASET_ACCESS permission
            # on the dataset that would cause accessibility issues.
            access_kwargs = dict(DATASET_ACCESS_in=payload.roles)
            permissions, in_roles, error, message = trans.app.security_agent.derive_roles_from_access(
                trans, folder.parent_library.id, "api", library=True, **access_kwargs
            )
            if error:
                raise exceptions.RequestParameterInvalidException(message)
        created_outputs_dict = self._upload_dataset(trans, folder.id, payload)
        return created_outputs_dict

    def _create_folder(self, trans, payload):
        """
        Create a sub-folder of ``payload.folder_id`` and copy the parent's permissions.

        :returns: ``{"created": new_folder}``.
        :raises RequestParameterInvalidException: if the parent folder does not exist.
        """
        is_admin = trans.user_is_admin
        current_user_roles = trans.get_current_user_roles()
        parent_folder = trans.sa_session.get(LibraryFolder, payload.folder_id)
        if not parent_folder:
            raise exceptions.RequestParameterInvalidException("Invalid folder id specified.")
        # Check the library which actually contains the user-supplied parent folder, not the user-supplied
        # library, which could be anything.
        self._check_access(trans, is_admin, parent_folder, current_user_roles)
        self._check_add(trans, is_admin, parent_folder, current_user_roles)
        new_folder = LibraryFolder(name=payload.name, description=payload.description)
        # We are associating the last used genome build with folders, so we will always
        # initialize a new folder with the first dbkey in genome builds list which is currently
        # ? unspecified (?)
        new_folder.genome_build = trans.app.genome_builds.default_value
        parent_folder.add_folder(new_folder)
        trans.sa_session.add(new_folder)
        with transaction(trans.sa_session):
            trans.sa_session.commit()
        # New folders default to having the same permissions as their parent folder
        trans.app.security_agent.copy_library_permissions(trans, parent_folder, new_folder)
        new_folder_dict = dict(created=new_folder)
        return new_folder_dict

    def _create_collection(self, trans, payload, parent):
        """
        Create a dataset collection under ``parent``.

        :returns: a single-element list holding the dictified collection instance.
        """
        # Not delegating to library_common, so need to check access to parent folder here.
        self.check_user_can_add_to_library_item(trans, parent, check_accessible=True)
        create_params = api_payload_to_create_params(payload.model_dump())
        # collection_manager.create needs trans as one of the params
        create_params["trans"] = trans
        create_params["parent"] = parent
        dataset_collection_instance = self.collection_manager.create(**create_params)
        dataset_collection = dictify_dataset_collection_instance(
            dataset_collection_instance, security=trans.security, url_builder=trans.url_builder, parent=parent
        )
        return [dataset_collection]

    def _check_access(self, trans, is_admin, item, current_user_roles):
        """
        Raise unless the user may access ``item``.

        :raises ObjectNotFound: if ``item`` is falsy.
        :raises ItemAccessibilityException: if the access check fails.
        """
        if isinstance(item, trans.model.HistoryDatasetAssociation):
            # Make sure the user has the DATASET_ACCESS permission on the history_dataset_association.
            if not item:
                message = f"Invalid history dataset ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            # NOTE(review): access is refused only when the dataset is inaccessible
            # AND the item belongs to the current user -- confirm this is intended.
            elif (
                not trans.app.security_agent.can_access_dataset(current_user_roles, item.dataset)
                and item.user == trans.user
            ):
                message = f"You do not have permission to access the history dataset with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)
        else:
            # Make sure the user has the LIBRARY_ACCESS permission on the library item.
            if not item:
                message = f"Invalid library item ({escape(str(item))}) specified."
                raise exceptions.ObjectNotFound(message)
            elif not (
                is_admin or trans.app.security_agent.can_access_library_item(current_user_roles, item, trans.user)
            ):
                if isinstance(item, trans.model.Library):
                    item_type = "data library"
                elif isinstance(item, LibraryFolder):
                    item_type = "folder"
                else:
                    item_type = "(unknown item type)"
                message = f"You do not have permission to access the {escape(item_type)} with id ({str(item.id)})."
                raise exceptions.ItemAccessibilityException(message)

    def _check_add(self, trans, is_admin, item, current_user_roles):
        """
        Raise ``ItemAccessibilityException`` unless the user may add to ``item``.
        """
        # Deny access if the user is not an admin and does not have the LIBRARY_ADD permission.
        if not (is_admin or trans.app.security_agent.can_add_library_item(current_user_roles, item)):
            message = f"You are not authorized to add an item to ({escape(item.name)})."
            raise exceptions.ItemAccessibilityException(message)