Source code for tigerlily.pagerank

"""Personalized PageRank computation with TigerGraph."""

import json
import typing
from typing import Dict, List

import pandas as pd
import pyTigerGraph as tg  # noqa: N813
import requests
from tqdm.notebook import tqdm


class PersonalizedPageRankMachine:
    """Define a drug-protein graph and compute the Personalized PageRank of nodes.

    The instance only stores the connection details on construction.
    :meth:`connect` must be called before any other method, because every
    other method talks to TigerGraph through ``self.connection``, which is
    created there.
    """

    def __init__(self, host: str, graphname: str, username: str, secret: str, password: str):
        """Set up a Personalized PageRank computation machine.

        :param host: Address of the TigerGraph host.
        :param graphname: Name of the graph used for analytics.
        :param username: The username for the graph.
        :param secret: The secret generated in TigerGraph Studio.
        :param password: The password of the user.
        """
        self._host = host
        self._graphname = graphname
        self._username = username
        self._secret = secret
        self._password = password

    def connect(self):
        """Connect to the host with the authentication details.

        A first, unauthenticated connection is used only to exchange the
        secret for an API token; the token is then passed to the connection
        stored in ``self.connection``.
        """
        token_getter = tg.TigerGraphConnection(host=self._host, graphname=self._graphname)
        # NOTE(review): "12000" is presumably the token lifetime in seconds --
        # confirm against the signature of ``getToken`` in the installed
        # pyTigerGraph version.
        token = token_getter.getToken(self._secret, "12000")[0]
        self.connection = tg.TigerGraphConnection(
            host=self._host,
            graphname=self._graphname,
            username=self._username,
            password=self._password,
            apiToken=token,
        )

    def _purge_graph(self):
        """Delete the existing drug and gene type nodes."""
        self.connection.delVertices("drug")
        self.connection.delVertices("gene")

    def _upload_relationship(self, edges: pd.DataFrame, source: str, target: str, edge_type: str = "interacts"):
        """Upload the edges that go from one node type to another.

        Rows are selected where ``type_1`` equals ``source`` and ``type_2``
        equals ``target``; each selected row becomes one edge from ``node_1``
        to ``node_2`` with an empty attribute dictionary.

        :param edges: Edges dataframe of interest.
        :param source: Source node type.
        :param target: Target node type.
        :param edge_type: The type of edges.
        """
        selected = edges[(edges["type_1"] == source) & (edges["type_2"] == target)]
        node_pairs = selected[["node_1", "node_2"]].values.tolist()
        payload = [(node_1, node_2, {}) for node_1, node_2 in node_pairs]
        self.connection.upsertEdges(source, edge_type, target, payload)

    def upload_graph(self, new_graph: bool, edges: pd.DataFrame):
        """Upload the edges from a dataframe using the PyTigerGraph connection.

        :param new_graph: Decision about deleting the existing nodes in the graph.
        :param edges: The dataframe with the edges between drugs and proteins.
            It must have the columns ``type_1``, ``type_2``, ``node_1`` and ``node_2``.
        :raises AssertionError: If any of the required columns is missing.
        """
        required = ("type_1", "type_2", "node_1", "node_2")
        missing = [column for column in required if column not in edges.columns]
        # Kept as an assertion so callers that catch AssertionError keep working.
        assert not missing, f"The edges dataframe is missing the columns: {missing}"
        if new_graph:
            self._purge_graph()
        # Only these three directed type combinations are uploaded.
        for source, target in (("drug", "gene"), ("gene", "gene"), ("gene", "drug")):
            self._upload_relationship(edges, source, target, "interacts")

    def install_query(
        self,
        url: str = "https://raw.githubusercontent.com/tigergraph/gsql-graph-algorithms/master/algorithms/Centrality/pagerank/personalized/multi_source/tg_pagerank_pers.gsql",  # noqa:E501
    ):
        """Install a query on the host.

        The query text is downloaded from ``url``, rewritten so that an already
        existing query is replaced, submitted, and then all queries are installed.

        :param url: A url to the query string.
        :raises requests.HTTPError: If the download answers with an HTTP error status.
        """
        response = requests.get(url)
        # Without this check the body of an error page would be sent to GSQL
        # as if it were the query definition.
        response.raise_for_status()
        script = response.text.replace("CREATE QUERY", "CREATE OR REPLACE QUERY")
        self.connection.gsql(script)
        self.connection.gsql("INSTALL QUERY ALL")

    @typing.no_type_check
    def personalized_pagerank(
        self,
        node_id: str,
        node_type: str = "drug",
        edge_type: str = "interacts",
        print_accum: bool = True,
        damping: float = 0.85,
        iterations: int = 20,
        top_k: int = 40,
    ) -> Dict:
        """Compute the pagerank for a specific node.

        The arguments are serialised to JSON and passed to the
        ``tg_pagerank_pers`` query; the textual response is parsed as JSON.

        :param node_id: Identifier of the node of interest.
        :param node_type: Type of the node.
        :param edge_type: Type of the edge.
        :param print_accum: Accumulation flag.
        :param damping: Non return probability.
        :param iterations: Number of steps per walk (sent to the query as ``iter``).
        :param top_k: Number of closest neighbors to return for the query.
        :returns: The JSON-decoded response of the query for the given node.
        """
        params = {
            "source": [{"type": node_type, "id": node_id}],
            "e_type": edge_type,
            "print_accum": print_accum,
            "damping": damping,
            "iter": iterations,
            "top_k": top_k,
        }
        query = "RUN QUERY tg_pagerank_pers(" + json.dumps(params) + ")"
        return json.loads(self.connection.gsql(query))

    def get_personalized_pagerank(
        self,
        node_ids: List,
        edge_type: str = "interacts",
        print_accum: bool = True,
        damping: float = 0.5,
        iterations: int = 100,
        top_k: int = 100,
    ) -> pd.DataFrame:
        """Compute the pruned Personalized PageRank for a list of nodes.

        :param node_ids: The nodes of interest. Each entry is indexed with the
            keys ``"v_id"`` (node identifier) and ``"v_type"`` (node type).
        :param edge_type: Type of the edge.
        :param print_accum: Accumulation flag.
        :param damping: Non return probability.
        :param iterations: Number of steps per walk.
        :param top_k: Number of closest neighbors to return for the query.
        :returns: A table of node pairs with PageRank scores in the columns
            ``node_1``, ``node_2`` and ``score``. The table is empty when
            ``node_ids`` is empty.
        """
        columns = ["node_1", "node_2", "score"]
        # Materialise once so that an empty input is detected up front:
        # ``pd.concat`` raises a ValueError when it is given nothing to concatenate.
        node_ids = list(node_ids)
        if not node_ids:
            return pd.DataFrame(columns=columns)

        all_scores = []
        for node_id in tqdm(node_ids):
            response = self.personalized_pagerank(
                node_id["v_id"], node_id["v_type"], edge_type, print_accum, damping, iterations, top_k
            )
            top_scores = response["results"][0]["top_scores"]
            rows = [[node_id["v_id"], entry["vertex_id"], entry["score"]] for entry in top_scores]
            all_scores.append(pd.DataFrame(rows, columns=columns))
        return pd.concat(all_scores)