https://issues.apache.org/jira/browse/AMQ-5457 - fix and test - we now peek first in redeliveredWaitingDispatch

This commit is contained in:
gtully 2014-11-25 14:21:57 +00:00
parent c5f183548e
commit 74f530a641
2 changed files with 58 additions and 0 deletions

View File

@ -1121,6 +1121,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
pageInMessages(!memoryUsage.isFull(110));
};
doBrowseList(browseList, max, redeliveredWaitingDispatch, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch");
doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch");
doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages");

View File

@ -44,7 +44,9 @@ import javax.management.openmbean.TabularData;
import junit.textui.TestRunner;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
@ -1362,6 +1364,61 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
session.close();
}
public void testBrowseOrder() throws Exception {
connection = connectionFactory.createConnection();
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(20);
((ActiveMQConnection) connection).setPrefetchPolicy(prefetchPolicy);
useConnection(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
assertEquals("expected", MESSAGE_COUNT, initialQueueSize);
int messageCount = initialQueueSize;
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];
String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID);
Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
assertTrue("not empty", intProperties.size() > 0);
assertEquals("counter in order", i, intProperties.get("counter"));
}
echo("Attempting to consume 5 bytes messages from: " + destination);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i=0; i<5; i++) {
Message message = consumer.receive(5000);
assertNotNull(message);
assertEquals("ordered", i, message.getIntProperty("counter"));
echo("Consumed: " + message.getIntProperty("counter"));
}
consumer.close();
session.close();
connection.close();
// browse again and verify order
compdatalist = queue.browse();
initialQueueSize = compdatalist.length;
assertEquals("5 gone", MESSAGE_COUNT - 5, initialQueueSize);
messageCount = initialQueueSize;
for (int i = 0; i < messageCount - 4; i++) {
CompositeData cdata = compdatalist[i];
Map intProperties = CompositeDataHelper.getTabularMap(cdata, CompositeDataConstants.INT_PROPERTIES);
assertTrue("not empty", intProperties.size() > 0);
assertEquals("counter in order", i + 5, intProperties.get("counter"));
echo("Got: " + intProperties.get("counter"));
}
}
public void testAddRemoveConnectorBrokerView() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost");