Warning

This document is for an in-development version of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.

Source code for galaxy.util.object_wrapper

"""
Classes for wrapping Objects and Sanitizing string output.
"""
from __future__ import absolute_import

import collections
import inspect
import logging
import string
import sys
from numbers import Number
from types import (
    BuiltinFunctionType,
    BuiltinMethodType,
    CodeType,
    FrameType,
    FunctionType,
    GeneratorType,
    GetSetDescriptorType,
    MemberDescriptorType,
    MethodType,
    ModuleType,
    TracebackType,
)

try:
    from types import NoneType
except ImportError:
    NoneType = type(None)
try:
    from types import NotImplementedType
except ImportError:
    NotImplementedType = type(NotImplemented)

try:
    from types import EllipsisType
except ImportError:
    EllipsisType = type(Ellipsis)

try:
    from types import XRangeType
except ImportError:
    XRangeType = range

try:
    from types import SliceType
except ImportError:
    SliceType = slice

try:
    from types import (
        BufferType,
        DictProxyType
    )
except ImportError:
    # Py3 doesn't have these concepts, just treat them like SliceType that
    # so they are __WRAP_NO_SUBCLASS__.
    BufferType = SliceType
    DictProxyType = SliceType

from six.moves import (
    copyreg as copy_reg,
    UserDict
)

from galaxy.util import sanitize_lists_to_string as _sanitize_lists_to_string

log = logging.getLogger(__name__)

# Define different behaviors for different types, see also: https://docs.python.org/2/library/types.html

# Known Callable types
__CALLABLE_TYPES__ = (FunctionType, MethodType, GeneratorType, CodeType, BuiltinFunctionType, BuiltinMethodType, )

# Always wrap these types without attempting to subclass
__WRAP_NO_SUBCLASS__ = (ModuleType, XRangeType, SliceType, BufferType, TracebackType, FrameType, DictProxyType,
                        GetSetDescriptorType, MemberDescriptorType) + __CALLABLE_TYPES__

# Don't wrap or sanitize.
__DONT_SANITIZE_TYPES__ = (Number, bool, NoneType, NotImplementedType, EllipsisType, bytearray, )

# Don't wrap, but do sanitize.
__DONT_WRAP_TYPES__ = tuple()  # ( basestring, ) so that we can get the unsanitized string, we will now wrap basestring instances

# Wrap contents, but not the container
__WRAP_SEQUENCES__ = (tuple, list, )
__WRAP_SETS__ = (set, frozenset, )
__WRAP_MAPPINGS__ = (dict, UserDict, )


# Define the set of characters that are not sanitized, and define a set of mappings for those that are.
# characters that are valid
VALID_CHARACTERS = set(string.ascii_letters + string.digits + " -=_.()/+*^,:?!@")

# characters that are allowed but need to be escaped
CHARACTER_MAP = {'>': '__gt__',
                 '<': '__lt__',
                 "'": '__sq__',
                 '"': '__dq__',
                 '[': '__ob__',
                 ']': '__cb__',
                 '{': '__oc__',
                 '}': '__cc__',
                 '\n': '__cn__',
                 '\r': '__cr__',
                 '\t': '__tc__',
                 '#': '__pd__'}

INVALID_CHARACTER = "X"

if sys.version_info > (3, 0):
    # __coerce__ doesn't do anything under Python anyway.
[docs] def coerce(x, y): return x
[docs]def cmp(x, y): # Builtin in Python 2, but not Python 3. return (x > y) - (x < y)
[docs]def sanitize_lists_to_string(values, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP, invalid_character=INVALID_CHARACTER): return _sanitize_lists_to_string(values, valid_characters=valid_characters, character_map=character_map, invalid_character=invalid_character)
[docs]def wrap_with_safe_string(value, no_wrap_classes=None): """ Recursively wrap values that should be wrapped. """ def __do_wrap(value): if isinstance(value, SafeStringWrapper): # Only ever wrap one-layer return value if isinstance(value, collections.Callable): safe_class = CallableSafeStringWrapper else: safe_class = SafeStringWrapper if isinstance(value, no_wrap_classes): return value if isinstance(value, __DONT_WRAP_TYPES__): return sanitize_lists_to_string(value, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) if isinstance(value, __WRAP_NO_SUBCLASS__): return safe_class(value, safe_string_wrapper_function=__do_wrap) for this_type in __WRAP_SEQUENCES__ + __WRAP_SETS__: if isinstance(value, this_type): return this_type(list(map(__do_wrap, value))) for this_type in __WRAP_MAPPINGS__: if isinstance(value, this_type): # Wrap both key and value return this_type((__do_wrap(x[0]), __do_wrap(x[1])) for x in value.items()) # Create a dynamic class that joins SafeStringWrapper with the object being wrapped. # This allows e.g. isinstance to continue to work. try: wrapped_class_name = value.__name__ wrapped_class = value except Exception: wrapped_class_name = value.__class__.__name__ wrapped_class = value.__class__ value_mod = inspect.getmodule(value) if value_mod: wrapped_class_name = "%s.%s" % (value_mod.__name__, wrapped_class_name) wrapped_class_name = "SafeStringWrapper(%s:%s)" % (wrapped_class_name, ",".join(sorted(map(str, no_wrap_classes)))) do_wrap_func_name = "__do_wrap_%s" % (wrapped_class_name) do_wrap_func = __do_wrap global_dict = globals() if wrapped_class_name in global_dict: # Check to see if we have created a wrapper for this class yet, if so, reuse wrapped_class = global_dict.get(wrapped_class_name) do_wrap_func = global_dict.get(do_wrap_func_name, __do_wrap) else: try: wrapped_class = type(wrapped_class_name, (safe_class, wrapped_class, ), {}) except TypeError as e: # Fail-safe for when a class cannot be dynamically subclassed. log.warning("Unable to create dynamic subclass for %s, %s: %s", type(value), value, e) wrapped_class = type(wrapped_class_name, (safe_class, ), {}) if wrapped_class not in (SafeStringWrapper, CallableSafeStringWrapper): # Save this wrapper for reuse and pickling/copying global_dict[wrapped_class_name] = wrapped_class do_wrap_func.__name__ = do_wrap_func_name global_dict[do_wrap_func_name] = do_wrap_func def pickle_safe_object(safe_object): return (wrapped_class, (safe_object.unsanitized, do_wrap_func, )) # Set pickle and copy properties copy_reg.pickle(wrapped_class, pickle_safe_object, do_wrap_func) return wrapped_class(value, safe_string_wrapper_function=do_wrap_func) # Determine classes not to wrap if no_wrap_classes: if not isinstance(no_wrap_classes, (tuple, list)): no_wrap_classes = [no_wrap_classes] no_wrap_classes = list(no_wrap_classes) + list(__DONT_SANITIZE_TYPES__) + [SafeStringWrapper] else: no_wrap_classes = list(__DONT_SANITIZE_TYPES__) + [SafeStringWrapper] no_wrap_classes = tuple(set(sorted(no_wrap_classes, key=str))) return __do_wrap(value)
# N.B. refer to e.g. https://docs.python.org/2/reference/datamodel.html for information on Python's Data Model.
[docs]class SafeStringWrapper(object): """ Class that wraps and sanitizes any provided value's attributes that will attempt to be cast into a string. Attempts to mimic behavior of original class, including operands. To ensure proper handling of e.g. subclass checks, the *wrap_with_safe_string()* method should be used. This wrapping occurs in a recursive/parasitic fashion, as all called attributes of the originally wrapped object will also be wrapped and sanitized, unless the attribute is of a type found in __DONT_SANITIZE_TYPES__ + __DONT_WRAP_TYPES__, where e.g. ~(strings will still be sanitized, but not wrapped), and e.g. integers will have neither. """ __UNSANITIZED_ATTRIBUTE_NAME__ = 'unsanitized' __NO_WRAP_NAMES__ = ['__safe_string_wrapper_function__', '__class__', __UNSANITIZED_ATTRIBUTE_NAME__] def __new__(cls, *arg, **kwd): # We need to define a __new__ since, we are subclassing from e.g. immutable str, which internally sets data # that will be used when other + this (this + other is handled by __add__) try: sanitized_value = sanitize_lists_to_string(arg[0], valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) return super(SafeStringWrapper, cls).__new__(cls, sanitized_value) except TypeError: # Class to be wrapped takes no parameters. # This is pefectly normal for mutable types. return super(SafeStringWrapper, cls).__new__(cls)
[docs] def __init__(self, value, safe_string_wrapper_function=wrap_with_safe_string): self.unsanitized = value self.__safe_string_wrapper_function__ = safe_string_wrapper_function
def __str__(self): return sanitize_lists_to_string(self.unsanitized, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP) def __repr__(self): return "%s object at %x on: %s" % (sanitize_lists_to_string(self.__class__.__name__, valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP), id(self), sanitize_lists_to_string(repr(self.unsanitized), valid_characters=VALID_CHARACTERS, character_map=CHARACTER_MAP)) def __lt__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized < other def __le__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized <= other def __eq__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized == other def __ne__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized != other def __gt__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized > other def __ge__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.unsanitized >= other def __cmp__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return cmp(self.unsanitized, other) # Do not implement __rcmp__, python 2.2 < 2.6 def __hash__(self): return hash(self.unsanitized) def __bool__(self): return bool(self.unsanitized) __nonzero__ = __bool__ # Do not implement __unicode__, we will rely on __str__ def __getattr__(self, name): if name in SafeStringWrapper.__NO_WRAP_NAMES__: # FIXME: is this ever reached? return object.__getattribute__(self, name) return self.__safe_string_wrapper_function__(getattr(self.unsanitized, name)) def __setattr__(self, name, value): if name in SafeStringWrapper.__NO_WRAP_NAMES__: return object.__setattr__(self, name, value) return setattr(self.unsanitized, name, value) def __delattr__(self, name): if name in SafeStringWrapper.__NO_WRAP_NAMES__: return object.__delattr__(self, name) return delattr(self.unsanitized, name) def __getattribute__(self, name): if name in SafeStringWrapper.__NO_WRAP_NAMES__: return object.__getattribute__(self, name) return self.__safe_string_wrapper_function__(getattr(object.__getattribute__(self, 'unsanitized'), name)) # Skip Descriptors # Skip __slots__ # Don't need to define a metaclass, we'll use the helper function to handle with subclassing for e.g. isinstance() # Revisit: # __instancecheck__ # __subclasscheck__ # We are using a helper class to create dynamic subclasses to handle class checks # We address __call__ as needed based upon unsanitized, through the use of a CallableSafeStringWrapper class def __len__(self): original_value = self.unsanitized while isinstance(original_value, SafeStringWrapper): original_value = self.unsanitized return len(self.unsanitized) def __getitem__(self, key): return self.__safe_string_wrapper_function__(self.unsanitized[key]) def __setitem__(self, key, value): while isinstance(value, SafeStringWrapper): value = value.unsanitized self.unsanitized[key] = value def __delitem__(self, key): del self.unsanitized[key] def __iter__(self): return iter(map(self.__safe_string_wrapper_function__, iter(self.unsanitized))) # Do not implement __reversed__ def __contains__(self, item): # FIXME: Do we need to consider if item is/isn't or does/doesn't contain SafeStringWrapper? # When considering e.g. nested lists/dicts/etc, this gets complicated while isinstance(item, SafeStringWrapper): item = item.unsanitized return item in self.unsanitized # Not sure that we need these slice methods, but will provide anyway def __getslice__(self, i, j): return self.__safe_string_wrapper_function__(self.unsanitized[i:j]) def __setslice__(self, i, j, value): self.unsanitized[i:j] = value def __delslice__(self, i, j): del self.unsanitized[i:j] def __add__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized + other) def __sub__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized - other) def __mul__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized * other) def __floordiv__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized // other) def __mod__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized % other) def __divmod__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(divmod(self.unsanitized, other)) def __pow__(self, *other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(pow(self.unsanitized, *other)) def __lshift__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized << other) def __rshift__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized >> other) def __and__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized & other) def __xor__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized ^ other) def __or__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized | other) def __div__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized / other) def __truediv__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(self.unsanitized / other) # The only reflected operand that we will define is __rpow__, due to coercion rules complications as per docs def __rpow__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return self.__safe_string_wrapper_function__(pow(other, self.unsanitized)) # Do not implement in-place operands def __neg__(self): return self.__safe_string_wrapper_function__(-self.unsanitized) def __pos__(self): return self.__safe_string_wrapper_function__(+self.unsanitized) def __abs__(self): return self.__safe_string_wrapper_function__(abs(self.unsanitized)) def __invert__(self): return self.__safe_string_wrapper_function__(~self.unsanitized) def __complex__(self): return self.__safe_string_wrapper_function__(complex(self.unsanitized)) def __int__(self): return int(self.unsanitized) def __float__(self): return float(self.unsanitized) def __oct__(self): return oct(self.unsanitized) def __hex__(self): return hex(self.unsanitized) def __index__(self): return self.unsanitized.index() def __coerce__(self, other): while isinstance(other, SafeStringWrapper): other = other.unsanitized return coerce(self.unsanitized, other) def __enter__(self): return self.unsanitized.__enter__() def __exit__(self, *args): return self.unsanitized.__exit__(*args)
[docs]class CallableSafeStringWrapper(SafeStringWrapper): def __call__(self, *args, **kwds): return self.__safe_string_wrapper_function__(self.unsanitized(*args, **kwds))
# Enable pickling/deepcopy
[docs]def pickle_SafeStringWrapper(safe_object): args = (safe_object.unsanitized, ) cls = SafeStringWrapper if isinstance(safe_object, CallableSafeStringWrapper): cls = CallableSafeStringWrapper return (cls, args)
copy_reg.pickle(SafeStringWrapper, pickle_SafeStringWrapper, wrap_with_safe_string) copy_reg.pickle(CallableSafeStringWrapper, pickle_SafeStringWrapper, wrap_with_safe_string)