Related to AMQ-4563: Added test cases where we select against the JMSMessageID and fixed a bug that was causing it to fail.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1488376 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-31 19:56:12 +00:00
parent ec9975c36e
commit eec7d7404a
2 changed files with 62 additions and 3 deletions

View File

@ -79,6 +79,52 @@ public class AMQ4563Test extends AmqpTestSupport {
assertEquals(0, messagesReceived);
}
@Test(timeout = 60000)
public void testSelectingOnAMQPMessageID() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
assertTrue(brokerService.isPersistent());
Connection connection = createAMQPConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("txqueue");
MessageProducer p = session.createProducer(destination);
TextMessage message = session.createTextMessage();
String messageText = "Hello sent at " + new java.util.Date().toString();
message.setText(messageText);
p.send(message);
// Restart broker.
restartBroker(connection, session);
String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
LOG.info("Using selector: "+selector);
int messagesReceived = readAllMessages(queue, selector);
assertEquals(1, messagesReceived);
}
@Test(timeout = 60000)
public void testSelectingOnActiveMQMessageID() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
QueueImpl queue = new QueueImpl("queue://txqueue");
assertTrue(brokerService.isPersistent());
Connection connection = createAMQConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("txqueue");
MessageProducer p = session.createProducer(destination);
TextMessage message = session.createTextMessage();
String messageText = "Hello sent at " + new java.util.Date().toString();
message.setText(messageText);
p.send(message);
// Restart broker.
restartBroker(connection, session);
String selector = "JMSMessageID = '" + message.getJMSMessageID() + "'";
LOG.info("Using selector: "+selector);
int messagesReceived = readAllMessages(queue, selector);
assertEquals(1, messagesReceived);
}
@Test(timeout = 60000)
public void testMessagesAreAckedAMQPProducer() throws Exception {
int messagesSent = 3;
@ -110,11 +156,20 @@ public class AMQ4563Test extends AmqpTestSupport {
}
private int readAllMessages(QueueImpl queue) throws JMSException {
return readAllMessages(queue, null);
}
private int readAllMessages(QueueImpl queue, String selector) throws JMSException {
Connection connection = createAMQPConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
int messagesReceived = 0;
MessageConsumer consumer = session.createConsumer(queue);
MessageConsumer consumer;
if( selector==null ) {
consumer = session.createConsumer(queue);
} else {
consumer = session.createConsumer(queue, selector);
}
Message msg = consumer.receive(5000);
while(msg != null) {
assertNotNull(msg);
@ -186,6 +241,7 @@ public class AMQ4563Test extends AmqpTestSupport {
brokerService.setPersistenceAdapter(kaha);
brokerService.setAdvisorySupport(false);
brokerService.setUseJmx(false);
brokerService.setStoreOpenWireVersion(10);
openwireUri = brokerService.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
// Setup SSL context...

View File

@ -83,7 +83,6 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
*/
public void setTextView(String key) {
this.textView = key;
this.key = key;
}
/**
@ -128,7 +127,11 @@ public class MessageId implements DataStructure, Comparable<MessageId> {
public String toString() {
if (key == null) {
if( textView!=null ) {
key = textView;
if( textView.startsWith("ID:") ) {
key = textView;
} else {
key = "ID:"+textView;
}
} else {
key = producerId.toString() + ":" + producerSequenceId;
}