Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for tool_shed.webapp.model.migrate.versions.0022_add_repository_admin_roles

"""
Migration script to create the repository_role_association table, insert name-spaced
repository administrative roles into the role table and associate each repository and
owner with the appropriate name-spaced role.
"""

import datetime
import logging
import sys

from sqlalchemy import (
    Column,
    DateTime,
    ForeignKey,
    Integer,
    MetaData,
    Table,
)
from sqlalchemy.exc import NoSuchTableError

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
format = "%(name)s %(levelname)s %(asctime)s %(message)s"
formatter = logging.Formatter(format)
handler.setFormatter(formatter)
log.addHandler(handler)

metadata = MetaData()

NOW = datetime.datetime.utcnow
ROLE_TYPE = "system"

RepositoryRoleAssociation_table = Table(
    "repository_role_association",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("repository_id", Integer, ForeignKey("repository.id"), index=True),
    Column("role_id", Integer, ForeignKey("role.id"), index=True),
    Column("create_time", DateTime, default=NOW),
    Column("update_time", DateTime, default=NOW, onupdate=NOW),
)


[docs]def nextval(migrate_engine, table, col="id"): if migrate_engine.name in ["postgresql", "postgres"]: return f"nextval('{table}_{col}_seq')" elif migrate_engine.name in ["mysql", "sqlite"]: return "null" else: raise Exception(f"Unable to convert data for unknown database type: {migrate_engine.name}")
[docs]def localtimestamp(migrate_engine): if migrate_engine.name in ["postgresql", "postgres", "mysql"]: return "LOCALTIMESTAMP" elif migrate_engine.name == "sqlite": return "current_date || ' ' || current_time" else: raise Exception(f"Unable to convert data for unknown database type: {migrate_engine.name}")
[docs]def boolean_false(migrate_engine): if migrate_engine.name in ["postgresql", "postgres", "mysql"]: return False elif migrate_engine.name == "sqlite": return 0 else: raise Exception(f"Unable to convert data for unknown database type: {migrate_engine.name}")
[docs]def upgrade(migrate_engine): print(__doc__) metadata.bind = migrate_engine metadata.reflect() # Create the new repository_role_association table. try: RepositoryRoleAssociation_table.create() except Exception: log.exception("Creating repository_role_association table failed.") # Select the list of repositories and associated public user names for their owners. user_ids = [] repository_ids = [] role_names = [] cmd = "SELECT repository.id, repository.name, repository.user_id, galaxy_user.username FROM repository, galaxy_user WHERE repository.user_id = galaxy_user.id;" for row in migrate_engine.execute(cmd): repository_id = row[0] name = row[1] user_id = row[2] username = row[3] repository_ids.append(int(repository_id)) role_names.append(f"{str(name)}_{str(username)}_admin") user_ids.append(int(user_id)) # Insert a new record into the role table for each new role. for tup in zip(repository_ids, user_ids, role_names): repository_id, user_id, role_name = tup cmd = "INSERT INTO role VALUES (" cmd += f"{nextval(migrate_engine, 'role')}, " cmd += f"{localtimestamp(migrate_engine)}, " cmd += f"{localtimestamp(migrate_engine)}, " cmd += f"'{role_name}', " cmd += "'A user or group member with this role can administer this repository.', " cmd += f"'{ROLE_TYPE}', " cmd += f"{boolean_false(migrate_engine)}" cmd += ");" migrate_engine.execute(cmd) # Get the id of the new role. cmd = f"SELECT id FROM role WHERE name = '{role_name}' and type = '{ROLE_TYPE}';" row = migrate_engine.execute(cmd).fetchone() if row: role_id = row[0] else: role_id = None if role_id: # Create a repository_role_association record to associate the repository with the new role. cmd = "INSERT INTO repository_role_association VALUES (" cmd += f"{nextval(migrate_engine, 'repository_role_association')}, " cmd += "%d, " % int(repository_id) cmd += "%d, " % int(role_id) cmd += f"{localtimestamp(migrate_engine)}, " cmd += f"{localtimestamp(migrate_engine)} " cmd += ");" migrate_engine.execute(cmd) # Create a user_role_association record to associate the repository owner with the new role. cmd = "INSERT INTO user_role_association VALUES (" cmd += f"{nextval(migrate_engine, 'user_role_association')}, " cmd += "%d, " % int(user_id) cmd += "%d, " % int(role_id) cmd += f"{localtimestamp(migrate_engine)}, " cmd += f"{localtimestamp(migrate_engine)} " cmd += ");" migrate_engine.execute(cmd)
[docs]def downgrade(migrate_engine): metadata.bind = migrate_engine metadata.reflect() # Determine the list of roles to delete by first selecting the list of repositories and associated # public user names for their owners. role_names = [] cmd = "SELECT name, username FROM repository, galaxy_user WHERE repository.user_id = galaxy_user.id;" for row in migrate_engine.execute(cmd): name = row[0] username = row[1] role_names.append(f"{str(name)}_{str(username)}_admin") # Delete each role as well as all users associated with each role. for role_name in role_names: # Select the id of the record associated with the current role_name from the role table. cmd = f"SELECT id, name FROM role WHERE name = '{role_name}';" row = migrate_engine.execute(cmd).fetchone() if row: role_id = row[0] else: role_id = None if role_id: # Delete all user_role_association records for the current role. cmd = "DELETE FROM user_role_association WHERE role_id = %d;" % int(role_id) migrate_engine.execute(cmd) # Delete all repository_role_association records for the current role. cmd = "DELETE FROM repository_role_association WHERE role_id = %d;" % int(role_id) migrate_engine.execute(cmd) # Delete the role from the role table. cmd = "DELETE FROM role WHERE id = %d;" % int(role_id) migrate_engine.execute(cmd) # Drop the repository_role_association table. try: RepositoryRoleAssociation_table = Table("repository_role_association", metadata, autoload=True) except NoSuchTableError: log.debug("Failed loading table repository_role_association") try: RepositoryRoleAssociation_table.drop() except Exception: log.exception("Dropping repository_role_association table failed.")