Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.webapps.galaxy.controllers.forms
import copy
import csv
import logging
import re
from sqlalchemy import (
false,
true,
)
from galaxy import model
from galaxy.managers.forms import get_form
from galaxy.model.base import transaction
from galaxy.model.index_filter_util import (
raw_text_column_filter,
text_column_filter,
)
from galaxy.util.search import (
FilteredTerm,
parse_filters_structured,
RawTextTerm,
)
from galaxy.web.framework.helpers import grids
from galaxy.webapps.base.controller import (
BaseUIController,
web,
)
log = logging.getLogger(__name__)
VALID_FIELDNAME_RE = re.compile(r"^[a-zA-Z0-9\_]+$")
[docs]class FormsGrid(grids.GridData):
# Custom column types
# Grid definition
title = "Forms"
model_class = model.FormDefinitionCurrent
default_sort_key = "update_time"
columns = [
NameColumn(
"Name",
key="name",
model_class=model.FormDefinition,
),
DescriptionColumn("Description", key="desc", model_class=model.FormDefinition),
TypeColumn("Type", key="type", model_class=model.FormDefinition),
grids.GridColumn("Last Updated", key="update_time"),
grids.GridColumn("Deleted", key="deleted", escape=False),
]
[docs] def apply_query_filter(self, query, **kwargs):
INDEX_SEARCH_FILTERS = {
"name": "name",
"description": "description",
"is": "is",
}
deleted = False
query = query.join(model.FormDefinition, self.model_class.latest_form_id == model.FormDefinition.id)
search_query = kwargs.get("search")
if search_query:
parsed_search = parse_filters_structured(search_query, INDEX_SEARCH_FILTERS)
for term in parsed_search.terms:
if isinstance(term, FilteredTerm):
key = term.filter
q = term.text
if key == "name":
query = query.filter(text_column_filter(model.FormDefinition.name, term))
elif key == "description":
query = query.filter(text_column_filter(model.FormDefinition.desc, term))
elif key == "is":
if q == "deleted":
deleted = True
elif isinstance(term, RawTextTerm):
query = query.filter(
raw_text_column_filter(
[
model.FormDefinition.name,
model.FormDefinition.desc,
],
term,
)
)
query = query.filter(self.model_class.deleted == (true() if deleted else false()))
return query
[docs]class Forms(BaseUIController):
forms_grid = FormsGrid()
[docs] @web.legacy_expose_api
@web.require_admin
def forms_list(self, trans, payload=None, **kwd):
return self.forms_grid(trans, **kwd)
[docs] @web.legacy_expose_api
@web.require_admin
def create_form(self, trans, payload=None, **kwd):
if trans.request.method == "GET":
fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items())
return {
"title": "Create new form",
"inputs": [
{"name": "name", "label": "Name"},
{"name": "desc", "label": "Description"},
{
"name": "type",
"type": "select",
"options": [(ft[1], ft[1]) for ft in fd_types],
"label": "Type",
},
{
"name": "csv_file",
"label": "Import from CSV",
"type": "upload",
"help": "Import fields from CSV-file with the following format: Label, Help, Type, Value, Options, Required=True/False.",
"optional": True,
},
],
}
else:
# csv-file format: label, helptext, type, default, selectlist, required '''
csv_file = payload.get("csv_file")
index = 0
if csv_file:
lines = csv_file.splitlines()
rows = csv.reader(lines)
for row in rows:
if len(row) >= 6:
for column in range(len(row)):
row[column] = str(row[column]).strip('"')
prefix = "fields_%i|" % index
payload[f"{prefix}name"] = "%i_imported_field" % (index + 1)
payload[f"{prefix}label"] = row[0]
payload[f"{prefix}helptext"] = row[1]
payload[f"{prefix}type"] = row[2]
payload[f"{prefix}default"] = row[3]
payload[f"{prefix}selectlist"] = row[4]
payload[f"{prefix}required"] = row[5].lower() == "true"
index = index + 1
new_form, message = self.save_form_definition(trans, None, payload)
if new_form is None:
return self.message_exception(trans, message)
imported = (" with %i imported fields" % index) if index > 0 else ""
message = f"The form '{payload.get('name')}' has been created{imported}."
return {"message": message}
[docs] @web.legacy_expose_api
@web.require_admin
def edit_form(self, trans, payload=None, **kwd):
id = kwd.get("id")
if not id:
return self.message_exception(trans, "No form id received for editing.")
form = get_form(trans, id)
latest_form = form.latest_form
if trans.request.method == "GET":
fd_types = sorted(trans.app.model.FormDefinition.types.__members__.items())
ff_types = [(t.__name__, t.__name__) for t in trans.model.FormDefinition.supported_field_types]
field_cache = []
field_inputs = [
{
"name": "name",
"label": "Name",
"value": "field_name",
"help": "The field name must be unique for each field and must contain only alphanumeric characters and underscore.",
},
{"name": "label", "label": "Label", "value": "Field label"},
{"name": "helptext", "label": "Help text"},
{"name": "type", "label": "Type", "type": "select", "options": ff_types},
{"name": "default", "label": "Default value"},
{
"name": "selectlist",
"label": "Options",
"help": "*Only for fields which allow multiple selections, provide comma-separated values.",
},
{"name": "required", "label": "Required", "type": "boolean", "value": False},
]
form_dict = {
"title": f"Edit form for '{latest_form.name}'",
"inputs": [
{"name": "name", "label": "Name", "value": latest_form.name},
{"name": "desc", "label": "Description", "value": latest_form.desc},
{
"name": "type",
"type": "select",
"options": [(ft[1], ft[1]) for ft in fd_types],
"label": "Type",
"value": latest_form.type,
},
{
"name": "fields",
"title": "Field",
"type": "repeat",
"cache": field_cache,
"inputs": field_inputs,
},
],
}
for field in latest_form.fields:
new_field = copy.deepcopy(field_inputs)
for field_input in new_field:
field_value = field.get(field_input["name"])
if field_value:
if isinstance(field_value, list):
field_value = ",".join(field_value)
field_input["value"] = str(field_value)
field_cache.append(new_field)
return form_dict
else:
new_form, message = self.save_form_definition(trans, id, payload)
if new_form is None:
return self.message_exception(trans, message)
message = f"The form '{payload.get('name')}' has been updated."
return {"message": message}
[docs] def get_current_form(self, trans, payload=None, **kwd):
"""
This method gets all the unsaved user-entered form details and returns a
dictionary containing the name, desc, type, layout & fields of the form
"""
name = payload.get("name")
desc = payload.get("desc") or ""
type = payload.get("type")
fields = []
index = 0
while True:
prefix = "fields_%i|" % index
if f"{prefix}label" in payload:
field_attributes = ["name", "label", "helptext", "required", "type", "selectlist", "default"]
field_dict = {attr: payload.get(f"{prefix}{attr}") for attr in field_attributes}
field_dict["visible"] = True
if isinstance(field_dict["selectlist"], str):
field_dict["selectlist"] = field_dict["selectlist"].split(",")
else:
field_dict["selectlist"] = []
fields.append(field_dict)
index = index + 1
else:
break
return dict(name=name, desc=desc, type=type, layout=[], fields=fields)
[docs] def save_form_definition(self, trans, form_id=None, payload=None, **kwd):
"""
This method saves a form given an id
"""
if not payload.get("name"):
return None, "Please provide a form name."
if payload.get("type") == "none":
return None, "Please select a form type."
current_form = self.get_current_form(trans, payload)
# validate fields
field_names_dict = {}
for field in current_form["fields"]:
if not field["label"]:
return None, "All the field labels must be completed."
if not VALID_FIELDNAME_RE.match(field["name"]):
return None, f"{field['name']} is not a valid field name."
if field["name"] in field_names_dict:
return None, f"Each field name must be unique in the form definition. {field['name']} is not unique."
else:
field_names_dict[field["name"]] = 1
# create a new form definition
form_definition = trans.app.model.FormDefinition(
name=current_form["name"],
desc=current_form["desc"],
fields=current_form["fields"],
form_definition_current=None,
type=current_form["type"],
layout=current_form["layout"],
)
# save changes to the existing form
if form_id:
form_definition_current = trans.sa_session.query(trans.app.model.FormDefinitionCurrent).get(
trans.security.decode_id(form_id)
)
if form_definition_current is None:
return None, f"Invalid form id ({form_id}) provided. Cannot save form."
else:
form_definition_current = trans.app.model.FormDefinitionCurrent()
# create corresponding row in the form_definition_current table
form_definition.form_definition_current = form_definition_current
form_definition_current.latest_form = form_definition
trans.sa_session.add(form_definition_current)
with transaction(trans.sa_session):
trans.sa_session.commit()
return form_definition, None