From b6f63b0d10f3db34356b056e3e4caeedc4c95aad Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Fri, 3 Feb 2012 13:43:36 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3695: Failover using a JDBC Message Store and Virtual Topic can result in a lost message if queue is empty. Problem is that an empty destination is not recorded, as there is no entry in the messages table. Fix is to make use of the ack table, in the same way a for durable subs. For destinations that match the virtual topic filter, an entry out of priority range is added to the ack table. the startup destination query now unions over the ack and messages table git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1240162 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 60 +++++++++++++------ .../activemq/store/jdbc/JDBCAdapter.java | 2 + .../activemq/store/jdbc/JDBCMessageStore.java | 23 ++++++- .../store/jdbc/JDBCPersistenceAdapter.java | 1 - .../store/jdbc/JDBCTopicMessageStore.java | 2 +- .../activemq/store/jdbc/Statements.java | 3 +- .../jdbc/adapter/DefaultJDBCAdapter.java | 31 +++++++++- .../broker/ft/QueueMasterSlaveTest.java | 21 +++++++ .../jdbc/JDBCNetworkBrokerDetachTest.java | 1 + 9 files changed, 120 insertions(+), 24 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index b57f7118c6..4cc3f22e1b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -23,6 +23,7 @@ import java.net.URISyntaxException; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -74,7 +75,9 @@ import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualTopic; import org.apache.activemq.broker.scheduler.SchedulerBroker; import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.BrokerId; +import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.network.ConnectionFilter; import org.apache.activemq.network.DiscoveryNetworkConnector; import org.apache.activemq.network.NetworkConnector; @@ -204,6 +207,7 @@ public class BrokerService implements Service { private int offlineDurableSubscriberTimeout = -1; private int offlineDurableSubscriberTaskSchedule = 300000; + private DestinationFilter virtualConsumerDestinationFilter; static { String localHostName = "localhost"; @@ -2130,6 +2134,9 @@ public class BrokerService implements Service { getBroker().addDestination(adminConnectionContext, destination,true); } } + if (isUseVirtualTopics()) { + startVirtualConsumerDestinations(); + } } /** @@ -2297,28 +2304,40 @@ public class BrokerService implements Service { } } - /** - * Starts all destiantions in persistence store. This includes all inactive - * destinations - */ - protected void startDestinationsInPersistenceStore(Broker broker) throws Exception { - Set destinations = destinationFactory.getDestinations(); - if (destinations != null) { - Iterator iter = destinations.iterator(); - ConnectionContext adminConnectionContext = broker.getAdminConnectionContext(); - if (adminConnectionContext == null) { - ConnectionContext context = new ConnectionContext(); - context.setBroker(broker); - adminConnectionContext = context; - broker.setAdminConnectionContext(adminConnectionContext); - } - while (iter.hasNext()) { - ActiveMQDestination destination = (ActiveMQDestination) iter.next(); - broker.addDestination(adminConnectionContext, destination,false); + protected void startVirtualConsumerDestinations() throws Exception { + ConnectionContext adminConnectionContext = getAdminConnectionContext(); + Set destinations = destinationFactory.getDestinations(); + DestinationFilter filter = getVirtualTopicConsumerDestinationFilter(); + if (!destinations.isEmpty()) { + for (ActiveMQDestination destination : destinations) { + if (filter.matches(destination) == true) { + broker.addDestination(adminConnectionContext, destination, false); + } } } } + private DestinationFilter getVirtualTopicConsumerDestinationFilter() { + // created at startup, so no sync needed + if (virtualConsumerDestinationFilter == null) { + Set consumerDestinations = new HashSet(); + for (DestinationInterceptor interceptor : destinationInterceptors) { + if (interceptor instanceof VirtualDestinationInterceptor) { + VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor; + for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) { + if (virtualDestination instanceof VirtualTopic) { + consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT)); + } + } + } + } + ActiveMQQueue filter = new ActiveMQQueue(); + filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{})); + virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter); + } + return virtualConsumerDestinationFilter; + } + protected synchronized ThreadPoolExecutor getExecutor() { if (this.executor == null) { this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue(), new ThreadFactory() { @@ -2568,4 +2587,9 @@ public class BrokerService implements Service { public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) { this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule; } + + public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) { + return isUseVirtualTopics() && destination.isQueue() && + getVirtualTopicConsumerDestinationFilter().matches(destination); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java index 6b54c90ceb..b72700bbf3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java @@ -97,4 +97,6 @@ public interface JDBCAdapter { public int getMaxRows(); public void setMaxRows(int maxRows); + + void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 25c6c3968c..dd49e081a3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -68,14 +68,33 @@ public class JDBCMessageStore extends AbstractMessageStore { protected ActiveMQMessageAudit audit; - public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) { + public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException { super(destination); this.persistenceAdapter = persistenceAdapter; this.adapter = adapter; this.wireFormat = wireFormat; this.audit = audit; + + if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) { + recordDestinationCreation(destination); + } } - + + private void recordDestinationCreation(ActiveMQDestination destination) throws IOException { + TransactionContext c = persistenceAdapter.getTransactionContext(); + try { + c = persistenceAdapter.getTransactionContext(); + if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) { + adapter.doRecordDestination(c, destination); + } + } catch (SQLException e) { + JDBCPersistenceAdapter.log("JDBC Failure: ", e); + throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e); + } finally { + c.close(); + } + } + public void addMessage(ConnectionContext context, Message message) throws IOException { MessageId messageId = message.getMessageId(); if (audit != null && audit.isDuplicate(message)) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 8857ca02b9..31b931ed99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -112,7 +112,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist } public Set getDestinations() { - // Get a connection and insert the message into the DB. TransactionContext c = null; try { c = getTransactionContext(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index 7d577148ea..aedcf5dc83 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -60,7 +60,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess }; - public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) { + public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException { super(persistenceAdapter, adapter, wireFormat, topic, audit); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java index 549c78abe1..932bfb262e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/Statements.java @@ -356,7 +356,8 @@ public class Statements { public String getFindAllDestinationsStatement() { if (findAllDestinationsStatement == null) { - findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName(); + findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName() + + " UNION DISTINCT SELECT DISTINCT CONTAINER FROM " + getFullAckTableName(); } return findAllDestinationsStatement; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java index e8716a9848..4bc29e2a29 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/adapter/DefaultJDBCAdapter.java @@ -17,8 +17,11 @@ package org.apache.activemq.store.jdbc.adapter; import java.io.IOException; +import java.io.PrintStream; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; @@ -771,6 +774,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter { rs = s.executeQuery(); if (rs.next()) { result = rs.getLong(1); + if (result == 0 && rs.wasNull()) { + result = -1; + } } } finally { cleanupExclusiveLock.readLock().unlock(); @@ -848,7 +854,30 @@ public class DefaultJDBCAdapter implements JDBCAdapter { public void setMaxRows(int maxRows) { this.maxRows = maxRows; - } + } + + @Override + public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException { + PreparedStatement s = null; + cleanupExclusiveLock.readLock().lock(); + try { + s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement()); + s.setString(1, destination.getQualifiedName()); + s.setString(2, destination.getQualifiedName()); + s.setString(3, destination.getQualifiedName()); + s.setString(4, null); + s.setLong(5, 0); + s.setString(6, destination.getQualifiedName()); + s.setLong(7, 11); // entry out of priority range + + if (s.executeUpdate() != 1) { + throw new IOException("Could not create ack record for destination: " + destination); + } + } finally { + cleanupExclusiveLock.readLock().unlock(); + close(s); + } + } /** * @param c diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java index a9527e10f5..6507dd0f31 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java @@ -21,9 +21,13 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import javax.jms.MessageConsumer; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.xbean.BrokerFactoryBean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,4 +112,21 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT slave.set(broker); slaveStarted.countDown(); } + + public void testVirtualTopicFailover() throws Exception { + + MessageConsumer qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1")); + assertNull("No message there yet", qConsumer.receive(1000)); + qConsumer.close(); + master.stop(); + assertTrue("slave started", slaveStarted.await(10, TimeUnit.SECONDS)); + + final String text = "ForUWhenSlaveKicksIn"; + producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text)); + + qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1")); + javax.jms.Message message = qConsumer.receive(4000); + assertNotNull("Get message after failover", message); + assertEquals("correct message", text, ((TextMessage)message).getText()); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java index cde444fe44..409b20f0eb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/jdbc/JDBCNetworkBrokerDetachTest.java @@ -30,6 +30,7 @@ public class JDBCNetworkBrokerDetachTest extends NetworkBrokerDetachTest { jdbc.setDataSource(dataSource); jdbc.deleteAllMessages(); broker.setPersistenceAdapter(jdbc); + broker.setUseVirtualTopics(false); } }