Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.migrations.scripts
import os
import sys
from typing import (
List,
Optional,
Tuple,
)
import alembic.config
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from galaxy.model.database_utils import database_exists
from galaxy.model.migrations import (
AlembicManager,
DatabaseConfig,
DatabaseStateCache,
GXY,
IncorrectVersionError,
NoVersionTableError,
SQLALCHEMYMIGRATE_LAST_VERSION_GXY,
TSI,
)
from galaxy.util.properties import (
find_config_file,
get_data_dir,
load_app_properties,
)
DEFAULT_CONFIG_NAMES = ["galaxy", "universe_wsgi"]
CONFIG_FILE_ARG = "--galaxy-config"
CONFIG_DIR_NAME = "config"
GXY_CONFIG_PREFIX = "GALAXY_CONFIG_"
TSI_CONFIG_PREFIX = "GALAXY_INSTALL_CONFIG_"
[docs]class DatabaseDoesNotExistError(Exception):
[docs] def __init__(self, db_url: str) -> None:
super().__init__(
f"""The database at {db_url} does not exist. You must
create and initialize the database before running this script. You
can do so by (a) running `create_db.sh`; or by (b) starting Galaxy,
in which case Galaxy will create and initialize the database
automatically."""
)
[docs]class DatabaseNotInitializedError(Exception):
[docs] def __init__(self, db_url: str) -> None:
super().__init__(
f"""The database at {db_url} is empty. You must
initialize the database before running this script. You can do so by
(a) running `create_db.sh`; or by (b) starting Galaxy, in which case
Galaxy will initialize the database automatically."""
)
[docs]def verify_database_is_initialized(db_url: str) -> None:
"""
Intended for use by scripts that run database migrations (manage_db.sh,
run_alembic.sh). Those scripts are meant to run on a database that has been
initialized with the appropriate metadata (e.g. galaxy or install model).
This function will raise an error if the database does not exist or has not
been initialized*.
*NOTE: this function cannot determine whether a database has been properly
initialized; it can only tell when a database has *not* been initialized.
"""
if not database_exists(db_url):
raise DatabaseDoesNotExistError(db_url)
engine = create_engine(db_url)
try:
db_state = DatabaseStateCache(engine=engine)
if db_state.is_database_empty() or db_state.contains_only_kombu_tables():
raise DatabaseNotInitializedError(db_url)
finally:
engine.dispose()
[docs]def get_configuration(argv: List[str], cwd: str) -> Tuple[DatabaseConfig, DatabaseConfig, bool]:
"""
Return a 3-item-tuple with configuration values used for managing databases.
"""
config_file = _pop_config_file(argv)
return get_configuration_from_file(cwd, config_file)
[docs]def get_configuration_from_file(
cwd: str, config_file: Optional[str] = None
) -> Tuple[DatabaseConfig, DatabaseConfig, bool]:
if config_file is None:
cwds = [cwd, os.path.join(cwd, CONFIG_DIR_NAME)]
config_file = find_config_file(DEFAULT_CONFIG_NAMES, dirs=cwds)
# load gxy properties and auto-migrate
properties = load_app_properties(config_file=config_file, config_prefix=GXY_CONFIG_PREFIX)
default_url = f"sqlite:///{os.path.join(get_data_dir(properties), 'universe.sqlite')}?isolation_level=IMMEDIATE"
url = properties.get("database_connection", default_url)
template = properties.get("database_template", None)
encoding = properties.get("database_encoding", None)
is_auto_migrate = properties.get("database_auto_migrate", False)
gxy_config = DatabaseConfig(url, template, encoding)
# load tsi properties
properties = load_app_properties(config_file=config_file, config_prefix=TSI_CONFIG_PREFIX)
default_url = gxy_config.url
url = properties.get("install_database_connection", default_url)
template = properties.get("database_template", None)
encoding = properties.get("database_encoding", None)
tsi_config = DatabaseConfig(url, template, encoding)
return (gxy_config, tsi_config, is_auto_migrate)
def _pop_config_file(argv: List[str]) -> Optional[str]:
if CONFIG_FILE_ARG in argv:
pos = argv.index(CONFIG_FILE_ARG)
argv.pop(pos) # pop argument name
return argv.pop(pos) # pop and return argument value
return None
[docs]def add_db_urls_to_command_arguments(argv: List[str], gxy_url: str, tsi_url: str) -> None:
_insert_x_argument(argv, "tsi_url", tsi_url)
_insert_x_argument(argv, "gxy_url", gxy_url)
def _insert_x_argument(argv, key: str, value: str) -> None:
# `_insert_x_argument('mykey', 'myval')` transforms `foo -a 1` into `foo -x mykey=myval -a 42`
argv.insert(1, f"{key}={value}")
argv.insert(1, "-x")
[docs]def invoke_alembic() -> None:
"""
Invoke the Alembic command line runner.
Accept 'heads' as the target revision argument to enable upgrading both gxy and tsi in one command.
This is consistent with Alembic's CLI, which allows `upgrade heads`. However, this would not work for
separate gxy and tsi databases: we can't attach a database url to a revision after Alembic has been
invoked with the 'upgrade' command and the 'heads' argument. So, instead we invoke Alembic for each head.
"""
if "heads" in sys.argv and "upgrade" in sys.argv:
i = sys.argv.index("heads")
sys.argv[i] = f"{GXY}@head"
alembic.config.main()
sys.argv[i] = f"{TSI}@head"
alembic.config.main()
else:
alembic.config.main()
[docs]class LegacyScriptsException(Exception):
# Misc. errors caused by incorrect arguments passed to a legacy script.
[docs]class LegacyManageDb:
LEGACY_CONFIG_FILE_ARG_NAMES = ["-c", "--config", "--config-file"]
[docs] def get_gxy_version(self):
"""
Get the head revision for the gxy branch from the Alembic script directory.
(previously referred to as "max/repository version")
"""
script_directory = self._get_script_directory()
heads = script_directory.get_heads()
for head in heads:
revision = script_directory.get_revision(head)
if revision and GXY in revision.branch_labels:
return head
return None
[docs] def get_gxy_db_version(self, gxy_db_url=None):
"""
Get the head revision for the gxy branch from the galaxy database. If there
is no alembic_version table, get the sqlalchemy migrate version. Raise error
if that version is not the latest.
(previously referred to as "database version")
"""
db_url = gxy_db_url or self.gxy_db_url
try:
engine = create_engine(db_url)
version = self._get_gxy_alembic_db_version(engine)
if not version:
version = self._get_gxy_sam_db_version(engine)
if version is None:
raise NoVersionTableError(GXY)
elif version != SQLALCHEMYMIGRATE_LAST_VERSION_GXY:
raise IncorrectVersionError(GXY, SQLALCHEMYMIGRATE_LAST_VERSION_GXY)
return version
finally:
engine.dispose()
[docs] def run_upgrade(self, gxy_db_url=None, tsi_db_url=None):
"""
Alembic will upgrade both branches, gxy and tsi, to their head revisions.
"""
gxy_db_url = gxy_db_url or self.gxy_db_url
tsi_db_url = tsi_db_url or self.tsi_db_url
self._upgrade(gxy_db_url, GXY)
self._upgrade(tsi_db_url, TSI)
[docs] def rename_config_argument(self, argv: List[str]) -> None:
"""
Rename the optional config argument: we can't use '-c' because that option is used by Alembic.
"""
for arg in self.LEGACY_CONFIG_FILE_ARG_NAMES:
if arg in argv:
self._rename_arg(argv, arg, CONFIG_FILE_ARG)
return
def _rename_arg(self, argv, old_name, new_name) -> None:
pos = argv.index(old_name)
argv[pos] = new_name
def _upgrade(self, db_url, model):
try:
engine = create_engine(db_url)
am = get_alembic_manager(engine)
am.upgrade(model)
finally:
engine.dispose()
def _set_db_urls(self):
self.rename_config_argument(sys.argv)
self._load_db_urls()
def _load_db_urls(self):
gxy_config, tsi_config, _ = get_configuration(sys.argv, os.getcwd())
self.gxy_db_url = gxy_config.url
self.tsi_db_url = tsi_config.url
def _get_gxy_sam_db_version(self, engine):
dbcache = DatabaseStateCache(engine)
return dbcache.sqlalchemymigrate_version
def _get_script_directory(self):
alembic_cfg = get_alembic_cfg()
return ScriptDirectory.from_config(alembic_cfg)
def _get_gxy_alembic_db_version(self, engine):
# We may get 2 values, one for each branch (gxy and tsi). So we need to
# determine which one is the gxy head.
with engine.connect() as conn:
context = MigrationContext.configure(conn)
db_heads = context.get_current_heads()
if db_heads:
gxy_revisions = self._get_all_gxy_revisions()
for db_head in db_heads:
if db_head in gxy_revisions:
return db_head
return None
def _get_all_gxy_revisions(self):
gxy_revisions = set()
script_directory = self._get_script_directory()
for rev in script_directory.walk_revisions():
if GXY in rev.branch_labels:
gxy_revisions.add(rev.revision)
return gxy_revisions
[docs]def get_alembic_cfg():
config_file = os.path.join(os.path.dirname(__file__), "alembic.ini")
config_file = os.path.abspath(config_file)
return Config(config_file)