Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.webapps.galaxy.controllers.forms

import copy
import csv
import logging
import re

from sqlalchemy import (
    false,
    true,
)

from galaxy import model
from galaxy.managers.forms import get_form
from galaxy.model.base import transaction
from galaxy.model.index_filter_util import (
    raw_text_column_filter,
    text_column_filter,
)
from galaxy.util.search import (
    FilteredTerm,
    parse_filters_structured,
    RawTextTerm,
)
from galaxy.web.framework.helpers import grids
from galaxy.webapps.base.controller import (
    BaseUIController,
    web,
)

log = logging.getLogger(__name__)

VALID_FIELDNAME_RE = re.compile(r"^[a-zA-Z0-9\_]+$")


[docs]class FormsGrid(grids.GridData): # Custom column types
[docs] class NameColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.name
[docs] class DescriptionColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.desc
[docs] class TypeColumn(grids.GridColumn):
[docs] def get_value(self, trans, grid, form): return form.latest_form.type
# Grid definition title = "Forms" model_class = model.FormDefinitionCurrent default_sort_key = "update_time" columns = [ NameColumn( "Name", key="name", model_class=model.FormDefinition, ), DescriptionColumn("Description", key="desc", model_class=model.FormDefinition), TypeColumn("Type", key="type", model_class=model.FormDefinition), grids.GridColumn("Last Updated", key="update_time"), grids.GridColumn("Deleted", key="deleted", escape=False), ]
[docs] def apply_query_filter(self, query, **kwargs): INDEX_SEARCH_FILTERS = { "name": "name", "description": "description", "is": "is", } deleted = False query = query.join(model.FormDefinition, self.model_class.latest_form_id == model.FormDefinition.id) if search_query := kwargs.get("search"): parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS) for term in parsed_search.terms: if isinstance(term, FilteredTerm): key = term.filter q = term.text if key == "name": query = query.filter(text_column_filter(model.FormDefinition.name, term)) elif key == "description": query = query.filter(text_column_filter(model.FormDefinition.desc, term)) elif key == "is": if q == "deleted": deleted = True elif isinstance(term, RawTextTerm): query = query.filter( raw_text_column_filter( [ model.FormDefinition.name, model.FormDefinition.desc, ], term, ) ) query = query.filter(self.model_class.deleted == (true() if deleted else false())) return query
[docs]class Forms(BaseUIController): forms_grid = FormsGrid()
[docs] @web.legacy_expose_api @web.require_admin def forms_list(self, trans, payload=None, **kwd): return self.forms_grid(trans, **kwd)
[docs] @web.legacy_expose_api @web.require_admin def create_form(self, trans, payload=None, **kwd): if trans.request.method == "GET": fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items()) return { "title": "Create new form", "inputs": [ {"name": "name", "label": "Name"}, {"name": "desc", "label": "Description"}, { "name": "type", "type": "select", "options": [(ft[1], ft[1]) for ft in fd_types], "label": "Type", }, { "name": "csv_file", "label": "Import from CSV", "type": "upload", "help": "Import fields from CSV-file with the following format: Label, Help, Type, Value, Options, Required=True/False.", "optional": True, }, ], } else: # csv-file format: label, helptext, type, default, selectlist, required ''' csv_file = payload.get("csv_file") index = 0 if csv_file: lines = csv_file.splitlines() rows = csv.reader(lines) for row in rows: if len(row) >= 6: for column in range(len(row)): row[column] = str(row[column]).strip('"') prefix = "fields_%i|" % index payload[f"{prefix}name"] = "%i_imported_field" % (index + 1) payload[f"{prefix}label"] = row[0] payload[f"{prefix}helptext"] = row[1] payload[f"{prefix}type"] = row[2] payload[f"{prefix}default"] = row[3] payload[f"{prefix}selectlist"] = row[4] payload[f"{prefix}required"] = row[5].lower() == "true" index = index + 1 new_form, message = self.save_form_definition(trans, None, payload) if new_form is None: return self.message_exception(trans, message) imported = (" with %i imported fields" % index) if index > 0 else "" message = f"The form '{payload.get('name')}' has been created{imported}." return {"message": message}
[docs] @web.legacy_expose_api @web.require_admin def edit_form(self, trans, payload=None, **kwd): id = kwd.get("id") if not id: return self.message_exception(trans, "No form id received for editing.") form = get_form(trans, id) latest_form = form.latest_form if trans.request.method == "GET": fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items()) ff_types = [(t.__name__, t.__name__) for t in trans.model.FormDefinition.supported_field_types] field_cache = [] field_inputs = [ { "name": "name", "label": "Name", "value": "field_name", "help": "The field name must be unique for each field and must contain only alphanumeric characters and underscore.", }, {"name": "label", "label": "Label", "value": "Field label"}, {"name": "helptext", "label": "Help text"}, {"name": "type", "label": "Type", "type": "select", "options": ff_types}, {"name": "default", "label": "Default value"}, { "name": "selectlist", "label": "Options", "help": "*Only for fields which allow multiple selections, provide comma-separated values.", }, {"name": "required", "label": "Required", "type": "boolean", "value": False}, ] form_dict = { "title": f"Edit form for '{latest_form.name}'", "inputs": [ {"name": "name", "label": "Name", "value": latest_form.name}, {"name": "desc", "label": "Description", "value": latest_form.desc}, { "name": "type", "type": "select", "options": [(ft[1], ft[1]) for ft in fd_types], "label": "Type", "value": latest_form.type, }, { "name": "fields", "title": "Field", "type": "repeat", "cache": field_cache, "inputs": field_inputs, }, ], } for field in latest_form.fields: new_field = copy.deepcopy(field_inputs) for field_input in new_field: field_value = field.get(field_input["name"]) if field_value: if isinstance(field_value, list): field_value = ",".join(field_value) field_input["value"] = str(field_value) field_cache.append(new_field) return form_dict else: new_form, message = self.save_form_definition(trans, id, payload) if new_form is None: return self.message_exception(trans, message) message = f"The form '{payload.get('name')}' has been updated." return {"message": message}
[docs] def get_current_form(self, trans, payload=None, **kwd): """ This method gets all the unsaved user-entered form details and returns a dictionary containing the name, desc, type, layout & fields of the form """ name = payload.get("name") desc = payload.get("desc") or "" type = payload.get("type") fields = [] index = 0 while True: prefix = "fields_%i|" % index if f"{prefix}label" in payload: field_attributes = ["name", "label", "helptext", "required", "type", "selectlist", "default"] field_dict = {attr: payload.get(f"{prefix}{attr}") for attr in field_attributes} field_dict["visible"] = True if isinstance(field_dict["selectlist"], str): field_dict["selectlist"] = field_dict["selectlist"].split(",") else: field_dict["selectlist"] = [] fields.append(field_dict) index = index + 1 else: break return dict(name=name, desc=desc, type=type, layout=[], fields=fields)
[docs] def save_form_definition(self, trans, form_id=None, payload=None, **kwd): """ This method saves a form given an id """ if not payload.get("name"): return None, "Please provide a form name." if payload.get("type") == "none": return None, "Please select a form type." current_form = self.get_current_form(trans, payload) # validate fields field_names_dict = {} for field in current_form["fields"]: if not field["label"]: return None, "All the field labels must be completed." if not VALID_FIELDNAME_RE.match(field["name"]): return None, f"{field['name']} is not a valid field name." if field["name"] in field_names_dict: return None, f"Each field name must be unique in the form definition. {field['name']} is not unique." else: field_names_dict[field["name"]] = 1 # create a new form definition form_definition = trans.app.model.FormDefinition( name=current_form["name"], desc=current_form["desc"], fields=current_form["fields"], form_definition_current=None, type=current_form["type"], layout=current_form["layout"], ) # save changes to the existing form if form_id: form_definition_current = trans.sa_session.query(trans.app.model.FormDefinitionCurrent).get( trans.security.decode_id(form_id) ) if form_definition_current is None: return None, f"Invalid form id ({form_id}) provided. Cannot save form." else: form_definition_current = trans.app.model.FormDefinitionCurrent() # create corresponding row in the form_definition_current table form_definition.form_definition_current = form_definition_current form_definition_current.latest_form = form_definition trans.sa_session.add(form_definition_current) with transaction(trans.sa_session): trans.sa_session.commit() return form_definition, None