From 945932fa1d96bcfe576ce92fe4d8b61ba1b3c04b Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 22 Jul 2018 17:56:20 +0530 Subject: [PATCH 1/8] Real Time Event Streaming with Spring Webflux --- .../EventStreamingApplication.java | 38 +++++++++++++++++++ .../eventstreaming/EventWebSocketClient.java | 24 ++++++++++++ .../eventstreaming/EventWebSocketHandler.java | 27 +++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java new file mode 100644 index 0000000000..5deff35fa7 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java @@ -0,0 +1,38 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.util.HashMap; +import java.util.Map; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.web.reactive.HandlerMapping; +import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; + +@SpringBootApplication +public class EventStreamingApplication { + + public static void main(String[] args) { + SpringApplication.run(EventStreamingApplication.class, args); + } + + + @Bean + public HandlerMapping webSocketMapping() { + Map map = new HashMap<>(); + map.put("/events", new EventWebSocketHandler()); + + SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); + mapping.setUrlMap(map); + mapping.setOrder(1); + return mapping; + } + + @Bean + public WebSocketHandlerAdapter handlerAdapter() { + return new WebSocketHandlerAdapter(); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java new file mode 100644 index 0000000000..3fa4027449 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; + +import org.springframework.web.reactive.socket.WebSocketMessage; +import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; +import org.springframework.web.reactive.socket.client.WebSocketClient; + +public class EventWebSocketClient { + + public static void main(String[] args) throws URISyntaxException { + + WebSocketClient client = new ReactorNettyWebSocketClient(); + + client.execute( + URI.create("ws://localhost:8080/events"), + session -> session.receive() + .map(WebSocketMessage::getPayloadAsText) + .log().then()) + .block(Duration.ofMinutes(10L)); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java new file mode 100644 index 0000000000..37deaf3d1b --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java @@ -0,0 +1,27 @@ +package com.baeldung.reactive.webflux.eventstreaming; + +import java.text.MessageFormat; +import java.time.Duration; +import java.time.LocalTime; +import java.util.UUID; + +import org.springframework.web.reactive.socket.WebSocketHandler; +import org.springframework.web.reactive.socket.WebSocketSession; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class EventWebSocketHandler implements WebSocketHandler { + + MessageFormat mf = new MessageFormat("EventID: {0} , Event Time: {1}") ; + + @Override + public Mono handle(WebSocketSession session) { + return session.send( + Flux. generate(sink -> sink.next( + mf.format(new Object[] {UUID.randomUUID(),LocalTime.now()}))) + .delayElements(Duration.ofSeconds(1)) + .map(session::textMessage)); + } + +} From f95092d90a24b6629798bf99a1811a5dd192fc19 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Mon, 23 Jul 2018 21:10:03 +0530 Subject: [PATCH 2/8] Real Time Event Streaming with Spring Webflux --- .../webflux/eventstreaming/EventWebSocketClient.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java index 3fa4027449..357a5677c1 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java @@ -10,12 +10,14 @@ import org.springframework.web.reactive.socket.client.WebSocketClient; public class EventWebSocketClient { - public static void main(String[] args) throws URISyntaxException { + private static final String REMOTE_URL = "ws://localhost:8080/events"; + + public static void main(String[] args) throws URISyntaxException { WebSocketClient client = new ReactorNettyWebSocketClient(); client.execute( - URI.create("ws://localhost:8080/events"), + URI.create(REMOTE_URL), session -> session.receive() .map(WebSocketMessage::getPayloadAsText) .log().then()) From 216715010be058330a08cdb5946abdec9c864ed5 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Wed, 8 Aug 2018 22:19:00 +0530 Subject: [PATCH 3/8] Adding Hello world Servlet and HTML impl --- .../com/baeldung/jetty/HelloWorldServlet.java | 28 +++++++++++++++++++ libraries/src/main/webapp/WEB-INF/web.xml | 10 +++++++ libraries/src/main/webapp/helloworld.html | 9 ++++++ 3 files changed, 47 insertions(+) create mode 100644 libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java create mode 100644 libraries/src/main/webapp/WEB-INF/web.xml create mode 100644 libraries/src/main/webapp/helloworld.html diff --git a/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java b/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java new file mode 100644 index 0000000000..d730e70ad2 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java @@ -0,0 +1,28 @@ +package com.baeldung.jetty; + +import java.io.IOException; +import java.io.PrintWriter; + +import javax.servlet.ServletException; +import javax.servlet.annotation.WebServlet; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +@WebServlet("/helloworld") +public class HelloWorldServlet extends HttpServlet { + + private static final long serialVersionUID = 2851388791344172542L; + + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { + + PrintWriter writer = resp.getWriter(); + writer.println(""); + writer.println(""); + writer.println(""); + writer.println("

Hello World!

"); + writer.println(""); + writer.println(""); + } +} diff --git a/libraries/src/main/webapp/WEB-INF/web.xml b/libraries/src/main/webapp/WEB-INF/web.xml new file mode 100644 index 0000000000..364bcd9587 --- /dev/null +++ b/libraries/src/main/webapp/WEB-INF/web.xml @@ -0,0 +1,10 @@ + + + + helloworld.html + + \ No newline at end of file diff --git a/libraries/src/main/webapp/helloworld.html b/libraries/src/main/webapp/helloworld.html new file mode 100644 index 0000000000..bfc75f2592 --- /dev/null +++ b/libraries/src/main/webapp/helloworld.html @@ -0,0 +1,9 @@ + + + + +Hello World + +Hello World! + + \ No newline at end of file From 99b800c5b914c19aae5ec6f8784d7335a82e6a82 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Wed, 8 Aug 2018 22:28:28 +0530 Subject: [PATCH 4/8] Removing unmerged files from demo article --- .../EventStreamingApplication.java | 38 ------------------- .../eventstreaming/EventWebSocketClient.java | 26 ------------- .../eventstreaming/EventWebSocketHandler.java | 27 ------------- 3 files changed, 91 deletions(-) delete mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java delete mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java delete mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java deleted file mode 100644 index 5deff35fa7..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventStreamingApplication.java +++ /dev/null @@ -1,38 +0,0 @@ -package com.baeldung.reactive.webflux.eventstreaming; - -import java.util.HashMap; -import java.util.Map; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.context.annotation.Bean; -import org.springframework.web.reactive.HandlerMapping; -import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping; -import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter; - -@SpringBootApplication -public class EventStreamingApplication { - - public static void main(String[] args) { - SpringApplication.run(EventStreamingApplication.class, args); - } - - - @Bean - public HandlerMapping webSocketMapping() { - Map map = new HashMap<>(); - map.put("/events", new EventWebSocketHandler()); - - SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping(); - mapping.setUrlMap(map); - mapping.setOrder(1); - return mapping; - } - - @Bean - public WebSocketHandlerAdapter handlerAdapter() { - return new WebSocketHandlerAdapter(); - } - -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java deleted file mode 100644 index 357a5677c1..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketClient.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.baeldung.reactive.webflux.eventstreaming; - -import java.net.URI; -import java.net.URISyntaxException; -import java.time.Duration; - -import org.springframework.web.reactive.socket.WebSocketMessage; -import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient; -import org.springframework.web.reactive.socket.client.WebSocketClient; - -public class EventWebSocketClient { - - private static final String REMOTE_URL = "ws://localhost:8080/events"; - - public static void main(String[] args) throws URISyntaxException { - - WebSocketClient client = new ReactorNettyWebSocketClient(); - - client.execute( - URI.create(REMOTE_URL), - session -> session.receive() - .map(WebSocketMessage::getPayloadAsText) - .log().then()) - .block(Duration.ofMinutes(10L)); - } -} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java deleted file mode 100644 index 37deaf3d1b..0000000000 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/webflux/eventstreaming/EventWebSocketHandler.java +++ /dev/null @@ -1,27 +0,0 @@ -package com.baeldung.reactive.webflux.eventstreaming; - -import java.text.MessageFormat; -import java.time.Duration; -import java.time.LocalTime; -import java.util.UUID; - -import org.springframework.web.reactive.socket.WebSocketHandler; -import org.springframework.web.reactive.socket.WebSocketSession; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - -public class EventWebSocketHandler implements WebSocketHandler { - - MessageFormat mf = new MessageFormat("EventID: {0} , Event Time: {1}") ; - - @Override - public Mono handle(WebSocketSession session) { - return session.send( - Flux. generate(sink -> sink.next( - mf.format(new Object[] {UUID.randomUUID(),LocalTime.now()}))) - .delayElements(Duration.ofSeconds(1)) - .map(session::textMessage)); - } - -} From 443df74d09f4a1971753996a6a395458dc57c2a8 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Thu, 6 Sep 2018 23:19:26 +0530 Subject: [PATCH 5/8] BAEL1426 - Kafka Transactional API. Updating the kafka-clients version to 2.0.0 --- libraries/pom.xml | 41 ++++++- .../com/baeldung/kafka/TransactionalApp.java | 102 ++++++++++++++++++ 2 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java diff --git a/libraries/pom.xml b/libraries/pom.xml index 7402d88ef3..83d78af84f 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -795,8 +795,8 @@ http://dl.bintray.com/cuba-platform/main - Apache Staging - https://repository.apache.org/content/groups/staging + Maven Central + https://repo.maven.apache.org/maven2/ @@ -912,6 +912,41 @@ + + + + + org.eclipse.m2e + lifecycle-mapping + 1.0.0 + + + + + + + org.apache.maven.plugins + + + maven-pmd-plugin + + + [3.8,) + + + check + + + + + + + + + + + + @@ -974,7 +1009,7 @@ 2.5.5 1.23.0 v4-rev493-1.21.0 - 1.0.0 + 2.0.0 1.7.0 3.0.14 8.5.24 diff --git a/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java b/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java new file mode 100644 index 0000000000..1e95041a0d --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/TransactionalApp.java @@ -0,0 +1,102 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.time.Duration.ofSeconds; +import static java.util.Collections.singleton; +import static org.apache.kafka.clients.consumer.ConsumerConfig.*; +import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.*; + +public class TransactionalApp { + + private static final String CONSUMER_GROUP_ID = "test"; + private static final String OUTPUT_TOPIC = "output"; + private static final String INPUT_TOPIC = "input"; + + public static void main(String[] args) { + + KafkaConsumer consumer = initConsumer(); + KafkaProducer producer = initProducer(); + + producer.initTransactions(); + + try { + + while (true) { + + ConsumerRecords records = consumer.poll(ofSeconds(20)); + + producer.beginTransaction(); + + for (ConsumerRecord record : records) + producer.send(new ProducerRecord(OUTPUT_TOPIC, record)); + + Map offsetsToCommit = new HashMap<>(); + + for (TopicPartition partition : records.partitions()) { + List> partitionedRecords = records.records(partition); + long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset(); + + offsetsToCommit.put(partition, new OffsetAndMetadata(offset)); + } + + producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID); + producer.commitTransaction(); + + } + + } catch (KafkaException e) { + + producer.abortTransaction(); + + } + + + } + + private static KafkaConsumer initConsumer() { + Properties props = new Properties(); + props.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(GROUP_ID_CONFIG, CONSUMER_GROUP_ID); + props.put(ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + + KafkaConsumer consumer = new KafkaConsumer<>(props); + consumer.subscribe(singleton(INPUT_TOPIC)); + return consumer; + } + + private static KafkaProducer initProducer() { + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ACKS_CONFIG, "all"); + props.put(RETRIES_CONFIG, 3); + props.put(BATCH_SIZE_CONFIG, 16384); + props.put(LINGER_MS_CONFIG, 1); + props.put(BUFFER_MEMORY_CONFIG, 33554432); + props.put(ENABLE_IDEMPOTENCE_CONFIG, "true"); + props.put(TRANSACTIONAL_ID_CONFIG, "prod-1"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + + return new KafkaProducer(props); + + } + +} From a76e5fba254aacc37c291014a46fb9921f5df058 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Fri, 7 Sep 2018 21:44:49 +0530 Subject: [PATCH 6/8] BAEL1426 - Kafka Transactional API. Updating the kafka-clients version to 2.0.0 --- libraries/pom.xml | 39 +------------------ .../com/baeldung/jetty/HelloWorldServlet.java | 28 ------------- libraries/src/main/webapp/WEB-INF/web.xml | 10 ----- libraries/src/main/webapp/helloworld.html | 9 ----- 4 files changed, 2 insertions(+), 84 deletions(-) delete mode 100644 libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java delete mode 100644 libraries/src/main/webapp/WEB-INF/web.xml delete mode 100644 libraries/src/main/webapp/helloworld.html diff --git a/libraries/pom.xml b/libraries/pom.xml index 83d78af84f..cff67791cd 100644 --- a/libraries/pom.xml +++ b/libraries/pom.xml @@ -795,8 +795,8 @@ http://dl.bintray.com/cuba-platform/main - Maven Central - https://repo.maven.apache.org/maven2/ + Apache Staging + https://repository.apache.org/content/groups/staging @@ -912,41 +912,6 @@ - - - - - org.eclipse.m2e - lifecycle-mapping - 1.0.0 - - - - - - - org.apache.maven.plugins - - - maven-pmd-plugin - - - [3.8,) - - - check - - - - - - - - - - - - diff --git a/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java b/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java deleted file mode 100644 index d730e70ad2..0000000000 --- a/libraries/src/main/java/com/baeldung/jetty/HelloWorldServlet.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.baeldung.jetty; - -import java.io.IOException; -import java.io.PrintWriter; - -import javax.servlet.ServletException; -import javax.servlet.annotation.WebServlet; -import javax.servlet.http.HttpServlet; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; - -@WebServlet("/helloworld") -public class HelloWorldServlet extends HttpServlet { - - private static final long serialVersionUID = 2851388791344172542L; - - @Override - protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { - - PrintWriter writer = resp.getWriter(); - writer.println(""); - writer.println(""); - writer.println(""); - writer.println("

Hello World!

"); - writer.println(""); - writer.println(""); - } -} diff --git a/libraries/src/main/webapp/WEB-INF/web.xml b/libraries/src/main/webapp/WEB-INF/web.xml deleted file mode 100644 index 364bcd9587..0000000000 --- a/libraries/src/main/webapp/WEB-INF/web.xml +++ /dev/null @@ -1,10 +0,0 @@ - - - - helloworld.html - - \ No newline at end of file diff --git a/libraries/src/main/webapp/helloworld.html b/libraries/src/main/webapp/helloworld.html deleted file mode 100644 index bfc75f2592..0000000000 --- a/libraries/src/main/webapp/helloworld.html +++ /dev/null @@ -1,9 +0,0 @@ - - - - -Hello World - -Hello World! - - \ No newline at end of file From 6f2d31b26a2f45eaeea2521752574b7a82dfebf6 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 9 Sep 2018 17:26:53 +0530 Subject: [PATCH 7/8] BAEL-1426 Updating an existing test case due to migration to Kafka2.0 --- .../kafkastreams/KafkaStreamsLiveTest.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java index 4406494d30..e61f4158a7 100644 --- a/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java +++ b/libraries/src/test/java/com/baeldung/kafkastreams/KafkaStreamsLiveTest.java @@ -4,10 +4,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.test.TestUtils; import org.junit.Ignore; import org.junit.Test; @@ -36,20 +38,20 @@ public class KafkaStreamsLiveTest { streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath()); // when - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); KStream textLines = builder.stream(inputTopic); Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase()))).groupBy((key, word) -> word).count(); - wordCounts.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); + textLines.foreach((word, count) -> System.out.println("word: " + word + " -> " + count)); String outputTopic = "outputTopic"; final Serde stringSerde = Serdes.String(); - final Serde longSerde = Serdes.Long(); - wordCounts.to(stringSerde, longSerde, outputTopic); + final Serde longSerde = Serdes.String(); + textLines.to(outputTopic, Produced.with(stringSerde,longSerde)); - KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + KafkaStreams streams = new KafkaStreams(new Topology(), streamsConfiguration); streams.start(); // then From 6009c4f9100078b2fb347d665e08a964c0572a8f Mon Sep 17 00:00:00 2001 From: daoire Date: Wed, 12 Sep 2018 20:35:59 +0100 Subject: [PATCH 8/8] Updates to Double to string --- .../decimalformat/DoubletoString.java | 45 ++++++++----------- 1 file changed, 18 insertions(+), 27 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java b/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java index 87d10a3548..e605c5e200 100644 --- a/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java +++ b/core-java/src/main/java/com/baeldung/decimalformat/DoubletoString.java @@ -1,38 +1,29 @@ package com.baeldung.decimalformat; - import java.math.RoundingMode; - import java.text.DecimalFormat; - import java.text.NumberFormat; - import java.util.Locale; +import java.math.RoundingMode; +import java.text.DecimalFormat; +import java.text.NumberFormat; - public class DoubletoString { +public class DoubletoString { - public static void main(String[] args) { + public static void main(String[] args) { - double doubleValue = 345.56; + double doubleValue = 345.56; - System.out.println(String.valueOf((int) doubleValue)); + System.out.println(String.valueOf((int) doubleValue)); - System.out.println(String.format("%.0f", doubleValue)); + System.out.println(String.format("%.0f", doubleValue)); - doubleValue = Math.floor(doubleValue); - DecimalFormat df = new DecimalFormat("#"); - df.setRoundingMode(RoundingMode.FLOOR); - System.out.println(df.format(doubleValue)); - - Locale enlocale = new Locale("en", "US"); - String pattern = "###,##"; - df = (DecimalFormat) NumberFormat.getNumberInstance(enlocale); - df.applyPattern(pattern); - String format = df.format(doubleValue); - System.out.println(format); + NumberFormat nf = NumberFormat.getInstance(); + nf.setMaximumFractionDigits(0); + nf.setRoundingMode(RoundingMode.FLOOR); + System.out.println(nf.format(doubleValue)); - Locale dalocale = new Locale("da", "DK"); - df = (DecimalFormat) NumberFormat.getNumberInstance(dalocale); - df.applyPattern(pattern); - System.out.println(df.format(doubleValue)); + doubleValue = Math.floor(doubleValue); + DecimalFormat df = new DecimalFormat("#,###"); + df.setRoundingMode(RoundingMode.FLOOR); + System.out.println(df.format(doubleValue)); + } - } - - } +}