# -*- coding: utf-8 -*- """ .. _layouts: layouts.py ========== Layouts are classes that provide graph drawing algorithms. These classes all take a :class:`graph_core` argument. The graph topology will never be permanently modified by the drawing algorithm: e.g. "dummy" node insertion, edge reversal for making the graph acyclic and so on, are all kept inside the layout object. """ # This code is part of Grandalf # Copyright (C) 2010-2012 Axel Tillequin (bdcht3@gmail.com) # published under the GPLv2 license or EPLv1 license import importlib from bisect import bisect from sys import getrecursionlimit, setrecursionlimit from mantis.grandalf.utils import * # ------------------------------------------------------------------------------ class VertexViewer(object): """ The VertexViewer class is used as the default provider of Vertex dimensions (w,h) and position (xy). In most cases it should be replaced by *view* instances associated with a ui widgets library, allowing to get dimensions and set position directly on the widget. """ def __init__(self, w=2, h=2, data=None): self.w = w self.h = h self.data = data self.xy = None def __str__(self, *args, **kwargs): return "VertexViewer (xy: %s) w: %s h: %s" % (self.xy, self.w, self.h) # ------------------------------------------------------------------------------ class _sugiyama_vertex_attr(object): """ The sugiyama layout adds new attributes to vertices. These attributes are stored in an internal _sugimyama_vertex_attr object. Attributes: rank (int): the rank number is the index of the layer that contains this vertex. dummy (0/1): a flag indicating if the vertex is *dummy* pos (int): the index of the vertex in the layer x (list(float)): the list of computed horizontal coordinates of the vertex bar (float): the current *barycenter* of the vertex """ def __init__(self, r=None, d=0): self.rank = r self.dummy = d self.pos = None self.x = 0 self.bar = None def __str__(self): s = "(%3d,%3d) x=%s" % (self.rank, self.pos, str(self.x)) if self.dummy: s = "[d] %s" % s return s # def __eq__(self,x): # return self.bar == x.bar # def __ne__(self,x): # return self.bar != x.bar # def __lt__(self,x): # return self.bar < x.bar # def __le__(self,x): # return self.bar <= x.bar # def __gt__(self,x): # return self.bar > x.bar # def __ge__(self,x): # return self.bar >= x.bar # ------------------------------------------------------------------------------ class DummyVertex(_sugiyama_vertex_attr): """ The DummyVertex class is used by the sugiyama layout to represent *long* edges, i.e. edges that span over several ranks. For these edges, a DummyVertex is inserted in every inner layer. Attributes: view (viewclass): since a DummyVertex is acting as a Vertex, it must have a view. ctrl (list[_sugiyama_attr]): the list of associated dummy vertices Methods: N(dir): reflect the Vertex method and returns the list of adjacent vertices (possibly dummy) in the given direction. inner(dir): return True if a neighbor in the given direction is *dummy*. """ def __init__(self, r=None, viewclass=VertexViewer): self.view = viewclass() self.ctrl = None super().__init__(r, d=1) def N(self, dir): assert dir == +1 or dir == -1 v = self.ctrl.get(self.rank + dir, None) return [v] if v is not None else [] def inner(self, dir): assert dir == +1 or dir == -1 try: return any([x.dummy == 1 for x in self.N(dir)]) except KeyError: return False except AttributeError: return False def __str__(self): s = "(%3d,%3d) x=%s" % (self.rank, self.pos, str(self.x)) if self.dummy: s = "[d] %s" % s return s # ------------------------------------------------------------------------------ class Layer(list): """ Layer is where Sugiyama layout organises vertices in hierarchical lists. The placement of a vertex is done by the Sugiyama class, but it highly relies on the *ordering* of vertices in each layer to reduce crossings. This ordering depends on the neighbors found in the upper or lower layers. Attributes: layout (SugiyamaLayout): a reference to the sugiyama layout instance that contains this layer upper (Layer): a reference to the *upper* layer (rank-1) lower (Layer): a reference to the *lower* layer (rank+1) ccount (int) : number of crossings detected in this layer Methods: setup (layout): set initial attributes values from provided layout nextlayer(): returns *next* layer in the current layout's direction parameter. prevlayer(): returns *previous* layer in the current layout's direction parameter. order(): compute *optimal* ordering of vertices within the layer. """ __r = None layout = None upper = None lower = None __x = 1.0 ccount = None def __eq__(self, other): return super().__eq__(other) def __str__(self): s = " 1: self.__x = 1.0 / (len(self) - 1) for i, v in enumerate(self): assert layout.grx[v].rank == r layout.grx[v].pos = i layout.grx[v].bar = i * self.__x if r > 0: self.upper = layout.layers[r - 1] if r < len(layout.layers) - 1: self.lower = layout.layers[r + 1] def nextlayer(self): return self.lower if self.layout.dirv == -1 else self.upper def prevlayer(self): return self.lower if self.layout.dirv == +1 else self.upper def order(self): sug = self.layout sug._edge_inverter() c = self._cc() if c > 0: for v in self: sug.grx[v].bar = self._meanvalueattr(v) # now resort layers l according to bar value: self.sort(key=lambda x: sug.grx[x].bar) # reduce & count crossings: c = self._ordering_reduce_crossings() # assign new position in layer l: for i, v in enumerate(self): sug.grx[v].pos = i sug.grx[v].bar = i * self.__x sug._edge_inverter() self.ccount = c return c def _meanvalueattr(self, v): """ find new position of vertex v according to adjacency in prevlayer. position is given by the mean value of adjacent positions. experiments show that meanvalue heuristic performs better than median. """ sug = self.layout if not self.prevlayer(): return sug.grx[v].bar bars = [sug.grx[x].bar for x in self._neighbors(v)] return sug.grx[v].bar if len(bars) == 0 else float(sum(bars)) / len(bars) def _medianindex(self, v): """ find new position of vertex v according to adjacency in layer l+dir. position is given by the median value of adjacent positions. median heuristic is proven to achieve at most 3 times the minimum of crossings (while barycenter achieve in theory the order of |V|) """ assert self.prevlayer() != None N = self._neighbors(v) g = self.layout.grx pos = [g[x].pos for x in N] lp = len(pos) if lp == 0: return [] pos.sort() pos = pos[:: self.layout.dirh] i, j = divmod(lp - 1, 2) return [pos[i]] if j == 0 else [pos[i], pos[i + j]] def _neighbors(self, v): """ neighbors refer to upper/lower adjacent nodes. Note that v.N() provides neighbors of v in the graph, while this method provides the Vertex and DummyVertex adjacent to v in the upper or lower layer (depending on layout.dirv state). """ assert self.layout.dag dirv = self.layout.dirv grxv = self.layout.grx[v] try: # (cache) return grxv.nvs[dirv] except AttributeError: grxv.nvs = {-1: v.N(-1), +1: v.N(+1)} if grxv.dummy: return grxv.nvs[dirv] # v is real, v.N are graph neigbors but we need layers neighbors for d in (-1, +1): tr = grxv.rank + d for i, x in enumerate(v.N(d)): if self.layout.grx[x].rank == tr: continue e = v.e_with(x) dum = self.layout.ctrls[e][tr] grxv.nvs[d][i] = dum return grxv.nvs[dirv] def _crossings(self): """ counts (inefficently but at least accurately) the number of crossing edges between layer l and l+dirv. P[i][j] counts the number of crossings from j-th edge of vertex i. The total count of crossings is the sum of flattened P: x = sum(sum(P,[])) """ g = self.layout.grx P = [] for v in self: P.append([g[x].pos for x in self._neighbors(v)]) for i, p in enumerate(P): candidates = sum(P[i + 1 :], []) for j, e in enumerate(p): p[j] = len(filter((lambda nx: nx < e), candidates)) del candidates return P def _cc(self): """ implementation of the efficient bilayer cross counting by insert-sort (see Barth & Mutzel paper "Simple and Efficient Bilayer Cross Counting") """ g = self.layout.grx P = [] for v in self: P.extend(sorted([g[x].pos for x in self._neighbors(v)])) # count inversions in P: s = [] count = 0 for i, p in enumerate(P): j = bisect(s, p) if j < i: count += i - j s.insert(j, p) return count def _ordering_reduce_crossings(self): assert self.layout.dag g = self.layout.grx N = len(self) X = 0 for i, j in zip(range(N - 1), range(1, N)): vi = self[i] vj = self[j] ni = [g[v].bar for v in self._neighbors(vi)] Xij = Xji = 0 for nj in [g[v].bar for v in self._neighbors(vj)]: x = len([nx for nx in ni if nx > nj]) Xij += x Xji += len(ni) - x if Xji < Xij: self[i] = vj self[j] = vi X += Xji else: X += Xij return X # ------------------------------------------------------------------------------ class SugiyamaLayout(object): """ The Sugiyama layout is the traditional "layered" graph layout called *dot* in graphviz. This layout is quite efficient but heavily relies on drawing heuristics. Adaptive drawing is limited to extending the leaves only, but since the algorithm is quite fast redrawing the entire graph (up to about a thousand nodes) gives usually good results in less than a second. The Sugiyama Layout Class takes as input a core_graph object and implements an efficient drawing algorithm based on nodes dimensions provided through a user-defined *view* property in each vertex. Attributes: dirvh (int): the current aligment state for alignment policy: dirvh=0 -> dirh=+1, dirv=-1: leftmost upper dirvh=1 -> dirh=-1, dirv=-1: rightmost upper dirvh=2 -> dirh=+1, dirv=+1: leftmost lower dirvh=3 -> dirh=-1, dirv=+1: rightmost lower order_inter (int): the default number of layer placement iterations order_attr (str): set attribute name used for layer ordering xspace (int): horizontal space between vertices in a layer yspace (int): vertical space between layers dw (int): default width of a vertex dh (int): default height of a vertex g (graph_core): the graph component reference layers (list[Layer]): the list of layers grx (dict): associate vertex (possibly dummy) with their sugiyama attributes ctrls (dict): associate edge with all its vertices (including dummies) dag (bool): the current acyclic state initdone (bool): True if state is initialized (see init_all). """ def __init__(self, g): from mantis.grandalf.utils.geometry import median_wh # drawing parameters: self.dirvh = 0 self.order_iter = 8 self.order_attr = "pos" self.xspace = 20 self.yspace = 20 self.dw = 10 self.dh = 10 # For layered graphs, vertices and edges need to have some additional # attributes that make sense only for this kind of layout: # update graph struct: self.g = g self.layers = [] self.grx = {} self.ctrls = {} self.dag = False for v in self.g.V(): assert hasattr(v, "view") self.grx[v] = _sugiyama_vertex_attr() self.dw, self.dh = median_wh([v.view for v in self.g.V()]) self.initdone = False def init_all(self, roots=None, inverted_edges=None, optimize=False): """initializes the layout algorithm by computing roots (unless provided), inverted edges (unless provided), vertices ranks and creates all dummy vertices and layers. Parameters: roots (list[Vertex]): set *root* vertices (layer 0) inverted_edges (list[Edge]): set edges to invert to have a DAG. optimize (bool): optimize ranking if True (default False) """ if self.initdone: return # For layered sugiyama algorithm, the input graph must be acyclic, # so we must provide a list of root nodes and a list of inverted edges. if roots is None: roots = [v for v in self.g.sV if len(v.e_in()) == 0] if inverted_edges is None: _ = self.g.get_scs_with_feedback(roots) inverted_edges = [x for x in self.g.sE if x.feedback] self.alt_e = inverted_edges # assign rank to all vertices: self.rank_all(roots, optimize) # add dummy vertex/edge for 'long' edges: for e in self.g.E(): self.setdummies(e) # precompute some layers values: for l in self.layers: l.setup(self) self.initdone = True def draw(self, N=1.5): """compute every node coordinates after converging to optimal ordering by N rounds, and finally perform the edge routing. """ while N > 0.5: for (l, mvmt) in self.ordering_step(): pass N = N - 1 if N > 0: for (l, mvmt) in self.ordering_step(oneway=True): pass self.setxy() self.draw_edges() def _edge_inverter(self): for e in self.alt_e: x, y = e.v e.v = (y, x) self.dag = not self.dag if self.dag: for e in self.g.degenerated_edges: e.detach() self.g.sE.remove(e) else: for e in self.g.degenerated_edges: self.g.add_edge(e) @property def dirvh(self): return self.__dirvh @property def dirv(self): return self.__dirv @property def dirh(self): return self.__dirh @dirvh.setter def dirvh(self, dirvh): assert dirvh in range(4) self.__dirvh = dirvh self.__dirh, self.__dirv = {0: (1, -1), 1: (-1, -1), 2: (1, 1), 3: (-1, 1)}[dirvh] @dirv.setter def dirv(self, dirv): assert dirv in (-1, +1) dirvh = (dirv + 1) + (1 - self.__dirh) // 2 self.dirvh = dirvh @dirh.setter def dirh(self, dirh): assert dirh in (-1, +1) dirvh = (self.__dirv + 1) + (1 - dirh) // 2 self.dirvh = dirvh def rank_all(self, roots, optimize=False): """Computes rank of all vertices. add provided roots to rank 0 vertices, otherwise update ranking from provided roots. The initial rank is based on precedence relationships, optimal ranking may be derived from network flow (simplex). """ self._edge_inverter() r = [x for x in self.g.sV if (len(x.e_in()) == 0 and x not in roots)] self._rank_init(roots + r) if optimize: self._rank_optimize() self._edge_inverter() def _rank_init(self, unranked): """Computes rank of provided unranked list of vertices and all their children. A vertex will be asign a rank when all its inward edges have been *scanned*. When a vertex is asigned a rank, its outward edges are marked *scanned*. """ assert self.dag scan = {} # set rank of unranked based on its in-edges vertices ranks: while len(unranked) > 0: l = [] for v in unranked: self.setrank(v) # mark out-edges has scan-able: for e in v.e_out(): scan[e] = True # check if out-vertices are rank-able: for x in v.N(+1): if not (False in [scan.get(e, False) for e in x.e_in()]): if x not in l: l.append(x) unranked = l def _rank_optimize(self): """optimize ranking by pushing long edges toward lower layers as much as possible. see other interersting network flow solver to minimize total edge length (http://jgaa.info/accepted/2005/EiglspergerSiebenhallerKaufmann2005.9.3.pdf) """ assert self.dag for l in reversed(self.layers): for v in l: gv = self.grx[v] for x in v.N(-1): if all((self.grx[y].rank >= gv.rank for y in x.N(+1))): gx = self.grx[x] self.layers[gx.rank].remove(x) gx.rank = gv.rank - 1 self.layers[gv.rank - 1].append(x) def setrank(self, v): """set rank value for vertex v and add it to the corresponding layer. The Layer is created if it is the first vertex with this rank. """ assert self.dag r = max([self.grx[x].rank for x in v.N(-1)] + [-1]) + 1 self.grx[v].rank = r # add it to its layer: try: self.layers[r].append(v) except IndexError: assert r == len(self.layers) self.layers.append(Layer([v])) def dummyctrl(self, r, ctrl): """creates a DummyVertex at rank r inserted in the ctrl dict of the associated edge and layer. Arguments: r (int): rank value ctrl (dict): the edge's control vertices Returns: DummyVertex : the created DummyVertex. """ dv = DummyVertex(r) dv.view.w, dv.view.h = self.dw, self.dh self.grx[dv] = dv dv.ctrl = ctrl ctrl[r] = dv self.layers[r].append(dv) return dv def setdummies(self, e): """creates and defines all needed dummy vertices for edge e. """ v0, v1 = e.v r0, r1 = self.grx[v0].rank, self.grx[v1].rank if r0 > r1: assert e in self.alt_e v0, v1 = v1, v0 r0, r1 = r1, r0 if (r1 - r0) > 1: # "dummy vertices" are stored in the edge ctrl dict, # keyed by their rank in layers. ctrl = self.ctrls[e] = {} ctrl[r0] = v0 ctrl[r1] = v1 for r in range(r0 + 1, r1): self.dummyctrl(r, ctrl) def draw_step(self): """iterator that computes all vertices coordinates and edge routing after just one step (one layer after the other from top to bottom to top). Purely inefficient ! Use it only for "animation" or debugging purpose. """ ostep = self.ordering_step() for s in ostep: self.setxy() self.draw_edges() yield s def ordering_step(self, oneway=False): """iterator that computes all vertices ordering in their layers (one layer after the other from top to bottom, to top again unless oneway is True). """ self.dirv = -1 crossings = 0 for l in self.layers: mvmt = l.order() crossings += mvmt yield (l, mvmt) if oneway or (crossings == 0): return self.dirv = +1 while l: mvmt = l.order() yield (l, mvmt) l = l.nextlayer() def setxy(self): """computes all vertex coordinates (x,y) using an algorithm by Brandes & Kopf. """ self._edge_inverter() self._detect_alignment_conflicts() inf = float("infinity") # initialize vertex coordinates attributes: for l in self.layers: for v in l: self.grx[v].root = v self.grx[v].align = v self.grx[v].sink = v self.grx[v].shift = inf self.grx[v].X = None self.grx[v].x = [0.0] * 4 curvh = self.dirvh # save current dirvh value for dirvh in range(4): self.dirvh = dirvh self._coord_vertical_alignment() self._coord_horizontal_compact() self.dirvh = curvh # restore it # vertical coordinate assigment of all nodes: Y = 0 for l in self.layers: dY = max([v.view.h / 2.0 for v in l]) for v in l: vx = sorted(self.grx[v].x) # mean of the 2 medians out of the 4 x-coord computed above: avgm = (vx[1] + vx[2]) / 2.0 # final xy-coordinates : v.view.xy = (avgm, Y + dY) Y += 2 * dY + self.yspace self._edge_inverter() def _detect_alignment_conflicts(self): """mark conflicts between edges: inner edges are edges between dummy nodes type 0 is regular crossing regular (or sharing vertex) type 1 is inner crossing regular (targeted crossings) type 2 is inner crossing inner (avoided by reduce_crossings phase) """ curvh = self.dirvh # save current dirvh value self.dirvh = 0 self.conflicts = [] for L in self.layers: last = len(L) - 1 prev = L.prevlayer() if not prev: continue k0 = 0 k1_init = len(prev) - 1 l = 0 for l1, v in enumerate(L): if not self.grx[v].dummy: continue if l1 == last or v.inner(-1): k1 = k1_init if v.inner(-1): k1 = self.grx[v.N(-1)[-1]].pos for vl in L[l : l1 + 1]: for vk in L._neighbors(vl): k = self.grx[vk].pos if k < k0 or k > k1: self.conflicts.append((vk, vl)) l = l1 + 1 k0 = k1 self.dirvh = curvh # restore it def _coord_vertical_alignment(self): """performs vertical alignment according to current dirvh internal state. """ dirh, dirv = self.dirh, self.dirv g = self.grx for l in self.layers[::-dirv]: if not l.prevlayer(): continue r = None for vk in l[::dirh]: for m in l._medianindex(vk): # take the median node in dirv layer: um = l.prevlayer()[m] # if vk is "free" align it with um's root if g[vk].align is vk: if dirv == 1: vpair = (vk, um) else: vpair = (um, vk) # if vk<->um link is used for alignment if (vpair not in self.conflicts) and ( (r is None) or (dirh * r < dirh * m) ): g[um].align = vk g[vk].root = g[um].root g[vk].align = g[vk].root r = m def _coord_horizontal_compact(self): limit = getrecursionlimit() N = len(self.layers) + 10 if N > limit: setrecursionlimit(N) dirh, dirv = self.dirh, self.dirv g = self.grx L = self.layers[::-dirv] # recursive placement of blocks: for l in L: for v in l[::dirh]: if g[v].root is v: self.__place_block(v) setrecursionlimit(limit) # mirror all nodes if right-aligned: if dirh == -1: for l in L: for v in l: x = g[v].X if x: g[v].X = -x # then assign x-coord of its root: inf = float("infinity") rb = inf for l in L: for v in l[::dirh]: g[v].x[self.dirvh] = g[g[v].root].X rs = g[g[v].root].sink s = g[rs].shift if s < inf: g[v].x[self.dirvh] += dirh * s rb = min(rb, g[v].x[self.dirvh]) # normalize to 0, and reinit root/align/sink/shift/X for l in self.layers: for v in l: # g[v].x[dirvh] -= rb g[v].root = g[v].align = g[v].sink = v g[v].shift = inf g[v].X = None # TODO: rewrite in iterative form to avoid recursion limit... def __place_block(self, v): g = self.grx if g[v].X is None: # every block is initially placed at x=0 g[v].X = 0.0 # place block in which v belongs: w = v while 1: j = g[w].pos - self.dirh # predecessor in rank must be placed r = g[w].rank if 0 <= j < len(self.layers[r]): wprec = self.layers[r][j] delta = ( self.xspace + (wprec.view.w + w.view.w) / 2.0 ) # abs positive minimum displ. # take root and place block: u = g[wprec].root self.__place_block(u) # set sink as sink of prec-block root if g[v].sink is v: g[v].sink = g[u].sink if g[v].sink != g[u].sink: s = g[u].sink newshift = g[v].X - (g[u].X + delta) g[s].shift = min(g[s].shift, newshift) else: g[v].X = max(g[v].X, (g[u].X + delta)) # take next node to align in block: w = g[w].align # quit if self aligned if w is v: break def draw_edges(self): """Basic edge routing applied only for edges with dummy points. Enhanced edge routing can be performed by using the apropriate *route_with_xxx* functions from :ref:routing_ in the edges' view. """ for e in self.g.E(): if hasattr(e, "view"): l = [] if e in self.ctrls: D = self.ctrls[e] r0, r1 = self.grx[e.v[0]].rank, self.grx[e.v[1]].rank if r0 < r1: ranks = range(r0 + 1, r1) else: ranks = range(r0 - 1, r1, -1) l = [D[r].view.xy for r in ranks] l.insert(0, e.v[0].view.xy) l.append(e.v[1].view.xy) try: self.route_edge(e, l) except AttributeError: pass e.view.setpath(l) # DIRECTED GRAPH WITH CONSTRAINTS LAYOUT # ------------------------------------------------------------------------------ class DigcoLayout(object): linalg = importlib.import_module("mantis.grandalf.utils.geometry") def __init__(self, g): # drawing parameters: self.xspace = 10 self.yspace = 10 self.dr = 10 self.debug = False self.g = g self.levels = [] for i, v in enumerate(self.g.V()): assert hasattr(v, "view") v.i = i self.dr = max((self.dr, v.view.w, v.view.h)) # solver parameters: self._cg_max_iter = g.order() self._cg_tolerance = 1.0e-6 self._eps = 1.0e-5 self._cv_max_iter = self._cg_max_iter def init_all(self, alpha=0.1, beta=0.01): y = None if self.g.directed: # partition g in hierarchical levels: y = self.part_to_levels(alpha, beta) # initiate positions (y and random in x): self.Z = self._xyinit(y) def draw(self, N=None): if N is None: N = self._cv_max_iter self.Z = self._optimize(self.Z, limit=N) # set view xy from near-optimal coords matrix: for v in self.g.V(): v.view.xy = (self.Z[v.i][0, 0] * self.dr, self.Z[v.i][0, 1] * self.dr) self.draw_edges() def draw_step(self): for x in range(self._cv_max_iter): self.draw(N=1) self.draw_edges() yield # Basic edge routing with segments def draw_edges(self): for e in self.g.E(): if hasattr(e, "view"): l = [e.v[0].view.xy, e.v[1].view.xy] try: self.route_edge(e, l) except AttributeError: pass e.view.setpath(l) # partition the nodes into levels: def part_to_levels(self, alpha, beta): opty, err = self.optimal_arrangement() ordering = list(zip(opty, self.g.sV)) eps = alpha * (opty.max() - opty.min()) / (len(opty) - 1) eps = max(beta, eps) sorted(ordering, reverse=True) l = [] self.levels.append(l) for i in range(len(list(ordering)) - 1): y, v = ordering[i] l.append(v) v.level = self.levels.index(l) if (y - ordering[i + 1][0]) > eps: l = [] self.levels.append(l) y, v = ordering[-1] l.append(v) v.level = self.levels.index(l) return opty def optimal_arrangement(self): b = self.balance() y = DigcoLayout.linalg.rand_ortho1(self.g.order()) return self._conjugate_gradient_L(y, b) # balance vector is assembled in finite-element way... # this is faster than computing b[i] for each i. def balance(self): b = DigcoLayout.linalg.array([0.0] * self.g.order(), dtype=float) for e in self.g.E(): s = e.v[0] d = e.v[1] q = e.w * (self.yspace + (s.view.h + d.view.h) / 2.0) b[s.i] += q b[d.i] -= q return b # We compute the solution Y of L.Y = b by conjugate gradient method # (L is semi-definite positive so Y is unique and convergence is O(n)) # note that only arrays are involved here... def _conjugate_gradient_L(self, y, b): Lii = self.__Lii_() r = b - self.__L_pk(Lii, y) p = DigcoLayout.linalg.array(r, copy=True) rr = sum(r * r) for k in range(self._cg_max_iter): try: Lp = self.__L_pk(Lii, p) alpha = rr / sum(p * Lp) y += alpha / p r -= alpha * Lp newrr = sum(r * r) beta = newrr / rr rr = newrr if rr < self._cg_tolerance: break p = r + beta * p except ZeroDivisionError: return (None, rr) return (y, rr) # _xyinit can use diagonally scaled initial vertices positioning to provide # better convergence in constrained stress majorization def _xyinit(self, y=None): if y is None: y = DigcoLayout.linalg.rand_ortho1(self.g.order()) x = DigcoLayout.linalg.rand_ortho1(self.g.order()) # translate and normalize: x = x - x[0] y = y - y[0] sfactor = 1.0 / max(list(map(abs, y)) + list(map(abs, x))) return DigcoLayout.linalg.matrix(list(zip(x * sfactor, y * sfactor))) # provide the diagonal of the Laplacian matrix of g # the rest of L (sparse!) is already stored in every edges. def __Lii_(self): Lii = [] for v in self.g.V(): Lii.append(sum([e.w for e in v.e])) return DigcoLayout.linalg.array(Lii, dtype=float) # we don't compute the L.Pk matrix/vector product here since # L is sparse (order of |E| not |V|^2 !) so we let each edge # contribute to the resulting L.Pk vector in a FE assembly way... def __L_pk(self, Lii, pk): y = Lii * pk for e in self.g.sE: i1 = e.v[0].i i2 = e.v[1].i y[i1] -= e.w * pk[i2] y[i2] -= e.w * pk[i1] return y # conjugate_gradient with given matrix Lw: # it is assumed that b is not a multivector, # so _cg_Lw should be called in all directions separately. # note that everything is a matrix here, (arrays are row vectors only) def _cg_Lw(self, Lw, z, b): scal = lambda U, V: float(U.transpose() * V) r = b - Lw * z p = r.copy() rr = scal(r, r) for k in range(self._cg_max_iter): if rr < self._cg_tolerance: break Lp = Lw * p alpha = rr / scal(p, Lp) z = z + alpha * p r = r - alpha * Lp newrr = scal(r, r) beta = newrr / rr rr = newrr p = r + beta * p return (z, rr) def __Dij_(self): Dji = [] for v in self.g.V(): wd = self.g.dijkstra(v) Di = [wd[w] for w in self.g.V()] Dji.append(Di) # at this point D is stored by rows, # but anymway it's a symmetric matrix return DigcoLayout.linalg.matrix(Dji, dtype=float) # returns matrix -L^w def __Lij_w_(self): self.Dij = self.__Dij_() # we keep D also for L^Z computations Lij = self.Dij.copy() n = self.g.order() for i in range(n): d = 0 for j in range(n): if j == i: continue Lij[i, j] = 1.0 / self.Dij[i, j] ** 2 d += Lij[i, j] Lij[i, i] = -d return Lij # returns vector -L^Z.Z: def __Lij_Z_Z(self, Z): n = self.g.order() # init: lzz = Z.copy() * 0.0 # lzz has dim Z (n x 2) liz = DigcoLayout.linalg.matrix([0.0] * n) # liz is a row of L^Z (size n) # compute lzz = L^Z.Z while assembling L^Z by row (liz): for i in range(n): iterk_except_i = (k for k in range(n) if k != i) for k in iterk_except_i: v = Z[i] - Z[k] liz[0, k] = 1.0 / ( self.Dij[i, k] * DigcoLayout.linalg.sqrt(v * v.transpose()) ) liz[0, i] = 0.0 # forced, otherwise next liz.sum() is wrong ! liz[0, i] = -liz.sum() # now that we have the i-th row of L^Z, just dotprod with Z: lzz[i] = liz * Z return lzz def _optimize(self, Z, limit=100): Lw = self.__Lij_w_() K = self.g.order() * (self.g.order() - 1.0) / 2.0 stress = float("inf") count = 0 deep = 0 b = self.__Lij_Z_Z(Z) while count < limit: if self.debug: print("count %d" % count) print("Z = ", Z) print("b = ", b) # find next Z by solving Lw.Z = b in every direction: x, xerr = self._cg_Lw(Lw[1:, 1:], Z[1:, 0], b[1:, 0]) y, yerr = self._cg_Lw(Lw[1:, 1:], Z[1:, 1], b[1:, 1]) Z[1:, 0] = x Z[1:, 1] = y if self.debug: print(" cg -> ") print(Z, xerr, yerr) # compute new stress: FZ = K - float(x.transpose() * b[1:, 0] + y.transpose() * b[1:, 1]) # precompute new b: b = self.__Lij_Z_Z(Z) # update new stress: FZ += 2 * float(x.transpose() * b[1:, 0] + y.transpose() * b[1:, 1]) # test convergence: print("stress=%.10f" % FZ) if stress == 0.0: break elif abs((stress - FZ) / stress) < self._eps: if deep == 2: break else: deep += 1 stress = FZ count += 1 return Z # ------------------------------------------------------------------------------ class DwyerLayout(DigcoLayout): pass