import calendar
import logging
import re
from collections import namedtuple
from datetime import (
date,
datetime,
timedelta,
)
from math import (
ceil,
floor,
)
import sqlalchemy as sa
from markupsafe import escape
from sqlalchemy import and_
from galaxy import (
model,
util,
)
from galaxy.util import UNKNOWN
from galaxy.web.legacy_framework import grids
from galaxy.webapps.base.controller import (
BaseUIController,
web,
)
from galaxy.webapps.reports.controllers.jobs import (
get_spark_time,
sorter,
)
from galaxy.webapps.reports.controllers.query import ReportQueryBuilder
log = logging.getLogger(__name__)
[docs]class SpecifiedDateListGrid(grids.Grid):
[docs] class WorkflowNameColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, stored_workflow):
return escape(stored_workflow.name)
[docs] class CreateTimeColumn(grids.DateTimeColumn):
[docs] def get_value(self, trans, grid, stored_workflow):
return stored_workflow.create_time
[docs] class UserColumn(grids.TextColumn):
[docs] def get_value(self, trans, grid, stored_workflow):
if stored_workflow.user:
return escape(stored_workflow.user.email)
return UNKNOWN
[docs] class EmailColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, column_filter):
if column_filter == "All":
return query
return query.filter(
and_(
model.StoredWorkflow.table.c.user_id == model.User.table.c.id,
model.User.table.c.email == column_filter,
)
)
[docs] class SpecifiedDateColumn(grids.GridColumn):
[docs] def filter(self, trans, user, query, column_filter):
if column_filter == "All":
return query
# We are either filtering on a date like YYYY-MM-DD or on a month like YYYY-MM,
# so we need to figure out which type of date we have
if column_filter.count("-") == 2:
# We are filtering on a date like YYYY-MM-DD
year, month, day = map(int, column_filter.split("-"))
start_date = date(year, month, day)
end_date = start_date + timedelta(days=1)
return query.filter(
and_(
self.model_class.table.c.create_time >= start_date,
self.model_class.table.c.create_time < end_date,
)
)
if column_filter.count("-") == 1:
# We are filtering on a month like YYYY-MM
year, month = map(int, column_filter.split("-"))
start_date = date(year, month, 1)
end_date = start_date + timedelta(days=calendar.monthrange(year, month)[1])
return query.filter(
and_(
self.model_class.table.c.create_time >= start_date,
self.model_class.table.c.create_time < end_date,
)
)
# Grid definition
use_async = False
model_class = model.StoredWorkflow
title = "Workflows"
default_sort_key = "name"
columns = [
WorkflowNameColumn("Name", key="name", attach_popup=False, filterable="advanced"),
CreateTimeColumn("Creation Time", key="create_time", attach_popup=False),
UserColumn(
"User",
key="email",
model_class=model.User,
link=(lambda item: dict(operation="user_per_month", id=item.id, webapp="reports")),
attach_popup=False,
),
# Columns that are valid for filtering but are not visible.
SpecifiedDateColumn("Specified Date", key="specified_date", visible=False),
EmailColumn("Email", key="email", model_class=model.User, visible=False),
]
columns.append(
grids.MulticolFilterColumn(
"Search",
cols_to_filter=[columns[0], columns[2]],
key="free-text-search",
visible=False,
filterable="standard",
)
)
default_filter = {"specified_date": "All"}
num_rows_per_page = 50
use_paging = True
[docs] def build_initial_query(self, trans, **kwd):
return trans.sa_session.query(self.model_class).join(model.User).enable_eagerloads(False)
[docs]class Workflows(BaseUIController, ReportQueryBuilder):
specified_date_list_grid = SpecifiedDateListGrid()
[docs] @web.expose
def specified_date_handler(self, trans, **kwd):
# We add params to the keyword dict in this method in order to rename the param
# with an "f-" prefix, simulating filtering by clicking a search link. We have
# to take this approach because the "-" character is illegal in HTTP requests.
if "f-specified_date" in kwd and "specified_date" not in kwd:
# The user clicked a State link in the Advanced Search box, so 'specified_date'
# will have been eliminated.
pass
elif "specified_date" not in kwd:
kwd["f-specified_date"] = "All"
else:
kwd["f-specified_date"] = kwd["specified_date"]
if "operation" in kwd:
operation = kwd["operation"].lower()
if operation == "workflow_per_month":
# The received id is the stored_workflow id.
return trans.response.send_redirect(
web.url_for(controller="workflows", action="workflow_per_month", **kwd)
)
elif operation == "user_per_month":
stored_workflow_id = kwd.get("id", None)
workflow = get_workflow(trans, stored_workflow_id)
if workflow.user:
kwd["email"] = workflow.user.email
else:
kwd["email"] = None # For anonymous users ( shouldn't happen with workflows )
return trans.response.send_redirect(web.url_for(controller="workflows", action="user_per_month", **kwd))
return self.specified_date_list_grid(trans, **kwd)
[docs] @web.expose
def per_month_all(self, trans, **kwd):
message = ""
PageSpec = namedtuple("PageSpec", ["entries", "offset", "page", "pages_found"])
specs = sorter("date", kwd)
sort_id = specs.sort_id
order = specs.order
arrow = specs.arrow
_order = specs.exc_order
offset = 0
limit = 10
if "entries" in kwd:
entries = int(kwd.get("entries"))
else:
entries = 10
limit = entries * 4
if "offset" in kwd:
offset = int(kwd.get("offset"))
else:
offset = 0
if "page" in kwd:
page = int(kwd.get("page"))
else:
page = 1
q = (
sa.select(
self.select_month(model.StoredWorkflow.table.c.create_time).label("date"),
sa.func.count(model.StoredWorkflow.table.c.id).label("total_workflows"),
)
.select_from(sa.outerjoin(model.StoredWorkflow.table, model.User.table))
.group_by(self.group_by_month(model.StoredWorkflow.table.c.create_time))
.order_by(_order)
.offset(offset)
.limit(limit)
)
all_workflows = sa.select(
self.select_day(model.StoredWorkflow.table.c.create_time).label("date"), model.StoredWorkflow.table.c.id
)
trends = {}
for workflow in trans.sa_session.execute(all_workflows):
workflow_day = int(workflow.date.strftime("%-d")) - 1
workflow_month = int(workflow.date.strftime("%-m"))
workflow_month_name = workflow.date.strftime("%B")
workflow_year = workflow.date.strftime("%Y")
key = str(workflow_month_name + workflow_year)
try:
trends[key][workflow_day] += 1
except KeyError:
workflow_year = int(workflow_year)
wday, day_range = calendar.monthrange(workflow_year, workflow_month)
trends[key] = [0] * day_range
trends[key][workflow_day] += 1
workflows = []
for row in trans.sa_session.execute(q):
month_name = row.date.strftime("%B")
year = int(row.date.strftime("%Y"))
workflows.append((row.date.strftime("%Y-%m"), row.total_workflows, month_name, year))
pages_found = ceil(len(workflows) / float(entries))
page_specs = PageSpec(entries, offset, page, pages_found)
return trans.fill_template(
"/webapps/reports/workflows_per_month_all.mako",
order=order,
arrow=arrow,
sort_id=sort_id,
trends=trends,
workflows=workflows,
message=message,
page_specs=page_specs,
)
[docs] @web.expose
def per_user(self, trans, **kwd):
message = ""
PageSpec = namedtuple("PageSpec", ["entries", "offset", "page", "pages_found"])
specs = sorter("user_email", kwd)
sort_id = specs.sort_id
order = specs.order
arrow = specs.arrow
_order = specs.exc_order
time_period = kwd.get("spark_time")
time_period, _time_period = get_spark_time(time_period)
spark_limit = 30
offset = 0
limit = 10
if "entries" in kwd:
entries = int(kwd.get("entries"))
else:
entries = 10
limit = entries * 4
if "offset" in kwd:
offset = int(kwd.get("offset"))
else:
offset = 0
if "page" in kwd:
page = int(kwd.get("page"))
else:
page = 1
workflows = []
q = (
sa.select(
model.User.table.c.email.label("user_email"),
sa.func.count(model.StoredWorkflow.table.c.id).label("total_workflows"),
)
.select_from(sa.outerjoin(model.StoredWorkflow.table, model.User.table))
.group_by("user_email")
.order_by(_order)
.offset(offset)
.limit(limit)
)
all_workflows_per_user = sa.select(
model.User.table.c.email.label("user_email"),
self.select_day(model.StoredWorkflow.table.c.create_time).label("date"),
model.StoredWorkflow.table.c.id,
).select_from(sa.outerjoin(model.StoredWorkflow.table, model.User.table))
currday = datetime.today()
trends = {}
for workflow in trans.sa_session.execute(all_workflows_per_user):
curr_user = re.sub(r"\W+", "", workflow.user_email)
try:
day = currday - workflow.date
except TypeError:
day = datetime.date(currday) - datetime.date(workflow.date)
day = day.days
container = floor(day / _time_period)
container = int(container)
try:
if container < spark_limit:
trends[curr_user][container] += 1
except KeyError:
trends[curr_user] = [0] * spark_limit
if container < spark_limit:
trends[curr_user][container] += 1
for row in trans.sa_session.execute(q):
workflows.append((row.user_email, row.total_workflows))
pages_found = ceil(len(workflows) / float(entries))
page_specs = PageSpec(entries, offset, page, pages_found)
return trans.fill_template(
"/webapps/reports/workflows_per_user.mako",
order=order,
arrow=arrow,
sort_id=sort_id,
spark_limit=spark_limit,
trends=trends,
time_period=time_period,
workflows=workflows,
message=message,
page_specs=page_specs,
)
[docs] @web.expose
def user_per_month(self, trans, **kwd):
params = util.Params(kwd)
message = ""
specs = sorter("date", kwd)
sort_id = specs.sort_id
order = specs.order
arrow = specs.arrow
_order = specs.exc_order
email = util.restore_text(params.get("email", ""))
user_id = trans.security.decode_id(params.get("id", ""))
q = (
sa.select(
self.select_month(model.StoredWorkflow.table.c.create_time).label("date"),
sa.func.count(model.StoredWorkflow.table.c.id).label("total_workflows"),
)
.where(model.StoredWorkflow.table.c.user_id == user_id)
.group_by(self.group_by_month(model.StoredWorkflow.table.c.create_time))
.order_by(_order)
)
all_workflows_user_month = sa.select(
self.select_day(model.StoredWorkflow.table.c.create_time).label("date"), model.StoredWorkflow.table.c.id
).where(model.StoredWorkflow.table.c.user_id == user_id)
trends = {}
for workflow in trans.sa_session.execute(all_workflows_user_month):
workflow_day = int(workflow.date.strftime("%-d")) - 1
workflow_month = int(workflow.date.strftime("%-m"))
workflow_month_name = workflow.date.strftime("%B")
workflow_year = workflow.date.strftime("%Y")
key = str(workflow_month_name + workflow_year)
try:
trends[key][workflow_day] += 1
except KeyError:
workflow_year = int(workflow_year)
wday, day_range = calendar.monthrange(workflow_year, workflow_month)
trends[key] = [0] * day_range
trends[key][workflow_day] += 1
workflows = []
for row in trans.sa_session.execute(q):
workflows.append(
(row.date.strftime("%Y-%m"), row.total_workflows, row.date.strftime("%B"), row.date.strftime("%Y"))
)
return trans.fill_template(
"/webapps/reports/workflows_user_per_month.mako",
email=util.sanitize_text(email),
order=order,
arrow=arrow,
sort_id=sort_id,
trends=trends,
workflows=workflows,
message=message,
)
[docs] @web.expose
def per_workflow(self, trans, **kwd):
message = ""
PageSpec = namedtuple("PageSpec", ["entries", "offset", "page", "pages_found"])
specs = sorter("workflow_name", kwd)
sort_id = specs.sort_id
order = specs.order
arrow = specs.arrow
_order = specs.exc_order
time_period = kwd.get("spark_time")
time_period, _time_period = get_spark_time(time_period)
spark_limit = 30
offset = 0
limit = 10
if "entries" in kwd:
entries = int(kwd.get("entries"))
else:
entries = 10
limit = entries * 4
if "offset" in kwd:
offset = int(kwd.get("offset"))
else:
offset = 0
if "page" in kwd:
page = int(kwd.get("page"))
else:
page = 1
# In case we don't know which is the monitor user we will query for all jobs
q = (
sa.select(
model.Workflow.table.c.id.label("workflow_id"),
sa.func.min(model.Workflow.table.c.name).label("workflow_name"),
sa.func.count(model.WorkflowInvocation.table.c.id).label("total_runs"),
)
.select_from(model.Workflow.table, model.WorkflowInvocation.table)
.where(sa.and_(model.WorkflowInvocation.table.c.workflow_id == model.Workflow.table.c.id))
.group_by(model.Workflow.table.c.id)
.order_by(_order)
.offset(offset)
.limit(limit)
)
all_runs_per_workflow = (
sa.select(
model.Workflow.table.c.id.label("workflow_id"),
model.Workflow.table.c.name.label("workflow_name"),
self.select_day(model.WorkflowInvocation.table.c.create_time).label("date"),
)
.select_from(model.Workflow.table, model.WorkflowInvocation.table)
.where(sa.and_(model.WorkflowInvocation.table.c.workflow_id == model.Workflow.table.c.id))
)
currday = date.today()
trends = {}
for run in trans.sa_session.execute(all_runs_per_workflow):
curr_tool = re.sub(r"\W+", "", str(run.workflow_id))
try:
day = currday - run.date
except TypeError:
day = currday - datetime.date(run.date)
day = day.days
container = floor(day / _time_period)
container = int(container)
try:
if container < spark_limit:
trends[curr_tool][container] += 1
except KeyError:
trends[curr_tool] = [0] * spark_limit
if container < spark_limit:
trends[curr_tool][container] += 1
runs = []
for row in trans.sa_session.execute(q):
runs.append((row.workflow_name, row.total_runs, row.workflow_id))
pages_found = ceil(len(runs) / float(entries))
page_specs = PageSpec(entries, offset, page, pages_found)
return trans.fill_template(
"/webapps/reports/workflows_per_workflow.mako",
order=order,
arrow=arrow,
sort_id=sort_id,
spark_limit=spark_limit,
time_period=time_period,
trends=trends,
runs=runs,
message=message,
page_specs=page_specs,
)
# ---- Utility methods -------------------------------------------------------
[docs]def get_workflow(trans, id):
return trans.sa_session.get(model.Workflow, trans.security.decode_id(id))