Adding source code for the tutorial tracked under BAEL-3203. (#7600)

This commit is contained in:
Kumar Chandrakant 2019-08-24 02:02:02 +05:30 committed by Grzegorz Piwowarek
parent 7bcfd6bfa4
commit 8e190c76af
11 changed files with 332 additions and 62 deletions

150
apache-spark/data/iris.data Normal file
View File

@ -0,0 +1,150 @@
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.4,3.9,1.7,0.4,Iris-setosa
4.6,3.4,1.4,0.3,Iris-setosa
5.0,3.4,1.5,0.2,Iris-setosa
4.4,2.9,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
5.4,3.7,1.5,0.2,Iris-setosa
4.8,3.4,1.6,0.2,Iris-setosa
4.8,3.0,1.4,0.1,Iris-setosa
4.3,3.0,1.1,0.1,Iris-setosa
5.8,4.0,1.2,0.2,Iris-setosa
5.7,4.4,1.5,0.4,Iris-setosa
5.4,3.9,1.3,0.4,Iris-setosa
5.1,3.5,1.4,0.3,Iris-setosa
5.7,3.8,1.7,0.3,Iris-setosa
5.1,3.8,1.5,0.3,Iris-setosa
5.4,3.4,1.7,0.2,Iris-setosa
5.1,3.7,1.5,0.4,Iris-setosa
4.6,3.6,1.0,0.2,Iris-setosa
5.1,3.3,1.7,0.5,Iris-setosa
4.8,3.4,1.9,0.2,Iris-setosa
5.0,3.0,1.6,0.2,Iris-setosa
5.0,3.4,1.6,0.4,Iris-setosa
5.2,3.5,1.5,0.2,Iris-setosa
5.2,3.4,1.4,0.2,Iris-setosa
4.7,3.2,1.6,0.2,Iris-setosa
4.8,3.1,1.6,0.2,Iris-setosa
5.4,3.4,1.5,0.4,Iris-setosa
5.2,4.1,1.5,0.1,Iris-setosa
5.5,4.2,1.4,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
5.0,3.2,1.2,0.2,Iris-setosa
5.5,3.5,1.3,0.2,Iris-setosa
4.9,3.1,1.5,0.1,Iris-setosa
4.4,3.0,1.3,0.2,Iris-setosa
5.1,3.4,1.5,0.2,Iris-setosa
5.0,3.5,1.3,0.3,Iris-setosa
4.5,2.3,1.3,0.3,Iris-setosa
4.4,3.2,1.3,0.2,Iris-setosa
5.0,3.5,1.6,0.6,Iris-setosa
5.1,3.8,1.9,0.4,Iris-setosa
4.8,3.0,1.4,0.3,Iris-setosa
5.1,3.8,1.6,0.2,Iris-setosa
4.6,3.2,1.4,0.2,Iris-setosa
5.3,3.7,1.5,0.2,Iris-setosa
5.0,3.3,1.4,0.2,Iris-setosa
7.0,3.2,4.7,1.4,Iris-versicolor
6.4,3.2,4.5,1.5,Iris-versicolor
6.9,3.1,4.9,1.5,Iris-versicolor
5.5,2.3,4.0,1.3,Iris-versicolor
6.5,2.8,4.6,1.5,Iris-versicolor
5.7,2.8,4.5,1.3,Iris-versicolor
6.3,3.3,4.7,1.6,Iris-versicolor
4.9,2.4,3.3,1.0,Iris-versicolor
6.6,2.9,4.6,1.3,Iris-versicolor
5.2,2.7,3.9,1.4,Iris-versicolor
5.0,2.0,3.5,1.0,Iris-versicolor
5.9,3.0,4.2,1.5,Iris-versicolor
6.0,2.2,4.0,1.0,Iris-versicolor
6.1,2.9,4.7,1.4,Iris-versicolor
5.6,2.9,3.6,1.3,Iris-versicolor
6.7,3.1,4.4,1.4,Iris-versicolor
5.6,3.0,4.5,1.5,Iris-versicolor
5.8,2.7,4.1,1.0,Iris-versicolor
6.2,2.2,4.5,1.5,Iris-versicolor
5.6,2.5,3.9,1.1,Iris-versicolor
5.9,3.2,4.8,1.8,Iris-versicolor
6.1,2.8,4.0,1.3,Iris-versicolor
6.3,2.5,4.9,1.5,Iris-versicolor
6.1,2.8,4.7,1.2,Iris-versicolor
6.4,2.9,4.3,1.3,Iris-versicolor
6.6,3.0,4.4,1.4,Iris-versicolor
6.8,2.8,4.8,1.4,Iris-versicolor
6.7,3.0,5.0,1.7,Iris-versicolor
6.0,2.9,4.5,1.5,Iris-versicolor
5.7,2.6,3.5,1.0,Iris-versicolor
5.5,2.4,3.8,1.1,Iris-versicolor
5.5,2.4,3.7,1.0,Iris-versicolor
5.8,2.7,3.9,1.2,Iris-versicolor
6.0,2.7,5.1,1.6,Iris-versicolor
5.4,3.0,4.5,1.5,Iris-versicolor
6.0,3.4,4.5,1.6,Iris-versicolor
6.7,3.1,4.7,1.5,Iris-versicolor
6.3,2.3,4.4,1.3,Iris-versicolor
5.6,3.0,4.1,1.3,Iris-versicolor
5.5,2.5,4.0,1.3,Iris-versicolor
5.5,2.6,4.4,1.2,Iris-versicolor
6.1,3.0,4.6,1.4,Iris-versicolor
5.8,2.6,4.0,1.2,Iris-versicolor
5.0,2.3,3.3,1.0,Iris-versicolor
5.6,2.7,4.2,1.3,Iris-versicolor
5.7,3.0,4.2,1.2,Iris-versicolor
5.7,2.9,4.2,1.3,Iris-versicolor
6.2,2.9,4.3,1.3,Iris-versicolor
5.1,2.5,3.0,1.1,Iris-versicolor
5.7,2.8,4.1,1.3,Iris-versicolor
6.3,3.3,6.0,2.5,Iris-virginica
5.8,2.7,5.1,1.9,Iris-virginica
7.1,3.0,5.9,2.1,Iris-virginica
6.3,2.9,5.6,1.8,Iris-virginica
6.5,3.0,5.8,2.2,Iris-virginica
7.6,3.0,6.6,2.1,Iris-virginica
4.9,2.5,4.5,1.7,Iris-virginica
7.3,2.9,6.3,1.8,Iris-virginica
6.7,2.5,5.8,1.8,Iris-virginica
7.2,3.6,6.1,2.5,Iris-virginica
6.5,3.2,5.1,2.0,Iris-virginica
6.4,2.7,5.3,1.9,Iris-virginica
6.8,3.0,5.5,2.1,Iris-virginica
5.7,2.5,5.0,2.0,Iris-virginica
5.8,2.8,5.1,2.4,Iris-virginica
6.4,3.2,5.3,2.3,Iris-virginica
6.5,3.0,5.5,1.8,Iris-virginica
7.7,3.8,6.7,2.2,Iris-virginica
7.7,2.6,6.9,2.3,Iris-virginica
6.0,2.2,5.0,1.5,Iris-virginica
6.9,3.2,5.7,2.3,Iris-virginica
5.6,2.8,4.9,2.0,Iris-virginica
7.7,2.8,6.7,2.0,Iris-virginica
6.3,2.7,4.9,1.8,Iris-virginica
6.7,3.3,5.7,2.1,Iris-virginica
7.2,3.2,6.0,1.8,Iris-virginica
6.2,2.8,4.8,1.8,Iris-virginica
6.1,3.0,4.9,1.8,Iris-virginica
6.4,2.8,5.6,2.1,Iris-virginica
7.2,3.0,5.8,1.6,Iris-virginica
7.4,2.8,6.1,1.9,Iris-virginica
7.9,3.8,6.4,2.0,Iris-virginica
6.4,2.8,5.6,2.2,Iris-virginica
6.3,2.8,5.1,1.5,Iris-virginica
6.1,2.6,5.6,1.4,Iris-virginica
7.7,3.0,6.1,2.3,Iris-virginica
6.3,3.4,5.6,2.4,Iris-virginica
6.4,3.1,5.5,1.8,Iris-virginica
6.0,3.0,4.8,1.8,Iris-virginica
6.9,3.1,5.4,2.1,Iris-virginica
6.7,3.1,5.6,2.4,Iris-virginica
6.9,3.1,5.1,2.3,Iris-virginica
5.8,2.7,5.1,1.9,Iris-virginica
6.8,3.2,5.9,2.3,Iris-virginica
6.7,3.3,5.7,2.5,Iris-virginica
6.7,3.0,5.2,2.3,Iris-virginica
6.3,2.5,5.0,1.9,Iris-virginica
6.5,3.0,5.2,2.0,Iris-virginica
6.2,3.4,5.4,2.3,Iris-virginica
5.9,3.0,5.1,1.8,Iris-virginica

View File

@ -0,0 +1 @@
{"class":"org.apache.spark.mllib.classification.LogisticRegressionModel","version":"1.0","numFeatures":4,"numClasses":3}

View File

@ -1,31 +1,32 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" <project xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
<modelVersion>4.0.0</modelVersion> xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>com.baeldung</groupId> <modelVersion>4.0.0</modelVersion>
<artifactId>apache-spark</artifactId> <groupId>com.baeldung</groupId>
<version>1.0-SNAPSHOT</version> <artifactId>apache-spark</artifactId>
<name>apache-spark</name> <version>1.0-SNAPSHOT</version>
<packaging>jar</packaging> <name>apache-spark</name>
<url>http://maven.apache.org</url> <packaging>jar</packaging>
<url>http://maven.apache.org</url>
<parent> <parent>
<groupId>com.baeldung</groupId> <groupId>com.baeldung</groupId>
<artifactId>parent-modules</artifactId> <artifactId>parent-modules</artifactId>
<version>1.0.0-SNAPSHOT</version> <version>1.0.0-SNAPSHOT</version>
</parent> </parent>
<dependencies> <dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId> <artifactId>spark-core_2.11</artifactId>
<version>${org.apache.spark.spark-sql.version}</version> <version>${org.apache.spark.spark-core.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${org.apache.spark.spark-sql.version}</version>
<scope>provided</scope>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
@ -33,6 +34,12 @@
<version>${org.apache.spark.spark-streaming.version}</version> <version>${org.apache.spark.spark-streaming.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<!-- Spark MLlib, required by com.baeldung.ml.MachineLearningApp (org.apache.spark.mllib.* imports).
     Uses the same _2.11 artifact suffix as the other Spark artifacts in this POM; the version is
     taken from the org.apache.spark.spark-mllib.version property.
     NOTE(review): scope is "provided", so presumably the jar-with-dependencies assembly will not
     bundle it and the Spark runtime must supply it - confirm. -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.11</artifactId>
<version>${org.apache.spark.spark-mllib.version}</version>
<scope>provided</scope>
</dependency>
<dependency> <dependency>
<groupId>org.apache.spark</groupId> <groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
@ -48,46 +55,47 @@
<artifactId>spark-cassandra-connector-java_2.11</artifactId> <artifactId>spark-cassandra-connector-java_2.11</artifactId>
<version>${com.datastax.spark.spark-cassandra-connector-java.version}</version> <version>${com.datastax.spark.spark-cassandra-connector-java.version}</version>
</dependency> </dependency>
</dependencies> </dependencies>
<build> <build>
<plugins> <plugins>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId> <artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version> <version>${maven-compiler-plugin.version}</version>
<configuration> <configuration>
<source>${java.version}</source> <source>${java.version}</source>
<target>${java.version}</target> <target>${java.version}</target>
</configuration> </configuration>
</plugin> </plugin>
<plugin> <plugin>
<artifactId>maven-assembly-plugin</artifactId> <artifactId>maven-assembly-plugin</artifactId>
<executions> <executions>
<execution> <execution>
<phase>package</phase> <phase>package</phase>
<goals> <goals>
<goal>single</goal> <goal>single</goal>
</goals> </goals>
</execution> </execution>
</executions> </executions>
<configuration> <configuration>
<descriptorRefs> <descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef> <descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs> </descriptorRefs>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>
</build> </build>
<properties> <properties>
<org.apache.spark.spark-core.version>2.3.0</org.apache.spark.spark-core.version> <org.apache.spark.spark-core.version>2.3.0</org.apache.spark.spark-core.version>
<org.apache.spark.spark-sql.version>2.3.0</org.apache.spark.spark-sql.version> <org.apache.spark.spark-sql.version>2.3.0</org.apache.spark.spark-sql.version>
<org.apache.spark.spark-streaming.version>2.3.0</org.apache.spark.spark-streaming.version> <org.apache.spark.spark-streaming.version>2.3.0</org.apache.spark.spark-streaming.version>
<org.apache.spark.spark-streaming-kafka.version>2.3.0</org.apache.spark.spark-streaming-kafka.version> <org.apache.spark.spark-mllib.version>2.3.0</org.apache.spark.spark-mllib.version>
<com.datastax.spark.spark-cassandra-connector.version>2.3.0</com.datastax.spark.spark-cassandra-connector.version> <org.apache.spark.spark-streaming-kafka.version>2.3.0</org.apache.spark.spark-streaming-kafka.version>
<com.datastax.spark.spark-cassandra-connector-java.version>1.5.2</com.datastax.spark.spark-cassandra-connector-java.version> <com.datastax.spark.spark-cassandra-connector.version>2.3.0</com.datastax.spark.spark-cassandra-connector.version>
<com.datastax.spark.spark-cassandra-connector-java.version>1.5.2</com.datastax.spark.spark-cassandra-connector-java.version>
<maven-compiler-plugin.version>3.2</maven-compiler-plugin.version> <maven-compiler-plugin.version>3.2</maven-compiler-plugin.version>
</properties> </properties>
</project> </project>

View File

@ -0,0 +1,111 @@
package com.baeldung.ml;
import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.stat.MultivariateStatisticalSummary;
import org.apache.spark.mllib.stat.Statistics;
import scala.Tuple2;
/**
 * End-to-end Spark MLlib example on the Iris data set: loads the CSV file,
 * prints summary statistics and the Pearson correlation matrix of the
 * features, trains a multinomial logistic-regression classifier on an 80/20
 * split, reports accuracy on the held-out part, then saves, reloads and
 * applies the model to one new observation.
 *
 * <p>Each input line is expected to be comma separated, with every column
 * except the last being a numeric feature and the last column being the
 * textual class label.
 */
public class MachineLearningApp {

    // Forward slashes are understood on every platform (including Windows);
    // the previous back-slash separators only resolved correctly on Windows.
    private static final String DATA_FILE = "data/iris.data";
    private static final String MODEL_PATH = "model/logistic-regression";

    /**
     * Runs the whole pipeline against a local Spark master.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        // 1. Setting the Spark Context
        // NOTE(review): per the Spark configuration docs, spark.driver.memory has no
        // effect when set programmatically in client/local mode because the driver
        // JVM is already running; kept so the configuration is unchanged.
        SparkConf conf = new SparkConf().setAppName("Main")
            .setMaster("local[2]")
            .set("spark.executor.memory", "3g")
            .set("spark.driver.memory", "3g");

        // try-with-resources guarantees the context is stopped even when a step fails.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            Logger.getLogger("org")
                .setLevel(Level.OFF);
            Logger.getLogger("akka")
                .setLevel(Level.OFF);

            // 2. Loading the Data-set (blank lines, e.g. a trailing newline, are skipped)
            JavaRDD<String> data = sc.textFile(DATA_FILE)
                .filter(line -> !line.trim().isEmpty());

            // 3. Exploratory Data Analysis
            // 3.1. Creating Vector of Input Data
            JavaRDD<Vector> inputData = data.map(line -> Vectors.dense(parseFeatures(line.split(","))));

            // 3.2. Performing Statistical Analysis
            MultivariateStatisticalSummary summary = Statistics.colStats(inputData.rdd());
            System.out.println("Summary Mean:");
            System.out.println(summary.mean());
            System.out.println("Summary Variance:");
            System.out.println(summary.variance());
            System.out.println("Summary Non-zero:");
            System.out.println(summary.numNonzeros());

            // 3.3. Performing Correlation Analysis
            Matrix correlMatrix = Statistics.corr(inputData.rdd(), "pearson");
            System.out.println("Correlation Matrix:");
            System.out.println(correlMatrix.toString());

            // 4. Data Preparation
            // 4.1. Creating Map for Textual Output Labels
            Map<String, Integer> map = new HashMap<>();
            map.put("Iris-setosa", 0);
            map.put("Iris-versicolor", 1);
            map.put("Iris-virginica", 2);

            // 4.2. Creating LabeledPoint of Input and Output Data
            JavaRDD<LabeledPoint> parsedData = data.map(line -> toLabeledPoint(line, map));

            // 5. Data Splitting into 80% Training and 20% Test Sets (fixed seed for repeatability)
            JavaRDD<LabeledPoint>[] splits = parsedData.randomSplit(new double[] { 0.8, 0.2 }, 11L);
            JavaRDD<LabeledPoint> trainingData = splits[0].cache();
            JavaRDD<LabeledPoint> testData = splits[1];

            // 6. Modeling
            // 6.1. Model Training
            LogisticRegressionModel model = new LogisticRegressionWithLBFGS().setNumClasses(3)
                .run(trainingData.rdd());

            // 6.2. Model Evaluation
            JavaPairRDD<Object, Object> predictionAndLabels = testData.mapToPair(p -> new Tuple2<>(model.predict(p.features()), p.label()));
            MulticlassMetrics metrics = new MulticlassMetrics(predictionAndLabels.rdd());
            double accuracy = metrics.accuracy();
            System.out.println("Model Accuracy on Test Data: " + accuracy);

            // 7. Model Saving and Loading
            // 7.1. Model Saving
            // NOTE(review): save() is expected to fail if MODEL_PATH already exists
            // (e.g. on a second run) - remove the directory first; confirm against the MLlib docs.
            model.save(sc.sc(), MODEL_PATH);

            // 7.2. Model Loading
            LogisticRegressionModel sameModel = LogisticRegressionModel.load(sc.sc(), MODEL_PATH);

            // 7.3. Prediction on New Data
            Vector newData = Vectors.dense(new double[] { 1, 1, 1, 1 });
            double prediction = sameModel.predict(newData);
            System.out.println("Model Prediction on New Data = " + prediction);
        }
        // 8. Clean-up is performed by try-with-resources above.
    }

    /**
     * Parses every column except the last one as a double feature value.
     *
     * @param parts the comma-separated columns of one input line
     * @return the numeric features, in column order
     * @throws NumberFormatException if a feature column is not a valid double
     */
    private static double[] parseFeatures(String[] parts) {
        double[] features = new double[parts.length - 1];
        for (int i = 0; i < features.length; i++) {
            features[i] = Double.parseDouble(parts[i]);
        }
        return features;
    }

    /**
     * Converts one input line into a {@link LabeledPoint}, translating the
     * textual label in the last column through {@code labels}.
     *
     * @param line   one comma-separated input line
     * @param labels mapping from textual class label to numeric class index
     * @return the labeled feature vector for the line
     * @throws IllegalArgumentException if the label is not present in {@code labels}
     */
    private static LabeledPoint toLabeledPoint(String line, Map<String, Integer> labels) {
        String[] parts = line.split(",");
        String labelText = parts[parts.length - 1];
        Integer label = labels.get(labelText);
        if (label == null) {
            // Fail with a readable message instead of an NPE from auto-unboxing.
            throw new IllegalArgumentException("Unknown class label '" + labelText + "' in line: " + line);
        }
        return new LabeledPoint(label, Vectors.dense(parseFeatures(parts)));
    }
}