diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
index 1155644e1e..db2a73b411 100644
--- a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingApp.java
@@ -14,13 +14,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
-import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -59,17 +53,17 @@ public class WordCountingApp {
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
             ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
+        JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
+        JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
             .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
-            .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
+            .reduceByKey((i1, i2) -> i1 + i2);
 
-        wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
+        wordCounts.foreachRDD(javaRdd -> {
             Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
             for (String key : wordCountMap.keySet()) {
                 List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
diff --git a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
index 79e21f7209..efbe5f3851 100644
--- a/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
+++ b/apache-spark/src/main/java/com/baeldung/data/pipeline/WordCountingAppWithCheckpoint.java
@@ -16,15 +16,8 @@ import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.Optional;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
-import org.apache.spark.api.java.function.Function3;
-import org.apache.spark.api.java.function.PairFunction;
-import org.apache.spark.api.java.function.VoidFunction;
 import org.apache.spark.streaming.Durations;
-import org.apache.spark.streaming.State;
 import org.apache.spark.streaming.StateSpec;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaInputDStream;
@@ -71,24 +64,24 @@ public class WordCountingAppWithCheckpoint {
         JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(),
             ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
 
-        JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
+        JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
 
-        JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
+        JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
 
-        JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
+        JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
             .iterator());
 
-        JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
+        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
             .reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
 
-        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
+        JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> {
             int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
             Tuple2<String, Integer> output = new Tuple2<>(word, sum);
             state.update(sum);
             return output;
         }));
 
-        cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
+        cumulativeWordCounts.foreachRDD(javaRdd -> {
             List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
             for (Tuple2<String, Integer> tuple : wordCountList) {
                 List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
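
For context, the casts removed above were only ever hints to the compiler: the target type of each lambda is already fixed by the parameter type of the Spark method it is passed to, so Java 8 target typing infers the same functional interface without them (which is also why the corresponding imports can be dropped). A minimal self-contained sketch of that mechanism; the TypeInferenceDemo class and its PairFunction interface are hypothetical stand-ins modeled on org.apache.spark.api.java.function.PairFunction, not part of this change:

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;

    public class TypeInferenceDemo {

        // Stand-in for Spark's single-abstract-method function interfaces.
        interface PairFunction<T, K, V> {
            Map.Entry<K, V> call(T t) throws Exception;
        }

        // Stand-in for a Spark operator that takes a functional-interface parameter.
        static <T, K, V> Map.Entry<K, V> mapToPair(PairFunction<T, K, V> f, T t) throws Exception {
            return f.call(t);
        }

        public static void main(String[] args) throws Exception {
            // Old style: the lambda is explicitly cast to the functional interface,
            // which forces an import of that interface in the calling code.
            Map.Entry<String, Integer> withCast =
                mapToPair((PairFunction<String, String, Integer>) s -> new SimpleEntry<>(s, 1), "spark");

            // New style: the compiler infers the identical target type from the
            // method parameter, so both the cast and the import can be dropped.
            Map.Entry<String, Integer> inferred =
                mapToPair(s -> new SimpleEntry<>(s, 1), "spark");

            System.out.println(withCast + " " + inferred);
        }
    }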