Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.mapping

"""
This module no longer contains the mapping of data model classes to the
relational database.
The module will be revised during migration from SQLAlchemy Migrate to Alembic.
"""

import logging
from threading import local
from typing import Optional, Type

from sqlalchemy import (
    and_,
    select,
)
from sqlalchemy.orm import class_mapper, object_session, relation

from galaxy import model
from galaxy.model import mapper_registry
from galaxy.model.base import SharedModelMapping
from galaxy.model.migrate.triggers.update_audit_table import install as install_timestamp_triggers
from galaxy.model.orm.engine_factory import build_engine
from galaxy.model.orm.now import now
from galaxy.model.security import GalaxyRBACAgent
from galaxy.model.view.utils import install_views

log = logging.getLogger(__name__)

metadata = mapper_registry.metadata

class_mapper(model.HistoryDatasetCollectionAssociation).add_property(
    "creating_job_associations", relation(model.JobToOutputDatasetCollectionAssociation, viewonly=True))


# Helper methods.
[docs]def db_next_hid(self, n=1): """ db_next_hid( self ) Override __next_hid to generate from the database in a concurrency safe way. Loads the next history ID from the DB and returns it. It also saves the future next_id into the DB. :rtype: int :returns: the next history id """ session = object_session(self) table = self.table if "postgres" not in session.bind.dialect.name: next_hid = select([table.c.hid_counter], table.c.id == model.cached_id(self)).with_for_update().scalar() table.update(table.c.id == self.id).execute(hid_counter=(next_hid + n)) else: stmt = table.update().where(table.c.id == model.cached_id(self)).values(hid_counter=(table.c.hid_counter + n)).returning(table.c.hid_counter) next_hid = session.execute(stmt).scalar() - n return next_hid
model.History._next_hid = db_next_hid # type: ignore def _workflow_invocation_update(self): session = object_session(self) table = self.table now_val = now() stmt = table.update().values(update_time=now_val).where(and_(table.c.id == self.id, table.c.update_time < now_val)) session.execute(stmt) model.WorkflowInvocation.update = _workflow_invocation_update # type: ignore
[docs]class GalaxyModelMapping(SharedModelMapping): security_agent: GalaxyRBACAgent thread_local_log: Optional[local] create_tables: bool User: Type GalaxySession: Type
[docs]def init(file_path, url, engine_options=None, create_tables=False, map_install_models=False, database_query_profiling_proxy=False, object_store=None, trace_logger=None, use_pbkdf2=True, slow_query_log_threshold=0, thread_local_log: Optional[local] = None, log_query_counts=False) -> GalaxyModelMapping: """Connect mappings to the database""" if engine_options is None: engine_options = {} # Connect dataset to the file path model.Dataset.file_path = file_path # Connect dataset to object store model.Dataset.object_store = object_store # Use PBKDF2 password hashing? model.User.use_pbkdf2 = use_pbkdf2 # Load the appropriate db module engine = build_engine(url, engine_options, database_query_profiling_proxy, trace_logger, slow_query_log_threshold, thread_local_log=thread_local_log, log_query_counts=log_query_counts) # Connect the metadata to the database. metadata.bind = engine model_modules = [model] if map_install_models: import galaxy.model.tool_shed_install.mapping # noqa: F401 from galaxy.model import tool_shed_install galaxy.model.tool_shed_install.mapping.init(url=url, engine_options=engine_options, create_tables=create_tables) model_modules.append(tool_shed_install) result = GalaxyModelMapping(model_modules, engine=engine) # Create tables if needed if create_tables: metadata.create_all() install_timestamp_triggers(engine) install_views(engine) result.create_tables = create_tables # load local galaxy security policy result.security_agent = GalaxyRBACAgent(result) result.thread_local_log = thread_local_log return result