Source code for galaxy.web.security

import collections
import logging
import os
import os.path

from Crypto.Cipher import Blowfish
from Crypto.Util import number
from Crypto.Util.randpool import RandomPool

import galaxy.exceptions
from galaxy.util import smart_str

log = logging.getLogger(__name__)

KIND_TOO_LONG_MESSAGE = "Galaxy coding error, keep encryption 'kinds' smaller to utilize more bites of randomness from id_secret values."

if os.path.exists("/dev/urandom"):
    # We have urandom, use it as the source of random data
    random_fd = os.open("/dev/urandom", os.O_RDONLY)

    def get_random_bytes(nbytes):
        value = os.read(random_fd, nbytes)
        # Normally we should get as much as we need
        if len(value) == nbytes:
            return value.encode("hex")
        # If we don't, keep reading (this is slow and should never happen)
        while len(value) < nbytes:
            value += os.read(random_fd, nbytes - len(value))
        return value.encode("hex")
[docs] def get_random_bytes(nbytes): nbits = nbytes * 8 random_pool = RandomPool(1064) while random_pool.entropy < nbits: random_pool.add_event() random_pool.stir() return str(number.getRandomNumber(nbits, random_pool.get_bytes))
[docs]class SecurityHelper(object):
[docs] def __init__(self, **config): id_secret = config['id_secret'] self.id_secret = id_secret self.id_cipher = Blowfish.new(self.id_secret) per_kind_id_secret_base = config.get('per_kind_id_secret_base', self.id_secret) self.id_ciphers_for_kind = _cipher_cache(per_kind_id_secret_base)
[docs] def encode_id(self, obj_id, kind=None): if obj_id is None: raise galaxy.exceptions.MalformedId("Attempted to encode None id") id_cipher = self.__id_cipher(kind) # Convert to string s = str(obj_id) # Pad to a multiple of 8 with leading "!" s = ("!" * (8 - len(s) % 8)) + s # Encrypt return id_cipher.encrypt(s).encode('hex')
[docs] def encode_dict_ids(self, a_dict, kind=None, skip_startswith=None): """ Encode all ids in dictionary. Ids are identified by (a) an 'id' key or (b) a key that ends with '_id' """ for key, val in a_dict.items(): if key == 'id' or key.endswith('_id') and (skip_startswith is None or not key.startswith(skip_startswith)): a_dict[key] = self.encode_id(val, kind=kind) return a_dict
[docs] def encode_all_ids(self, rval, recursive=False): """ Encodes all integer values in the dict rval whose keys are 'id' or end with '_id' excluding `tool_id` which are consumed and produced as is via the API. """ if not isinstance(rval, dict): return rval for k, v in rval.items(): if (k == 'id' or k.endswith('_id')) and v is not None and k not in ['tool_id', 'external_id']: try: rval[k] = self.encode_id(v) except Exception: pass # probably already encoded if (k.endswith("_ids") and isinstance(v, list)): try: o = [] for i in v: o.append(self.encode_id(i)) rval[k] = o except Exception: pass else: if recursive and isinstance(v, dict): rval[k] = self.encode_all_ids(v, recursive) elif recursive and isinstance(v, list): rval[k] = [self.encode_all_ids(el, True) for el in v] return rval
[docs] def decode_id(self, obj_id, kind=None): id_cipher = self.__id_cipher(kind) return int(id_cipher.decrypt(obj_id.decode('hex')).lstrip("!"))
[docs] def encode_guid(self, session_key): # Session keys are strings # Pad to a multiple of 8 with leading "!" s = ("!" * (8 - len(session_key) % 8)) + session_key # Encrypt return self.id_cipher.encrypt(s).encode('hex')
[docs] def decode_guid(self, session_key): # Session keys are strings return self.id_cipher.decrypt(session_key.decode('hex')).lstrip("!")
[docs] def get_new_guid(self): # Generate a unique, high entropy 128 bit random number return get_random_bytes(16)
def __id_cipher(self, kind): if not kind: id_cipher = self.id_cipher else: id_cipher = self.id_ciphers_for_kind[kind] return id_cipher
class _cipher_cache(collections.defaultdict): def __init__(self, secret_base): self.secret_base = secret_base def __missing__(self, key): assert len(key) < 15, KIND_TOO_LONG_MESSAGE secret = self.secret_base + "__" + key return Blowfish.new(_last_bits(secret)) def _last_bits(secret): """We append the kind at the end, so just use the bits at the end. """ last_bits = smart_str(secret) if len(last_bits) > MAXIMUM_ID_SECRET_LENGTH: last_bits = last_bits[-MAXIMUM_ID_SECRET_LENGTH:] return last_bits