Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.migrate.triggers.history_update_time_field
"""
Database trigger installation and removal
"""
from galaxy.model.migrate.versions.util import execute_statements
[docs]def install_timestamp_triggers(engine):
"""
Install update_time propagation triggers for history table
"""
statements = get_timestamp_install_sql(engine.name)
execute_statements(engine, statements)
[docs]def drop_timestamp_triggers(engine):
"""
Remove update_time propagation triggers for history table
"""
statements = get_timestamp_drop_sql(engine.name)
execute_statements(engine, statements)
[docs]def get_timestamp_install_sql(variant):
"""
Generate a list of SQL statements for installation of timestamp triggers
"""
sql = get_timestamp_drop_sql(variant)
if 'postgres' in variant:
# PostgreSQL has a separate function definition and a trigger
# assignment. The first two statements the functions, and
# the later assign those functions to triggers on tables
fn_name = 'update_history_update_time'
sql.append(build_pg_timestamp_fn(fn_name, 'history', source_key='history_id'))
sql.append(build_pg_trigger('history_dataset_association', fn_name))
sql.append(build_pg_trigger('history_dataset_collection_association', fn_name))
else:
# Other database variants are more granular. Requiring separate
# statements for INSERT/UPDATE/DELETE, and the body of the trigger
# is not necessarily reusable with a function
for operation in ['INSERT', 'UPDATE', 'DELETE']:
# change hda -> update history
sql.append(build_timestamp_trigger(
operation,
'history_dataset_association',
'history',
source_key='history_id'
))
# change hdca -> update history
sql.append(build_timestamp_trigger(
operation,
'history_dataset_collection_association',
'history',
source_key='history_id'
))
return sql
[docs]def get_timestamp_drop_sql(variant):
"""
Generate a list of statements to drop the timestamp update triggers
"""
sql = []
if 'postgres' in variant:
sql.append("DROP FUNCTION IF EXISTS update_history_update_time() CASCADE;")
else:
for operation in ['INSERT', 'UPDATE', 'DELETE']:
for when in ['BEFORE', 'AFTER']:
sql.append(build_drop_trigger(operation, 'history_dataset_association', when))
sql.append(build_drop_trigger(operation, 'history_dataset_collection_association', when))
return sql
[docs]def build_pg_timestamp_fn(fn_name, target_table, source_key, target_key='id'):
"""Generates a PostgreSQL history update timestamp function"""
return f"""
CREATE OR REPLACE FUNCTION {fn_name}()
RETURNS trigger
LANGUAGE 'plpgsql'
AS $BODY$
BEGIN
IF (TG_OP = 'DELETE') THEN
UPDATE {target_table}
SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
WHERE {target_key} = OLD.{source_key};
RETURN OLD;
ELSEIF (TG_OP = 'UPDATE') THEN
UPDATE {target_table}
SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
WHERE {target_key} = NEW.{source_key} OR {target_key} = OLD.{source_key};
RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
UPDATE {target_table}
SET update_time = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC')
WHERE {target_key} = NEW.{source_key};
RETURN NEW;
END IF;
END;
$BODY$;
"""
[docs]def build_pg_trigger(source_table, fn_name, when='AFTER'):
"""Assigns a PostgreSQL trigger to indicated table, calling user-defined function"""
when_initial = when.lower()[0]
trigger_name = f"trigger_{source_table}_{when_initial}iudr"
return f"""
CREATE TRIGGER {trigger_name}
{when} INSERT OR DELETE OR UPDATE
ON {source_table}
FOR EACH ROW
EXECUTE PROCEDURE {fn_name}();
"""
[docs]def build_timestamp_trigger(operation, source_table, target_table, source_key, target_key='id', when='AFTER'):
"""Creates a non-PostgreSQL update_time trigger"""
trigger_name = get_trigger_name(operation, source_table, when)
# three different update clauses depending on update/insert/delete
clause = ""
if operation == "DELETE":
clause = f"{target_key} = OLD.{source_key}"
elif operation == "UPDATE":
clause = f"{target_key} = NEW.{source_key} OR {target_key} = OLD.{source_key}"
else:
clause = f"{target_key} = NEW.{source_key}"
return f"""
CREATE TRIGGER {trigger_name}
{when} {operation}
ON {source_table}
FOR EACH ROW
BEGIN
UPDATE {target_table}
SET update_time = CURRENT_TIMESTAMP
WHERE {clause};
END;
"""
[docs]def build_drop_trigger(operation, source_table, when='AFTER'):
"""Drops a non-PostgreSQL trigger by name"""
trigger_name = get_trigger_name(operation, source_table, when)
return f"DROP TRIGGER IF EXISTS {trigger_name}"
[docs]def get_trigger_name(operation, source_table, when):
"""Non-PostgreSQL trigger name"""
op_initial = operation.lower()[0]
when_initial = when.lower()[0]
return f"trigger_{source_table}_{when_initial}{op_initial}r"