mirror of https://github.com/apache/activemq.git
AMQ-8131 - revert treating unmatched as real acks b/c individual acks are not tracked. make use of enableMessageExpirationOnActiveDurableSubs to ensure unmatched can eventually expire
This commit is contained in:
parent
e1b3204407
commit
2f40261362
|
@ -71,6 +71,12 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
|||
|
||||
@Override
|
||||
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
|
||||
if (ack != null && ack.isUnmatchedAck()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||
try {
|
||||
long[] res = getCachedStoreSequenceId(c, destination, messageId);
|
||||
|
|
|
@ -41,14 +41,11 @@ import org.apache.activemq.store.TopicMessageStore;
|
|||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.junit.Test;
|
||||
//import org.slf4j.Logger;
|
||||
//import org.slf4j.LoggerFactory;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class ActiveDurableSubscriptionBrowseExpireTest extends DurableSubscriptionOfflineTestBase {
|
||||
//private static final Logger LOG = LoggerFactory.getLogger(ActiveDurableSubscriptionBrowseExpireTest.class);
|
||||
private boolean enableExpiration = true;
|
||||
|
||||
public ActiveDurableSubscriptionBrowseExpireTest(boolean enableExpiration) {
|
||||
|
|
|
@ -22,8 +22,14 @@ import java.sql.ResultSet;
|
|||
import java.sql.ResultSetMetaData;
|
||||
import java.sql.SQLException;
|
||||
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||
import org.apache.activemq.util.Wait;
|
||||
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.MessageProducer;
|
||||
|
@ -43,14 +49,24 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
|
|||
return jdbc;
|
||||
}
|
||||
|
||||
public void testUnmatchedCleanedUp() throws Exception {
|
||||
public void testUnmatchedCleanedUpOnExpiry() throws Exception {
|
||||
|
||||
// ensure expiry processing on active durable sub and no send to DLQ
|
||||
ActiveMQTopic activeMQTopic = new ActiveMQTopic("TestSelectorNoMatchCleanupOnExpired");
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry policyEntry = new PolicyEntry();
|
||||
policyEntry.getDeadLetterStrategy().setProcessExpired(false);
|
||||
policyMap.put(activeMQTopic, policyEntry);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.setEnableMessageExpirationOnActiveDurableSubs(true);
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Topic topic = session.createTopic("TestSelectorNoMatchCleanup");
|
||||
Topic topic = session.createTopic(activeMQTopic.getTopicName());
|
||||
TopicSubscriber consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
|
||||
TopicSubscriber consumerNoMatch = session.createDurableSubscriber(topic, "sub2", "color='green'", false);
|
||||
MessageProducer producer = session.createProducer(topic);
|
||||
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||
producer.setTimeToLive(1000);
|
||||
connection.start();
|
||||
|
||||
TextMessage msg = session.createTextMessage();
|
||||
|
@ -77,10 +93,23 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
|
|||
printResults("ACKS", result);
|
||||
statement.close();
|
||||
|
||||
// run for each priority
|
||||
for (int i=0; i<10; i++) {
|
||||
((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
|
||||
}
|
||||
// need to wait for expiry to kick in.....
|
||||
// browse till we get no messages and execute cleanup asap
|
||||
assertTrue("no messages from browse",
|
||||
Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
|
||||
Message[] browseResult = ((RegionBroker) broker.getRegionBroker()).getTopicRegion().getDestinationMap().get(topic).browse();
|
||||
|
||||
System.err.println("Browse: "+browseResult.length +", v:"+browseResult);
|
||||
|
||||
// run for each priority
|
||||
for(int i = 0; i < 10; i++) {
|
||||
((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
|
||||
}
|
||||
return browseResult.length == 0;
|
||||
}}));
|
||||
|
||||
// after cleanup
|
||||
statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS");
|
||||
|
@ -114,5 +143,6 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
|
|||
}
|
||||
System.out.println();
|
||||
}
|
||||
System.out.println("**" + detail + "** END");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue