# Source code for galaxy.model.metadata
"""
Galaxy Metadata
"""
import copy
import json
import logging
import os
import sys
import tempfile
import weakref
from collections import OrderedDict
from collections.abc import (
Iterator,
Mapping,
)
from os.path import abspath
from typing import (
Any,
Optional,
TYPE_CHECKING,
Union,
)
from sqlalchemy import select
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import flag_modified
import galaxy.model
from galaxy.security.object_wrapper import sanitize_lists_to_string
from galaxy.util import (
form_builder,
listify,
string_as_bool,
stringify_dictionary_keys,
unicodify,
)
from galaxy.util.json import safe_dumps
if TYPE_CHECKING:
from sqlalchemy.orm import scoped_session
from galaxy.model import DatasetInstance
from galaxy.model.none_like import NoneDataset
from galaxy.model.store import SessionlessContext
log = logging.getLogger(__name__)
STATEMENTS = "__galaxy_statements__" # this is the name of the property in a Datatype class where new metadata spec element Statements are stored
class Statement:
    """
    This class inserts its target into a list in the surrounding
    class. the data.Data class has a metaclass which executes these
    statements. This is how we shove the metadata element spec into
    the class.

    NOTE(review): ``process()`` reads ``statement.target``, but no ``__init__``
    storing a target is visible in this view — presumably defined elsewhere in
    the file; confirm against the full source.
    """

    def __call__(self, *args, **kwargs):
        # get the locals dictionary of the frame object one down in the call stack (i.e. the Datatype class calling MetadataElement)
        class_locals = sys._getframe(1).f_locals
        # get and set '__galaxy_statements__' to an empty list if not in locals dict
        statements = class_locals.setdefault(STATEMENTS, [])
        # add Statement containing info to populate a MetadataElementSpec
        statements.append((self, args, kwargs))

    @classmethod
    def process(cls, element):
        # Replay every recorded statement against the finished Datatype class.
        for statement, args, kwargs in getattr(element, STATEMENTS, []):
            statement.target(
                element, *args, **kwargs
            )  # statement.target is MetadataElementSpec, element is a Datatype class
class MetadataCollection(Mapping):
    """
    MetadataCollection is not a collection at all, but rather a proxy
    to the real metadata which is stored as a Dictionary. This class
    handles processing the metadata elements when they are set and
    retrieved, returning default values in cases when metadata is not set.
    """

    def __init__(
        self,
        parent: Union["DatasetInstance", "NoneDataset"],
        session: Optional[Union["scoped_session", "SessionlessContext"]] = None,
    ) -> None:
        self.parent = parent
        self._session = session
        # initialize dict if needed
        assert self.parent is not None
        if self.parent._metadata is None:
            self.parent._metadata = {}

    @property
    def parent(self) -> Union["DatasetInstance", "NoneDataset", None]:
        # Dereference the weakref; returns None once the parent was collected.
        if "_parent" in self.__dict__:
            return self.__dict__["_parent"]()
        return None

    @parent.setter
    def parent(self, parent: Union["DatasetInstance", "NoneDataset"]) -> None:
        # use weakref to prevent a circular reference interfering with garbage
        # collection: hda/lda (parent) <--> MetadataCollection (self) ; needs to be
        # hashable, so cannot use proxy.
        self.__dict__["_parent"] = weakref.ref(parent)

    @property
    def spec(self):
        """The datatype's MetadataSpecCollection describing known elements."""
        return self.parent.datatype.metadata_spec

    def _object_session(self, item):
        # Prefer the explicitly supplied session (scoped or sessionless),
        # falling back to the SQLAlchemy session the item is attached to.
        return self._session if self._session else object_session(item)

    def __iter__(self) -> Iterator[Any]:
        yield from self.spec.keys()

    def __getitem__(self, key):
        try:
            self.__getattribute__(key)
        except AttributeError:
            try:
                # Not a real attribute: resolve it as a metadata element.
                return self.__getattr__(key)
            except Exception:
                raise KeyError
        # `key` is an attribute of this instance, not some metadata: raise
        # KeyError to prevent e.g. `'items' in dataset.metadata` from returning
        # True
        # Not doing this would also break Cheetah's NameMapper._valueForName()
        # since dataset.metadata['items'] would be None
        raise KeyError

    def __len__(self):
        return len(self.spec)

    def __str__(self):
        return dict(self.items()).__str__()

    def __bool__(self):
        # Truthy only when some metadata has actually been stored.
        return bool(self.parent._metadata)

    __nonzero__ = __bool__

    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails, i.e. for metadata names.
        if name in self.spec:
            if name in self.parent._metadata:
                return self.spec[name].wrap(self.parent._metadata[name], self._object_session(self.parent))
            # Element declared but never set: wrap the spec's default.
            return self.spec[name].wrap(self.spec[name].default, self._object_session(self.parent))
        if name in self.parent._metadata:
            return self.parent._metadata[name]
        # Instead of raising an AttributeError for non-existing metadata, we return None
        return None

    def __setattr__(self, name, value):
        # Workaround for properties like "parent"
        class_attr = getattr(self.__class__, name, None)
        if isinstance(class_attr, property):
            if class_attr.fset is None:
                raise AttributeError(f"Property {name} does not have a setter")
            class_attr.fset(self, value)
        elif name == "_session":
            super().__setattr__(name, value)
        else:
            # Everything else is treated as metadata and stored on the parent,
            # unwrapping through the spec when the element is declared.
            if name in self.spec:
                self.parent._metadata[name] = self.spec[name].unwrap(value)
            else:
                self.parent._metadata[name] = value
            flag_modified(self.parent, "_metadata")

    def remove_key(self, name):
        """Delete the stored metadata value ``name`` (logs if absent)."""
        if name in self.parent._metadata:
            del self.parent._metadata[name]
        else:
            log.info(f"Attempted to delete invalid key '{name}' from MetadataCollection")

    def element_is_set(self, name) -> bool:
        """
        Check if the metadata element with the given name is set, i.e.
        - if such a metadata element actually exists and
        - if its value differs from no_value

        :param name: the name of the metadata element
        :returns: True if the value differs from no_value,
                  False if it is equal or if no metadata element with the name exists
        """
        # Fixed: guard on a missing parent *before* dereferencing any metadata.
        # Previously ``self[name]`` was evaluated first, which raised instead of
        # returning False once the parent had been garbage collected.
        if self.parent is None:
            return False
        try:
            meta_spec = self.parent.metadata.spec[name]
        except KeyError:
            log.debug(f"No metadata element with name '{name}' found")
            return False
        return self[name] != meta_spec.no_value

    def get_metadata_parameter(self, name, **kwd):
        """Build a form field for element ``name`` (implicitly None if unknown)."""
        if name in self.spec:
            field = self.spec[name].param.get_field(getattr(self, name), self, None, **kwd)
            field.value = getattr(self, name)
            return field

    def make_dict_copy(self, to_copy):
        """Makes a deep copy of input iterable to_copy according to self.spec"""
        rval = {}
        for key, value in to_copy.items():
            if key in self.spec:
                rval[key] = self.spec[key].param.make_copy(value, target_context=self, source_context=to_copy)
        return rval

    @property
    def requires_dataset_id(self):
        # True when any declared element stores a file, which needs the dataset id.
        for key in self.spec:
            if isinstance(self.spec[key].param, FileParameter):
                return True
        return False

    def from_JSON_dict(self, filename=None, path_rewriter=None, json_dict=None):
        """Load metadata from a JSON file or dict produced by :meth:`to_JSON_dict`."""
        dataset = self.parent
        if filename is not None:
            log.debug(f"loading metadata from file for: {dataset.__class__.__name__} {dataset.id}")
            with open(filename) as fh:
                JSONified_dict = json.load(fh)
        elif json_dict is not None:
            log.debug(f"loading metadata from dict for: {dataset.__class__.__name__} {dataset.id}")
            if isinstance(json_dict, str):
                JSONified_dict = json.loads(json_dict)
            elif isinstance(json_dict, dict):
                JSONified_dict = json_dict
            else:
                raise ValueError(f"json_dict must be either a dictionary or a string, got {type(json_dict)}.")
        else:
            raise ValueError("You must provide either a filename or a json_dict")

        # We build a dictionary for metadata name / value pairs
        # because when we copy MetadataTempFile objects we flush the datasets'
        # session, but only include the newly created MetadataFile object.
        # If we were to set the metadata elements in the first for loop we'd
        # lose all previously set metadata elements
        metadata_name_value = {}
        for name, spec in self.spec.items():
            if name in JSONified_dict:
                from_ext_kwds = {}
                external_value = JSONified_dict[name]
                param = spec.param
                if isinstance(param, FileParameter):
                    from_ext_kwds["path_rewriter"] = path_rewriter
                value = param.from_external_value(external_value, dataset, **from_ext_kwds)
                metadata_name_value[name] = value
            elif name in dataset._metadata:
                # if the metadata value is not found in our externally set metadata but it has a value in the 'old'
                # metadata associated with our dataset, we'll delete it from our dataset's metadata dict
                del dataset._metadata[name]
        for name, value in metadata_name_value.items():
            dataset._metadata[name] = value
        if "__extension__" in JSONified_dict:
            dataset.extension = JSONified_dict["__extension__"]
        if "__validated_state__" in JSONified_dict:
            dataset.validated_state = JSONified_dict["__validated_state__"]
        if "__validated_state_message__" in JSONified_dict:
            dataset.validated_state_message = JSONified_dict["__validated_state_message__"]
        flag_modified(dataset, "_metadata")

    def to_JSON_dict(self, filename=None):
        """Serialize stored metadata; return the encoded JSON string, or write it to ``filename``."""
        meta_dict = {}
        dataset_meta_dict = self.parent._metadata
        for name, spec in self.spec.items():
            if name in dataset_meta_dict:
                meta_dict[name] = spec.param.to_external_value(dataset_meta_dict[name])
        if "__extension__" in dataset_meta_dict:
            meta_dict["__extension__"] = dataset_meta_dict["__extension__"]
        if "__validated_state__" in dataset_meta_dict:
            meta_dict["__validated_state__"] = dataset_meta_dict["__validated_state__"]
        if "__validated_state_message__" in dataset_meta_dict:
            meta_dict["__validated_state_message__"] = dataset_meta_dict["__validated_state_message__"]
        try:
            encoded_meta_dict = galaxy.model.custom_types.json_encoder.encode(meta_dict)
        except Exception as e:
            raise Exception(f"Failed encoding metadata dictionary: {meta_dict}") from e
        if filename is None:
            return encoded_meta_dict
        with open(filename, "w+") as fh:
            fh.write(encoded_meta_dict)

    def __getstate__(self):
        # cannot pickle a weakref item (self._parent), when
        # data._metadata_collection is None, it will be recreated on demand
        return None
class MetadataSpecCollection(OrderedDict):
    """
    A simple extension of OrderedDict which allows cleaner access to items
    and allows the values to be iterated over directly as if it were a
    list. append() is also implemented for simplicity and does not
    "append".

    NOTE(review): the ``append()`` mentioned above (and called by
    ``MetadataElementSpec.__init__``) is not visible in this view — confirm it
    exists in the full file.
    """

    def __getattr__(self, name):
        # Allow ``spec.foo`` as an alias for ``spec["foo"]``; missing keys raise
        # AttributeError so hasattr() behaves as expected.
        if name not in self:
            raise AttributeError
        return self.get(name)

    def __repr__(self):
        # force elements to draw with __str__ for sphinx-apidoc
        return ", ".join(item.__str__() for item in self.values())
class MetadataParameter:
    """
    Base class for metadata parameter types; converts values between their
    stored ("unwrapped") and usable ("wrapped") forms.

    NOTE(review): methods here use ``self.spec`` and call ``self.validate()``,
    but the ``__init__``/``validate``/``wrap`` definitions are not visible in
    this view — confirm against the full file.
    """

    def get_field(self, value=None, context=None, other_values=None, **kwd):
        # Default UI representation: a plain text field named after the spec.
        context = context or {}
        other_values = other_values or {}
        return form_builder.TextField(self.spec.name, value=value)

    def make_copy(self, value, target_context: MetadataCollection, source_context):
        # Deep copy so mutations of the copy cannot affect the source's value.
        return copy.deepcopy(value)

    @classmethod
    def marshal(cls, value):
        """
        This method should/can be overridden to convert the incoming
        value to whatever type it is supposed to be.
        """
        return value

    def unwrap(self, form_value):
        """
        Turns a value into its storable form.
        """
        value = self.marshal(form_value)
        self.validate(value)
        return value

    def from_external_value(self, value, parent):
        """
        Turns a value read from an external dict into its value to be pushed directly into the metadata dict.
        """
        return value

    def to_external_value(self, value):
        """
        Turns a value read from a metadata into its value to be pushed directly into the external dict.
        """
        return value
class MetadataElementSpec:
    """
    Defines a metadata element and adds it to the metadata_spec (which
    is a MetadataSpecCollection) of datatype.

    NOTE(review): subclasses of MetadataParameter call ``spec.get(...)`` and
    ``spec._to_string(...)``; those helpers are not visible in this view —
    confirm against the full file.
    """

    def __init__(
        self,
        datatype,
        name=None,
        desc=None,
        param=MetadataParameter,
        default=None,
        no_value=None,
        visible=True,
        set_in_upload=False,
        optional=False,
        **kwargs,
    ):
        self.name = name
        self.desc = desc or name
        self.default = default
        self.no_value = no_value
        self.visible = visible
        self.set_in_upload = set_in_upload
        self.optional = optional
        # Catch-all, allows for extra attributes to be set
        self.__dict__.update(kwargs)
        # set up param last, as it uses values set above
        self.param = param(self)
        # add spec element to the spec
        datatype.metadata_spec.append(self)
        # Should we validate that non-optional elements have been set ?
        # (The answer is yes, but not all datatypes control optionality appropriately at this point.)
        # This allows us to check that inherited MetadataElement instances from datatypes that set
        # check_required_metadata have been reviewed and considered really required.
        self.check_required_metadata = datatype.__dict__.get("check_required_metadata", False)

    def wrap(self, value, session):
        """
        Turns a stored value into its usable form.
        """
        return self.param.wrap(value, session)

    def unwrap(self, value):
        """
        Turns an incoming value into its storable form.
        """
        return self.param.unwrap(value)

    def __str__(self):
        # TODO??: assuming param is the class of this MetadataElementSpec - add the plain class name for that
        spec_dict = dict(param_class=self.param.__class__.__name__)
        spec_dict.update(self.__dict__)
        return "{name} ({param_class}): {desc}, defaults to '{default}'".format(**spec_dict)
# create a statement class that, when called,
# will add a new MetadataElementSpec to a class's metadata_spec
MetadataElement = Statement(MetadataElementSpec)

"""
MetadataParameter sub-classes.
"""
class SelectParameter(MetadataParameter):
    """Metadata parameter rendered as a select list, optionally multi-valued."""

    def __init__(self, spec):
        MetadataParameter.__init__(self, spec)
        self.values = self.spec.get("values")
        self.multiple = string_as_bool(self.spec.get("multiple"))

    def to_string(self, value):
        # Unset selects render as the spec's no_value.
        if value in [None, []]:
            return str(self.spec.no_value)
        selected = value if isinstance(value, list) else [value]
        return ",".join(str(item) for item in selected)

    def get_field(self, value=None, context=None, other_values=None, values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        field = form_builder.SelectField(self.spec.name, multiple=self.multiple, display=self.spec.get("display"))
        # Option source precedence: spec-declared values, then explicit values,
        # then the current value itself.
        if self.values:
            options = self.values
        elif values:
            options = values
        elif value:
            options = [(item, item) for item in listify(value)]
        else:
            options = []
        for option_value, option_label in options:
            try:
                if self.multiple:
                    is_selected = option_value in value
                else:
                    is_selected = option_value == value
                field.add_option(option_label, option_value, selected=is_selected)
            except TypeError:
                # e.g. membership test against a non-container current value.
                field.add_option(option_value, option_label, selected=False)
        return field

    def wrap(self, value, session):
        # Selects may have been stored as scalars historically (including
        # defaults/no_values), so normalise to a list before deciding.
        normalised = self.marshal(value)
        if self.multiple:
            return normalised
        if normalised:
            # single select, only return the first value
            return normalised[0]
        return None

    @classmethod
    def marshal(cls, value):
        # Store select as list, even if single item
        if value is None:
            return []
        return value if isinstance(value, list) else [value]
class DBKeyParameter(SelectParameter):
    """Select parameter whose options come from the configured genome builds."""

    def get_field(self, value=None, context=None, other_values=None, values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        try:
            trans = kwd["trans"]
            values = trans.app.genome_builds.get_genome_build_names(trans)
        except KeyError:
            # No transaction supplied; fall back to whatever was passed in.
            pass
        return super().get_field(value, context, other_values, values, **kwd)

    def make_copy(self, value, target_context: MetadataCollection, source_context):
        # dbkeys are copied as lists.
        return super().make_copy(listify(value), target_context, source_context)
class RangeParameter(SelectParameter):
    """Select parameter whose options are an integer range taken from the spec."""

    def __init__(self, spec):
        SelectParameter.__init__(self, spec)
        # The spec must be set with min and max values
        self.min = spec.get("min") or 1
        self.max = spec.get("max") or 1
        self.step = self.spec.get("step") or 1

    def get_field(self, value=None, context=None, other_values=None, values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        if values is None:
            points = range(self.min, self.max, self.step)
            values = [(point, point) for point in points]
        return SelectParameter.get_field(
            self, value=value, context=context, other_values=other_values, values=values, **kwd
        )

    @classmethod
    def marshal(cls, value):
        # Normalise through the select marshaller, then coerce to ints.
        return [int(item) for item in SelectParameter.marshal(value)]
class ColumnParameter(RangeParameter):
    """Range parameter whose options span the parent context's column count."""

    def get_field(self, value=None, context=None, other_values=None, values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        if values is None and context:
            # Columns are 1-based; an unset column count yields no options.
            upper = (context.columns or 0) + 1
            values = [(column, column) for column in range(1, upper, 1)]
        return RangeParameter.get_field(
            self, value=value, context=context, other_values=other_values, values=values, **kwd
        )
class PythonObjectParameter(MetadataParameter):
    """Metadata parameter holding an arbitrary Python object, stringified via the spec."""

    def to_string(self, value):
        if not value:
            return self.spec._to_string(self.spec.no_value)
        return self.spec._to_string(value)

    def get_field(self, value=None, context=None, other_values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        # Fixed: previously called self._to_string(value), which does not exist
        # on the parameter (the stringifier lives on the spec, as to_string()
        # shows) and raised AttributeError. Route through to_string() so empty
        # values also render via the spec's no_value.
        return form_builder.TextField(self.spec.name, value=self.to_string(value))
class FileParameter(MetadataParameter):
    """
    Metadata parameter whose value is a file: a galaxy.model.MetadataFile row,
    or a MetadataTempFile when no database/object store connection is available.
    """

    def to_string(self, value):
        # Falsy values render as the spec's no_value; otherwise the file path.
        if not value:
            return str(self.spec.no_value)
        return value.get_file_name()

    def to_safe_string(self, value):
        # We do not sanitize file names
        return self.to_string(value)

    def get_field(self, value=None, context=None, other_values=None, **kwd):
        context = context or {}
        other_values = other_values or {}
        return form_builder.TextField(self.spec.name, value=str(value.id))

    def wrap(self, value, session):
        """Resolve a stored value (int id, uuid string, or file object) to a file object."""
        if value is None:
            return None
        if isinstance(value, galaxy.model.MetadataFile) or isinstance(value, MetadataTempFile):
            return value
        if isinstance(value, int):
            # Stored as a database id.
            return session.get(galaxy.model.MetadataFile, value)
        else:
            # Stored as a uuid string.
            wrapped_value = session.execute(
                select(galaxy.model.MetadataFile).filter_by(uuid=value)
            ).scalar_one_or_none()
            if wrapped_value:
                return wrapped_value
            else:
                # If we've simultaneously copied the dataset and we've changed the datatype on the
                # copy we may not have committed the MetadataFile yet, so we need to commit the session.
                # TODO: It would be great if we can avoid the commit in the future.
                session.commit()
                return session.execute(select(galaxy.model.MetadataFile).filter_by(uuid=value)).scalar_one_or_none()

    def make_copy(self, value, target_context: MetadataCollection, source_context):
        """Copy a metadata file value into target_context's dataset, when possible."""
        session = target_context._object_session(target_context.parent)
        value = self.wrap(value, session=session)
        if value and not value.id:
            # This is a new MetadataFile object, we're not copying to another dataset.
            # Just use it.
            return self.unwrap(value)
        if target_context.parent is None:
            return None
        target_dataset = target_context.parent.dataset
        if value and target_dataset.object_store.exists(target_dataset):
            # Only copy MetadataFile if the target dataset has been created in an object store.
            # All current datatypes re-generate MetadataFile objects when setting metadata,
            # so this would ultimately get overwritten anyway.
            new_value = galaxy.model.MetadataFile(dataset=target_context.parent, name=self.spec.name)
            session.add(new_value)
            try:
                new_value.update_from_file(value.get_file_name())
            except AssertionError:
                # NOTE(review): ``session`` was resolved above via _object_session(),
                # so calling it with an argument looks suspect (a scoped_session is
                # called with no arguments) — confirm this error path is correct.
                tmp_session = session(target_context.parent)
                tmp_session.commit()
                new_value.update_from_file(value.get_file_name())
            return self.unwrap(new_value)
        return None

    @classmethod
    def marshal(cls, value):
        if isinstance(value, galaxy.model.MetadataFile):
            # We want to push value.id to the database, but need to skip this when no session is available,
            # as in extended_metadata mode, so there we just accept MetadataFile.
            # We will only serialize MetadataFile in this mode and not push to the database, so this is OK.
            value = value.id or value
            if not isinstance(value, int) and object_session(value):
                value = str(value.uuid)
        return value

    def from_external_value(self, value, parent, path_rewriter=None):
        """
        Turns a value read from a external dict into its value to be pushed directly into the metadata dict.
        """
        if MetadataTempFile.is_JSONified_value(value):
            value = MetadataTempFile.from_JSON(value)
        if isinstance(value, MetadataTempFile):
            # Materialize a MetadataFile for the temp file's contents.
            mf = parent.metadata.get(self.spec.name, None)
            if mf is None:
                mf = self.new_file(dataset=parent, **value.kwds)
            # Ensure the metadata file gets updated with content
            file_name = value.get_file_name()
            if path_rewriter:
                # Job may have run with a different (non-local) tmp/working
                # directory. Correct.
                file_name = path_rewriter(file_name)
            mf.update_from_file(file_name)
            value = str(mf.uuid)
        return value

    def to_external_value(self, value):
        """
        Turns a value read from a metadata into its value to be pushed directly into the external dict.
        """
        if isinstance(value, galaxy.model.MetadataFile):
            value = value.id
        elif isinstance(value, MetadataTempFile):
            value = MetadataTempFile.to_JSON(value)
        return value

    def new_file(self, dataset=None, metadata_tmp_files_dir=None, **kwds):
        # If there is a place to store the file (i.e. an object_store has been bound to
        # Dataset) then use a MetadataFile and assume it is accessible. Otherwise use
        # a MetadataTempFile.
        if getattr(dataset.dataset, "object_store", False) and not metadata_tmp_files_dir:
            mf = galaxy.model.MetadataFile(name=self.spec.name, dataset=dataset, **kwds)
            sa_session = object_session(dataset)
            if sa_session:
                sa_session.add(mf)
            return mf
        else:
            # we need to make a tmp file that is accessible to the head node,
            # we will be copying its contents into the MetadataFile objects filename after restoring from JSON
            # we do not include 'dataset' in the kwds passed, as from_JSON_value() will handle this for us
            return MetadataTempFile(metadata_tmp_files_dir=metadata_tmp_files_dir, **kwds)
# This class is used when a database file connection is not available
class MetadataTempFile:
    """
    Stand-in for a MetadataFile when no database/object store is available:
    holds metadata content in a temp file that is later copied into a real
    MetadataFile.
    """

    tmp_dir = "database/tmp"  # this should be overwritten as necessary in calling scripts

    def __init__(self, metadata_tmp_files_dir=None, **kwds):
        # kwds are preserved verbatim so the eventual MetadataFile can be
        # constructed with them (see from_JSON / FileParameter.new_file).
        self.kwds = kwds
        if metadata_tmp_files_dir:
            self.tmp_dir = metadata_tmp_files_dir
        self._filename = None

    def get_file_name(self):
        """Return (creating on first use) the absolute path of the temp file."""
        if self._filename is None:
            # We need a tmp file accessible across all nodes/heads; create it once
            # and remember its absolute path. Fixed: mkstemp creates the file
            # atomically and keeps it (no delete-on-GC race as with
            # NamedTemporaryFile(...).name), and closing the fd immediately
            # avoids the leaked handle the previous open(..., "wb+") left behind.
            fd, path = tempfile.mkstemp(dir=self.tmp_dir, prefix="metadata_temp_file_")
            os.close(fd)
            self._filename = abspath(path)
        return self._filename

    def to_JSON(self):
        """Serialize to a plain dict recognisable by :meth:`is_JSONified_value`."""
        return {"__class__": self.__class__.__name__, "filename": self.get_file_name(), "kwds": self.kwds}

    @classmethod
    def from_JSON(cls, json_dict):
        """Rebuild an instance from a dict produced by :meth:`to_JSON`."""
        # need to ensure our keywords are not unicode
        rval = cls(**stringify_dictionary_keys(json_dict["kwds"]))
        rval._filename = json_dict["filename"]
        return rval

    @classmethod
    def is_JSONified_value(cls, value):
        # True for dicts produced by to_JSON().
        return isinstance(value, dict) and value.get("__class__", None) == cls.__name__

    @classmethod
    def cleanup_from_JSON_dict_filename(cls, filename):
        """Best-effort removal of temp files referenced by a JSON metadata dict file."""
        try:
            with open(filename) as fh:
                for value in json.load(fh).values():
                    if cls.is_JSONified_value(value):
                        value = cls.from_JSON(value)
                    if isinstance(value, cls) and os.path.exists(value.get_file_name()):
                        log.debug("Cleaning up abandoned MetadataTempFile file: %s", value.get_file_name())
                        os.unlink(value.get_file_name())
        except Exception as e:
            # Cleanup is best-effort; never let it propagate.
            log.debug("Failed to cleanup MetadataTempFile temp files from %s: %s", filename, unicodify(e))
# Public API of this module. NOTE(review): ColumnTypesParameter, ListParameter
# and DictParameter are not defined in this view — confirm they exist in the
# full file.
__all__ = (
    "Statement",
    "MetadataElement",
    "MetadataCollection",
    "MetadataSpecCollection",
    "MetadataParameter",
    "MetadataElementSpec",
    "SelectParameter",
    "DBKeyParameter",
    "RangeParameter",
    "ColumnParameter",
    "ColumnTypesParameter",
    "ListParameter",
    "DictParameter",
    "PythonObjectParameter",
    "FileParameter",
    "MetadataTempFile",
)