Source code for galaxy.workflow.render

import svgwrite

MARGIN = 5
LINE_SPACING = 15
STANDALONE_SVG_TEMPLATE = """<?xml version="1.0" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN"
  "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
%s"""


[docs]class WorkflowCanvas:
[docs] def __init__(self): self.canvas = svgwrite.Drawing(profile="full") self.connectors = [] self.boxes = [] self.text = [] self.in_pos = {} self.out_pos = {} self.widths = {} self.max_x = 0 self.max_y = 0 self.max_width = 0 self.data = []
[docs] def finish(self, for_embed=False): # max_x, max_y, max_width = self.max_x, self.max_y, self.max_width for box in self.boxes: self.canvas.add(box) for connector in self.connectors: self.canvas.add(connector) text_style_layer = self.canvas.g(style="font-family: Helvetica, Arial, FreeSans, Sans, sans, sans-serif;") for text in self.text: text_style_layer.add(text) self.canvas.add(text_style_layer) # if we're embedding this in HTML - setup a viewbox and preserve aspect ratio # https://css-tricks.com/scale-svg/#aa-the-viewbox-attribute if for_embed: self.canvas.viewbox(-5, -5, self.max_x + self.max_width, self.max_y + 150) self.canvas.fit() return self.canvas
[docs] def add_boxes(self, step_dict, width, name_fill): x, y = step_dict["position"]["left"], step_dict["position"]["top"] self.boxes.append(svgwrite.shapes.Rect((x - MARGIN, y), (width, 30), fill=name_fill, stroke="#000000")) box_height = (len(step_dict["data_inputs"]) + len(step_dict["data_outputs"])) * LINE_SPACING + MARGIN # Draw separator line. if len(step_dict["data_inputs"]) > 0: box_height += LINE_SPACING sep_y = y + len(step_dict["data_inputs"]) * LINE_SPACING + 40 self.text.append( svgwrite.shapes.Line((x - MARGIN, sep_y), (x + width - MARGIN, sep_y), stroke=svgwrite.rgb(0, 0, 0)) ) # Define an input/output box. self.boxes.append( svgwrite.shapes.Rect( (x - MARGIN, y + 30), (width, box_height), fill="#ffffff", stroke=svgwrite.rgb(0, 0, 0) ) )
[docs] def add_text(self, module_data_inputs, module_data_outputs, step, module_name): left, top = step.position["left"], step.position["top"] x, y = left, top order_index = step.order_index max_len = len(module_name) * 1.5 self.text.append(svgwrite.text.Text(module_name, (x, y + 20), style="font-size:14px")) y += 45 count = 0 in_pos = self.in_pos out_pos = self.out_pos for di in module_data_inputs: cur_y = y + count * LINE_SPACING if order_index not in in_pos: in_pos[order_index] = {} in_pos[order_index][di["name"]] = (x, cur_y) self.text.append(svgwrite.text.Text(di["label"], (x, cur_y), style="font-size:10px")) count += 1 max_len = max(max_len, len(di["label"])) if len(module_data_inputs) > 0: y += LINE_SPACING for do in module_data_outputs: cur_y = y + count * LINE_SPACING if order_index not in out_pos: out_pos[order_index] = {} out_pos[order_index][do["name"]] = (x, cur_y) self.text.append(svgwrite.text.Text(do["name"], (x, cur_y), style="font-size:10px")) count += 1 max_len = max(max_len, len(do["name"])) self.widths[order_index] = max_len * 5.5 self.max_x = max(self.max_x, left) self.max_y = max(self.max_y, top) self.max_width = max(self.max_width, self.widths[order_index])
[docs] def add_connection(self, step_dict, conn, output_dict): in_coords = self.in_pos[step_dict["id"]][conn] # out_pos_index will be a step number like 1, 2, 3... out_pos_index = output_dict["id"] # out_pos_name will be a string like 'o', 'o2', etc. out_pos_name = output_dict["output_name"] if out_pos_index in self.out_pos: # out_conn_index_dict will be something like: # 7: {'o': (824.5, 618)} out_conn_index_dict = self.out_pos[out_pos_index] if out_pos_name in out_conn_index_dict: out_conn_pos = out_conn_index_dict[out_pos_name] else: # Take any key / value pair available in out_conn_index_dict. # A problem will result if the dictionary is empty. if out_conn_index_dict: key = next(iter(out_conn_index_dict.keys())) out_conn_pos = self.out_pos[out_pos_index][key] adjusted = (out_conn_pos[0] + self.widths[output_dict["id"]], out_conn_pos[1]) self.text.append( svgwrite.shapes.Circle( center=(out_conn_pos[0] + self.widths[output_dict["id"]] - MARGIN, out_conn_pos[1] - MARGIN), r=5, fill="#ffffff", stroke="#000000", ) ) marker = self.canvas.marker( overflow="visible", refX="0", refY="5", viewBox="0 0 10 5", markerWidth="8", markerHeight="10", markerUnits="strokeWidth", orient="auto", stroke="none", fill="black", ) marker.add(self.canvas.path(d="M 0 0 L 10 5 L 0 10 z")) self.canvas.defs.add(marker) conn = svgwrite.shapes.Line( (adjusted[0], adjusted[1] - MARGIN), (in_coords[0] - 10, in_coords[1]), stroke="#000000" ) conn["marker-end"] = marker.get_funciri() self.connectors.append(conn)
[docs] def add_steps(self, highlight_errors=False): # Only highlight missing tools if displaying in the tool shed. for step_dict in self.data: tool_unavailable = step_dict.get("tool_errors", False) if highlight_errors and tool_unavailable: fill = "#EBBCB2" else: fill = "#EBD9B2" width = self.widths[step_dict["id"]] self.add_boxes(step_dict, width, fill) for conn, output_dict in step_dict["input_connections"].items(): self.add_connection(step_dict, conn, output_dict)
[docs] def populate_data_for_step(self, step, module_name, module_data_inputs, module_data_outputs, tool_errors=None): step_dict = { "id": step.order_index, "data_inputs": module_data_inputs, "data_outputs": module_data_outputs, "position": step.position, } if tool_errors: step_dict["tool_errors"] = tool_errors input_conn_dict = {} for conn in step.input_connections: input_conn_dict[conn.input_name] = dict(id=conn.output_step.order_index, output_name=conn.output_name) step_dict["input_connections"] = input_conn_dict self.data.append(step_dict) self.add_text(module_data_inputs, module_data_outputs, step, module_name)