Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.model.migrate.versions.0065_add_name_to_form_fields_and_values

"""
Migration script to add 'name' attribute to the JSON dict which describes
a form definition field and the form values in the database. In the 'form_values'
table, the 'content' column is now a JSON dict instead of a list.
"""
from __future__ import print_function

import logging
from json import (
    dumps,
    loads
)

from sqlalchemy import (
    MetaData,
    Table
)

from galaxy.model.custom_types import _sniffnfix_pg9_hex

# Module-level logger named after this migration module.
log = logging.getLogger(__name__)
# SQLAlchemy MetaData shared by upgrade() and downgrade(); each of them binds
# it to the migration engine and reflects the schema before touching tables.
metadata = MetaData()


def upgrade(migrate_engine):
    """
    Add a 'name' attribute to every form definition field and turn stored
    form values from a list into a dict keyed by field name.

    Two passes are made over the database behind ``migrate_engine``:

    1. Every non-empty ``form_definition.fields`` JSON list gets a ``name``
       of ``field_<index>`` on each field (double quotes are dropped from
       ``helptext``, as this migration has always done).
    2. Every ``form_values.content`` that is still a JSON list is rewritten
       as a ``{field name: value}`` dict, pairing values with fields by
       position. A field with no value at its position gets ``''``.

    :param migrate_engine: engine the migration runs against; all statements
        are issued through its ``execute`` method and its ``name`` selects
        the MySQL flavour of the UPDATE statement.
    """
    print(__doc__)
    metadata.bind = migrate_engine
    metadata.reflect()
    Table("form_definition", metadata, autoload=True)
    Table("form_values", metadata, autoload=True)

    def sql_literal(json_text):
        # The UPDATE statements embed the JSON inside a single-quoted SQL
        # literal, so every single quote in the serialized document must be
        # doubled. Doing it once on the whole document covers all keys and
        # values rather than only a hand-picked few.
        # NOTE(review): backslash escapes emitted by dumps() are not handled
        # here; how they are read depends on the database's literal rules.
        return json_text.replace("'", "''")

    def get_value(lst, index):
        # Values are stored as strings; a missing position becomes ''.
        try:
            return str(lst[index])
        except IndexError:
            return ''

    # Go through the entire table and add a 'name' attribute for each field
    # in the list of fields for each form definition
    cmd = "SELECT f.id, f.fields FROM form_definition AS f"
    result = migrate_engine.execute(cmd)
    for row in result:
        form_definition_id = row[0]
        fields = str(row[1])
        if not fields.strip():
            continue
        fields_list = loads(_sniffnfix_pg9_hex(fields))
        if not fields_list:
            continue
        for index, field in enumerate(fields_list):
            field['name'] = 'field_%i' % index
            # Double quotes have always been removed from the help text by
            # this migration (presumably because their JSON escape does not
            # survive the SQL literal); kept so results stay the same.
            field['helptext'] = field['helptext'].replace('"', "")
        fields_json = sql_literal(dumps(fields_list))
        if migrate_engine.name == 'mysql':
            cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (fields_json, form_definition_id)
        else:
            cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (fields_json, form_definition_id)
        migrate_engine.execute(cmd)

    # replace the values list in the content field of the form_values table with a name:value dict
    cmd = (
        "SELECT form_values.id, form_values.content, form_definition.fields"
        " FROM form_values, form_definition"
        " WHERE form_values.form_definition_id=form_definition.id"
        " ORDER BY form_values.id ASC"
    )
    result = migrate_engine.execute(cmd)
    for row in result:
        form_values_id = int(row[0])
        if not str(row[1]).strip():
            continue
        # Line breaks inside the stored text are removed before parsing.
        content = str(row[1]).replace('\n', '').replace('\r', '').strip()
        values_list = loads(content)
        if not str(row[2]).strip():
            continue
        fields_list = loads(str(row[2]).strip())
        # Content that is not a list is left untouched.
        if fields_list and isinstance(values_list, list):
            values_dict = {}
            for field_index, field in enumerate(fields_list):
                values_dict[field['name']] = get_value(values_list, field_index)
            cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (sql_literal(dumps(values_dict)), form_values_id)
            migrate_engine.execute(cmd)
def downgrade(migrate_engine):
    """
    Revert the upgrade: turn ``form_values.content`` back into a list of
    values and strip the 'name' attribute from form definition fields.

    Two passes are made over the database behind ``migrate_engine``:

    1. Every ``form_values.content`` that is a JSON dict is rewritten as a
       list holding the value of each field, in the order the fields appear
       in the owning form definition. A field whose name has no entry in the
       dict contributes ``''``.
    2. Every non-empty ``form_definition.fields`` JSON list has the ``name``
       key removed from each field.

    :param migrate_engine: engine the migration runs against; all statements
        are issued through its ``execute`` method and its ``name`` selects
        the MySQL flavour of the UPDATE statement.
    """
    metadata.bind = migrate_engine
    metadata.reflect()
    Table("form_definition", metadata, autoload=True)
    Table("form_values", metadata, autoload=True)

    def sql_literal(json_text):
        # The UPDATE statements embed the JSON inside a single-quoted SQL
        # literal, so single quotes in the serialized document are doubled;
        # without this any value or label containing an apostrophe produces
        # a malformed statement.
        # NOTE(review): backslash escapes emitted by dumps() are not handled
        # here; how they are read depends on the database's literal rules.
        return json_text.replace("'", "''")

    # remove the name attribute in the content column JSON dict in the form_values table
    # and restore it to a list of values
    cmd = (
        "SELECT form_values.id, form_values.content, form_definition.fields"
        " FROM form_values, form_definition"
        " WHERE form_values.form_definition_id=form_definition.id"
        " ORDER BY form_values.id ASC"
    )
    result = migrate_engine.execute(cmd)
    for row in result:
        form_values_id = int(row[0])
        if not str(row[1]).strip():
            continue
        values_dict = loads(str(row[1]))
        if not str(row[2]).strip():
            continue
        fields_list = loads(str(row[2]))
        # Content that is not a dict (e.g. still a list) is left untouched
        # instead of failing on the name lookup.
        if fields_list and isinstance(values_dict, dict):
            values_list = [values_dict.get(field['name'], '') for field in fields_list]
            cmd = "UPDATE form_values SET content='%s' WHERE id=%i" % (sql_literal(dumps(values_list)), form_values_id)
            migrate_engine.execute(cmd)

    # remove name attribute from the field column of the form_definition table
    cmd = "SELECT f.id, f.fields FROM form_definition AS f"
    result = migrate_engine.execute(cmd)
    for row in result:
        form_definition_id = row[0]
        fields = str(row[1])
        if not fields.strip():
            continue
        fields_list = loads(_sniffnfix_pg9_hex(fields))
        if not fields_list:
            continue
        for field in fields_list:
            field.pop('name', None)
        fields_json = sql_literal(dumps(fields_list))
        if migrate_engine.name == 'mysql':
            cmd = "UPDATE form_definition AS f SET f.fields='%s' WHERE f.id=%i" % (fields_json, form_definition_id)
        else:
            cmd = "UPDATE form_definition SET fields='%s' WHERE id=%i" % (fields_json, form_definition_id)
        # Only issued when a fields list was actually rewritten, so a stale
        # statement from an earlier iteration is never re-run.
        migrate_engine.execute(cmd)