mirror of https://github.com/apache/activemq.git
AMQ-8131 - track unmatched acks in the acks table
This commit is contained in:
parent
1c315db1d1
commit
e1b3204407
|
@ -71,12 +71,6 @@ public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMess
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
|
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId, MessageAck ack) throws IOException {
|
||||||
if (ack != null && ack.isUnmatchedAck()) {
|
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("ignoring unmatched selector ack for: " + messageId + ", cleanup will get to this message after subsequent acks.");
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
TransactionContext c = persistenceAdapter.getTransactionContext(context);
|
||||||
try {
|
try {
|
||||||
long[] res = getCachedStoreSequenceId(c, destination, messageId);
|
long[] res = getCachedStoreSequenceId(c, destination, messageId);
|
||||||
|
|
|
@ -39,11 +39,11 @@ import org.apache.activemq.store.PersistenceAdapter;
|
||||||
*/
|
*/
|
||||||
public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
|
|
||||||
private Connection connection;
|
protected Connection connection;
|
||||||
private Session session;
|
private Session session;
|
||||||
private TopicSubscriber consumer;
|
private TopicSubscriber consumer;
|
||||||
private MessageProducer producer;
|
private MessageProducer producer;
|
||||||
private BrokerService broker;
|
protected BrokerService broker;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
|
||||||
|
@ -525,7 +525,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport {
|
||||||
return DeliveryMode.PERSISTENT;
|
return DeliveryMode.PERSISTENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertTextMessageEquals(String string, Message message) throws JMSException {
|
void assertTextMessageEquals(String string, Message message) throws JMSException {
|
||||||
assertNotNull("Message was null", message);
|
assertNotNull("Message was null", message);
|
||||||
assertTrue("Message is not a TextMessage", message instanceof TextMessage);
|
assertTrue("Message is not a TextMessage", message instanceof TextMessage);
|
||||||
assertEquals(string, ((TextMessage) message).getText());
|
assertEquals(string, ((TextMessage) message).getText());
|
||||||
|
|
|
@ -17,10 +17,20 @@
|
||||||
package org.apache.activemq.usecases;
|
package org.apache.activemq.usecases;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.sql.PreparedStatement;
|
||||||
|
import java.sql.ResultSet;
|
||||||
|
import java.sql.ResultSetMetaData;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
import org.apache.activemq.store.PersistenceAdapter;
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
|
||||||
import org.apache.derby.jdbc.EmbeddedDataSource;
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import javax.jms.TopicSubscriber;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -33,4 +43,76 @@ public class JDBCDurableSubscriptionTest extends DurableSubscriptionTestSupport
|
||||||
return jdbc;
|
return jdbc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testUnmatchedCleanedUp() throws Exception {
|
||||||
|
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = session.createTopic("TestSelectorNoMatchCleanup");
|
||||||
|
TopicSubscriber consumer = session.createDurableSubscriber(topic, "sub1", "color='red'", false);
|
||||||
|
TopicSubscriber consumerNoMatch = session.createDurableSubscriber(topic, "sub2", "color='green'", false);
|
||||||
|
MessageProducer producer = session.createProducer(topic);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
TextMessage msg = session.createTextMessage();
|
||||||
|
msg.setText("Msg:1");
|
||||||
|
msg.setStringProperty("color", "blue");
|
||||||
|
producer.send(msg);
|
||||||
|
msg.setText("Msg:2");
|
||||||
|
msg.setStringProperty("color", "red");
|
||||||
|
producer.send(msg);
|
||||||
|
|
||||||
|
assertTextMessageEquals("Msg:2", consumer.receive(5000));
|
||||||
|
|
||||||
|
assertNull(consumerNoMatch.receiveNoWait());
|
||||||
|
|
||||||
|
// verify cleanup
|
||||||
|
java.sql.Connection conn = ((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).getDataSource().getConnection();
|
||||||
|
PreparedStatement statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS");
|
||||||
|
ResultSet result = statement.executeQuery();
|
||||||
|
printResults("MSGS", result);
|
||||||
|
statement.close();
|
||||||
|
|
||||||
|
statement = conn.prepareStatement("SELECT * FROM ACTIVEMQ_ACKS");
|
||||||
|
result = statement.executeQuery();
|
||||||
|
printResults("ACKS", result);
|
||||||
|
statement.close();
|
||||||
|
|
||||||
|
// run for each priority
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
((JDBCPersistenceAdapter) broker.getPersistenceAdapter()).cleanup();
|
||||||
|
}
|
||||||
|
|
||||||
|
// after cleanup
|
||||||
|
statement = conn.prepareStatement("SELECT ID FROM ACTIVEMQ_MSGS");
|
||||||
|
result = statement.executeQuery();
|
||||||
|
printResults("MSGS-AFTER", result);
|
||||||
|
statement.close();
|
||||||
|
|
||||||
|
statement = conn.prepareStatement("SELECT * FROM ACTIVEMQ_ACKS");
|
||||||
|
result = statement.executeQuery();
|
||||||
|
printResults("ACKS-AFTER", result);
|
||||||
|
statement.close();
|
||||||
|
|
||||||
|
|
||||||
|
// verify empty
|
||||||
|
statement = conn.prepareStatement("SELECT * FROM ACTIVEMQ_MSGS");
|
||||||
|
result = statement.executeQuery();
|
||||||
|
assertFalse(result.next());
|
||||||
|
|
||||||
|
conn.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void printResults(String detail, ResultSet result) throws SQLException {
|
||||||
|
System.out.println("**" + detail + "**");
|
||||||
|
ResultSetMetaData resultSetMetaData = result.getMetaData();
|
||||||
|
int columnsNumber = resultSetMetaData.getColumnCount();
|
||||||
|
while (result.next()) {
|
||||||
|
for (int i = 1; i <= columnsNumber; i++) {
|
||||||
|
if (i > 1) System.out.print(", ");
|
||||||
|
String columnValue = result.getString(i);
|
||||||
|
System.out.print(columnValue + " " + resultSetMetaData.getColumnName(i));
|
||||||
|
}
|
||||||
|
System.out.println();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue