Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.quota
"""Galaxy Quotas"""
import logging
from typing import Optional
from sqlalchemy import select
from sqlalchemy.sql import text
import galaxy.util
from galaxy.model.base import transaction
log = logging.getLogger(__name__)
class QuotaAgent: # metaclass=abc.ABCMeta
"""Abstraction around querying Galaxy for quota available and used.
Certain parts of the app that deal directly with modifying the quota assume more than
this interface - they assume the availability of the methods on DatabaseQuotaAgent that
implements this interface. But for read-only quota operations - such as checking available
quota or reporting it to users - methods defined on this interface should be sufficient
and the NoQuotaAgent should be a valid choice.
Sticking to well annotated methods on this interface should make it clean and
possible to implement other backends for quota setting in the future such as managing
the quota in other apps (LDAP maybe?) or via configuration files.
"""
# TODO: make abstractmethod after they work better with mypy
def get_quota(self, user, quota_source_label=None) -> Optional[int]:
"""Return quota in bytes or None if no quota is set."""
def get_quota_nice_size(self, user, quota_source_label=None) -> Optional[str]:
"""Return quota as a human-readable string or 'unlimited' if no quota is set."""
quota_bytes = self.get_quota(user, quota_source_label=quota_source_label)
if quota_bytes is not None:
quota_str = galaxy.util.nice_size(quota_bytes)
else:
quota_str = "unlimited"
return quota_str
# TODO: make abstractmethod after they work better with mypy
def get_percent(
self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
) -> Optional[int]:
"""Return the percentage of any storage quota applicable to the user/transaction."""
def get_usage(self, trans=None, user=False, history=False, quota_source_label=None) -> Optional[float]:
if trans:
user = trans.user
history = trans.history
assert user is not False, "Could not determine user."
if not user:
assert history, "Could not determine anonymous user's history."
usage = history.disk_size
else:
if quota_source_label is None:
usage = user.total_disk_usage
else:
quota_source_usage = user.quota_source_usage_for(quota_source_label)
if not quota_source_usage or quota_source_usage.disk_usage is None:
usage = 0.0
else:
usage = quota_source_usage.disk_usage
return usage
def is_over_quota(self, app, job, job_destination):
"""Return True if the user or history is over quota for specified job.
job_destination unused currently but an important future application will
be admins and/or users dynamically specifying which object stores to use
and that will likely come in through the job destination.
"""
[docs]class NoQuotaAgent(QuotaAgent):
"""Base quota agent, always returns no quota"""
@property
def default_quota(self):
return None
[docs] def get_percent(
self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
) -> Optional[int]:
return None
class DatabaseQuotaAgent(QuotaAgent):
"""Class that handles galaxy quotas"""
def __init__(self, model):
self.model = model
self.sa_session = model.context
def get_quota(self, user, quota_source_label=None) -> Optional[int]:
"""
Calculated like so:
1. Anonymous users get the default quota.
2. Logged in users start with the highest of their associated '='
quotas or the default quota, if there are no associated '='
quotas. If an '=' unlimited (-1 in the database) quota is found
during this process, the user has no quota (aka unlimited).
3. Quota is increased or decreased by any corresponding '+' or '-'
quotas.
"""
if not user:
return self._default_unregistered_quota(quota_source_label)
query = text(
"""
SELECT (
COALESCE(MAX(CASE WHEN union_quota.operation = '='
THEN union_quota.bytes
ELSE NULL
END),
(SELECT default_quota.bytes
FROM quota as default_quota
LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id
WHERE default_quota_association.type = 'registered'
AND default_quota.deleted != :is_true
AND default_quota.quota_source_label {label_cond}))
+
(CASE WHEN SUM(CASE WHEN union_quota.operation = '=' AND union_quota.bytes = -1
THEN 1 ELSE 0
END) > 0
THEN NULL
ELSE 0 END)
+
(COALESCE(SUM(
CASE WHEN union_quota.operation = '+' THEN union_quota.bytes
WHEN union_quota.operation = '-' THEN -1 * union_quota.bytes
ELSE 0
END
), 0))
)
FROM (
SELECT user_quota.operation as operation, user_quota.bytes as bytes
FROM galaxy_user as guser
LEFT JOIN user_quota_association as uqa on guser.id = uqa.user_id
LEFT JOIN quota as user_quota on user_quota.id = uqa.quota_id
WHERE user_quota.deleted != :is_true
AND user_quota.quota_source_label {label_cond}
AND guser.id = :user_id
UNION ALL
SELECT group_quota.operation as operation, group_quota.bytes as bytes
FROM galaxy_user as guser
LEFT JOIN user_group_association as uga on guser.id = uga.user_id
LEFT JOIN galaxy_group on galaxy_group.id = uga.group_id
LEFT JOIN group_quota_association as gqa on galaxy_group.id = gqa.group_id
LEFT JOIN quota as group_quota on group_quota.id = gqa.quota_id
WHERE group_quota.deleted != :is_true
AND group_quota.quota_source_label {label_cond}
AND guser.id = :user_id
) as union_quota
""".format(
label_cond="IS NULL" if quota_source_label is None else " = :label"
)
)
conn = self.sa_session.connection()
with conn.begin():
res = conn.execute(query, is_true=True, user_id=user.id, label=quota_source_label).fetchone()
if res:
return int(res[0]) if res[0] else None
else:
return None
def _default_unregistered_quota(self, quota_source_label):
return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED, quota_source_label)
def _default_quota(self, default_type, quota_source_label):
label_condition = "IS NULL" if quota_source_label is None else "= :label"
query = text(
f"""
SELECT bytes
FROM quota as default_quota
LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id
WHERE default_quota_association.type = :default_type
AND default_quota.deleted != :is_true
AND default_quota.quota_source_label {label_condition}
"""
)
conn = self.sa_session.connection()
with conn.begin():
res = conn.execute(query, is_true=True, label=quota_source_label, default_type=default_type).fetchone()
if res:
return res[0]
else:
return None
def set_default_quota(self, default_type, quota):
# Unset the current default(s) associated with this quota, if there are any
for dqa in quota.default:
self.sa_session.delete(dqa)
# Unset the current users/groups associated with this quota
for uqa in quota.users:
self.sa_session.delete(uqa)
for gqa in quota.groups:
self.sa_session.delete(gqa)
# Find the old default, assign the new quota if it exists
label = quota.quota_source_label
stmt = select(self.model.DefaultQuotaAssociation).filter(
self.model.DefaultQuotaAssociation.type == default_type
)
dqas = self.sa_session.scalars(stmt).all()
target_default = None
for dqa in dqas:
if dqa.quota.quota_source_label == label and not dqa.quota.deleted:
target_default = dqa
if target_default:
target_default.quota = quota
# Or create if necessary
else:
target_default = self.model.DefaultQuotaAssociation(default_type, quota)
self.sa_session.add(target_default)
with transaction(self.sa_session):
self.sa_session.commit()
def get_percent(
self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
) -> Optional[int]:
"""
Return the percentage of any storage quota applicable to the user/transaction.
"""
# if trans passed, use it to get the user, history (instead of/override vals passed)
if trans:
user = trans.user
history = trans.history
# if quota wasn't passed, attempt to get the quota
if quota is False:
quota = self.get_quota(user, quota_source_label=quota_source_label)
# return none if no applicable quotas or quotas disabled
if quota is None:
return None
# get the usage, if it wasn't passed
if usage is False:
usage = self.get_usage(trans, user, history, quota_source_label=quota_source_label)
try:
return min((int(float(usage) / quota * 100), 100))
except ZeroDivisionError:
return 100
def set_entity_quota_associations(self, quotas=None, users=None, groups=None, delete_existing_assocs=True):
if quotas is None:
quotas = []
if users is None:
users = []
if groups is None:
groups = []
for quota in quotas:
if delete_existing_assocs:
flush_needed = False
for a in quota.users + quota.groups:
self.sa_session.delete(a)
flush_needed = True
if flush_needed:
with transaction(self.sa_session):
self.sa_session.commit()
for user in users:
uqa = self.model.UserQuotaAssociation(user, quota)
self.sa_session.add(uqa)
for group in groups:
gqa = self.model.GroupQuotaAssociation(group, quota)
self.sa_session.add(gqa)
with transaction(self.sa_session):
self.sa_session.commit()
def is_over_quota(self, app, job, job_destination):
# Doesn't work because job.object_store_id until inside handler :_(
# quota_source_label = job.quota_source_label
if job_destination is not None:
object_store_id = job_destination.params.get("object_store_id", None)
object_store = app.object_store
quota_source_map = object_store.get_quota_source_map()
quota_source_label = quota_source_map.get_quota_source_info(object_store_id).label
else:
quota_source_label = None
quota = self.get_quota(job.user, quota_source_label=quota_source_label)
if quota is not None:
try:
usage = self.get_usage(user=job.user, history=job.history, quota_source_label=quota_source_label)
if usage > quota:
return True
except AssertionError:
pass # No history, should not happen with an anon user
return False
[docs]def get_quota_agent(config, model) -> QuotaAgent:
quota_agent: QuotaAgent
if config.enable_quotas:
quota_agent = galaxy.quota.DatabaseQuotaAgent(model)
else:
quota_agent = galaxy.quota.NoQuotaAgent()
return quota_agent
__all__ = ("get_quota_agent", "NoQuotaAgent")