Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.web.legacy_framework.grids

import logging
import math
from json import (
    dumps,
    loads,
)
from typing import (
    Dict,
    List,
    Optional,
)

from markupsafe import escape
from sqlalchemy.sql.expression import (
    and_,
    false,
    func,
    null,
    or_,
    true,
)

from galaxy.model.item_attrs import (
    get_foreign_key,
    UsesAnnotations,
    UsesItemRatings,
)
from galaxy.util import (
    sanitize_text,
    unicodify,
)
from galaxy.web.framework import (
    decorators,
    url_for,
)
from galaxy.web.framework.helpers import iff

log = logging.getLogger(__name__)


[docs]class GridColumn:
[docs] def __init__( self, label, key=None, model_class=None, method=None, format=None, link=None, attach_popup=False, visible=True, nowrap=False, # Valid values for filterable are ['standard', 'advanced', None] filterable=None, sortable=True, label_id_prefix=None, target=None, ): """Create a grid column.""" self.label = label self.key = key self.model_class = model_class self.method = method self.format = format self.link = link self.target = target self.nowrap = nowrap self.attach_popup = attach_popup self.visible = visible self.filterable = filterable # Column must have a key to be sortable. self.sortable = self.key is not None and sortable self.label_id_prefix = label_id_prefix or ""
[docs] def get_value(self, trans, grid, item): if self.method: value = getattr(grid, self.method)(trans, item) elif self.key: value = getattr(item, self.key) else: value = None if self.format: value = self.format(value) return escape(unicodify(value))
[docs] def filter(self, trans, user, query, column_filter): """Modify query to reflect the column filter.""" if column_filter == "All": pass if column_filter == "True": query = query.filter_by(**{self.key: True}) elif column_filter == "False": query = query.filter_by(**{self.key: False}) return query
[docs] def get_accepted_filters(self): """Returns a list of accepted filters for this column.""" accepted_filters_vals = ["False", "True", "All"] accepted_filters = [] for val in accepted_filters_vals: args = {self.key: val} accepted_filters.append(GridColumnFilter(val, args)) return accepted_filters
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort query using this column.""" if column_name is None: column_name = self.key if ascending: query = query.order_by(self.model_class.__table__.c.get(column_name).asc()) else: query = query.order_by(self.model_class.__table__.c.get(column_name).desc()) return query
[docs]class ReverseSortColumn(GridColumn): """Column that reverses sorting; this is useful when the natural sort is descending."""
[docs] def sort(self, trans, query, ascending, column_name=None): return GridColumn.sort(self, trans, query, (not ascending), column_name=column_name)
[docs]class TextColumn(GridColumn): """Generic column that employs freetext and, hence, supports freetext, case-independent filtering."""
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter using free text, case independence.""" if column_filter == "All": pass elif column_filter: query = query.filter(self.get_filter(trans, user, column_filter)) return query
[docs] def get_filter(self, trans, user, column_filter): """Returns a SQLAlchemy criterion derived from column_filter.""" if isinstance(column_filter, str): return self.get_single_filter(user, column_filter) elif isinstance(column_filter, list): clause_list = [] for filter in column_filter: clause_list.append(self.get_single_filter(user, filter)) return and_(*clause_list)
[docs] def get_single_filter(self, user, a_filter): """ Returns a SQLAlchemy criterion derived for a single filter. Single filter is the most basic filter--usually a string--and cannot be a list. """ # Queries that include table joins cannot guarantee that table column names will be # unique, so check to see if a_filter is of type <TableName>.<ColumnName>. if self.key.find(".") > -1: a_key = self.key.split(".")[1] else: a_key = self.key model_class_key_field = getattr(self.model_class, a_key) return func.lower(model_class_key_field).like(f"%{a_filter.lower()}%")
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort column using case-insensitive alphabetical sorting.""" if column_name is None: column_name = self.key if ascending: query = query.order_by(func.lower(self.model_class.__table__.c.get(column_name)).asc()) else: query = query.order_by(func.lower(self.model_class.__table__.c.get(column_name)).desc()) return query
[docs]class DateTimeColumn(TextColumn):
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort query using this column.""" return GridColumn.sort(self, trans, query, ascending, column_name=column_name)
[docs]class BooleanColumn(TextColumn):
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort query using this column.""" return GridColumn.sort(self, trans, query, ascending, column_name=column_name)
[docs] def get_single_filter(self, user, a_filter): if self.key.find(".") > -1: a_key = self.key.split(".")[1] else: a_key = self.key model_class_key_field = getattr(self.model_class, a_key) return model_class_key_field == a_filter
[docs]class IntegerColumn(TextColumn): """ Integer column that employs freetext, but checks that the text is an integer, so support filtering on integer values. IMPORTANT NOTE: grids that use this column type should not include the column in the cols_to_filter list of MulticolFilterColumn ( i.e., searching on this column type should not be performed in the grid's standard search - it won't throw exceptions, but it also will not find what you're looking for ). Grids that search on this column should use 'filterable="advanced"' so that searching is only performed in the advanced search component, restricting the search to the specific column. This is useful for searching on object ids or other integer columns. See the JobIdColumn column in the SpecifiedDateListGrid class in the jobs controller of the reports webapp for an example. """
[docs] def get_single_filter(self, user, a_filter): model_class_key_field = getattr(self.model_class, self.key) assert int(a_filter), "The search entry must be an integer" return model_class_key_field == int(a_filter)
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort query using this column.""" return GridColumn.sort(self, trans, query, ascending, column_name=column_name)
[docs]class CommunityRatingColumn(GridColumn, UsesItemRatings): """Column that displays community ratings for an item."""
[docs] def get_value(self, trans, grid, item): ave_item_rating, num_ratings = self.get_ave_item_rating_data(trans.sa_session, item, webapp_model=trans.model) return trans.fill_template( "tool_shed_rating.mako", ave_item_rating=ave_item_rating, num_ratings=num_ratings, item_id=trans.security.encode_id(item.id), )
[docs] def sort(self, trans, query, ascending, column_name=None): # Get the columns that connect item's table and item's rating association table. item_rating_assoc_class = getattr(trans.model, f"{self.model_class.__name__}RatingAssociation") foreign_key = get_foreign_key(item_rating_assoc_class, self.model_class) fk_col = foreign_key.parent referent_col = foreign_key.get_referent(self.model_class.__table__) # Do sorting using a subquery. # Subquery to get average rating for each item. ave_rating_subquery = ( trans.sa_session.query(fk_col, func.avg(item_rating_assoc_class.table.c.rating).label("avg_rating")) .group_by(fk_col) .subquery() ) # Integrate subquery into main query. query = query.outerjoin((ave_rating_subquery, referent_col == ave_rating_subquery.columns[fk_col.name])) # Sort using subquery results; use coalesce to avoid null values. if ( not ascending ): # TODO: for now, reverse sorting b/c first sort is ascending, and that should be the natural sort. query = query.order_by(func.coalesce(ave_rating_subquery.c.avg_rating, 0).asc()) else: query = query.order_by(func.coalesce(ave_rating_subquery.c.avg_rating, 0).desc()) return query
[docs]class OwnerAnnotationColumn(TextColumn, UsesAnnotations): """Column that displays and filters item owner's annotations."""
[docs] def __init__(self, col_name, key, model_class=None, model_annotation_association_class=None, filterable=None): GridColumn.__init__(self, col_name, key=key, model_class=model_class, filterable=filterable) self.sortable = False self.model_annotation_association_class = model_annotation_association_class
[docs] def get_value(self, trans, grid, item): """Returns first 150 characters of annotation.""" annotation = self.get_item_annotation_str(trans.sa_session, item.user, item) if annotation: ann_snippet = annotation[:155] if len(annotation) > 155: ann_snippet = ann_snippet[: ann_snippet.rfind(" ")] ann_snippet += "..." else: ann_snippet = "" return escape(ann_snippet)
[docs] def get_single_filter(self, user, a_filter): """Filter by annotation and annotation owner.""" return self.model_class.annotations.any( and_( func.lower(self.model_annotation_association_class.annotation).like(f"%{a_filter.lower()}%"), # TODO: not sure why, to filter by owner's annotations, we have to do this rather than # 'self.model_class.user==self.model_annotation_association_class.user' self.model_annotation_association_class.table.c.user_id == self.model_class.table.c.user_id, ) )
[docs]class CommunityTagsColumn(TextColumn): """Column that supports community tags."""
[docs] def __init__( self, col_name, key, model_class=None, model_tag_association_class=None, filterable=None, grid_name=None ): GridColumn.__init__( self, col_name, key=key, model_class=model_class, nowrap=True, filterable=filterable, sortable=False ) self.model_tag_association_class = model_tag_association_class # Column-specific attributes. self.grid_name = grid_name
[docs] def get_value(self, trans, grid, item): return trans.fill_template( "/tagging_common.mako", tag_type="community", trans=trans, user=trans.get_user(), tagged_item=item, elt_context=self.grid_name, tag_click_fn="add_tag_to_grid_filter", use_toggle_link=True, )
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter model_class by tag. Multiple filters are ANDed.""" if column_filter == "All": pass elif column_filter: query = query.filter(self.get_filter(trans, user, column_filter)) return query
[docs] def get_filter(self, trans, user, column_filter): # Parse filter to extract multiple tags. if isinstance(column_filter, list): # Collapse list of tags into a single string; this is redundant but effective. TODO: fix this by iterating over tags. column_filter = ",".join(column_filter) raw_tags = trans.app.tag_handler.parse_tags(column_filter.encode("utf-8")) clause_list = [] for name, value in raw_tags: if name: # Filter by all tags. clause_list.append( self.model_class.tags.any( func.lower(self.model_tag_association_class.user_tname).like(f"%{name.lower()}%") ) ) if value: # Filter by all values. clause_list.append( self.model_class.tags.any( func.lower(self.model_tag_association_class.user_value).like(f"%{value.lower()}%") ) ) return and_(*clause_list)
[docs]class IndividualTagsColumn(CommunityTagsColumn): """Column that supports individual tags."""
[docs] def get_value(self, trans, grid, item): return trans.fill_template( "/tagging_common.mako", tag_type="individual", user=trans.user, tagged_item=item, elt_context=self.grid_name, tag_click_fn="add_tag_to_grid_filter", use_toggle_link=True, )
[docs] def get_filter(self, trans, user, column_filter): # Parse filter to extract multiple tags. if isinstance(column_filter, list): # Collapse list of tags into a single string; this is redundant but effective. TODO: fix this by iterating over tags. column_filter = ",".join(column_filter) raw_tags = trans.app.tag_handler.parse_tags(column_filter.encode("utf-8")) clause_list = [] for name, value in raw_tags: if name: # Filter by individual's tag names. clause_list.append( self.model_class.tags.any( and_( func.lower(self.model_tag_association_class.user_tname).like(f"%{name.lower()}%"), self.model_tag_association_class.user == user, ) ) ) if value: # Filter by individual's tag values. clause_list.append( self.model_class.tags.any( and_( func.lower(self.model_tag_association_class.user_value).like(f"%{value.lower()}%"), self.model_tag_association_class.user == user, ) ) ) return and_(*clause_list)
[docs]class MulticolFilterColumn(TextColumn): """Column that performs multicolumn filtering."""
[docs] def __init__(self, col_name, cols_to_filter, key, visible, filterable="default"): GridColumn.__init__(self, col_name, key=key, visible=visible, filterable=filterable) self.cols_to_filter = cols_to_filter
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter model_class by tag. Multiple filters are ANDed.""" if column_filter == "All": return query if isinstance(column_filter, list): clause_list = [] for filter in column_filter: part_clause_list = [] for column in self.cols_to_filter: part_clause_list.append(column.get_filter(trans, user, filter)) clause_list.append(or_(*part_clause_list)) complete_filter = and_(*clause_list) else: clause_list = [] for column in self.cols_to_filter: clause_list.append(column.get_filter(trans, user, column_filter)) complete_filter = or_(*clause_list) return query.filter(complete_filter)
[docs]class OwnerColumn(TextColumn): """Column that lists item's owner."""
[docs] def get_value(self, trans, grid, item): return item.user.username
[docs] def sort(self, trans, query, ascending, column_name=None): """Sort column using case-insensitive alphabetical sorting on item's username.""" if ascending: query = query.order_by(func.lower(self.model_class.username).asc()) else: query = query.order_by(func.lower(self.model_class.username).desc()) return query
[docs]class PublicURLColumn(TextColumn): """Column displays item's public URL based on username and slug."""
[docs]class DeletedColumn(GridColumn): """Column that tracks and filters for items with deleted attribute."""
[docs] def get_accepted_filters(self): """Returns a list of accepted filters for this column.""" accepted_filter_labels_and_vals = {"active": "False", "deleted": "True", "all": "All"} accepted_filters = [] for label, val in accepted_filter_labels_and_vals.items(): args = {self.key: val} accepted_filters.append(GridColumnFilter(label, args)) return accepted_filters
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter self.model_class by state.""" if column_filter == "All": pass elif column_filter in ["True", "False"]: query = query.filter(self.model_class.deleted == (column_filter == "True")) return query
[docs]class StateColumn(GridColumn): """ Column that tracks and filters for items with state attribute. """
[docs] def get_value(self, trans, grid, item): return item.state
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter self.model_class by state.""" if column_filter == "All": pass elif column_filter in list(self.model_class.states): query = query.filter(self.model_class.state == column_filter) return query
[docs] def get_accepted_filters(self): """Returns a list of accepted filters for this column.""" all = GridColumnFilter("all", {self.key: "All"}) accepted_filters = [all] for state in self.model_class.states: args = {self.key: state.value} accepted_filters.append(GridColumnFilter(state.value, args)) return accepted_filters
[docs]class SharingStatusColumn(GridColumn): """Grid column to indicate sharing status."""
[docs] def get_value(self, trans, grid, item): # Delete items cannot be shared. if item.deleted: return "" # Build a list of sharing for this item. sharing_statuses = [] if item.users_shared_with: sharing_statuses.append("Shared") if item.importable: sharing_statuses.append("Accessible") if item.published: sharing_statuses.append("Published") return ", ".join(sharing_statuses)
[docs] def filter(self, trans, user, query, column_filter): """Modify query to filter histories by sharing status.""" if column_filter == "All": pass elif column_filter: if column_filter == "private": query = query.filter(self.model_class.users_shared_with == null()) query = query.filter(self.model_class.importable == false()) elif column_filter == "shared": query = query.filter(self.model_class.users_shared_with != null()) elif column_filter == "accessible": query = query.filter(self.model_class.importable == true()) elif column_filter == "published": query = query.filter(self.model_class.published == true()) return query
[docs] def get_accepted_filters(self): """Returns a list of accepted filters for this column.""" accepted_filter_labels_and_vals = {} accepted_filter_labels_and_vals["private"] = "private" accepted_filter_labels_and_vals["shared"] = "shared" accepted_filter_labels_and_vals["accessible"] = "accessible" accepted_filter_labels_and_vals["published"] = "published" accepted_filter_labels_and_vals["all"] = "All" accepted_filters = [] for label, val in accepted_filter_labels_and_vals.items(): args = {self.key: val} accepted_filters.append(GridColumnFilter(label, args)) return accepted_filters
[docs]class GridOperation:
[docs] def __init__( self, label, key=None, condition=None, allow_multiple=True, allow_popup=True, target=None, url_args=None, async_compatible=False, confirm=None, global_operation=None, ): self.label = label self.key = key self.allow_multiple = allow_multiple self.allow_popup = allow_popup self.condition = condition self.target = target self.url_args = url_args self.async_compatible = async_compatible # if 'confirm' is set, then ask before completing the operation self.confirm = confirm # specify a general operation that acts on the full grid # this should be a function returning a dictionary with parameters # to pass to the URL, similar to GridColumn links: # global_operation=(lambda: dict(operation="download") self.global_operation = global_operation
[docs] def get_url_args(self, item): if self.url_args: if callable(self.url_args): url_args = self.url_args(item) else: url_args = dict(self.url_args) url_args["id"] = item.id return url_args else: return dict(operation=self.label, id=item.id)
[docs] def allowed(self, item): if self.condition: return bool(self.condition(item)) else: return True
[docs]class DisplayByUsernameAndSlugGridOperation(GridOperation): """Operation to display an item by username and slug."""
[docs] def get_url_args(self, item): return {"action": "display_by_username_and_slug", "username": item.user.username, "slug": item.slug}
[docs]class GridAction:
[docs] def __init__(self, label=None, url_args=None, target=None): self.label = label self.url_args = url_args self.target = target
[docs]class GridColumnFilter:
[docs] def __init__(self, label, args=None): self.label = label self.args = args
[docs] def get_url_args(self): rval = {} for k, v in self.args.items(): rval[f"f-{k}"] = v return rval
[docs]class Grid: """ Specifies the content and format of a grid (data table). """ title = "" model_class: Optional[type] = None show_item_checkboxes = False template = "legacy/grid_base.mako" async_template = "legacy/grid_base_async.mako" use_async = False use_hide_message = True global_actions: List[GridAction] = [] columns: List[GridColumn] = [] operations: List[GridOperation] = [] standard_filters: List[GridColumnFilter] = [] # Any columns that are filterable (either standard or advanced) should have a default value set in the default filter. default_filter: Dict[str, str] = {} default_sort_key: Optional[str] = None use_paging = False num_rows_per_page = 25 num_page_links = 10 allow_fetching_all_results = True # Set preference names. cur_filter_pref_name = ".filter" cur_sort_key_pref_name = ".sort_key" legend = None info_text = None
[docs] def __init__(self): # Determine if any multiple row operations are defined self.has_multiple_item_operations = False for operation in self.operations: if operation.allow_multiple: self.has_multiple_item_operations = True break # If a column does not have a model class, set the column's model class # to be the grid's model class. for column in self.columns: if not column.model_class: column.model_class = self.model_class
def __call__(self, trans, **kwargs): # Get basics. # FIXME: pretty sure this is only here to pass along, can likely be eliminated status = kwargs.get("status", None) message = kwargs.get("message", None) # Build a base filter and sort key that is the combination of the saved state and defaults. # Saved state takes preference over defaults. base_filter = {} if self.default_filter: # default_filter is a dictionary that provides a default set of filters based on the grid's columns. base_filter = self.default_filter.copy() base_sort_key = self.default_sort_key # Build initial query query = self.build_initial_query(trans, **kwargs) query = self.apply_query_filter(trans, query, **kwargs) # Maintain sort state in generated urls extra_url_args = {} # Determine whether use_default_filter flag is set. use_default_filter = False if use_default_filter_str := kwargs.get("use_default_filter"): use_default_filter = use_default_filter_str.lower() == "true" # Process filtering arguments to (a) build a query that represents the filter and (b) build a # dictionary that denotes the current filter. cur_filter_dict = {} for column in self.columns: if column.key: # Get the filter criterion for the column. Precedence is (a) if using default filter, only look there; otherwise, # (b) look in kwargs; and (c) look in base filter. column_filter = None if use_default_filter: if self.default_filter: column_filter = self.default_filter.get(column.key) elif f"f-{column.model_class.__name__}.{column.key}" in kwargs: # Queries that include table joins cannot guarantee unique column names. This problem is # handled by setting the column_filter value to <TableName>.<ColumnName>. column_filter = kwargs.get(f"f-{column.model_class.__name__}.{column.key}") elif f"f-{column.key}" in kwargs: column_filter = kwargs.get(f"f-{column.key}") elif column.key in base_filter: column_filter = base_filter.get(column.key) # Method (1) combines a mix of strings and lists of strings into a single string and (2) attempts to de-jsonify all strings. def loads_recurse(item): decoded_list = [] if isinstance(item, str): try: # Not clear what we're decoding, so recurse to ensure that we catch everything. decoded_item = loads(item) if isinstance(decoded_item, list): decoded_list = loads_recurse(decoded_item) else: decoded_list = [str(decoded_item)] except ValueError: decoded_list = [str(item)] elif isinstance(item, list): for element in item: a_list = loads_recurse(element) decoded_list = decoded_list + a_list return decoded_list # If column filter found, apply it. if column_filter is not None: # TextColumns may have a mix of json and strings. if isinstance(column, TextColumn): column_filter = loads_recurse(column_filter) if len(column_filter) == 1: column_filter = column_filter[0] # Interpret ',' as a separator for multiple terms. if isinstance(column_filter, str) and column_filter.find(",") != -1: column_filter = column_filter.split(",") # Check if filter is empty if isinstance(column_filter, list): # Remove empty strings from filter list column_filter = [x for x in column_filter if x != ""] if len(column_filter) == 0: continue elif isinstance(column_filter, str): # If filter criterion is empty, do nothing. if column_filter == "": continue # Update query. query = column.filter(trans, trans.user, query, column_filter) # Upate current filter dict. # Column filters are rendered in various places, sanitize them all here. cur_filter_dict[column.key] = sanitize_text(column_filter) # Carry filter along to newly generated urls; make sure filter is a string so # that we can encode to UTF-8 and thus handle user input to filters. if isinstance(column_filter, list): # Filter is a list; process each item. column_filter = [str(_).encode("utf-8") if not isinstance(_, str) else _ for _ in column_filter] extra_url_args[f"f-{column.key}"] = dumps(column_filter) else: # Process singleton filter. if not isinstance(column_filter, str): column_filter = str(column_filter) extra_url_args[f"f-{column.key}"] = column_filter.encode("utf-8") # Process sort arguments. sort_key = None if "sort" in kwargs: sort_key = kwargs["sort"] elif base_sort_key: sort_key = base_sort_key if sort_key: ascending = not (sort_key.startswith("-")) # Queries that include table joins cannot guarantee unique column names. This problem is # handled by setting the column_filter value to <TableName>.<ColumnName>. table_name = None if sort_key.find(".") > -1: a_list = sort_key.split(".") if ascending: table_name = a_list[0] else: table_name = a_list[0][1:] column_name = a_list[1] elif ascending: column_name = sort_key else: column_name = sort_key[1:] # Sort key is a column key. for column in self.columns: if column.key and column.key.find(".") > -1: column_key = column.key.split(".")[1] else: column_key = column.key if (table_name is None or table_name == column.model_class.__name__) and column_key == column_name: query = column.sort(trans, query, ascending, column_name=column_name) break extra_url_args["sort"] = sort_key # There might be a current row current_item = self.get_current_item(trans, **kwargs) # Process page number. if self.use_paging: if "page" in kwargs: if kwargs["page"] == "all": page_num = 0 if self.allow_fetching_all_results: page_num = 0 else: # Should make it harder to return all results at once page_num = 1 else: page_num = int(kwargs["page"]) else: page_num = 1 if page_num == 0: # Show all rows in page. total_num_rows = query.count() page_num = 1 num_pages = 1 else: # Show a limited number of rows. Before modifying query, get the total number of rows that query # returns so that the total number of pages can be computed. total_num_rows = query.count() query = query.limit(self.num_rows_per_page).offset((page_num - 1) * self.num_rows_per_page) num_pages = int(math.ceil(float(total_num_rows) / self.num_rows_per_page)) else: # Defaults. page_num = 1 num_pages = 1 # There are some places in grid templates where it's useful for a grid # to have its current filter. self.cur_filter_dict = cur_filter_dict # Log grid view. context = str(self.__class__.__name__) params = cur_filter_dict.copy() params["sort"] = sort_key params["async"] = "async" in kwargs # TODO:?? # commenting this out; when this fn calls session.add( action ) and session.flush the query from this fn # is effectively 'wiped' out. Nate believes it has something to do with our use of session( autocommit=True ) # in mapping.py. If you change that to False, the log_action doesn't affect the query # Below, I'm rendering the template first (that uses query), then calling log_action, then returning the page # trans.log_action( trans.get_user(), text_type( "grid.view" ), context, params ) # Render grid. def url(*args, **kwargs): route_name = kwargs.pop("__route_name__", None) # Only include sort/filter arguments if not linking to another # page. This is a bit of a hack. if "action" in kwargs: new_kwargs = {} else: new_kwargs = dict(extra_url_args) # Extend new_kwargs with first argument if found if len(args) > 0: new_kwargs.update(args[0]) new_kwargs.update(kwargs) # We need to encode item ids if "id" in new_kwargs: id = new_kwargs["id"] if isinstance(id, list): new_kwargs["id"] = [trans.security.encode_id(i) for i in id] else: new_kwargs["id"] = trans.security.encode_id(id) # The url_for invocation *must* include a controller and action. if "controller" not in new_kwargs: new_kwargs["controller"] = trans.controller if "action" not in new_kwargs: new_kwargs["action"] = trans.action if route_name: return url_for(route_name, **new_kwargs) return url_for(**new_kwargs) self.use_panels = kwargs.get("use_panels", False) in [True, "True", "true"] self.advanced_search = kwargs.get("advanced_search", False) in [True, "True", "true"] async_request = (self.use_async) and (kwargs.get("async", False) in [True, "True", "true"]) # Currently, filling the template returns a str object; this requires decoding the string into a # unicode object within mako templates. What probably should be done is to return the template as # utf-8 unicode; however, this would require encoding the object as utf-8 before returning the grid # results via a controller method, which is require substantial changes. Hence, for now, return grid # as str. page = trans.fill_template( iff(async_request, self.async_template, self.template), grid=self, query=query, cur_page_num=page_num, num_pages=num_pages, num_page_links=self.num_page_links, allow_fetching_all_results=self.allow_fetching_all_results, default_filter_dict=self.default_filter, cur_filter_dict=cur_filter_dict, sort_key=sort_key, current_item=current_item, ids=kwargs.get("id", []), url=url, status=status, message=message, info_text=self.info_text, use_panels=self.use_panels, use_hide_message=self.use_hide_message, advanced_search=self.advanced_search, show_item_checkboxes=( self.show_item_checkboxes or kwargs.get("show_item_checkboxes", "") in ["True", "true"] ), # Pass back kwargs so that grid template can set and use args without # grid explicitly having to pass them. kwargs=kwargs, ) trans.log_action(trans.get_user(), "grid.view", context, params) return page
[docs] def get_ids(self, **kwargs): id = [] if "id" in kwargs: id = kwargs["id"] # Coerce ids to list if not isinstance(id, list): id = id.split(",") # Ensure ids are integers try: id = list(map(int, id)) except Exception: decorators.error("Invalid id") return id
# ---- Override these ----------------------------------------------------
[docs] def handle_operation(self, trans, operation, ids, **kwargs): pass
[docs] def get_current_item(self, trans, **kwargs): return None
[docs] def build_initial_query(self, trans, **kwargs): return trans.sa_session.query(self.model_class)
[docs] def apply_query_filter(self, trans, query, **kwargs): # Applies a database filter that holds for all items in the grid. # (gvk) Is this method necessary? Why not simply build the entire query, # including applying filters in the build_initial_query() method? return query