Source code for galaxy.tool_shed.metadata.metadata_generator
import logging
import os
import tempfile
from typing import (
Any,
cast,
Dict,
List,
Optional,
Tuple,
TYPE_CHECKING,
Union,
)
from typing_extensions import (
Protocol,
TypedDict,
)
from galaxy import util
from galaxy.model.tool_shed_install import ToolShedRepository
from galaxy.tool_shed.galaxy_install.client import (
DataManagerInterface,
InstallationTarget,
)
from galaxy.tool_shed.repository_type import (
REPOSITORY_DEPENDENCY_DEFINITION_FILENAME,
TOOL_DEPENDENCY_DEFINITION_FILENAME,
)
from galaxy.tool_shed.tools.tool_validator import ToolValidator
from galaxy.tool_shed.util import (
shed_util_common as suc,
tool_dependency_util,
tool_util,
)
from galaxy.tool_shed.util.basic_util import (
remove_dir,
strip_path,
)
from galaxy.tool_shed.util.hg_util import get_config_from_disk
from galaxy.tool_shed.util.metadata_util import get_updated_changeset_revisions_from_tool_shed
from galaxy.tool_shed.util.repository_util import get_repository_for_dependency_relationship
from galaxy.tool_util.loader_directory import looks_like_a_tool
from galaxy.tool_util.parser.interface import TestCollectionDef
from galaxy.tools.repositories import ValidationContext
from galaxy.util.tool_shed.common_util import (
generate_clone_url_for_installed_repository,
remove_protocol_and_user_from_clone_url,
remove_protocol_from_tool_shed_url,
)
from galaxy.util.tool_shed.xml_util import parse_xml
if TYPE_CHECKING:
from galaxy.structured_app import BasicSharedApp
log = logging.getLogger(__name__)
InvalidFileT = Tuple[str, str]
HandleResultT = Tuple[List, bool, str]
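# InvalidFileT pairs a file name with the error message recorded for it;
# HandleResultT is the (repository_dependency_tup, is_valid, error_message)
# triple returned by handle_repository_elem() below.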
NOT_TOOL_CONFIGS = [
suc.DATATYPES_CONFIG_FILENAME,
REPOSITORY_DEPENDENCY_DEFINITION_FILENAME,
TOOL_DEPENDENCY_DEFINITION_FILENAME,
suc.REPOSITORY_DATA_MANAGER_CONFIG_FILENAME,
]
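# Minimal sketch (an assumption, not Galaxy's exact definition): a Protocol
# describing the repository objects this module operates on. Only the members
# visibly used below are declared; Galaxy's real protocol may include more.
class RepositoryProtocol(Protocol):
    name: str
    id: Any

    def repo_path(self, app) -> str:
        ...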
class RepositoryMetadataToolDict(TypedDict):
id: str
guid: str
name: str
version: str
profile: str
description: Optional[str]
version_string_cmd: Optional[str]
tool_config: str
tool_type: str
requirements: Optional[Any]
tests: Optional[Any]
add_to_tool_panel: bool
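# Illustrative entry (all values hypothetical):
# {"id": "my_tool", "guid": "toolshed.example.org/repos/owner/repo/my_tool/1.0.0",
#  "name": "My Tool", "version": "1.0.0", "profile": "21.05", "description": None,
#  "version_string_cmd": None, "tool_config": "tools/my_tool.xml", "tool_type": "default",
#  "requirements": [], "tests": [], "add_to_tool_panel": True}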
class BaseMetadataGenerator:
app: Union["BasicSharedApp", InstallationTarget]
repository: Optional[RepositoryProtocol]
invalid_file_tups: List[InvalidFileT]
changeset_revision: Optional[str]
repository_clone_url: Optional[str]
shed_config_dict: Dict[str, Any]
metadata_dict: Dict[str, Any]
relative_install_dir: Optional[str]
repository_files_dir: Optional[str]
persist: bool
    def handle_repository_elem(self, repository_elem, only_if_compiling_contained_td=False) -> HandleResultT:
raise NotImplementedError()
def _generate_data_manager_metadata(
self, repo_dir, data_manager_config_filename, metadata_dict: Dict[str, Any], shed_config_dict=None
) -> Dict[str, Any]:
"""
Update the received metadata_dict with information from the parsed data_manager_config_filename.
"""
if data_manager_config_filename is None:
return metadata_dict
assert self.repository
repo_path = self.repository.repo_path(self.app)
if hasattr(self.repository, "repo_files_directory"):
            # Galaxy side.
repo_files_directory = self.repository.repo_files_directory(self.app)
repo_dir = repo_files_directory
else:
# Tool Shed side.
repo_files_directory = repo_path
relative_data_manager_dir = util.relpath(os.path.split(data_manager_config_filename)[0], repo_dir)
rel_data_manager_config_filename = os.path.join(
relative_data_manager_dir, os.path.split(data_manager_config_filename)[1]
)
data_managers: Dict[str, dict] = {}
invalid_data_managers: List[dict] = []
data_manager_metadata = {
"config_filename": rel_data_manager_config_filename,
"data_managers": data_managers,
"invalid_data_managers": invalid_data_managers,
"error_messages": [],
}
metadata_dict["data_manager"] = data_manager_metadata
tree, error_message = parse_xml(data_manager_config_filename)
if tree is None:
# We are not able to load any data managers.
data_manager_metadata["error_messages"].append(error_message)
return metadata_dict
tool_path = None
if shed_config_dict:
tool_path = shed_config_dict.get("tool_path", None)
tools = {}
for tool in metadata_dict.get("tools", []):
tool_conf_name = tool["tool_config"]
if tool_path:
tool_conf_name = os.path.join(tool_path, tool_conf_name)
tools[tool_conf_name] = tool
root = tree.getroot()
if data_manager_tool_path := root.get("tool_path", None):
relative_data_manager_dir = os.path.join(relative_data_manager_dir, data_manager_tool_path)
for i, data_manager_elem in enumerate(root.findall("data_manager")):
tool_file = data_manager_elem.get("tool_file", None)
data_manager_id = data_manager_elem.get("id", None)
if data_manager_id is None:
log.error(f'Data Manager entry is missing id attribute in "{data_manager_config_filename}".')
invalid_data_managers.append(
{"index": i, "error_message": "Data Manager entry is missing id attribute"}
)
continue
# FIXME: default behavior is to fall back to tool.name.
data_manager_name = data_manager_elem.get("name", data_manager_id)
version = data_manager_elem.get("version", DataManagerInterface.DEFAULT_VERSION)
guid = self._generate_guid_for_object(DataManagerInterface.GUID_TYPE, data_manager_id, version)
data_tables = []
if tool_file is None:
log.error(f'Data Manager entry is missing tool_file attribute in "{data_manager_config_filename}".')
invalid_data_managers.append(
{"index": i, "error_message": "Data Manager entry is missing tool_file attribute"}
)
continue
else:
bad_data_table = False
for data_table_elem in data_manager_elem.findall("data_table"):
data_table_name = data_table_elem.get("name", None)
if data_table_name is None:
log.error(
f'Data Manager data_table entry is missing name attribute in "{data_manager_config_filename}".'
)
                        invalid_data_managers.append(
                            {"index": i, "error_message": "Data Manager data_table entry is missing name attribute"}
                        )
bad_data_table = True
break
else:
data_tables.append(data_table_name)
if bad_data_table:
continue
data_manager_metadata_tool_file = os.path.normpath(os.path.join(relative_data_manager_dir, tool_file))
tool_metadata_tool_file = os.path.join(repo_files_directory, data_manager_metadata_tool_file)
tool = tools.get(tool_metadata_tool_file, None)
if tool is None:
log.error(f"Unable to determine tools metadata for '{data_manager_metadata_tool_file}'.")
invalid_data_managers.append({"index": i, "error_message": "Unable to determine tools metadata"})
continue
data_managers[data_manager_id] = {
"id": data_manager_id,
"name": data_manager_name,
"guid": guid,
"version": version,
"tool_config_file": data_manager_metadata_tool_file,
"data_tables": data_tables,
"tool_guid": tool["guid"],
}
log.debug(f"Loaded Data Manager tool_files: {tool_file}")
return metadata_dict
    def generate_environment_dependency_metadata(self, elem, valid_tool_dependencies_dict):
"""
The value of env_var_name must match the value of the "set_environment" type
in the tool config's <requirements> tag set, or the tool dependency will be
considered an orphan.
"""
# The value of the received elem looks something like this:
# <set_environment version="1.0">
# <environment_variable name="JAVA_JAR_PATH" action="set_to">$INSTALL_DIR</environment_variable>
# </set_environment>
for env_elem in elem:
# <environment_variable name="JAVA_JAR_PATH" action="set_to">$INSTALL_DIR</environment_variable>
env_name = env_elem.get("name", None)
if env_name:
requirements_dict = dict(name=env_name, type="set_environment")
if "set_environment" in valid_tool_dependencies_dict:
valid_tool_dependencies_dict["set_environment"].append(requirements_dict)
else:
valid_tool_dependencies_dict["set_environment"] = [requirements_dict]
return valid_tool_dependencies_dict
def _generate_guid_for_object(self, guid_type, obj_id, version) -> str:
assert self.repository_clone_url
tmp_url = remove_protocol_and_user_from_clone_url(self.repository_clone_url)
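        # For example (hypothetical values): a clone URL of
        # "https://toolshed.example.org/repos/owner/repo" with guid_type
        # "data_manager", obj_id "my_dm" and version "1.0" yields
        # "toolshed.example.org/repos/owner/repo/data_manager/my_dm/1.0".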
return f"{tmp_url}/{guid_type}/{obj_id}/{version}"
    def generate_metadata_for_changeset_revision(self):
"""
Generate metadata for a repository using its files on disk. To generate metadata
for changeset revisions older than the repository tip, the repository will have been
cloned to a temporary location and updated to a specified changeset revision to access
that changeset revision's disk files, so the value of self.repository_files_dir will not
        always be self.repository.repo_path(self.app) (it could be an absolute path to a temporary
        directory containing a clone). If it is an absolute path, the value of self.relative_install_dir
        must contain repository.repo_path(self.app).
The value of self.persist will be True when the installed repository contains a valid
tool_data_table_conf.xml.sample file, in which case the entries should ultimately be
persisted to the file referred to by self.app.config.shed_tool_data_table_config.
"""
if self.shed_config_dict is None:
self.shed_config_dict = {}
assert self.repository
if self.updating_installed_repository:
# Keep the original tool shed repository metadata if setting metadata on a repository
# installed into a local Galaxy instance for which we have pulled updates.
gx_repository = cast(ToolShedRepository, self.repository) # definitely in Galaxy version
original_repository_metadata = gx_repository.metadata_
else:
original_repository_metadata = None
readme_file_names = _get_readme_file_names(str(self.repository.name))
metadata_dict = self.initial_metadata_dict()
readme_files = []
invalid_tool_configs = []
if self.resetting_all_metadata_on_repository:
if not self.relative_install_dir:
raise Exception(
"The value of self.repository.repo_path must be set when resetting all metadata on a repository."
)
# Keep track of the location where the repository is temporarily cloned so that we can
# strip the path when setting metadata. The value of self.repository_files_dir is the
# full path to the temporary directory to which self.repository was cloned.
work_dir = self.repository_files_dir
files_dir = self.repository_files_dir
# Since we're working from a temporary directory, we can safely copy sample files included
# in the repository to the repository root.
else:
# Use a temporary working directory to copy all sample files.
work_dir = tempfile.mkdtemp(prefix="tmp-toolshed-gmfcr")
# All other files are on disk in the repository's repo_path, which is the value of
# self.relative_install_dir.
assert self.relative_install_dir
files_dir = self.relative_install_dir
if self.shed_config_dict.get("tool_path"):
files_dir = os.path.join(self.shed_config_dict["tool_path"], files_dir)
assert files_dir
# Create ValidationContext to load and validate tools, data tables and datatypes
with ValidationContext.from_app(app=self.app, work_dir=work_dir) as validation_context:
tv = ToolValidator(validation_context)
# Get the relative path to all sample files included in the repository for storage in
# the repository's metadata.
sample_file_metadata_paths, sample_file_copy_paths = self.get_sample_files_from_disk(
repository_files_dir=files_dir,
tool_path=self.shed_config_dict.get("tool_path"),
relative_install_dir=self.relative_install_dir,
)
if sample_file_metadata_paths:
metadata_dict["sample_files"] = sample_file_metadata_paths
# Copy all sample files included in the repository to a single directory location so we
# can load tools that depend on them.
data_table_conf_xml_sample_files = []
for sample_file in sample_file_copy_paths:
tool_util.copy_sample_file(self.app.config.tool_data_path, sample_file, dest_path=work_dir)
# If the list of sample files includes a tool_data_table_conf.xml.sample file, load
# its table elements into memory.
relative_path, filename = os.path.split(sample_file)
if filename == "tool_data_table_conf.xml.sample":
data_table_conf_xml_sample_files.append(sample_file)
for data_table_conf_xml_sample_file in data_table_conf_xml_sample_files:
# We create a new ToolDataTableManager to avoid adding entries to the app-wide
# tool data tables. This is only used for checking that the data table is valid.
new_table_elems, error_message = validation_context.tool_data_tables.add_new_entries_from_config_file(
config_filename=data_table_conf_xml_sample_file,
tool_data_path=work_dir,
shed_tool_data_table_config=work_dir,
persist=False,
)
if error_message:
                    self.invalid_file_tups.append(
                        (os.path.split(data_table_conf_xml_sample_file)[1], error_message)
                    )
for root, dirs, files in os.walk(files_dir):
if root.find(".hg") < 0 and root.find("hgrc") < 0:
if ".hg" in dirs:
dirs.remove(".hg")
for name in files:
                        # See if we have repository dependencies defined.
if name == REPOSITORY_DEPENDENCY_DEFINITION_FILENAME:
path_to_repository_dependencies_config = os.path.join(root, name)
metadata_dict, error_message = self.generate_repository_dependency_metadata(
path_to_repository_dependencies_config, metadata_dict
)
if error_message:
self.invalid_file_tups.append((name, error_message))
                        # See if we have one or more README files.
elif name.lower() in readme_file_names:
relative_path_to_readme = self.get_relative_path_to_repository_file(
root, name, self.relative_install_dir, work_dir, self.shed_config_dict
)
readme_files.append(relative_path_to_readme)
# See if we have a tool config.
elif looks_like_a_tool(os.path.join(root, name), invalid_names=NOT_TOOL_CONFIGS):
                            full_path = str(os.path.abspath(os.path.join(root, name)))  # TODO: the str() call appears redundant
element_tree, error_message = parse_xml(full_path)
if element_tree is None:
is_tool = False
else:
element_tree_root = element_tree.getroot()
is_tool = element_tree_root.tag == "tool"
if is_tool:
tool, valid, error_message = tv.load_tool_from_config(
self.app.security.encode_id(self.repository.id), full_path
)
if tool is None:
if not valid:
invalid_tool_configs.append(name)
self.invalid_file_tups.append((name, error_message))
else:
invalid_files_and_errors_tups = tv.check_tool_input_params(
files_dir, name, tool, sample_file_copy_paths
)
can_set_metadata = True
for tup in invalid_files_and_errors_tups:
if name in tup:
can_set_metadata = False
invalid_tool_configs.append(name)
break
if can_set_metadata:
relative_path_to_tool_config = self.get_relative_path_to_repository_file(
root, name, self.relative_install_dir, work_dir, self.shed_config_dict
)
metadata_dict = self.generate_tool_metadata(
relative_path_to_tool_config, tool, metadata_dict
)
else:
for tup in invalid_files_and_errors_tups:
self.invalid_file_tups.append(tup)
# Handle any data manager entries
data_manager_config = get_config_from_disk(suc.REPOSITORY_DATA_MANAGER_CONFIG_FILENAME, files_dir)
metadata_dict = self._generate_data_manager_metadata(
files_dir, data_manager_config, metadata_dict, shed_config_dict=self.shed_config_dict
)
if readme_files:
metadata_dict["readme_files"] = readme_files
# This step must be done after metadata for tools has been defined.
tool_dependencies_config = get_config_from_disk(TOOL_DEPENDENCY_DEFINITION_FILENAME, files_dir)
if tool_dependencies_config:
metadata_dict, error_message = self.generate_tool_dependency_metadata(
tool_dependencies_config, metadata_dict, original_repository_metadata=original_repository_metadata
)
if error_message:
self.invalid_file_tups.append((TOOL_DEPENDENCY_DEFINITION_FILENAME, error_message))
if invalid_tool_configs:
metadata_dict["invalid_tools"] = invalid_tool_configs
self.metadata_dict = metadata_dict
remove_dir(work_dir)
    def generate_package_dependency_metadata(self, elem, valid_tool_dependencies_dict, invalid_tool_dependencies_dict):
"""
Generate the metadata for a tool dependencies package defined for a repository. The
value of package_name must match the value of the "package" type in the tool config's
<requirements> tag set.
"""
# TODO: make this function a class.
repository_dependency_is_valid = True
repository_dependency_tup: list = []
requirements_dict = {}
error_message = ""
package_name = elem.get("name", None)
package_version = elem.get("version", None)
if package_name and package_version:
requirements_dict["name"] = package_name
requirements_dict["version"] = package_version
requirements_dict["type"] = "package"
for sub_elem in elem:
if sub_elem.tag == "readme":
requirements_dict["readme"] = sub_elem.text
elif sub_elem.tag == "repository":
                # We have a complex repository dependency. If the returned value of repository_dependency_is_valid
                # is False, the tool dependency definition will be set as invalid. This is currently the only case
                # where a tool dependency definition is considered invalid.
(
repository_dependency_tup,
repository_dependency_is_valid,
error_message,
) = self.handle_repository_elem(repository_elem=sub_elem, only_if_compiling_contained_td=False)
elif sub_elem.tag == "install":
package_install_version = sub_elem.get("version", "1.0")
if package_install_version == "1.0":
# Complex repository dependencies can be defined within the last <actions> tag set contained in an
# <actions_group> tag set. Comments, <repository> tag sets and <readme> tag sets will be skipped
# in tool_dependency_util.parse_package_elem().
actions_elem_tuples = tool_dependency_util.parse_package_elem(
sub_elem, platform_info_dict=None, include_after_install_actions=False
)
if actions_elem_tuples:
# We now have a list of a single tuple that looks something like:
# [(True, <Element 'actions' at 0x104017850>)]
actions_elem_tuple = actions_elem_tuples[0]
in_actions_group, actions_elem = actions_elem_tuple
if in_actions_group:
                                # Since we're inside an <actions_group> tag set, inspect the actions_elem to see if a complex
                                # repository dependency is defined. By definition, complex repository dependency definitions
                                # contained within the last <actions> tag set within an <actions_group> tag set will have the
                                # value of "only_if_compiling_contained_td" set to True in the repository dependency tuple.
for action_elem in actions_elem:
if action_elem.tag == "package":
# <package name="libgtextutils" version="0.6">
# <repository name="package_libgtextutils_0_6" owner="test" prior_installation_required="True" />
# </package>
ae_package_name = action_elem.get("name", None)
ae_package_version = action_elem.get("version", None)
if ae_package_name and ae_package_version:
for sub_action_elem in action_elem:
if sub_action_elem.tag == "repository":
# We have a complex repository dependency.
(
repository_dependency_tup,
repository_dependency_is_valid,
error_message,
) = self.handle_repository_elem(
repository_elem=sub_action_elem,
only_if_compiling_contained_td=True,
)
elif action_elem.tag == "action":
# <action type="set_environment_for_install">
# <repository changeset_revision="b107b91b3574" name="package_readline_6_2" owner="devteam" prior_installation_required="True" toolshed="http://localhost:9009">
# <package name="readline" version="6.2" />
# </repository>
# </action>
for sub_action_elem in action_elem:
if sub_action_elem.tag == "repository":
# We have a complex repository dependency.
(
repository_dependency_tup,
repository_dependency_is_valid,
error_message,
) = self.handle_repository_elem(
repository_elem=sub_action_elem, only_if_compiling_contained_td=True
)
if requirements_dict:
dependency_key = f"{package_name}/{package_version}"
if repository_dependency_is_valid:
valid_tool_dependencies_dict[dependency_key] = requirements_dict
else:
# Append the error message to the requirements_dict.
requirements_dict["error"] = error_message
invalid_tool_dependencies_dict[dependency_key] = requirements_dict
return (
valid_tool_dependencies_dict,
invalid_tool_dependencies_dict,
repository_dependency_tup,
repository_dependency_is_valid,
error_message,
)
    def generate_repository_dependency_metadata(self, repository_dependencies_config, metadata_dict):
"""
Generate a repository dependencies dictionary based on valid information defined in the received
repository_dependencies_config. This method is called from the tool shed as well as from Galaxy.
"""
# Make sure we're looking at a valid repository_dependencies.xml file.
tree, error_message = parse_xml(repository_dependencies_config)
if tree is None:
xml_is_valid = False
else:
root = tree.getroot()
xml_is_valid = root.tag == "repositories"
if xml_is_valid:
invalid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description"))
invalid_repository_dependency_tups = []
valid_repository_dependencies_dict: Dict[str, Any] = dict(description=root.get("description"))
valid_repository_dependency_tups = []
for repository_elem in root.findall("repository"):
repository_dependency_tup, repository_dependency_is_valid, err_msg = self.handle_repository_elem(
repository_elem, only_if_compiling_contained_td=False
)
if repository_dependency_is_valid:
valid_repository_dependency_tups.append(repository_dependency_tup)
else:
# Append the error_message to the repository dependencies tuple.
(
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = repository_dependency_tup
invalid_repository_dependency_tup = (
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
err_msg,
)
invalid_repository_dependency_tups.append(invalid_repository_dependency_tup)
error_message += err_msg
if invalid_repository_dependency_tups:
invalid_repository_dependencies_dict["repository_dependencies"] = invalid_repository_dependency_tups
metadata_dict["invalid_repository_dependencies"] = invalid_repository_dependencies_dict
if valid_repository_dependency_tups:
valid_repository_dependencies_dict["repository_dependencies"] = valid_repository_dependency_tups
metadata_dict["repository_dependencies"] = valid_repository_dependencies_dict
return metadata_dict, error_message
    def generate_tool_metadata(self, tool_config, tool, metadata_dict):
"""Update the received metadata_dict with changes that have been applied to the received tool."""
# Generate the guid.
guid = suc.generate_tool_guid(self.repository_clone_url, tool)
# Handle tool.requirements.
tool_requirements = []
for tool_requirement in tool.requirements:
name = str(tool_requirement.name)
tool_type = str(tool_requirement.type)
version = str(tool_requirement.version) if tool_requirement.version else None
requirement_dict = dict(name=name, type=tool_type, version=version)
tool_requirements.append(requirement_dict)
# Handle tool.tests.
tool_tests = []
if tool.tests:
for ttb in tool.tests:
required_files = []
for required_file in ttb.required_files:
value, extra = required_file
required_files.append(value)
inputs = []
for param_name, values in ttb.inputs.items():
# Handle improperly defined or strange test parameters and values.
if param_name is not None:
if values in [None, False]:
# An example is the third test in http://testtoolshed.g2.bx.psu.edu/view/devteam/samtools_rmdup
# which is defined as:
# <test>
# <param name="input1" value="1.bam" ftype="bam" />
# <param name="bam_paired_end_type_selector" value="PE" />
# <param name="force_se" />
# <output name="output1" file="1.bam" ftype="bam" sort="True" />
# </test>
inputs.append((param_name, values))
else:
                                if isinstance(values, TestCollectionDef):
                                    # Collection inputs are summarized with a short display value;
                                    # their nested required files are already captured via ttb.required_files.
collection_type = values.collection_type
metadata_display_value = f"{collection_type} collection"
inputs.append((param_name, metadata_display_value))
else:
try:
if len(values) == 1:
inputs.append((param_name, values[0]))
continue
except TypeError:
log.exception(
'Expected a list of values for tool "%s" parameter "%s", got %s: %s',
tool.id,
param_name,
type(values),
values,
)
inputs.append((param_name, values))
outputs = []
for output in ttb.outputs:
name, file_name, extra = output
outputs.append((name, strip_path(file_name) if file_name else None))
if file_name not in required_files and file_name is not None:
required_files.append(file_name)
test_dict = dict(name=str(ttb.name), required_files=required_files, inputs=inputs, outputs=outputs)
tool_tests.append(test_dict)
            # Determine if the tool should be loaded into the tool panel. Examples of valid tools that
            # should not be displayed in the tool panel are datatype converters and DataManager tools
            # (which are of type 'manage_data').
add_to_tool_panel_attribute = self._set_add_to_tool_panel_attribute_for_tool(tool)
tool_dict = RepositoryMetadataToolDict(
id=tool.id,
guid=guid,
name=tool.name,
version=tool.version,
profile=tool.profile,
description=tool.description,
version_string_cmd=tool.version_string_cmd,
tool_config=tool_config,
tool_type=tool.tool_type,
requirements=tool_requirements,
tests=tool_tests,
add_to_tool_panel=add_to_tool_panel_attribute,
)
if "tools" in metadata_dict:
metadata_dict["tools"].append(tool_dict)
else:
metadata_dict["tools"] = [tool_dict]
return metadata_dict
    def generate_tool_dependency_metadata(
self, tool_dependencies_config, metadata_dict, original_repository_metadata=None
):
"""
If the combination of name, version and type of each element is defined in the <requirement> tag for
at least one tool in self.repository, then update the received metadata_dict with information from the
parsed tool_dependencies_config.
"""
error_message = ""
if original_repository_metadata:
# Keep a copy of the original tool dependencies dictionary and the list of tool
# dictionaries in the metadata.
original_valid_tool_dependencies_dict = original_repository_metadata.get("tool_dependencies", None)
else:
original_valid_tool_dependencies_dict = None
tree, error_message = parse_xml(tool_dependencies_config)
if tree is None:
return metadata_dict, error_message
root = tree.getroot()
class RecurserValueStore:
valid_tool_dependencies_dict = {}
invalid_tool_dependencies_dict = {}
rvs = RecurserValueStore()
valid_repository_dependency_tups = []
invalid_repository_dependency_tups = []
description = root.get("description")
def _check_elem_for_dep(elems):
error_messages = []
for elem in elems:
if elem.tag == "package":
(
rvs.valid_tool_dependencies_dict,
rvs.invalid_tool_dependencies_dict,
repository_dependency_tup,
repository_dependency_is_valid,
message,
) = self.generate_package_dependency_metadata(
elem, rvs.valid_tool_dependencies_dict, rvs.invalid_tool_dependencies_dict
)
if repository_dependency_is_valid:
if (
repository_dependency_tup
and repository_dependency_tup not in valid_repository_dependency_tups
):
# We have a valid complex repository dependency.
valid_repository_dependency_tups.append(repository_dependency_tup)
else:
if (
repository_dependency_tup
and repository_dependency_tup not in invalid_repository_dependency_tups
):
# We have an invalid complex repository dependency, so mark the tool dependency as invalid.
# Append the error message to the invalid repository dependency tuple.
(
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
) = repository_dependency_tup
repository_dependency_tup = (
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
only_if_compiling_contained_td,
message,
)
invalid_repository_dependency_tups.append(repository_dependency_tup)
error_messages.append(f"{error_message} {message}")
elif elem.tag == "set_environment":
rvs.valid_tool_dependencies_dict = self.generate_environment_dependency_metadata(
elem, rvs.valid_tool_dependencies_dict
)
error_messages += _check_elem_for_dep(elem)
return error_messages
error_message = "\n".join([error_message] + _check_elem_for_dep(root))
if rvs.valid_tool_dependencies_dict:
if original_valid_tool_dependencies_dict:
# We're generating metadata on an update pulled to a tool shed repository installed
# into a Galaxy instance, so handle changes to tool dependencies appropriately.
installation_target = cast(InstallationTarget, self.app)
irm = installation_target.installed_repository_manager
(
updated_tool_dependency_names,
deleted_tool_dependency_names,
) = irm.handle_existing_tool_dependencies_that_changed_in_update(
self.repository, original_valid_tool_dependencies_dict, rvs.valid_tool_dependencies_dict
)
metadata_dict["tool_dependencies"] = rvs.valid_tool_dependencies_dict
if rvs.invalid_tool_dependencies_dict:
metadata_dict["invalid_tool_dependencies"] = rvs.invalid_tool_dependencies_dict
if valid_repository_dependency_tups:
metadata_dict = self._update_repository_dependencies_metadata(
metadata=metadata_dict,
repository_dependency_tups=valid_repository_dependency_tups,
is_valid=True,
description=description,
)
if invalid_repository_dependency_tups:
metadata_dict = self._update_repository_dependencies_metadata(
metadata=metadata_dict,
repository_dependency_tups=invalid_repository_dependency_tups,
is_valid=False,
description=description,
)
return metadata_dict, error_message
    def get_relative_path_to_repository_file(self, root, name, relative_install_dir, work_dir, shed_config_dict):
if self.resetting_all_metadata_on_repository:
full_path_to_file = os.path.join(root, name)
stripped_path_to_file = full_path_to_file.replace(work_dir, "")
if stripped_path_to_file.startswith("/"):
stripped_path_to_file = stripped_path_to_file[1:]
relative_path_to_file = os.path.join(relative_install_dir, stripped_path_to_file)
else:
relative_path_to_file = os.path.join(root, name)
if (
relative_install_dir
and shed_config_dict.get("tool_path")
and relative_path_to_file.startswith(
os.path.join(shed_config_dict.get("tool_path"), relative_install_dir)
)
):
relative_path_to_file = relative_path_to_file[len(shed_config_dict.get("tool_path")) + 1 :]
return relative_path_to_file
    def get_sample_files_from_disk(self, repository_files_dir, tool_path=None, relative_install_dir=None):
work_dir = ""
if self.resetting_all_metadata_on_repository:
# Keep track of the location where the repository is temporarily cloned so that we can strip
# it when setting metadata.
work_dir = repository_files_dir
sample_file_metadata_paths = []
sample_file_copy_paths = []
for root, _dirs, files in os.walk(repository_files_dir):
if root.find(".hg") < 0:
for name in files:
if name.endswith(".sample"):
if self.resetting_all_metadata_on_repository:
full_path_to_sample_file = os.path.join(root, name)
stripped_path_to_sample_file = full_path_to_sample_file.replace(work_dir, "")
if stripped_path_to_sample_file.startswith("/"):
stripped_path_to_sample_file = stripped_path_to_sample_file[1:]
relative_path_to_sample_file = os.path.join(
relative_install_dir, stripped_path_to_sample_file
)
if os.path.exists(relative_path_to_sample_file):
sample_file_copy_paths.append(relative_path_to_sample_file)
else:
sample_file_copy_paths.append(full_path_to_sample_file)
else:
relative_path_to_sample_file = os.path.join(root, name)
sample_file_copy_paths.append(relative_path_to_sample_file)
if tool_path and relative_install_dir:
if relative_path_to_sample_file.startswith(
os.path.join(tool_path, relative_install_dir)
):
relative_path_to_sample_file = relative_path_to_sample_file[len(tool_path) + 1 :]
sample_file_metadata_paths.append(relative_path_to_sample_file)
return sample_file_metadata_paths, sample_file_copy_paths
def _set_add_to_tool_panel_attribute_for_tool(self, tool):
"""
Determine if a tool should be loaded into the Galaxy tool panel. Examples of valid tools that
should not be displayed in the tool panel are DataManager tools.
"""
if hasattr(tool, "tool_type"):
if tool.tool_type in ["manage_data"]:
# We have a DataManager tool.
return False
return True
    def set_changeset_revision(self, changeset_revision: Optional[str]):
self.changeset_revision = changeset_revision
    def set_relative_install_dir(self, relative_install_dir: Optional[str]):
self.relative_install_dir = relative_install_dir
def _reset_attributes_after_repository_update(self, relative_install_dir: Optional[str]):
self.metadata_dict = self.initial_metadata_dict()
self.set_relative_install_dir(relative_install_dir)
self.set_repository_files_dir()
self.resetting_all_metadata_on_repository = False
self.updating_installed_repository = False
self.persist = False
self.invalid_file_tups = []
    def set_repository_files_dir(self, repository_files_dir: Optional[str] = None):
self.repository_files_dir = repository_files_dir
def _update_repository_dependencies_metadata(
self,
metadata: Dict[str, Any],
repository_dependency_tups: List[tuple],
is_valid: bool,
description: Optional[str],
) -> Dict[str, Any]:
if is_valid:
repository_dependencies_dict = metadata.get("repository_dependencies", None)
else:
repository_dependencies_dict = metadata.get("invalid_repository_dependencies", None)
        if repository_dependencies_dict:
            repository_dependencies = repository_dependencies_dict.get("repository_dependencies", [])
            for repository_dependency_tup in repository_dependency_tups:
                if repository_dependency_tup not in repository_dependencies:
                    repository_dependencies.append(repository_dependency_tup)
            repository_dependencies_dict["repository_dependencies"] = repository_dependencies
        elif repository_dependency_tups:
            repository_dependencies_dict = dict(
                description=description, repository_dependencies=repository_dependency_tups
            )
if repository_dependencies_dict:
if is_valid:
metadata["repository_dependencies"] = repository_dependencies_dict
else:
metadata["invalid_repository_dependencies"] = repository_dependencies_dict
return metadata
class GalaxyMetadataGenerator(BaseMetadataGenerator):
"""A MetadataGenerator building on Galaxy's app and repository constructs."""
app: InstallationTarget
repository: Optional[ToolShedRepository] # type:ignore[assignment]
    def __init__(
self,
app: InstallationTarget,
repository=None,
changeset_revision: Optional[str] = None,
repository_clone_url: Optional[str] = None,
shed_config_dict: Optional[Dict[str, Any]] = None,
relative_install_dir=None,
repository_files_dir=None,
resetting_all_metadata_on_repository=False,
updating_installed_repository=False,
persist=False,
metadata_dict=None,
user=None,
):
self.app = app
self.user = user
self.repository = repository
if changeset_revision is None and self.repository is not None:
self.changeset_revision = self.repository.changeset_revision
else:
self.changeset_revision = changeset_revision
if repository_clone_url is None and self.repository is not None:
self.repository_clone_url = generate_clone_url_for_installed_repository(self.app, self.repository)
else:
self.repository_clone_url = repository_clone_url
if shed_config_dict is None:
if self.repository is not None:
self.shed_config_dict = self.repository.get_shed_config_dict(self.app)
else:
self.shed_config_dict = {}
else:
self.shed_config_dict = shed_config_dict
if relative_install_dir is None and self.repository is not None:
tool_path, relative_install_dir = self.repository.get_tool_relative_path(self.app)
if repository_files_dir is None and self.repository is not None:
repository_files_dir = self.repository.repo_files_directory(self.app)
if metadata_dict is None:
# Shed related tool panel configs are only relevant to Galaxy.
self.metadata_dict = {"shed_config_filename": self.shed_config_dict.get("config_filename", None)}
else:
self.metadata_dict = metadata_dict
self.relative_install_dir = relative_install_dir
self.repository_files_dir = repository_files_dir
self.resetting_all_metadata_on_repository = resetting_all_metadata_on_repository
self.updating_installed_repository = updating_installed_repository
self.persist = persist
self.invalid_file_tups = []
    def initial_metadata_dict(self) -> Dict[str, Any]:
# Shed related tool panel configs are only relevant to Galaxy.
metadata_dict = {"shed_config_filename": self.shed_config_dict.get("config_filename")}
return metadata_dict
    def set_repository(
self, repository, relative_install_dir: Optional[str] = None, changeset_revision: Optional[str] = None
):
self.repository = repository
if relative_install_dir is None and self.repository is not None:
tool_path, relative_install_dir = self.repository.get_tool_relative_path(self.app)
if changeset_revision is None and self.repository is not None:
self.set_changeset_revision(self.repository.changeset_revision)
else:
self.set_changeset_revision(changeset_revision)
self.shed_config_dict = repository.get_shed_config_dict(self.app)
self._reset_attributes_after_repository_update(relative_install_dir)
    def handle_repository_elem(self, repository_elem, only_if_compiling_contained_td=False) -> HandleResultT:
"""
Process the received repository_elem which is a <repository> tag either from a
repository_dependencies.xml file or a tool_dependencies.xml file. If the former,
we're generating repository dependencies metadata for a repository in the Tool Shed.
If the latter, we're generating package dependency metadata within Galaxy or the
Tool Shed.
"""
is_valid = True
error_message = ""
toolshed = repository_elem.get("toolshed", None)
name = repository_elem.get("name", None)
owner = repository_elem.get("owner", None)
changeset_revision = repository_elem.get("changeset_revision", None)
prior_installation_required = str(repository_elem.get("prior_installation_required", False))
repository_dependency_tup = [
toolshed,
name,
owner,
changeset_revision,
prior_installation_required,
str(only_if_compiling_contained_td),
]
        if not self.updating_installed_repository:
            # We're installing a repository into Galaxy, so make sure its contained repository
            # dependency definition is valid.
            if toolshed is None or name is None or owner is None or changeset_revision is None:
                # Several packages in the Tool Shed contain invalid repository definitions
                # but will still install. We report these errors to the installing user
                # instead of raising an exception here, which would keep the installation
                # from proceeding; reaching this point implies a bug in the Tool Shed
                # framework.
error_message = "Installation encountered an invalid repository dependency definition:\n"
error_message += util.xml_to_string(repository_elem, pretty=True)
log.error(error_message)
return repository_dependency_tup, False, error_message
        # The toolshed attribute should always be present in Galaxy-side code.
assert toolshed
toolshed = remove_protocol_from_tool_shed_url(toolshed)
# We're in Galaxy. We reach here when we're generating the metadata for a tool
# dependencies package defined for a repository or when we're generating metadata
# for an installed repository. See if we can locate the installed repository via
# the changeset_revision defined in the repository_elem (it may be outdated). If
# we're successful in locating an installed repository with the attributes defined
# in the repository_elem, we know it is valid.
repository = get_repository_for_dependency_relationship(self.app, toolshed, name, owner, changeset_revision)
if repository:
return repository_dependency_tup, is_valid, error_message
else:
            # Send a request to the tool shed to retrieve additional changeset revisions
            # with which the repository may have been installed.
text = get_updated_changeset_revisions_from_tool_shed(self.app, toolshed, name, owner, changeset_revision)
if text:
updated_changeset_revisions = util.listify(text)
for updated_changeset_revision in updated_changeset_revisions:
repository = get_repository_for_dependency_relationship(
self.app, toolshed, name, owner, updated_changeset_revision
)
if repository:
return repository_dependency_tup, is_valid, error_message
if self.updating_installed_repository:
# The repository dependency was included in an update to the installed
# repository, so it will not yet be installed. Return the tuple for later
# installation.
return repository_dependency_tup, is_valid, error_message
if self.updating_installed_repository:
# The repository dependency was included in an update to the installed repository,
# so it will not yet be installed. Return the tuple for later installation.
return repository_dependency_tup, is_valid, error_message
# Don't generate an error message for missing repository dependencies that are required
# only if compiling the dependent repository's tool dependency.
if not only_if_compiling_contained_td:
# We'll currently default to setting the repository dependency definition as invalid
# if an installed repository cannot be found. This may not be ideal because the tool
# shed may have simply been inaccessible when metadata was being generated for the
# installed tool shed repository.
error_message = f"Ignoring invalid repository dependency definition for tool shed {toolshed}, name {name}, owner {owner}, "
error_message += f"changeset revision {changeset_revision}."
log.debug(error_message)
is_valid = False
return repository_dependency_tup, is_valid, error_message
return repository_dependency_tup, is_valid, error_message
def _get_readme_file_names(repository_name: str) -> List[str]:
"""Return a list of file names that will be categorized as README files for the received repository_name."""
readme_files = ["readme", "read_me", "install"]
valid_filenames = [f"{f}.txt" for f in readme_files]
valid_filenames.extend([f"{f}.rst" for f in readme_files])
valid_filenames.extend(readme_files)
valid_filenames.append(f"{repository_name}.txt")
valid_filenames.append(f"{repository_name}.rst")
return valid_filenames
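# Illustrative example: _get_readme_file_names("my_repo") returns
# ["readme.txt", "read_me.txt", "install.txt", "readme.rst", "read_me.rst",
#  "install.rst", "readme", "read_me", "install", "my_repo.txt", "my_repo.rst"].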