diff --git a/apache-spark/pom.xml b/apache-spark/pom.xml index 41fd51d9fd..4eb00192ac 100644 --- a/apache-spark/pom.xml +++ b/apache-spark/pom.xml @@ -27,6 +27,18 @@ ${org.apache.spark.spark-sql.version} provided + + org.apache.spark + spark-graphx_2.11 + ${org.apache.spark.spark-graphx.version} + provided + + + graphframes + graphframes + ${graphframes.version} + provided + org.apache.spark spark-streaming_2.11 @@ -82,9 +94,17 @@ 2.3.0 2.3.0 2.3.0 + 2.3.0 + 0.7.0-spark2.4-s_2.11 2.3.0 2.3.0 1.5.2 + + + SparkPackagesRepo + http://dl.bintray.com/spark-packages/maven + + diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java new file mode 100644 index 0000000000..30524a8c8b --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphExperiments.java @@ -0,0 +1,52 @@ +package com.baeldung.graphframes; + +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.spark.api.java.function.VoidFunction; +import org.apache.spark.graphx.Edge; +import org.apache.spark.graphx.Graph; +import org.apache.spark.graphx.VertexRDD; +import org.graphframes.GraphFrame; +import scala.Tuple2; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class GraphExperiments { + public static Map USERS = new HashMap<>(); + + public static void main(String[] args) throws IOException { + Logger.getLogger("org").setLevel(Level.OFF); + GraphLoader loader = new GraphLoader(); + GraphFrame graph = loader.getGraphFrameUserRelationship(); + + GraphExperiments experiments = new GraphExperiments(); + experiments.doGraphFrameOperations(graph); + experiments.doGraphFrameAlgorithms(graph); + } + + private void doGraphFrameOperations(GraphFrame graph) { + graph.vertices().show(); + graph.edges().show(); + + graph.vertices().filter("name = 'Martin'").show(); + + graph.filterEdges("type = 'Friend'") + .dropIsolatedVertices().vertices().show(); + + graph.degrees().show(); + graph.inDegrees().show(); + graph.outDegrees().show(); + } + + private void doGraphFrameAlgorithms(GraphFrame graph) { + + graph.pageRank().maxIter(20).resetProbability(0.15).run().vertices().show(); + + graph.connectedComponents().run().show(); + + graph.triangleCount().run().show(); + } + +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java new file mode 100644 index 0000000000..cad1fb4e26 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/GraphLoader.java @@ -0,0 +1,72 @@ +package com.baeldung.graphframes; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.graphframes.GraphFrame; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +public class GraphLoader { + + public JavaSparkContext getSparkContext() throws IOException { + Path temp = Files.createTempDirectory("sparkGraphFrames"); + SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]"); + JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf); + javaSparkContext.setCheckpointDir(temp.toString()); + return javaSparkContext; + } + + public GraphFrame getGraphFrameUserRelationship() throws IOException { + Path temp = Files.createTempDirectory("sparkGraphFrames"); + SparkSession session = SparkSession.builder() + .appName("SparkGraphFrameSample") + .config("spark.sql.warehouse.dir", temp.toString()) + .sparkContext(getSparkContext().sc()) + .master("local[*]") + .getOrCreate(); + List users = loadUsers(); + + Dataset userDataset = session.createDataFrame(users, User.class); + + List relationshipsList = getRelations(); + Dataset relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class); + + GraphFrame graphFrame = new GraphFrame(userDataset, relationshipDataset); + + return graphFrame; + } + + public List getRelations() { + List relationships = new ArrayList<>(); + relationships.add(new Relationship("Friend", "1", "2")); + relationships.add(new Relationship("Following", "1", "4")); + relationships.add(new Relationship("Friend", "2", "4")); + relationships.add(new Relationship("Relative", "3", "1")); + relationships.add(new Relationship("Relative", "3", "4")); + + return relationships; + } + + private List loadUsers() { + User john = new User(1L, "John"); + User martin = new User(2L, "Martin"); + User peter = new User(3L, "Peter"); + User alicia = new User(4L, "Alicia"); + + List users = new ArrayList<>(); + + users.add(new User(1L, "John")); + users.add(new User(2L, "Martin")); + users.add(new User(3L, "Peter")); + users.add(new User(4L, "Alicia")); + + return users; + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java b/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java new file mode 100644 index 0000000000..ce1780ea3f --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/Relationship.java @@ -0,0 +1,39 @@ +package com.baeldung.graphframes; + +import java.io.Serializable; +import java.util.UUID; + +public class Relationship implements Serializable { + private String type; + private String src; + private String dst; + private UUID id; + + public Relationship(String type, String src, String dst) { + this.type = type; + this.src = src; + this.dst = dst; + this.id = UUID.randomUUID(); + } + + public String getId() { + return id.toString(); + } + + public String getType() { + return type; + } + + public String getSrc() { + return src; + } + + public String getDst() { + return dst; + } + + @Override + public String toString() { + return getSrc() + " -- " + getType() + " --> " + getDst(); + } +} diff --git a/apache-spark/src/main/java/com/baeldung/graphframes/User.java b/apache-spark/src/main/java/com/baeldung/graphframes/User.java new file mode 100644 index 0000000000..50022a1da1 --- /dev/null +++ b/apache-spark/src/main/java/com/baeldung/graphframes/User.java @@ -0,0 +1,27 @@ +package com.baeldung.graphframes; + +import java.io.Serializable; + +public class User implements Serializable { + + private Long id; + private String name; + + public User(long id, String name) { + this.id = id; + this.name = name; + } + + public String getId() { + return id.toString(); + } + + public String getName() { + return name; + } + + @Override + public String toString() { + return "<" + id + "," + name + ">"; + } +}