Warning
This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.model.migrate.versions.0065_add_name_to_form_fields_and_values
"""
Migration script to add 'name' attribute to the JSON dict which describes
a form definition field and the form values in the database. In the 'form_values'
table, the 'content' column is now a JSON dict instead of a list.
"""
from __future__ import print_function
import logging
from json import (
dumps,
loads
)
from sqlalchemy import (
MetaData,
Table
)
from galaxy.model.custom_types import _sniffnfix_pg9_hex
log = logging.getLogger(__name__)
metadata = MetaData()
[docs]def upgrade(migrate_engine):
print(__doc__)
metadata.bind = migrate_engine
metadata.reflect()
Table("form_definition", metadata, autoload=True)
Table("form_values", metadata, autoload=True)
def get_value(lst, index):
try:
return str(lst[index]).replace("'", "''")
except IndexError:
return ''
# Go through the entire table and add a 'name' attribute for each field
# in the list of fields for each form definition
cmd = "SELECT f.id, f.fields FROM form_definition AS f"
result = migrate_engine.execute(cmd)
for row in result:
form_definition_id = row[0]
fields = str(row[1])
if not fields.strip():
continue
fields_list = loads(_sniffnfix_pg9_hex(fields))
if len(fields_list):
for index, field in enumerate(fields_list):
field['name'] = 'field_%i' % index
field['helptext'] = field['helptext'].replace("'", "''").replace('"', "")
field['label'] = field['label'].replace("'", "''")
fields_json = dumps(fields_list)
if migrate_engine.name == 'mysql':
cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (fields_json, form_definition_id)
else:
cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (fields_json, form_definition_id)
migrate_engine.execute(cmd)
# replace the values list in the content field of the form_values table with a name:value dict
cmd = "SELECT form_values.id, form_values.content, form_definition.fields" \
" FROM form_values, form_definition" \
" WHERE form_values.form_definition_id=form_definition.id" \
" ORDER BY form_values.id ASC"
result = migrate_engine.execute(cmd)
for row in result:
form_values_id = int(row[0])
if not str(row[1]).strip():
continue
row1 = str(row[1]).replace('\n', '').replace('\r', '')
values_list = loads(str(row1).strip())
if not str(row[2]).strip():
continue
fields_list = loads(str(row[2]).strip())
if fields_list and isinstance(values_list, list):
values_dict = {}
for field_index, field in enumerate(fields_list):
field_name = field['name']
values_dict[field_name] = get_value(values_list, field_index)
cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (dumps(values_dict), form_values_id)
migrate_engine.execute(cmd)
[docs]def downgrade(migrate_engine):
metadata.bind = migrate_engine
metadata.reflect()
Table("form_definition", metadata, autoload=True)
Table("form_values", metadata, autoload=True)
# remove the name attribute in the content column JSON dict in the form_values table
# and restore it to a list of values
cmd = "SELECT form_values.id, form_values.content, form_definition.fields" \
" FROM form_values, form_definition" \
" WHERE form_values.form_definition_id=form_definition.id" \
" ORDER BY form_values.id ASC"
result = migrate_engine.execute(cmd)
for row in result:
form_values_id = int(row[0])
if not str(row[1]).strip():
continue
values_dict = loads(str(row[1]))
if not str(row[2]).strip():
continue
fields_list = loads(str(row[2]))
if fields_list:
values_list = []
for field_index, field in enumerate(fields_list):
field_name = field['name']
field_value = values_dict[field_name]
values_list.append(field_value)
cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (dumps(values_list), form_values_id)
migrate_engine.execute(cmd)
# remove name attribute from the field column of the form_definition table
cmd = "SELECT f.id, f.fields FROM form_definition AS f"
result = migrate_engine.execute(cmd)
for row in result:
form_definition_id = row[0]
fields = str(row[1])
if not fields.strip():
continue
fields_list = loads(_sniffnfix_pg9_hex(fields))
if len(fields_list):
for index, field in enumerate(fields_list):
if 'name' in field:
del field['name']
if migrate_engine.name == 'mysql':
cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (dumps(fields_list), form_definition_id)
else:
cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (dumps(fields_list), form_definition_id)
migrate_engine.execute(cmd)