scala - How to parallelize Prim's algorithm in GraphX


So I'm trying to write a parallel version of Prim's algorithm, but I can't quite figure out how to do it using Spark GraphX. I've looked pretty hard for resources, but there aren't a lot of examples of implementing graph algorithms such as shortest paths or MSTs in GraphX. I think I need to use divide and conquer: split the graph into subgraphs and then merge their MSTs.
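One way to make the split-and-merge idea correct is to split the edges rather than the vertices: compute a minimum spanning forest for each edge group independently, then run the MST algorithm once more on the union of those partial forests. This works because of the cycle property: an edge discarded inside a group is the heaviest edge on some cycle, so it cannot belong to the MST of the whole graph. The sketch below is plain Scala (no Spark), uses Kruskal's algorithm with union-find for the local step instead of Prim's, and replaces Spark partitions with a hypothetical round-robin split; `WEdge` and `PartitionedMst` are illustrative names, not GraphX APIs.

```scala
import scala.collection.mutable

// Undirected weighted edge (hypothetical helper type for this sketch).
final case class WEdge(src: Long, dst: Long, weight: Int)

object PartitionedMst {
  // Union-find over vertex ids; a vertex becomes its own set on first lookup.
  private final class UnionFind {
    private val parent = mutable.Map.empty[Long, Long]

    def find(x: Long): Long = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else {
        val root = find(p)
        parent(x) = root // path compression
        root
      }
    }

    // Merges the sets of a and b; returns false if they were already connected.
    def union(a: Long, b: Long): Boolean = {
      val ra = find(a)
      val rb = find(b)
      if (ra != rb) parent(ra) = rb
      ra != rb
    }
  }

  // Kruskal's algorithm: minimum spanning forest of the given edges.
  def kruskal(edges: Seq[WEdge]): List[WEdge] = {
    val uf = new UnionFind
    val forest = mutable.ListBuffer.empty[WEdge]
    for (e <- edges.sortBy(_.weight)) {
      if (uf.union(e.src, e.dst)) forest += e // keep e only if it joins two components
    }
    forest.toList
  }

  // Split the edges into k groups, solve each group independently, then merge.
  // In Spark each group could be one partition; here the split is round-robin.
  def mst(edges: Seq[WEdge], k: Int): List[WEdge] = {
    require(k > 0, "k must be positive")
    val groups = edges.zipWithIndex.groupBy(_._2 % k).values.map(_.map(_._1))
    val partialForests = groups.flatMap(group => kruskal(group)).toList
    kruskal(partialForests)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(WEdge(1, 2, 5), WEdge(1, 3, 8), WEdge(1, 4, 4),
                     WEdge(2, 3, 8), WEdge(2, 4, 7), WEdge(3, 4, 1))
    mst(sample, k = 2).foreach(println)
  }
}
```

Splitting by vertices and merging only the local MSTs would miss the edges that cross between subgraphs, which is why this sketch partitions edges instead. Mapping each group onto a Spark partition (for example with `mapPartitions`) and collecting the merged forest to the driver is an assumption about how one might port it, not something the question's code does.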

GraphX resource: http://ampcamp.berkeley.edu/big-data-mini-course/graph-analytics-with-graphx.html#the-property-graph

Parallel Prim's resource: https://www8.cs.umu.se/kurser/5dv050/vt10/handouts/f10.pdf

Code:

    import org.apache.spark._
    import org.apache.log4j.Logger
    import org.apache.log4j.Level
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.graphx._
    import org.apache.spark.rdd.RDD
    import org.apache.spark.graphx.util._

    object ParallelPrims {
      Logger.getLogger("org").setLevel(Level.OFF)
      Logger.getLogger("akka").setLevel(Level.OFF)

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Parallel Prims").setMaster("local")
        val sc = new SparkContext(conf)
        val logFile = "nodedata.txt"
        val logData = sc.textFile(logFile, 2).cache()

        // Split off the header line
        val headerAndRows = logData.map(line => line.split(",").map(_.trim))
        val header = headerAndRows.first
        val data = headerAndRows.filter(_(0) != header(0))

        // Parse the number of nodes and edges from the header
        val numNodes = header(0).toInt
        val numEdges = header(1).toInt
        val vertexArray = new Array[(Long, String)](numNodes)
        val edgeArray = new Array[Edge[Int]](numEdges)

        // Create the vertex array: ids 1..numNodes named "v1", "v2", ...
        for (count <- 0 until numNodes) {
          vertexArray(count) = (count.toLong + 1, "v" + (count + 1))
        }

        // Create the edge array from the (src, dst, weight) rows
        val rows = data.collect()
        for (count <- 0 until numEdges) {
          val cols = rows(count)
          edgeArray(count) = Edge(cols(0).toLong, cols(1).toLong, cols(2).toInt)
        }

        // Create the GraphX graph
        val vertexRDD: RDD[(Long, String)] = sc.parallelize(vertexArray)
        val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
        val graph: Graph[String, Int] = Graph(vertexRDD, edgeRDD)
        graph.triplets.take(6).foreach(println)
      }
    }
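The header is removed by comparing each row's first field with the header's first field, so a real edge line that starts with the same number (with the sample header `4,6`, any edge whose source is vertex 4) would be dropped too. Below is a minimal plain-Scala sketch of parsing the same file format while skipping the header by position instead; `ParsedGraph` and `NodeDataParser` are hypothetical names, and the file is assumed to be already read into lines.

```scala
// Hypothetical result type: node count plus (src, dst, weight) edges.
final case class ParsedGraph(numNodes: Int, edges: Vector[(Long, Long, Int)])

object NodeDataParser {
  // First non-empty line is "numNodes,numEdges"; every following line is "src,dst,weight".
  def parse(lines: Seq[String]): ParsedGraph = {
    val rows = lines.map(_.trim).filter(_.nonEmpty).map(_.split(",").map(_.trim))
    require(rows.nonEmpty, "missing header line")
    val header = rows.head
    val numNodes = header(0).toInt
    val numEdges = header(1).toInt
    // Skip the header by position, not by comparing field values.
    val edges = rows.tail.map(c => (c(0).toLong, c(1).toLong, c(2).toInt)).toVector
    require(edges.size == numEdges, s"header says $numEdges edges, found ${edges.size}")
    ParsedGraph(numNodes, edges)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("4,6", "1,2,5", "1,3,8", "1,4,4", "2,3,8", "2,4,7", "3,4,1")
    println(parse(lines))
  }
}
```

In the Spark version, the same positional skip could be written with `RDD.zipWithIndex`, e.g. `headerAndRows.zipWithIndex.filter(_._2 > 0).map(_._1)`.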

nodedata.txt:

    4,6
    1,2,5
    1,3,8
    1,4,4
    2,3,8
    2,4,7
    3,4,1

Output:

    ((1,v1),(2,v2),5)
    ((1,v1),(3,v3),8)
    ((1,v1),(4,v4),4)
    ((2,v2),(3,v3),8)
    ((2,v2),(4,v4),7)
    ((3,v3),(4,v4),1)

Here is my version of Prim's algorithm.

    var graph: Graph[String, Int] = ...

    // Empty RDD for the MST (triplet types must match the graph's [String, Int])
    var mst = sc.parallelize(Array[EdgeTriplet[String, Int]]())

    // Pick a random vertex from the graph
    var vt: RDD[VertexId] = sc.parallelize(Array(graph.pickRandomVertex()))

    // Loop until all vertices are in the Vt set
    val vcount = graph.vertices.count
    while (vt.count < vcount) {
      // RDD keyed by vertex id, for the inner joins below
      val hvt = vt.map(x => (x, x))

      // Key triplets by source id for the inner join
      val bySrc = graph.triplets.map(triplet => (triplet.srcId, triplet))

      // Key triplets by destination id for the inner join
      val byDst = graph.triplets.map(triplet => (triplet.dstId, triplet))

      // Triplets whose source vertex is in Vt
      val bySrcJoined = bySrc.join(hvt).map(_._2._1)

      // Triplets whose destination vertex is in Vt
      val byDstJoined = byDst.join(hvt).map(_._2._1)

      // Union the previous two RDDs and subtract triplets with both source and destination in Vt
      val candidates = bySrcJoined.union(byDstJoined).subtract(byDstJoined.intersection(bySrcJoined))

      // Find the triplet with the least weight
      val triplet = candidates.sortBy(triplet => triplet.attr).first

      // Add the triplet to the MST
      mst = mst.union(sc.parallelize(Array(triplet)))

      // Decide whether to add the source or the destination vertex to Vt
      if (!vt.filter(x => x == triplet.srcId).isEmpty) {
        vt = vt.union(sc.parallelize(Array(triplet.dstId)))
      } else {
        vt = vt.union(sc.parallelize(Array(triplet.srcId)))
      }
    }

    // Final minimum spanning tree
    mst.collect.foreach(p => println(p.srcId + " " + p.attr + " " + p.dstId))
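The loop above re-joins every triplet against Vt on each iteration. The standard textbook parallel formulation of Prim's algorithm instead partitions the vertices among workers, and each worker keeps d(v), the lightest known edge weight from each of its non-tree vertices to the tree. Every iteration then does (1) a local minimum of d(v) per worker, (2) a global min-reduction to pick the next vertex u, and (3) a broadcast of u so each worker can update d(v) for its own vertices. The sketch below simulates those three steps sequentially in plain Scala; `PartitionedPrim`, the contiguous block partitioning, and the connected-graph assumption are illustrative, not part of the original code or of GraphX.

```scala
import scala.collection.mutable

object PartitionedPrim {
  type WeightedEdge = (Long, Long, Int) // (src, dst, weight), treated as undirected

  // Returns MST edges as (treeVertex, newVertex, weight); assumes a connected graph
  // and distinct vertex ids. Tree growth starts from vertices.head.
  def mst(vertices: Seq[Long], edges: Seq[WeightedEdge], workers: Int): List[WeightedEdge] = {
    require(vertices.nonEmpty && workers > 0, "need at least one vertex and one worker")

    // Undirected adjacency map, keeping only the lightest edge between two vertices.
    val adj = mutable.Map.empty[Long, mutable.Map[Long, Int]]
    for ((s, t, w) <- edges; (a, b) <- Seq((s, t), (t, s))) {
      val nbrs = adj.getOrElseUpdate(a, mutable.Map.empty[Long, Int])
      if (nbrs.get(b).forall(w < _)) nbrs(b) = w
    }

    // Hypothetical partitioning: worker i owns the i-th contiguous block of vertices.
    val blockSize = (vertices.size + workers - 1) / workers
    val blocks = vertices.grouped(blockSize).toVector

    val dist = mutable.Map.empty[Long, Int]    // d(v): lightest known edge from v to the tree
    val parent = mutable.Map.empty[Long, Long] // tree endpoint of that edge
    val inTree = mutable.Set(vertices.head)
    val tree = mutable.ArrayBuffer.empty[WeightedEdge]

    // Step 3: after u joins the tree, update d(v) for non-tree neighbours of u.
    def relax(u: Long): Unit =
      for {
        (v, w) <- adj.getOrElse(u, mutable.Map.empty[Long, Int])
        if !inTree(v) && dist.get(v).forall(w < _)
      } {
        dist(v) = w
        parent(v) = u
      }

    relax(vertices.head)
    while (inTree.size < vertices.size) {
      // Step 1 (one task per worker in a real system): local minimum of d(v) per block.
      val localMins = blocks.flatMap { block =>
        val open = block.filter(v => !inTree(v) && dist.contains(v))
        if (open.isEmpty) None else Some(open.minBy(v => dist(v)))
      }
      require(localMins.nonEmpty, "graph is not connected")
      // Step 2: global min-reduction chooses the vertex u to add to the tree.
      val u = localMins.minBy(v => dist(v))
      inTree += u
      tree += ((parent(u), u, dist(u)))
      relax(u)
    }
    tree.toList
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 5), (1L, 3L, 8), (1L, 4L, 4), (2L, 3L, 8), (2L, 4L, 7), (3L, 4L, 1))
    mst(Seq(1L, 2L, 3L, 4L), edges, workers = 2).foreach(println)
  }
}
```

The outer loop still runs |V| - 1 times, because Prim's algorithm adds one vertex per step; parallelism only shortens each step. In GraphX, d(v) might be kept in the vertex attribute and updated with `aggregateMessages`, but that translation is an assumption and is not shown here.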
