Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.migrate.versions.0067_populate_sequencer_table
"""
Migration script to populate the 'sequencer' table and it is populated using unique
entries in the 'datatx_info' column in the 'request_type' table. It also deletes the 'datatx_info'
column in the 'request_type' table and adds a foreign key to the 'sequencer' table. The
actual contents of the datatx_info column are stored as form_values.
"""
import logging
from json import dumps, loads
from sqlalchemy import (
Column,
ForeignKey,
Integer,
MetaData,
Table
)
from galaxy.model.custom_types import JSONType
from galaxy.model.migrate.versions.util import (
add_column,
drop_column,
engine_false,
localtimestamp,
nextval
)
log = logging.getLogger(__name__)
metadata = MetaData()
[docs]def get_latest_id(migrate_engine, table):
result = migrate_engine.execute("select id from %s order by id desc" % table)
row = result.fetchone()
if row:
return row[0]
else:
raise Exception('Unable to get the latest id in the %s table.' % table)
[docs]def create_sequencer_form_definition(migrate_engine):
'''
Create a new form_definition containing 5 fields (host, username, password,
data_dir & rename_datasets) which described the existing datatx_info json
dict in the request_type table
'''
# create new form_definition_current in the db
cmd = "INSERT INTO form_definition_current VALUES ( {}, {}, {}, {}, {} )".format(
nextval(migrate_engine, 'form_definition_current'),
localtimestamp(migrate_engine),
localtimestamp(migrate_engine),
'NULL',
engine_false(migrate_engine))
migrate_engine.execute(cmd)
# get this form_definition_current id
form_definition_current_id = get_latest_id(migrate_engine, 'form_definition_current')
# create new form_definition in the db
form_definition_name = 'Generic sequencer form'
form_definition_desc = ''
form_definition_fields = []
fields = [('Host', 'TextField'),
('User name', 'TextField'),
('Password', 'PasswordField'),
('Data directory', 'TextField')]
for index, (label, field_type) in enumerate(fields):
form_definition_fields.append({'name': 'field_%i' % index,
'label': label,
'helptext': '',
'visible': True,
'required': False,
'type': field_type,
'selectlist': [],
'layout': 'none',
'default': ''})
form_definition_fields.append({'name': 'field_%i' % len(fields),
'label': 'Prepend the experiment name and sample name to the dataset name?',
'helptext': 'Galaxy datasets are renamed by prepending the experiment name and sample name to the dataset name, ensuring dataset names remain unique in Galaxy even when multiple datasets have the same name on the sequencer.',
'visible': True,
'required': False,
'type': 'SelectField',
'selectlist': ['Do not rename',
'Preprend sample name',
'Prepend experiment name',
'Prepend experiment and sample name'],
'layout': 'none',
'default': ''})
form_definition_type = 'Sequencer Information Form'
form_definition_layout = dumps('[]')
cmd = "INSERT INTO form_definition VALUES ( %s, %s, %s, '%s', '%s', %s, '%s', '%s', '%s' )"
cmd = cmd % (nextval(migrate_engine, 'form_definition'),
localtimestamp(migrate_engine),
localtimestamp(migrate_engine),
form_definition_name,
form_definition_desc,
form_definition_current_id,
dumps(form_definition_fields),
form_definition_type,
form_definition_layout)
migrate_engine.execute(cmd)
# get this form_definition id
form_definition_id = get_latest_id(migrate_engine, 'form_definition')
# update the form_definition_id column in form_definition_current
cmd = "UPDATE form_definition_current SET latest_form_id=%i WHERE id=%i" % (form_definition_id, form_definition_current_id)
migrate_engine.execute(cmd)
return form_definition_id
[docs]def get_sequencer_id(migrate_engine, sequencer_info):
'''Get the sequencer id corresponding to the sequencer information'''
# Check if there is any existing sequencer which have the same sequencer
# information fields & values
cmd = "SELECT sequencer.id, form_values.content FROM sequencer, form_values WHERE sequencer.form_values_id=form_values.id"
result = migrate_engine.execute(cmd)
for row in result:
sequencer_id = row[0]
values = str(row[1])
if not values.strip():
continue
values = loads(values)
# proceed only if sequencer_info is a valid list
if values and isinstance(values, dict):
if sequencer_info.get('host', '') == values.get('field_0', '') \
and sequencer_info.get('username', '') == values.get('field_1', '') \
and sequencer_info.get('password', '') == values.get('field_2', '') \
and sequencer_info.get('data_dir', '') == values.get('field_3', '') \
and sequencer_info.get('rename_dataset', '') == values.get('field_4', ''):
return sequencer_id
return None
[docs]def add_sequencer(migrate_engine, sequencer_index, sequencer_form_definition_id, sequencer_info):
'''Adds a new sequencer to the sequencer table along with its form values.'''
# Create a new form values record with the supplied sequencer information
values = dumps({'field_0': sequencer_info.get('host', ''),
'field_1': sequencer_info.get('username', ''),
'field_2': sequencer_info.get('password', ''),
'field_3': sequencer_info.get('data_dir', ''),
'field_4': sequencer_info.get('rename_dataset', '')})
cmd = "INSERT INTO form_values VALUES ( {}, {}, {}, {}, '{}' )".format(nextval(migrate_engine, 'form_values'),
localtimestamp(migrate_engine),
localtimestamp(migrate_engine),
sequencer_form_definition_id,
values)
migrate_engine.execute(cmd)
sequencer_form_values_id = get_latest_id(migrate_engine, 'form_values')
# Create a new sequencer record with reference to the form value created above.
name = 'Sequencer_%i' % sequencer_index
desc = ''
version = ''
sequencer_type_id = 'simple_unknown_sequencer'
cmd = "INSERT INTO sequencer VALUES ( {}, {}, {}, '{}', '{}', '{}', '{}', {}, {}, {} )".format(
nextval(migrate_engine, 'sequencer'),
localtimestamp(migrate_engine),
localtimestamp(migrate_engine),
name,
desc,
sequencer_type_id,
version,
sequencer_form_definition_id,
sequencer_form_values_id,
engine_false(migrate_engine))
migrate_engine.execute(cmd)
return get_latest_id(migrate_engine, 'sequencer')
[docs]def update_sequencer_id_in_request_type(migrate_engine, request_type_id, sequencer_id):
'''Update the foreign key to the sequencer table in the request_type table'''
cmd = "UPDATE request_type SET sequencer_id=%i WHERE id=%i" % (sequencer_id, request_type_id)
migrate_engine.execute(cmd)
[docs]def upgrade(migrate_engine):
print(__doc__)
metadata.bind = migrate_engine
metadata.reflect()
RequestType_table = Table("request_type", metadata, autoload=True)
# create foreign key field to the sequencer table in the request_type table
col = Column("sequencer_id", Integer, ForeignKey("sequencer.id"), nullable=True)
add_column(col, RequestType_table, metadata)
# copy the sequencer information contained in the 'datatx_info' column
# of the request_type table to the form values referenced in the sequencer table
cmd = "SELECT id, name, datatx_info FROM request_type ORDER BY id ASC"
result = migrate_engine.execute(cmd)
results_list = result.fetchall()
# Proceed only if request_types exists
if len(results_list):
# In this migration script the all the contents of the datatx_info are stored as form_values
# with a pointer to the sequencer table. This way the sequencer information can be customized
# by the admin and is no longer restricted to host, username, password, data directory.
# For the existing request_types in the database, we add a new form_definition
# with these 4 fields. Then we populate the sequencer table with unique datatx_info
# column from the existing request_types.
sequencer_form_definition_id = create_sequencer_form_definition(migrate_engine)
sequencer_index = 1
for row in results_list:
request_type_id = row[0]
sequencer_info = str(row[2]) # datatx_info column
# skip if sequencer_info is empty
if not sequencer_info.strip() or sequencer_info in ['None', 'null']:
continue
sequencer_info = loads(sequencer_info.strip())
# proceed only if sequencer_info is a valid dict
if sequencer_info and isinstance(sequencer_info, dict):
# check if this sequencer has already been added to the sequencer table
sequencer_id = get_sequencer_id(migrate_engine, sequencer_info)
if not sequencer_id:
# add to the sequencer table
sequencer_id = add_sequencer(migrate_engine, sequencer_index, sequencer_form_definition_id, sequencer_info)
# now update the sequencer_id column in request_type table
update_sequencer_id_in_request_type(migrate_engine, request_type_id, sequencer_id)
sequencer_index = sequencer_index + 1
# Finally delete the 'datatx_info' column from the request_type table
drop_column('datatx_info', RequestType_table)
[docs]def downgrade(migrate_engine):
metadata.bind = migrate_engine
metadata.reflect()
RequestType_table = Table("request_type", metadata, autoload=True)
# create the 'datatx_info' column
col = Column("datatx_info", JSONType())
add_column(col, RequestType_table, metadata)
# restore the datatx_info column data in the request_type table with data from
# the sequencer and the form_values table
cmd = "SELECT request_type.id, form_values.content "\
+ " FROM request_type, sequencer, form_values "\
+ " WHERE request_type.sequencer_id=sequencer.id AND sequencer.form_values_id=form_values.id "\
+ " ORDER BY request_type.id ASC"
result = migrate_engine.execute(cmd)
for row in result:
request_type_id = row[0]
seq_values = loads(str(row[1]))
# create the datatx_info json dict
datatx_info = dumps(dict(host=seq_values.get('field_0', ''),
username=seq_values.get('field_1', ''),
password=seq_values.get('field_2', ''),
data_dir=seq_values.get('field_3', ''),
rename_dataset=seq_values.get('field_4', '')))
# update the column
cmd = "UPDATE request_type SET datatx_info='%s' WHERE id=%i" % (datatx_info, request_type_id)
migrate_engine.execute(cmd)
# delete foreign key field to the sequencer table in the request_type table
drop_column('sequencer_id', RequestType_table)