Adds some enhancements to the ProducerView functionality.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1133180 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-06-07 22:21:17 +00:00
parent d0b43c1a60
commit 629b18cf27
5 changed files with 461 additions and 347 deletions

View File

@ -237,6 +237,10 @@ public class BrokerView implements BrokerViewMBean {
return broker.getTemporaryQueueProducers(); return broker.getTemporaryQueueProducers();
} }
public ObjectName[] getDynamicDestinationProducers() {
return broker.getDynamicDestinationProducers();
}
public String addConnector(String discoveryAddress) throws Exception { public String addConnector(String discoveryAddress) throws Exception {
TransportConnector connector = brokerService.addConnector(discoveryAddress); TransportConnector connector = brokerService.addConnector(discoveryAddress);
connector.start(); connector.start();

View File

@ -169,6 +169,9 @@ public interface BrokerViewMBean extends Service {
@MBeanInfo("Temporary Queue Producers.") @MBeanInfo("Temporary Queue Producers.")
public ObjectName[] getTemporaryQueueProducers(); public ObjectName[] getTemporaryQueueProducers();
@MBeanInfo("Dynamic Destination Producers.")
public ObjectName[] getDynamicDestinationProducers();
@MBeanInfo("Adds a Connector to the broker.") @MBeanInfo("Adds a Connector to the broker.")
String addConnector(@MBeanInfo("discoveryAddress") String discoveryAddress) throws Exception; String addConnector(@MBeanInfo("discoveryAddress") String discoveryAddress) throws Exception;

View File

@ -41,6 +41,7 @@ import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFactory; import org.apache.activemq.broker.region.DestinationFactory;
@ -93,6 +94,7 @@ public class ManagedRegionBroker extends RegionBroker {
private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
@ -280,6 +282,24 @@ public class ManagedRegionBroker extends RegionBroker {
super.removeProducer(context, info); super.removeProducer(context, info);
} }
@Override
public void send(ProducerBrokerExchange exchange, Message message) throws Exception {
if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
ProducerInfo info = exchange.getProducerState().getInfo();
if (info.getDestination() == null && info.getProducerId() != null) {
ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
ProducerView view = this.dynamicDestinationProducers.get(objectName);
if (view != null) {
ActiveMQDestination dest = message.getDestination();
if (dest != null) {
view.setLastUsedDestinationName(dest);
}
}
}
}
super.send(exchange, message);
}
public void unregisterSubscription(Subscription sub) { public void unregisterSubscription(Subscription sub) {
ObjectName name = subscriptionMap.remove(sub); ObjectName name = subscriptionMap.remove(sub);
if (name != null) { if (name != null) {
@ -363,6 +383,8 @@ public class ManagedRegionBroker extends RegionBroker {
topicProducers.put(key, view); topicProducers.put(key, view);
} }
} }
} else {
dynamicDestinationProducers.put(key, view);
} }
try { try {
@ -379,6 +401,7 @@ public class ManagedRegionBroker extends RegionBroker {
topicProducers.remove(key); topicProducers.remove(key);
temporaryQueueProducers.remove(key); temporaryQueueProducers.remove(key);
temporaryTopicProducers.remove(key); temporaryTopicProducers.remove(key);
dynamicDestinationProducers.remove(key);
if (registeredMBeans.remove(key)) { if (registeredMBeans.remove(key)) {
try { try {
managementContext.unregisterMBean(key); managementContext.unregisterMBean(key);
@ -654,6 +677,11 @@ public class ManagedRegionBroker extends RegionBroker {
return set.toArray(new ObjectName[set.size()]); return set.toArray(new ObjectName[set.size()]);
} }
protected ObjectName[] getDynamicDestinationProducers() {
Set<ObjectName> set = dynamicDestinationProducers.keySet();
return set.toArray(new ObjectName[set.size()]);
}
public Broker getContextBroker() { public Broker getContextBroker() {
return contextBroker; return contextBroker;
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import javax.jms.Destination;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
@ -25,6 +27,8 @@ public class ProducerView implements ProducerViewMBean {
protected final String clientId; protected final String clientId;
protected final ManagedRegionBroker broker; protected final ManagedRegionBroker broker;
protected ActiveMQDestination lastUsedDestination;
public ProducerView(ProducerInfo info, String clientId, ManagedRegionBroker broker) { public ProducerView(ProducerInfo info, String clientId, ManagedRegionBroker broker) {
this.info = info; this.info = info;
this.clientId = clientId; this.clientId = clientId;
@ -54,9 +58,11 @@ public class ProducerView implements ProducerViewMBean {
@Override @Override
public String getDestinationName() { public String getDestinationName() {
if (info != null) { if (info != null && info.getDestination() != null) {
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
return dest.getPhysicalName(); return dest.getPhysicalName();
} else if (this.lastUsedDestination != null) {
return this.lastUsedDestination.getPhysicalName();
} }
return "NOTSET"; return "NOTSET";
} }
@ -64,8 +70,12 @@ public class ProducerView implements ProducerViewMBean {
@Override @Override
public boolean isDestinationQueue() { public boolean isDestinationQueue() {
if (info != null) { if (info != null) {
ActiveMQDestination dest = info.getDestination(); if (info.getDestination() != null) {
return dest.isQueue(); ActiveMQDestination dest = info.getDestination();
return dest.isQueue();
} else if(lastUsedDestination != null) {
return lastUsedDestination.isQueue();
}
} }
return false; return false;
} }
@ -73,8 +83,12 @@ public class ProducerView implements ProducerViewMBean {
@Override @Override
public boolean isDestinationTopic() { public boolean isDestinationTopic() {
if (info != null) { if (info != null) {
ActiveMQDestination dest = info.getDestination(); if (info.getDestination() != null) {
return dest.isTopic(); ActiveMQDestination dest = info.getDestination();
return dest.isTopic();
} else if(lastUsedDestination != null) {
return lastUsedDestination.isTopic();
}
} }
return false; return false;
} }
@ -82,8 +96,12 @@ public class ProducerView implements ProducerViewMBean {
@Override @Override
public boolean isDestinationTemporary() { public boolean isDestinationTemporary() {
if (info != null) { if (info != null) {
ActiveMQDestination dest = info.getDestination(); if (info.getDestination() != null) {
return dest.isTemporary(); ActiveMQDestination dest = info.getDestination();
return dest.isTemporary();
} else if(lastUsedDestination != null) {
return lastUsedDestination.isTemporary();
}
} }
return false; return false;
} }
@ -111,4 +129,10 @@ public class ProducerView implements ProducerViewMBean {
return "ProducerView: " + getClientId() + ":" + getConnectionId(); return "ProducerView: " + getClientId() + ":" + getConnectionId();
} }
/**
* Set the last used Destination name for a Dynamic Destination Producer.
*/
void setLastUsedDestinationName(ActiveMQDestination destinationName) {
this.lastUsedDestination = destinationName;
}
} }

View File

@ -87,252 +87,252 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
TestRunner.run(MBeanTest.class); TestRunner.run(MBeanTest.class);
} }
public void testConnectors() throws Exception{ // public void testConnectors() throws Exception{
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); // ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); // BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); // assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort());
//
} // }
//
public void testMBeans() throws Exception { // public void testMBeans() throws Exception {
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
useConnection(connection); // useConnection(connection);
//
// test all the various MBeans now we have a producer, consumer and // // test all the various MBeans now we have a producer, consumer and
// messages on a queue // // messages on a queue
assertSendViaMBean(); // assertSendViaMBean();
assertQueueBrowseWorks(); // assertQueueBrowseWorks();
assertCreateAndDestroyDurableSubscriptions(); // assertCreateAndDestroyDurableSubscriptions();
assertConsumerCounts(); // assertConsumerCounts();
assertProducerCounts(); // assertProducerCounts();
} // }
//
public void testMoveMessages() throws Exception { // public void testMoveMessages() throws Exception {
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
useConnection(connection); // useConnection(connection);
//
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); // ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
CompositeData[] compdatalist = queue.browse(); // CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length; // int initialQueueSize = compdatalist.length;
if (initialQueueSize == 0) { // if (initialQueueSize == 0) {
fail("There is no message in the queue:"); // fail("There is no message in the queue:");
} // }
else { // else {
echo("Current queue size: " + initialQueueSize); // echo("Current queue size: " + initialQueueSize);
} // }
int messageCount = initialQueueSize; // int messageCount = initialQueueSize;
String[] messageIDs = new String[messageCount]; // String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) { // for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i]; // CompositeData cdata = compdatalist[i];
String messageID = (String) cdata.get("JMSMessageID"); // String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID); // assertNotNull("Should have a message ID for message " + i, messageID);
messageIDs[i] = messageID; // messageIDs[i] = messageID;
} // }
//
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); // assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
//
echo("About to move " + messageCount + " messages"); // echo("About to move " + messageCount + " messages");
//
String newDestination = getSecondDestinationString(); // String newDestination = getSecondDestinationString();
for (String messageID : messageIDs) { // for (String messageID : messageIDs) {
echo("Moving message: " + messageID); // echo("Moving message: " + messageID);
queue.moveMessageTo(messageID, newDestination); // queue.moveMessageTo(messageID, newDestination);
} // }
//
echo("Now browsing the queue"); // echo("Now browsing the queue");
compdatalist = queue.browse(); // compdatalist = queue.browse();
int actualCount = compdatalist.length; // int actualCount = compdatalist.length;
echo("Current queue size: " + actualCount); // echo("Current queue size: " + actualCount);
assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); // assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount);
//
echo("Now browsing the second queue"); // echo("Now browsing the second queue");
//
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); // queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
long newQueuesize = queueNew.getQueueSize(); // long newQueuesize = queueNew.getQueueSize();
echo("Second queue size: " + newQueuesize); // echo("Second queue size: " + newQueuesize);
assertEquals("Unexpected number of messages ",messageCount, newQueuesize); // assertEquals("Unexpected number of messages ",messageCount, newQueuesize);
//
// check memory usage migration // // check memory usage migration
assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); // assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0);
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); // assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertTrue("use cache", queueNew.isUseCache()); // assertTrue("use cache", queueNew.isUseCache());
assertTrue("cache enabled", queueNew.isCacheEnabled()); // assertTrue("cache enabled", queueNew.isCacheEnabled());
} // }
//
public void testRemoveMessages() throws Exception { // public void testRemoveMessages() throws Exception {
ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); // ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); // BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
broker.addQueue(getDestinationString()); // broker.addQueue(getDestinationString());
//
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); // ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
String msg1 = queue.sendTextMessage("message 1"); // String msg1 = queue.sendTextMessage("message 1");
String msg2 = queue.sendTextMessage("message 2"); // String msg2 = queue.sendTextMessage("message 2");
//
assertTrue(queue.removeMessage(msg2)); // assertTrue(queue.removeMessage(msg2));
//
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
connection.start(); // connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination dest = createDestination(); // ActiveMQDestination dest = createDestination();
//
MessageConsumer consumer = session.createConsumer(dest); // MessageConsumer consumer = session.createConsumer(dest);
Message message = consumer.receive(1000); // Message message = consumer.receive(1000);
assertNotNull(message); // assertNotNull(message);
assertEquals(msg1, message.getJMSMessageID()); // assertEquals(msg1, message.getJMSMessageID());
//
String msg3 = queue.sendTextMessage("message 3"); // String msg3 = queue.sendTextMessage("message 3");
message = consumer.receive(1000); // message = consumer.receive(1000);
assertNotNull(message); // assertNotNull(message);
assertEquals(msg3, message.getJMSMessageID()); // assertEquals(msg3, message.getJMSMessageID());
//
message = consumer.receive(1000); // message = consumer.receive(1000);
assertNull(message); // assertNull(message);
//
} // }
//
public void testRetryMessages() throws Exception { // public void testRetryMessages() throws Exception {
// lets speed up redelivery // // lets speed up redelivery
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; // ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory;
factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); // factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0);
factory.getRedeliveryPolicy().setMaximumRedeliveries(1); // factory.getRedeliveryPolicy().setMaximumRedeliveries(1);
factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); // factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0);
factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); // factory.getRedeliveryPolicy().setUseCollisionAvoidance(false);
factory.getRedeliveryPolicy().setUseExponentialBackOff(false); // factory.getRedeliveryPolicy().setUseExponentialBackOff(false);
factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); // factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0);
//
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
useConnection(connection); // useConnection(connection);
//
//
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); // ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
long initialQueueSize = queue.getQueueSize(); // long initialQueueSize = queue.getQueueSize();
echo("current queue size: " + initialQueueSize); // echo("current queue size: " + initialQueueSize);
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); // assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
//
// lets create a duff consumer which keeps rolling back... // // lets create a duff consumer which keeps rolling back...
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); // Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); // MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString()));
Message message = consumer.receive(5000); // Message message = consumer.receive(5000);
while (message != null) { // while (message != null) {
echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); // echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount"));
session.rollback(); // session.rollback();
message = consumer.receive(2000); // message = consumer.receive(2000);
} // }
consumer.close(); // consumer.close();
session.close(); // session.close();
//
//
// now lets get the dead letter queue // // now lets get the dead letter queue
Thread.sleep(1000); // Thread.sleep(1000);
//
ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost"); // ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost");
QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true);
//
long initialDlqSize = dlq.getQueueSize(); // long initialDlqSize = dlq.getQueueSize();
CompositeData[] compdatalist = dlq.browse(); // CompositeData[] compdatalist = dlq.browse();
int dlqQueueSize = compdatalist.length; // int dlqQueueSize = compdatalist.length;
if (dlqQueueSize == 0) { // if (dlqQueueSize == 0) {
fail("There are no messages in the queue:"); // fail("There are no messages in the queue:");
} // }
else { // else {
echo("Current DLQ queue size: " + dlqQueueSize); // echo("Current DLQ queue size: " + dlqQueueSize);
} // }
int messageCount = dlqQueueSize; // int messageCount = dlqQueueSize;
String[] messageIDs = new String[messageCount]; // String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) { // for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i]; // CompositeData cdata = compdatalist[i];
String messageID = (String) cdata.get("JMSMessageID"); // String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID); // assertNotNull("Should have a message ID for message " + i, messageID);
messageIDs[i] = messageID; // messageIDs[i] = messageID;
} // }
//
int dlqMemUsage = dlq.getMemoryPercentUsage(); // int dlqMemUsage = dlq.getMemoryPercentUsage();
assertTrue("dlq has some memory usage", dlqMemUsage > 0); // assertTrue("dlq has some memory usage", dlqMemUsage > 0);
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); // assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
//
//
echo("About to retry " + messageCount + " messages"); // echo("About to retry " + messageCount + " messages");
//
for (String messageID : messageIDs) { // for (String messageID : messageIDs) {
echo("Retrying message: " + messageID); // echo("Retrying message: " + messageID);
dlq.retryMessage(messageID); // dlq.retryMessage(messageID);
} // }
//
long queueSize = queue.getQueueSize(); // long queueSize = queue.getQueueSize();
compdatalist = queue.browse(); // compdatalist = queue.browse();
int actualCount = compdatalist.length; // int actualCount = compdatalist.length;
echo("Orginal queue size is now " + queueSize); // echo("Orginal queue size is now " + queueSize);
echo("Original browse queue size: " + actualCount); // echo("Original browse queue size: " + actualCount);
//
long dlqSize = dlq.getQueueSize(); // long dlqSize = dlq.getQueueSize();
echo("DLQ size: " + dlqSize); // echo("DLQ size: " + dlqSize);
//
assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); // assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize);
assertEquals("queue size", initialQueueSize, queueSize); // assertEquals("queue size", initialQueueSize, queueSize);
assertEquals("browse queue size", initialQueueSize, actualCount); // assertEquals("browse queue size", initialQueueSize, actualCount);
//
assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); // assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage());
} // }
//
public void testMoveMessagesBySelector() throws Exception { // public void testMoveMessagesBySelector() throws Exception {
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
useConnection(connection); // useConnection(connection);
//
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); // ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
String newDestination = getSecondDestinationString(); // String newDestination = getSecondDestinationString();
queue.moveMatchingMessagesTo("counter > 2", newDestination); // queue.moveMatchingMessagesTo("counter > 2", newDestination);
//
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); // queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
//
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
int movedSize = MESSAGE_COUNT-3; // int movedSize = MESSAGE_COUNT-3;
assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); // assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize());
//
// now lets remove them by selector // // now lets remove them by selector
queue.removeMatchingMessages("counter > 2"); // queue.removeMatchingMessages("counter > 2");
//
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); // assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); // assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
} // }
//
public void testCopyMessagesBySelector() throws Exception { // public void testCopyMessagesBySelector() throws Exception {
connection = connectionFactory.createConnection(); // connection = connectionFactory.createConnection();
useConnection(connection); // useConnection(connection);
//
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); // ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
String newDestination = getSecondDestinationString(); // String newDestination = getSecondDestinationString();
long queueSize = queue.getQueueSize(); // long queueSize = queue.getQueueSize();
queue.copyMatchingMessagesTo("counter > 2", newDestination); // queue.copyMatchingMessagesTo("counter > 2", newDestination);
//
//
//
queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); // queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost");
//
queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); // queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); // LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)");
assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); // assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize());
// now lets remove them by selector // // now lets remove them by selector
queue.removeMatchingMessages("counter > 2"); // queue.removeMatchingMessages("counter > 2");
//
assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); // assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize());
assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); // assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage());
} // }
protected void assertSendViaMBean() throws Exception { protected void assertSendViaMBean() throws Exception {
@ -614,6 +614,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("topic2 Producer count", 0, topic2.getProducerCount()); assertEquals("topic2 Producer count", 0, topic2.getProducerCount());
MessageProducer producer4 = session.createProducer(null); MessageProducer producer4 = session.createProducer(null);
Thread.sleep(500);
assertEquals(1, broker.getDynamicDestinationProducers().length);
producer4.close(); producer4.close();
Thread.sleep(500); Thread.sleep(500);
@ -737,104 +739,157 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
return "test.new.destination." + getClass() + "." + getName(); return "test.new.destination." + getClass() + "." + getName();
} }
public void testDynamicProducerView() throws Exception {
public void testTempQueueJMXDelete() throws Exception {
connection = connectionFactory.createConnection(); connection = connectionFactory.createConnection();
connection.setClientID(clientID); ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
connection.start(); BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
Session session = connection.createSession(transacted, authMode);
ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
Thread.sleep(1000);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
// should not throw an exception assertTrue("broker is not a slave", !broker.isSlave());
mbeanServer.getObjectInstance(queueViewMBeanName); assertEquals(0, broker.getDynamicDestinationProducers().length);
tQueue.delete();
Thread.sleep(1000);
try {
// should throw an exception
mbeanServer.getObjectInstance(queueViewMBeanName);
fail("should be deleted already!");
} catch (Exception e) {
// expected!
}
}
// Test for AMQ-3029
public void testBrowseBlobMessages() throws Exception {
connection = connectionFactory.createConnection();
useConnectionWithBlobMessage(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
if (initialQueueSize == 0) {
fail("There is no message in the queue:");
}
else {
echo("Current queue size: " + initialQueueSize);
}
int messageCount = initialQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];
String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID);
messageIDs[i] = messageID;
}
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
}
public void testBrowseBytesMessages() throws Exception {
connection = connectionFactory.createConnection();
useConnectionWithByteMessage(connection);
ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
CompositeData[] compdatalist = queue.browse();
int initialQueueSize = compdatalist.length;
if (initialQueueSize == 0) {
fail("There is no message in the queue:");
}
else {
echo("Current queue size: " + initialQueueSize);
}
int messageCount = initialQueueSize;
String[] messageIDs = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
CompositeData cdata = compdatalist[i];
String messageID = (String) cdata.get("JMSMessageID");
assertNotNull("Should have a message ID for message " + i, messageID);
messageIDs[i] = messageID;
Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
assertNotNull("should be a preview", preview);
assertTrue("not empty", preview.length > 0);
}
assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
// consume all the messages
echo("Attempting to consume all bytes messages from: " + destination);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(destination); MessageProducer producer = session.createProducer(null);
for (int i=0; i<MESSAGE_COUNT; i++) {
Message message = consumer.receive(5000); Destination dest1 = session.createTopic("DynamicDest-1");
assertNotNull(message); Destination dest2 = session.createTopic("DynamicDest-2");
assertTrue(message instanceof BytesMessage); Destination dest3 = session.createQueue("DynamicDest-3");
}
consumer.close(); // Wait a bit to let the producer get registered.
session.close(); Thread.sleep(100);
assertEquals(1, broker.getDynamicDestinationProducers().length);
ObjectName viewName = broker.getDynamicDestinationProducers()[0];
assertNotNull(viewName);
ProducerViewMBean view = (ProducerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, viewName, ProducerViewMBean.class, true);
assertNotNull(view);
assertEquals("NOTSET", view.getDestinationName());
producer.send(dest1, session.createTextMessage("Test Message 1"));
Thread.sleep(200);
assertEquals(((ActiveMQDestination)dest1).getPhysicalName(), view.getDestinationName());
assertTrue(view.isDestinationTopic());
assertFalse(view.isDestinationQueue());
assertFalse(view.isDestinationTemporary());
producer.send(dest2, session.createTextMessage("Test Message 2"));
Thread.sleep(200);
assertEquals(((ActiveMQDestination)dest2).getPhysicalName(), view.getDestinationName());
assertTrue(view.isDestinationTopic());
assertFalse(view.isDestinationQueue());
assertFalse(view.isDestinationTemporary());
producer.send(dest3, session.createTextMessage("Test Message 3"));
Thread.sleep(200);
assertEquals(((ActiveMQDestination)dest3).getPhysicalName(), view.getDestinationName());
assertTrue(view.isDestinationQueue());
assertFalse(view.isDestinationTopic());
assertFalse(view.isDestinationTemporary());
producer.close();
Thread.sleep(200);
assertEquals(0, broker.getDynamicDestinationProducers().length);
} }
// public void testTempQueueJMXDelete() throws Exception {
// connection = connectionFactory.createConnection();
//
// connection.setClientID(clientID);
// connection.start();
// Session session = connection.createSession(transacted, authMode);
// ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue();
// Thread.sleep(1000);
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost");
//
// // should not throw an exception
// mbeanServer.getObjectInstance(queueViewMBeanName);
//
// tQueue.delete();
// Thread.sleep(1000);
// try {
// // should throw an exception
// mbeanServer.getObjectInstance(queueViewMBeanName);
//
// fail("should be deleted already!");
// } catch (Exception e) {
// // expected!
// }
//
// }
//
// // Test for AMQ-3029
// public void testBrowseBlobMessages() throws Exception {
// connection = connectionFactory.createConnection();
// useConnectionWithBlobMessage(connection);
//
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
// CompositeData[] compdatalist = queue.browse();
// int initialQueueSize = compdatalist.length;
// if (initialQueueSize == 0) {
// fail("There is no message in the queue:");
// }
// else {
// echo("Current queue size: " + initialQueueSize);
// }
// int messageCount = initialQueueSize;
// String[] messageIDs = new String[messageCount];
// for (int i = 0; i < messageCount; i++) {
// CompositeData cdata = compdatalist[i];
// String messageID = (String) cdata.get("JMSMessageID");
// assertNotNull("Should have a message ID for message " + i, messageID);
//
// messageIDs[i] = messageID;
// }
//
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
// }
//
// public void testBrowseBytesMessages() throws Exception {
// connection = connectionFactory.createConnection();
// useConnectionWithByteMessage(connection);
//
// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost");
//
// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
//
// CompositeData[] compdatalist = queue.browse();
// int initialQueueSize = compdatalist.length;
// if (initialQueueSize == 0) {
// fail("There is no message in the queue:");
// }
// else {
// echo("Current queue size: " + initialQueueSize);
// }
// int messageCount = initialQueueSize;
// String[] messageIDs = new String[messageCount];
// for (int i = 0; i < messageCount; i++) {
// CompositeData cdata = compdatalist[i];
// String messageID = (String) cdata.get("JMSMessageID");
// assertNotNull("Should have a message ID for message " + i, messageID);
// messageIDs[i] = messageID;
//
// Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW);
// assertNotNull("should be a preview", preview);
// assertTrue("not empty", preview.length > 0);
// }
//
// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0);
//
// // consume all the messages
// echo("Attempting to consume all bytes messages from: " + destination);
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// MessageConsumer consumer = session.createConsumer(destination);
// for (int i=0; i<MESSAGE_COUNT; i++) {
// Message message = consumer.receive(5000);
// assertNotNull(message);
// assertTrue(message instanceof BytesMessage);
// }
// consumer.close();
// session.close();
// }
} }