Kafka spark cassandra (#6089)

* Adding files for the tutorial BAEL-2301

* Incorporating review comments on the article.

* Incorporated additional review comments on the article.
This commit is contained in:
Kumar Chandrakant 2019-01-10 02:20:46 +05:30 committed by Grzegorz Piwowarek
parent 128817cfeb
commit 70a7aaa4fc
2 changed files with 12 additions and 25 deletions

View File

@ -14,13 +14,7 @@ import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
@ -59,17 +53,17 @@ public class WordCountingApp {
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
.iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((i1, i2) -> i1 + i2);
wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
wordCounts.foreachRDD(javaRdd -> {
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
for (String key : wordCountMap.keySet()) {
List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));

View File

@ -16,15 +16,8 @@ import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
@ -71,24 +64,24 @@ public class WordCountingAppWithCheckpoint {
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
JavaPairDStream<String, String> results = messages.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
JavaDStream<String> lines = results.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split("\\s+"))
.iterator());
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1))
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((word, one, state) -> {
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}));
cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
cumulativeWordCounts.foreachRDD(javaRdd -> {
List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
for (Tuple2<String, Integer> tuple : wordCountList) {
List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));