Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.migrations

import logging
import os
from typing import (
    cast,
    Iterable,
    NamedTuple,
    NewType,
    NoReturn,
    Optional,
)

import alembic
from alembic import command
from sqlalchemy import (
    create_engine,
    MetaData,
)
from sqlalchemy.engine import Engine
from sqlalchemy.exc import OperationalError

from galaxy.model import Base as gxy_base
from galaxy.model.database_utils import (
    create_database,
    database_exists,
)
from galaxy.model.mapping import create_additional_database_objects
from galaxy.model.migrations.base import (
    ALEMBIC_TABLE,
    BaseAlembicManager,
    DatabaseStateCache,
    get_url_string,
    load_metadata,
)
from galaxy.model.migrations.exceptions import (
    IncorrectSAMigrateVersionError,
    NoVersionTableError,
    OutdatedDatabaseError,
    RevisionNotFoundError,
    SAMigrateError,
)
from galaxy.model.tool_shed_install import Base as tsi_base

ModelId = NewType("ModelId", str)
# These identifiers are used throughout the migrations system to distinquish
# between the two models; they refer to version directories, branch labels, etc.
# (if you rename these, you need to rename branch labels in alembic version directories)
GXY = ModelId("gxy")  # galaxy model identifier
TSI = ModelId("tsi")  # tool_shed_install model identifier

SQLALCHEMYMIGRATE_LAST_VERSION_GXY = 180
SQLALCHEMYMIGRATE_LAST_VERSION_TSI = 17
log = logging.getLogger(__name__)


[docs]class DatabaseConfig(NamedTuple): url: str template: str encoding: str
[docs]class InvalidModelIdError(Exception):
[docs] def __init__(self, model: str) -> None: super().__init__(f"Invalid model: {model}")
[docs]class AlembicManager(BaseAlembicManager): def _get_alembic_root(self): return os.path.dirname(__file__)
[docs] def stamp_model_head(self, model: ModelId) -> None: """Partial proxy to alembic's stamp command.""" revision = f"{model}@head" self.stamp_revision(revision)
[docs] def upgrade(self, model: ModelId) -> None: """Partial proxy to alembic's upgrade command.""" # This works with or without an existing alembic version table. revision = f"{model}@head" command.upgrade(self.alembic_cfg, revision) self._reset_db_heads()
[docs] def is_up_to_date(self, model: ModelId) -> bool: """ True if the head revision for `model` in the script directory is stored in the database. """ head_id = self.get_model_script_head(model) return bool(self.db_heads and head_id in self.db_heads)
[docs] def is_under_version_control(self, model: ModelId) -> bool: """ True if the database version table contains a revision that corresponds to a revision in the script directory that has branch label `model`. """ if self.db_heads: for db_head in self.db_heads: try: revision = self._get_revision(db_head) if revision and model in revision.branch_labels: log.info(f"The version of the {model} model in the database is {db_head}.") return True except alembic.util.exc.CommandError: # No need to raise exception. log.info(f"Revision {db_head} does not exist in the script directory.") return False
[docs] def get_model_db_head(self, model: ModelId) -> Optional[str]: return self._get_head_revision(model, cast(Iterable[str], self.db_heads))
[docs] def get_model_script_head(self, model: ModelId) -> Optional[str]: return self._get_head_revision(model, self.script_directory.get_heads())
def _get_head_revision(self, model: ModelId, heads: Iterable[str]) -> Optional[str]: for head in heads: revision = self._get_revision(head) if revision and model in revision.branch_labels: return head return None
[docs]def verify_databases_via_script( gxy_config: DatabaseConfig, tsi_config: DatabaseConfig, is_auto_migrate: bool = False, ) -> None: # This function serves a use case when an engine has not been created yet # (e.g. when called from a script). gxy_engine = create_engine(gxy_config.url) tsi_engine = None if tsi_config.url and tsi_config.url != gxy_config.url: tsi_engine = create_engine(tsi_config.url) verify_databases( gxy_engine, gxy_config.template, gxy_config.encoding, tsi_engine, tsi_config.template, tsi_config.encoding, is_auto_migrate, ) gxy_engine.dispose() if tsi_engine: tsi_engine.dispose()
[docs]def verify_databases( gxy_engine: Engine, gxy_template: Optional[str], gxy_encoding: Optional[str], tsi_engine: Optional[Engine], tsi_template: Optional[str], tsi_encoding: Optional[str], is_auto_migrate: bool, ) -> None: # Verify gxy model. gxy_verifier = DatabaseStateVerifier(gxy_engine, GXY, gxy_template, gxy_encoding, is_auto_migrate) gxy_verifier.run() # New database = one engine or same engine, and gxy model has just been initialized. is_new_database = (not tsi_engine or gxy_engine == tsi_engine) and gxy_verifier.is_new_database # Determine engine for tsi model. tsi_engine = tsi_engine or gxy_engine # Verify tsi model model. tsi_verifier = DatabaseStateVerifier(tsi_engine, TSI, tsi_template, tsi_encoding, is_auto_migrate, is_new_database) tsi_verifier.run()
[docs]class DatabaseStateVerifier:
[docs] def __init__( self, engine: Engine, model: ModelId, database_template: Optional[str], database_encoding: Optional[str], is_auto_migrate: bool, is_new_database: Optional[bool] = False, ) -> None: self.engine = engine self.model = model self.database_template = database_template self.database_encoding = database_encoding self._is_auto_migrate = is_auto_migrate self.metadata = get_metadata(model) # True if database has been initialized for another model. self.is_new_database = is_new_database # These values may or may not be required, so do a lazy load. self._db_state: Optional[DatabaseStateCache] = None self._alembic_manager: Optional[AlembicManager] = None
@property def is_auto_migrate(self) -> bool: return self._is_auto_migrate @property def db_state(self) -> DatabaseStateCache: if not self._db_state: self._db_state = DatabaseStateCache(engine=self.engine) return self._db_state @property def alembic_manager(self) -> AlembicManager: if not self._alembic_manager: self._alembic_manager = get_alembic_manager(self.engine) return self._alembic_manager
[docs] def run(self) -> None: if self._handle_no_database(): return if self._handle_empty_database(): return self._handle_nonempty_database()
def _handle_no_database(self) -> bool: url = get_url_string(self.engine) try: # connect using the database name from the sqlalchemy engine exists = database_exists(url, database=self.engine.url.database) except OperationalError: exists = False if not exists: self._create_database(url) self._initialize_database() return True return False def _handle_empty_database(self) -> bool: if self.is_new_database or self._is_database_empty() or self._contains_only_kombu_tables(): self._initialize_database() return True return False def _handle_nonempty_database(self) -> None: if self._has_alembic_version_table(): self._handle_with_alembic() elif self._has_sqlalchemymigrate_version_table(): if self._is_last_sqlalchemymigrate_version(): self._try_to_upgrade(has_alembic=False) else: self._handle_wrong_sqlalchemymigrate_version() else: self._handle_no_version_table() def _handle_with_alembic(self) -> None: am = self.alembic_manager model = self._get_model_name() if am.is_up_to_date(self.model): log.info(f"Your {model} database is up-to-date") return if am.is_under_version_control(self.model): # Model is under version control, but outdated. Try to upgrade. self._try_to_upgrade() else: # Model is not under version control. We fail for the gxy model because we can't guess # what the state of the database is if there is an alembic table without a gxy revision. # For the tsi model, we can guess. If there are no tsi tables in the database, we treat it # as a new install; but if there is at least one table, we assume it is the same version as gxy. # See more details in this PR description: https://github.com/galaxyproject/galaxy/pull/13108 if self.model == TSI: if self._no_model_tables_exist(): self._initialize_database() else: self._try_to_upgrade() else: raise RevisionNotFoundError(model) def _try_to_upgrade(self, has_alembic=True): am = self.alembic_manager model = self._get_model_name() code_version = am.get_model_script_head(self.model) if not self.is_auto_migrate: db_version = am.get_model_db_head(self.model) script = "manage_db.sh upgrade" if has_alembic: raise OutdatedDatabaseError(model, cast(str, db_version), cast(str, code_version), script) else: raise SAMigrateError(model, script) else: log.info("Database is being upgraded to current version: {code_version}") am.upgrade(self.model) return def _get_model_name(self) -> str: return "galaxy" if self.model == GXY else "tool shed install" def _no_model_tables_exist(self) -> bool: # True if there are no tables from `self.model` in the database. db_tables = self.db_state.tables for tablename in set(self.metadata.tables) - {ALEMBIC_TABLE}: if tablename in db_tables: return False return True def _create_database(self, url: str) -> None: create_kwds = {} message = f"Creating database for URI [{url}]" if self.database_template: message += f" from template [{self.database_template}]" create_kwds["template"] = self.database_template if self.database_encoding: message += f" with encoding [{self.database_encoding}]" create_kwds["encoding"] = self.database_encoding log.info(message) create_database(url, **create_kwds) def _initialize_database(self) -> None: load_metadata(self.metadata, self.engine) if self.model == GXY: self._create_additional_database_objects() self.alembic_manager.stamp_model_head(self.model) self.is_new_database = True def _create_additional_database_objects(self) -> None: create_additional_database_objects(self.engine) def _is_database_empty(self) -> bool: return self.db_state.is_database_empty() def _contains_only_kombu_tables(self) -> bool: return self.db_state.contains_only_kombu_tables() def _has_alembic_version_table(self) -> bool: return self.db_state.has_alembic_version_table() def _has_sqlalchemymigrate_version_table(self) -> bool: return self.db_state.has_sqlalchemymigrate_version_table() def _is_last_sqlalchemymigrate_version(self) -> bool: last_version = get_last_sqlalchemymigrate_version(self.model) return self.db_state.is_last_sqlalchemymigrate_version(last_version) def _handle_no_version_table(self) -> NoReturn: model = self._get_model_name() raise NoVersionTableError(model) def _handle_wrong_sqlalchemymigrate_version(self) -> NoReturn: if self.model == GXY: expected_version = SQLALCHEMYMIGRATE_LAST_VERSION_GXY else: expected_version = SQLALCHEMYMIGRATE_LAST_VERSION_TSI model = self._get_model_name() raise IncorrectSAMigrateVersionError(model, expected_version)
[docs]def get_last_sqlalchemymigrate_version(model: ModelId) -> int: if model == GXY: return SQLALCHEMYMIGRATE_LAST_VERSION_GXY elif model == TSI: return SQLALCHEMYMIGRATE_LAST_VERSION_TSI else: raise InvalidModelIdError(model)
[docs]def get_alembic_manager(engine: Engine) -> AlembicManager: return AlembicManager(engine)
[docs]def get_metadata(model: ModelId) -> MetaData: if model == GXY: return get_gxy_metadata() elif model == TSI: return get_tsi_metadata() else: raise InvalidModelIdError(model)
[docs]def get_gxy_metadata() -> MetaData: return gxy_base.metadata
[docs]def get_tsi_metadata() -> MetaData: return tsi_base.metadata