Scala - How to parallelize Prim's algorithm in GraphX
So I'm trying to write a parallel version of Prim's algorithm, but I can't quite figure out how to do it using Spark GraphX. I've looked pretty hard for resources, but there aren't a lot of examples of implementing shortest-path algorithms in GraphX. I think I need to use divide and conquer: split the graph into subgraphs, and then merge their MSTs.
graphx resource: http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html#the-property-graph
parallel prims resource: https://www8.cs.umu.se/kurser/5dv050/vt10/handouts/f10.pdf
code:
import org.apache.spark._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.graphx.util._

/**
 * Loads an edge list from a text file and builds a GraphX property graph from it.
 *
 * Input format: the first line is a `numNodes,numEdges` header; every following
 * line is one edge written as `srcId,dstId,weight`. Vertices are numbered
 * 1..numNodes and labelled "v1", "v2", ...
 */
object ParallelPrims {
  // Silence the loggers named "org" and "akka".
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  /**
   * Reads the input file, builds the graph and prints its first six triplets.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("parallel prims").setMaster("local")
    val sc = new SparkContext(conf)
    try {
      val logFile = "nodedata.txt"
      // Blank lines are skipped so a trailing newline cannot produce a bogus row.
      val logData = sc.textFile(logFile, 2).filter(_.trim.nonEmpty).cache()

      // Split every line into trimmed comma-separated columns.
      val headerAndRows = logData.map(line => line.split(",").map(_.trim))
      val header = headerAndRows.first
      require(header.length >= 2, "header line must be 'numNodes,numEdges'")

      // Drop the header by POSITION. Comparing the first column against header(0)
      // would also discard every edge whose source id equals the node count.
      val data = headerAndRows
        .zipWithIndex()
        .filter { case (_, index) => index > 0 }
        .map { case (cols, _) => cols }

      // Parse the number of nodes and edges from the header.
      val numNodes = header(0).toInt
      val numEdges = header(1).toInt

      // Vertex i (1-based) carries the label "v<i>".
      val vertexArray: Array[(Long, String)] =
        Array.tabulate(numNodes)(i => ((i + 1).toLong, "v" + (i + 1)))

      // Only the first numEdges rows are used, as announced by the header.
      val edgeArray: Array[Edge[Int]] = data
        .take(numEdges)
        .map(cols => Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt))
      require(
        edgeArray.length == numEdges,
        s"header announces $numEdges edges but only ${edgeArray.length} rows were found")

      // Create the GraphX graph.
      val vertexRDD: RDD[(Long, (String))] = sc.parallelize(vertexArray)
      val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
      val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)

      graph.triplets.take(6).foreach(println)
    } finally {
      // Release the context even when parsing fails.
      sc.stop()
    }
  }
}
nodedata.txt
4,6 1,2,5 1,3,8 1,4,4 2,3,8 2,4,7 3,4,1
output
((1,v1),(2,v2),5) ((1,v1),(3,v3),8) ((1,v1),(4,v4),4) ((2,v2),(3,v3),8) ((2,v2),(4,v4),7) ((3,v3),(4,v4),1)
Here is my version of Prim's algorithm.
// Sequential-style Prim's algorithm expressed with RDD operations.
// Expects a SparkContext named `sc` to be in scope.
var graph: Graph[String, Int] = ... // placeholder kept from the original post: supply your graph here

// Edges selected so far. The element type must match graph.triplets, i.e.
// EdgeTriplet[String, Int]; otherwise the union below does not type-check.
var mst: RDD[EdgeTriplet[String, Int]] = sc.parallelize(Seq.empty[EdgeTriplet[String, Int]])

// Vertices already reached by the tree, seeded with a random vertex of the graph.
var vt: RDD[VertexId] = sc.parallelize(Array(graph.pickRandomVertex()))

// Triplets keyed by each endpoint; these do not depend on vt, so build them once.
val bySrc = graph.triplets.map(triplet => (triplet.srcId, triplet))
val byDst = graph.triplets.map(triplet => (triplet.dstId, triplet))

// Candidate edges are compared by weight only.
val byWeight = Ordering.by[EdgeTriplet[String, Int], Int](_.attr)

// Loop until every vertex is in vt, or no edge leaves the tree any more.
val vcount = graph.vertices.count
var frontierExhausted = false
while (!frontierExhausted && vt.count < vcount) {
  // Key the visited vertices so they can be inner-joined with the triplets.
  val hvt = vt.map(x => (x, x))

  // Triplets whose source vertex is in vt.
  val bySrcJoined = bySrc.join(hvt).map(_._2._1)

  // Triplets whose destination vertex is in vt.
  val byDstJoined = byDst.join(hvt).map(_._2._1)

  // Triplets with exactly one endpoint in vt: take both sets and remove the
  // triplets that appear in both (source AND destination already in vt).
  val candidates = bySrcJoined.union(byDstJoined).subtract(byDstJoined.intersection(bySrcJoined))

  // Take the lightest candidate without sorting the whole RDD. An empty result
  // means the graph is disconnected and the tree cannot grow any further.
  candidates.takeOrdered(1)(byWeight).headOption match {
    case None =>
      frontierExhausted = true

    case Some(triplet) =>
      // Add the triplet to the tree.
      mst = mst.union(sc.parallelize(Array(triplet)))

      // Add whichever endpoint is not in vt yet.
      if (!vt.filter(x => x == triplet.srcId).isEmpty) {
        vt = vt.union(sc.parallelize(Array(triplet.dstId)))
      } else {
        vt = vt.union(sc.parallelize(Array(triplet.srcId)))
      }
  }
}

// Final minimum spanning tree (a spanning tree of the seed vertex's component
// when the graph is disconnected).
mst.collect.foreach(p => println(s"${p.srcId} ${p.attr} ${p.dstId}"))
Comments
Post a Comment