mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3935 - JConsole browse() function does not work if useCache=false. Add test case and a few other browse variants with useCache=false. Fix ensures a force pagein is done if usecache is false
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1362462 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8aadc4b9ec
commit
7a7d68411c
|
@ -1008,7 +1008,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
||||||
public void doBrowse(List<Message> browseList, int max) {
|
public void doBrowse(List<Message> browseList, int max) {
|
||||||
final ConnectionContext connectionContext = createConnectionContext();
|
final ConnectionContext connectionContext = createConnectionContext();
|
||||||
try {
|
try {
|
||||||
pageInMessages(false);
|
pageInMessages(!isUseCache());
|
||||||
List<MessageReference> toExpire = new ArrayList<MessageReference>();
|
List<MessageReference> toExpire = new ArrayList<MessageReference>();
|
||||||
|
|
||||||
pagedInPendingDispatchLock.writeLock().lock();
|
pagedInPendingDispatchLock.writeLock().lock();
|
||||||
|
|
|
@ -19,7 +19,6 @@ package org.apache.activemq;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Enumeration;
|
import java.util.Enumeration;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.jms.Message;
|
import javax.jms.Message;
|
||||||
import javax.jms.MessageConsumer;
|
import javax.jms.MessageConsumer;
|
||||||
import javax.jms.MessageProducer;
|
import javax.jms.MessageProducer;
|
||||||
|
@ -27,16 +26,16 @@ import javax.jms.Queue;
|
||||||
import javax.jms.QueueBrowser;
|
import javax.jms.QueueBrowser;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.management.ObjectName;
|
||||||
import org.apache.activemq.broker.StubConnection;
|
import javax.management.openmbean.CompositeData;
|
||||||
|
import javax.management.openmbean.TabularData;
|
||||||
|
import junit.framework.Test;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.QueueViewMBean;
|
||||||
import org.apache.activemq.broker.region.BaseDestination;
|
import org.apache.activemq.broker.region.BaseDestination;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ConnectionInfo;
|
|
||||||
import org.apache.activemq.command.ConsumerInfo;
|
|
||||||
import org.apache.activemq.command.MessageAck;
|
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
|
||||||
import org.apache.activemq.command.SessionInfo;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -44,7 +43,11 @@ import org.apache.activemq.command.SessionInfo;
|
||||||
public class JmsQueueBrowserTest extends JmsTestSupport {
|
public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
|
private static final org.apache.commons.logging.Log LOG = org.apache.commons.logging.LogFactory
|
||||||
.getLog(JmsQueueBrowserTest.class);
|
.getLog(JmsQueueBrowserTest.class);
|
||||||
|
public boolean isUseCache = false;
|
||||||
|
|
||||||
|
public static Test suite() throws Exception {
|
||||||
|
return suite(JmsQueueBrowserTest.class);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the queue browser. Browses the messages then the consumer tries to receive them. The messages should still
|
* Tests the queue browser. Browses the messages then the consumer tries to receive them. The messages should still
|
||||||
|
@ -106,12 +109,177 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void initCombosForTestBatchSendBrowseReceive() {
|
||||||
|
addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBatchSendBrowseReceive() throws Exception {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
TextMessage[] outbound = new TextMessage[10];
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
outbound[i] = session.createTextMessage( i + " Message");
|
||||||
|
};
|
||||||
|
|
||||||
|
// lets consume any outstanding messages from previous test runs
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
for (int i=0;i<outbound.length; i++) {
|
||||||
|
producer.send(outbound[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
QueueBrowser browser = session.createBrowser((Queue) destination);
|
||||||
|
Enumeration enumeration = browser.getEnumeration();
|
||||||
|
|
||||||
|
for (int i=0; i<outbound.length; i++) {
|
||||||
|
assertTrue("should have a", enumeration.hasMoreElements());
|
||||||
|
assertEquals(outbound[i], enumeration.nextElement());
|
||||||
|
}
|
||||||
|
browser.close();
|
||||||
|
|
||||||
|
for (int i=0;i<outbound.length; i++) {
|
||||||
|
producer.send(outbound[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// verify second batch is visible to browse
|
||||||
|
browser = session.createBrowser((Queue) destination);
|
||||||
|
enumeration = browser.getEnumeration();
|
||||||
|
for (int j=0; j<2;j++) {
|
||||||
|
for (int i=0; i<outbound.length; i++) {
|
||||||
|
assertTrue("should have a", enumeration.hasMoreElements());
|
||||||
|
assertEquals("j=" + j + ", i=" + i, outbound[i].getText(), ((TextMessage) enumeration.nextElement()).getText());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
browser.close();
|
||||||
|
|
||||||
|
consumer = session.createConsumer(destination);
|
||||||
|
for (int i=0; i<outbound.length * 2; i++) {
|
||||||
|
assertNotNull("Got message: " + i, consumer.receive(2000));
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public void initCombosForTestBatchSendJmxBrowseReceive() {
|
||||||
|
addCombinationValues("isUseCache", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testBatchSendJmxBrowseReceive() throws Exception {
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
TextMessage[] outbound = new TextMessage[10];
|
||||||
|
for (int i=0; i<10; i++) {
|
||||||
|
outbound[i] = session.createTextMessage( i + " Message");
|
||||||
|
};
|
||||||
|
|
||||||
|
// lets consume any outstanding messages from previous test runs
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
for (int i=0;i<outbound.length; i++) {
|
||||||
|
producer.send(outbound[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=Queue,Destination=TEST,BrokerName=localhost");
|
||||||
|
|
||||||
|
LOG.info("Create QueueView MBean...");
|
||||||
|
QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext().newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
|
||||||
|
|
||||||
|
long concount = proxy.getConsumerCount();
|
||||||
|
LOG.info("Consumer Count :" + concount);
|
||||||
|
long messcount = proxy.getQueueSize();
|
||||||
|
LOG.info("current number of messages in the queue :" + messcount);
|
||||||
|
|
||||||
|
// lets browse
|
||||||
|
CompositeData[] compdatalist = proxy.browse();
|
||||||
|
if (compdatalist.length == 0) {
|
||||||
|
fail("There is no message in the queue:");
|
||||||
|
}
|
||||||
|
String[] messageIDs = new String[compdatalist.length];
|
||||||
|
|
||||||
|
for (int i = 0; i < compdatalist.length; i++) {
|
||||||
|
CompositeData cdata = compdatalist[i];
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
LOG.info("Columns: " + cdata.getCompositeType().keySet());
|
||||||
|
}
|
||||||
|
messageIDs[i] = (String)cdata.get("JMSMessageID");
|
||||||
|
LOG.info("message " + i + " : " + cdata.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
TabularData table = proxy.browseAsTable();
|
||||||
|
LOG.info("Found tabular data: " + table);
|
||||||
|
assertTrue("Table should not be empty!", table.size() > 0);
|
||||||
|
|
||||||
|
assertEquals("Queue size", outbound.length, proxy.getQueueSize());
|
||||||
|
assertEquals("Queue size", outbound.length, compdatalist.length);
|
||||||
|
assertEquals("Queue size", outbound.length, table.size());
|
||||||
|
|
||||||
|
|
||||||
|
LOG.info("Send another 10");
|
||||||
|
for (int i=0;i<outbound.length; i++) {
|
||||||
|
producer.send(outbound[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Browse again");
|
||||||
|
|
||||||
|
messcount = proxy.getQueueSize();
|
||||||
|
LOG.info("current number of messages in the queue :" + messcount);
|
||||||
|
|
||||||
|
compdatalist = proxy.browse();
|
||||||
|
if (compdatalist.length == 0) {
|
||||||
|
fail("There is no message in the queue:");
|
||||||
|
}
|
||||||
|
messageIDs = new String[compdatalist.length];
|
||||||
|
|
||||||
|
for (int i = 0; i < compdatalist.length; i++) {
|
||||||
|
CompositeData cdata = compdatalist[i];
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
LOG.info("Columns: " + cdata.getCompositeType().keySet());
|
||||||
|
}
|
||||||
|
messageIDs[i] = (String)cdata.get("JMSMessageID");
|
||||||
|
LOG.info("message " + i + " : " + cdata.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
table = proxy.browseAsTable();
|
||||||
|
LOG.info("Found tabular data: " + table);
|
||||||
|
assertTrue("Table should not be empty!", table.size() > 0);
|
||||||
|
|
||||||
|
assertEquals("Queue size", outbound.length*2, proxy.getQueueSize());
|
||||||
|
assertEquals("Queue size", outbound.length*2, compdatalist.length);
|
||||||
|
assertEquals("Queue size", outbound.length * 2, table.size());
|
||||||
|
|
||||||
|
consumer = session.createConsumer(destination);
|
||||||
|
for (int i=0; i<outbound.length * 2; i++) {
|
||||||
|
assertNotNull("Got message: " + i, consumer.receive(2000));
|
||||||
|
}
|
||||||
|
consumer.close();
|
||||||
|
}
|
||||||
|
|
||||||
public void testBrowseReceive() throws Exception {
|
public void testBrowseReceive() throws Exception {
|
||||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
ActiveMQQueue destination = new ActiveMQQueue("TEST");
|
||||||
|
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
// create consumer
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
// lets consume any outstanding messages from previous test runs
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
}
|
||||||
|
|
||||||
Message[] outbound = new Message[]{session.createTextMessage("First Message"),
|
Message[] outbound = new Message[]{session.createTextMessage("First Message"),
|
||||||
session.createTextMessage("Second Message"),
|
session.createTextMessage("Second Message"),
|
||||||
session.createTextMessage("Third Message")};
|
session.createTextMessage("Third Message")};
|
||||||
|
@ -124,9 +292,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
QueueBrowser browser = session.createBrowser((Queue) destination);
|
QueueBrowser browser = session.createBrowser((Queue) destination);
|
||||||
Enumeration enumeration = browser.getEnumeration();
|
Enumeration enumeration = browser.getEnumeration();
|
||||||
|
|
||||||
// create consumer
|
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
|
||||||
|
|
||||||
// browse the first message
|
// browse the first message
|
||||||
assertTrue("should have received the first message", enumeration.hasMoreElements());
|
assertTrue("should have received the first message", enumeration.hasMoreElements());
|
||||||
assertEquals(outbound[0], (Message) enumeration.nextElement());
|
assertEquals(outbound[0], (Message) enumeration.nextElement());
|
||||||
|
@ -156,6 +321,10 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
|
MessageConsumer consumer = session.createConsumer(destinationPrefetch10);
|
||||||
|
|
||||||
|
// lets consume any outstanding messages from previous test runs
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
}
|
||||||
|
|
||||||
for (int i=0; i<numMessages; i++) {
|
for (int i=0; i<numMessages; i++) {
|
||||||
TextMessage message = session.createTextMessage("Message: " + i);
|
TextMessage message = session.createTextMessage("Message: " + i);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
@ -199,6 +368,12 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
session.createTextMessage("Third Message")};
|
session.createTextMessage("Third Message")};
|
||||||
|
|
||||||
|
|
||||||
|
// create consumer
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
// lets consume any outstanding messages from previous test runs
|
||||||
|
while (consumer.receive(1000) != null) {
|
||||||
|
}
|
||||||
|
|
||||||
MessageProducer producer = session.createProducer(destination);
|
MessageProducer producer = session.createProducer(destination);
|
||||||
producer.send(outbound[0]);
|
producer.send(outbound[0]);
|
||||||
producer.send(outbound[1]);
|
producer.send(outbound[1]);
|
||||||
|
@ -218,8 +393,6 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
|
|
||||||
browser.close();
|
browser.close();
|
||||||
|
|
||||||
// create consumer
|
|
||||||
MessageConsumer consumer = session.createConsumer(destination);
|
|
||||||
|
|
||||||
// Receive the first message.
|
// Receive the first message.
|
||||||
TextMessage msg = (TextMessage)consumer.receive(1000);
|
TextMessage msg = (TextMessage)consumer.receive(1000);
|
||||||
|
@ -233,4 +406,15 @@ public class JmsQueueBrowserTest extends JmsTestSupport {
|
||||||
producer.close();
|
producer.close();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService brokerService = super.createBroker();
|
||||||
|
PolicyMap policyMap = new PolicyMap();
|
||||||
|
PolicyEntry policyEntry = new PolicyEntry();
|
||||||
|
policyEntry.setUseCache(isUseCache);
|
||||||
|
policyMap.setDefaultEntry(policyEntry);
|
||||||
|
brokerService.setDestinationPolicy(policyMap);
|
||||||
|
return brokerService;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue