AMQ-9504 - Prevent registering duplicate mKahaDB adapters

This fixes an issue on start up of a broker that is configured with
multiple mKahaDB filtered adapters and one is configured with
perDestination=true. Before this fix a duplicate persistence adapter
could be created because the filter did not check for existing matches.

Patch applied with thanks to Ritesh Adval
This commit is contained in:
Christopher L. Shannon 2024-05-22 09:22:01 -04:00
parent 8464b2d658
commit ddfb36515c
2 changed files with 168 additions and 8 deletions

View File

@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
 import javax.transaction.xa.Xid;
 import org.apache.activemq.broker.BrokerService;
@@ -88,7 +89,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     };
     final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
-    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<PersistenceAdapter>();
+    List<PersistenceAdapter> adapters = new CopyOnWriteArrayList<>();
     private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
     MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
@@ -383,16 +384,18 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
     }

     private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
-        FileFilter destinationNames = new FileFilter() {
-            @Override
-            public boolean accept(File file) {
-                return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
-            }
-        };
+        FileFilter destinationNames = file ->
+            file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
         File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
         if (candidates != null) {
+            Set<File> existing = adapters.stream().map(PersistenceAdapter::getDirectory).collect(
+                Collectors.toSet());
             for (File candidate : candidates) {
-                registerExistingAdapter(template, candidate);
+                if(!existing.contains(candidate)) {
+                    LOG.debug("Adapter does not exist for dir: {} so will register it", candidate);
+                    registerExistingAdapter(template, candidate);
+                }
             }
         }
     }

View File

@@ -0,0 +1,157 @@
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import jakarta.jms.Connection;
import jakarta.jms.DeliveryMode;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
 * Test for AMQ-9504: a broker configured with multiple mKahaDB filtered
 * persistence adapters — one topic-wildcard adapter and one with
 * {@code perDestination=true} — must not register a duplicate
 * KahaDBPersistenceAdapter for the same store directory on restart.
 */
public class MultiKahaDBMultipleFilteredAdapterTest {

    // Journal file size cap for each KahaDB store (32 MiB).
    private static final int MAX_FILE_LENGTH = 1024 * 1024 * 32;
    private static final String QUEUE_NAME = "QUEUE.amqMultiKahadbMultiFilteredAdapter";
    private static final String TOPIC_NAME = "TOPIC.amqMultiKahadbMultiFilteredAdapter";
    private static final int MESSAGE_COUNT = 100;

    private BrokerService broker;
    private ActiveMQConnectionFactory cf;

    @Before
    public void setUp() throws Exception {
        // Start with a clean store so the test controls exactly what is persisted.
        prepareBrokerWithMultiStore(true);
        TransportConnector connector = broker.addConnector("tcp://localhost:0");
        broker.start();
        broker.waitUntilStarted();
        cf = new ActiveMQConnectionFactory("failover://(" + connector.getConnectUri() + ")");
    }

    @After
    public void tearDown() throws Exception {
        broker.stop();
    }

    @Test
    public void testTopicWildcardAndPerDestinationFilteredAdapter() throws Exception {
        // Create one topic and one queue and send messages to each.
        sendMessages(false);
        sendMessages(true);

        // The adapters persist the data: the topic via the wildcard filtered
        // adapter and the queue via the per-destination adapter.
        // Stop the broker and restart it. The bug: on restart the wildcard
        // filtered adapter adds one KahaDBPersistenceAdapter when
        // MultiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters is
        // called, and then doStart() of MultiKahaDBPersistenceAdapter uses the
        // per-destination filtered adapter to add another
        // KahaDBPersistenceAdapter and fails on JMX bean registration.
        // The fix checks whether a persistence adapter already exists during
        // per-destination filtered adapter processing and only adds one if it
        // does not exist yet.
        var multiKaha = (MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter();

        // Verify 2 adapters created and not 3
        assertEquals(2, multiKaha.getAdapters().size());
        Set<File> dirs = multiKaha.getAdapters().stream().map(adapter -> adapter.getDirectory()).collect(
            Collectors.toSet());

        broker.stop();
        broker.waitUntilStopped();

        // Restart (without deleting messages) should succeed with the fix.
        prepareBrokerWithMultiStore(false);
        broker.start();
        broker.waitUntilStarted();

        // Verify 2 adapters created and not 3, backed by the same directories.
        multiKaha = (MultiKahaDBPersistenceAdapter) broker.getPersistenceAdapter();
        assertEquals(2, multiKaha.getAdapters().size());
        assertEquals(dirs, multiKaha.getAdapters().stream().map(adapter -> adapter.getDirectory()).collect(
            Collectors.toSet()));
    }

    /**
     * Sends {@link #MESSAGE_COUNT} persistent text messages to either the test
     * topic or the test queue.
     *
     * @param isTopic true to send to the topic, false to send to the queue
     */
    private void sendMessages(boolean isTopic) throws Exception {
        Connection connection = cf.createConnection();
        Session session = connection.createSession();
        MessageProducer producer = isTopic ? session.createProducer(session
            .createTopic(TOPIC_NAME)) : session.createProducer(session
            .createQueue(QUEUE_NAME));
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            TextMessage message = session
                .createTextMessage("Test message " + i);
            producer.send(message);
        }
        producer.close();
        session.close();
        connection.close();

        // Queue depth is observable via the message store; topics have no
        // equivalent per-destination count here, so only check the queue.
        if(!isTopic) {
            assertQueueLength(MESSAGE_COUNT);
        }
    }

    /** Asserts the broker-side message store for the test queue holds {@code len} messages. */
    private void assertQueueLength(int len) throws Exception {
        Set<org.apache.activemq.broker.region.Destination> destinations = broker.getBroker().getDestinations(
            new ActiveMQQueue(QUEUE_NAME));
        org.apache.activemq.broker.region.Queue queue = (Queue) destinations.iterator().next();
        assertEquals(len, queue.getMessageStore().getMessageCount());
    }

    /** Creates a JMX-enabled broker backed by the given persistence adapter (not started). */
    protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setUseJmx(true);
        broker.setBrokerName("localhost");
        broker.setPersistenceAdapter(kaha);
        return broker;
    }

    /**
     * Creates a KahaDB store with the test journal size and cleanup interval.
     *
     * @param delete whether to wipe any previously persisted messages
     */
    protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
        KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
        kaha.setJournalMaxFileLength(MAX_FILE_LENGTH);
        kaha.setCleanupInterval(5000);
        if (delete) {
            kaha.deleteAllMessages();
        }
        return kaha;
    }

    /**
     * Builds the mKahaDB broker under test: one topic-wildcard filtered
     * adapter plus one per-destination filtered adapter.
     *
     * @param deleteAllMessages whether to start from empty stores
     */
    public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
        MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
        if (deleteAllMessages) {
            multiKahaDBPersistenceAdapter.deleteAllMessages();
        }
        ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<>();

        // A topic wildcard filtered adapter.
        FilteredKahaDBPersistenceAdapter adapter1 = new FilteredKahaDBPersistenceAdapter();
        adapter1.setPersistenceAdapter(createStore(deleteAllMessages));
        adapter1.setTopic(">");
        adapters.add(adapter1);

        // Another, per-destination filtered adapter.
        FilteredKahaDBPersistenceAdapter adapter2 = new FilteredKahaDBPersistenceAdapter();
        adapter2.setPersistenceAdapter(createStore(deleteAllMessages));
        adapter2.setPerDestination(true);
        adapters.add(adapter2);

        multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
        broker = createBroker(multiKahaDBPersistenceAdapter);
    }
}