Warning

This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.migrations

import logging
import os
import urllib.parse
from typing import (
    cast,
    Dict,
    Iterable,
    NamedTuple,
    NewType,
    NoReturn,
    Optional,
    Union,
)

import alembic
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from alembic.script.base import Script
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    text,
)
from sqlalchemy.engine import (
    Connection,
    CursorResult,
    Engine,
)
from sqlalchemy.exc import OperationalError

from galaxy.model import Base as gxy_base
from galaxy.model.database_utils import (
    create_database,
    database_exists,
)
from galaxy.model.mapping import create_additional_database_objects
from galaxy.model.tool_shed_install import Base as tsi_base

ModelId = NewType("ModelId", str)
# These identifiers are used throughout the migrations system to distinquish
# between the two models; they refer to version directories, branch labels, etc.
# (if you rename these, you need to rename branch labels in alembic version directories)
GXY = ModelId("gxy")  # galaxy model identifier
TSI = ModelId("tsi")  # tool_shed_install model identifier

ALEMBIC_TABLE = "alembic_version"
SQLALCHEMYMIGRATE_TABLE = "migrate_version"
SQLALCHEMYMIGRATE_LAST_VERSION_GXY = 180
SQLALCHEMYMIGRATE_LAST_VERSION_TSI = 17
log = logging.getLogger(__name__)


[docs]class DatabaseConfig(NamedTuple): url: str template: str encoding: str
[docs]class NoVersionTableError(Exception): # The database has no version table (neither SQLAlchemy Migrate, nor Alembic), so it is # impossible to automatically determine the state of the database. Manual update required.
[docs] def __init__(self, model: str) -> None: super().__init__(f"Your {model} database has no version table; manual update is required")
[docs]class IncorrectVersionError(Exception): # The database has a SQLAlchemy Migrate version table, but its version is either older or more recent # than {SQLALCHEMYMIGRATE_LAST_VERSION_GXY/TSI}, so it cannot be upgraded with Alembic. # (A more recent version may indicate that something has changed in the database past the point # where we can automatically migrate from SQLAlchemy Migrate to Alembic.) # Manual update required.
[docs] def __init__(self, model: str, expected_version: int) -> None: msg = f"Your {model} database version is incorrect; version {expected_version} is expected. " msg += "Manual update is required. " msg += "Please see documentation: https://docs.galaxyproject.org/en/master/admin/db_migration.html#how-to-handle-migrations-incorrectversionerror" super().__init__(msg)
[docs]class OutdatedDatabaseError(Exception): # The database is under Alembic version control, but is out-of-date. Automatic upgrade possible.
[docs] def __init__(self, model: str, db_version: str, code_version: str) -> None: msg = f"Your {model} database has version {db_version}, but this code expects " msg += f"version {code_version}. " msg += "To upgrade your database, run `manage_db.sh upgrade`. " msg += "For more options (e.g. upgrading/downgrading to a specific version) see instructions in that file. " msg += "Please remember to backup your database before migrating." super().__init__(msg)
[docs]class InvalidModelIdError(Exception):
[docs] def __init__(self, model: str) -> None: super().__init__(f"Invalid model: {model}")
[docs]class RevisionNotFoundError(Exception): # The database has an Alembic version table; however, that table does not contain a revision identifier # for the given model. As a result, it is impossible to determine the state of the database for this model # (gxy or tsi).
[docs] def __init__(self, model: str) -> None: msg = "The database has an alembic version table, but that table does not contain " msg += f"a revision for the {model} model" super().__init__(msg)
[docs]class AlembicManager: """ Alembic operations on one database. """
[docs] @staticmethod def is_at_revision(engine: Engine, revision: Union[str, Iterable[str]]) -> bool: """ True if revision is a subset of the set of version heads stored in the database. """ revision = listify(revision) with engine.connect() as conn: context = MigrationContext.configure(conn) db_version_heads = context.get_current_heads() return set(revision) <= set(db_version_heads)
[docs] def __init__(self, engine: Engine, config_dict: Optional[dict] = None) -> None: self.engine = engine self.alembic_cfg = self._load_config(config_dict) self.script_directory = ScriptDirectory.from_config(self.alembic_cfg) self._db_heads: Optional[Iterable[str]] self._reset_db_heads()
def _load_config(self, config_dict: Optional[dict]) -> Config: alembic_root = os.path.dirname(__file__) _alembic_file = os.path.join(alembic_root, "alembic.ini") config = Config(_alembic_file) url = get_url_string(self.engine) config.set_main_option("sqlalchemy.url", url) if config_dict: for key, value in config_dict.items(): config.set_main_option(key, value) return config
[docs] def stamp_model_head(self, model: ModelId) -> None: """Partial proxy to alembic's stamp command.""" command.stamp(self.alembic_cfg, f"{model}@head") self._reset_db_heads()
[docs] def stamp_revision(self, revision: Union[str, Iterable[str]]) -> None: """Partial proxy to alembic's stamp command.""" command.stamp(self.alembic_cfg, revision) # type: ignore[arg-type] # https://alembic.sqlalchemy.org/en/latest/api/commands.html#alembic.command.stamp.params.revision self._reset_db_heads()
[docs] def upgrade(self, model: ModelId) -> None: """Partial proxy to alembic's upgrade command.""" # This works with or without an existing alembic version table. command.upgrade(self.alembic_cfg, f"{model}@head") self._reset_db_heads()
[docs] def is_under_version_control(self, model: ModelId) -> bool: """ True if the database version table contains a revision that corresponds to a revision in the script directory that has branch label `model`. """ if self.db_heads: for db_head in self.db_heads: try: revision = self._get_revision(db_head) if revision and model in revision.branch_labels: log.info(f"The version of the {model} model in the database is {db_head}.") return True except alembic.util.exc.CommandError: # No need to raise exception. log.info(f"Revision {db_head} does not exist in the script directory.") return False
[docs] def is_up_to_date(self, model: ModelId) -> bool: """ True if the head revision for `model` in the script directory is stored in the database. """ head_id = self.get_model_script_head(model) return bool(self.db_heads and head_id in self.db_heads)
[docs] def get_model_db_head(self, model: ModelId) -> Optional[str]: return self._get_head_revision(model, cast(Iterable[str], self.db_heads))
[docs] def get_model_script_head(self, model: ModelId) -> Optional[str]: return self._get_head_revision(model, self.script_directory.get_heads())
def _get_head_revision(self, model: ModelId, heads: Iterable[str]) -> Optional[str]: for head in heads: revision = self._get_revision(head) if revision and model in revision.branch_labels: return head return None @property def db_heads(self) -> Iterable: if self._db_heads is None: # Explicitly check for None: could be an empty tuple. with self.engine.connect() as conn: context: MigrationContext = MigrationContext.configure(conn) self._db_heads = context.get_current_heads() # We get a tuple as long as we use branches. Otherwise, we'd get a single value. # listify() is a safeguard in case we stop using branches. self._db_heads = listify(self._db_heads) return self._db_heads def _get_revision(self, revision_id: str) -> Optional[Script]: try: return self.script_directory.get_revision(revision_id) except alembic.util.exc.CommandError as e: log.error(f"Revision {revision_id} not found in the script directory") raise e def _reset_db_heads(self) -> None: self._db_heads = None
[docs]class DatabaseStateCache: """ Snapshot of database state. """
[docs] def __init__(self, engine: Engine) -> None: self._load_db(engine)
@property def tables(self) -> Dict[str, Table]: return self.db_metadata.tables
[docs] def is_database_empty(self) -> bool: return not bool(self.db_metadata.tables)
[docs] def contains_only_kombu_tables(self) -> bool: return metadata_contains_only_kombu_tables(self.db_metadata)
[docs] def has_alembic_version_table(self) -> bool: return ALEMBIC_TABLE in self.db_metadata.tables
[docs] def has_sqlalchemymigrate_version_table(self) -> bool: return SQLALCHEMYMIGRATE_TABLE in self.db_metadata.tables
[docs] def is_last_sqlalchemymigrate_version(self, last_version: int) -> bool: return self.sqlalchemymigrate_version == last_version
def _load_db(self, engine: Engine) -> None: with engine.connect() as conn: self.db_metadata = self._load_db_metadata(conn) self.sqlalchemymigrate_version = self._load_sqlalchemymigrate_version(conn) def _load_db_metadata(self, conn: Connection) -> MetaData: metadata = MetaData() metadata.reflect(bind=conn) return metadata def _load_sqlalchemymigrate_version(self, conn: Connection) -> CursorResult: if self.has_sqlalchemymigrate_version_table(): sql = text(f"select version from {SQLALCHEMYMIGRATE_TABLE}") return conn.execute(sql).scalar()
[docs]def metadata_contains_only_kombu_tables(metadata: MetaData) -> bool: """ Return True if metadata contains only kombu-related tables. (ref: https://github.com/galaxyproject/galaxy/issues/13689) """ return all(table.startswith("kombu_") or table.startswith("sqlite_") for table in metadata.tables.keys())
[docs]def verify_databases_via_script( gxy_config: DatabaseConfig, tsi_config: DatabaseConfig, is_auto_migrate: bool = False, ) -> None: # This function serves a use case when an engine has not been created yet # (e.g. when called from a script). gxy_engine = create_engine(gxy_config.url) tsi_engine = None if tsi_config.url and tsi_config.url != gxy_config.url: tsi_engine = create_engine(tsi_config.url) verify_databases( gxy_engine, gxy_config.template, gxy_config.encoding, tsi_engine, tsi_config.template, tsi_config.encoding, is_auto_migrate, ) gxy_engine.dispose() if tsi_engine: tsi_engine.dispose()
[docs]def verify_databases( gxy_engine: Engine, gxy_template: Optional[str], gxy_encoding: Optional[str], tsi_engine: Optional[Engine], tsi_template: Optional[str], tsi_encoding: Optional[str], is_auto_migrate: bool, ) -> None: # Verify gxy model. gxy_verifier = DatabaseStateVerifier(gxy_engine, GXY, gxy_template, gxy_encoding, is_auto_migrate) gxy_verifier.run() # New database = one engine or same engine, and gxy model has just been initialized. is_new_database = (not tsi_engine or gxy_engine == tsi_engine) and gxy_verifier.is_new_database # Determine engine for tsi model. tsi_engine = tsi_engine or gxy_engine # Verify tsi model model. tsi_verifier = DatabaseStateVerifier(tsi_engine, TSI, tsi_template, tsi_encoding, is_auto_migrate, is_new_database) tsi_verifier.run()
[docs]class DatabaseStateVerifier:
[docs] def __init__( self, engine: Engine, model: ModelId, database_template: Optional[str], database_encoding: Optional[str], is_auto_migrate: bool, is_new_database: Optional[bool] = False, ) -> None: self.engine = engine self.model = model self.database_template = database_template self.database_encoding = database_encoding self._is_auto_migrate = is_auto_migrate self.metadata = get_metadata(model) # True if database has been initialized for another model. self.is_new_database = is_new_database # These values may or may not be required, so do a lazy load. self._db_state: Optional[DatabaseStateCache] = None self._alembic_manager: Optional[AlembicManager] = None
@property def is_auto_migrate(self) -> bool: return self._is_auto_migrate @property def db_state(self) -> DatabaseStateCache: if not self._db_state: self._db_state = DatabaseStateCache(engine=self.engine) return self._db_state @property def alembic_manager(self) -> AlembicManager: if not self._alembic_manager: self._alembic_manager = get_alembic_manager(self.engine) return self._alembic_manager
[docs] def run(self) -> None: if self._handle_no_database(): return if self._handle_empty_database(): return self._handle_nonempty_database()
def _handle_no_database(self) -> bool: url = get_url_string(self.engine) try: # connect using the database name from the sqlalchemy engine exists = database_exists(url, database=self.engine.url.database) except OperationalError: exists = False if not exists: self._create_database(url) self._initialize_database() return True return False def _handle_empty_database(self) -> bool: if self.is_new_database or self._is_database_empty() or self._contains_only_kombu_tables(): self._initialize_database() return True return False def _handle_nonempty_database(self) -> None: if self._has_alembic_version_table(): self._handle_with_alembic() elif self._has_sqlalchemymigrate_version_table(): if self._is_last_sqlalchemymigrate_version(): self._try_to_upgrade() else: self._handle_wrong_sqlalchemymigrate_version() else: self._handle_no_version_table() def _handle_with_alembic(self) -> None: am = self.alembic_manager model = self._get_model_name() if am.is_up_to_date(self.model): log.info(f"Your {model} database is up-to-date") return if am.is_under_version_control(self.model): # Model is under version control, but outdated. Try to upgrade. self._try_to_upgrade() else: # Model is not under version control. We fail for the gxy model because we can't guess # what the state of the database is if there is an alembic table without a gxy revision. # For the tsi model, we can guess. If there are no tsi tables in the database, we treat it # as a new install; but if there is at least one table, we assume it is the same version as gxy. # See more details in this PR description: https://github.com/galaxyproject/galaxy/pull/13108 if self.model == TSI: if self._no_model_tables_exist(): self._initialize_database() else: self._try_to_upgrade() else: raise RevisionNotFoundError(model) def _try_to_upgrade(self): am = self.alembic_manager model = self._get_model_name() code_version = am.get_model_script_head(self.model) if not self.is_auto_migrate: db_version = am.get_model_db_head(self.model) raise OutdatedDatabaseError(model, cast(str, db_version), cast(str, code_version)) else: log.info("Database is being upgraded to current version: {code_version}") am.upgrade(self.model) return def _get_model_name(self) -> str: return "galaxy" if self.model == GXY else "tool shed install" def _no_model_tables_exist(self) -> bool: # True if there are no tables from `self.model` in the database. db_tables = self.db_state.tables for tablename in set(self.metadata.tables) - {ALEMBIC_TABLE}: if tablename in db_tables: return False return True def _create_database(self, url: str) -> None: create_kwds = {} message = f"Creating database for URI [{url}]" if self.database_template: message += f" from template [{self.database_template}]" create_kwds["template"] = self.database_template if self.database_encoding: message += f" with encoding [{self.database_encoding}]" create_kwds["encoding"] = self.database_encoding log.info(message) create_database(url, **create_kwds) def _initialize_database(self) -> None: load_metadata(self.metadata, self.engine) if self.model == GXY: self._create_additional_database_objects() self.alembic_manager.stamp_model_head(self.model) self.is_new_database = True def _create_additional_database_objects(self) -> None: create_additional_database_objects(self.engine) def _is_database_empty(self) -> bool: return self.db_state.is_database_empty() def _contains_only_kombu_tables(self) -> bool: return self.db_state.contains_only_kombu_tables() def _has_alembic_version_table(self) -> bool: return self.db_state.has_alembic_version_table() def _has_sqlalchemymigrate_version_table(self) -> bool: return self.db_state.has_sqlalchemymigrate_version_table() def _is_last_sqlalchemymigrate_version(self) -> bool: last_version = get_last_sqlalchemymigrate_version(self.model) return self.db_state.is_last_sqlalchemymigrate_version(last_version) def _handle_no_version_table(self) -> NoReturn: model = self._get_model_name() raise NoVersionTableError(model) def _handle_wrong_sqlalchemymigrate_version(self) -> NoReturn: if self.model == GXY: expected_version = SQLALCHEMYMIGRATE_LAST_VERSION_GXY else: expected_version = SQLALCHEMYMIGRATE_LAST_VERSION_TSI model = self._get_model_name() raise IncorrectVersionError(model, expected_version)
[docs]def get_last_sqlalchemymigrate_version(model: ModelId) -> int: if model == GXY: return SQLALCHEMYMIGRATE_LAST_VERSION_GXY elif model == TSI: return SQLALCHEMYMIGRATE_LAST_VERSION_TSI else: raise InvalidModelIdError(model)
[docs]def get_url_string(engine: Engine) -> str: db_url = engine.url.render_as_string(hide_password=False) return urllib.parse.unquote(db_url)
[docs]def get_alembic_manager(engine: Engine) -> AlembicManager: return AlembicManager(engine)
[docs]def get_metadata(model: ModelId) -> MetaData: if model == GXY: return get_gxy_metadata() elif model == TSI: return get_tsi_metadata() else: raise InvalidModelIdError(model)
[docs]def load_metadata(metadata: MetaData, engine: Engine) -> None: with engine.connect() as conn: metadata.create_all(bind=conn)
[docs]def listify(data: Union[str, Iterable[str]]) -> Iterable[str]: if not isinstance(data, (list, tuple)): return [cast(str, data)] return data
[docs]def get_gxy_metadata() -> MetaData: return gxy_base.metadata
[docs]def get_tsi_metadata() -> MetaData: return tsi_base.metadata