Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.web.security
import binascii
import collections
import logging
import os
import os.path

from Crypto.Cipher import Blowfish
from Crypto.Util import number
from Crypto.Util.randpool import RandomPool

import galaxy.exceptions
from galaxy.util import smart_str
log = logging.getLogger(__name__)

# Upper bound, in bits, on the secret handed to Blowfish.new() by _last_bits().
MAXIMUM_ID_SECRET_BITS = 448
# Same bound in bytes. Floor division keeps this an int on every Python
# version; it is used as a slice bound, which must not be a float.
MAXIMUM_ID_SECRET_LENGTH = MAXIMUM_ID_SECRET_BITS // 8
# Assertion message used when an encryption 'kind' name is too long.
KIND_TOO_LONG_MESSAGE = "Galaxy coding error, keep encryption 'kinds' smaller to utilize more bites of randomness from id_secret values."
if os.path.exists("/dev/urandom"):
    # We have urandom, use it as the source of random data
    random_fd = os.open("/dev/urandom", os.O_RDONLY)

    def get_random_bytes(nbytes):
        """Return ``nbytes`` random bytes from /dev/urandom as a hex string.

        The result is a native ``str`` of ``2 * nbytes`` lowercase hex digits.
        """
        value = os.read(random_fd, nbytes)
        # Normally one read returns everything; on a short read keep reading
        # (this is slow and should never happen).
        while len(value) < nbytes:
            value += os.read(random_fd, nbytes - len(value))
        hex_value = binascii.hexlify(value)
        # hexlify() yields bytes on Python 3; hand back a native str there too.
        if not isinstance(hex_value, str):
            hex_value = hex_value.decode("ascii")
        return hex_value
else:
    def get_random_bytes(nbytes):
        """Return a random number of up to ``nbytes * 8`` bits as a string.

        Fallback for systems without /dev/urandom, built on PyCrypto's
        RandomPool.
        """
        # NOTE(review): this branch returns the number in decimal notation,
        # while the /dev/urandom branch returns hex digits.
        nbits = nbytes * 8
        random_pool = RandomPool(1064)
        while random_pool.entropy < nbits:
            random_pool.add_event()
        random_pool.stir()
        return str(number.getRandomNumber(nbits, random_pool.get_bytes))
def _as_bytes(value):
    """Return ``value`` as a byte string, UTF-8 encoding it when it is text."""
    if isinstance(value, bytes):
        return value
    return value.encode("utf-8")


def _as_native_str(value):
    """Return ``value`` as the interpreter's native ``str`` type."""
    if isinstance(value, str):
        return value
    return value.decode("utf-8")


class SecurityHelper(object):
    """Encode and decode object ids and session keys with Blowfish ciphers.

    Encoded values are hex strings of the ciphertext. Plaintext is padded on
    the left with "!" characters to a multiple of 8 bytes before encryption.
    """

    def __init__(self, **config):
        """Build the ciphers from ``config``.

        ``config['id_secret']`` is required (``KeyError`` if absent) and keys
        the default cipher. ``config['per_kind_id_secret_base']`` is optional
        and defaults to the id secret; it seeds the per-kind ciphers.
        """
        id_secret = config['id_secret']
        self.id_secret = id_secret
        # The cipher key must be a byte string; a no-op for Python 2 str.
        self.id_cipher = Blowfish.new(_as_bytes(self.id_secret))
        per_kind_id_secret_base = config.get('per_kind_id_secret_base', self.id_secret)
        self.id_ciphers_for_kind = _cipher_cache(per_kind_id_secret_base)

    def encode_id(self, obj_id, kind=None):
        """Return the hex-encoded encryption of ``obj_id``.

        ``kind`` selects a per-kind cipher; a falsy ``kind`` uses the default
        cipher. Raises ``galaxy.exceptions.MalformedId`` when ``obj_id`` is
        None.
        """
        if obj_id is None:
            raise galaxy.exceptions.MalformedId("Attempted to encode None id")
        id_cipher = self.__id_cipher(kind)
        # Convert to a byte string
        s = _as_bytes(str(obj_id))
        # Pad to a multiple of 8 with leading "!" (always at least one, and a
        # full 8 when the length is already a multiple of 8)
        s = (b"!" * (8 - len(s) % 8)) + s
        # Encrypt
        return _as_native_str(binascii.hexlify(id_cipher.encrypt(s)))

    def encode_dict_ids(self, a_dict, kind=None, skip_startswith=None):
        """
        Encode all ids in dictionary. Ids are identified by (a) an 'id' key or
        (b) a key that ends with '_id'

        Keys of form (b) that start with ``skip_startswith`` are left alone;
        an 'id' key is always encoded. The dictionary is modified in place
        and also returned.
        """
        # Iterate over a snapshot because values are reassigned in the loop.
        for key, val in list(a_dict.items()):
            if key == 'id' or (
                key.endswith('_id') and (skip_startswith is None or not key.startswith(skip_startswith))
            ):
                a_dict[key] = self.encode_id(val, kind=kind)
        return a_dict

    def encode_all_ids(self, rval, recursive=False):
        """
        Encodes all non-None values in the dict rval whose keys are 'id' or
        end with '_id', excluding `tool_id` and `external_id`, and every
        element of list values whose keys end with '_ids'.

        With ``recursive`` set, nested dicts (and dicts inside lists) are
        processed the same way. Anything that is not a dict is returned
        unchanged. ``rval`` is modified in place and also returned.
        """
        if not isinstance(rval, dict):
            return rval
        # Iterate over a snapshot because values are reassigned in the loop.
        for k, v in list(rval.items()):
            if (k == 'id' or k.endswith('_id')) and v is not None and k not in ('tool_id', 'external_id'):
                try:
                    rval[k] = self.encode_id(v)
                except Exception:
                    pass  # best effort: probably already encoded
            if k.endswith("_ids") and isinstance(v, list):
                try:
                    # Built completely before assignment, so a failure on any
                    # element leaves the original list untouched.
                    rval[k] = [self.encode_id(i) for i in v]
                except Exception:
                    pass  # best effort, same as for single ids
            elif recursive and isinstance(v, dict):
                rval[k] = self.encode_all_ids(v, recursive)
            elif recursive and isinstance(v, list):
                rval[k] = [self.encode_all_ids(el, True) for el in v]
        return rval

    def decode_id(self, obj_id, kind=None):
        """Decrypt the hex string ``obj_id`` and return the id as an int.

        ``kind`` must match the one used for encoding.
        """
        id_cipher = self.__id_cipher(kind)
        plain = id_cipher.decrypt(binascii.unhexlify(obj_id))
        # Drop the "!" padding added by encode_id
        return int(plain.lstrip(b"!"))

    def encode_guid(self, session_key):
        """Return the hex-encoded encryption of ``session_key``."""
        # Session keys are strings
        s = _as_bytes(session_key)
        # Pad to a multiple of 8 with leading "!"
        s = (b"!" * (8 - len(s) % 8)) + s
        # Encrypt
        return _as_native_str(binascii.hexlify(self.id_cipher.encrypt(s)))

    def decode_guid(self, session_key):
        """Decrypt the hex string ``session_key`` and strip its "!" padding."""
        # Session keys are strings
        plain = self.id_cipher.decrypt(binascii.unhexlify(session_key))
        return _as_native_str(plain.lstrip(b"!"))

    def get_new_guid(self):
        """Return a new random guid built from 16 random bytes."""
        # Generate a unique, high entropy 128 bit random number
        return get_random_bytes(16)

    def __id_cipher(self, kind):
        """Return the cipher for ``kind``, or the default one if it is falsy."""
        if not kind:
            return self.id_cipher
        return self.id_ciphers_for_kind[kind]
class _cipher_cache(collections.defaultdict):
    """Mapping from encryption 'kind' to a Blowfish cipher, built on demand.

    Each cipher is keyed from ``secret_base + "__" + kind`` (trimmed by
    ``_last_bits``) and is stored after its first use, so later lookups of the
    same kind reuse it.
    """

    def __init__(self, secret_base):
        # No default_factory: missing keys are handled by __missing__ below.
        collections.defaultdict.__init__(self)
        self.secret_base = secret_base

    def __missing__(self, key):
        """Create, store and return the cipher for ``key``."""
        assert len(key) < 15, KIND_TOO_LONG_MESSAGE
        secret = self.secret_base + "__" + key
        # Store the cipher; without this every lookup would construct a new one.
        cipher = self[key] = Blowfish.new(_last_bits(secret))
        return cipher
def _last_bits(secret):
    """Return the trailing MAXIMUM_ID_SECRET_LENGTH characters of ``secret``.

    The kind is appended at the end of the secret, so the tail is the part
    that is kept. Shorter secrets are returned whole, after conversion with
    ``smart_str``.
    """
    encoded = smart_str(secret)
    if len(encoded) <= MAXIMUM_ID_SECRET_LENGTH:
        return encoded
    return encoded[-MAXIMUM_ID_SECRET_LENGTH:]