Source code for galaxy.webapps.galaxy.controllers.forms

import copy
import csv
import logging
import re

from sqlalchemy import (
    false,
    true,
)

from galaxy import model
from galaxy.managers.forms import get_form
from galaxy.model.base import transaction
from galaxy.model.index_filter_util import (
    raw_text_column_filter,
    text_column_filter,
)
from galaxy.util.search import (
    FilteredTerm,
    parse_filters_structured,
    RawTextTerm,
)
from galaxy.web.framework.helpers import grids
from galaxy.webapps.base.controller import (
    BaseUIController,
    web,
)

log = logging.getLogger(__name__)

VALID_FIELDNAME_RE = re.compile(r"^[a-zA-Z0-9\_]+$")


[docs]class FormsGrid(grids.GridData): # Custom column types
[docs] class NameColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.name
[docs] class DescriptionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.desc
[docs] class TypeColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.type
# Grid definition title = "Forms" model_class = model.FormDefinitionCurrent default_sort_key = "update_time" columns = [ NameColumn( "Name", key="name", model_class=model.FormDefinition, ), DescriptionColumn("Description", key="desc", model_class=model.FormDefinition), TypeColumn("Type", key="type", model_class=model.FormDefinition), grids.GridColumn("Last Updated", key="update_time"), grids.GridColumn("Deleted", key="deleted", escape=False), ]
[docs] def apply_query_filter(self, query, **kwargs): INDEX_SEARCH_FILTERS = { "name": "name", "description": "description", "is": "is", } deleted = False query = query.join(model.FormDefinition, self.model_class.latest_form_id == model.FormDefinition.id) if search_query := kwargs.get("search"): parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS) for term in parsed_search.terms: if isinstance(term, FilteredTerm): key = term.filter q = term.text if key == "name": query = query.filter(text_column_filter(model.FormDefinition.name, term)) elif key == "description": query = query.filter(text_column_filter(model.FormDefinition.desc, term)) elif key == "is": if q == "deleted": deleted = True elif isinstance(term, RawTextTerm): query = query.filter( raw_text_column_filter( [ model.FormDefinition.name, model.FormDefinition.desc, ], term, ) ) query = query.filter(self.model_class.deleted == (true() if deleted else false())) return query
[docs]class Forms(BaseUIController): forms_grid = FormsGrid()
[docs] @web.legacy_expose_api @web.require_admin def forms_list(self, trans, payload=None, **kwd): return self.forms_grid(trans, **kwd)
[docs] @web.legacy_expose_api @web.require_admin def create_form(self, trans, payload=None, **kwd): if trans.request.method == "GET": fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items()) return { "title": "Create new form", "inputs": [ {"name": "name", "label": "Name"}, {"name": "desc", "label": "Description"}, { "name": "type", "type": "select", "options": [(ft[1], ft[1]) for ft in fd_types], "label": "Type", }, { "name": "csv_file", "label": "Import from CSV", "type": "upload", "help": "Import fields from CSV-file with the following format: Label, Help, Type, Value, Options, Required=True/False.", "optional": True, }, ], } else: # csv-file format: label, helptext, type, default, selectlist, required ''' csv_file = payload.get("csv_file") index = 0 if csv_file: lines = csv_file.splitlines() rows = csv.reader(lines) for row in rows: if len(row) >= 6: for column in range(len(row)): row[column] = str(row[column]).strip('"') prefix = "fields_%i|" % index payload[f"{prefix}name"] = "%i_imported_field" % (index + 1) payload[f"{prefix}label"] = row[0] payload[f"{prefix}helptext"] = row[1] payload[f"{prefix}type"] = row[2] payload[f"{prefix}default"] = row[3] payload[f"{prefix}selectlist"] = row[4] payload[f"{prefix}required"] = row[5].lower() == "true" index = index + 1 new_form, message = self.save_form_definition(trans, None, payload) if new_form is None: return self.message_exception(trans, message) imported = (" with %i imported fields" % index) if index > 0 else "" message = f"The form '{payload.get('name')}' has been created{imported}." return {"message": message}
[docs] @web.legacy_expose_api @web.require_admin def edit_form(self, trans, payload=None, **kwd): id = kwd.get("id") if not id: return self.message_exception(trans, "No form id received for editing.") form = get_form(trans, id) latest_form = form.latest_form if trans.request.method == "GET": fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items()) ff_types = [(t.__name__, t.__name__) for t in trans.model.FormDefinition.supported_field_types] field_cache = [] field_inputs = [ { "name": "name", "label": "Name", "value": "field_name", "help": "The field name must be unique for each field and must contain only alphanumeric characters and underscore.", }, {"name": "label", "label": "Label", "value": "Field label"}, {"name": "helptext", "label": "Help text"}, {"name": "type", "label": "Type", "type": "select", "options": ff_types}, {"name": "default", "label": "Default value"}, { "name": "selectlist", "label": "Options", "help": "*Only for fields which allow multiple selections, provide comma-separated values.", }, {"name": "required", "label": "Required", "type": "boolean", "value": False}, ] form_dict = { "title": f"Edit form for '{latest_form.name}'", "inputs": [ {"name": "name", "label": "Name", "value": latest_form.name}, {"name": "desc", "label": "Description", "value": latest_form.desc}, { "name": "type", "type": "select", "options": [(ft[1], ft[1]) for ft in fd_types], "label": "Type", "value": latest_form.type, }, { "name": "fields", "title": "Field", "type": "repeat", "cache": field_cache, "inputs": field_inputs, }, ], } for field in latest_form.fields: new_field = copy.deepcopy(field_inputs) for field_input in new_field: field_value = field.get(field_input["name"]) if field_value: if isinstance(field_value, list): field_value = ",".join(field_value) field_input["value"] = str(field_value) field_cache.append(new_field) return form_dict else: new_form, message = self.save_form_definition(trans, id, payload) if new_form is None: return self.message_exception(trans, message) message = f"The form '{payload.get('name')}' has been updated." return {"message": message}
[docs] def get_current_form(self, trans, payload=None, **kwd): """ This method gets all the unsaved user-entered form details and returns a dictionary containing the name, desc, type, layout & fields of the form """ name = payload.get("name") desc = payload.get("desc") or "" type = payload.get("type") fields = [] index = 0 while True: prefix = "fields_%i|" % index if f"{prefix}label" in payload: field_attributes = ["name", "label", "helptext", "required", "type", "selectlist", "default"] field_dict = {attr: payload.get(f"{prefix}{attr}") for attr in field_attributes} field_dict["visible"] = True if isinstance(field_dict["selectlist"], str): field_dict["selectlist"] = field_dict["selectlist"].split(",") else: field_dict["selectlist"] = [] fields.append(field_dict) index = index + 1 else: break return dict(name=name, desc=desc, type=type, layout=[], fields=fields)
[docs] def save_form_definition(self, trans, form_id=None, payload=None, **kwd): """ This method saves a form given an id """ if not payload.get("name"): return None, "Please provide a form name." if payload.get("type") == "none": return None, "Please select a form type." current_form = self.get_current_form(trans, payload) # validate fields field_names_dict = {} for field in current_form["fields"]: if not field["label"]: return None, "All the field labels must be completed." if not VALID_FIELDNAME_RE.match(field["name"]): return None, f"{field['name']} is not a valid field name." if field["name"] in field_names_dict: return None, f"Each field name must be unique in the form definition. {field['name']} is not unique." else: field_names_dict[field["name"]] = 1 # create a new form definition form_definition = trans.app.model.FormDefinition( name=current_form["name"], desc=current_form["desc"], fields=current_form["fields"], form_definition_current=None, type=current_form["type"], layout=current_form["layout"], ) # save changes to the existing form if form_id: form_definition_current = trans.sa_session.query(trans.app.model.FormDefinitionCurrent).get( trans.security.decode_id(form_id) ) if form_definition_current is None: return None, f"Invalid form id ({form_id}) provided. Cannot save form." else: form_definition_current = trans.app.model.FormDefinitionCurrent() # create corresponding row in the form_definition_current table form_definition.form_definition_current = form_definition_current form_definition_current.latest_form = form_definition trans.sa_session.add(form_definition_current) with transaction(trans.sa_session): trans.sa_session.commit() return form_definition, None