https://issues.apache.org/jira/browse/AMQ-3695: Failover using a JDBC Message Store and Virtual Topic can result in a lost message if queue is empty. Problem is that an empty destination is not recorded, as there is no entry in the messages table. Fix is to make use of the ack table, in the same way a for durable subs. For destinations that match the virtual topic filter, an entry out of priority range is added to the ack table. the startup destination query now unions over the ack and messages table

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1240162 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2012-02-03 13:43:36 +00:00
parent 0295e8d44d
commit b6f63b0d10
9 changed files with 120 additions and 24 deletions

View File

@ -23,6 +23,7 @@ import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -74,7 +75,9 @@ import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.broker.scheduler.SchedulerBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
@ -204,6 +207,7 @@ public class BrokerService implements Service {
private int offlineDurableSubscriberTimeout = -1;
private int offlineDurableSubscriberTaskSchedule = 300000;
private DestinationFilter virtualConsumerDestinationFilter;
static {
String localHostName = "localhost";
@ -2130,6 +2134,9 @@ public class BrokerService implements Service {
getBroker().addDestination(adminConnectionContext, destination,true);
}
}
if (isUseVirtualTopics()) {
startVirtualConsumerDestinations();
}
}
/**
@ -2297,28 +2304,40 @@ public class BrokerService implements Service {
}
}
/**
* Starts all destiantions in persistence store. This includes all inactive
* destinations
*/
protected void startDestinationsInPersistenceStore(Broker broker) throws Exception {
Set destinations = destinationFactory.getDestinations();
if (destinations != null) {
Iterator iter = destinations.iterator();
ConnectionContext adminConnectionContext = broker.getAdminConnectionContext();
if (adminConnectionContext == null) {
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
adminConnectionContext = context;
broker.setAdminConnectionContext(adminConnectionContext);
}
while (iter.hasNext()) {
ActiveMQDestination destination = (ActiveMQDestination) iter.next();
broker.addDestination(adminConnectionContext, destination,false);
protected void startVirtualConsumerDestinations() throws Exception {
ConnectionContext adminConnectionContext = getAdminConnectionContext();
Set<ActiveMQDestination> destinations = destinationFactory.getDestinations();
DestinationFilter filter = getVirtualTopicConsumerDestinationFilter();
if (!destinations.isEmpty()) {
for (ActiveMQDestination destination : destinations) {
if (filter.matches(destination) == true) {
broker.addDestination(adminConnectionContext, destination, false);
}
}
}
}
private DestinationFilter getVirtualTopicConsumerDestinationFilter() {
// created at startup, so no sync needed
if (virtualConsumerDestinationFilter == null) {
Set <ActiveMQQueue> consumerDestinations = new HashSet<ActiveMQQueue>();
for (DestinationInterceptor interceptor : destinationInterceptors) {
if (interceptor instanceof VirtualDestinationInterceptor) {
VirtualDestinationInterceptor virtualDestinationInterceptor = (VirtualDestinationInterceptor) interceptor;
for (VirtualDestination virtualDestination: virtualDestinationInterceptor.getVirtualDestinations()) {
if (virtualDestination instanceof VirtualTopic) {
consumerDestinations.add(new ActiveMQQueue(((VirtualTopic) virtualDestination).getPrefix() + DestinationFilter.ANY_DESCENDENT));
}
}
}
}
ActiveMQQueue filter = new ActiveMQQueue();
filter.setCompositeDestinations(consumerDestinations.toArray(new ActiveMQDestination[]{}));
virtualConsumerDestinationFilter = DestinationFilter.parseFilter(filter);
}
return virtualConsumerDestinationFilter;
}
protected synchronized ThreadPoolExecutor getExecutor() {
if (this.executor == null) {
this.executor = new ThreadPoolExecutor(1, 10, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@ -2568,4 +2587,9 @@ public class BrokerService implements Service {
public void setOfflineDurableSubscriberTaskSchedule(int offlineDurableSubscriberTaskSchedule) {
this.offlineDurableSubscriberTaskSchedule = offlineDurableSubscriberTaskSchedule;
}
public boolean shouldRecordVirtualDestination(ActiveMQDestination destination) {
return isUseVirtualTopics() && destination.isQueue() &&
getVirtualTopicConsumerDestinationFilter().matches(destination);
}
}

View File

@ -97,4 +97,6 @@ public interface JDBCAdapter {
public int getMaxRows();
public void setMaxRows(int maxRows);
void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
}

View File

@ -68,14 +68,33 @@ public class JDBCMessageStore extends AbstractMessageStore {
protected ActiveMQMessageAudit audit;
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) {
public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination, ActiveMQMessageAudit audit) throws IOException {
super(destination);
this.persistenceAdapter = persistenceAdapter;
this.adapter = adapter;
this.wireFormat = wireFormat;
this.audit = audit;
if (destination.isQueue() && persistenceAdapter.getBrokerService().shouldRecordVirtualDestination(destination)) {
recordDestinationCreation(destination);
}
}
private void recordDestinationCreation(ActiveMQDestination destination) throws IOException {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
c = persistenceAdapter.getTransactionContext();
if (adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, destination.getQualifiedName(), destination.getQualifiedName()) < 0) {
adapter.doRecordDestination(c, destination);
}
} catch (SQLException e) {
JDBCPersistenceAdapter.log("JDBC Failure: ", e);
throw IOExceptionSupport.create("Failed to record destination: " + destination + ". Reason: " + e, e);
} finally {
c.close();
}
}
public void addMessage(ConnectionContext context, Message message) throws IOException {
MessageId messageId = message.getMessageId();
if (audit != null && audit.isDuplicate(message)) {

View File

@ -112,7 +112,6 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
}
public Set<ActiveMQDestination> getDestinations() {
// Get a connection and insert the message into the DB.
TransactionContext c = null;
try {
c = getTransactionContext();

View File

@ -60,7 +60,7 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
};
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) {
public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic, ActiveMQMessageAudit audit) throws IOException {
super(persistenceAdapter, adapter, wireFormat, topic, audit);
}

View File

@ -356,7 +356,8 @@ public class Statements {
public String getFindAllDestinationsStatement() {
if (findAllDestinationsStatement == null) {
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName();
findAllDestinationsStatement = "SELECT DISTINCT CONTAINER FROM " + getFullMessageTableName()
+ " UNION DISTINCT SELECT DISTINCT CONTAINER FROM " + getFullAckTableName();
}
return findAllDestinationsStatement;
}

View File

@ -17,8 +17,11 @@
package org.apache.activemq.store.jdbc.adapter;
import java.io.IOException;
import java.io.PrintStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
@ -771,6 +774,9 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
rs = s.executeQuery();
if (rs.next()) {
result = rs.getLong(1);
if (result == 0 && rs.wasNull()) {
result = -1;
}
}
} finally {
cleanupExclusiveLock.readLock().unlock();
@ -848,7 +854,30 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
public void setMaxRows(int maxRows) {
this.maxRows = maxRows;
}
}
@Override
public void doRecordDestination(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException {
PreparedStatement s = null;
cleanupExclusiveLock.readLock().lock();
try {
s = c.getConnection().prepareStatement(this.statements.getCreateDurableSubStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, destination.getQualifiedName());
s.setString(3, destination.getQualifiedName());
s.setString(4, null);
s.setLong(5, 0);
s.setString(6, destination.getQualifiedName());
s.setLong(7, 11); // entry out of priority range
if (s.executeUpdate() != 1) {
throw new IOException("Could not create ack record for destination: " + destination);
}
} finally {
cleanupExclusiveLock.readLock().unlock();
close(s);
}
}
/**
* @param c

View File

@ -21,9 +21,13 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicSendReceiveWithTwoConnectionsTest;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -108,4 +112,21 @@ public class QueueMasterSlaveTest extends JmsTopicSendReceiveWithTwoConnectionsT
slave.set(broker);
slaveStarted.countDown();
}
public void testVirtualTopicFailover() throws Exception {
MessageConsumer qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
assertNull("No message there yet", qConsumer.receive(1000));
qConsumer.close();
master.stop();
assertTrue("slave started", slaveStarted.await(10, TimeUnit.SECONDS));
final String text = "ForUWhenSlaveKicksIn";
producer.send(new ActiveMQTopic("VirtualTopic.TA1"), session.createTextMessage(text));
qConsumer = createConsumer(session, new ActiveMQQueue("Consumer.A.VirtualTopic.TA1"));
javax.jms.Message message = qConsumer.receive(4000);
assertNotNull("Get message after failover", message);
assertEquals("correct message", text, ((TextMessage)message).getText());
}
}

View File

@ -30,6 +30,7 @@ public class JDBCNetworkBrokerDetachTest extends NetworkBrokerDetachTest {
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);
broker.setUseVirtualTopics(false);
}
}