Source code for galaxy.model.migrations

import logging
import os
import urllib.parse
from typing import (
    cast,
    Dict,
    Iterable,
    NamedTuple,
    NewType,
    NoReturn,
    Optional,
    Union,
)

import alembic
from alembic import command
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from alembic.script.base import Script
from sqlalchemy import (
    create_engine,
    MetaData,
    Table,
    text,
)
from sqlalchemy.engine import (
    Connection,
    CursorResult,
    Engine,
)
from sqlalchemy.exc import OperationalError

from galaxy.model import Base as gxy_base
from galaxy.model.database_utils import (
    create_database,
    database_exists,
)
from galaxy.model.mapping import create_additional_database_objects
from galaxy.model.tool_shed_install import Base as tsi_base

# Distinct `str` subtype used to annotate model identifiers.
ModelId = NewType("ModelId", str)
# These identifiers are used throughout the migrations system to distinguish
# between the two models; they refer to version directories, branch labels, etc.
# (if you rename these, you need to rename branch labels in alembic version directories)
GXY = ModelId("gxy")  # galaxy model identifier
TSI = ModelId("tsi")  # tool_shed_install model identifier

# Names of the version tables looked up in the reflected database metadata to
# tell which migration tool (if any) has been tracking the database.
ALEMBIC_TABLE = "alembic_version"
SQLALCHEMYMIGRATE_TABLE = "migrate_version"
# Last SQLAlchemy Migrate version of each model. A database that only has a
# SQLAlchemy Migrate version table is upgraded only if it is at exactly this
# version; any other version results in IncorrectVersionError.
SQLALCHEMYMIGRATE_LAST_VERSION_GXY = 180
SQLALCHEMYMIGRATE_LAST_VERSION_TSI = 17
log = logging.getLogger(__name__)


class DatabaseConfig(NamedTuple):
    """
    Settings describing one database, as consumed by `verify_databases_via_script`.
    """

    # Database URL; passed to `create_engine`.
    url: str
    # Template and encoding; forwarded to `verify_databases`, where they are
    # only used if the database has to be created.
    template: str
    encoding: str
class NoVersionTableError(Exception):
    """
    Raised when the database has no version table at all (neither SQLAlchemy Migrate,
    nor Alembic).

    Without a version table it is impossible to automatically determine the state of
    the database, so a manual update is required.
    """

    def __init__(self, model: str) -> None:
        """`model` is the human-readable model name interpolated into the message."""
        message = f"Your {model} database has no version table; manual update is required"
        super().__init__(message)
class IncorrectVersionError(Exception):
    """
    Raised when the database has a SQLAlchemy Migrate version table whose version is
    either older or more recent than {SQLALCHEMYMIGRATE_LAST_VERSION_GXY/TSI}, so it
    cannot be upgraded with Alembic.

    (A more recent version may indicate that something has changed in the database past
    the point where we can automatically migrate from SQLAlchemy Migrate to Alembic.)
    Manual update required.
    """

    def __init__(self, model: str, expected_version: int) -> None:
        """
        `model` is the human-readable model name; `expected_version` is the SQLAlchemy
        Migrate version the database was expected to be at.
        """
        fragments = (
            f"Your {model} database version is incorrect; version {expected_version} is expected. ",
            "Manual update is required. ",
            "Please see documentation: https://docs.galaxyproject.org/en/master/admin/db_migration.html#how-to-handle-migrations-incorrectversionerror",
        )
        super().__init__("".join(fragments))
class OutdatedDatabaseError(Exception):
    """
    Raised when the database is under Alembic version control, but is out-of-date.
    Automatic upgrade possible.
    """

    def __init__(self, model: str, db_version: str, code_version: str) -> None:
        """
        `model` is the human-readable model name; `db_version` is the revision found in
        the database and `code_version` is the revision the code expects.
        """
        fragments = (
            f"Your {model} database has version {db_version}, but this code expects ",
            f"version {code_version}. ",
            "To upgrade your database, run `manage_db.sh upgrade`. ",
            "For more options (e.g. upgrading/downgrading to a specific version) see instructions in that file. ",
            "Please remember to backup your database before migrating.",
        )
        super().__init__("".join(fragments))
class InvalidModelIdError(Exception):
    """Raised when a model identifier is not one of the identifiers known to this module."""

    def __init__(self, model: str) -> None:
        """`model` is the rejected identifier; it is interpolated into the message."""
        message = f"Invalid model: {model}"
        super().__init__(message)
class RevisionNotFoundError(Exception):
    """
    Raised when the database has an Alembic version table that does not contain a
    revision identifier for the given model.

    As a result, it is impossible to determine the state of the database for this
    model (gxy or tsi).
    """

    def __init__(self, model: str) -> None:
        """`model` is the human-readable model name interpolated into the message."""
        prefix = "The database has an alembic version table, but that table does not contain "
        suffix = f"a revision for the {model} model"
        super().__init__(prefix + suffix)
class AlembicManager:
    """
    Alembic operations on one database.

    Wraps an Alembic `Config` (loaded from the `alembic.ini` file located next to this
    module) and the corresponding `ScriptDirectory`, and caches the version heads read
    from the database until an operation that may change them is executed.
    """

    @staticmethod
    def is_at_revision(engine: Engine, revision: Union[str, Iterable[str]]) -> bool:
        """
        True if revision is a subset of the set of version heads stored in the database.
        """
        revision = listify(revision)
        with engine.connect() as conn:
            context = MigrationContext.configure(conn)
            db_version_heads = context.get_current_heads()
            return set(revision) <= set(db_version_heads)

    def __init__(self, engine: Engine, config_dict: Optional[dict] = None) -> None:
        """
        :param engine: engine connected to the database to operate on.
        :param config_dict: optional main-section options that are set on top of the
            options loaded from `alembic.ini`.
        """
        self.engine = engine
        self.alembic_cfg = self._load_config(config_dict)
        self.script_directory = ScriptDirectory.from_config(self.alembic_cfg)
        self._db_heads: Optional[Iterable[str]]
        self._reset_db_heads()

    def _load_config(self, config_dict: Optional[dict]) -> Config:
        """Load `alembic.ini`, point it at this engine's database and apply `config_dict`."""
        alembic_root = os.path.dirname(__file__)
        _alembic_file = os.path.join(alembic_root, "alembic.ini")
        config = Config(_alembic_file)
        url = get_url_string(self.engine)
        # Option values are subject to "%"-interpolation, and the url has been unquoted,
        # so it may contain a raw percent sign (e.g. in a password). A raw "%" must be
        # escaped as "%%", otherwise setting the option raises a ValueError.
        config.set_main_option("sqlalchemy.url", url.replace("%", "%%"))
        if config_dict:
            for key, value in config_dict.items():
                config.set_main_option(key, value)
        return config

    def stamp_model_head(self, model: ModelId) -> None:
        """Partial proxy to alembic's stamp command."""
        command.stamp(self.alembic_cfg, f"{model}@head")
        self._reset_db_heads()

    def stamp_revision(self, revision: Union[str, Iterable[str]]) -> None:
        """Partial proxy to alembic's stamp command."""
        command.stamp(self.alembic_cfg, revision)  # type: ignore[arg-type]  # https://alembic.sqlalchemy.org/en/latest/api/commands.html#alembic.command.stamp.params.revision
        self._reset_db_heads()

    def upgrade(self, model: ModelId) -> None:
        """Partial proxy to alembic's upgrade command."""
        # This works with or without an existing alembic version table.
        command.upgrade(self.alembic_cfg, f"{model}@head")
        self._reset_db_heads()

    def is_under_version_control(self, model: ModelId) -> bool:
        """
        True if the database version table contains a revision that corresponds to a revision
        in the script directory that has branch label `model`.
        """
        if self.db_heads:
            for db_head in self.db_heads:
                try:
                    revision = self._get_revision(db_head)
                    if revision and model in revision.branch_labels:
                        log.info(f"The version of the {model} model in the database is {db_head}.")
                        return True
                except alembic.util.exc.CommandError:  # No need to raise exception.
                    log.info(f"Revision {db_head} does not exist in the script directory.")
        return False

    def is_up_to_date(self, model: ModelId) -> bool:
        """
        True if the head revision for `model` in the script directory is stored in the database.
        """
        head_id = self.get_model_script_head(model)
        return bool(self.db_heads and head_id in self.db_heads)

    def get_model_db_head(self, model: ModelId) -> Optional[str]:
        """Return the database head whose revision has branch label `model`, or None."""
        return self._get_head_revision(model, cast(Iterable[str], self.db_heads))

    def get_model_script_head(self, model: ModelId) -> Optional[str]:
        """Return the script directory head whose revision has branch label `model`, or None."""
        return self._get_head_revision(model, self.script_directory.get_heads())

    def _get_head_revision(self, model: ModelId, heads: Iterable[str]) -> Optional[str]:
        """Return the first of `heads` whose revision has branch label `model`, or None."""
        for head in heads:
            revision = self._get_revision(head)
            if revision and model in revision.branch_labels:
                return head
        return None

    @property
    def db_heads(self) -> Iterable:
        """
        Version heads stored in the database. Loaded on first access and cached until
        the cache is reset (which happens after every stamp or upgrade).
        """
        if self._db_heads is None:  # Explicitly check for None: could be an empty tuple.
            with self.engine.connect() as conn:
                context: MigrationContext = MigrationContext.configure(conn)
                self._db_heads = context.get_current_heads()
            # We get a tuple as long as we use branches. Otherwise, we'd get a single value.
            # listify() is a safeguard in case we stop using branches.
            self._db_heads = listify(self._db_heads)
        return self._db_heads

    def _get_revision(self, revision_id: str) -> Optional[Script]:
        """
        Look up `revision_id` in the script directory.

        :raises alembic.util.exc.CommandError: re-raised (after logging) if the lookup fails.
        """
        try:
            return self.script_directory.get_revision(revision_id)
        except alembic.util.exc.CommandError:
            log.error(f"Revision {revision_id} not found in the script directory")
            # Bare raise: propagate the original exception with its traceback untouched.
            raise

    def _reset_db_heads(self) -> None:
        """Drop the cached database heads so that they are reloaded on next access."""
        self._db_heads = None
class DatabaseStateCache:
    """
    Snapshot of database state.

    On construction, reflects the database metadata and reads the SQLAlchemy Migrate
    version (if that version table exists); all queries are answered from that snapshot.
    """

    def __init__(self, engine: Engine) -> None:
        """Take the snapshot using a connection obtained from `engine`."""
        self._load_db(engine)

    @property
    def tables(self) -> Dict[str, Table]:
        """Tables found in the database when the snapshot was taken, keyed by name."""
        return self.db_metadata.tables

    def is_database_empty(self) -> bool:
        """True if no tables were reflected from the database."""
        return not bool(self.db_metadata.tables)

    def contains_only_kombu_tables(self) -> bool:
        """Proxy to `metadata_contains_only_kombu_tables` for the reflected metadata."""
        return metadata_contains_only_kombu_tables(self.db_metadata)

    def has_alembic_version_table(self) -> bool:
        """True if the Alembic version table was found in the database."""
        return ALEMBIC_TABLE in self.db_metadata.tables

    def has_sqlalchemymigrate_version_table(self) -> bool:
        """True if the SQLAlchemy Migrate version table was found in the database."""
        return SQLALCHEMYMIGRATE_TABLE in self.db_metadata.tables

    def is_last_sqlalchemymigrate_version(self, last_version: int) -> bool:
        """True if the SQLAlchemy Migrate version read from the database equals `last_version`."""
        return self.sqlalchemymigrate_version == last_version

    def _load_db(self, engine: Engine) -> None:
        """Load metadata first: the version lookup depends on the reflected tables."""
        with engine.connect() as conn:
            self.db_metadata = self._load_db_metadata(conn)
            self.sqlalchemymigrate_version = self._load_sqlalchemymigrate_version(conn)

    def _load_db_metadata(self, conn: Connection) -> MetaData:
        """Reflect the database over `conn` into a new MetaData object."""
        metadata = MetaData()
        metadata.reflect(bind=conn)
        return metadata

    def _load_sqlalchemymigrate_version(self, conn: Connection) -> Optional[int]:
        """
        Return the version stored in the SQLAlchemy Migrate version table, or None if
        the database has no such table.
        """
        if not self.has_sqlalchemymigrate_version_table():
            return None
        # The table name is a module-level constant, not user input.
        sql = text(f"select version from {SQLALCHEMYMIGRATE_TABLE}")
        # scalar() yields the first column of the first row (not a result object).
        return conn.execute(sql).scalar()
def metadata_contains_only_kombu_tables(metadata: MetaData) -> bool:
    """
    Return True if metadata contains only kombu-related tables.
    (ref: https://github.com/galaxyproject/galaxy/issues/13689)

    Tables whose name starts with "sqlite_" are tolerated as well. Metadata without
    any tables also yields True.
    """
    tolerated_prefixes = ("kombu_", "sqlite_")
    for table_name in metadata.tables:
        if not table_name.startswith(tolerated_prefixes):
            return False
    return True
def verify_databases_via_script(
    gxy_config: DatabaseConfig,
    tsi_config: DatabaseConfig,
    is_auto_migrate: bool = False,
) -> None:
    """
    Create the engine(s) described by the configs, verify the databases, then dispose
    of the engine(s).

    This function serves a use case when an engine has not been created yet
    (e.g. when called from a script).

    A separate engine for the tsi model is only created if `tsi_config.url` is set and
    differs from `gxy_config.url`. Any exception raised during verification propagates
    to the caller.
    """
    gxy_engine = create_engine(gxy_config.url)
    tsi_engine = None
    try:
        if tsi_config.url and tsi_config.url != gxy_config.url:
            tsi_engine = create_engine(tsi_config.url)

        verify_databases(
            gxy_engine,
            gxy_config.template,
            gxy_config.encoding,
            tsi_engine,
            tsi_config.template,
            tsi_config.encoding,
            is_auto_migrate,
        )
    finally:
        # Verification reports problems by raising, so the engines must be released
        # on the error path as well.
        gxy_engine.dispose()
        if tsi_engine:
            tsi_engine.dispose()
def verify_databases(
    gxy_engine: Engine,
    gxy_template: Optional[str],
    gxy_encoding: Optional[str],
    tsi_engine: Optional[Engine],
    tsi_template: Optional[str],
    tsi_encoding: Optional[str],
    is_auto_migrate: bool,
) -> None:
    """
    Verify the state of the galaxy (gxy) model database, then of the tool shed install
    (tsi) model database.

    If no tsi engine is given, the gxy engine is used for the tsi model too.
    """
    # The gxy model goes first: its outcome determines how the tsi model is handled.
    gxy_verifier = DatabaseStateVerifier(gxy_engine, GXY, gxy_template, gxy_encoding, is_auto_migrate)
    gxy_verifier.run()

    # New database = one engine or same engine, and gxy model has just been initialized.
    shares_gxy_engine = not tsi_engine or gxy_engine == tsi_engine
    is_new_database = shares_gxy_engine and gxy_verifier.is_new_database

    # Fall back to the gxy engine if the tsi model has no engine of its own.
    engine_for_tsi = tsi_engine if tsi_engine else gxy_engine

    tsi_verifier = DatabaseStateVerifier(
        engine_for_tsi, TSI, tsi_template, tsi_encoding, is_auto_migrate, is_new_database
    )
    tsi_verifier.run()
class DatabaseStateVerifier:
    """
    Verifies the state of the database for one model and, where possible, brings it
    to the state expected by the code.

    `run` handles three cases in order: the database does not exist (it is created and
    initialized), the database is empty (it is initialized), or the database is not
    empty (its version tables are inspected to decide whether it is up-to-date, can be
    upgraded, or requires manual intervention).
    """

    def __init__(
        self,
        engine: Engine,
        model: ModelId,
        database_template: Optional[str],
        database_encoding: Optional[str],
        is_auto_migrate: bool,
        is_new_database: Optional[bool] = False,
    ) -> None:
        """
        :param engine: engine for the database to verify.
        :param model: identifier of the model to verify.
        :param database_template: template to use if the database has to be created.
        :param database_encoding: encoding to use if the database has to be created.
        :param is_auto_migrate: if True, an outdated database is upgraded; otherwise
            OutdatedDatabaseError is raised.
        :param is_new_database: True if database has been initialized for another model.
        :raises InvalidModelIdError: if `model` is not a known model identifier.
        """
        self.engine = engine
        self.model = model
        self.database_template = database_template
        self.database_encoding = database_encoding
        self._is_auto_migrate = is_auto_migrate
        self.metadata = get_metadata(model)
        # True if database has been initialized for another model.
        self.is_new_database = is_new_database
        # These values may or may not be required, so do a lazy load.
        self._db_state: Optional[DatabaseStateCache] = None
        self._alembic_manager: Optional[AlembicManager] = None

    @property
    def is_auto_migrate(self) -> bool:
        return self._is_auto_migrate

    @property
    def db_state(self) -> DatabaseStateCache:
        """Snapshot of the database state; taken on first access."""
        if not self._db_state:
            self._db_state = DatabaseStateCache(engine=self.engine)
        return self._db_state

    @property
    def alembic_manager(self) -> AlembicManager:
        """Alembic manager for this engine; created on first access."""
        if not self._alembic_manager:
            self._alembic_manager = get_alembic_manager(self.engine)
        return self._alembic_manager

    def run(self) -> None:
        """Verify the database; the first handler that applies ends the run."""
        if self._handle_no_database():
            return
        if self._handle_empty_database():
            return
        self._handle_nonempty_database()

    def _handle_no_database(self) -> bool:
        """Create and initialize the database if it does not exist. True if it was created."""
        url = get_url_string(self.engine)

        try:
            # connect using the database name from the sqlalchemy engine
            exists = database_exists(url, database=self.engine.url.database)
        except OperationalError:
            # An OperationalError from the check is treated as "database does not exist".
            exists = False

        if not exists:
            self._create_database(url)
            self._initialize_database()
            return True
        return False

    def _handle_empty_database(self) -> bool:
        """
        Initialize the database if it is new, empty, or contains only kombu tables.
        True if it was initialized.
        """
        if self.is_new_database or self._is_database_empty() or self._contains_only_kombu_tables():
            self._initialize_database()
            return True
        return False

    def _handle_nonempty_database(self) -> None:
        """Dispatch on which version table (Alembic, SQLAlchemy Migrate, none) is present."""
        if self._has_alembic_version_table():
            self._handle_with_alembic()
        elif self._has_sqlalchemymigrate_version_table():
            if self._is_last_sqlalchemymigrate_version():
                self._try_to_upgrade()
            else:
                self._handle_wrong_sqlalchemymigrate_version()
        else:
            self._handle_no_version_table()

    def _handle_with_alembic(self) -> None:
        """
        Handle a database that has an Alembic version table.

        :raises RevisionNotFoundError: if the gxy model is not under version control.
        """
        am = self.alembic_manager
        model = self._get_model_name()

        if am.is_up_to_date(self.model):
            log.info(f"Your {model} database is up-to-date")
            return
        if am.is_under_version_control(self.model):
            # Model is under version control, but outdated. Try to upgrade.
            self._try_to_upgrade()
        else:
            # Model is not under version control. We fail for the gxy model because we can't guess
            # what the state of the database is if there is an alembic table without a gxy revision.
            # For the tsi model, we can guess. If there are no tsi tables in the database, we treat it
            # as a new install; but if there is at least one table, we assume it is the same version as gxy.
            # See more details in this PR description: https://github.com/galaxyproject/galaxy/pull/13108
            if self.model == TSI:
                if self._no_model_tables_exist():
                    self._initialize_database()
                else:
                    self._try_to_upgrade()
            else:
                raise RevisionNotFoundError(model)

    def _try_to_upgrade(self) -> None:
        """
        Upgrade the model to the script head if auto-migration is enabled.

        :raises OutdatedDatabaseError: if auto-migration is disabled.
        """
        am = self.alembic_manager
        model = self._get_model_name()
        code_version = am.get_model_script_head(self.model)
        if not self.is_auto_migrate:
            db_version = am.get_model_db_head(self.model)
            raise OutdatedDatabaseError(model, cast(str, db_version), cast(str, code_version))
        log.info(f"Database is being upgraded to current version: {code_version}")
        am.upgrade(self.model)

    def _get_model_name(self) -> str:
        """Human-readable model name used in log and error messages."""
        return "galaxy" if self.model == GXY else "tool shed install"

    def _no_model_tables_exist(self) -> bool:
        # True if there are no tables from `self.model` in the database.
        db_tables = self.db_state.tables
        for tablename in set(self.metadata.tables) - {ALEMBIC_TABLE}:
            if tablename in db_tables:
                return False
        return True

    def _create_database(self, url: str) -> None:
        """Create the database at `url`, passing on template and encoding if they are set."""
        create_kwds = {}
        # NOTE(review): `url` is rendered with the password included (see get_url_string),
        # so this message may expose credentials in the log - confirm this is acceptable.
        message = f"Creating database for URI [{url}]"
        if self.database_template:
            message += f" from template [{self.database_template}]"
            create_kwds["template"] = self.database_template
        if self.database_encoding:
            message += f" with encoding [{self.database_encoding}]"
            create_kwds["encoding"] = self.database_encoding
        log.info(message)
        create_database(url, **create_kwds)

    def _initialize_database(self) -> None:
        """Create the model's tables, stamp the model's head revision, and flag the database as new."""
        load_metadata(self.metadata, self.engine)
        if self.model == GXY:
            self._create_additional_database_objects()
        self.alembic_manager.stamp_model_head(self.model)
        self.is_new_database = True

    def _create_additional_database_objects(self) -> None:
        create_additional_database_objects(self.engine)

    def _is_database_empty(self) -> bool:
        return self.db_state.is_database_empty()

    def _contains_only_kombu_tables(self) -> bool:
        return self.db_state.contains_only_kombu_tables()

    def _has_alembic_version_table(self) -> bool:
        return self.db_state.has_alembic_version_table()

    def _has_sqlalchemymigrate_version_table(self) -> bool:
        return self.db_state.has_sqlalchemymigrate_version_table()

    def _is_last_sqlalchemymigrate_version(self) -> bool:
        last_version = get_last_sqlalchemymigrate_version(self.model)
        return self.db_state.is_last_sqlalchemymigrate_version(last_version)

    def _handle_no_version_table(self) -> NoReturn:
        """:raises NoVersionTableError: always."""
        model = self._get_model_name()
        raise NoVersionTableError(model)

    def _handle_wrong_sqlalchemymigrate_version(self) -> NoReturn:
        """:raises IncorrectVersionError: always."""
        # Same lookup as in _is_last_sqlalchemymigrate_version, so the version reported
        # in the error is the one the database was actually compared against.
        expected_version = get_last_sqlalchemymigrate_version(self.model)
        model = self._get_model_name()
        raise IncorrectVersionError(model, expected_version)
def get_last_sqlalchemymigrate_version(model: ModelId) -> int:
    """
    Return the last SQLAlchemy Migrate version defined for `model`.

    :raises InvalidModelIdError: if `model` is neither GXY nor TSI.
    """
    if model == GXY:
        return SQLALCHEMYMIGRATE_LAST_VERSION_GXY
    if model == TSI:
        return SQLALCHEMYMIGRATE_LAST_VERSION_TSI
    raise InvalidModelIdError(model)
def get_url_string(engine: Engine) -> str:
    """
    Return the engine's database url as a string: rendered with the password
    included, then percent-decoded.
    """
    return urllib.parse.unquote(engine.url.render_as_string(hide_password=False))
def get_alembic_manager(engine: Engine) -> AlembicManager:
    """Build an `AlembicManager` for `engine` without any extra configuration options."""
    manager = AlembicManager(engine)
    return manager
def get_metadata(model: ModelId) -> MetaData:
    """
    Return the metadata object that belongs to `model`.

    :raises InvalidModelIdError: if `model` is neither GXY nor TSI.
    """
    if model == GXY:
        return get_gxy_metadata()
    if model == TSI:
        return get_tsi_metadata()
    raise InvalidModelIdError(model)
def load_metadata(metadata: MetaData, engine: Engine) -> None:
    """
    Create in the database all tables defined in `metadata`.

    The statements are executed inside a transaction that is committed when the block
    completes and rolled back if it raises.
    """
    # engine.begin() (rather than engine.connect()) commits explicitly, so table
    # creation does not depend on the connection autocommitting DDL.
    with engine.begin() as conn:
        metadata.create_all(bind=conn)
def listify(data: Union[str, Iterable[str]]) -> Iterable[str]:
    """
    Return `data` as a collection of items.

    - A list or a tuple is returned unchanged (same object).
    - A string (or any other non-iterable value) is wrapped in a one-item list.
    - Any other iterable (e.g. a set or a generator) is converted to a list of its items.
    """
    if isinstance(data, (list, tuple)):
        return data
    # Strings are iterable, but represent a single item here.
    if isinstance(data, (str, bytes)) or not isinstance(data, Iterable):
        return [cast(str, data)]
    return list(data)
def get_gxy_metadata() -> MetaData:
    """Return the `metadata` attribute of `galaxy.model.Base`."""
    galaxy_metadata = gxy_base.metadata
    return galaxy_metadata
def get_tsi_metadata() -> MetaData:
    """Return the `metadata` attribute of `galaxy.model.tool_shed_install.Base`."""
    install_metadata = tsi_base.metadata
    return install_metadata