diff --git a/JGit/pom.xml b/JGit/pom.xml
index d1ebd364da..176d55d321 100644
--- a/JGit/pom.xml
+++ b/JGit/pom.xml
@@ -3,10 +3,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- JGitSnippets
+ JGit
1.0-SNAPSHOT
jar
http://maven.apache.org
+ JGit
com.baeldung
diff --git a/algorithms/src/main/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsService.java b/algorithms/src/main/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsService.java
new file mode 100644
index 0000000000..0c8eb86a38
--- /dev/null
+++ b/algorithms/src/main/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsService.java
@@ -0,0 +1,38 @@
+package com.baeldung.algorithms.distancebetweenpoints;
+
+import java.awt.geom.Point2D;
+
+public class DistanceBetweenPointsService {
+
+ public double calculateDistanceBetweenPoints(
+ double x1,
+ double y1,
+ double x2,
+ double y2) {
+
+ return Math.sqrt((y2 - y1) * (y2 - y1) + (x2 - x1) * (x2 - x1));
+ }
+
+ public double calculateDistanceBetweenPointsWithHypot(
+ double x1,
+ double y1,
+ double x2,
+ double y2) {
+
+ double ac = Math.abs(y2 - y1);
+ double cb = Math.abs(x2 - x1);
+
+ return Math.hypot(ac, cb);
+ }
+
+ public double calculateDistanceBetweenPointsWithPoint2D(
+ double x1,
+ double y1,
+ double x2,
+ double y2) {
+
+ return Point2D.distance(x1, y1, x2, y2);
+
+ }
+
+}
diff --git a/algorithms/src/test/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsServiceUnitTest.java b/algorithms/src/test/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsServiceUnitTest.java
new file mode 100644
index 0000000000..785afdbb2b
--- /dev/null
+++ b/algorithms/src/test/java/com/baeldung/algorithms/distancebetweenpoints/DistanceBetweenPointsServiceUnitTest.java
@@ -0,0 +1,54 @@
+package com.baeldung.algorithms.distancebetweenpoints;
+
+import org.junit.Test;
+
+import com.baeldung.algorithms.distancebetweenpoints.DistanceBetweenPointsService;
+
+import static org.junit.Assert.assertEquals;
+
+public class DistanceBetweenPointsServiceUnitTest {
+
+ private DistanceBetweenPointsService service = new DistanceBetweenPointsService();
+
+ @Test
+ public void givenTwoPoints_whenCalculateDistanceByFormula_thenCorrect() {
+
+ double x1 = 3;
+ double y1 = 4;
+ double x2 = 7;
+ double y2 = 1;
+
+ double distance = service.calculateDistanceBetweenPoints(x1, y1, x2, y2);
+
+ assertEquals(distance, 5, 0.001);
+
+ }
+
+ @Test
+ public void givenTwoPoints_whenCalculateDistanceWithHypot_thenCorrect() {
+
+ double x1 = 3;
+ double y1 = 4;
+ double x2 = 7;
+ double y2 = 1;
+
+ double distance = service.calculateDistanceBetweenPointsWithHypot(x1, y1, x2, y2);
+
+ assertEquals(distance, 5, 0.001);
+
+ }
+
+ @Test
+ public void givenTwoPoints_whenCalculateDistanceWithPoint2D_thenCorrect() {
+
+ double x1 = 3;
+ double y1 = 4;
+ double x2 = 7;
+ double y2 = 1;
+
+ double distance = service.calculateDistanceBetweenPointsWithPoint2D(x1, y1, x2, y2);
+
+ assertEquals(distance, 5, 0.001);
+
+ }
+}
diff --git a/apache-avro/pom.xml b/apache-avro/pom.xml
index 39da518269..3e3234ff96 100644
--- a/apache-avro/pom.xml
+++ b/apache-avro/pom.xml
@@ -4,8 +4,9 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- apache-avro-tutorial
+ apache-avro
0.0.1-SNAPSHOT
+ Apache Avro
UTF-8
diff --git a/core-java/src/main/java/com/baeldung/synthetic/BridgeMethodDemo.java b/core-java/src/main/java/com/baeldung/synthetic/BridgeMethodDemo.java
new file mode 100644
index 0000000000..bdf6684f78
--- /dev/null
+++ b/core-java/src/main/java/com/baeldung/synthetic/BridgeMethodDemo.java
@@ -0,0 +1,23 @@
+package com.baeldung.synthetic;
+
+import java.util.Comparator;
+
+/**
+ * Class which contains a synthetic bridge method.
+ *
+ * @author Donato Rimenti
+ *
+ */
+public class BridgeMethodDemo implements Comparator {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.util.Comparator#compare(java.lang.Object, java.lang.Object)
+ */
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return 0;
+ }
+
+}
diff --git a/core-java/src/main/java/com/baeldung/synthetic/SyntheticConstructorDemo.java b/core-java/src/main/java/com/baeldung/synthetic/SyntheticConstructorDemo.java
new file mode 100644
index 0000000000..d3d75ac05e
--- /dev/null
+++ b/core-java/src/main/java/com/baeldung/synthetic/SyntheticConstructorDemo.java
@@ -0,0 +1,34 @@
+package com.baeldung.synthetic;
+
+/**
+ * Wrapper for a class which contains a synthetic constructor.
+ *
+ * @author Donato Rimenti
+ *
+ */
+public class SyntheticConstructorDemo {
+
+ /**
+ * We need to instantiate the {@link NestedClass} using a private
+ * constructor from the enclosing instance in order to generate a synthetic
+ * constructor.
+ */
+ private NestedClass nestedClass = new NestedClass();
+
+ /**
+ * Class which contains a synthetic constructor.
+ *
+ * @author Donato Rimenti
+ *
+ */
+ class NestedClass {
+
+ /**
+ * In order to generate a synthetic constructor, this class must have a
+ * private constructor.
+ */
+ private NestedClass() {
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/core-java/src/main/java/com/baeldung/synthetic/SyntheticFieldDemo.java b/core-java/src/main/java/com/baeldung/synthetic/SyntheticFieldDemo.java
new file mode 100644
index 0000000000..1813e03953
--- /dev/null
+++ b/core-java/src/main/java/com/baeldung/synthetic/SyntheticFieldDemo.java
@@ -0,0 +1,22 @@
+package com.baeldung.synthetic;
+
+/**
+ * Wrapper for a class which contains a synthetic field reference to the outer
+ * class.
+ *
+ * @author Donato Rimenti
+ *
+ */
+public class SyntheticFieldDemo {
+
+ /**
+ * Class which contains a synthetic field reference to the outer class.
+ *
+ * @author Donato Rimenti
+ *
+ */
+ class NestedClass {
+
+ }
+
+}
\ No newline at end of file
diff --git a/core-java/src/main/java/com/baeldung/synthetic/SyntheticMethodDemo.java b/core-java/src/main/java/com/baeldung/synthetic/SyntheticMethodDemo.java
new file mode 100644
index 0000000000..59be4e1429
--- /dev/null
+++ b/core-java/src/main/java/com/baeldung/synthetic/SyntheticMethodDemo.java
@@ -0,0 +1,48 @@
+package com.baeldung.synthetic;
+
+/**
+ * Wrapper for a class which contains two synthetic methods accessors to a
+ * private field.
+ *
+ * @author Donato Rimenti
+ *
+ */
+public class SyntheticMethodDemo {
+
+ /**
+ * Class which contains two synthetic methods accessors to a private field.
+ *
+ * @author Donato Rimenti
+ *
+ */
+ class NestedClass {
+
+ /**
+ * Field for which will be generated synthetic methods accessors. It's
+ * important that this field is private for this purpose.
+ */
+ private String nestedField;
+ }
+
+ /**
+ * Gets the private nested field. We need to read the nested field in order
+ * to generate the synthetic getter.
+ *
+ * @return the {@link NestedClass#nestedField}
+ */
+ public String getNestedField() {
+ return new NestedClass().nestedField;
+ }
+
+ /**
+ * Sets the private nested field. We need to write the nested field in order
+ * to generate the synthetic setter.
+ *
+ * @param nestedField
+ * the {@link NestedClass#nestedField}
+ */
+ public void setNestedField(String nestedField) {
+ new NestedClass().nestedField = nestedField;
+ }
+
+}
\ No newline at end of file
diff --git a/core-java/src/test/java/com/baeldung/synthetic/SyntheticUnitTest.java b/core-java/src/test/java/com/baeldung/synthetic/SyntheticUnitTest.java
new file mode 100644
index 0000000000..20f7647f48
--- /dev/null
+++ b/core-java/src/test/java/com/baeldung/synthetic/SyntheticUnitTest.java
@@ -0,0 +1,99 @@
+package com.baeldung.synthetic;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link SyntheticFieldDemo}, {@link SyntheticMethodDemo},
+ * {@link SyntheticConstructorDemo} and {@link BridgeMethodDemo} classes.
+ *
+ * @author Donato Rimenti
+ *
+ */
+public class SyntheticUnitTest {
+
+ /**
+ * Tests that the {@link SyntheticMethodDemo.NestedClass} contains two synthetic
+ * methods.
+ */
+ @Test
+ public void givenSyntheticMethod_whenIsSinthetic_thenTrue() {
+ // Checks that the nested class contains exactly two synthetic methods.
+ Method[] methods = SyntheticMethodDemo.NestedClass.class.getDeclaredMethods();
+ Assert.assertEquals("This class should contain only two methods", 2, methods.length);
+
+ for (Method m : methods) {
+ System.out.println("Method: " + m.getName() + ", isSynthetic: " + m.isSynthetic());
+ Assert.assertTrue("All the methods of this class should be synthetic", m.isSynthetic());
+ }
+ }
+
+ /**
+ * Tests that {@link SyntheticConstructorDemo.NestedClass} contains a synthetic
+ * constructor.
+ */
+ @Test
+ public void givenSyntheticConstructor_whenIsSinthetic_thenTrue() {
+ // Checks that the nested class contains exactly a synthetic
+ // constructor.
+ int syntheticConstructors = 0;
+ Constructor>[] constructors = SyntheticConstructorDemo.NestedClass.class.getDeclaredConstructors();
+ Assert.assertEquals("This class should contain only two constructors", 2, constructors.length);
+
+ for (Constructor> c : constructors) {
+ System.out.println("Constructor: " + c.getName() + ", isSynthetic: " + c.isSynthetic());
+
+ // Counts the synthetic constructors.
+ if (c.isSynthetic()) {
+ syntheticConstructors++;
+ }
+ }
+
+ // Checks that there's exactly one synthetic constructor.
+ Assert.assertEquals(1, syntheticConstructors);
+ }
+
+ /**
+ * Tests that {@link SyntheticFieldDemo.NestedClass} contains a synthetic field.
+ */
+ @Test
+ public void givenSyntheticField_whenIsSinthetic_thenTrue() {
+ // This class should contain exactly one synthetic field.
+ Field[] fields = SyntheticFieldDemo.NestedClass.class.getDeclaredFields();
+ Assert.assertEquals("This class should contain only one field", 1, fields.length);
+
+ for (Field f : fields) {
+ System.out.println("Field: " + f.getName() + ", isSynthetic: " + f.isSynthetic());
+ Assert.assertTrue("All the fields of this class should be synthetic", f.isSynthetic());
+ }
+ }
+
+ /**
+ * Tests that {@link BridgeMethodDemo} contains a synthetic bridge method.
+ */
+ @Test
+ public void givenBridgeMethod_whenIsBridge_thenTrue() {
+ // This class should contain exactly one synthetic bridge method.
+ int syntheticMethods = 0;
+ Method[] methods = BridgeMethodDemo.class.getDeclaredMethods();
+ for (Method m : methods) {
+ System.out.println(
+ "Method: " + m.getName() + ", isSynthetic: " + m.isSynthetic() + ", isBridge: " + m.isBridge());
+
+ // Counts the synthetic methods and checks that they are also bridge
+ // methods.
+ if (m.isSynthetic()) {
+ syntheticMethods++;
+ Assert.assertTrue("The synthetic method in this class should also be a bridge method", m.isBridge());
+ }
+ }
+
+ // Checks that there's exactly one synthetic bridge method.
+ Assert.assertEquals("There should be exactly 1 synthetic bridge method in this class", 1, syntheticMethods);
+ }
+
+}
\ No newline at end of file
diff --git a/couchbase/pom.xml b/couchbase/pom.xml
index f6397fe309..4f0f8787ca 100644
--- a/couchbase/pom.xml
+++ b/couchbase/pom.xml
@@ -3,11 +3,11 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- couchbase-sdk
+ couchbase
0.1-SNAPSHOT
jar
couchbase
- Couchbase SDK Tutorials
+ Couchbase Tutorials
com.baeldung
diff --git a/ejb/wildfly/pom.xml b/ejb/wildfly/pom.xml
index 7159096f3c..53d10a90ed 100644
--- a/ejb/wildfly/pom.xml
+++ b/ejb/wildfly/pom.xml
@@ -2,9 +2,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung.wildfly
- wildfly-example
+ wildfly
0.0.1-SNAPSHOT
pom
+ wildfly
com.baeldung.ejb
diff --git a/ejb/wildfly/wildfly-mdb/pom.xml b/ejb/wildfly/wildfly-mdb/pom.xml
index 0b2ec7d5a3..186ddc50c0 100644
--- a/ejb/wildfly/wildfly-mdb/pom.xml
+++ b/ejb/wildfly/wildfly-mdb/pom.xml
@@ -1,8 +1,9 @@
4.0.0
- widlfly-mdb
-
+ wildfly-mdb
+ wildfly-mdb
+
com.baeldung.wildfly
wildfly-example
diff --git a/feign/pom.xml b/feign/pom.xml
index 29c2a784bc..ea645383c1 100644
--- a/feign/pom.xml
+++ b/feign/pom.xml
@@ -3,7 +3,8 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung.feign
- feign-client
+ feign
+ feign
com.baeldung
diff --git a/google-web-toolkit/pom.xml b/google-web-toolkit/pom.xml
index b2f7cab355..db9ce2eac0 100644
--- a/google-web-toolkit/pom.xml
+++ b/google-web-toolkit/pom.xml
@@ -6,10 +6,11 @@
4.0.0
com.baeldung
- google_web_toolkit
+ google-web-toolkit
war
1.0-SNAPSHOT
-
+ google-web-toolkit
+
com.baeldung
parent-modules
diff --git a/grpc/pom.xml b/grpc/pom.xml
index 218e2df008..949f26d376 100644
--- a/grpc/pom.xml
+++ b/grpc/pom.xml
@@ -1,11 +1,11 @@
4.0.0
- grpc
- grpc-demo
+ grpc
0.0.1-SNAPSHOT
jar
-
+ grpc
+
com.baeldung
parent-modules
diff --git a/guest/remote-debugging/pom.xml b/guest/remote-debugging/pom.xml
index 67fed3f1a1..974421de97 100644
--- a/guest/remote-debugging/pom.xml
+++ b/guest/remote-debugging/pom.xml
@@ -3,9 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.stackify
- java-remote-debugging
+ remote-debugging
0.0.1-SNAPSHOT
war
+ remote-debugging
org.springframework.boot
diff --git a/jhipster/jhipster-microservice/car-app/pom.xml b/jhipster/jhipster-microservice/car-app/pom.xml
index 77fddf9c72..c52def554f 100644
--- a/jhipster/jhipster-microservice/car-app/pom.xml
+++ b/jhipster/jhipster-microservice/car-app/pom.xml
@@ -10,10 +10,10 @@
../../../parent-boot-1
com.car.app
- carapp
+ car-app
0.0.1-SNAPSHOT
war
- Carapp
+ car-app
${maven.version}
diff --git a/jhipster/jhipster-microservice/dealer-app/pom.xml b/jhipster/jhipster-microservice/dealer-app/pom.xml
index 5f6485a203..a9366e9bd3 100644
--- a/jhipster/jhipster-microservice/dealer-app/pom.xml
+++ b/jhipster/jhipster-microservice/dealer-app/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
com.dealer.app
- dealerapp
+ dealer-app
0.0.1-SNAPSHOT
war
- Dealerapp
+ dealer-app
parent-boot-1
diff --git a/jhipster/jhipster-microservice/gateway-app/pom.xml b/jhipster/jhipster-microservice/gateway-app/pom.xml
index 85b3688efa..0ae74e11bb 100644
--- a/jhipster/jhipster-microservice/gateway-app/pom.xml
+++ b/jhipster/jhipster-microservice/gateway-app/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
4.0.0
com.gateway
- gateway
+ gateway-app
0.0.1-SNAPSHOT
war
- Gateway
+ gateway-app
parent-boot-1
diff --git a/jjwt/pom.xml b/jjwt/pom.xml
index 189e957e42..6bf9f4426a 100644
--- a/jjwt/pom.xml
+++ b/jjwt/pom.xml
@@ -3,10 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
io.jsonwebtoken
- jjwtfun
+ jjwt
0.0.1-SNAPSHOT
jar
- jjwtfun
+ jjwt
Exercising the JJWT
diff --git a/libraries/pom.xml b/libraries/pom.xml
index 80e3303ba5..c8135d8d2a 100644
--- a/libraries/pom.xml
+++ b/libraries/pom.xml
@@ -154,6 +154,17 @@
commons-dbutils
${commons.dbutils.version}
+
+ org.apache.flink
+ flink-connector-kafka-0.11_2.11
+ ${flink.version}
+
+
+ org.apache.flink
+ flink-streaming-java_2.11
+ ${flink.version}
+
+
org.apache.flink
flink-core
@@ -178,7 +189,7 @@
org.apache.flink
- flink-test-utils_2.10
+ flink-test-utils_2.11
${flink.version}
test
@@ -228,6 +239,11 @@
jackson-databind
${jackson.version}
+
+ com.fasterxml.jackson.datatype
+ jackson-datatype-jsr310
+ ${jackson.version}
+
org.datanucleus
@@ -902,7 +918,7 @@
4.5.3
2.5
- 1.2.0
+ 1.5.0
2.8.5
2.92
1.9.26
diff --git a/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
new file mode 100644
index 0000000000..d02b1bcb83
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/FlinkDataPipeline.java
@@ -0,0 +1,82 @@
+package com.baeldung.flink;
+
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.operator.WordsCapitalizer;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+import static com.baeldung.flink.connector.Consumers.*;
+import static com.baeldung.flink.connector.Producers.*;
+
+public class FlinkDataPipeline {
+
+ public static void capitalize() throws Exception {
+ String inputTopic = "flink_input";
+ String outputTopic = "flink_output";
+ String consumerGroup = "baeldung";
+ String address = "localhost:9092";
+
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ FlinkKafkaConsumer011 flinkKafkaConsumer =
+ createStringConsumerForTopic(inputTopic, address, consumerGroup);
+ flinkKafkaConsumer.setStartFromEarliest();
+
+ DataStream stringInputStream =
+ environment.addSource(flinkKafkaConsumer);
+
+ FlinkKafkaProducer011 flinkKafkaProducer =
+ createStringProducer(outputTopic, address);
+
+ stringInputStream
+ .map(new WordsCapitalizer())
+ .addSink(flinkKafkaProducer);
+
+ environment.execute();
+ }
+
+public static void createBackup () throws Exception {
+ String inputTopic = "flink_input";
+ String outputTopic = "flink_output";
+ String consumerGroup = "baeldung";
+ String kafkaAddress = "localhost:9092";
+
+ StreamExecutionEnvironment environment =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+ environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ FlinkKafkaConsumer011 flinkKafkaConsumer =
+ createInputMessageConsumer(inputTopic, kafkaAddress, consumerGroup);
+ flinkKafkaConsumer.setStartFromEarliest();
+
+ flinkKafkaConsumer
+ .assignTimestampsAndWatermarks(new InputMessageTimestampAssigner());
+ FlinkKafkaProducer011 flinkKafkaProducer =
+ createBackupProducer(outputTopic, kafkaAddress);
+
+ DataStream inputMessagesStream =
+ environment.addSource(flinkKafkaConsumer);
+
+ inputMessagesStream
+ .timeWindowAll(Time.hours(24))
+ .aggregate(new BackupAggregator())
+ .addSink(flinkKafkaProducer);
+
+ environment.execute();
+}
+
+ public static void main(String[] args) throws Exception {
+ createBackup();
+ }
+
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
new file mode 100644
index 0000000000..514085f9c4
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Consumers.java
@@ -0,0 +1,32 @@
+package com.baeldung.flink.connector;
+
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
+
+import java.util.Properties;
+
+public class Consumers {
+
+public static FlinkKafkaConsumer011 createStringConsumerForTopic(
+ String topic, String kafkaAddress, String kafkaGroup ) {
+ Properties props = new Properties();
+ props.setProperty("bootstrap.servers", kafkaAddress);
+ props.setProperty("group.id",kafkaGroup);
+ FlinkKafkaConsumer011 consumer =
+ new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(),props);
+
+ return consumer;
+}
+
+ public static FlinkKafkaConsumer011 createInputMessageConsumer(String topic, String kafkaAddress, String kafkaGroup ) {
+ Properties properties = new Properties();
+ properties.setProperty("bootstrap.servers", kafkaAddress);
+ properties.setProperty("group.id",kafkaGroup);
+ FlinkKafkaConsumer011 consumer = new FlinkKafkaConsumer011(
+ topic, new InputMessageDeserializationSchema(),properties);
+
+ return consumer;
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/connector/Producers.java b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
new file mode 100644
index 0000000000..8e6f3f8f37
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/connector/Producers.java
@@ -0,0 +1,17 @@
+package com.baeldung.flink.connector;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.schema.BackupSerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
+
+public class Producers {
+
+ public static FlinkKafkaProducer011 createStringProducer(String topic, String kafkaAddress) {
+ return new FlinkKafkaProducer011<>(kafkaAddress, topic, new SimpleStringSchema());
+ }
+
+ public static FlinkKafkaProducer011 createBackupProducer(String topic, String kafkaAddress) {
+ return new FlinkKafkaProducer011(kafkaAddress, topic, new BackupSerializationSchema());
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/Backup.java b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
new file mode 100644
index 0000000000..268ceec7f3
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/model/Backup.java
@@ -0,0 +1,27 @@
+package com.baeldung.flink.model;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.UUID;
+
+public class Backup {
+
+ @JsonProperty("inputMessages")
+ List inputMessages;
+ @JsonProperty("backupTimestamp")
+ LocalDateTime backupTimestamp;
+ @JsonProperty("uuid")
+ UUID uuid;
+
+ public Backup(List inputMessages, LocalDateTime backupTimestamp) {
+ this.inputMessages = inputMessages;
+ this.backupTimestamp = backupTimestamp;
+ this.uuid = UUID.randomUUID();
+ }
+
+ public List getInputMessages() {
+ return inputMessages;
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
new file mode 100644
index 0000000000..183fa69c11
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/model/InputMessage.java
@@ -0,0 +1,72 @@
+package com.baeldung.flink.model;
+
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.google.common.base.Objects;
+
+import java.time.LocalDateTime;
+
+@JsonSerialize
+public class InputMessage {
+ String sender;
+ String recipient;
+ LocalDateTime sentAt;
+ String message;
+
+ public InputMessage() {
+ }
+
+ public String getSender() {
+ return sender;
+ }
+
+ public void setSender(String sender) {
+ this.sender = sender;
+ }
+
+ public String getRecipient() {
+ return recipient;
+ }
+
+ public void setRecipient(String recipient) {
+ this.recipient = recipient;
+ }
+
+ public LocalDateTime getSentAt() {
+ return sentAt;
+ }
+
+ public void setSentAt(LocalDateTime sentAt) {
+ this.sentAt = sentAt;
+ }
+
+ public String getMessage() {
+ return message;
+ }
+
+ public void setMessage(String message) {
+ this.message = message;
+ }
+
+ public InputMessage(String sender, String recipient, LocalDateTime sentAt, String message) {
+ this.sender = sender;
+ this.recipient = recipient;
+ this.sentAt = sentAt;
+ this.message = message;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ InputMessage message1 = (InputMessage) o;
+ return Objects.equal(sender, message1.sender) &&
+ Objects.equal(recipient, message1.recipient) &&
+ Objects.equal(sentAt, message1.sentAt) &&
+ Objects.equal(message, message1.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(sender, recipient, sentAt, message);
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
new file mode 100644
index 0000000000..c39b8413d1
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/BackupAggregator.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink.operator;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import org.apache.flink.api.common.functions.AggregateFunction;
+
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+
+ public class BackupAggregator implements AggregateFunction, Backup> {
+ @Override
+ public List createAccumulator() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public List add(InputMessage inputMessage, List inputMessages) {
+ inputMessages.add(inputMessage);
+ return inputMessages;
+ }
+
+ @Override
+ public Backup getResult(List inputMessages) {
+ Backup backup = new Backup(inputMessages, LocalDateTime.now());
+ return backup;
+ }
+
+ @Override
+ public List merge(List inputMessages, List acc1) {
+ inputMessages.addAll(acc1);
+ return inputMessages;
+ }
+ }
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
new file mode 100644
index 0000000000..05828d9588
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/InputMessageTimestampAssigner.java
@@ -0,0 +1,23 @@
+package com.baeldung.flink.operator;
+
+import com.baeldung.flink.model.InputMessage;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import javax.annotation.Nullable;
+import java.time.ZoneId;
+
+public class InputMessageTimestampAssigner implements AssignerWithPunctuatedWatermarks {
+
+ @Override
+ public long extractTimestamp(InputMessage element, long previousElementTimestamp) {
+ ZoneId zoneId = ZoneId.systemDefault();
+ return element.getSentAt().atZone(zoneId).toEpochSecond() * 1000;
+ }
+
+ @Nullable
+ @Override
+ public Watermark checkAndGetNextWatermark(InputMessage lastElement, long extractedTimestamp) {
+ return new Watermark(extractedTimestamp - 15);
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
new file mode 100644
index 0000000000..f9103d225c
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/operator/WordsCapitalizer.java
@@ -0,0 +1,11 @@
+package com.baeldung.flink.operator;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+public class WordsCapitalizer implements MapFunction {
+
+ @Override
+ public String map(String s) {
+ return s.toUpperCase();
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
new file mode 100644
index 0000000000..967b266bb6
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/schema/BackupSerializationSchema.java
@@ -0,0 +1,33 @@
+package com.baeldung.flink.schema;
+
+import com.baeldung.flink.model.Backup;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackupSerializationSchema
+ implements SerializationSchema {
+
+ static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+
+ Logger logger = LoggerFactory.getLogger(BackupSerializationSchema.class);
+
+ @Override
+ public byte[] serialize(Backup backupMessage) {
+ if(objectMapper == null) {
+ objectMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+ objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+ }
+ try {
+ String json = objectMapper.writeValueAsString(backupMessage);
+ return json.getBytes();
+ } catch (com.fasterxml.jackson.core.JsonProcessingException e) {
+ logger.error("Failed to parse JSON", e);
+ }
+ return new byte[0];
+ }
+}
diff --git a/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
new file mode 100644
index 0000000000..1df456bbe5
--- /dev/null
+++ b/libraries/src/main/java/com/baeldung/flink/schema/InputMessageDeserializationSchema.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink.schema;
+
+import com.baeldung.flink.model.InputMessage;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+
+public class InputMessageDeserializationSchema implements
+ DeserializationSchema {
+
+ static ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
+
+
+ @Override
+ public InputMessage deserialize(byte[] bytes) throws IOException {
+
+ return objectMapper.readValue(bytes, InputMessage.class);
+ }
+
+ @Override
+ public boolean isEndOfStream(InputMessage inputMessage) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation getProducedType() {
+ return TypeInformation.of(InputMessage.class);
+ }
+}
diff --git a/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
new file mode 100644
index 0000000000..ab7d119c16
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/BackupCreatorIntegrationTest.java
@@ -0,0 +1,103 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.model.Backup;
+import com.baeldung.flink.model.InputMessage;
+import com.baeldung.flink.operator.BackupAggregator;
+import com.baeldung.flink.operator.InputMessageTimestampAssigner;
+import com.baeldung.flink.schema.BackupSerializationSchema;
+import com.baeldung.flink.schema.InputMessageDeserializationSchema;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import org.apache.commons.collections.ListUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.awaitility.Awaitility;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class BackupCreatorIntegrationTest {
+ public static ObjectMapper mapper;
+
+ @Before
+ public void setup() {
+ mapper = new ObjectMapper().registerModule(new JavaTimeModule());
+ }
+
+ @Test
+ public void givenProperJson_whenDeserializeIsInvoked_thenProperObjectIsReturned() throws IOException {
+ InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
+ byte[] messageSerialized = mapper.writeValueAsBytes(message);
+ DeserializationSchema deserializationSchema = new InputMessageDeserializationSchema();
+ InputMessage messageDeserialized = deserializationSchema.deserialize(messageSerialized);
+
+ assertEquals(message, messageDeserialized);
+ }
+
+ @Test
+ public void givenMultipleInputMessagesFromDifferentDays_whenBackupCreatorIsUser_thenMessagesAreGroupedProperly() throws Exception {
+ LocalDateTime currentTime = LocalDateTime.now();
+ InputMessage message = new InputMessage("Me", "User", currentTime, "First TestMessage");
+ InputMessage secondMessage = new InputMessage("Me", "User", currentTime.plusHours(1), "First TestMessage");
+ InputMessage thirdMessage = new InputMessage("Me", "User", currentTime.plusHours(2), "First TestMessage");
+ InputMessage fourthMessage = new InputMessage("Me", "User", currentTime.plusHours(3), "First TestMessage");
+ InputMessage fifthMessage = new InputMessage("Me", "User", currentTime.plusHours(25), "First TestMessage");
+ InputMessage sixthMessage = new InputMessage("Me", "User", currentTime.plusHours(26), "First TestMessage");
+
+ List firstBackupMessages = Arrays.asList(message, secondMessage, thirdMessage, fourthMessage);
+ List secondBackupMessages = Arrays.asList(fifthMessage, sixthMessage);
+ List inputMessages = ListUtils.union(firstBackupMessages, secondBackupMessages);
+
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+ env.setParallelism(1);
+ DataStreamSource testDataSet = env.fromCollection(inputMessages);
+ CollectingSink sink = new CollectingSink();
+ testDataSet.assignTimestampsAndWatermarks(new InputMessageTimestampAssigner())
+ .timeWindowAll(Time.hours(24))
+ .aggregate(new BackupAggregator())
+ .addSink(sink);
+
+ env.execute();
+
+ Awaitility.await().until(() -> sink.backups.size() == 2);
+ assertEquals(2, sink.backups.size());
+ assertEquals(firstBackupMessages, sink.backups.get(0).getInputMessages());
+ assertEquals(secondBackupMessages, sink.backups.get(1).getInputMessages());
+
+ }
+
+ @Test
+ public void givenProperBackupObject_whenSerializeIsInvoked_thenObjectIsProperlySerialized() throws IOException {
+ InputMessage message = new InputMessage("Me", "User", LocalDateTime.now(), "Test Message");
+ List messages = Arrays.asList(message);
+ Backup backup = new Backup(messages, LocalDateTime.now());
+ byte[] backupSerialized = mapper.writeValueAsBytes(backup);
+ SerializationSchema serializationSchema = new BackupSerializationSchema();
+ byte[] backupProcessed = serializationSchema.serialize(backup);
+
+ assertEquals(backupSerialized, backupProcessed);
+ }
+
+ private static class CollectingSink implements SinkFunction {
+
+ public static List backups = new ArrayList<>();
+
+ @Override
+ public synchronized void invoke(Backup value, Context context) throws Exception {
+ backups.add(value);
+ }
+ }
+}
diff --git a/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java
new file mode 100644
index 0000000000..8a98dae4b5
--- /dev/null
+++ b/libraries/src/test/java/com/baeldung/flink/WordCapitalizerIntegrationTest.java
@@ -0,0 +1,34 @@
+package com.baeldung.flink;
+
+import com.baeldung.flink.operator.WordsCapitalizer;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class WordCapitalizerIntegrationTest {
+
+ @Test
+ public void givenDataSet_whenExecuteWordCapitalizer_thenReturnCapitalizedWords() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+ List data = Arrays.asList("dog", "cat", "wolf", "pig");
+
+ DataSet testDataSet = env.fromCollection(data);
+
+
+ List dataProcessed = testDataSet
+ .map(new WordsCapitalizer())
+ .collect();
+
+ List testDataCapitalized = data.stream()
+ .map(String::toUpperCase)
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(testDataCapitalized, dataProcessed);
+ }
+
+}
diff --git a/micronaut/pom.xml b/micronaut/pom.xml
index 56d051e506..7b722306b6 100644
--- a/micronaut/pom.xml
+++ b/micronaut/pom.xml
@@ -1,8 +1,10 @@
4.0.0
com.baeldung.micronaut
- hello-world
+ micronaut
0.1
+ micronaut
+
com.baeldung.micronaut.helloworld.server.ServerApplication
1.0.0.M2
diff --git a/osgi/osgi-intro-sample-activator/pom.xml b/osgi/osgi-intro-sample-activator/pom.xml
index 42008dafdd..77d7198698 100644
--- a/osgi/osgi-intro-sample-activator/pom.xml
+++ b/osgi/osgi-intro-sample-activator/pom.xml
@@ -8,7 +8,7 @@
- osgi-intro
+ osgi
com.baeldung
1.0-SNAPSHOT
diff --git a/osgi/osgi-intro-sample-client/pom.xml b/osgi/osgi-intro-sample-client/pom.xml
index f0cf0a357a..7f5faedb30 100644
--- a/osgi/osgi-intro-sample-client/pom.xml
+++ b/osgi/osgi-intro-sample-client/pom.xml
@@ -8,7 +8,7 @@
bundle
- osgi-intro
+ osgi
com.baeldung
1.0-SNAPSHOT
diff --git a/osgi/osgi-intro-sample-service/pom.xml b/osgi/osgi-intro-sample-service/pom.xml
index 29741e7eb2..8f81645ad4 100644
--- a/osgi/osgi-intro-sample-service/pom.xml
+++ b/osgi/osgi-intro-sample-service/pom.xml
@@ -8,7 +8,7 @@
- osgi-intro
+ osgi
com.baeldung
1.0-SNAPSHOT
diff --git a/osgi/pom.xml b/osgi/pom.xml
index 7c59fbaff4..9ebcb09091 100644
--- a/osgi/pom.xml
+++ b/osgi/pom.xml
@@ -2,9 +2,10 @@
4.0.0
- osgi-intro
+ osgi
pom
1.0-SNAPSHOT
+ osgi
com.baeldung
diff --git a/patterns/design-patterns/pom.xml b/patterns/design-patterns/pom.xml
index 22fc88b75a..dc2631b36e 100644
--- a/patterns/design-patterns/pom.xml
+++ b/patterns/design-patterns/pom.xml
@@ -2,13 +2,12 @@
4.0.0
- com.baeldung
design-patterns
1.0
jar
com.baeldung
- patterns-parent
+ patterns
1.0.0-SNAPSHOT
..
diff --git a/patterns/front-controller/pom.xml b/patterns/front-controller/pom.xml
index 96ed86d975..435b0dd9cd 100644
--- a/patterns/front-controller/pom.xml
+++ b/patterns/front-controller/pom.xml
@@ -6,7 +6,7 @@
war
- patterns-parent
+ patterns
com.baeldung
1.0.0-SNAPSHOT
..
diff --git a/patterns/intercepting-filter/pom.xml b/patterns/intercepting-filter/pom.xml
index ee6fef30c9..fa94d5d1fd 100644
--- a/patterns/intercepting-filter/pom.xml
+++ b/patterns/intercepting-filter/pom.xml
@@ -8,7 +8,7 @@
com.baeldung
- patterns-parent
+ patterns
1.0.0-SNAPSHOT
..
diff --git a/patterns/pom.xml b/patterns/pom.xml
index c8f44227a3..df09f1836a 100644
--- a/patterns/pom.xml
+++ b/patterns/pom.xml
@@ -2,8 +2,7 @@
4.0.0
- com.baeldung
- patterns-parent
+ patterns
pom
diff --git a/performance-tests/pom.xml b/performance-tests/pom.xml
index 0240b2c2ea..8c9d027724 100644
--- a/performance-tests/pom.xml
+++ b/performance-tests/pom.xml
@@ -3,7 +3,7 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
- performancetests
+ performance-tests
parent-modules
diff --git a/persistence-modules/spring-data-dynamodb/pom.xml b/persistence-modules/spring-data-dynamodb/pom.xml
index e5bd78d208..f3d794d001 100644
--- a/persistence-modules/spring-data-dynamodb/pom.xml
+++ b/persistence-modules/spring-data-dynamodb/pom.xml
@@ -2,10 +2,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- spring-boot-dynamodb
+ spring-data-dynamodb
0.0.1-SNAPSHOT
jar
- spring-boot-dynamodb
+ spring-data-dynamodb
This is simple boot application for Spring boot dynamodb test
diff --git a/resteasy/pom.xml b/resteasy/pom.xml
index 867c1f4c1b..31a6ed485a 100644
--- a/resteasy/pom.xml
+++ b/resteasy/pom.xml
@@ -3,9 +3,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- resteasy-tutorial
+ resteasy
1.0
war
+ resteasy
com.baeldung
diff --git a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
index b11c93fb08..00113c5ff7 100644
--- a/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
+++ b/spring-5-mvc/src/main/java/com/baeldung/web/SseEmitterController.java
@@ -1,12 +1,16 @@
package com.baeldung.web;
-import com.baeldung.Constants;
+import java.time.LocalTime;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter.SseEventBuilder;
+
+import com.baeldung.Constants;
@Controller
public class SseEmitterController {
@@ -29,4 +33,27 @@ public class SseEmitterController {
return emitter;
}
+ @GetMapping("/stream-sse-mvc")
+ public SseEmitter streamSseMvc() {
+ SseEmitter emitter = new SseEmitter();
+ ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
+
+ sseMvcExecutor.execute(() -> {
+ try {
+ for (int i = 0; true; i++) {
+ SseEventBuilder event = SseEmitter.event()
+ .data("SSE MVC - " + LocalTime.now()
+ .toString())
+ .id(String.valueOf(i))
+ .name("sse event - mvc");
+ emitter.send(event);
+ Thread.sleep(1000);
+ }
+ } catch (Exception ex) {
+ emitter.completeWithError(ex);
+ }
+ });
+ return emitter;
+ }
+
}
diff --git a/spring-5-reactive/pom.xml b/spring-5-reactive/pom.xml
index e81d3d8b79..5f455c3906 100644
--- a/spring-5-reactive/pom.xml
+++ b/spring-5-reactive/pom.xml
@@ -94,6 +94,12 @@
${project-reactor-test}
test
+
+ org.junit.platform
+ junit-platform-runner
+ ${junit.platform.version}
+ test
+
@@ -117,6 +123,7 @@
1.0
4.1
3.1.6.RELEASE
+ 1.2.0
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java
new file mode 100644
index 0000000000..3997607ef0
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/ConsumerSSEApplication.java
@@ -0,0 +1,19 @@
+package com.baeldung.reactive.serversentevents.consumer;
+
+import java.util.Collections;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableAsync;
+
+@SpringBootApplication
+@EnableAsync
+public class ConsumerSSEApplication {
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(ConsumerSSEApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
+ app.run(args);
+ }
+
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java
new file mode 100644
index 0000000000..69a6bc396c
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/consumer/controller/ClientController.java
@@ -0,0 +1,83 @@
+package com.baeldung.reactive.serversentevents.consumer.controller;
+
+import java.time.LocalTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClient;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/sse-consumer")
+public class ClientController {
+
+ private static Logger logger = LoggerFactory.getLogger(ClientController.class);
+ private WebClient client = WebClient.create("http://localhost:8081/sse-server");
+
+ @GetMapping("/launch-sse-client")
+ public String launchSSEFromSSEWebClient() {
+ consumeSSE();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @GetMapping("/launch-flux-client")
+ public String launchcFluxFromSSEWebClient() {
+ consumeFlux();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @GetMapping("/launch-sse-from-flux-endpoint-client")
+ public String launchFluxFromFluxWebClient() {
+ consumeSSEFromFluxEndpoint();
+ return "LAUNCHED EVENT CLIENT!!! Check the logs...";
+ }
+
+ @Async
+ public void consumeSSE() {
+ ParameterizedTypeReference> type = new ParameterizedTypeReference>() {
+ };
+
+ Flux> eventStream = client.get()
+ .uri("/stream-sse")
+ .retrieve()
+ .bodyToFlux(type);
+
+ eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
+ () -> logger.info("Completed!!!"));
+ }
+
+ @Async
+ public void consumeFlux() {
+ Flux stringStream = client.get()
+ .uri("/stream-flux")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(String.class);
+
+ stringStream.subscribe(content -> logger.info("Current time: {} - Received content: {} ", LocalTime.now(), content), error -> logger.error("Error retrieving content: {}", error), () -> logger.info("Completed!!!"));
+ }
+
+ @Async
+ public void consumeSSEFromFluxEndpoint() {
+ ParameterizedTypeReference> type = new ParameterizedTypeReference>() {
+ };
+
+ Flux> eventStream = client.get()
+ .uri("/stream-flux")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(type);
+
+ eventStream.subscribe(content -> logger.info("Current time: {} - Received SSE: name[{}], id [{}], content[{}] ", LocalTime.now(), content.event(), content.id(), content.data()), error -> logger.error("Error receiving SSE: {}", error),
+ () -> logger.info("Completed!!!"));
+ }
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java
new file mode 100644
index 0000000000..2750e6616d
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/ServerSSEApplication.java
@@ -0,0 +1,17 @@
+package com.baeldung.reactive.serversentevents.server;
+
+import java.util.Collections;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class ServerSSEApplication {
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(ServerSSEApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
+ app.run(args);
+ }
+
+}
diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java
new file mode 100644
index 0000000000..1ad8e848cf
--- /dev/null
+++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/serversentevents/server/controllers/ServerController.java
@@ -0,0 +1,35 @@
+package com.baeldung.reactive.serversentevents.server.controllers;
+
+import java.time.Duration;
+import java.time.LocalTime;
+
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerSentEvent;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Flux;
+
+@RestController
+@RequestMapping("/sse-server")
+public class ServerController {
+
+ @GetMapping("/stream-sse")
+ public Flux> streamEvents() {
+ return Flux.interval(Duration.ofSeconds(1))
+ .map(sequence -> ServerSentEvent. builder()
+ .id(String.valueOf(sequence))
+ .event("periodic-event")
+ .data("SSE - " + LocalTime.now()
+ .toString())
+ .build());
+ }
+
+ @GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux streamFlux() {
+ return Flux.interval(Duration.ofSeconds(1))
+ .map(sequence -> "Flux - " + LocalTime.now()
+ .toString());
+ }
+}
diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java
new file mode 100644
index 0000000000..53f4a3b1bb
--- /dev/null
+++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/serversentsevents/ServiceSentEventLiveTest.java
@@ -0,0 +1,49 @@
+package com.baeldung.reactive.serversentsevents;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.function.Executable;
+import org.junit.platform.runner.JUnitPlatform;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.MediaType;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@RunWith(JUnitPlatform.class)
+@SpringBootTest
+public class ServiceSentEventLiveTest {
+
+ private WebTestClient client = WebTestClient.bindToServer()
+ .baseUrl("http://localhost:8081/sse-server")
+ .build();
+
+ @Test
+ public void whenSSEEndpointIsCalled_thenEventStreamingBegins() {
+
+ Executable sseStreamingCall = () -> client.get()
+ .uri("/stream-sse")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectHeader()
+ .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
+ .expectBody(String.class);
+
+ Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
+ }
+
+ @Test
+ public void whenFluxEndpointIsCalled_thenEventStreamingBegins() {
+
+ Executable sseStreamingCall = () -> client.get()
+ .uri("/stream-flux")
+ .exchange()
+ .expectStatus()
+ .isOk()
+ .expectHeader()
+ .contentTypeCompatibleWith(MediaType.TEXT_EVENT_STREAM)
+ .expectBody(String.class);
+
+ Assertions.assertThrows(IllegalStateException.class, sseStreamingCall, "Expected test to timeout and throw IllegalStateException, but it didn't");
+ }
+}
diff --git a/spring-amqp/pom.xml b/spring-amqp/pom.xml
index 065fce7d35..e08a4243b3 100755
--- a/spring-amqp/pom.xml
+++ b/spring-amqp/pom.xml
@@ -2,10 +2,10 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
4.0.0
com.baeldung
- springamqp
+ spring-amqp
0.1-SNAPSHOT
jar
- springamqp
+ spring-amqp
Introduction to Spring-AMQP
diff --git a/spring-boot-h2/spring-boot-h2-database/.gitignore b/spring-boot-h2/spring-boot-h2-database/.gitignore
new file mode 100644
index 0000000000..82eca336e3
--- /dev/null
+++ b/spring-boot-h2/spring-boot-h2-database/.gitignore
@@ -0,0 +1,25 @@
+/target/
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/build/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
\ No newline at end of file
diff --git a/spring-boot-h2/spring-boot-h2-database/pom.xml b/spring-boot-h2/spring-boot-h2-database/pom.xml
new file mode 100644
index 0000000000..4b660334da
--- /dev/null
+++ b/spring-boot-h2/spring-boot-h2-database/pom.xml
@@ -0,0 +1,52 @@
+
+
+ 4.0.0
+
+ com.baeldung.h2db
+ spring-boot-h2-database
+ 0.0.1-SNAPSHOT
+ jar
+
+ Demo Spring Boot applications that starts H2 in memory database
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.0.4.RELEASE
+
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-data-jpa
+
+
+
+ com.h2database
+ h2
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
diff --git a/spring-boot-h2/spring-boot-h2-database/src/main/java/com/baeldung/h2db/demo/SpringBootApp.java b/spring-boot-h2/spring-boot-h2-database/src/main/java/com/baeldung/h2db/demo/SpringBootApp.java
new file mode 100644
index 0000000000..1fe080ec22
--- /dev/null
+++ b/spring-boot-h2/spring-boot-h2-database/src/main/java/com/baeldung/h2db/demo/SpringBootApp.java
@@ -0,0 +1,60 @@
+package com.baeldung.h2db.demo;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import javax.annotation.PostConstruct;
+import org.h2.tools.Server;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+
+@SpringBootApplication
+public class SpringBootApp {
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(SpringBootApp.class, args);
+ }
+
+ @PostConstruct
+ private void initDb() {
+ System.out.println(String.format(
+ "****** Creating table: %s, and Inserting test data ******", "Employees"));
+
+ String sqlStatements[] = {
+ "drop table employees if exists",
+ "create table employees(id serial,first_name varchar(255),last_name varchar(255))",
+ "insert into employees(first_name, last_name) values('Eugen','Paraschiv')",
+ "insert into employees(first_name, last_name) values('Scott','Tiger')"
+ };
+
+ Arrays.asList(sqlStatements).stream().forEach(sql -> {
+ System.out.println(sql);
+ jdbcTemplate.execute(sql);
+ });
+
+ System.out.println(String.format("****** Fetching from table: %s ******", "Employees"));
+ jdbcTemplate.query("select id,first_name,last_name from employees",
+ new RowMapper