# Source code for galaxy.quota

"""Galaxy Quotas"""

import logging
from typing import Optional

from sqlalchemy import select
from sqlalchemy.sql import text

import galaxy.util
from galaxy.model.base import transaction

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)


class QuotaAgent:  # metaclass=abc.ABCMeta
    """Interface for asking Galaxy how much quota exists and how much is used.

    Code that modifies quotas relies on more than this interface: it expects the
    extra methods provided by DatabaseQuotaAgent, which implements it. Read-only
    consumers (checking the available quota, reporting it to users) should only
    need the methods declared here, which makes NoQuotaAgent a valid choice for
    them.

    Keeping to the well annotated methods of this interface should make it
    possible to add other quota backends later, for example quotas managed by
    another application (LDAP maybe?) or through configuration files.
    """

    def relabel_quota_for_dataset(self, dataset, from_label: Optional[str], to_label: Optional[str]):
        """Move a dataset's usage from one quota source label to another.

        The dataset's size is subtracted from the usage tracked under
        ``from_label`` and added to the usage tracked under ``to_label`` for
        the affected users. The base implementation does nothing.
        """

    # TODO: make abstractmethod after they work better with mypy
    def get_quota(self, user, quota_source_label=None) -> Optional[int]:
        """Return the quota in bytes, or None when no quota is set."""

    def get_quota_nice_size(self, user, quota_source_label=None) -> Optional[str]:
        """Return the quota formatted for humans, or 'unlimited' without a quota."""
        limit = self.get_quota(user, quota_source_label=quota_source_label)
        return "unlimited" if limit is None else galaxy.util.nice_size(limit)

    # TODO: make abstractmethod after they work better with mypy
    def get_percent(
        self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
    ) -> Optional[int]:
        """Return the percentage used of the quota applying to the user/transaction."""

    def get_usage(self, trans=None, user=False, history=False, quota_source_label=None) -> Optional[float]:
        """Return the disk usage to compare against a quota.

        When ``trans`` is truthy its ``user`` and ``history`` replace the
        arguments of the same name. An anonymous (falsy) user is charged the
        ``disk_size`` of the history. A real user is charged ``total_disk_usage``
        when no ``quota_source_label`` is given, otherwise the usage recorded
        for that label, with 0.0 standing in for a missing record or value.

        Raises AssertionError when no user was supplied at all, or when the
        user is anonymous and no history is available.
        """
        if trans:
            user, history = trans.user, trans.history
        assert user is not False, "Could not determine user."

        # Anonymous access: usage is tracked on the history only.
        if not user:
            assert history, "Could not determine anonymous user's history."
            return history.disk_size

        # No label: the user's overall usage applies.
        if quota_source_label is None:
            return user.total_disk_usage

        # Labelled quota source: fall back to 0.0 when nothing is recorded yet.
        source_usage = user.quota_source_usage_for(quota_source_label)
        if source_usage and source_usage.disk_usage is not None:
            return source_usage.disk_usage
        return 0.0

    def is_over_quota(self, app, job, job_destination):
        """Return True if the user or history is over quota for the given job.

        ``job_destination`` is currently unused, but an important future use is
        admins and/or users dynamically choosing which object stores to use,
        which will likely be communicated through the job destination.
        """


class NoQuotaAgent(QuotaAgent):
    """Base quota agent, always returns no quota"""

    def __init__(self):
        """Create the agent; it keeps no state."""
        pass

    def get_quota(self, user, quota_source_label=None) -> Optional[int]:
        """Return None (no quota) for every user and label."""
        return None

    def relabel_quota_for_dataset(self, dataset, from_label: Optional[str], to_label: Optional[str]):
        """Do nothing; this agent performs no usage bookkeeping."""
        return None

    @property
    def default_quota(self):
        """Always None: this agent has no default quota."""
        return None

    def get_percent(
        self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
    ) -> Optional[int]:
        """Return None, since a percentage cannot be computed without a quota."""
        return None

    def is_over_quota(self, app, job, job_destination):
        """Return False: this agent never reports a job as over quota."""
        return False
class DatabaseQuotaAgent(QuotaAgent):
    """Class that handles galaxy quotas"""

    def __init__(self, model):
        """Keep the model mapping and its session (``model.context``) for later queries."""
        self.model = model
        self.sa_session = model.context

    def get_quota(self, user, quota_source_label=None) -> Optional[int]:
        """Return the user's quota in bytes for a quota source, or None if unlimited.

        Calculated like so:

            1. Anonymous users get the default quota.
            2. Logged in users start with the highest of their associated '='
               quotas or the default quota, if there are no associated '='
               quotas.  If an '=' unlimited (-1 in the database) quota is found
               during this process, the user has no quota (aka unlimited).
            3. Quota is increased or decreased by any corresponding '+' or '-'
               quotas.

        A computed quota of exactly 0 bytes is returned as ``0``; only a NULL
        result (no applicable quota, or an explicit unlimited quota) yields None.
        """
        if not user:
            return self._default_unregistered_quota(quota_source_label)
        # The label predicate is spliced in as SQL text because "IS NULL" cannot be
        # expressed through a bind parameter; the label value itself is always bound.
        query = text(
            """
            SELECT (
                COALESCE(MAX(CASE WHEN union_quota.operation = '='
                                  THEN union_quota.bytes
                                  ELSE NULL
                             END),
                         (SELECT default_quota.bytes
                          FROM quota as default_quota
                          LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id
                          WHERE default_quota_association.type = 'registered'
                            AND default_quota.deleted != :is_true
                            AND default_quota.quota_source_label {label_cond}))
                +
                (CASE WHEN SUM(CASE WHEN union_quota.operation = '=' AND union_quota.bytes = -1
                                    THEN 1 ELSE 0
                               END) > 0
                      THEN NULL
                      ELSE 0 END)
                +
                (COALESCE(SUM(
                    CASE WHEN union_quota.operation = '+' THEN union_quota.bytes
                         WHEN union_quota.operation = '-' THEN -1 * union_quota.bytes
                         ELSE 0
                    END
                ), 0))
            )
            FROM (
                SELECT user_quota.operation as operation, user_quota.bytes as bytes
                FROM galaxy_user as guser
                LEFT JOIN user_quota_association as uqa on guser.id = uqa.user_id
                LEFT JOIN quota as user_quota on user_quota.id = uqa.quota_id
                WHERE user_quota.deleted != :is_true
                  AND user_quota.quota_source_label {label_cond}
                  AND guser.id = :user_id
                UNION ALL
                SELECT group_quota.operation as operation, group_quota.bytes as bytes
                FROM galaxy_user as guser
                LEFT JOIN user_group_association as uga on guser.id = uga.user_id
                LEFT JOIN galaxy_group on galaxy_group.id = uga.group_id
                LEFT JOIN group_quota_association as gqa on galaxy_group.id = gqa.group_id
                LEFT JOIN quota as group_quota on group_quota.id = gqa.quota_id
                WHERE group_quota.deleted != :is_true
                  AND group_quota.quota_source_label {label_cond}
                  AND guser.id = :user_id
            ) as union_quota
            """.format(
                label_cond="IS NULL" if quota_source_label is None else " = :label"
            )
        )
        engine = self.sa_session.get_bind()
        with engine.connect() as conn:
            res = conn.execute(query, {"is_true": True, "user_id": user.id, "label": quota_source_label}).fetchone()
        # Only NULL means "no quota". A plain truthiness test would also turn a
        # legitimate quota of 0 bytes into "unlimited".
        if res is None or res[0] is None:
            return None
        return int(res[0])

    def relabel_quota_for_dataset(self, dataset, from_label: Optional[str], to_label: Optional[str]):
        """Move a dataset's size between quota source labels for every affected user.

        Affected users are those owning a history that contains the dataset. A
        label of None refers to ``galaxy_user.disk_usage``; any other label
        refers to that user's row in ``user_quota_source_usage``. Both updates
        run inside a single transaction. Nothing happens when the labels match.
        """
        if to_label == from_label:
            # Nothing to move, so skip the size computation and the database.
            return

        adjust = dataset.get_total_size()
        with_quota_affected_users = """WITH quota_affected_users AS
        (
            SELECT DISTINCT user_id
            FROM history
            INNER JOIN history_dataset_association on history_dataset_association.history_id = history.id
            INNER JOIN dataset on history_dataset_association.dataset_id = dataset.id
            WHERE dataset_id = :dataset_id
        )"""
        engine = self.sa_session.get_bind()
        # Hack for older sqlite, would work on newer sqlite - 3.24.0
        for_sqlite = "sqlite" in engine.dialect.name

        # Statement adding the dataset size to the destination label.
        if to_label is None:
            to_statement = f"""
            {with_quota_affected_users}
            UPDATE galaxy_user
            SET disk_usage = coalesce(disk_usage, 0) + :adjust
            WHERE id in (select user_id from quota_affected_users)
            """
        elif for_sqlite:
            to_statement = f"""
            {with_quota_affected_users},
            new_quota_sources (user_id, disk_usage, quota_source_label) AS (
                SELECT user_id, :adjust as disk_usage, :to_label as quota_source_label
                FROM quota_affected_users
            )
            INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage)
            SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage + :adjust, :adjust)
            FROM new_quota_sources as new
            LEFT JOIN user_quota_source_usage AS old
                ON new.user_id = old.user_id
                AND NEW.quota_source_label = old.quota_source_label"""
        else:
            to_statement = f"""
            {with_quota_affected_users},
            new_quota_sources (user_id, disk_usage, quota_source_label) AS (
                SELECT user_id, :adjust as disk_usage, :to_label as quota_source_label
                FROM quota_affected_users
            )
            INSERT INTO user_quota_source_usage(user_id, disk_usage, quota_source_label)
            SELECT * FROM new_quota_sources
            ON CONFLICT
                ON constraint uqsu_unique_label_per_user
                DO UPDATE SET disk_usage = user_quota_source_usage.disk_usage + :adjust
            """

        # Statement subtracting the dataset size from the source label.
        if from_label is None:
            from_statement = f"""
            {with_quota_affected_users}
            UPDATE galaxy_user
            SET disk_usage = coalesce(disk_usage - :adjust, 0)
            WHERE id in (select user_id from quota_affected_users)
            """
        elif for_sqlite:
            from_statement = f"""
            {with_quota_affected_users},
            new_quota_sources (user_id, disk_usage, quota_source_label) AS (
                SELECT user_id, :adjust as disk_usage, :from_label as quota_source_label
                FROM quota_affected_users
            )
            INSERT OR REPLACE INTO user_quota_source_usage (id, user_id, quota_source_label, disk_usage)
            SELECT old.id, new.user_id, new.quota_source_label, COALESCE(old.disk_usage - :adjust, 0)
            FROM new_quota_sources as new
            LEFT JOIN user_quota_source_usage AS old
                ON new.user_id = old.user_id
                AND NEW.quota_source_label = old.quota_source_label"""
        else:
            from_statement = f"""
            {with_quota_affected_users},
            new_quota_sources (user_id, disk_usage, quota_source_label) AS (
                SELECT user_id, 0 as disk_usage, :from_label as quota_source_label
                FROM quota_affected_users
            )
            INSERT INTO user_quota_source_usage(user_id, disk_usage, quota_source_label)
            SELECT * FROM new_quota_sources
            ON CONFLICT
                ON constraint uqsu_unique_label_per_user
                DO UPDATE SET disk_usage = user_quota_source_usage.disk_usage - :adjust
            """

        bind = {"dataset_id": dataset.id, "adjust": int(adjust), "to_label": to_label, "from_label": from_label}
        with engine.connect() as conn:
            # One transaction so usage is never subtracted without being re-added.
            with conn.begin():
                conn.execute(text(from_statement), bind)
                conn.execute(text(to_statement), bind)

    def _default_unregistered_quota(self, quota_source_label):
        """Return the default quota in bytes for anonymous users, or None if unset."""
        return self._default_quota(self.model.DefaultQuotaAssociation.types.UNREGISTERED, quota_source_label)

    def _default_quota(self, default_type, quota_source_label):
        """Return the bytes of the non-deleted default quota of ``default_type``.

        Only quotas whose ``quota_source_label`` matches are considered (NULL
        when the label is None). Returns None when no such quota exists.
        """
        label_condition = "IS NULL" if quota_source_label is None else "= :label"
        query = text(
            f"""
            SELECT bytes
            FROM quota as default_quota
            LEFT JOIN default_quota_association on default_quota.id = default_quota_association.quota_id
            WHERE default_quota_association.type = :default_type
              AND default_quota.deleted != :is_true
              AND default_quota.quota_source_label {label_condition}
            """
        )
        engine = self.sa_session.get_bind()
        with engine.connect() as conn:
            res = conn.execute(
                query, {"is_true": True, "label": quota_source_label, "default_type": default_type}
            ).fetchone()
        if res:
            return res[0]
        else:
            return None

    def set_default_quota(self, default_type, quota):
        """Make ``quota`` the default of ``default_type`` for its quota source label.

        All existing default, user and group associations of ``quota`` are
        deleted first. If a default association of that type already points at
        a non-deleted quota with the same label it is re-pointed at ``quota``;
        otherwise a new association is created. The session is then committed.
        """
        # Unset the current default(s) associated with this quota, if there are any
        for dqa in quota.default:
            self.sa_session.delete(dqa)
        # Unset the current users/groups associated with this quota
        for uqa in quota.users:
            self.sa_session.delete(uqa)
        for gqa in quota.groups:
            self.sa_session.delete(gqa)
        # Find the old default, assign the new quota if it exists
        label = quota.quota_source_label
        stmt = select(self.model.DefaultQuotaAssociation).filter(
            self.model.DefaultQuotaAssociation.type == default_type
        )
        dqas = self.sa_session.scalars(stmt).all()
        target_default = None
        for dqa in dqas:
            # No break: if several associations match, the last one is used.
            if dqa.quota.quota_source_label == label and not dqa.quota.deleted:
                target_default = dqa
        if target_default:
            target_default.quota = quota
        # Or create if necessary
        else:
            target_default = self.model.DefaultQuotaAssociation(default_type, quota)
            self.sa_session.add(target_default)
        with transaction(self.sa_session):
            self.sa_session.commit()

    def get_percent(
        self, trans=None, user=False, history=False, usage=False, quota=False, quota_source_label=None
    ) -> Optional[int]:
        """
        Return the percentage of any storage quota applicable to the user/transaction.

        ``usage`` and ``quota`` are looked up when left at their ``False``
        sentinel. The result is capped at 100; a quota of zero bytes counts as
        100 percent used. Returns None when no quota applies.
        """
        # if trans passed, use it to get the user, history (instead of/override vals passed)
        if trans:
            user = trans.user
            history = trans.history
        # if quota wasn't passed, attempt to get the quota
        if quota is False:
            quota = self.get_quota(user, quota_source_label=quota_source_label)
        # return none if no applicable quotas or quotas disabled
        if quota is None:
            return None
        # get the usage, if it wasn't passed
        if usage is False:
            usage = self.get_usage(trans, user, history, quota_source_label=quota_source_label)
        try:
            return min((int(float(usage) / quota * 100), 100))
        except ZeroDivisionError:
            return 100

    def set_entity_quota_associations(self, quotas=None, users=None, groups=None, delete_existing_assocs=True):
        """Associate every given quota with every given user and group.

        With ``delete_existing_assocs`` (the default) the existing user and
        group associations of each quota are deleted and committed before the
        new ones are added. Arguments left as None are treated as empty lists.
        """
        if quotas is None:
            quotas = []
        if users is None:
            users = []
        if groups is None:
            groups = []
        for quota in quotas:
            if delete_existing_assocs:
                flush_needed = False
                for a in quota.users + quota.groups:
                    self.sa_session.delete(a)
                    flush_needed = True
                if flush_needed:
                    with transaction(self.sa_session):
                        self.sa_session.commit()
            for user in users:
                uqa = self.model.UserQuotaAssociation(user, quota)
                self.sa_session.add(uqa)
            for group in groups:
                gqa = self.model.GroupQuotaAssociation(group, quota)
                self.sa_session.add(gqa)
            with transaction(self.sa_session):
                self.sa_session.commit()

    def is_over_quota(self, app, job, job_destination):
        """Return True when the job's user (or history) uses more than the quota.

        The quota source label is resolved from the ``object_store_id``
        parameter of ``job_destination`` through the app's object store; without
        a destination the unlabelled quota is used. Returns False when no quota
        applies or when the usage cannot be determined.
        """
        # Doesn't work because job.object_store_id until inside handler :_(
        # quota_source_label = job.quota_source_label
        if job_destination is not None:
            object_store_id = job_destination.params.get("object_store_id", None)
            object_store = app.object_store
            quota_source_map = object_store.get_quota_source_map()
            quota_source_label = quota_source_map.get_quota_source_info(object_store_id).label
        else:
            quota_source_label = None
        quota = self.get_quota(job.user, quota_source_label=quota_source_label)
        if quota is not None:
            try:
                usage = self.get_usage(user=job.user, history=job.history, quota_source_label=quota_source_label)
                if usage > quota:
                    return True
            except AssertionError:
                pass  # No history, should not happen with an anon user
        return False
def get_quota_agent(config, model) -> QuotaAgent:
    """Build the quota agent matching the configuration.

    Returns a DatabaseQuotaAgent bound to ``model`` when ``config.enable_quotas``
    is truthy, otherwise a NoQuotaAgent. Both classes are looked up through the
    ``galaxy.quota`` module attribute at call time rather than by local name.
    """
    quota_agent: QuotaAgent
    if config.enable_quotas:
        quota_agent = galaxy.quota.DatabaseQuotaAgent(model)
    else:
        quota_agent = galaxy.quota.NoQuotaAgent()
    return quota_agent
# Names exported by `from galaxy.quota import *`. QuotaAgent and
# DatabaseQuotaAgent are not listed here and must be imported by name.
__all__ = ("get_quota_agent", "NoQuotaAgent")