From 20e8886165192cd55e5151120b07ff0029242e28 Mon Sep 17 00:00:00 2001
From: Kumar Chandrakant
Date: Sun, 6 Jan 2019 19:26:21 +0530
Subject: [PATCH] Kafka spark cassandra (#6078)

* Adding files for the tutorial BAEL-2301

* Incorporating review comments on the article.
---
 .../data/pipeline/WordCountingApp.java        | 58 ++++------------
 .../WordCountingAppWithCheckpoint.java        | 66 +++++--------------
 2 files changed, 29 insertions(+), 95 deletions(-)

diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
index 08695b3631..1155644e1e 100644
--- a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
@@ -6,7 +6,6 @@ import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -35,7 +34,6 @@ import scala.Tuple2;
 
 public class WordCountingApp {
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws InterruptedException {
         Logger.getLogger("org")
             .setLevel(Level.OFF);
@@ -61,52 +59,24 @@ public class WordCountingApp {
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
-            @Override
-            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
-                return new Tuple2<>(record.key(), record.value());
-            }
-        });
+        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
-            @Override
-            public String call(Tuple2<String, String> tuple2) {
-                return tuple2._2();
-            }
-        });
+        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-            @Override
-            public Iterator<String> call(String x) {
-                return Arrays.asList(x.split("\\s+"))
-                    .iterator();
-            }
-        });
+        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+            .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
-            @Override
-            public Tuple2<String, Integer> call(String s) {
-                return new Tuple2<>(s, 1);
-            }
-        })
-            .reduceByKey(new Function2<Integer, Integer, Integer>() {
-                @Override
-                public Integer call(Integer i1, Integer i2) {
-                    return i1 + i2;
-                }
-            });
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
+            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
 
-        wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
-            @Override
-            public void call(JavaPairRDD<String, Integer> javaRdd) throws Exception {
-                Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
-                for (String key : wordCountMap.keySet()) {
-                    List<Word> words = Arrays.asList(new Word(key, wordCountMap.get(key)));
-                    JavaRDD<Word> rdd = streamingContext.sparkContext()
-                        .parallelize(words);
-                    javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
-                        .saveToCassandra();
-                }
+        wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
+            Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
+            for (String key : wordCountMap.keySet()) {
+                List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
+                JavaRDD<Word> rdd = streamingContext.sparkContext()
+                    .parallelize(wordList);
+                javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+                    .saveToCassandra();
             }
         });
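
Note: replacing the serializable anonymous classes with lambdas is also why the @SuppressWarnings("serial") annotations can be dropped in both files. The writerBuilder("vocabulary", "words", mapToRow(Word.class)) calls map a Java bean onto the vocabulary.words table; for reference, a minimal sketch of the shape mapToRow(Word.class) assumes, namely a Serializable bean whose property names match the table's columns. The actual Word class ships alongside these files and may differ in detail:

package com.baeldung.data.pipeline;

import java.io.Serializable;

// Minimal bean assumed by mapToRow(Word.class): property names must
// line up with the column names of the vocabulary.words table.
public class Word implements Serializable {

    private String word;
    private int count;

    public Word() {
    }

    public Word(String word, int count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getCount() {
        return count;
    }

    public void setCount(int count) {
        this.count = count;
    }
}
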
diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
index e20b910635..79e21f7209 100644
--- a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
@@ -6,7 +6,6 @@ import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -15,7 +14,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
@@ -43,7 +41,6 @@ public class WordCountingAppWithCheckpoint {
 
     public static JavaSparkContext sparkContext;
 
-    @SuppressWarnings("serial")
     public static void main(String[] args) throws InterruptedException {
 
         Logger.getLogger("org")
@@ -74,63 +71,30 @@ public class WordCountingAppWithCheckpoint {
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
-            @Override
-            public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
-                return new Tuple2<>(record.key(), record.value());
-            }
-        });
+        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
-            @Override
-            public String call(Tuple2<String, String> tuple2) {
-                return tuple2._2();
-            }
-        });
+        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
-            @Override
-            public Iterator<String> call(String x) {
-                return Arrays.asList(x.split("\\s+"))
-                    .iterator();
-            }
-        });
+        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+            .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
-            @Override
-            public Tuple2<String, Integer> call(String s) {
-                return new Tuple2<>(s, 1);
-            }
-        })
-            .reduceByKey(new Function2<Integer, Integer, Integer>() {
-                @Override
-                public Integer call(Integer i1, Integer i2) {
-                    return i1 + i2;
-                }
-            });
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
+            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
 
-        Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = (word, one, state) -> {
+        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
             int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
             Tuple2<String, Integer> output = new Tuple2<>(word, sum);
             state.update(sum);
             return output;
-        };
+        }));
 
-        JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
-
-        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc)
-            .initialState(initialRDD));
-
-        cumulativeWordCounts.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Integer>>>() {
-            @Override
-            public void call(JavaRDD<Tuple2<String, Integer>> javaRdd) throws Exception {
-                List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
-                for (Tuple2<String, Integer> tuple : wordCountList) {
-                    List<Word> words = Arrays.asList(new Word(tuple._1, tuple._2));
-                    JavaRDD<Word> rdd = sparkContext.parallelize(words);
-                    javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
-                        .saveToCassandra();
-                }
+        cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
+            List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
+            for (Tuple2<String, Integer> tuple : wordCountList) {
+                List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
+                JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
+                javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
+                    .saveToCassandra();
             }
         });
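
Note: the checkpointed variant also inlines the former mappingFunc into StateSpec.function and drops the explicit empty initialRDD; omitting initialState is equivalent here, since the default initial state is empty. To make the per-key stateful logic concrete, here is a standalone plain-Java sketch, not part of the patch (the class and method names are illustrative), of what the inlined Function3 computes across batches:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Standalone sketch of the per-key logic inside mapWithState above:
// each batch's count for a word is added to the running total that
// Spark keeps as state, and the new total becomes both the updated
// state and the emitted value.
public class StatefulSumSketch {

    private final Map<String, Integer> state = new HashMap<>();

    int updateState(String word, Optional<Integer> batchCount) {
        // Mirrors: sum = one.orElse(0) + (state.exists() ? state.get() : 0)
        int sum = batchCount.orElse(0) + state.getOrDefault(word, 0);
        state.put(word, sum); // mirrors state.update(sum)
        return sum;           // mirrors emitting new Tuple2<>(word, sum)
    }

    public static void main(String[] args) {
        StatefulSumSketch counts = new StatefulSumSketch();
        System.out.println(counts.updateState("spark", Optional.of(2))); // 2
        System.out.println(counts.updateState("spark", Optional.of(3))); // 5, cumulative
        System.out.println(counts.updateState("kafka", Optional.of(1))); // 1
    }
}
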