Warning
This document is for an old release of Galaxy. You can alternatively view this page in the latest release if it exists or view the top of the latest release's documentation.
Source code for galaxy.workflow.render
import svgwrite
# Padding, in canvas coordinates: step boxes start MARGIN to the left of their
# text, and connector endpoints/circles are nudged by the same amount.
MARGIN = 5
# Vertical distance between consecutive input/output label rows; also the
# height each row contributes to a step's input/output box.
LINE_SPACING = 15
# XML prolog plus SVG 1.1 DOCTYPE with a single %s placeholder. Presumably the
# placeholder receives the serialized <svg> markup -- it is not used in the
# code visible here, so confirm against callers.
STANDALONE_SVG_TEMPLATE = """<?xml version="1.0" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
"http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
%s"""
class WorkflowCanvas:
    """Collect the pieces of a workflow diagram and assemble them into an SVG drawing.

    Usage, as far as this class shows: call ``populate_data_for_step`` once per
    workflow step (records the step and lays out its text), then ``add_steps``
    (creates boxes and connectors), then ``finish`` (returns the drawing).
    """

    def __init__(self):
        """Create an empty drawing and the bookkeeping used while laying out steps."""
        self.canvas = svgwrite.Drawing(profile="full")
        # SVG elements collected per layer; finish() adds them to the canvas.
        self.connectors = []
        self.boxes = []
        self.text = []
        # order_index -> {input or output name -> (x, y) of its label}.
        self.in_pos = {}
        self.out_pos = {}
        # order_index -> estimated box width for that step.
        self.widths = {}
        # Largest left / top / width seen across all steps so far.
        self.max_x = 0
        self.max_y = 0
        self.max_width = 0
        # One dict per step, appended by populate_data_for_step().
        self.data = []

    def finish(self):
        """Add every collected element to the canvas and return the canvas.

        Boxes are added first, then connectors, then one group that holds all
        text-layer elements so they share a single font-family style.
        """
        for layer in (self.boxes, self.connectors):
            for element in layer:
                self.canvas.add(element)
        text_style_layer = self.canvas.g(style="font-family: Helvetica, Arial, FreeSans, Sans, sans, sans-serif;")
        for element in self.text:
            text_style_layer.add(element)
        self.canvas.add(text_style_layer)
        return self.canvas

    def add_boxes(self, step_dict, width, name_fill):
        """Create the title box and the input/output box for one step.

        :param step_dict: step record with "position" ("left"/"top"),
            "data_inputs" and "data_outputs" entries.
        :param width: width used for both rectangles.
        :param name_fill: fill colour of the title rectangle.
        """
        position = step_dict["position"]
        x, y = position["left"], position["top"]
        box_left = x - MARGIN
        # Title rectangle: fixed height of 30.
        self.boxes.append(svgwrite.shapes.Rect((box_left, y), (width, 30), fill=name_fill, stroke="#000000"))
        input_count = len(step_dict["data_inputs"])
        output_count = len(step_dict["data_outputs"])
        box_height = (input_count + output_count) * LINE_SPACING + MARGIN
        if input_count > 0:
            # Steps with inputs get one extra row of height and a separator
            # line below the input rows. The line goes into the text layer,
            # which finish() adds after the boxes.
            box_height += LINE_SPACING
            sep_y = y + input_count * LINE_SPACING + 40
            self.text.append(
                svgwrite.shapes.Line((box_left, sep_y), (x + width - MARGIN, sep_y), stroke=svgwrite.rgb(0, 0, 0))
            )
        # Input/output rectangle directly under the title rectangle.
        self.boxes.append(
            svgwrite.shapes.Rect((box_left, y + 30), (width, box_height), fill="#ffffff", stroke=svgwrite.rgb(0, 0, 0))
        )

    def add_text(self, module_data_inputs, module_data_outputs, step, module_name):
        """Create the text elements for one step and record its layout.

        Records the label position of every input (by "name", displayed using
        "label") and every output (by "name") under ``step.order_index``,
        stores an estimated box width for the step, and updates the running
        ``max_x`` / ``max_y`` / ``max_width`` values.

        :param module_data_inputs: sized iterable of dicts with "name" and "label".
        :param module_data_outputs: iterable of dicts with "name".
        :param step: object exposing ``position`` ("left"/"top") and ``order_index``.
        :param module_name: title text of the step.
        """
        left, top = step.position["left"], step.position["top"]
        order_index = step.order_index
        # Width is estimated from character counts; the title (14px) is
        # weighted 1.5x relative to the 10px input/output labels.
        max_len = len(module_name) * 1.5
        self.text.append(svgwrite.text.Text(module_name, (left, top + 20), style="font-size:14px"))

        rows_y = top + 45
        for row, data_input in enumerate(module_data_inputs):
            cur_y = rows_y + row * LINE_SPACING
            self.in_pos.setdefault(order_index, {})[data_input["name"]] = (left, cur_y)
            self.text.append(svgwrite.text.Text(data_input["label"], (left, cur_y), style="font-size:10px"))
            max_len = max(max_len, len(data_input["label"]))

        input_count = len(module_data_inputs)
        if input_count > 0:
            # Leave one blank row between the inputs and the outputs.
            rows_y += LINE_SPACING
        # Output rows continue the row numbering started by the inputs.
        for row, data_output in enumerate(module_data_outputs, start=input_count):
            cur_y = rows_y + row * LINE_SPACING
            self.out_pos.setdefault(order_index, {})[data_output["name"]] = (left, cur_y)
            self.text.append(svgwrite.text.Text(data_output["name"], (left, cur_y), style="font-size:10px"))
            max_len = max(max_len, len(data_output["name"]))

        self.widths[order_index] = max_len * 5.5
        self.max_x = max(self.max_x, left)
        self.max_y = max(self.max_y, top)
        self.max_width = max(self.max_width, self.widths[order_index])

    def _find_output_position(self, step_index, output_name):
        """Return the recorded (x, y) for an output of a step, or None if the step has none.

        If ``output_name`` was not recorded for the step, the first recorded
        output position of that step is used instead.
        """
        positions = self.out_pos.get(step_index)
        if not positions:
            return None
        if output_name in positions:
            return positions[output_name]
        return next(iter(positions.values()))

    def add_connection(self, step_dict, conn, output_dict):
        """Create the connector from a source step's output to an input of ``step_dict``.

        Appends a small circle at the right edge of the source step (text
        layer) and an arrow-tipped line to ``self.connectors``. If no output
        position was recorded for the source step, nothing is drawn; previously
        this case raised ``UnboundLocalError``.

        :param step_dict: record of the receiving step; its "id" indexes ``in_pos``.
        :param conn: name of the receiving input.
        :param output_dict: dict with "id" (source step index, e.g. 1, 2, 3)
            and "output_name" (e.g. 'o', 'o2').
        :raises KeyError: if the receiving input has no recorded position.
        """
        in_coords = self.in_pos[step_dict["id"]][conn]
        source_index = output_dict["id"]
        out_conn_pos = self._find_output_position(source_index, output_dict["output_name"])
        if out_conn_pos is None:
            # The source step recorded no outputs, so there is nothing to anchor to.
            return

        source_width = self.widths[source_index]
        adjusted = (out_conn_pos[0] + source_width, out_conn_pos[1])
        self.text.append(
            svgwrite.shapes.Circle(
                center=(out_conn_pos[0] + source_width - MARGIN, out_conn_pos[1] - MARGIN),
                r=5,
                fill="#ffffff",
                stroke="#000000",
            )
        )
        # A separate arrowhead marker is registered in <defs> for every connection.
        marker = self.canvas.marker(
            overflow="visible",
            refX="0",
            refY="5",
            viewBox="0 0 10 5",
            markerWidth="8",
            markerHeight="10",
            markerUnits="strokeWidth",
            orient="auto",
            stroke="none",
            fill="black",
        )
        marker.add(self.canvas.path(d="M 0 0 L 10 5 L 0 10 z"))
        self.canvas.defs.add(marker)
        # The line stops 10 units left of the input label.
        connector = svgwrite.shapes.Line(
            (adjusted[0], adjusted[1] - MARGIN), (in_coords[0] - 10, in_coords[1]), stroke="#000000"
        )
        connector["marker-end"] = marker.get_funciri()
        self.connectors.append(connector)

    def add_steps(self, highlight_errors=False):
        """Create boxes and connectors for every step recorded in ``self.data``.

        :param highlight_errors: when true, steps carrying a truthy
            "tool_errors" entry get a different title fill colour.
        """
        # Only highlight missing tools if displaying in the tool shed.
        for step_dict in self.data:
            tool_unavailable = step_dict.get("tool_errors", False)
            fill = "#EBBCB2" if highlight_errors and tool_unavailable else "#EBD9B2"
            self.add_boxes(step_dict, self.widths[step_dict["id"]], fill)
            for conn, output_dict in step_dict["input_connections"].items():
                self.add_connection(step_dict, conn, output_dict)

    def populate_data_for_step(self, step, module_name, module_data_inputs, module_data_outputs, tool_errors=None):
        """Record one workflow step and lay out its text.

        Appends a step record to ``self.data`` (keyed by ``step.order_index``)
        and then calls ``add_text`` for the step.

        :param step: object exposing ``order_index``, ``position`` and
            ``input_connections``; each connection exposes ``input_name``,
            ``output_name`` and ``output_step.order_index``.
        :param module_name: title text of the step.
        :param module_data_inputs: input descriptions passed on to ``add_text``.
        :param module_data_outputs: output descriptions passed on to ``add_text``.
        :param tool_errors: stored on the record only when truthy.
        """
        step_dict = {
            "id": step.order_index,
            "data_inputs": module_data_inputs,
            "data_outputs": module_data_outputs,
            "position": step.position,
        }
        if tool_errors:
            step_dict["tool_errors"] = tool_errors
        # Keyed by input name, so a later connection to the same input replaces an earlier one.
        step_dict["input_connections"] = {
            conn.input_name: {"id": conn.output_step.order_index, "output_name": conn.output_name}
            for conn in step.input_connections
        }
        self.data.append(step_dict)
        self.add_text(module_data_inputs, module_data_outputs, step, module_name)