* BAEL-7490 read write file in separate thread

* Change the to try resources

* Update the code to sync with article

* BAEL-7489 first draft

* Change to spring-kafka-3 module

* moved to another repo
This commit is contained in:
Wynn Teo 2024-02-19 03:47:45 +08:00 committed by GitHub
parent dd3503d8be
commit 6a80f85625
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 191 additions and 0 deletions

View File

@ -0,0 +1,56 @@
package com.baeldung.spring.kafka.kafkaexception;
import java.lang.management.ManagementFactory;
import java.util.Properties;
import java.util.UUID;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
public class HandleInstanceAlreadyExistsException {
public static void generateUniqueClientIDUsingUUIDRandom() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
String clientId = "my-producer-" + UUID.randomUUID();
props.setProperty("client.id", clientId);
KafkaProducer<String, String> producer1 = new KafkaProducer<>(props);
clientId = "my-producer-" + UUID.randomUUID();
props.setProperty("client.id", clientId);
KafkaProducer<String, String> producer2 = new KafkaProducer<>(props);
}
public static void closeProducerProperlyBeforeReinstantiate() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "my-producer");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer1 = new KafkaProducer<>(props);
producer1.close();
producer1 = new KafkaProducer<>(props);
}
public static void useUniqueObjectName() throws Exception {
MBeanServer mBeanServer1 = ManagementFactory.getPlatformMBeanServer();
MBeanServer mBeanServer2 = ManagementFactory.getPlatformMBeanServer();
ObjectName objectName1 = new ObjectName("kafka.server:type=KafkaMetrics,id=metric1");
ObjectName objectName2 = new ObjectName("kafka.server:type=KafkaMetrics,id=metric2");
MyMBean mBean1 = new MyMBean();
mBeanServer1.registerMBean(mBean1, objectName1);
MyMBean mBean2 = new MyMBean();
mBeanServer2.registerMBean(mBean2, objectName2);
}
}

View File

@ -0,0 +1,12 @@
package com.baeldung.spring.kafka.kafkaexception;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaAppMain {
public static void main(String[] args) {
SpringApplication.run(KafkaAppMain.class, args);
}
}

View File

@ -0,0 +1,123 @@
package com.baeldung.spring.kafka.kafkaexception;
import java.lang.management.ManagementFactory;
import java.util.Properties;
import javax.management.Attribute;
import javax.management.AttributeList;
import javax.management.AttributeNotFoundException;
import javax.management.DynamicMBean;
import javax.management.InvalidAttributeValueException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanConstructorInfo;
import javax.management.MBeanException;
import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.ReflectionException;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.stereotype.Service;
@Service
public class SimulateInstanceAlreadyExistsException {
public static void jmxRegistrationConflicts() throws Exception {
// Create two instances of MBeanServer
MBeanServer mBeanServer1 = ManagementFactory.getPlatformMBeanServer();
MBeanServer mBeanServer2 = ManagementFactory.getPlatformMBeanServer();
// Define the same ObjectName for both MBeans
ObjectName objectName = new ObjectName("kafka.server:type=KafkaMetrics");
// Create and register the first MBean
MyMBean mBean1 = new MyMBean();
mBeanServer1.registerMBean(mBean1, objectName);
// Attempt to register the second MBean with the same ObjectName
MyMBean mBean2 = new MyMBean();
mBeanServer2.registerMBean(mBean2, objectName);
}
public static void duplicateConsumerClientID() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "my-consumer");
props.put("group.id", "test-group");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", StringDeserializer.class);
// Simulating concurrent client creation by multiple threads
for (int i = 0; i < 3; i++) {
new Thread(() -> {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
}).start();
}
}
public void duplicateProducerClientID() throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "my-producer");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer1 = new KafkaProducer<>(props);
// Attempting to create another producer using same client.id
KafkaProducer<String, String> producer2 = new KafkaProducer<>(props);
}
public static void unclosedProducerAndReinitialize() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("client.id", "my-producer");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer1 = new KafkaProducer<>(props);
// Attempting to reinitialize without proper close
producer1 = new KafkaProducer<>(props);
}
}
class MyMBean implements DynamicMBean {
@Override
public Object getAttribute(String attribute) throws AttributeNotFoundException, MBeanException, ReflectionException {
return null;
}
@Override
public void setAttribute(Attribute attribute) throws AttributeNotFoundException, InvalidAttributeValueException, MBeanException, ReflectionException {
}
@Override
public AttributeList getAttributes(String[] attributes) {
return null;
}
@Override
public AttributeList setAttributes(AttributeList attributes) {
return null;
}
@Override
public Object invoke(String actionName, Object[] params, String[] signature) throws MBeanException, ReflectionException {
return null;
}
@Override
public MBeanInfo getMBeanInfo() {
MBeanAttributeInfo[] attributes = new MBeanAttributeInfo[0];
MBeanConstructorInfo[] constructors = new MBeanConstructorInfo[0];
MBeanOperationInfo[] operations = new MBeanOperationInfo[0];
MBeanNotificationInfo[] notifications = new MBeanNotificationInfo[0];
return new MBeanInfo(MyMBean.class.getName(), "My MBean", attributes, constructors, operations, notifications);
}
}