diff --git a/src/pathpyG/__init__.py b/src/pathpyG/__init__.py index 66406411c..680487f3e 100644 --- a/src/pathpyG/__init__.py +++ b/src/pathpyG/__init__.py @@ -14,6 +14,5 @@ from pathpyG import io from pathpyG import nn from pathpyG import algorithms -from pathpyG import processes from pathpyG import statistics from pathpyG.visualisations import plot, layout diff --git a/src/pathpyG/processes/__init__.py b/src/pathpyG/processes/__init__.py deleted file mode 100644 index efeed517b..000000000 --- a/src/pathpyG/processes/__init__.py +++ /dev/null @@ -1,19 +0,0 @@ -"""Module for pathpy processes.""" - -# !/usr/bin/python -tt -# -*- coding: utf-8 -*- -# ============================================================================= -# File : __init__.py -- initialisation of processes -# Author : Ingo Scholtes -# Time-stamp: -# -# Copyright (c) 2016-2020 Pathpy Developers -# ============================================================================= -# flake8: noqa -# pylint: disable=unused-import - -from pathpyG.processes.random_walk import RandomWalk - -# from pathpyG.processes.epidemic_spreading import EpidemicSIR -from pathpyG.processes.sampling import VoseAliasSampling -from pathpyG.processes.random_walk import HigherOrderRandomWalk diff --git a/src/pathpyG/processes/process.py b/src/pathpyG/processes/process.py deleted file mode 100644 index fe08a1eb9..000000000 --- a/src/pathpyG/processes/process.py +++ /dev/null @@ -1,245 +0,0 @@ -"""Base classes for simulation of dynamical processes""" - -import abc -from collections import defaultdict -from typing import Iterable, TYPE_CHECKING, Any, Optional, List, Dict, Tuple, Union, Set - -from pandas import DataFrame - -from pathpyG import Graph -from tqdm import tqdm - - -class BaseProcess: - """Abstract base class for all implementations of discrete-time dynamical processes.""" - - def __init__(self, network: Graph): - """initialize process.""" - self._network = network - self.init(self.random_seed()) - - @property - def network(self) -> Graph: - return self._network - - @abc.abstractmethod - def init(self, seed: Any) -> None: - """Abstract method to initialize the process with a given seed state.""" - - @abc.abstractmethod - def random_seed(self) -> Any: - """Abstract method to generate a random seed state for the process.""" - - @abc.abstractmethod - def step(self) -> Iterable[str]: - """Abstract method to simulate a single step of the process. Returns - an iterable of node uids whose state has been changed in this step.""" - - @abc.abstractproperty - def time(self) -> int: - """Abstract property returning the current time.""" - - @abc.abstractmethod - def state_to_color(self, Any) -> Union[Tuple[int, int, int], str]: - """Abstract method mapping node states to RGB colors or color names.""" - - @abc.abstractmethod - def node_state(self, v: str) -> Any: - """Abstract method returning the current state of a given node.""" - - def simulation_run(self, steps: int, seed: Optional[Any] = None) -> Tuple[int, Set[str]]: - """Abstract generator method that initializes the process, runs a number of steps and yields a tuple consisting of the current time and the set of nodes whose state has changed in each step.""" - if seed == None: - self.init(self.random_seed()) - else: - self.init(seed) - for _ in range(steps): - ret = self.step() - if ret is not None: - yield self.time, ret - else: - return None - - def run_experiment(self, steps: int, runs: Optional[Union[int, Iterable[Any]]] = 1) -> DataFrame: - """Perform one or more simulation runs of the process with a given number of steps.""" - - # Generate initializations for different runs - seeds: List = list() - if type(runs) == int: - for s in range(runs): - seeds.append(self.random_seed()) - else: - for s in runs: - seeds.append(s) - - results = list() - run_id: int = 0 - for seed in tqdm(seeds): - - # initialize seed state and record initial state - self.init(seed) - for v in self.network.nodes: - results.append( - {"run_id": run_id, "seed": seed, "time": self.time, "node": v, "state": self.node_state(v)} - ) - - # simulate the given number of steps - for time, updated_nodes in self.simulation_run(steps, seed): - # print(updated_nodes) - # record the new state of each changed node - for v in updated_nodes: - results.append( - {"run_id": run_id, "seed": seed, "time": time, "node": v, "state": self.node_state(v)} - ) - run_id += 1 - - return DataFrame.from_dict(results) - - # TODO : add plot method - # def plot(self, data: DataFrame, run_id: int = 0, timescale: Optional[int] = 1, **kwargs): - # """ - # Display an interactive plot of the evolution of a process based on a recorded simulation experiment - - # Parameters - # ---------- - # data: DataFrame - # A pandas dataframe containing the state changes recorded in a simulation of the process, as generated by function `run_experiment` - - # run_id: Optional[int]=0 - # The integer identifier of the simulation run contained in `data` that shall be visualized. - # If omitted, a default value of zero is used, i.e. the first simulation run in `data` will - # be visualized. - - # timescale: Optional[int]=100 - # Determines the speed of the visualisation. For the default value of 100, each simulation step - # will be displayed for 100 timesteps in the visualisation. - - # **kwargs - # Optional keyword-arguments that will be passed to the plot function of the underlying instance - # of TemporalNetwork - - # Examples - # -------- - - # Generate 10 random walks and visualize the walk dynamics of the run with id 3 - - # >>> n = pp.Network(directed=False) - # >>> n.add_edge('a', 'b') - # >>> rw = pp.processes.RandomWalk(n) - # >>> data = rw.run_experiment(steps=100, runs=10) - # >>> rw.plot(data, run_id=3) - - # See Also: - # --------- - # TemporalNetwork, plot, RandomWalk, HigherOrderRandomWalk, EpidemicSIR - # """ - - # evolution: DataFrame = data.loc[data['run_id'] == run_id] - - # start_time = evolution.min()['time'] - # end_time = evolution.max()['time'] - - # if end_time <= start_time: - # LOG.warning('Run data does not contain time evolution') - # return None - - # # create network with temporal attributes - # tn = TemporalNetwork(directed=self.network.directed) - - # # add nodes and set initial state - # for v in self.network.nodes.uids: - # tv = TemporalNode(v) - # tn.add_node(tv) - # tv[start_time, 'color'] = self.state_to_color(evolution.loc[( - # evolution['time'] == start_time) & (evolution['node'] == v)]['state'].values[0]) - - # # if process is simulated on temporal network - # if isinstance(self.network, TemporalNetwork): - # for edge in self.network.edges[start_time:end_time]: - # tn.add_edge(edge.v.uid, edge.w.uid, start=max( - # edge.start, start_time), end=min(end_time, edge.end)) - - # # update states - # for index, row in evolution.iterrows(): - # tn.nodes[row['node']][row['time'], - # 'color'] = self.state_to_color(row['state']) - # # if process is simulated on static network - # else: - # # add all edges - # for e in self.network.edges: - # tn.add_edge(e.v.uid, e.w.uid, start=start_time, - # end=end_time*timescale) - - # # update states - # for index, row in evolution.iterrows(): - # tn.nodes[row['node']][row['time']*timescale, - # 'color'] = self.state_to_color(row['state']) - # return tn.plot(node_color=self.state_to_color(False), **kwargs) - - # TODO: - - # def to_directed_acylic_graph(self, data: DataFrame, run_id: Optional[int] = 0, time_delta: Optional[int] = None, states: Optional[Iterable[Any]] = None) -> DirectedAcyclicGraph: - # """Returns a directed acyclic graph representation of all state changes over time. - # In this graph an edge (v_t' -> w_t) indicates that node w changed to state x at time t after a - # connected node v previously changed its state to x at time t' < t (i.e. (v,w) exists in the network). - - # A link (v-t') -> (w-t) in the directed acyclic graph indicates that node v may have causally influenced node w at time t. As an example, for a an SIR epidemic spreading process, the DAG representation captures possible transmission routes. - - # Parameters - # ---------- - # data: DataFrame - # recorded state changes of nodes, as returned by `run_experiment` - - # run_id: Optional[int]=0 - # identifier of simulation run to turn into DAG - - # time_delta: Optional[int]=None - # maximum time difference of possible influence, i.e. if set to delta, any state changes between connected nodes that are apart further than delta time steps are not considered. If None (default) the last prior state change of any connected node is considered, independent of the - # time distance - - # states: Optional[Iterable[Any]]=None - # Only changes to states in this set will be considered. If None (default) all state changes will be considered - # """ - # dag = DirectedAcyclicGraph(uid='{0}'.format(run_id)) - # run = data.loc[data['run_id'] == run_id] - - # for index, row in run.iterrows(): - # # add temporal node - # state = row['state'] - # if states and state not in states: - # continue - - # w = row['node'] - # t = row['time'] - # uid = '{0}-{1}'.format(w, t) - # dag.add_node(uid, node_label=w, time=t, state=state) - - # # find predecessor of node v that last changed its state - # predecessors = [] - # for v in self._network.predecessors[w]: - - # # get all state changes of node v prior to time t - # candidates = run.loc[(run['node'] == v.uid) & (run['time'] < t)] - - # if len(candidates) > 0: - - # # find time stamp and new state of last state change - # r = candidates['time'].argmax() - # last_time = candidates.iloc[r]['time'] - # last_state = candidates.iloc[r]['state'] - - # # check last state change and time difference - # if last_state in states and (time_delta is None or (t-last_time) < time_delta): - # pred_uid = '{0}-{1}'.format(v.uid, last_time) - # if pred_uid not in dag.nodes: - # predecessors.append( - # Node(pred_uid, node_label=v.uid, time=last_time, state=last_state)) - # else: - # predecessors.append(dag.nodes[pred_uid]) - # # predecessors = ['{0}-{1}'.format(v.uid, t_p)] - # # elif : - - # for v in predecessors: - # dag.add_edge(v, dag.nodes[uid]) - - # return dag diff --git a/src/pathpyG/processes/random_walk.py b/src/pathpyG/processes/random_walk.py deleted file mode 100644 index 1c301b2e0..000000000 --- a/src/pathpyG/processes/random_walk.py +++ /dev/null @@ -1,578 +0,0 @@ -"""Classes to simlate random walks on static, temporal, and higher-order networks. -""" - -from __future__ import annotations -import abc - -from scipy.sparse.construct import random - -from typing import Any, Iterable, Optional, Union, Set, Tuple - -import numpy as np -import scipy as sp # pylint: disable=import-error -from scipy.sparse import linalg as spl -from scipy import linalg as spla -from pandas import DataFrame -from pathpyG import PathData -from pathpyG import Graph - -from .sampling import VoseAliasSampling -from .process import BaseProcess - -# create custom types -Weight = Union[str, bool, None] - - -class RandomWalk(BaseProcess): - """Class that implements a biased random walk process in a network. - - Instances of this class can be used to simulate random walk processes in any instance - of the class Graph. The random walk process can include weighted edges as well as a - restart probability, i.e. a per-step probability to teleport to a - randomly chosen node. - - Since any instance of HigherOrderGraph is also an instance of Graph, this class - can be directly be applied to simulate random walks in higher-order networks. However, - the state space of such a random walk is given by the higher-order nodes. If you wish to - simulate a higher-order random walk while projecting states to the corresponding first-order - network, you should use the class HigherOrderRandomWalk instead. - - The implementation follows the general concept to simulate discrete-time (stochastic) processes - as implemented in the base class BaseProcess. Hence, the user can either use the iterator interface - to iterate through the steps of a single random walk process, or use the `run_experiment` function - to simulate multiple runs of a random walk with different start nodes (i.e. seeds). - - The `run_experiment` function returns a pandas DataFrame object that contains all node state changes - during the process' evolution. This data frame can be converted to Path and PathCollection objects - and it can be visualized using the plot function. - - Examples: - Generate and visualize a single biased random walk with 10 steps on a network - - >>> import pathpyG as pp - >>> g = pp.Graph.from_edge_list([['a','b'], ['b','c'], ['c','a'], ['c','d'], ['d','a']]) - >>> rw = pp.processes.RandomWalk(g, weight='edge_weight') - >>> data = rw.run_experiment(steps=10, seed='a') - >>> rw.plot(data) - [interactive visualization] - - Generate a single random walk with 10 steps starting from node 'a' and - return a WalkData instance - - >>> p = rw.get_path(rw.run_experiment(steps=10, runs=['a'])) - - Generate one random walk with 10 steps starting from each node and - return a PathCollection instance - - >>> pc = rw.get_paths(rw.run_experiment(steps=10, runs=g.nodes)) - [ 'a', 'b', 'c', 'a', 'a', 'b', 'c', 'd', 'a', 'b'] - [ 'd', 'a', 'b', 'c', 'd', 'a', 'b', 'c', 'a', 'b', 'c' ] - [ 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'd', 'a', 'b', 'c' ] - [ 'b', 'c', 'a', 'b', 'c', 'd', 'a', 'b', 'c', 'a', 'b' ] - - Simulate a random walk using the iterator interface, which provides full access - to the state after each simulation step - - >>> for time, _ in rw.simulation_run(steps=5, seed='a'): - >>> print('Current node = {0}'.format(rw.current_node)) - >>> print(rw.visitation_frequencies) - Current node = b - [0.5 0.5 0. 0. ] - Current node = c - [0.33333333 0.33333333 0.33333333 0. ] - Current node = d - [0.25 0.25 0.25 0.25] - Current node = a - [0.4 0.2 0.2 0.2] - Current node = b - [0.33333333 0.33333333 0.16666667 0.16666667] - Current node = a - [0.42857143 0.28571429 0.14285714 0.14285714] - Current node = c - [0.375 0.25 0.25 0.125] - Current node = a - [0.44444444 0.22222222 0.22222222 0.11111111] - Current node = b - [0.4 0.3 0.2 0.1] - Current node = a - [0.45454545 0.27272727 0.18181818 0.09090909] - """ - - def __init__(self, network: Graph, weight: Optional[Weight] = None, restart_prob: float = 0) -> None: - """Creates a biased random walk process in a network. - - Args: - network: The network instance on which to perform the random walk process. Can also - be an instance of HigherOrderNetwork. - weight: If specified, the given numerical edge attribute will be used to bias - the random walk transition probabilities. - restart_probability: The per-step probability that a random walker restarts in a random node - """ - - # transition matrix of random walk - self._transition_matrix = RandomWalk.compute_transition_matrix(network, weight, restart_prob) - - # initialize Vose Alias Samplers - - self.samplers = { - v: VoseAliasSampling( - np.nan_to_num(np.ravel(self._transition_matrix[network.mapping.to_idx(v), :].todense())) - ) - for v in network.nodes - } - - # compute eigenvectors and eigenvalues of transition matrix - if network.n > 2: - _, eigenvectors = spl.eigs(self._transition_matrix.transpose(), k=1, which="LM") - pi = eigenvectors.reshape( - eigenvectors.size, - ) - else: - eigenvals, eigenvectors = spla.eig(self._transition_matrix.transpose().toarray()) - x = np.argsort(-eigenvals) - pi = eigenvectors[x][:, 0] - - # calculate stationary visitation probabilities - self._stationary_probabilities = np.real(pi / np.sum(pi)) - - self._network = network - self.init(self.random_seed()) - - def init(self, seed: str) -> None: - """ - Initializes the random walk state with a given seed/source node - - Args: - seed: Id of node in which the random walk will start - """ - # reset currently visited node (or higher-order node) - self._current_node = seed - - # set time - self._t = 0 - - # set number of times each node has been visited - self._visitations = np.ravel(np.zeros(shape=(1, self._network.n))) - self._visitations[self._network.mapping.to_idx(seed)] = 1 - - def random_seed(self) -> Any: - """ - Returns a random node from the network, chosen uniformly at random - """ - x = np.random.choice(range(self._network.n)) - return self._network.mapping.to_id(x) - - def step(self) -> Iterable[str]: - """ - Function that will be called for each step of the random walk. This function - returns a tuple, where the first entry is the id of the currently visited node and the second entry is the id of the previously visited node. - """ - - # determine next node - next_node = self.network.mapping.to_id(self.samplers[self._current_node].sample()) - # TODO: assertion will not hold if restart_prob > 0 - # assert (self._current_node, next_node) in self._network.edges, 'Assertion Error: {0} not in edge list'.format( - # (self._current_node, next_node)) - - previous_node = self._current_node - self._current_node = next_node - - # increment visitations and current time - self._visitations[self._network.mapping.to_idx(self._current_node)] += 1 - self._t += 1 - - # return tuple of changed nodes, where the first node is the currently visited node - return (self._current_node, previous_node) - - def node_state(self, v) -> bool: - """ - Returns a boolean variable indicating whether the walker is currently - visiting (first-order) node v - """ - if v in self._network.nodes: - return v == self._current_node - # TODO: Error here! - elif type(self._network) == HigherOrderGraph: - return v == self._network.mapping.to_id(self._current_node)[-1] - else: - raise NotImplementedError("Random walk not implemented for network of type {0}".format(type(self._network))) - - @property - def time(self) -> int: - """ - The current time of the random walk process, i.e. the number of steps taken since the start node. - """ - return self._t - - def state_to_color(self, state: bool) -> str: - """ - Maps the current (visitation) state of nodes to colors for visualization. The state is True for the currently visited node and False for all other nodes. - - Args: - state: Current visitation state - """ - if state: - return "red" - else: - return "blue" - - @staticmethod - def compute_transition_matrix( - network: Graph, weight: Optional[Weight] = None, restart_prob: float = 0 - ) -> sp.sparse.csr_matrix: - """Returns the transition matrix of a (biased) random walk in the given network. - - Returns a transition matrix that describes a random walk process in the - given network. - - Args: - network: The network for which the transition matrix will be created. - weight: If specified, the numerical edge attribute that shall be used in the biased - transition probabilities of the random walk. - - """ - if weight is None or weight is False: - A = network.sparse_adj_matrix().todense() - elif weight is True: - A = network.sparse_adj_matrix(edge_attr="edge_weight").todense() - else: - A = network.sparse_adj_matrix(edge_attr=weight).todense() - D = A.sum(axis=1) - n = network.n - T = sp.sparse.lil_matrix((n, n)) - zero_deg = 0 - for i in range(n): - if D[i] == 0: - zero_deg += 1 - for j in range(n): - if D[i] > 0: - T[i, j] = restart_prob * (1.0 / n) + (1 - restart_prob) * A[i, j] / D[i] - else: - if restart_prob > 0: - T[i, j] = 1.0 / n - else: - T[i, j] = 0.0 - # if zero_deg > 0: - # LOG.warning( - # 'Network contains {0} nodes with zero out-degree'.format(zero_deg)) - return T.tocsr() - - @property - def transition_matrix(self) -> sp.sparse.csr_matrix: - """Returns the transition matrix of the random walk""" - return self._transition_matrix - - def transition_probabilities(self, node: str) -> np.array: - """Returns a vector that contains transition probabilities. - - Returns a vector that contains transition probabilities from a given - node to all other nodes in the network. - """ - return np.nan_to_num(np.ravel(self._transition_matrix[self._network.mapping.to_idx(node), :].todense())) - - def visitation_probabilities(self, t, seed: str) -> np.ndarray: - """Calculates visitation probabilities of nodes after t steps for a given start node - - Initially, all visitation probabilities are zero except for the start node. - """ - assert seed in self._network.nodes - - initial_dist = np.zeros(self._network.n) - initial_dist[self._network.mapping.to_idx(seed)] = 1.0 - return np.dot(initial_dist, (self._transition_matrix**t).todense()) - - def transition_matrix_pd(self) -> DataFrame: - """ - Returns the transition matrix as pandas DataFrame with proper row/column labels. - """ - return DataFrame( - self.transition_matrix.todense(), - columns=[v for v in self._network.nodes], - index=[v for v in self._network.nodes], - ) - - @property - def current_node(self) -> str: - return self._current_node - - def get_path(self, data: DataFrame, run_id: Optional[int] = 0, first_order: Optional[bool] = True) -> PathData: - """Returns a path that represents the sequence of (first-order) nodes traversed - by a single random walk. - - Args: - data: Pandas `DataFrame` containing the trajectory of one or more (higher-order) random walks, generated by a call of `run_experiment` - run_uid: Uid of the random walk simulation to be returns as Path (default: 0). - - Returns: - Path object containing the sequence of nodes traversed by the random walk - """ - # list of traversed nodes starting with seed node - walk_steps = list(data.loc[(data["run_id"] == run_id) & (data["state"] == True)]["node"].values) - - # generate Path - path = PathData(self._network.mapping) - path.append_walk([walk_steps[i] for i in range(len(walk_steps))]) - return path - - def get_paths(self, data: DataFrame, run_ids: Optional[Iterable] = None) -> PathData: - """Return a PathData object where each path is one random walk trajectory - - Args: - data: Pandas `DataFrame` containing the trajectory of one or more random walks, generated by `run_experiment` - run_ids: UIDs of random walk simulation runs to be included in the `PathData`. If None (default), all runs will be included. - """ - - if not run_ids: # generate paths for all run_ids in the data frame - runs = data["run_id"].unique() - else: - runs = run_ids - - walks = PathData(self._network.mapping) - for id in runs: - walk_steps = list(data.loc[(data["run_id"] == id) & (data["state"] == True)]["node"].values) - - # add walk to PathData - walks.append_walk(walk_steps) - - return walks - - def stationary_state(self, **kwargs: Any) -> np.array: - """Compute stationary visitation probabilities of random walk. - - Computes stationary visitation probabilities of nodes based on the - leading eigenvector of the transition matrix. - - Args: - kwargs: Arbitrary key-value pairs to bee passed to the - scipy.sparse.linalg.eigs function. - """ - _p = self._stationary_probabilities - if kwargs: - _, eigenvectors = sp.sparse.linalg.eigs(self._transition_matrix.transpose(), k=1, which="LM", **kwargs) - pi = eigenvectors.reshape( - eigenvectors.size, - ) - _p = np.real(pi / np.sum(pi)) - return _p - - @property - def visitation_frequencies(self) -> np.array: - """Returns current normalized visitation frequencies of nodes based on the history of - the random walk. Initially, all visitation probabilities are zero except for the start node. - """ - return np.nan_to_num(self._visitations / (self._t + 1)) - - @property - def total_variation_distance(self) -> float: - """Returns the total variation distance between stationary - visitation probabilities and the current visitation frequencies - - Computes the total variation distance between the current visitation - probabilities and the stationary probabilities. This quantity converges - to zero for RandomWalk.t -> np.infty and its magnitude indicates the - current relaxation of the random walk process. - """ - return self.TVD(self.stationary_state(), self.visitation_frequencies) - - @staticmethod - def TVD(a: np.array, b: np.array) -> float: - """Calculates the total variation distance between two probability vectors""" - return np.abs(a - b).sum() / 2.0 - - -class HigherOrderRandomWalk(RandomWalk): - """Class that implements a biased random walk process in a higher-order network. - - Instances of this class can be used to simulate random walk processes in higher-order networks for - arbitrary orders k. The random walk process can include weighted edges as well as a - restart probability, i.e. a per-step probability to teleport to a - randomly chosen higher-order node. - - Different from the class RandomWalk, instances of class HigherOrderRandomWalk automatically project states to the corresponding first-order network, i.e. paths and visualisations are given - in terms of the nodes in the first-order network, while the dynamics of the random walk is governed by the underlying higher-order network. - - The implementation follows the general concept to simulate discrete-time (stochastic) processes - as implemented in the base class BaseProcess. Hence, the user can either use the iterator interface - to iterate through the steps of a single random walk process, or use the `run_experiment` function - to simulate multiple runs of a random walk with different start nodes (i.e. seeds). - - The `run_experiment` function returns a pandas DataFrame object that contains all node state changes - during the process' evolution. This data frame can be converted to Path and PathCollection objects - and it can be visualized using the plot function. - - Examples: - Generate and visualize a single random walk with 10 steps on a higher-order network - - >>> import pathpy as pp - >>> g = pp.Graph.from_edge_list([['a','b'], ['b','c'], ['c','a'], ['c','d'], ['d','a']]) - >>> paths = pp.WalkData(g3.mapping) - >>> paths.add_walk_seq(['a','b','c'],freq=1) - >>> paths.add_walk_seq(['b','c','a'],freq=1) - >>> paths.add_walk_seq(['b','c','d'],freq=0.2) - >>> paths.add_walk_seq(['c','a','b'],freq=1) - >>> paths.add_walk_seq(['c','d','a'],freq=0.2) - >>> paths.add_walk_seq(['d','a','b'],freq=1) - >>> g_ho = pp.HigherOrderGraph(paths, order =2) - - >>> rw = pp.processes.HigherOrderRandomWalk(g_ho, weight=True) - >>> data = rw.run_experiment(steps=10, runs=[('b','c')]) - >>> rw.plot(data) - [interactive visualization in first-order network] - - Use `plot` function of base class to visualize random walk in second-order network - - >>> pp.processes.RandomWalk.plot(rw, data) - [interactive visualization in second-order network] - - Generate a single random walk with 10 steps starting from node 'b-c' and - return a first-order path - - >>> p = rw.get_path(rw.run_experiment(steps=10, runs=['b-c'])) - >>> pprint([v.uid for v in p.nodes ]) - [ 'a', 'b', 'c', 'a', 'a', 'b', 'c', 'd', 'a', 'b'] - - Use `get_path` function of base class to return path with second-order nodes - - >>> p = pp.processes.RandomWalk.get_path(rw2, data) - >>> print([ v.uid for v in p.nodes ]) - - Generate one random walk with 10 steps starting from each node and - return a WalkData instance with first-order paths - - >>> paths = rw.get_paths(rw.run_experiment(steps=10, runs=g_ho.nodes)) - >>> pprint([v.uid for v in p.nodes ]) - [ 'a', 'b', 'c', 'a', 'a', 'b', 'c', 'd', 'a', 'b'] - [ 'd', 'a', 'b', 'c', 'd', 'a', 'b', 'c', 'a', 'b', 'c' ] - [ 'c', 'a', 'b', 'c', 'a', 'b', 'c', 'd', 'a', 'b', 'c' ] - [ 'b', 'c', 'a', 'b', 'c', 'd', 'a', 'b', 'c', 'a', 'b' ] - - Simulate a random walk using the iterator interface, which provides full access - to the state after each simulation step - - >>> for time, _ in rw2.simulation_run(steps=50, seed='b-c'): - >>> print('Current node = {0}'.format(rw2.first_order_node(rw2.current_node))) - >>> print(rw2._first_order_visitation_frequencies) - Current node = b - [0.33333333 0.33333333 0.33333333 0. ] - Current node = c - [0.32142857 0.32142857 0.35714286 0. ] - Current node = a - [0.34482759 0.31034483 0.34482759 0. ] - Current node = b - [0.33333333 0.33333333 0.33333333 0. ] - Current node = c - [0.32258065 0.32258065 0.35483871 0. ] - Current node = a - """ - - def __init__( - self, higher_order_network: Graph, first_order_network, weight: Optional[Weight] = None, restart_prob: float = 0 - ) -> None: - """Creates a biased random walk process in a network. - - Args: - higher_order_network: The higher-order network instance on which to perform the random walk process. - first_order_network: The first-order network instance to be used for mapping the process to first-order nodes - weight: If specified, the given numerical edge attribute will be used to bias - the random walk transition probabilities. - restart_probability: The per-step probability that a random walker restarts in a random (higher-order) node - """ - self._first_order_network = first_order_network - RandomWalk.__init__(self, higher_order_network, weight, restart_prob) - - def init(self, seed) -> None: - - # set number of times each first-order node has been visited - self._first_order_visitations = np.ravel(np.zeros(shape=(1, self._first_order_network.n))) - self._first_order_visitations[self._first_order_network.mapping.to_idx(seed[-1])] = 1 - RandomWalk.init(self, seed) - - @property - def first_order_visitation_frequencies(self) -> np.array: - """Returns current normalized visitation frequencies of first-order nodes based on the history of - the higher-order random walk. Initially, all visitation probabilities are zero except for the last node of the higher-order seed node. - """ - return np.nan_to_num(self._first_order_visitations / (self._t + 1)) - - def first_order_stationary_state(self, **kwargs) -> np.array: - """Returns current normalized visitation frequencies of first-order nodes based on the history of - the higher-order random walk. Initially, all visitation probabilities are zero except for the last node of the higher-order seed node. - """ - first_order_stationary_state = np.ravel(np.zeros(shape=(1, self._first_order_network.n))) - higher_order_stationary_dist = RandomWalk.stationary_state(self, **kwargs) - for v in self._network.nodes: - # newly visited node in first_order network - v1 = v.relations[-1] - first_order_stationary_state[self._first_order_network.mapping.to_idx[v1]] += higher_order_stationary_dist[ - self._network.mapping.to_idx[v] - ] - return first_order_stationary_state - - @property - def first_order_total_variation_distance(self) -> float: - """Returns the total variation distance between stationary - visitation probabilities and the current visitation frequencies, projected - to nodes in the first_order_network. - - Computes the total variation distance between the current (first-order) node visitation - probabilities and the (first-order) stationary node visitation probabilities. This quantity converges to zero for HigherOrderRandomWalk.time -> np.infty and its magnitude indicates the - current relaxation of the higher-order random walk process. - """ - return self.TVD(self.first_order_stationary_state(), self.first_order_visitation_frequencies) - - def first_order_node(self, higher_order_node: tuple) -> str: - """ - Maps a given uid of a node in the higher-order network to the uid of the corresponding first-order node. - - Args: - higher_order_node: Tuple that represents the higher-order node - - Returns: - String of the corresponding first-order node - """ - return higher_order_node[-1] - - def step(self) -> Iterable[str]: - """ - Function that will be called for each step of the random walk. This function - returns a tuple, where the first entry is the uids of the currently visited higher-order node and the second entry is the uid of the previously visited higher-order node. - - Use the `first_order_node` function to map those nodes to nodes in the first-order network - """ - (current_node, previous_node) = RandomWalk.step(self) - - self._first_order_visitations[self._first_order_network.mapping.to_idx(current_node[-1])] += 1 - - return (current_node, previous_node) - - def get_paths(self, data: DataFrame, run_ids: Optional[Iterable] = 0) -> PathData: - """Returns paths that represent the sequences of (first-order) nodes traversed by random walks with given run ids. - - Args: - data: Pandas data frame containing the trajectory of one or more (higher-order) random walks, generated by a call of `run_experiment` - run_uid: Uid of the random walk simulations to be returned as WalkData (default: 0). - - Returns: - WalkData object containing the sequences of nodes traversed by the random walks - """ - # list of traversed nodes starting with seed node - - if not run_ids: # generate paths for all run_ids in the data frame - runs = data["run_id"].unique() - else: - runs = run_ids - - paths = PathData(mapping=self._first_order_network.mapping) - for run in runs: - walk_steps = list(data.loc[(data["run_id"] == run) & (data["state"] == True)]["node"].values) - - # for higher-order random walk, seed node is a higher-order node - # consisting of one or more edges - seed = walk_steps[0] - walk = [v for v in seed] - - # map higher-order nodes to first-order nodes - for i in range(1, len(walk_steps)): - walk.append(walk_steps[i][-1]) - paths.append_walk(walk) - return paths diff --git a/src/pathpyG/processes/sampling.py b/src/pathpyG/processes/sampling.py deleted file mode 100644 index 79f8f3efb..000000000 --- a/src/pathpyG/processes/sampling.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Classes for efficient random sampling from discrete distributions -""" - -# !/usr/bin/python -tt -# -*- coding: utf-8 -*- -# ============================================================================= -# File : vose_sampling.py -- Class to sample from discrete distributions -# Author : Ingo Scholtes -# Time-stamp: -# -# Copyright (c) 2016-2021 Pathpy Developers -# ============================================================================= -from __future__ import annotations -from typing import Union - -import numpy as np - - -class VoseAliasSampling: - """ - Implementation of fast biased sampling of discrete values [0, ..., n] - - For a concise explanation see https://www.keithschwarz.com/darts-dice-coins/ - - Args: - weights: relative weights of the n events, where weights[i] is the relative - statistical weight of event i. The weights do not need to be - normalized. - - For an array with length n, generated random values - will be from range(n). - - Examples: - Create a VoseAliasSampling instance - - >>> from pathpy.processes import VoseAliasSampling - >>> sampler = VoseAliasSampling([1,1,2]) - - Fast biased sampling in O(1) - - >>> [ sampler.sample() for i in range(10) ] - [ 0 2 0 1 2 1 2 1 2 0 2 2 ] - """ - - def __init__(self, weights: Union[np.array, list]) -> None: - """ - Initializes probability and alias tables - """ - self.n = len(weights) - self.probs = dict() - self.scaled_probs = dict() - self.aliases = dict() - - small = list() - large = list() - - for i in range(1, self.n + 1): - self.probs[i] = weights[i - 1] - self.scaled_probs[i] = self.n * weights[i - 1] - if self.scaled_probs[i] > 1: - large.append(i) - elif self.scaled_probs[i] <= 1: - small.append(i) - - while small and large: - l = small.pop() - g = large.pop() - - self.probs[l] = self.scaled_probs[l] - self.aliases[l] = g - self.scaled_probs[g] = self.scaled_probs[l] + self.scaled_probs[g] - 1 - - if self.scaled_probs[g] < 1: - small.append(g) - else: - large.append(g) - while large: - g = large.pop() - self.probs[g] = 1 - while small: - l = small.pop() - self.probs[l] = 1 - - def sample(self) -> int: - """ - Biased sampling of discrete value in O(1) - - Returns: integer value from range(n), where n is the length - of the weight array used to create the instance. - """ - i = np.random.randint(1, self.n + 1) - x = np.random.rand() - if x < self.probs[i]: - return i - 1 - else: - return self.aliases[i] - 1 diff --git a/tests/processes/__init__.py b/tests/processes/__init__.py deleted file mode 100644 index 7554407aa..000000000 --- a/tests/processes/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Necessary to make Python treat the tests directory as a module. - -This is required since mypy doesn't support the same file name otherwise -It is also required to enable module specific overrides in pyproject.toml -""" diff --git a/tests/processes/conftest.py b/tests/processes/conftest.py deleted file mode 100644 index deccd6e1e..000000000 --- a/tests/processes/conftest.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations -from typing import TYPE_CHECKING, Tuple - -import pytest -import torch - -from pathpyG.core.graph import Graph -from pathpyG.core.path_data import PathData -from pathpyG.core.multi_order_model import MultiOrderModel - - -@pytest.fixture -def simple_graph() -> Graph: - """Return a simple example for a graph with a ring topology.""" - return Graph.from_edge_list( - [ - ("a", "b"), - ("b", "c"), - ("c", "d"), - ("d", "e"), - ("e", "f"), - ("f", "g"), - ("g", "h"), - ("h", "i"), - ("i", "j"), - ("j", "k"), - ("k", "l"), - ("l", "m"), - ("m", "n"), - ("n", "o"), - ("o", "a"), - ] - ) - - -@pytest.fixture -def simple_second_order_graph() -> Tuple[Graph, Graph]: - """Return a simple second-order graph.""" - g = Graph.from_edge_list([["a", "b"], ["b", "c"], ["c", "a"], ["c", "d"], ["d", "a"]]) - - g.data["edge_weight"] = torch.tensor([[1], [1], [2], [1], [1]]) - - paths = PathData(g.mapping) - paths.append_walk(["a", "b", "c"], weight=1) - paths.append_walk(["b", "c", "a"], weight=1) - paths.append_walk(["b", "c", "d"], weight=0.2) - paths.append_walk(["c", "a", "b"], weight=1) - paths.append_walk(["c", "d", "a"], weight=0.2) - paths.append_walk(["d", "a", "b"], weight=1) - - m = MultiOrderModel.from_path_data(paths, max_order=2) - return (g, m.layers[2]) diff --git a/tests/processes/test_random_walk.py b/tests/processes/test_random_walk.py deleted file mode 100644 index cde6276f1..000000000 --- a/tests/processes/test_random_walk.py +++ /dev/null @@ -1,52 +0,0 @@ -from __future__ import annotations -from typing import TYPE_CHECKING, Tuple - -from torch import IntTensor, equal, tensor - -from pathpyG import config -from pathpyG.processes.random_walk import RandomWalk, HigherOrderRandomWalk -from pathpyG.core.path_data import PathData -from pathpyG.core.graph import Graph -from pathpyG.core.multi_order_model import MultiOrderModel - - -def check_transitions(g, paths: PathData): - for i in range(paths.num_paths): - w = paths.get_walk(i) - for j in range(len(w) - 1): - assert g.is_edge(w[j], w[j + 1]) - - -def test_random_walk(simple_graph): - rw = RandomWalk(simple_graph) - - steps = 20 - data = rw.run_experiment(steps=steps, runs=[v for v in simple_graph.nodes]) - - assert len(data) == simple_graph.n * steps * 2 + simple_graph.n * simple_graph.n - - # make sure that all transitions correspond to edges - paths = rw.get_paths(data) - check_transitions(simple_graph, paths) - - -def test_transition_matrix(simple_graph): - rw = RandomWalk(simple_graph) - - assert (rw.transition_matrix.data == 1.0).all() - assert rw.transition_probabilities("a")[1] == 1.0 - - -def test_higher_order_random_walk(simple_second_order_graph: Tuple[Graph, Graph]): - g = simple_second_order_graph[0] - g2 = simple_second_order_graph[1] - print(g2.mapping) - rw = HigherOrderRandomWalk(g2, g, weight=True) - steps = 100 - data = rw.run_experiment(steps=steps, runs=g2.nodes) - - assert len(data) == g2.n * steps * 2 + g2.n * g2.n - paths = rw.get_paths(data) - check_transitions(g, paths) - - # rw.first_order_stationary_state()