"""
Graph content classes.
"""
import logging
from galaxy.datatypes.dataproviders.column import ColumnarDataProvider
from galaxy.datatypes.dataproviders.dataset import DatasetDataProvider
from galaxy.datatypes.dataproviders.hierarchy import XMLDataProvider
from galaxy.datatypes.protocols import DatasetProtocol
from galaxy.util import simplegraph
from . import (
data,
dataproviders,
tabular,
xml,
)
# Module-level logger, named after this module via ``__name__``.
log = logging.getLogger(__name__)
@dataproviders.decorators.has_dataproviders
class Xgmml(xml.GenericXml):
    """
    XGMML graph format
    (http://wiki.cytoscape.org/Cytoscape_User_Manual/Network_Formats).
    """

    file_ext = "xgmml"

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """
        Fill in ``dataset.peek`` and ``dataset.blurb``.

        A purged dataset gets fixed placeholder text; otherwise the peek is
        taken from ``data.get_file_peek`` on the dataset's file name.
        """
        # Purged datasets have nothing left to preview.
        if dataset.dataset.purged:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disk"
            return
        dataset.peek = data.get_file_peek(dataset.get_file_name())
        dataset.blurb = "XGMML data"

    def sniff(self, filename: str) -> bool:
        """
        Never claim a file: always returns False, so the user must set
        this datatype manually.
        """
        return False

    @staticmethod
    def merge(split_files: list[str], output_file: str) -> None:
        """
        Merge ``split_files`` into ``output_file``.

        Only the single-file case is handled here, by delegating to
        ``data.Text.merge``. Merging several XML files is non-trivial and
        must be done in subclasses.

        :raises NotImplementedError: if more than one split file is given.
        """
        if len(split_files) <= 1:
            # For one file only, use base class method (move/copy)
            data.Text.merge(split_files, output_file)
            return
        raise NotImplementedError(
            "Merging multiple XML files is non-trivial "
            "and must be implemented for each XML type"
        )

    @dataproviders.decorators.dataprovider_factory("node-edge", XMLDataProvider.settings)
    def node_edge_dataprovider(self, dataset: DatasetProtocol, **settings) -> "XGMMLGraphDataProvider":
        """
        Build the "node-edge" provider: an ``XGMMLGraphDataProvider`` reading
        from a ``DatasetDataProvider`` wrapped around ``dataset``.
        """
        return XGMMLGraphDataProvider(DatasetDataProvider(dataset), **settings)
@dataproviders.decorators.has_dataproviders
class Sif(tabular.Tabular):
    """
    SIF graph format
    (http://wiki.cytoscape.org/Cytoscape_User_Manual/Network_Formats).

    First column: node id
    Second column: relationship type
    Third to Nth column: target ids for link
    """

    file_ext = "sif"

    def set_peek(self, dataset: DatasetProtocol, **kwd) -> None:
        """
        Fill in ``dataset.peek`` and ``dataset.blurb``.

        A purged dataset gets fixed placeholder text; otherwise the peek is
        taken from ``data.get_file_peek`` on the dataset's file name.
        """
        # Purged datasets have nothing left to preview.
        if dataset.dataset.purged:
            dataset.peek = "file does not exist"
            dataset.blurb = "file purged from disk"
            return
        dataset.peek = data.get_file_peek(dataset.get_file_name())
        dataset.blurb = "SIF data"

    def sniff(self, filename: str) -> bool:
        """
        Never claim a file: always returns False, so the user must set
        this datatype manually.
        """
        return False

    @staticmethod
    def merge(split_files: list[str], output_file: str) -> None:
        """
        Merge ``split_files`` into ``output_file`` by delegating to
        ``data.Text.merge``.
        """
        data.Text.merge(split_files, output_file)

    @dataproviders.decorators.dataprovider_factory("node-edge", ColumnarDataProvider.settings)
    def node_edge_dataprovider(self, dataset: DatasetProtocol, **settings) -> "SIFGraphDataProvider":
        """
        Build the "node-edge" provider: a ``SIFGraphDataProvider`` reading
        from a ``DatasetDataProvider`` wrapped around ``dataset``.
        """
        return SIFGraphDataProvider(DatasetDataProvider(dataset), **settings)
# ----------------------------------------------------------------------------- graph specific data providers
class XGMMLGraphDataProvider(XMLDataProvider):
    """
    Provide two lists: nodes, edges::

        'nodes': contains objects of the form:
            { 'id' : <some string id>, 'data': <any extra data> }
        'edges': contains objects of the form:
            { 'source' : <an index into nodes>, 'target': <an index into nodes>, 'data': <any extra data> }
    """

    def __iter__(self):
        """
        Aggregate every element from the parent provider into one graph and
        yield a single item: that graph's ``as_dict()`` result.
        """
        # Collect nodes and links in a simple graph, then emit it as a dict.
        network = simplegraph.SimpleGraph()
        for root in super().__iter__():
            # Elements without a "children" entry contribute nothing.
            if "children" in root:
                for child in root["children"]:
                    tag = child["tag"]
                    # use endswith to work around Elementtree namespaces
                    if tag.endswith("node"):
                        # pass the entire, parsed xml element as the data
                        network.add_node(child["attrib"]["id"], **child)
                    elif tag.endswith("edge"):
                        attributes = child["attrib"]
                        network.add_edge(attributes["source"], attributes["target"], **child)
        yield network.as_dict()
class SIFGraphDataProvider(ColumnarDataProvider):
    """
    Provide two lists: nodes, edges::

        'nodes': contains objects of the form:
            { 'id' : <some string id>, 'data': <any extra data> }
        'edges': contains objects of the form:
            { 'source' : <an index into nodes>, 'target': <an index into nodes>, 'data': <any extra data> }
    """

    def __iter__(self):
        """
        Aggregate every row from the parent provider into one graph and
        yield a single item: that graph's ``as_dict()`` result.
        """
        # Collect nodes and links in a simple graph, then emit it as a dict.
        network = simplegraph.SimpleGraph()
        # SIF is tabular with the source, link-type, and all targets in the columns
        for row in super().__iter__():
            if not row:
                continue
            origin = row[0]
            # there's no extra data for nodes (or links) in the examples I've seen
            network.add_node(origin)
            # A row needs a relation column plus at least one target to form links.
            if len(row) < 3:
                continue
            link_type = row[1]
            # targets are the (variadic) remaining columns
            for destination in row[2:]:
                network.add_node(destination)
                network.add_edge(origin, destination, type=link_type)
        yield network.as_dict()