mirror of https://github.com/apache/activemq.git
Updated unit tests git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1133399 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
629b18cf27
commit
c43282ed8e
|
@ -87,253 +87,252 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
|||
TestRunner.run(MBeanTest.class);
|
||||
}
|
||||
|
||||
// public void testConnectors() throws Exception{
|
||||
// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||
// assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
|
||||
//
|
||||
// }
|
||||
//
|
||||
// public void testMBeans() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnection(connection);
|
||||
//
|
||||
// // test all the various MBeans now we have a producer, consumer and
|
||||
// // messages on a queue
|
||||
// assertSendViaMBean();
|
||||
// assertQueueBrowseWorks();
|
||||
// assertCreateAndDestroyDurableSubscriptions();
|
||||
// assertConsumerCounts();
|
||||
// assertProducerCounts();
|
||||
// }
|
||||
//
|
||||
// public void testMoveMessages() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnection(connection);
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// CompositeData[] compdatalist = queue.browse();
|
||||
// int initialQueueSize = compdatalist.length;
|
||||
// if (initialQueueSize == 0) {
|
||||
// fail("There is no message in the queue:");
|
||||
// }
|
||||
// else {
|
||||
// echo("Current queue size: " + initialQueueSize);
|
||||
// }
|
||||
// int messageCount = initialQueueSize;
|
||||
// String[] messageIDs = new String[messageCount];
|
||||
// for (int i = 0; i < messageCount; i++) {
|
||||
// CompositeData cdata = compdatalist[i];
|
||||
// String messageID = (String) cdata.get("JMSMessageID");
|
||||
// assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
// messageIDs[i] = messageID;
|
||||
// }
|
||||
//
|
||||
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
//
|
||||
// echo("About to move " + messageCount + " messages");
|
||||
//
|
||||
// String newDestination = getSecondDestinationString();
|
||||
// for (String messageID : messageIDs) {
|
||||
// echo("Moving message: " + messageID);
|
||||
// queue.moveMessageTo(messageID, newDestination);
|
||||
// }
|
||||
//
|
||||
// echo("Now browsing the queue");
|
||||
// compdatalist = queue.browse();
|
||||
// int actualCount = compdatalist.length;
|
||||
// echo("Current queue size: " + actualCount);
|
||||
// assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
|
||||
//
|
||||
// echo("Now browsing the second queue");
|
||||
//
|
||||
// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
// QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// long newQueuesize = queueNew.getQueueSize();
|
||||
// echo("Second queue size: " + newQueuesize);
|
||||
// assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
|
||||
//
|
||||
// // check memory usage migration
|
||||
// assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
|
||||
// assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
// assertTrue("use cache", queueNew.isUseCache());
|
||||
// assertTrue("cache enabled", queueNew.isCacheEnabled());
|
||||
// }
|
||||
//
|
||||
// public void testRemoveMessages() throws Exception {
|
||||
// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||
// broker.addQueue(getDestinationString());
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
// String msg1 = queue.sendTextMessage("message 1");
|
||||
// String msg2 = queue.sendTextMessage("message 2");
|
||||
//
|
||||
// assertTrue(queue.removeMessage(msg2));
|
||||
//
|
||||
// connection = connectionFactory.createConnection();
|
||||
// connection.start();
|
||||
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
// ActiveMQDestination dest = createDestination();
|
||||
//
|
||||
// MessageConsumer consumer = session.createConsumer(dest);
|
||||
// Message message = consumer.receive(1000);
|
||||
// assertNotNull(message);
|
||||
// assertEquals(msg1, message.getJMSMessageID());
|
||||
//
|
||||
// String msg3 = queue.sendTextMessage("message 3");
|
||||
// message = consumer.receive(1000);
|
||||
// assertNotNull(message);
|
||||
// assertEquals(msg3, message.getJMSMessageID());
|
||||
//
|
||||
// message = consumer.receive(1000);
|
||||
// assertNull(message);
|
||||
//
|
||||
// }
|
||||
//
|
||||
// public void testRetryMessages() throws Exception {
|
||||
// // lets speed up redelivery
|
||||
// ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
|
||||
// factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
|
||||
// factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
|
||||
// factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
|
||||
// factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
|
||||
// factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
|
||||
// factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
|
||||
//
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnection(connection);
|
||||
//
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// long initialQueueSize = queue.getQueueSize();
|
||||
// echo("current queue size: " + initialQueueSize);
|
||||
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
//
|
||||
// // lets create a duff consumer which keeps rolling back...
|
||||
// Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
// MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
|
||||
// Message message = consumer.receive(5000);
|
||||
// while (message != null) {
|
||||
// echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
|
||||
// session.rollback();
|
||||
// message = consumer.receive(2000);
|
||||
// }
|
||||
// consumer.close();
|
||||
// session.close();
|
||||
//
|
||||
//
|
||||
// // now lets get the dead letter queue
|
||||
// Thread.sleep(1000);
|
||||
//
|
||||
// ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
|
||||
// QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// long initialDlqSize = dlq.getQueueSize();
|
||||
// CompositeData[] compdatalist = dlq.browse();
|
||||
// int dlqQueueSize = compdatalist.length;
|
||||
// if (dlqQueueSize == 0) {
|
||||
// fail("There are no messages in the queue:");
|
||||
// }
|
||||
// else {
|
||||
// echo("Current DLQ queue size: " + dlqQueueSize);
|
||||
// }
|
||||
// int messageCount = dlqQueueSize;
|
||||
// String[] messageIDs = new String[messageCount];
|
||||
// for (int i = 0; i < messageCount; i++) {
|
||||
// CompositeData cdata = compdatalist[i];
|
||||
// String messageID = (String) cdata.get("JMSMessageID");
|
||||
// assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
// messageIDs[i] = messageID;
|
||||
// }
|
||||
//
|
||||
// int dlqMemUsage = dlq.getMemoryPercentUsage();
|
||||
// assertTrue("dlq has some memory usage", dlqMemUsage > 0);
|
||||
// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
//
|
||||
//
|
||||
// echo("About to retry " + messageCount + " messages");
|
||||
//
|
||||
// for (String messageID : messageIDs) {
|
||||
// echo("Retrying message: " + messageID);
|
||||
// dlq.retryMessage(messageID);
|
||||
// }
|
||||
//
|
||||
// long queueSize = queue.getQueueSize();
|
||||
// compdatalist = queue.browse();
|
||||
// int actualCount = compdatalist.length;
|
||||
// echo("Orginal queue size is now " + queueSize);
|
||||
// echo("Original browse queue size: " + actualCount);
|
||||
//
|
||||
// long dlqSize = dlq.getQueueSize();
|
||||
// echo("DLQ size: " + dlqSize);
|
||||
//
|
||||
// assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
|
||||
// assertEquals("queue size", initialQueueSize, queueSize);
|
||||
// assertEquals("browse queue size", initialQueueSize, actualCount);
|
||||
//
|
||||
// assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
|
||||
// }
|
||||
//
|
||||
// public void testMoveMessagesBySelector() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnection(connection);
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// String newDestination = getSecondDestinationString();
|
||||
// queue.moveMatchingMessagesTo("counter > 2", newDestination);
|
||||
//
|
||||
// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
//
|
||||
// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
// int movedSize = MESSAGE_COUNT-3;
|
||||
// assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
|
||||
//
|
||||
// // now lets remove them by selector
|
||||
// queue.removeMatchingMessages("counter > 2");
|
||||
//
|
||||
// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
||||
// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
// }
|
||||
//
|
||||
// public void testCopyMessagesBySelector() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnection(connection);
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// String newDestination = getSecondDestinationString();
|
||||
// long queueSize = queue.getQueueSize();
|
||||
// queue.copyMatchingMessagesTo("counter > 2", newDestination);
|
||||
//
|
||||
//
|
||||
//
|
||||
// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
//
|
||||
// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
|
||||
// assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
|
||||
// // now lets remove them by selector
|
||||
// queue.removeMatchingMessages("counter > 2");
|
||||
//
|
||||
// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
||||
// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
// }
|
||||
public void testConnectors() throws Exception{
|
||||
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||
assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
|
||||
|
||||
}
|
||||
|
||||
public void testMBeans() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
|
||||
// test all the various MBeans now we have a producer, consumer and
|
||||
// messages on a queue
|
||||
assertSendViaMBean();
|
||||
assertQueueBrowseWorks();
|
||||
assertCreateAndDestroyDurableSubscriptions();
|
||||
assertConsumerCounts();
|
||||
assertProducerCounts();
|
||||
}
|
||||
|
||||
public void testMoveMessages() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
CompositeData[] compdatalist = queue.browse();
|
||||
int initialQueueSize = compdatalist.length;
|
||||
if (initialQueueSize == 0) {
|
||||
fail("There is no message in the queue:");
|
||||
}
|
||||
else {
|
||||
echo("Current queue size: " + initialQueueSize);
|
||||
}
|
||||
int messageCount = initialQueueSize;
|
||||
String[] messageIDs = new String[messageCount];
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
CompositeData cdata = compdatalist[i];
|
||||
String messageID = (String) cdata.get("JMSMessageID");
|
||||
assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
messageIDs[i] = messageID;
|
||||
}
|
||||
|
||||
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
|
||||
echo("About to move " + messageCount + " messages");
|
||||
|
||||
String newDestination = getSecondDestinationString();
|
||||
for (String messageID : messageIDs) {
|
||||
echo("Moving message: " + messageID);
|
||||
queue.moveMessageTo(messageID, newDestination);
|
||||
}
|
||||
|
||||
echo("Now browsing the queue");
|
||||
compdatalist = queue.browse();
|
||||
int actualCount = compdatalist.length;
|
||||
echo("Current queue size: " + actualCount);
|
||||
assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
|
||||
|
||||
echo("Now browsing the second queue");
|
||||
|
||||
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
long newQueuesize = queueNew.getQueueSize();
|
||||
echo("Second queue size: " + newQueuesize);
|
||||
assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
|
||||
|
||||
// check memory usage migration
|
||||
assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
|
||||
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
assertTrue("use cache", queueNew.isUseCache());
|
||||
assertTrue("cache enabled", queueNew.isCacheEnabled());
|
||||
}
|
||||
|
||||
public void testRemoveMessages() throws Exception {
|
||||
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||
broker.addQueue(getDestinationString());
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
String msg1 = queue.sendTextMessage("message 1");
|
||||
String msg2 = queue.sendTextMessage("message 2");
|
||||
|
||||
assertTrue(queue.removeMessage(msg2));
|
||||
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
ActiveMQDestination dest = createDestination();
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(dest);
|
||||
Message message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals(msg1, message.getJMSMessageID());
|
||||
|
||||
String msg3 = queue.sendTextMessage("message 3");
|
||||
message = consumer.receive(1000);
|
||||
assertNotNull(message);
|
||||
assertEquals(msg3, message.getJMSMessageID());
|
||||
|
||||
message = consumer.receive(1000);
|
||||
assertNull(message);
|
||||
|
||||
}
|
||||
|
||||
public void testRetryMessages() throws Exception {
|
||||
// lets speed up redelivery
|
||||
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
|
||||
factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
|
||||
factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
|
||||
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
|
||||
factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
|
||||
factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
|
||||
factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
|
||||
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
long initialQueueSize = queue.getQueueSize();
|
||||
echo("current queue size: " + initialQueueSize);
|
||||
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
|
||||
// lets create a duff consumer which keeps rolling back...
|
||||
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
|
||||
MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
|
||||
Message message = consumer.receive(5000);
|
||||
while (message != null) {
|
||||
echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
|
||||
session.rollback();
|
||||
message = consumer.receive(2000);
|
||||
}
|
||||
consumer.close();
|
||||
session.close();
|
||||
|
||||
|
||||
// now lets get the dead letter queue
|
||||
Thread.sleep(1000);
|
||||
|
||||
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
|
||||
QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
long initialDlqSize = dlq.getQueueSize();
|
||||
CompositeData[] compdatalist = dlq.browse();
|
||||
int dlqQueueSize = compdatalist.length;
|
||||
if (dlqQueueSize == 0) {
|
||||
fail("There are no messages in the queue:");
|
||||
}
|
||||
else {
|
||||
echo("Current DLQ queue size: " + dlqQueueSize);
|
||||
}
|
||||
int messageCount = dlqQueueSize;
|
||||
String[] messageIDs = new String[messageCount];
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
CompositeData cdata = compdatalist[i];
|
||||
String messageID = (String) cdata.get("JMSMessageID");
|
||||
assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
messageIDs[i] = messageID;
|
||||
}
|
||||
|
||||
int dlqMemUsage = dlq.getMemoryPercentUsage();
|
||||
assertTrue("dlq has some memory usage", dlqMemUsage > 0);
|
||||
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
|
||||
|
||||
echo("About to retry " + messageCount + " messages");
|
||||
|
||||
for (String messageID : messageIDs) {
|
||||
echo("Retrying message: " + messageID);
|
||||
dlq.retryMessage(messageID);
|
||||
}
|
||||
|
||||
long queueSize = queue.getQueueSize();
|
||||
compdatalist = queue.browse();
|
||||
int actualCount = compdatalist.length;
|
||||
echo("Orginal queue size is now " + queueSize);
|
||||
echo("Original browse queue size: " + actualCount);
|
||||
|
||||
long dlqSize = dlq.getQueueSize();
|
||||
echo("DLQ size: " + dlqSize);
|
||||
|
||||
assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
|
||||
assertEquals("queue size", initialQueueSize, queueSize);
|
||||
assertEquals("browse queue size", initialQueueSize, actualCount);
|
||||
|
||||
assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
|
||||
}
|
||||
|
||||
public void testMoveMessagesBySelector() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
String newDestination = getSecondDestinationString();
|
||||
queue.moveMatchingMessagesTo("counter > 2", newDestination);
|
||||
|
||||
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
|
||||
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
int movedSize = MESSAGE_COUNT-3;
|
||||
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
|
||||
|
||||
// now lets remove them by selector
|
||||
queue.removeMatchingMessages("counter > 2");
|
||||
|
||||
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
||||
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
}
|
||||
|
||||
public void testCopyMessagesBySelector() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnection(connection);
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
String newDestination = getSecondDestinationString();
|
||||
long queueSize = queue.getQueueSize();
|
||||
queue.copyMatchingMessagesTo("counter > 2", newDestination);
|
||||
|
||||
|
||||
|
||||
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
|
||||
|
||||
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
|
||||
assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
|
||||
// now lets remove them by selector
|
||||
queue.removeMatchingMessages("counter > 2");
|
||||
|
||||
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
|
||||
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
|
||||
}
|
||||
|
||||
protected void assertSendViaMBean() throws Exception {
|
||||
String queueName = getDestinationString() + ".SendMBBean";
|
||||
|
@ -793,103 +792,103 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
|
|||
assertEquals(0, broker.getDynamicDestinationProducers().length);
|
||||
}
|
||||
|
||||
// public void testTempQueueJMXDelete() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
//
|
||||
// connection.setClientID(clientID);
|
||||
// connection.start();
|
||||
// Session session = connection.createSession(transacted, authMode);
|
||||
// ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
|
||||
// Thread.sleep(1000);
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
|
||||
//
|
||||
// // should not throw an exception
|
||||
// mbeanServer.getObjectInstance(queueViewMBeanName);
|
||||
//
|
||||
// tQueue.delete();
|
||||
// Thread.sleep(1000);
|
||||
// try {
|
||||
// // should throw an exception
|
||||
// mbeanServer.getObjectInstance(queueViewMBeanName);
|
||||
//
|
||||
// fail("should be deleted already!");
|
||||
// } catch (Exception e) {
|
||||
// // expected!
|
||||
// }
|
||||
//
|
||||
// }
|
||||
//
|
||||
// // Test for AMQ-3029
|
||||
// public void testBrowseBlobMessages() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnectionWithBlobMessage(connection);
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// CompositeData[] compdatalist = queue.browse();
|
||||
// int initialQueueSize = compdatalist.length;
|
||||
// if (initialQueueSize == 0) {
|
||||
// fail("There is no message in the queue:");
|
||||
// }
|
||||
// else {
|
||||
// echo("Current queue size: " + initialQueueSize);
|
||||
// }
|
||||
// int messageCount = initialQueueSize;
|
||||
// String[] messageIDs = new String[messageCount];
|
||||
// for (int i = 0; i < messageCount; i++) {
|
||||
// CompositeData cdata = compdatalist[i];
|
||||
// String messageID = (String) cdata.get("JMSMessageID");
|
||||
// assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
//
|
||||
// messageIDs[i] = messageID;
|
||||
// }
|
||||
//
|
||||
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
// }
|
||||
//
|
||||
// public void testBrowseBytesMessages() throws Exception {
|
||||
// connection = connectionFactory.createConnection();
|
||||
// useConnectionWithByteMessage(connection);
|
||||
//
|
||||
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
//
|
||||
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
//
|
||||
// CompositeData[] compdatalist = queue.browse();
|
||||
// int initialQueueSize = compdatalist.length;
|
||||
// if (initialQueueSize == 0) {
|
||||
// fail("There is no message in the queue:");
|
||||
// }
|
||||
// else {
|
||||
// echo("Current queue size: " + initialQueueSize);
|
||||
// }
|
||||
// int messageCount = initialQueueSize;
|
||||
// String[] messageIDs = new String[messageCount];
|
||||
// for (int i = 0; i < messageCount; i++) {
|
||||
// CompositeData cdata = compdatalist[i];
|
||||
// String messageID = (String) cdata.get("JMSMessageID");
|
||||
// assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
// messageIDs[i] = messageID;
|
||||
//
|
||||
// Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
|
||||
// assertNotNull("should be a preview", preview);
|
||||
// assertTrue("not empty", preview.length > 0);
|
||||
// }
|
||||
//
|
||||
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
//
|
||||
// // consume all the messages
|
||||
// echo("Attempting to consume all bytes messages from: " + destination);
|
||||
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
// MessageConsumer consumer = session.createConsumer(destination);
|
||||
// for (int i=0; i<MESSAGE_COUNT; i++) {
|
||||
// Message message = consumer.receive(5000);
|
||||
// assertNotNull(message);
|
||||
// assertTrue(message instanceof BytesMessage);
|
||||
// }
|
||||
// consumer.close();
|
||||
// session.close();
|
||||
// }
|
||||
public void testTempQueueJMXDelete() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
|
||||
connection.setClientID(clientID);
|
||||
connection.start();
|
||||
Session session = connection.createSession(transacted, authMode);
|
||||
ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
|
||||
Thread.sleep(1000);
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
|
||||
|
||||
// should not throw an exception
|
||||
mbeanServer.getObjectInstance(queueViewMBeanName);
|
||||
|
||||
tQueue.delete();
|
||||
Thread.sleep(1000);
|
||||
try {
|
||||
// should throw an exception
|
||||
mbeanServer.getObjectInstance(queueViewMBeanName);
|
||||
|
||||
fail("should be deleted already!");
|
||||
} catch (Exception e) {
|
||||
// expected!
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Test for AMQ-3029
|
||||
public void testBrowseBlobMessages() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnectionWithBlobMessage(connection);
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
CompositeData[] compdatalist = queue.browse();
|
||||
int initialQueueSize = compdatalist.length;
|
||||
if (initialQueueSize == 0) {
|
||||
fail("There is no message in the queue:");
|
||||
}
|
||||
else {
|
||||
echo("Current queue size: " + initialQueueSize);
|
||||
}
|
||||
int messageCount = initialQueueSize;
|
||||
String[] messageIDs = new String[messageCount];
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
CompositeData cdata = compdatalist[i];
|
||||
String messageID = (String) cdata.get("JMSMessageID");
|
||||
assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
|
||||
messageIDs[i] = messageID;
|
||||
}
|
||||
|
||||
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
}
|
||||
|
||||
public void testBrowseBytesMessages() throws Exception {
|
||||
connection = connectionFactory.createConnection();
|
||||
useConnectionWithByteMessage(connection);
|
||||
|
||||
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
|
||||
|
||||
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
|
||||
|
||||
CompositeData[] compdatalist = queue.browse();
|
||||
int initialQueueSize = compdatalist.length;
|
||||
if (initialQueueSize == 0) {
|
||||
fail("There is no message in the queue:");
|
||||
}
|
||||
else {
|
||||
echo("Current queue size: " + initialQueueSize);
|
||||
}
|
||||
int messageCount = initialQueueSize;
|
||||
String[] messageIDs = new String[messageCount];
|
||||
for (int i = 0; i < messageCount; i++) {
|
||||
CompositeData cdata = compdatalist[i];
|
||||
String messageID = (String) cdata.get("JMSMessageID");
|
||||
assertNotNull("Should have a message ID for message " + i, messageID);
|
||||
messageIDs[i] = messageID;
|
||||
|
||||
Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
|
||||
assertNotNull("should be a preview", preview);
|
||||
assertTrue("not empty", preview.length > 0);
|
||||
}
|
||||
|
||||
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
|
||||
|
||||
// consume all the messages
|
||||
echo("Attempting to consume all bytes messages from: " + destination);
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
for (int i=0; i<MESSAGE_COUNT; i++) {
|
||||
Message message = consumer.receive(5000);
|
||||
assertNotNull(message);
|
||||
assertTrue(message instanceof BytesMessage);
|
||||
}
|
||||
consumer.close();
|
||||
session.close();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue