Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.migrate.versions.0065_add_name_to_form_fields_and_values
"""
Migration script to add 'name' attribute to the JSON dict which describes
a form definition field and the form values in the database. In the 'form_values'
table, the 'content' column is now a JSON dict instead of a list.
"""
import logging
from json import (
dumps,
loads
)
from sqlalchemy import (
MetaData,
Table
)
from galaxy.model.custom_types import _sniffnfix_pg9_hex
log = logging.getLogger(__name__)
metadata = MetaData()
[docs]def upgrade(migrate_engine):
print(__doc__)
metadata.bind = migrate_engine
metadata.reflect()
Table("form_definition", metadata, autoload=True)
Table("form_values", metadata, autoload=True)
def get_value(lst, index):
try:
return str(lst[index]).replace("'", "''")
except IndexError:
return ''
# Go through the entire table and add a 'name' attribute for each field
# in the list of fields for each form definition
cmd = "SELECT f.id, f.fields FROM form_definition AS f"
result = migrate_engine.execute(cmd)
for row in result:
form_definition_id = row[0]
fields = str(row[1])
if not fields.strip():
continue
fields_list = loads(_sniffnfix_pg9_hex(fields))
if len(fields_list):
for index, field in enumerate(fields_list):
field['name'] = 'field_%i' % index
field['helptext'] = field['helptext'].replace("'", "''").replace('"', "")
field['label'] = field['label'].replace("'", "''")
fields_json = dumps(fields_list)
if migrate_engine.name == 'mysql':
cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (fields_json, form_definition_id)
else:
cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (fields_json, form_definition_id)
migrate_engine.execute(cmd)
# replace the values list in the content field of the form_values table with a name:value dict
cmd = "SELECT form_values.id, form_values.content, form_definition.fields" \
" FROM form_values, form_definition" \
" WHERE form_values.form_definition_id=form_definition.id" \
" ORDER BY form_values.id ASC"
result = migrate_engine.execute(cmd)
for row in result:
form_values_id = int(row[0])
if not str(row[1]).strip():
continue
row1 = str(row[1]).replace('\n', '').replace('\r', '')
values_list = loads(str(row1).strip())
if not str(row[2]).strip():
continue
fields_list = loads(str(row[2]).strip())
if fields_list and isinstance(values_list, list):
values_dict = {}
for field_index, field in enumerate(fields_list):
field_name = field['name']
values_dict[field_name] = get_value(values_list, field_index)
cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (dumps(values_dict), form_values_id)
migrate_engine.execute(cmd)
[docs]def downgrade(migrate_engine):
metadata.bind = migrate_engine
metadata.reflect()
Table("form_definition", metadata, autoload=True)
Table("form_values", metadata, autoload=True)
# remove the name attribute in the content column JSON dict in the form_values table
# and restore it to a list of values
cmd = "SELECT form_values.id, form_values.content, form_definition.fields" \
" FROM form_values, form_definition" \
" WHERE form_values.form_definition_id=form_definition.id" \
" ORDER BY form_values.id ASC"
result = migrate_engine.execute(cmd)
for row in result:
form_values_id = int(row[0])
if not str(row[1]).strip():
continue
values_dict = loads(str(row[1]))
if not str(row[2]).strip():
continue
fields_list = loads(str(row[2]))
if fields_list:
values_list = []
for field in fields_list:
field_name = field['name']
field_value = values_dict[field_name]
values_list.append(field_value)
cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (dumps(values_list), form_values_id)
migrate_engine.execute(cmd)
# remove name attribute from the field column of the form_definition table
cmd = "SELECT f.id, f.fields FROM form_definition AS f"
result = migrate_engine.execute(cmd)
for row in result:
form_definition_id = row[0]
fields = str(row[1])
if not fields.strip():
continue
fields_list = loads(_sniffnfix_pg9_hex(fields))
if len(fields_list):
for field in fields_list:
if 'name' in field:
del field['name']
if migrate_engine.name == 'mysql':
cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (dumps(fields_list), form_definition_id)
else:
cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (dumps(fields_list), form_definition_id)
migrate_engine.execute(cmd)