From 52e1a05aaadda40eb2ce63f610937981f96506c6 Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 31 Oct 2014 16:33:56 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5347 - variant of patch and additional tests applied, prefetch=0 test without restart exposed ordering bug with fix for https://issues.apache.org/jira/browse/AMQ-2719 --- .../activemq/broker/TransportConnection.java | 6 +- .../broker/region/PrefetchSubscription.java | 3 + .../apache/activemq/broker/region/Queue.java | 33 +- .../activemq/broker/region/RegionBroker.java | 4 +- .../RedeliveryRestartWithExceptionTest.java | 419 ++++++++++++++++++ 5 files changed, 447 insertions(+), 18 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java index 5bb7ec1ec0..a9e4c86d0c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -929,7 +929,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor { try { if (!stopping.get()) { if (messageDispatch != null) { - broker.preProcessDispatch(messageDispatch); + try { + broker.preProcessDispatch(messageDispatch); + } catch (RuntimeException convertToIO) { + throw new IOException(convertToIO); + } } dispatch(command); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 5ba3b539e8..8b8a78840c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -731,6 +731,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription { LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() }); } } + if (node instanceof QueueMessageReference) { + ((QueueMessageReference) node).unlock(); + } } }); context.getConnection().dispatchAsync(md); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index a1b8b07465..5c7a98868f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1968,24 +1968,25 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // proper order redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); } - if (!pagedInPendingDispatch.isEmpty()) { - // Next dispatch anything that had not been - // dispatched before. - pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); - } - // and now see if we can dispatch the new stuff.. and append to - // the pending - // list anything that does not actually get dispatched. - if (list != null && !list.isEmpty()) { - if (pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); - } else { - for (MessageReference qmr : list) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); + if (redeliveredWaitingDispatch.isEmpty()) { + if (!pagedInPendingDispatch.isEmpty()) { + // Next dispatch anything that had not been + // dispatched before. + pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); + } + // and now see if we can dispatch the new stuff.. and append to the pending + // list anything that does not actually get dispatched. + if (list != null && !list.isEmpty()) { + if (pagedInPendingDispatch.isEmpty()) { + pagedInPendingDispatch.addAll(doActualDispatch(list)); + } else { + for (MessageReference qmr : list) { + if (!pagedInPendingDispatch.contains(qmr)) { + pagedInPendingDispatch.addMessageLast(qmr); + } } + doWakeUp = true; } - doWakeUp = true; } } } finally { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index cb79c84f90..658bc7c1dd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -628,7 +628,9 @@ public class RegionBroker extends EmptyBroker { try { ((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message); } catch (IOException error) { - LOG.error("Failed to persist JMSRedeliveryFlag on {} in {}", message.getMessageId(), message.getDestination(), error); + RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error); + LOG.warn(runtimeException.getLocalizedMessage(), runtimeException); + throw runtimeException; } finally { message.setRedeliveryCounter(originalValue); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java new file mode 100644 index 0000000000..4126f0625e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RedeliveryRestartWithExceptionTest.java @@ -0,0 +1,419 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker; + +import java.io.File; +import java.io.IOException; +import java.util.Set; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.scheduler.JobSchedulerStore; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerId; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.ProxyMessageStore; +import org.apache.activemq.store.ProxyTopicMessageStore; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionStore; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.usage.SystemUsage; +import org.junit.After; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedeliveryRestartWithExceptionTest extends TestSupport { + + private static final transient Logger LOG = LoggerFactory.getLogger(RedeliveryRestartWithExceptionTest.class); + ActiveMQConnection connection; + BrokerService broker = null; + String queueName = "redeliveryRestartQ"; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + broker = new BrokerService(); + configureBroker(broker, true); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + } + + @Override + @After + public void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + broker.stop(); + super.tearDown(); + } + + protected void configureBroker(BrokerService broker, boolean throwExceptionOnUpdate) throws Exception { + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setPersistJMSRedelivered(true); + policyMap.setDefaultEntry(policy); + broker.setDestinationPolicy(policyMap); + broker.setPersistenceAdapter(new KahaDBWithUpdateExceptionPersistenceAdapter(throwExceptionOnUpdate)); + broker.addConnector("tcp://0.0.0.0:0"); + } + + @org.junit.Test + public void testValidateRedeliveryFlagAfterRestart() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + Exception expectedException = null; + try { + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertTrue("Should not receive the 5th message", i < 4); + //The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered + } + } catch (Exception e) { + //Expecting an exception and disconnect on the 5th message + LOG.info("Got expected:", e); + expectedException = e; + } + assertNotNull("Expecting an exception when updateMessage fails", expectedException); + + consumer.close(); + connection.close(); + + restartBroker(); + + connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + + // consume the messages that were previously delivered + for (int i = 0; i < 4; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives restart", msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + + // consume the rest that were not redeliveries + for (int i = 0; i < 6; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + + @org.junit.Test + public void testValidateRedeliveryFlagAfterTransientFailureConnectionDrop() throws Exception { + + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString() + + "?jms.prefetchPolicy.all=0"); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(queueName); + populateDestination(10, destination, connection); + TextMessage msg = null; + MessageConsumer consumer = session.createConsumer(destination); + Exception expectedException = null; + try { + for (int i = 0; i < 5; i++) { + msg = (TextMessage) consumer.receive(5000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertTrue("Should not receive the 5th message", i < 4); + //The first 4 messages will be ok but the 5th one should hit an exception in updateMessage and should not be delivered + } + } catch (Exception e) { + //Expecting an exception and disconnect on the 5th message + LOG.info("Got expected:", e); + expectedException = e; + } + assertNotNull("Expecting an exception when updateMessage fails", expectedException); + + consumer.close(); + connection.close(); + + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(queueName); + consumer = session.createConsumer(destination); + + + // consume the messages that were previously delivered + for (int i = 0; i < 4; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("redelivered? got: " + msg); + assertNotNull("got the message again", msg); + assertEquals("re delivery flag", true, msg.getJMSRedelivered()); + assertTrue("redelivery count survives reconnect", msg.getLongProperty("JMSXDeliveryCount") > 1); + msg.acknowledge(); + } + + + // consume the rest that were not redeliveries + for (int i = 0; i < 6; i++) { + msg = (TextMessage) consumer.receive(4000); + LOG.info("not redelivered? got: " + msg); + assertNotNull("got the message", msg); + assertEquals("not a redelivery", false, msg.getJMSRedelivered()); + assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount")); + msg.acknowledge(); + } + connection.close(); + } + + private void restartBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = createRestartedBroker(); + broker.start(); + } + + private BrokerService createRestartedBroker() throws Exception { + broker = new BrokerService(); + configureBroker(broker, false); + return broker; + } + + private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("")); + } + producer.close(); + session.close(); + } + + private class KahaDBWithUpdateExceptionPersistenceAdapter implements PersistenceAdapter { + + private KahaDBPersistenceAdapter kahaDB = new KahaDBPersistenceAdapter(); + private boolean throwExceptionOnUpdate; + + public KahaDBWithUpdateExceptionPersistenceAdapter(boolean throwExceptionOnUpdate) { + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void start() throws Exception { + kahaDB.start(); + } + + @Override + public void stop() throws Exception { + kahaDB.stop(); + } + + @Override + public Set getDestinations() { + return kahaDB.getDestinations(); + } + + @Override + public MessageStore createQueueMessageStore(ActiveMQQueue destination) + throws IOException { + MessageStore proxyMessageStoreWithException = new ProxyMessageStoreWithUpdateException( + kahaDB.createQueueMessageStore(destination), throwExceptionOnUpdate); + return proxyMessageStoreWithException; + } + + @Override + public TopicMessageStore createTopicMessageStore( + ActiveMQTopic destination) throws IOException { + TopicMessageStore proxyMessageStoreWithException = new ProxyTopicMessageStoreWithUpdateException( + kahaDB.createTopicMessageStore(destination), throwExceptionOnUpdate); + return proxyMessageStoreWithException; + } + + @Override + public JobSchedulerStore createJobSchedulerStore() throws IOException, UnsupportedOperationException { + return kahaDB.createJobSchedulerStore(); + } + + @Override + public void removeQueueMessageStore(ActiveMQQueue destination) { + kahaDB.removeQueueMessageStore(destination); + } + + @Override + public void removeTopicMessageStore(ActiveMQTopic destination) { + kahaDB.removeTopicMessageStore(destination); + } + + @Override + public TransactionStore createTransactionStore() throws IOException { + return kahaDB.createTransactionStore(); + } + + @Override + public void beginTransaction(ConnectionContext context) + throws IOException { + kahaDB.beginTransaction(context); + } + + @Override + public void commitTransaction(ConnectionContext context) + throws IOException { + kahaDB.commitTransaction(context); + } + + @Override + public void rollbackTransaction(ConnectionContext context) + throws IOException { + kahaDB.rollbackTransaction(context); + } + + @Override + public long getLastMessageBrokerSequenceId() throws IOException { + return kahaDB.getLastMessageBrokerSequenceId(); + } + + @Override + public void deleteAllMessages() throws IOException { + kahaDB.deleteAllMessages(); + } + + @Override + public void setUsageManager(SystemUsage usageManager) { + kahaDB.setUsageManager(usageManager); + } + + @Override + public void setBrokerName(String brokerName) { + kahaDB.setBrokerName(brokerName); + } + + @Override + public void setDirectory(File dir) { + kahaDB.setDirectory(dir); + } + + @Override + public File getDirectory() { + return kahaDB.getDirectory(); + } + + @Override + public void checkpoint(boolean sync) throws IOException { + kahaDB.checkpoint(sync); + } + + @Override + public long size() { + return kahaDB.size(); + } + + @Override + public long getLastProducerSequenceId(ProducerId id) throws IOException { + return kahaDB.getLastProducerSequenceId(id); + } + + } + + private class ProxyMessageStoreWithUpdateException extends ProxyMessageStore { + private boolean throwExceptionOnUpdate; + private int numBeforeException = 4; + public ProxyMessageStoreWithUpdateException(MessageStore delegate, boolean throwExceptionOnUpdate) { + super(delegate); + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void updateMessage(Message message) throws IOException { + if(throwExceptionOnUpdate) { + if(numBeforeException > 0) { + numBeforeException--; + super.updateMessage(message); + } else { + // lets only do it once so we can validate transient store failure + throwExceptionOnUpdate = false; + + //A message that has never been delivered will hit this exception + throw new IOException("Hit our simulated exception writing the update to disk"); + } + } else { + super.updateMessage(message); + } + } + } + + private class ProxyTopicMessageStoreWithUpdateException extends ProxyTopicMessageStore { + private boolean throwExceptionOnUpdate; + private int numBeforeException = 4; + public ProxyTopicMessageStoreWithUpdateException(TopicMessageStore delegate, boolean throwExceptionOnUpdate) { + super(delegate); + this.throwExceptionOnUpdate = throwExceptionOnUpdate; + } + + @Override + public void updateMessage(Message message) throws IOException { + if(throwExceptionOnUpdate) { + if(numBeforeException > 0) { + numBeforeException--; + super.updateMessage(message); + } else { + //A message that has never been delivered will hit this exception + throw new IOException("Hit our simulated exception writing the update to disk"); + } + } else { + super.updateMessage(message); + } + } + } +} \ No newline at end of file