From 8f1de1c3b702a07307e172b8745ece9c116f5f05 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 24 May 2020 13:58:50 +0530 Subject: [PATCH 1/8] BAEL3889 - Kafka Mock Producer --- .../baeldung/kafka/EvenOddPartitioner.java | 16 +++ .../com/baeldung/kafka/KafkaProducer.java | 40 ++++++ .../baeldung/kafka/KafkaProducerUnitTest.java | 114 ++++++++++++++++++ 3 files changed, 170 insertions(+) create mode 100644 libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java create mode 100644 libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java create mode 100644 libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java diff --git a/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java b/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java new file mode 100644 index 0000000000..12c86828dd --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java @@ -0,0 +1,16 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; + +public class EvenOddPartitioner extends DefaultPartitioner { + + @Override + public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { + + if (((String)key).length() % 2 == 0) + return 0; + + return 1; + } +} diff --git a/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java b/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java new file mode 100644 index 0000000000..8574cd1c40 --- /dev/null +++ b/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java @@ -0,0 +1,40 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; + +import java.util.concurrent.Future; + +public class KafkaProducer { + + private final Producer producer; + + public KafkaProducer(Producer producer) { + this.producer = producer; + } + + public Future send(String key, String value) { + ProducerRecord record = new ProducerRecord("topic_sports_news", + key, value); + return producer.send(record); + } + + public void flush() { + producer.flush(); + } + + public void beginTransaction() { + producer.beginTransaction(); + } + + public void initTransaction() { + producer.initTransactions(); + } + + public void commitTransaction() { + producer.commitTransaction(); + } + + +} diff --git a/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java b/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java new file mode 100644 index 0000000000..ce3803322c --- /dev/null +++ b/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java @@ -0,0 +1,114 @@ +package com.baeldung.kafka; + +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static java.util.Collections.emptySet; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class KafkaProducerUnitTest { + + private final String TOPIC_NAME = "topic_sports_news"; + + private KafkaProducer kafkaProducer; + private MockProducer mockProducer; + + private void buildMockProducer(boolean autoComplete) { + this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer()); + } + + @Test + void givenKeyValue_whenSend_thenVerifyHistory() throws ExecutionException, InterruptedException { + + buildMockProducer(true); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future recordMetadataFuture = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(mockProducer.history().size() == 1); + assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data")); + assertTrue(recordMetadataFuture.get().partition() == 0); + + } + + @Test + void givenKeyValue_whenSend_thenSendOnlyAfterFlush() { + + buildMockProducer(false); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + assertFalse(record.isDone()); + + //then + kafkaProducer.flush(); + assertTrue(record.isDone()); + } + + @Test + void givenKeyValue_whenSend_thenReturnException() { + + buildMockProducer(false); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}"); + RuntimeException e = new RuntimeException(); + mockProducer.errorNext(e); + //then + try { + record.get(); + } catch (ExecutionException | InterruptedException ex) { + assertEquals(e, ex.getCause()); + } + assertTrue(record.isDone()); + } + + @Test + void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() { + + buildMockProducer(true); + //when + kafkaProducer = new KafkaProducer(mockProducer); + kafkaProducer.initTransaction(); + kafkaProducer.beginTransaction(); + Future record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(mockProducer.history().isEmpty()); + kafkaProducer.commitTransaction(); + assertTrue(mockProducer.history().size() == 1); + } + + @Test + void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException { + + PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null); + PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null); + List list = new ArrayList<>(); + list.add(partitionInfo0); + list.add(partitionInfo1); + Cluster cluster = new Cluster("kafkab", new ArrayList(), list, emptySet(), emptySet()); + this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer()); + //when + kafkaProducer = new KafkaProducer(mockProducer); + Future recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}"); + + //then + assertTrue(recordMetadataFuture.get().partition() == 1); + + } + +} \ No newline at end of file From 7943a59707c9534e2f24c90106b32efdcc771dbe Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Tue, 26 May 2020 17:23:59 +0530 Subject: [PATCH 2/8] BAEL3889 - Kafka Mock Producer --- .../src/main/java/com/baeldung/kafka/EvenOddPartitioner.java | 0 .../src/main/java/com/baeldung/kafka/KafkaProducer.java | 0 .../src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename {libraries => libraries-6}/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java (100%) rename {libraries => libraries-6}/src/main/java/com/baeldung/kafka/KafkaProducer.java (100%) rename {libraries => libraries-6}/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java (100%) diff --git a/libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java b/libraries-6/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java similarity index 100% rename from libraries/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java rename to libraries-6/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java diff --git a/libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java b/libraries-6/src/main/java/com/baeldung/kafka/KafkaProducer.java similarity index 100% rename from libraries/src/main/java/com/baeldung/kafka/KafkaProducer.java rename to libraries-6/src/main/java/com/baeldung/kafka/KafkaProducer.java diff --git a/libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java b/libraries-6/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java similarity index 100% rename from libraries/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java rename to libraries-6/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java From 75a44b99652b87b3c3c7781a50dd8b6767e24ce7 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 31 May 2020 18:39:20 +0530 Subject: [PATCH 3/8] BAEL3889 - Kafka Mock Producer --- libraries-data-2/pom.xml | 7 +++++++ .../main/java/com/baeldung/kafka/EvenOddPartitioner.java | 0 .../src/main/java/com/baeldung/kafka/KafkaProducer.java | 0 .../java/com/baeldung/kafka/KafkaProducerUnitTest.java | 0 4 files changed, 7 insertions(+) rename {libraries-6 => libraries-data-2}/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java (100%) rename {libraries-6 => libraries-data-2}/src/main/java/com/baeldung/kafka/KafkaProducer.java (100%) rename {libraries-6 => libraries-data-2}/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java (100%) diff --git a/libraries-data-2/pom.xml b/libraries-data-2/pom.xml index bdfb2c5ed6..2d27ec2107 100644 --- a/libraries-data-2/pom.xml +++ b/libraries-data-2/pom.xml @@ -153,6 +153,13 @@ renjin-script-engine ${renjin.version} + + org.apache.kafka + kafka-clients + ${kafka.version} + test + test + diff --git a/libraries-6/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java b/libraries-data-2/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java similarity index 100% rename from libraries-6/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java rename to libraries-data-2/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java diff --git a/libraries-6/src/main/java/com/baeldung/kafka/KafkaProducer.java b/libraries-data-2/src/main/java/com/baeldung/kafka/KafkaProducer.java similarity index 100% rename from libraries-6/src/main/java/com/baeldung/kafka/KafkaProducer.java rename to libraries-data-2/src/main/java/com/baeldung/kafka/KafkaProducer.java diff --git a/libraries-6/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java b/libraries-data-2/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java similarity index 100% rename from libraries-6/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java rename to libraries-data-2/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java From e245b2eca3347f02e0ab45011ffc8b1c880f01f3 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 31 May 2020 18:59:54 +0530 Subject: [PATCH 4/8] BAEL3889 - Changing the package structure --- .../com/baeldung/kafka/{ => producer}/EvenOddPartitioner.java | 0 .../java/com/baeldung/kafka/{ => producer}/KafkaProducer.java | 0 .../com/baeldung/kafka/{ => producer}/KafkaProducerUnitTest.java | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename libraries-data-2/src/main/java/com/baeldung/kafka/{ => producer}/EvenOddPartitioner.java (100%) rename libraries-data-2/src/main/java/com/baeldung/kafka/{ => producer}/KafkaProducer.java (100%) rename libraries-data-2/src/test/java/com/baeldung/kafka/{ => producer}/KafkaProducerUnitTest.java (100%) diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/kafka/EvenOddPartitioner.java rename to libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/KafkaProducer.java b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java similarity index 100% rename from libraries-data-2/src/main/java/com/baeldung/kafka/KafkaProducer.java rename to libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java diff --git a/libraries-data-2/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java b/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java similarity index 100% rename from libraries-data-2/src/test/java/com/baeldung/kafka/KafkaProducerUnitTest.java rename to libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java From be292ec3f22c4ca82e7154718d89595c9cd1c9c6 Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Tue, 2 Jun 2020 21:32:45 +0430 Subject: [PATCH 5/8] Avoid Using newInstance on Class --- .../reflection/java/reflection/ReflectionUnitTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core-java-modules/core-java-reflection/src/test/java/com/baeldung/reflection/java/reflection/ReflectionUnitTest.java b/core-java-modules/core-java-reflection/src/test/java/com/baeldung/reflection/java/reflection/ReflectionUnitTest.java index 0c090901e7..a791d64874 100644 --- a/core-java-modules/core-java-reflection/src/test/java/com/baeldung/reflection/java/reflection/ReflectionUnitTest.java +++ b/core-java-modules/core-java-reflection/src/test/java/com/baeldung/reflection/java/reflection/ReflectionUnitTest.java @@ -200,7 +200,7 @@ public class ReflectionUnitTest { @Test public void givenClassField_whenSetsAndGetsValue_thenCorrect() throws Exception { final Class birdClass = Class.forName("com.baeldung.java.reflection.Bird"); - final Bird bird = (Bird) birdClass.newInstance(); + final Bird bird = (Bird) birdClass.getConstructor().newInstance(); final Field field = birdClass.getDeclaredField("walks"); field.setAccessible(true); @@ -266,7 +266,7 @@ public class ReflectionUnitTest { @Test public void givenMethod_whenInvokes_thenCorrect() throws Exception { final Class birdClass = Class.forName("com.baeldung.java.reflection.Bird"); - final Bird bird = (Bird) birdClass.newInstance(); + final Bird bird = (Bird) birdClass.getConstructor().newInstance(); final Method setWalksMethod = birdClass.getDeclaredMethod("setWalks", boolean.class); final Method walksMethod = birdClass.getDeclaredMethod("walks"); final boolean walks = (boolean) walksMethod.invoke(bird); From b07bcce7efcdf939dd336b2dceabbb88449b89b9 Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Tue, 2 Jun 2020 22:03:54 +0430 Subject: [PATCH 6/8] Introducing Integer Cache --- .../stringtoint/StringToIntOrIntegerUnitTest.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/core-java-modules/core-java-string-conversions/src/test/java/com/baeldung/stringtoint/StringToIntOrIntegerUnitTest.java b/core-java-modules/core-java-string-conversions/src/test/java/com/baeldung/stringtoint/StringToIntOrIntegerUnitTest.java index 106f1fc974..336b2ac324 100644 --- a/core-java-modules/core-java-string-conversions/src/test/java/com/baeldung/stringtoint/StringToIntOrIntegerUnitTest.java +++ b/core-java-modules/core-java-string-conversions/src/test/java/com/baeldung/stringtoint/StringToIntOrIntegerUnitTest.java @@ -26,6 +26,17 @@ public class StringToIntOrIntegerUnitTest { assertThat(result).isEqualTo(new Integer(42)); } + @Test + public void givenString_whenCallingValueOf_shouldCacheSomeValues() { + for (int i = -128; i <= 127; i++) { + String value = i + ""; + Integer first = Integer.valueOf(value); + Integer second = Integer.valueOf(value); + + assertThat(first).isSameAs(second); + } + } + @Test public void givenString_whenCallingIntegerConstructor_shouldConvertToInt() { String givenString = "42"; From 829140d488d33795b1c5b674cc4865f116391ba4 Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Wed, 3 Jun 2020 14:10:58 +0430 Subject: [PATCH 7/8] Polluting the Heap --- .../varargs/HeapPollutionUnitTest.java | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) create mode 100644 core-java-modules/core-java-lang-syntax/src/test/java/com/baeldung/varargs/HeapPollutionUnitTest.java diff --git a/core-java-modules/core-java-lang-syntax/src/test/java/com/baeldung/varargs/HeapPollutionUnitTest.java b/core-java-modules/core-java-lang-syntax/src/test/java/com/baeldung/varargs/HeapPollutionUnitTest.java new file mode 100644 index 0000000000..ced2c00bea --- /dev/null +++ b/core-java-modules/core-java-lang-syntax/src/test/java/com/baeldung/varargs/HeapPollutionUnitTest.java @@ -0,0 +1,40 @@ +package com.baeldung.varargs; + +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class HeapPollutionUnitTest { + + @Test(expected = ClassCastException.class) + public void givenGenericVararg_whenUsedUnsafe_shouldThrowClassCastException() { + String one = firstOfFirst(Arrays.asList("one", "two"), Collections.emptyList()); + + assertEquals("one", one); + } + + @Test(expected = ClassCastException.class) + public void givenGenericVararg_whenRefEscapes_mayCauseSubtleBugs() { + String[] args = returnAsIs("One", "Two"); + } + + private static String firstOfFirst(List... strings) { + List ints = Collections.singletonList(42); + Object[] objects = strings; + objects[0] = ints; + + return strings[0].get(0); + } + + private static T[] toArray(T... arguments) { + return arguments; + } + + private static T[] returnAsIs(T a, T b) { + return toArray(a, b); + } +} From de8acfd6e02afec5e466823b73865caab01c6f88 Mon Sep 17 00:00:00 2001 From: Chirag Dewan Date: Sun, 7 Jun 2020 15:29:59 +0530 Subject: [PATCH 8/8] BAEL3889 - Adding braces in EvenOddPartitioner --- .../java/com/baeldung/kafka/producer/EvenOddPartitioner.java | 5 +++-- .../main/java/com/baeldung/kafka/producer/KafkaProducer.java | 2 +- .../com/baeldung/kafka/producer/KafkaProducerUnitTest.java | 4 +++- 3 files changed, 7 insertions(+), 4 deletions(-) diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java index 12c86828dd..1c77226037 100644 --- a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java +++ b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/EvenOddPartitioner.java @@ -1,4 +1,4 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.producer; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.Cluster; @@ -8,8 +8,9 @@ public class EvenOddPartitioner extends DefaultPartitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { - if (((String)key).length() % 2 == 0) + if (((String) key).length() % 2 == 0) { return 0; + } return 1; } diff --git a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java index 8574cd1c40..911c9ed3d7 100644 --- a/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java +++ b/libraries-data-2/src/main/java/com/baeldung/kafka/producer/KafkaProducer.java @@ -1,4 +1,4 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.producer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; diff --git a/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java b/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java index ce3803322c..a7156ed886 100644 --- a/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java +++ b/libraries-data-2/src/test/java/com/baeldung/kafka/producer/KafkaProducerUnitTest.java @@ -1,5 +1,7 @@ -package com.baeldung.kafka; +package com.baeldung.kafka.producer; +import com.baeldung.kafka.producer.EvenOddPartitioner; +import com.baeldung.kafka.producer.KafkaProducer; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster;