Kafka spark cassandra (#6078)
* Adding files for the tutorial BAEL-2301 * Incorporating review comments on the article.
This commit is contained in:
parent
361bc4b6ea
commit
20e8886165
|
@ -6,7 +6,6 @@ import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -35,7 +34,6 @@ import scala.Tuple2;
|
||||||
|
|
||||||
public class WordCountingApp {
|
public class WordCountingApp {
|
||||||
|
|
||||||
@SuppressWarnings("serial")
|
|
||||||
public static void main(String[] args) throws InterruptedException {
|
public static void main(String[] args) throws InterruptedException {
|
||||||
Logger.getLogger("org")
|
Logger.getLogger("org")
|
||||||
.setLevel(Level.OFF);
|
.setLevel(Level.OFF);
|
||||||
|
@ -61,52 +59,24 @@ public class WordCountingApp {
|
||||||
|
|
||||||
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
|
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
|
||||||
|
|
||||||
JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
|
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
|
||||||
@Override
|
|
||||||
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
|
|
||||||
return new Tuple2<>(record.key(), record.value());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
|
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
|
||||||
@Override
|
|
||||||
public String call(Tuple2<String, String> tuple2) {
|
|
||||||
return tuple2._2();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
|
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
|
||||||
@Override
|
.iterator());
|
||||||
public Iterator<String> call(String x) {
|
|
||||||
return Arrays.asList(x.split("\\s+"))
|
|
||||||
.iterator();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
|
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
|
||||||
@Override
|
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
|
||||||
public Tuple2<String, Integer> call(String s) {
|
|
||||||
return new Tuple2<>(s, 1);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.reduceByKey(new Function2<Integer, Integer, Integer>() {
|
|
||||||
@Override
|
|
||||||
public Integer call(Integer i1, Integer i2) {
|
|
||||||
return i1 + i2;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
wordCounts.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
|
wordCounts.foreachRDD((VoidFunction<JavaPairRDD<String, Integer>>) javaRdd -> {
|
||||||
@Override
|
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
|
||||||
public void call(JavaPairRDD<String, Integer> javaRdd) throws Exception {
|
for (String key : wordCountMap.keySet()) {
|
||||||
Map<String, Integer> wordCountMap = javaRdd.collectAsMap();
|
List<Word> wordList = Arrays.asList(new Word(key, wordCountMap.get(key)));
|
||||||
for (String key : wordCountMap.keySet()) {
|
JavaRDD<Word> rdd = streamingContext.sparkContext()
|
||||||
List<Word> words = Arrays.asList(new Word(key, wordCountMap.get(key)));
|
.parallelize(wordList);
|
||||||
JavaRDD<Word> rdd = streamingContext.sparkContext()
|
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
|
||||||
.parallelize(words);
|
.saveToCassandra();
|
||||||
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
|
|
||||||
.saveToCassandra();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,6 @@ import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -15,7 +14,6 @@ import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.spark.SparkConf;
|
import org.apache.spark.SparkConf;
|
||||||
import org.apache.spark.api.java.JavaPairRDD;
|
|
||||||
import org.apache.spark.api.java.JavaRDD;
|
import org.apache.spark.api.java.JavaRDD;
|
||||||
import org.apache.spark.api.java.JavaSparkContext;
|
import org.apache.spark.api.java.JavaSparkContext;
|
||||||
import org.apache.spark.api.java.Optional;
|
import org.apache.spark.api.java.Optional;
|
||||||
|
@ -43,7 +41,6 @@ public class WordCountingAppWithCheckpoint {
|
||||||
|
|
||||||
public static JavaSparkContext sparkContext;
|
public static JavaSparkContext sparkContext;
|
||||||
|
|
||||||
@SuppressWarnings("serial")
|
|
||||||
public static void main(String[] args) throws InterruptedException {
|
public static void main(String[] args) throws InterruptedException {
|
||||||
|
|
||||||
Logger.getLogger("org")
|
Logger.getLogger("org")
|
||||||
|
@ -74,63 +71,30 @@ public class WordCountingAppWithCheckpoint {
|
||||||
|
|
||||||
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
|
JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.<String, String> Subscribe(topics, kafkaParams));
|
||||||
|
|
||||||
JavaPairDStream<String, String> results = messages.mapToPair(new PairFunction<ConsumerRecord<String, String>, String, String>() {
|
JavaPairDStream<String, String> results = messages.mapToPair((PairFunction<ConsumerRecord<String, String>, String, String>) record -> new Tuple2<>(record.key(), record.value()));
|
||||||
@Override
|
|
||||||
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
|
|
||||||
return new Tuple2<>(record.key(), record.value());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaDStream<String> lines = results.map(new Function<Tuple2<String, String>, String>() {
|
JavaDStream<String> lines = results.map((Function<Tuple2<String, String>, String>) tuple2 -> tuple2._2());
|
||||||
@Override
|
|
||||||
public String call(Tuple2<String, String> tuple2) {
|
|
||||||
return tuple2._2();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
|
JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split("\\s+"))
|
||||||
@Override
|
.iterator());
|
||||||
public Iterator<String> call(String x) {
|
|
||||||
return Arrays.asList(x.split("\\s+"))
|
|
||||||
.iterator();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {
|
JavaPairDStream<String, Integer> wordCounts = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1))
|
||||||
@Override
|
.reduceByKey((Function2<Integer, Integer, Integer>) (i1, i2) -> i1 + i2);
|
||||||
public Tuple2<String, Integer> call(String s) {
|
|
||||||
return new Tuple2<>(s, 1);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.reduceByKey(new Function2<Integer, Integer, Integer>() {
|
|
||||||
@Override
|
|
||||||
public Integer call(Integer i1, Integer i2) {
|
|
||||||
return i1 + i2;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc = (word, one, state) -> {
|
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function((Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>) (word, one, state) -> {
|
||||||
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
|
int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
|
||||||
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
|
Tuple2<String, Integer> output = new Tuple2<>(word, sum);
|
||||||
state.update(sum);
|
state.update(sum);
|
||||||
return output;
|
return output;
|
||||||
};
|
}));
|
||||||
|
|
||||||
JavaPairRDD<String, Integer> initialRDD = JavaPairRDD.fromJavaRDD(sparkContext.emptyRDD());
|
cumulativeWordCounts.foreachRDD((VoidFunction<JavaRDD<Tuple2<String, Integer>>>) javaRdd -> {
|
||||||
|
List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
|
||||||
JavaMapWithStateDStream<String, Integer, Integer, Tuple2<String, Integer>> cumulativeWordCounts = wordCounts.mapWithState(StateSpec.function(mappingFunc)
|
for (Tuple2<String, Integer> tuple : wordCountList) {
|
||||||
.initialState(initialRDD));
|
List<Word> wordList = Arrays.asList(new Word(tuple._1, tuple._2));
|
||||||
|
JavaRDD<Word> rdd = sparkContext.parallelize(wordList);
|
||||||
cumulativeWordCounts.foreachRDD(new VoidFunction<JavaRDD<Tuple2<String, Integer>>>() {
|
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
|
||||||
@Override
|
.saveToCassandra();
|
||||||
public void call(JavaRDD<Tuple2<String, Integer>> javaRdd) throws Exception {
|
|
||||||
List<Tuple2<String, Integer>> wordCountList = javaRdd.collect();
|
|
||||||
for (Tuple2<String, Integer> tuple : wordCountList) {
|
|
||||||
List<Word> words = Arrays.asList(new Word(tuple._1, tuple._2));
|
|
||||||
JavaRDD<Word> rdd = sparkContext.parallelize(words);
|
|
||||||
javaFunctions(rdd).writerBuilder("vocabulary", "words", mapToRow(Word.class))
|
|
||||||
.saveToCassandra();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue