Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.mapping
"""
This module no longer contains the mapping of data model classes to the
relational database.
The module will be revised during migration from SQLAlchemy Migrate to Alembic.
"""
import logging
from threading import local
from typing import Optional, Type
from galaxy import model
from galaxy.config import GalaxyAppConfiguration
from galaxy.model import mapper_registry
from galaxy.model.base import SharedModelMapping
from galaxy.model.migrate.triggers.update_audit_table import install as install_timestamp_triggers
from galaxy.model.orm.engine_factory import build_engine
from galaxy.model.security import GalaxyRBACAgent
from galaxy.model.view.utils import install_views
log = logging.getLogger(__name__)
metadata = mapper_registry.metadata
[docs]class GalaxyModelMapping(SharedModelMapping):
security_agent: GalaxyRBACAgent
thread_local_log: Optional[local]
create_tables: bool
User: Type
GalaxySession: Type
[docs]def init(file_path, url, engine_options=None, create_tables=False, map_install_models=False,
database_query_profiling_proxy=False, object_store=None, trace_logger=None, use_pbkdf2=True,
slow_query_log_threshold=0, thread_local_log: Optional[local] = None, log_query_counts=False) -> GalaxyModelMapping:
"""Connect mappings to the database"""
if engine_options is None:
engine_options = {}
# Connect dataset to the file path
model.Dataset.file_path = file_path
# Connect dataset to object store
model.Dataset.object_store = object_store
# Use PBKDF2 password hashing?
model.User.use_pbkdf2 = use_pbkdf2
# Load the appropriate db module
engine = build_engine(url, engine_options, database_query_profiling_proxy, trace_logger, slow_query_log_threshold, thread_local_log=thread_local_log, log_query_counts=log_query_counts)
model_modules = [model]
if map_install_models:
import galaxy.model.tool_shed_install.mapping # noqa: F401
from galaxy.model import tool_shed_install
galaxy.model.tool_shed_install.mapping.init(url=url, engine_options=engine_options, create_tables=create_tables)
model_modules.append(tool_shed_install)
result = GalaxyModelMapping(model_modules, engine=engine)
# Create tables if needed
if create_tables:
metadata.create_all(bind=engine)
install_timestamp_triggers(engine)
install_views(engine)
result.create_tables = create_tables
# load local galaxy security policy
result.security_agent = GalaxyRBACAgent(result)
result.thread_local_log = thread_local_log
return result
[docs]def init_models_from_config(config: GalaxyAppConfiguration, map_install_models=False, object_store=None, trace_logger=None):
model = init(
config.file_path,
config.database_connection,
config.database_engine_options,
map_install_models=map_install_models,
database_query_profiling_proxy=config.database_query_profiling_proxy,
object_store=object_store,
trace_logger=trace_logger,
use_pbkdf2=config.get_bool('use_pbkdf2', True),
slow_query_log_threshold=config.slow_query_log_threshold,
thread_local_log=config.thread_local_log,
log_query_counts=config.database_log_query_counts,
)
return model