Source code for galaxy.jobs.rule_helper
import hashlib
import logging
import random
from datetime import datetime
from sqlalchemy import func
from galaxy import (
    model,
    util,
)
from galaxy.tool_util.deps.dependencies import ToolInfo
log = logging.getLogger(__name__)
VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]
class RuleHelper:
    """Utility to allow job rules to interface cleanly with the rest of
    Galaxy and shield them from low-level details of models, metrics, etc.

    Currently the focus is on figuring out job statistics for a given user,
    but it could interface with other things as well.
    """

    def __init__(self, app):
        self.app = app

    def supports_container(self, job_or_tool, container_type):
        """
        Job rules can pass this function a job, job_wrapper, or tool and
        determine if the underlying tool believes it can be run with a
        specific container type.

        :param job_or_tool: the job, job wrapper, or tool to check.
        :param container_type: either "docker" or "singularity" currently
        :return: True if the tool supports the specified container type.
        """
        # Not a ton of logic in this method - but the idea is to shield rule
        # developers from the details; they shouldn't have to know how to
        # interrogate a tool or job to figure out if it can be run in a
        # container.
        if hasattr(job_or_tool, "containers"):
            tool = job_or_tool
        elif hasattr(job_or_tool, "tool"):
            # Have a JobWrapper-like object.
            tool = job_or_tool.tool
        else:
            # Have a Job object.
            tool = self.app.toolbox.get_tool(job_or_tool.tool_id, tool_version=job_or_tool.tool_version)
        tool_info = ToolInfo(
            tool.containers, tool.requirements, tool.requires_galaxy_python_environment, tool.docker_env_pass_through
        )
        container_description = self.app.container_finder.find_best_container_description([container_type], tool_info)
        return container_description is not None
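
As a sketch of how a dynamic job rule might use this method (the rule name and the destination ids "docker_cluster" and "regular_cluster" are hypothetical, not part of Galaxy):

    from galaxy.jobs.rule_helper import RuleHelper

    def docker_aware_destination(app, job):
        # Route tools that declare Docker support to a container-enabled
        # destination; everything else goes to a plain cluster.
        rule_helper = RuleHelper(app)
        if rule_helper.supports_container(job, container_type="docker"):
            return "docker_cluster"
        return "regular_cluster"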

    def supports_docker(self, job_or_tool):
        """
        Returns True if the tool or job supports running in a docker container.

        :param job_or_tool: the job or tool to test.
        :return: True if the tool/job can run in docker.
        """
        return self.supports_container(job_or_tool, container_type="docker")

    def supports_singularity(self, job_or_tool):
        """
        Returns True if the tool or job supports running in a singularity container.

        :param job_or_tool: the job or tool to test.
        :return: True if the tool/job can run in singularity.
        """
        return self.supports_container(job_or_tool, container_type="singularity")

    def job_count(self, **kwds):
        query = self.query(model.Job)
        return self._filter_job_query(query, **kwds).count()

    def sum_job_runtime(self, **kwds):
        # TODO: Consider sum_core_hours or something that scales runtime by
        # calculated cores per job.
        query = self.metric_query(
            select=func.sum(model.JobMetricNumeric.table.c.metric_value),
            metric_name="runtime_seconds",
            plugin="core",
        )
        query = query.join(model.Job)
        return float(self._filter_job_query(query, **kwds).first()[0])
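
Both methods accept the filtering keywords of ``_filter_job_query`` below. A sketch of a throttling rule built on them (the threshold and destination ids are made up for illustration; ``user_email`` is supplied to dynamic rules by Galaxy):

    import datetime

    def throttle_heavy_users(app, job, user_email):
        # Users who accumulated more than 24 hours of runtime over the
        # last week get routed to a low-priority destination.
        rule_helper = RuleHelper(app)
        week = datetime.timedelta(days=7)
        runtime = rule_helper.sum_job_runtime(for_user_email=user_email, created_in_last=week)
        if runtime > 24 * 60 * 60:
            return "low_priority"
        return "normal_priority"

Note that ``sum_job_runtime`` sums the core ``runtime_seconds`` metric, so thresholds should be expressed in seconds.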

    def metric_query(self, select, metric_name, plugin, numeric=True):
        metric_class = model.JobMetricNumeric if numeric else model.JobMetricText
        query = self.query(select)
        query = query.filter(metric_class.table.c.plugin == plugin)
        query = query.filter(metric_class.table.c.metric_name == metric_name)
        return query
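
``sum_job_runtime`` above shows the intended pattern: pick an aggregate, name the metric and plugin, then join against ``model.Job`` so job-level filters can apply. A sketch of the same pattern for the maximum recorded runtime instead of the sum, given a ``RuleHelper`` instance ``rule_helper`` (the destination id is hypothetical):

    from sqlalchemy import func

    query = rule_helper.metric_query(
        select=func.max(model.JobMetricNumeric.table.c.metric_value),
        metric_name="runtime_seconds",
        plugin="core",
    )
    query = query.join(model.Job)
    query = query.filter(model.Job.table.c.destination_id == "cluster_a")
    longest_runtime = query.first()[0]  # None if no matching metrics exist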

    def _filter_job_query(
        self,
        query,
        for_user_email=None,
        for_destination=None,
        for_destinations=None,
        for_job_states=None,
        created_in_last=None,
        updated_in_last=None,
    ):
        if for_destination is not None:
            for_destinations = [for_destination]
        query = query.join(model.User)
        if for_user_email is not None:
            query = query.filter(model.User.table.c.email == for_user_email)
        if for_destinations is not None:
            if len(for_destinations) == 1:
                query = query.filter(model.Job.table.c.destination_id == for_destinations[0])
            else:
                query = query.filter(model.Job.table.c.destination_id.in_(for_destinations))
        if created_in_last is not None:
            end_date = datetime.now()
            start_date = end_date - created_in_last
            query = query.filter(model.Job.table.c.create_time >= start_date)
        if updated_in_last is not None:
            end_date = datetime.now()
            start_date = end_date - updated_in_last
            log.info("Filtering on jobs updated between %s and %s", start_date, end_date)
            query = query.filter(model.Job.table.c.update_time >= start_date)
        if for_job_states is not None:
            # Optimize the singleton case - can be much more performant in my experience.
            if len(for_job_states) == 1:
                query = query.filter(model.Job.table.c.state == for_job_states[0])
            else:
                query = query.filter(model.Job.table.c.state.in_(for_job_states))
        return query
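
For example, a ``job_count`` call exercising most of these filters (all values here are hypothetical), given a ``RuleHelper`` instance ``rule_helper``, would count one user's queued or running jobs on two destinations created within the last hour:

    import datetime

    count = rule_helper.job_count(
        for_user_email="user@example.org",
        for_destinations=["cluster_a", "cluster_b"],
        for_job_states=["queued", "running"],
        created_in_last=datetime.timedelta(hours=1),
    )

``created_in_last`` and ``updated_in_last`` are subtracted from ``datetime.now()``, so they should be ``datetime.timedelta`` values.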

    def should_burst(self, destination_ids, num_jobs, job_states=None):
        """Check if the specified destinations ``destination_ids`` have at
        least ``num_jobs`` assigned to them - pass in ``job_states`` as
        ``"queued"`` to limit this check to the number of queued jobs.

        See stock_rules for a simple example of using this function - but to
        get the most out of it, it should probably be used with custom job
        rules that can respond to the bursting by allocating resources,
        launching cloud nodes, etc.
        """
        if job_states is None:
            job_states = "queued,running"
        from_destination_job_count = self.job_count(
            for_destinations=destination_ids, for_job_states=util.listify(job_states)
        )
        # Would this job push us over the maximum job count before requiring
        # bursting (roughly... very roughly, given many handler threads may be
        # scheduling jobs)?
        return (from_destination_job_count + 1) > int(num_jobs)
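
A minimal sketch of a custom bursting rule built on this (the threshold and destination ids are assumptions, not stock Galaxy configuration):

    def burst_to_cloud(app, job):
        # Once 40 jobs are queued on the local cluster, start sending
        # new work to a cloud destination instead.
        rule_helper = RuleHelper(app)
        if rule_helper.should_burst(["local_cluster"], num_jobs=40, job_states="queued"):
            return "cloud_destination"
        return "local_cluster"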

    def choose_one(self, lst, hash_value=None):
        """Choose a random value from the supplied list. If ``hash_value`` is
        passed in, then every request with the same ``hash_value`` will produce
        the same choice from the supplied list.
        """
        if hash_value is None:
            return random.choice(lst)
        if not isinstance(hash_value, int):
            # Convert hash_value string into an integer index.
            as_hex = hashlib.md5(util.smart_str(hash_value)).hexdigest()
            hash_value = int(as_hex, 16)
        # else assumed to be a 'random' int from 0-~Inf
        random_index = hash_value % len(lst)
        return lst[random_index]
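
A short sketch of the two modes, given the Galaxy ``app`` (destination ids are hypothetical):

    rule_helper = RuleHelper(app)

    # Unseeded - a uniformly random pick on every call.
    destination = rule_helper.choose_one(["cluster_a", "cluster_b"])

    # Seeded - every call with the same hash_value yields the same pick,
    # because the value is reduced modulo len(lst) to a stable index.
    destination = rule_helper.choose_one(["cluster_a", "cluster_b"], hash_value="history-42")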

    def job_hash(self, job, hash_by=None):
        """Produce a reproducible hash for the given job based on various
        criteria - for instance, if ``hash_by`` is "workflow_invocation,history",
        all jobs within the same workflow invocation will receive the same
        hash, jobs outside of workflows but within the same history will
        receive the same hash, and any remaining jobs will be hashed on the
        job's id.

        Primarily intended for use with ``choose_one`` above - to consistently
        route or schedule related jobs.
        """
        if hash_by is None:
            hash_by = ["job"]
        hash_bys = util.listify(hash_by)
        for hash_by in hash_bys:
            job_hash = self._try_hash_for_job(job, hash_by)
            if job_hash:
                return job_hash
        # Fall back to just hashing by job id, should always return a value.
        return self._try_hash_for_job(job, "job")

    def _try_hash_for_job(self, job, hash_by):
        """May return False or None if the hash type is invalid for that job -
        e.g. attempting to hash by user for an anonymous job or by workflow
        invocation for a job outside of any workflow.
        """
        if hash_by not in VALID_JOB_HASH_STRATEGIES:
            message = f"Do not know how to hash jobs by {hash_by}, must be one of {VALID_JOB_HASH_STRATEGIES}"
            raise Exception(message)
        if hash_by == "workflow_invocation":
            return job.raw_param_dict().get("__workflow_invocation_uuid__", None)
        elif hash_by == "history":
            return job.history_id
        elif hash_by == "user":
            user = job.user
            return user and user.id
        elif hash_by == "job":
            return job.id
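
Putting ``job_hash`` and ``choose_one`` together yields sticky load balancing: related jobs always land on the same destination, while unrelated jobs spread out evenly. A sketch, with hypothetical destination ids:

    def balanced_destination(app, job):
        # Jobs from the same workflow invocation (or, failing that, the
        # same history) are consistently routed to the same cluster.
        rule_helper = RuleHelper(app)
        hash_value = rule_helper.job_hash(job, hash_by="workflow_invocation,history")
        return rule_helper.choose_one(["cluster_a", "cluster_b"], hash_value=hash_value)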