diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index 175002aec3..744220dc81 100755 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -39,7 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class JDBCMessageStore extends AbstractMessageStore { @@ -98,6 +98,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override public void addMessage(final ConnectionContext context, final Message message) throws IOException { MessageId messageId = message.getMessageId(); if (audit != null && audit.isDuplicate(message)) { @@ -133,6 +134,7 @@ public class JDBCMessageStore extends AbstractMessageStore { pendingAdditions.add(sequence); c.onCompletion(new Runnable() { + @Override public void run() { // jdbc close or jms commit - while futureOrSequenceLong==null ordered // work will remain pending on the Queue @@ -207,6 +209,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override public Message getMessage(MessageId messageId) throws IOException { // Get a connection and pull the message out of the DB TransactionContext c = persistenceAdapter.getTransactionContext(); @@ -245,6 +248,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ? @@ -263,6 +267,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } } + @Override public void recover(final MessageRecoveryListener listener) throws Exception { // Get all the Message ids out of the database. @@ -270,14 +275,30 @@ public class JDBCMessageStore extends AbstractMessageStore { try { c = persistenceAdapter.getTransactionContext(); adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() { + @Override public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { - Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); - msg.getMessageId().setBrokerSequenceId(sequenceId); - return listener.recoverMessage(msg); + if (listener.hasSpace()) { + Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data)); + msg.getMessageId().setBrokerSequenceId(sequenceId); + return listener.recoverMessage(msg); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Message recovery limit reached for MessageRecoveryListener"); + } + return false; + } } + @Override public boolean recoverMessageReference(String reference) throws Exception { - return listener.recoverMessageReference(new MessageId(reference)); + if (listener.hasSpace()) { + return listener.recoverMessageReference(new MessageId(reference)); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("Message recovery limit reached for MessageRecoveryListener"); + } + return false; + } } }); } catch (SQLException e) { @@ -291,6 +312,7 @@ public class JDBCMessageStore extends AbstractMessageStore { /** * @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext) */ + @Override public void removeAllMessages(ConnectionContext context) throws IOException { // Get a connection and remove the message from the DB TransactionContext c = persistenceAdapter.getTransactionContext(context); @@ -328,6 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore { * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, * org.apache.activemq.store.MessageRecoveryListener) */ + @Override public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception { TransactionContext c = persistenceAdapter.getTransactionContext(); try { @@ -337,6 +360,7 @@ public class JDBCMessageStore extends AbstractMessageStore { adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(), maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() { + @Override public boolean recoverMessage(long sequenceId, byte[] data) throws Exception { Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data)); msg.getMessageId().setBrokerSequenceId(sequenceId); @@ -346,6 +370,7 @@ public class JDBCMessageStore extends AbstractMessageStore { return true; } + @Override public boolean recoverMessageReference(String reference) throws Exception { if (listener.hasSpace()) { listener.recoverMessageReference(new MessageId(reference)); @@ -370,6 +395,7 @@ public class JDBCMessageStore extends AbstractMessageStore { /** * @see org.apache.activemq.store.MessageStore#resetBatching() */ + @Override public void resetBatching() { if (LOG.isTraceEnabled()) { LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered)); @@ -401,6 +427,7 @@ public class JDBCMessageStore extends AbstractMessageStore { } + @Override public void setPrioritizedMessages(boolean prioritizedMessages) { super.setPrioritizedMessages(prioritizedMessages); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java new file mode 100644 index 0000000000..e8e819c8f4 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapterExpiredMessageTest.java @@ -0,0 +1,157 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.jdbc; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.store.MessageRecoveryListener; +import org.apache.activemq.store.ProxyTopicMessageStore; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class JDBCPersistenceAdapterExpiredMessageTest { + + @Rule + public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target")); + + protected BrokerService brokerService; + private AtomicBoolean hasSpaceCalled = new AtomicBoolean(); + private int expireSize = 5; + + @Before + public void setUp() throws Exception { + hasSpaceCalled.set(false); + brokerService = new BrokerService(); + + //Wrap the adapter and listener to set a flag to make sure we are calling hasSpace() + //during expiration + JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() { + + @Override + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { + ProxyTopicMessageStore proxy = new ProxyTopicMessageStore(super.createTopicMessageStore(destination)) { + + @Override + public void recover(final MessageRecoveryListener listener) throws Exception { + MessageRecoveryListener delegate = new MessageRecoveryListener() { + + @Override + public boolean recoverMessageReference(MessageId ref) throws Exception { + return listener.recoverMessageReference(ref); + } + + @Override + public boolean recoverMessage(Message message) throws Exception { + return listener.recoverMessage(message); + } + + @Override + public boolean isDuplicate(MessageId ref) { + return listener.isDuplicate(ref); + } + + @Override + public boolean hasSpace() { + hasSpaceCalled.set(true); + return listener.hasSpace(); + } + }; + super.recover(delegate); + } + + }; + return proxy; + } + + }; + + brokerService.setSchedulerSupport(false); + brokerService.setDataDirectoryFile(dataFileDir.getRoot()); + brokerService.setPersistenceAdapter(jdbc); + brokerService.setDeleteAllMessagesOnStartup(true); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(5000); + defaultEntry.setMaxExpirePageSize(expireSize); + policyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(policyMap); + + brokerService.start(); + } + + @After + public void stop() throws Exception { + if (brokerService != null) { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + + @Test + public void testMaxExpirePageSize() throws Exception { + final ActiveMQTopic topic = new ActiveMQTopic("test.topic"); + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + factory.setClientID("clientId");; + Connection conn = factory.createConnection(); + conn.start(); + Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + TopicSubscriber sub = sess.createDurableSubscriber(topic, "sub1"); + sub.close(); + + MessageProducer producer = sess.createProducer(topic); + producer.setTimeToLive(1000); + + for (int i = 0; i < 50; i++) { + producer.send(sess.createTextMessage("test message: " + i)); + } + + //There should be exactly 5 messages expired because the limit was hit and it stopped + //The expire messages period is 5 seconds which should give enough time for this assertion + //to pass before expiring more messages + assertTrue(Wait.waitFor(new Condition() { + + @Override + public boolean isSatisified() throws Exception { + long expired = brokerService.getDestination(topic).getDestinationStatistics().getExpired().getCount(); + return expired == expireSize && hasSpaceCalled.get(); + } + }, 15000, 1000)); + } +}