diff --git a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java index ccbd5c26b6..a857bcf4ef 100644 --- a/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java +++ b/activemq-jdbc-store/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java @@ -71,6 +71,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess @Override public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException { + if (ack != null && ack.isUnmatchedAck()) { + if (LOG.isTraceEnabled()) { + LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks."); + } + return; + } TransactionContext c = persistenceAdapter.getTransactionContext(context); try { long[] res = getCachedStoreSequenceId(c, destination, messageId); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java index 69c734e270..8954c55b2f 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ActiveDurableSubscriptionBrowseExpireTest.java @@ -41,14 +41,11 @@ import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.MessageId; import org.junit.Test; -//import org.slf4j.Logger; -//import org.slf4j.LoggerFactory; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @RunWith(value = Parameterized.class) public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase { - //private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class); private boolean enableExpiration = true; public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java index 5cb336bba8..fba65d1948 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/JDBCDurableSubscriptionTest.java @@ -22,8 +22,14 @@ import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.Message; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.util.Wait; import javax.jms.DeliveryMode; import javax.jms.MessageProducer; @@ -43,14 +49,24 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport return jdbc; } - public void testUnmatchedCleanedUp() throws Exception { + public void testUnmatchedCleanedUpOnExpiry() throws Exception { + + // ensure expiry processing on active durable sub and no send to DLQ + ActiveMQTopic activeMQTopic = new ActiveMQTopic("TestSelectorNoMatchCleanupOnExpired"); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.getDeadLetterStrategy().setProcessExpired(false); + policyMap.put(activeMQTopic, policyEntry); + broker.setDestinationPolicy(policyMap); + broker.setEnableMessageExpirationOnActiveDurableSubs(true); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic("TestSelectorNoMatchCleanup"); + Topic topic = session.createTopic(activeMQTopic.getTopicName()); TopicSubscriber consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false); TopicSubscriber consumerNoMatch = session.createDurableSubscriber(topic, "sub2", "color='green'", false); MessageProducer producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.setTimeToLive(1000); connection.start(); TextMessage msg = session.createTextMessage(); @@ -77,10 +93,23 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport printResults("ACKS", result); statement.close(); - // run for each priority - for (int i=0; i<10; i++) { - ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup(); - } + // need to wait for expiry to kick in..... + // browse till we get no messages and execute cleanup asap + assertTrue("no messages from browse", + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + Message[] browseResult = ((RegionBroker) broker.getRegionBroker()).getTopicRegion().getDestinationMap().get(topic).browse(); + + System.err.println("Browse: "+browseResult.length +", v:"+browseResult); + + // run for each priority + for(int i = 0; i < 10; i++) { + ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup(); + } + return browseResult.length == 0; + }})); // after cleanup statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS"); @@ -114,5 +143,6 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport } System.out.println(); } + System.out.println("**" + detail + "** END"); } }