From 4ec4fdb72bfc402fe1c3c1ea9dde8c59eced9f92 Mon Sep 17 00:00:00 2001 From: Wosin Date: Fri, 8 Mar 2019 23:20:37 +0100 Subject: [PATCH] Bael 43 (#6423) * BAEL-43: Apache Spark with Spring Cloud Data Flow * Replaced functions with lambdas + removed unnecessary type casting. --- .../apache-spark-job/pom.xml | 36 +++++++++++++++++ .../spring/cloud/PiApproximation.java | 39 +++++++++++++++++++ spring-cloud-data-flow/pom.xml | 1 + 3 files changed, 76 insertions(+) create mode 100644 spring-cloud-data-flow/apache-spark-job/pom.xml create mode 100644 spring-cloud-data-flow/apache-spark-job/src/main/java/com/baeldung/spring/cloud/PiApproximation.java diff --git a/spring-cloud-data-flow/apache-spark-job/pom.xml b/spring-cloud-data-flow/apache-spark-job/pom.xml new file mode 100644 index 0000000000..390b7ebdc6 --- /dev/null +++ b/spring-cloud-data-flow/apache-spark-job/pom.xml @@ -0,0 +1,36 @@ + + + + spring-cloud-data-flow + com.baeldung + 0.0.1-SNAPSHOT + + 4.0.0 + + apache-spark-job + + + org.springframework.cloud + spring-cloud-task-core + 2.0.0.RELEASE + + + org.apache.spark + spark-core_2.10 + 1.6.2 + + + log4j + log4j + + + org.slf4j + slf4j-log4j12 + + + + + + \ No newline at end of file diff --git a/spring-cloud-data-flow/apache-spark-job/src/main/java/com/baeldung/spring/cloud/PiApproximation.java b/spring-cloud-data-flow/apache-spark-job/src/main/java/com/baeldung/spring/cloud/PiApproximation.java new file mode 100644 index 0000000000..dfead21728 --- /dev/null +++ b/spring-cloud-data-flow/apache-spark-job/src/main/java/com/baeldung/spring/cloud/PiApproximation.java @@ -0,0 +1,39 @@ +package com.baeldung.spring.cloud; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.rdd.RDD; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class PiApproximation { + public static void main(String[] args) { + SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation"); + JavaSparkContext context = new JavaSparkContext(conf); + int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2; + int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices; + + List xs = IntStream.rangeClosed(0, n) + .mapToObj(element -> Integer.valueOf(element)) + .collect(Collectors.toList()); + + JavaRDD dataSet = context.parallelize(xs, slices); + + JavaRDD pointsInsideTheCircle = dataSet.map(integer -> { + double x = Math.random() * 2 - 1; + double y = Math.random() * 2 - 1; + return (x*x + y*y ) < 1 ? 1: 0; + }); + + int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2); + + System.out.println("The pi was estimated as:" + count / n); + + context.stop(); + } +} diff --git a/spring-cloud-data-flow/pom.xml b/spring-cloud-data-flow/pom.xml index daf8ebd2c8..36a780c28d 100644 --- a/spring-cloud-data-flow/pom.xml +++ b/spring-cloud-data-flow/pom.xml @@ -20,6 +20,7 @@ log-sink batch-job etl + apache-spark-job