Merge pull request #7953 from norbertoritzmann/master
[BAEL-3086] Changes related to Apache Spark GraphX article
This commit is contained in:
commit
f28029e59e
|
@ -27,6 +27,18 @@
|
||||||
<version>${org.apache.spark.spark-sql.version}</version>
|
<version>${org.apache.spark.spark-sql.version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.apache.spark</groupId>
|
||||||
|
<artifactId>spark-graphx_2.11</artifactId>
|
||||||
|
<version>${org.apache.spark.spark-graphx.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>graphframes</groupId>
|
||||||
|
<artifactId>graphframes</artifactId>
|
||||||
|
<version>${graphframes.version}</version>
|
||||||
|
<scope>provided</scope>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.apache.spark</groupId>
|
<groupId>org.apache.spark</groupId>
|
||||||
<artifactId>spark-streaming_2.11</artifactId>
|
<artifactId>spark-streaming_2.11</artifactId>
|
||||||
|
@ -82,9 +94,17 @@
|
||||||
<org.apache.spark.spark-sql.version>2.3.0</org.apache.spark.spark-sql.version>
|
<org.apache.spark.spark-sql.version>2.3.0</org.apache.spark.spark-sql.version>
|
||||||
<org.apache.spark.spark-streaming.version>2.3.0</org.apache.spark.spark-streaming.version>
|
<org.apache.spark.spark-streaming.version>2.3.0</org.apache.spark.spark-streaming.version>
|
||||||
<org.apache.spark.spark-mllib.version>2.3.0</org.apache.spark.spark-mllib.version>
|
<org.apache.spark.spark-mllib.version>2.3.0</org.apache.spark.spark-mllib.version>
|
||||||
|
<org.apache.spark.spark-graphx.version>2.3.0</org.apache.spark.spark-graphx.version>
|
||||||
|
<graphframes.version>0.7.0-spark2.4-s_2.11</graphframes.version>
|
||||||
<org.apache.spark.spark-streaming-kafka.version>2.3.0</org.apache.spark.spark-streaming-kafka.version>
|
<org.apache.spark.spark-streaming-kafka.version>2.3.0</org.apache.spark.spark-streaming-kafka.version>
|
||||||
<com.datastax.spark.spark-cassandra-connector.version>2.3.0</com.datastax.spark.spark-cassandra-connector.version>
|
<com.datastax.spark.spark-cassandra-connector.version>2.3.0</com.datastax.spark.spark-cassandra-connector.version>
|
||||||
<com.datastax.spark.spark-cassandra-connector-java.version>1.5.2</com.datastax.spark.spark-cassandra-connector-java.version>
|
<com.datastax.spark.spark-cassandra-connector-java.version>1.5.2</com.datastax.spark.spark-cassandra-connector-java.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
<repositories>
|
||||||
|
<repository>
|
||||||
|
<id>SparkPackagesRepo</id>
|
||||||
|
<url>http://dl.bintray.com/spark-packages/maven</url>
|
||||||
|
</repository>
|
||||||
|
</repositories>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
package com.baeldung.graphframes;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.graphframes.GraphFrame;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Demonstrates basic GraphFrame operations (vertex/edge inspection, filtering,
 * degree queries) and built-in graph algorithms (PageRank, connected
 * components, triangle count) on a small user-relationship graph built by
 * {@code GraphLoader}.
 */
public class GraphExperiments {

    // NOTE(review): never read or written anywhere in this class — kept only in
    // case an external caller references it; candidate for removal.
    public static Map<Long, User> USERS = new HashMap<>();

    public static void main(String[] args) throws IOException {
        // Silence Spark's verbose INFO logging so the show() output stays readable.
        Logger.getLogger("org").setLevel(Level.OFF);

        GraphLoader loader = new GraphLoader();
        GraphFrame graph = loader.getGraphFrameUserRelationship();

        GraphExperiments experiments = new GraphExperiments();
        experiments.doGraphFrameOperations(graph);
        experiments.doGraphFrameAlgorithms(graph);
    }

    /** Prints the graph's vertices and edges, then a few filter and degree queries. */
    private void doGraphFrameOperations(GraphFrame graph) {
        graph.vertices().show();
        graph.edges().show();

        graph.vertices().filter("name = 'Martin'").show();

        // Keep only 'Friend' edges, then drop any vertex left with no edges.
        graph.filterEdges("type = 'Friend'")
            .dropIsolatedVertices().vertices().show();

        graph.degrees().show();
        graph.inDegrees().show();
        graph.outDegrees().show();
    }

    /** Runs PageRank, connected components, and triangle counting, printing each result. */
    private void doGraphFrameAlgorithms(GraphFrame graph) {
        graph.pageRank().maxIter(20).resetProbability(0.15).run().vertices().show();

        // NOTE(review): connectedComponents needs a Spark checkpoint directory;
        // GraphLoader.getSparkContext() sets one — confirm it is always used.
        graph.connectedComponents().run().show();

        graph.triangleCount().run().show();
    }
}
|
|
@ -0,0 +1,72 @@
|
||||||
|
package com.baeldung.graphframes;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.graphframes.GraphFrame;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/**
 * Builds a small in-memory user/relationship sample graph as a {@link GraphFrame}.
 */
public class GraphLoader {

    /**
     * Creates a local Spark context with a temporary checkpoint directory set.
     *
     * @return a local-mode {@link JavaSparkContext}
     * @throws IOException if the temporary checkpoint directory cannot be created
     */
    public JavaSparkContext getSparkContext() throws IOException {
        Path temp = Files.createTempDirectory("sparkGraphFrames");
        SparkConf sparkConf = new SparkConf().setAppName("SparkGraphX").setMaster("local[*]");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        javaSparkContext.setCheckpointDir(temp.toString());
        return javaSparkContext;
    }

    /**
     * Builds the sample graph: users as vertices and their relationships as edges.
     *
     * @return a {@link GraphFrame} over the sample users and relationships
     * @throws IOException if a temporary directory cannot be created
     */
    public GraphFrame getGraphFrameUserRelationship() throws IOException {
        Path temp = Files.createTempDirectory("sparkGraphFrames");
        SparkSession session = SparkSession.builder()
            .appName("SparkGraphFrameSample")
            .config("spark.sql.warehouse.dir", temp.toString())
            .sparkContext(getSparkContext().sc())
            .master("local[*]")
            .getOrCreate();

        List<User> users = loadUsers();
        Dataset<Row> userDataset = session.createDataFrame(users, User.class);

        List<Relationship> relationshipsList = getRelations();
        Dataset<Row> relationshipDataset = session.createDataFrame(relationshipsList, Relationship.class);

        return new GraphFrame(userDataset, relationshipDataset);
    }

    /** Sample edges: typed relationships between user ids. */
    public List<Relationship> getRelations() {
        List<Relationship> relationships = new ArrayList<>();
        relationships.add(new Relationship("Friend", "1", "2"));
        relationships.add(new Relationship("Following", "1", "4"));
        relationships.add(new Relationship("Friend", "2", "4"));
        relationships.add(new Relationship("Relative", "3", "1"));
        relationships.add(new Relationship("Relative", "3", "4"));
        return relationships;
    }

    /** Sample vertices: four users with ids 1..4. */
    private List<User> loadUsers() {
        // Fixed: the original built every User twice — once into four unused
        // locals (john/martin/peter/alicia) and again inline when populating
        // the list. A single set of instances is created now.
        List<User> users = new ArrayList<>();
        users.add(new User(1L, "John"));
        users.add(new User(2L, "Martin"));
        users.add(new User(3L, "Peter"));
        users.add(new User(4L, "Alicia"));
        return users;
    }
}
|
|
@ -0,0 +1,39 @@
|
||||||
|
package com.baeldung.graphframes;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.UUID;
|
||||||
|
|
||||||
|
/**
 * A directed, typed edge between two vertices, identified by source and
 * destination vertex ids. Every instance is tagged with a random UUID so
 * otherwise-identical edges remain distinguishable.
 */
public class Relationship implements Serializable {

    private final UUID id;
    private final String type;
    private final String src;
    private final String dst;

    public Relationship(String type, String src, String dst) {
        this.id = UUID.randomUUID();
        this.type = type;
        this.src = src;
        this.dst = dst;
    }

    /** @return the random edge id rendered as a String. */
    public String getId() {
        return id.toString();
    }

    /** @return the relationship type label, e.g. "Friend". */
    public String getType() {
        return type;
    }

    /** @return the source vertex id. */
    public String getSrc() {
        return src;
    }

    /** @return the destination vertex id. */
    public String getDst() {
        return dst;
    }

    @Override
    public String toString() {
        return String.format("%s -- %s --> %s", getSrc(), getType(), getDst());
    }
}
|
|
@ -0,0 +1,27 @@
|
||||||
|
package com.baeldung.graphframes;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
|
||||||
|
/**
 * A graph vertex representing a user with a numeric id and a display name.
 * The id is exposed as a String via {@link #getId()}.
 */
public class User implements Serializable {

    private final Long id;
    private final String name;

    public User(long id, String name) {
        this.name = name;
        this.id = id;
    }

    /** @return the numeric id rendered as a String. */
    public String getId() {
        return String.valueOf(id);
    }

    /** @return the user's display name. */
    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return String.format("<%d,%s>", id, name);
    }
}
|
Loading…
Reference in New Issue