Merge branch 'master' into JAVA-1782-main
This commit is contained in:
commit
1ccbc4027d
|
@ -0,0 +1,40 @@
|
|||
package com.baeldung.varargs;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class HeapPollutionUnitTest {
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenGenericVararg_whenUsedUnsafe_shouldThrowClassCastException() {
|
||||
String one = firstOfFirst(Arrays.asList("one", "two"), Collections.emptyList());
|
||||
|
||||
assertEquals("one", one);
|
||||
}
|
||||
|
||||
@Test(expected = ClassCastException.class)
|
||||
public void givenGenericVararg_whenRefEscapes_mayCauseSubtleBugs() {
|
||||
String[] args = returnAsIs("One", "Two");
|
||||
}
|
||||
|
||||
private static String firstOfFirst(List<String>... strings) {
|
||||
List<Integer> ints = Collections.singletonList(42);
|
||||
Object[] objects = strings;
|
||||
objects[0] = ints;
|
||||
|
||||
return strings[0].get(0);
|
||||
}
|
||||
|
||||
private static <T> T[] toArray(T... arguments) {
|
||||
return arguments;
|
||||
}
|
||||
|
||||
private static <T> T[] returnAsIs(T a, T b) {
|
||||
return toArray(a, b);
|
||||
}
|
||||
}
|
|
@ -200,7 +200,7 @@ public class ReflectionUnitTest {
|
|||
@Test
|
||||
public void givenClassField_whenSetsAndGetsValue_thenCorrect() throws Exception {
|
||||
final Class<?> birdClass = Class.forName("com.baeldung.java.reflection.Bird");
|
||||
final Bird bird = (Bird) birdClass.newInstance();
|
||||
final Bird bird = (Bird) birdClass.getConstructor().newInstance();
|
||||
final Field field = birdClass.getDeclaredField("walks");
|
||||
field.setAccessible(true);
|
||||
|
||||
|
@ -266,7 +266,7 @@ public class ReflectionUnitTest {
|
|||
@Test
|
||||
public void givenMethod_whenInvokes_thenCorrect() throws Exception {
|
||||
final Class<?> birdClass = Class.forName("com.baeldung.java.reflection.Bird");
|
||||
final Bird bird = (Bird) birdClass.newInstance();
|
||||
final Bird bird = (Bird) birdClass.getConstructor().newInstance();
|
||||
final Method setWalksMethod = birdClass.getDeclaredMethod("setWalks", boolean.class);
|
||||
final Method walksMethod = birdClass.getDeclaredMethod("walks");
|
||||
final boolean walks = (boolean) walksMethod.invoke(bird);
|
||||
|
|
|
@ -26,6 +26,17 @@ public class StringToIntOrIntegerUnitTest {
|
|||
assertThat(result).isEqualTo(new Integer(42));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenString_whenCallingValueOf_shouldCacheSomeValues() {
|
||||
for (int i = -128; i <= 127; i++) {
|
||||
String value = i + "";
|
||||
Integer first = Integer.valueOf(value);
|
||||
Integer second = Integer.valueOf(value);
|
||||
|
||||
assertThat(first).isSameAs(second);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenString_whenCallingIntegerConstructor_shouldConvertToInt() {
|
||||
String givenString = "42";
|
||||
|
|
|
@ -159,6 +159,13 @@
|
|||
<version>${byte-buddy.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.kafka</groupId>
|
||||
<artifactId>kafka-clients</artifactId>
|
||||
<version>${kafka.version}</version>
|
||||
<classifier>test</classifier>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<repositories>
|
||||
|
|
|
@ -0,0 +1,17 @@
|
|||
package com.baeldung.kafka.producer;
|
||||
|
||||
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
|
||||
public class EvenOddPartitioner extends DefaultPartitioner {
|
||||
|
||||
@Override
|
||||
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
|
||||
|
||||
if (((String) key).length() % 2 == 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package com.baeldung.kafka.producer;
|
||||
|
||||
import org.apache.kafka.clients.producer.Producer;
|
||||
import org.apache.kafka.clients.producer.ProducerRecord;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
public class KafkaProducer {
|
||||
|
||||
private final Producer<String, String> producer;
|
||||
|
||||
public KafkaProducer(Producer<String, String> producer) {
|
||||
this.producer = producer;
|
||||
}
|
||||
|
||||
public Future<RecordMetadata> send(String key, String value) {
|
||||
ProducerRecord record = new ProducerRecord("topic_sports_news",
|
||||
key, value);
|
||||
return producer.send(record);
|
||||
}
|
||||
|
||||
public void flush() {
|
||||
producer.flush();
|
||||
}
|
||||
|
||||
public void beginTransaction() {
|
||||
producer.beginTransaction();
|
||||
}
|
||||
|
||||
public void initTransaction() {
|
||||
producer.initTransactions();
|
||||
}
|
||||
|
||||
public void commitTransaction() {
|
||||
producer.commitTransaction();
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,116 @@
|
|||
package com.baeldung.kafka.producer;
|
||||
|
||||
import com.baeldung.kafka.producer.EvenOddPartitioner;
|
||||
import com.baeldung.kafka.producer.KafkaProducer;
|
||||
import org.apache.kafka.clients.producer.MockProducer;
|
||||
import org.apache.kafka.clients.producer.RecordMetadata;
|
||||
import org.apache.kafka.common.Cluster;
|
||||
import org.apache.kafka.common.Node;
|
||||
import org.apache.kafka.common.PartitionInfo;
|
||||
import org.apache.kafka.common.serialization.StringSerializer;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class KafkaProducerUnitTest {
|
||||
|
||||
private final String TOPIC_NAME = "topic_sports_news";
|
||||
|
||||
private KafkaProducer kafkaProducer;
|
||||
private MockProducer<String, String> mockProducer;
|
||||
|
||||
private void buildMockProducer(boolean autoComplete) {
|
||||
this.mockProducer = new MockProducer<>(autoComplete, new StringSerializer(), new StringSerializer());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyValue_whenSend_thenVerifyHistory() throws ExecutionException, InterruptedException {
|
||||
|
||||
buildMockProducer(true);
|
||||
//when
|
||||
kafkaProducer = new KafkaProducer(mockProducer);
|
||||
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
|
||||
|
||||
//then
|
||||
assertTrue(mockProducer.history().size() == 1);
|
||||
assertTrue(mockProducer.history().get(0).key().equalsIgnoreCase("data"));
|
||||
assertTrue(recordMetadataFuture.get().partition() == 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyValue_whenSend_thenSendOnlyAfterFlush() {
|
||||
|
||||
buildMockProducer(false);
|
||||
//when
|
||||
kafkaProducer = new KafkaProducer(mockProducer);
|
||||
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
|
||||
assertFalse(record.isDone());
|
||||
|
||||
//then
|
||||
kafkaProducer.flush();
|
||||
assertTrue(record.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyValue_whenSend_thenReturnException() {
|
||||
|
||||
buildMockProducer(false);
|
||||
//when
|
||||
kafkaProducer = new KafkaProducer(mockProducer);
|
||||
Future<RecordMetadata> record = kafkaProducer.send("site", "{\"site\" : \"baeldung\"}");
|
||||
RuntimeException e = new RuntimeException();
|
||||
mockProducer.errorNext(e);
|
||||
//then
|
||||
try {
|
||||
record.get();
|
||||
} catch (ExecutionException | InterruptedException ex) {
|
||||
assertEquals(e, ex.getCause());
|
||||
}
|
||||
assertTrue(record.isDone());
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyValue_whenSendWithTxn_thenSendOnlyOnTxnCommit() {
|
||||
|
||||
buildMockProducer(true);
|
||||
//when
|
||||
kafkaProducer = new KafkaProducer(mockProducer);
|
||||
kafkaProducer.initTransaction();
|
||||
kafkaProducer.beginTransaction();
|
||||
Future<RecordMetadata> record = kafkaProducer.send("data", "{\"site\" : \"baeldung\"}");
|
||||
|
||||
//then
|
||||
assertTrue(mockProducer.history().isEmpty());
|
||||
kafkaProducer.commitTransaction();
|
||||
assertTrue(mockProducer.history().size() == 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
void givenKeyValue_whenSendWithPartitioning_thenVerifyPartitionNumber() throws ExecutionException, InterruptedException {
|
||||
|
||||
PartitionInfo partitionInfo0 = new PartitionInfo(TOPIC_NAME, 0, null, null, null);
|
||||
PartitionInfo partitionInfo1 = new PartitionInfo(TOPIC_NAME, 1, null, null, null);
|
||||
List<PartitionInfo> list = new ArrayList<>();
|
||||
list.add(partitionInfo0);
|
||||
list.add(partitionInfo1);
|
||||
Cluster cluster = new Cluster("kafkab", new ArrayList<Node>(), list, emptySet(), emptySet());
|
||||
this.mockProducer = new MockProducer<>(cluster, true, new EvenOddPartitioner(), new StringSerializer(), new StringSerializer());
|
||||
//when
|
||||
kafkaProducer = new KafkaProducer(mockProducer);
|
||||
Future<RecordMetadata> recordMetadataFuture = kafkaProducer.send("partition", "{\"site\" : \"baeldung\"}");
|
||||
|
||||
//then
|
||||
assertTrue(recordMetadataFuture.get().partition() == 1);
|
||||
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue