Source code for VeraGridEngine.Utils.ThirdParty.SyntheticNetworks.rpgm_algo

__author__ = "Paul Schultz"
__date__ = "Jul 11, 2015"
__version__ = "v2.1"

# This file is based on the network creation algorithm published in:
#
# A Random Growth Model for Power Grids and Other Spatially Embedded Infrastructure Networks
# Paul Schultz, Jobst Heitzig, and Juergen Kurths
# Eur. Phys. J. Special Topics on "Resilient power grids and extreme events" (2014)
# DOI: 10.1140/epjst/e2014-02279-6
#

import numpy as np
import networkx as nx
import itertools
from scipy.sparse import dok_matrix


[docs] class RpgAlgorithm: """ RpgAlgorithm """ __slots__ = ( "n", "n0", "p", "q", "r", "s", "lon", "lat", "distance", "added_nodes", "added_edges", "distance_measure", "sampling", "low_memory", "debug", "init_edges", "adjacency", "dGmatrix", "mst_edges", ) def __init__(self): # parameters for the algorithm self.n = 20 self.n0 = 10 self.p = 0.2 self.q = 0.3 self.r = 1. / 3. self.s = 0.1 # node coordinates self.lon = [] self.lat = [] self.distance = {} self.added_nodes = 0 self.added_edges = 0 # CHANGE WITH CAUTION! self.distance_measure = "euclidean" self.sampling = "uniform" self.low_memory = True self.debug = False self.init_edges = set() def __str__(self): print("----------") for attr in ("identifier", "added_nodes", "n", "n0", "p", "q", "r", "s"): if hasattr(self, attr): print("{} : {}".format(attr, str(getattr(self, attr)))) return "----------" ############################################################################### # ## PUBLIC FUNCTIONS ## # ###############################################################################
[docs] def set_params(self, **kwargs): for key in kwargs: if not hasattr(self, key): print("ERROR: There is no parameter called: {}".format(key)) print("Possible choices: n,n0,p,q,r,s") continue else: if self._validation(key, kwargs[key]): setattr(self, key, kwargs[key]) else: print("ERROR: invalid parameter value for {}".format(key))
[docs] def initialise(self): assert self.n >= self.n0 # keep track of added nodes self.added_nodes = 0 # step I1: draw random locations from rho and add nodes ####################################################### self._add_random_locations(self.n0) self.added_nodes += self.n0 # step I2: construct minimum spanning tree ########################################## self._initial_mst() edge_mask = set([self._s(key) for key in self.adjacency.keys()]) # edge_mask = set(edge_mask) if self.debug: print("I2", edge_mask) # step I3: add redundant links ############################## m = min(int(np.floor(self.n0 * (1 - self.s) * (self.p + self.q))), self.n0 * (self.n0 - 1) / 2 - (self.n0 - 1)) candidates = dict() for (u, v) in self.distance.keys(): if not (u, v) in set(edge_mask): candidates[(u, v)] = self.distance[(u, v)] if self.r > 0: dGmatrix = self._get_graph_distances() onesquare = np.ones([self.n0, self.n0]) for k in range(m): if self.r > 0: for (u, v) in candidates.keys(): candidates[(u, v)] = self.distance[(u, v)] / (1. + dGmatrix[u, v])**self.r a, b = min(candidates, key=candidates.get) self.adjacency[a, b] = self.adjacency[b, a] = 1 # make sure i,j is not selected again: candidates.pop((a, b), None) if self.r > 0: dGmatrix = np.minimum(np.minimum(dGmatrix, dGmatrix[:, [a]] + onesquare + dGmatrix[[b], :]), dGmatrix[:, [b]] + onesquare + dGmatrix[[a], :]) if self.debug: print("I3", (a, b)) self.added_edges += m assert self.added_edges == (len(self.adjacency.keys()) / 2) # label initial edges self.init_edges = sorted(set([self._s(key) for key in self.adjacency.keys()])) if self.debug and self.r > 0: assert ((dGmatrix - self._get_graph_distances())**2).sum() == 0 # check that update went well
[docs] def grow(self): self._add_random_locations(self.n - self.n0) self.adjacency._shape = (self.n, self.n) if self.r > 0: self.dGmatrix = self._get_graph_distances() self.dGmatrix = np.concatenate([np.concatenate([self.dGmatrix, np.zeros((self.n - self.n0, self.n0))],axis=0), np.zeros((self.n, self.n - self.n0))],axis=1) # connect new vertices for i in range(self.n0, self.n): self.added_nodes += 1 self._growth_step(i) # TODO: this is probably redundant assert self.added_nodes == self.n if self.debug and self.r > 0: assert ((self.dGmatrix - self._get_graph_distances())**2).sum() == 0 # check that update went well if self.r > 0: delattr(self, "dGmatrix")
@property def edges(self): return list(set([self._s(key) for key in self.adjacency.keys()])) ############################################################################### # ## PRIVATE FUNCTIONS ## # ############################################################################### def _get_coords(self): if self.sampling == "uniform": return self._uniformunitsquare() else: raise NotImplementedError() def _get_distance(self, u, v): if self.distance_measure == "euclidean": return self._euclidean(int(u), int(v)) else: raise NotImplementedError() def _update_distance(self): N = len(self.lat) for v in range(N): for u in range(v): self.distance[(u, v)] = self._get_distance(u, v) @staticmethod def _uniformunitsquare(): """ return point drawn uniformly at random from unit square -> 2D coordinates """ return np.random.uniform(size=2) def _euclidean(self, u, v): """ return euclidean distance between x and y """ x = np.array([self.lat[u], self.lon[u]]) y = np.array([self.lat[v], self.lon[v]]) return np.sqrt(sum((x-y)**2)) def _growth_step(self, node): if self.debug: print("---------") print("adding node {}".format(node)) # step G5: split random link at midpoint ######################################## if (np.random.random() < self.s) and len(self.adjacency) > 1: # choose link at random: elist = sorted(set([self._s(key) for key in self.adjacency.keys()])) ei = np.random.randint(len(elist)) e = elist[ei] a, b = e[0], e[1] # add node at midpoint and calc distances: self.lat[node] = (self.lat[a] + self.lat[b]) / 2. self.lon[node] = (self.lon[a] + self.lon[b]) / 2. if not self.low_memory: self._update_distance() self.adjacency[a, b] = self.adjacency[b, a] = 0 self.adjacency[a, node] = self.adjacency[node, a] = 1 self.adjacency[b, node] = self.adjacency[node, b] = 1 if self.r > 0: self.dGmatrix[:(node + 1), :(node + 1)] = self._get_graph_distances() self.added_edges += 1 if self.debug: print("G5", (int(a), int(b))) #TODO: make shure (a, node) and (b, node) are not selected again? else: # step G2: link to nearest ########################## if node == 1: target = 0 else: target = self._get_closest_connected_node(node, node - 1) self.adjacency[node, target] = self.adjacency[target, node] = 1 if self.r > 0: # adjust network distances: #self.dGmatrix[:(node + 1), :(node + 1)] = self._get_graph_distances() self.dGmatrix[node, :self.added_nodes] = self.dGmatrix[target, :self.added_nodes] + 1 self.dGmatrix[:self.added_nodes, node] = self.dGmatrix[:self.added_nodes, target] + 1 self.dGmatrix[node, node] = 0 self.added_edges += 1 if self.debug: print("G2", (node, target)) # step G3: add optimal redundant link to node ############################################# if np.random.random() < self.p: candidates = {} for v in range(node - 1): if self.adjacency[v, node] == 0: candidates[(v, node)] = self._get_distance(v, node) if self.low_memory else self.distance[(v, node)] # there might be no candidates if n0 = 1 if len(candidates) > 0: if self.r > 0: #dGmatrix = self._get_graph_distances() for (u, v) in candidates.keys(): candidates[(u, v)] /= ( 1. + self.dGmatrix[u, v])**self.r a, b = min(candidates, key=candidates.get) self.adjacency[a, b] = self.adjacency[b, a] = 1 if self.r > 0: if a == node: target = b else: target = a # adjust network distances: self.dGmatrix = np.minimum( np.minimum(self.dGmatrix, self.dGmatrix[:, [node]] + np.ones([self.n, self.n]) + self.dGmatrix[[target],:] ), self.dGmatrix[:,[target]] + np.ones([self.n, self.n]) + self.dGmatrix[[node], :] ) self.added_edges += 1 if self.debug: print("G3", (a, b)) # step G4: add another optimal redundant link to random node ############################################################ if np.random.random() < self.q: i2 = np.random.randint(node) candidates = {} for v in range(node): if v == i2: continue if self.adjacency[v, i2] == 0: candidates[self._s((v, i2))] = self._get_distance(v, i2) if self.low_memory else self.distance[self._s((v, i2))] # there might be no candidates if n0 = 1 if len(candidates) > 0: if self.r > 0: #dGmatrix = self._get_graph_distances() for (u, v) in candidates.keys(): candidates[(u, v)] /= ( 1. + self.dGmatrix[u, v])**self.r a, b = min(candidates, key=candidates.get) self.adjacency[a, b] = self.adjacency[b, a] = 1 if self.r > 0: if a == i2: target = b else: target = a # adjust network distances: self.dGmatrix = np.minimum( np.minimum(self.dGmatrix, self.dGmatrix[:, [i2]] + np.ones([self.n, self.n]) + self.dGmatrix[[target], :] ), self.dGmatrix[:, [target]] + np.ones([self.n, self.n]) + self.dGmatrix[[i2], :] ) self.added_edges += 1 if self.debug: print("G4", i2, (a, b)) if self.debug and self.r > 0: # check that update went well assert ((self.dGmatrix[:(node + 1), :(node + 1)] - self._get_graph_distances())**2).sum() == 0 @staticmethod def _validation(attr, value): """ :param attr: :param value: :return: """ if attr == "n0" or attr == "n": if value < 1: return False else: return True elif attr == "r": if value < 0: return False else: return True else: if value < 0 or value > 1: return False else: return True def _initial_mst(self): adjacency = np.zeros([self.n0, self.n0]) np.fill_diagonal(adjacency, 0) self.mst_edges = self._get_mst() for edge in self.mst_edges: adjacency[edge[0], edge[1]] = adjacency[edge[1], edge[0]] = 1 self.added_edges += 1 self.adjacency = dok_matrix(adjacency) def _get_mst(self): # full_graph = Graph.Full(self.n0) # factor = 1e5 # since small weights lead to MST problem # weights = [factor * self.distance[self._s((edge.source,edge.target))] for edge in full_graph.es] # G = full_graph.spanning_tree(weights).as_undirected() # return G.get_edgelist() g = nx.complete_graph(self.n0) factor = 1e5 for u, v in g.edges(): g[u][v]['weight'] = factor * self.distance[self._s((u, v))] nx.minimum_spanning_edges(g) return list(nx.minimum_spanning_edges(g)) def _get_graph_distances(self): elist = sorted(set([self._s(key) for key in self.adjacency.keys()])) # old code # G = Graph(self.added_nodes) # G.add_edges(elist) # return np.array(G.shortest_paths()) g = nx.Graph(elist) p = dict(nx.shortest_path_length(g)) n = len(p) pairs = np.empty((n, n), dtype=int) for i, j in itertools.product(range(n), range(n)): pairs[i, j] = p[i][j] return pairs def _add_random_locations(self, _m): m = int(_m) if m < 1: raise ValueError("You have to add a positive integer number of nodes.") else: for i in range(m): pos = self._get_coords() self.lat.append(pos[0]) self.lon.append(pos[1]) self._update_distance() def _get_closest_connected_node(self, source, connected): # vertices need to be properly ordered for this to work, i.e. nodes in the connected component # should be labeled from 0 to connected-1 min_ = np.inf target = source for node in range(connected): if source == node: # should actually not happen continue elif self.adjacency[source, node] == 0: d = self._get_distance(node, source) if self.low_memory else self.distance[(node, source)] if d < min_: min_ = d target = node return target @staticmethod def _s(tuple_: tuple) -> tuple: """ :param tuple_: :return: """ if tuple_[0] < tuple_[1]: return tuple_ else: return tuple_[1], tuple_[0]
####################################################################################################################### ####################################################################################################################### #######################################################################################################################
[docs] def main(): from matplotlib import pyplot as plt # initialise algorithm g = RpgAlgorithm() # for detailed output set g.debug = False # set desired parameters and perform algorithm g.set_params(n=100, n0=10, r=1./3.) g.initialise() g.grow() print(g) G = nx.Graph(g.edges) pos = {i: (g.lat[i], g.lon[i]) for i in range(g.added_nodes)} nx.draw(G, pos=pos, with_labels=True, node_color='lightblue') plt.show()
if __name__ == "__main__": main()