Source code for galaxy.jobs.rule_helper
import hashlib
import logging
import random
from datetime import datetime

from sqlalchemy import func

from galaxy import (
    model,
    util
)
from galaxy.tool_util.deps.dependencies import ToolInfo

log = logging.getLogger(__name__)

VALID_JOB_HASH_STRATEGIES = ["job", "user", "history", "workflow_invocation"]


class RuleHelper:
    """ Utility to allow job rules to interface cleanly with the rest of
    Galaxy and shield them from low-level details of models, metrics, etc....

    Currently the focus is on figuring out job statistics for a given user,
    but this could interface with other parts of Galaxy as well.
    """

    def supports_container(self, job_or_tool, container_type):
        """
        Job rules can pass this function a job, job_wrapper, or tool and
        determine if the underlying tool believes it can be run with a specific
        container type.

        :param job_or_tool: a ``Job``, ``JobWrapper``-like object, or ``Tool``.
        :param container_type: either "docker" or "singularity" currently.
        :return: true if the tool supports the specified container type.
        """
        # Not a ton of logic in this method - but the idea is to shield rule
        # developers from the details; they shouldn't have to know how to
        # interrogate a tool or job to figure out if it can be run in a
        # container.
        if hasattr(job_or_tool, 'containers'):
            tool = job_or_tool
        elif hasattr(job_or_tool, 'tool'):
            # Have a JobWrapper-like object.
            tool = job_or_tool.tool
        else:
            # Have a Job object.
            tool = self.app.toolbox.get_tool(job_or_tool.tool_id, tool_version=job_or_tool.tool_version)
        tool_info = ToolInfo(tool.containers, tool.requirements, tool.requires_galaxy_python_environment,
                             tool.docker_env_pass_through)
        container_description = self.app.container_finder.find_best_container_description([container_type], tool_info)
        return container_description is not None
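
    # Illustrative usage (not part of the original module): a dynamic job rule
    # receiving a ``rule_helper`` argument (as Galaxy's stock rules do) might
    # pick a destination based on container support. The destination ids below
    # are hypothetical.
    #
    #   def route_by_container(rule_helper, job):
    #       if rule_helper.supports_container(job, "docker"):
    #           return "docker_cluster"
    #       return "default_cluster"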

    def supports_docker(self, job_or_tool):
        """
        Returns true if the tool or job supports running in a Docker container.

        :param job_or_tool: the job or tool to test.
        :return: true if the tool/job can run in Docker.
        """
        return self.supports_container(job_or_tool, container_type="docker")

    def supports_singularity(self, job_or_tool):
        """
        Returns true if the tool or job supports running in a Singularity container.

        :param job_or_tool: the job or tool to test.
        :return: true if the tool/job can run in Singularity.
        """
        return self.supports_container(job_or_tool, container_type="singularity")
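
    # Illustrative usage (not part of the original module): the two shortcuts
    # above can be combined to prefer Docker and fall back to Singularity.
    # Destination ids are hypothetical.
    #
    #   def prefer_docker(rule_helper, job):
    #       if rule_helper.supports_docker(job):
    #           return "docker_cluster"
    #       elif rule_helper.supports_singularity(job):
    #           return "singularity_cluster"
    #       return "default_cluster"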

    def job_count(
        self,
        **kwds
    ):
        query = self.query(model.Job)
        return self._filter_job_query(query, **kwds).count()

    def sum_job_runtime(
        self,
        **kwds
    ):
        # TODO: Consider sum_core_hours or something that scales runtime by
        # calculated cores per job.
        query = self.metric_query(
            select=func.sum(model.JobMetricNumeric.table.c.metric_value),
            metric_name="runtime_seconds",
            plugin="core",
        )
        query = query.join(model.Job)
        return float(self._filter_job_query(query, **kwds).first()[0])
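
    # Illustrative usage (not part of the original module): both job_count and
    # sum_job_runtime accept the keyword filters handled by _filter_job_query
    # below, e.g. counting a user's jobs created in the last day on a
    # hypothetical destination.
    #
    #   from datetime import timedelta
    #   recent_jobs = rule_helper.job_count(
    #       for_user_email=user_email,
    #       for_destination="slurm_cluster",
    #       created_in_last=timedelta(days=1),
    #   )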

    def metric_query(self, select, metric_name, plugin, numeric=True):
        metric_class = model.JobMetricNumeric if numeric else model.JobMetricText
        query = self.query(select)
        query = query.filter(metric_class.table.c.plugin == plugin)
        query = query.filter(metric_class.table.c.metric_name == metric_name)
        return query
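
    # Illustrative usage (not part of the original module): metric_query can be
    # combined with the job filters the same way sum_job_runtime does; the
    # metric and plugin names below are assumptions.
    #
    #   query = rule_helper.metric_query(
    #       select=func.max(model.JobMetricNumeric.table.c.metric_value),
    #       metric_name="memory.max_usage_in_bytes",
    #       plugin="cgroup",
    #   ).join(model.Job)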

    def _filter_job_query(
        self,
        query,
        for_user_email=None,
        for_destination=None,
        for_destinations=None,
        for_job_states=None,
        created_in_last=None,
        updated_in_last=None,
    ):
        if for_destination is not None:
            for_destinations = [for_destination]

        query = query.join(model.User)
        if for_user_email is not None:
            query = query.filter(model.User.table.c.email == for_user_email)
        if for_destinations is not None:
            if len(for_destinations) == 1:
                query = query.filter(model.Job.table.c.destination_id == for_destinations[0])
            else:
                query = query.filter(model.Job.table.c.destination_id.in_(for_destinations))

        if created_in_last is not None:
            end_date = datetime.now()
            start_date = end_date - created_in_last
            query = query.filter(model.Job.table.c.create_time >= start_date)

        if updated_in_last is not None:
            end_date = datetime.now()
            start_date = end_date - updated_in_last
            log.info(end_date)
            log.info(start_date)
            query = query.filter(model.Job.table.c.update_time >= start_date)

        if for_job_states is not None:
            # Optimize the singleton case - can be much more performant in my experience.
            if len(for_job_states) == 1:
                query = query.filter(model.Job.table.c.state == for_job_states[0])
            else:
                query = query.filter(model.Job.table.c.state.in_(for_job_states))

        return query

    def should_burst(self, destination_ids, num_jobs, job_states=None):
        """ Check if the specified destinations ``destination_ids`` have at
        least ``num_jobs`` assigned to them - pass ``job_states`` as ``queued``
        to limit this check to the number of queued jobs.

        See stock_rules for a simple example of using this function - but to
        get the most out of it - it should probably be used with custom job
        rules that can respond to the bursting by allocating resources,
        launching cloud nodes, etc....
        """
        if job_states is None:
            job_states = "queued,running"
        from_destination_job_count = self.job_count(
            for_destinations=destination_ids,
            for_job_states=util.listify(job_states)
        )
        # Would this job push us over the maximum job count before requiring
        # bursting (roughly... very roughly given many handler threads may be
        # scheduling jobs)?
        return (from_destination_job_count + 1) > int(num_jobs)
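
    # Illustrative usage (not part of the original module): a hypothetical
    # bursting rule that overflows work to a second destination once the
    # primary destination has 50 queued jobs. Destination ids are assumptions.
    #
    #   def burst_to_cloud(rule_helper, job):
    #       if rule_helper.should_burst(["local_cluster"], num_jobs=50, job_states="queued"):
    #           return "cloud_destination"
    #       return "local_cluster"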

    def choose_one(self, lst, hash_value=None):
        """ Choose a random value from the supplied list. If ``hash_value`` is
        passed in, then every request with that same ``hash_value`` will
        produce the same choice from the supplied list.
        """
        if hash_value is None:
            return random.choice(lst)

        if not isinstance(hash_value, int):
            # Convert hash_value string into an index.
            as_hex = hashlib.md5(util.smart_str(hash_value)).hexdigest()
            hash_value = int(as_hex, 16)
        # else assumed to be a 'random' int from 0-~Inf
        random_index = hash_value % len(lst)
        return lst[random_index]
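
    # Illustrative usage (not part of the original module): spreading a user's
    # jobs deterministically across a set of hypothetical destinations, so that
    # the same user always lands on the same one.
    #
    #   destination = rule_helper.choose_one(
    #       ["cluster_a", "cluster_b", "cluster_c"],
    #       hash_value=user_email,
    #   )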

    def job_hash(self, job, hash_by=None):
        """ Produce a reproducible hash for the given job on various
        criteria - for instance, if hash_by is "workflow_invocation,history",
        all jobs within the same workflow invocation will receive the same
        hash, for jobs outside of workflows all jobs within the same history
        will receive the same hash, and any remaining jobs will be hashed by
        the job's id.

        Primarily intended for use with ``choose_one`` above - to consistently
        route or schedule related jobs.
        """
        if hash_by is None:
            hash_by = ["job"]
        hash_bys = util.listify(hash_by)
        for hash_by in hash_bys:
            job_hash = self._try_hash_for_job(job, hash_by)
            if job_hash:
                return job_hash

        # Fall back to just hashing by job id, should always return a value.
        return self._try_hash_for_job(job, "job")

    def _try_hash_for_job(self, job, hash_by):
        """ May return False or None if hash type is invalid for that job -
        e.g. attempting to hash by user for an anonymous job or by workflow
        invocation for jobs outside of workflows.
        """
        if hash_by not in VALID_JOB_HASH_STRATEGIES:
            message = f"Do not know how to hash jobs by {hash_by}, must be one of {VALID_JOB_HASH_STRATEGIES}"
            raise Exception(message)

        if hash_by == "workflow_invocation":
            return job.raw_param_dict().get("__workflow_invocation_uuid__", None)
        elif hash_by == "history":
            return job.history_id
        elif hash_by == "user":
            user = job.user
            return user and user.id
        elif hash_by == "job":
            return job.id
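
# Illustrative usage (not part of the original module): combining job_hash and
# choose_one so that jobs from the same workflow invocation (or, failing that,
# the same history) are routed to the same hypothetical destination - similar
# in spirit to the stock ``choose_one`` rule referenced above.
#
#   def route_related_jobs(rule_helper, job):
#       hash_value = rule_helper.job_hash(job, hash_by="workflow_invocation,history")
#       return rule_helper.choose_one(["cluster_a", "cluster_b"], hash_value=hash_value)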