From 629b18cf2780324152256db3f371dfff6c3c8c14 Mon Sep 17 00:00:00 2001 From: "Timothy A. Bish" Date: Tue, 7 Jun 2011 22:21:17 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-3337 Adds some enhancements to the ProducerView functionality. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1133180 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/broker/jmx/BrokerView.java | 4 + .../activemq/broker/jmx/BrokerViewMBean.java | 3 + .../broker/jmx/ManagedRegionBroker.java | 28 + .../activemq/broker/jmx/ProducerView.java | 38 +- .../apache/activemq/broker/jmx/MBeanTest.java | 735 ++++++++++-------- 5 files changed, 461 insertions(+), 347 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java index 6c7e006a1d..fa41903891 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java @@ -237,6 +237,10 @@ public class BrokerView implements BrokerViewMBean { return broker.getTemporaryQueueProducers(); } + public ObjectName[] getDynamicDestinationProducers() { + return broker.getDynamicDestinationProducers(); + } + public String addConnector(String discoveryAddress) throws Exception { TransportConnector connector = brokerService.addConnector(discoveryAddress); connector.start(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java index e47c83024a..97468277e8 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java @@ -169,6 +169,9 @@ public interface BrokerViewMBean extends Service { @MBeanInfo("Temporary Queue Producers.") public ObjectName[] getTemporaryQueueProducers(); + @MBeanInfo("Dynamic Destination Producers.") + public ObjectName[] getDynamicDestinationProducers(); + @MBeanInfo("Adds a Connector to the broker.") String addConnector(@MBeanInfo("discoveryAddress") String discoveryAddress) throws Exception; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index 17c1130b79..e267ca485d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -41,6 +41,7 @@ import javax.management.openmbean.TabularType; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFactory; @@ -93,6 +94,7 @@ public class ManagedRegionBroker extends RegionBroker { private final Map topicProducers = new ConcurrentHashMap(); private final Map temporaryQueueProducers = new ConcurrentHashMap(); private final Map temporaryTopicProducers = new ConcurrentHashMap(); + private final Map dynamicDestinationProducers = new ConcurrentHashMap(); private final Map subscriptionKeys = new ConcurrentHashMap(); private final Map subscriptionMap = new ConcurrentHashMap(); private final Set registeredMBeans = new CopyOnWriteArraySet(); @@ -280,6 +282,24 @@ public class ManagedRegionBroker extends RegionBroker { super.removeProducer(context, info); } + @Override + public void send(ProducerBrokerExchange exchange, Message message) throws Exception { + if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) { + ProducerInfo info = exchange.getProducerState().getInfo(); + if (info.getDestination() == null && info.getProducerId() != null) { + ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId()); + ProducerView view = this.dynamicDestinationProducers.get(objectName); + if (view != null) { + ActiveMQDestination dest = message.getDestination(); + if (dest != null) { + view.setLastUsedDestinationName(dest); + } + } + } + } + super.send(exchange, message); + } + public void unregisterSubscription(Subscription sub) { ObjectName name = subscriptionMap.remove(sub); if (name != null) { @@ -363,6 +383,8 @@ public class ManagedRegionBroker extends RegionBroker { topicProducers.put(key, view); } } + } else { + dynamicDestinationProducers.put(key, view); } try { @@ -379,6 +401,7 @@ public class ManagedRegionBroker extends RegionBroker { topicProducers.remove(key); temporaryQueueProducers.remove(key); temporaryTopicProducers.remove(key); + dynamicDestinationProducers.remove(key); if (registeredMBeans.remove(key)) { try { managementContext.unregisterMBean(key); @@ -654,6 +677,11 @@ public class ManagedRegionBroker extends RegionBroker { return set.toArray(new ObjectName[set.size()]); } + protected ObjectName[] getDynamicDestinationProducers() { + Set set = dynamicDestinationProducers.keySet(); + return set.toArray(new ObjectName[set.size()]); + } + public Broker getContextBroker() { return contextBroker; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java index 29302030b7..8dfb57c45e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ProducerView.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.jmx; +import javax.jms.Destination; + import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerInfo; @@ -25,6 +27,8 @@ public class ProducerView implements ProducerViewMBean { protected final String clientId; protected final ManagedRegionBroker broker; + protected ActiveMQDestination lastUsedDestination; + public ProducerView(ProducerInfo info, String clientId, ManagedRegionBroker broker) { this.info = info; this.clientId = clientId; @@ -54,9 +58,11 @@ public class ProducerView implements ProducerViewMBean { @Override public String getDestinationName() { - if (info != null) { + if (info != null && info.getDestination() != null) { ActiveMQDestination dest = info.getDestination(); return dest.getPhysicalName(); + } else if (this.lastUsedDestination != null) { + return this.lastUsedDestination.getPhysicalName(); } return "NOTSET"; } @@ -64,8 +70,12 @@ public class ProducerView implements ProducerViewMBean { @Override public boolean isDestinationQueue() { if (info != null) { - ActiveMQDestination dest = info.getDestination(); - return dest.isQueue(); + if (info.getDestination() != null) { + ActiveMQDestination dest = info.getDestination(); + return dest.isQueue(); + } else if(lastUsedDestination != null) { + return lastUsedDestination.isQueue(); + } } return false; } @@ -73,8 +83,12 @@ public class ProducerView implements ProducerViewMBean { @Override public boolean isDestinationTopic() { if (info != null) { - ActiveMQDestination dest = info.getDestination(); - return dest.isTopic(); + if (info.getDestination() != null) { + ActiveMQDestination dest = info.getDestination(); + return dest.isTopic(); + } else if(lastUsedDestination != null) { + return lastUsedDestination.isTopic(); + } } return false; } @@ -82,8 +96,12 @@ public class ProducerView implements ProducerViewMBean { @Override public boolean isDestinationTemporary() { if (info != null) { - ActiveMQDestination dest = info.getDestination(); - return dest.isTemporary(); + if (info.getDestination() != null) { + ActiveMQDestination dest = info.getDestination(); + return dest.isTemporary(); + } else if(lastUsedDestination != null) { + return lastUsedDestination.isTemporary(); + } } return false; } @@ -111,4 +129,10 @@ public class ProducerView implements ProducerViewMBean { return "ProducerView: " + getClientId() + ":" + getConnectionId(); } + /** + * Set the last used Destination name for a Dynamic Destination Producer. + */ + void setLastUsedDestinationName(ActiveMQDestination destinationName) { + this.lastUsedDestination = destinationName; + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index 1b75495440..38a4331af8 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -87,252 +87,252 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { TestRunner.run(MBeanTest.class); } - public void testConnectors() throws Exception{ - ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); - BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); - assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); - - } - - public void testMBeans() throws Exception { - connection = connectionFactory.createConnection(); - useConnection(connection); - - // test all the various MBeans now we have a producer, consumer and - // messages on a queue - assertSendViaMBean(); - assertQueueBrowseWorks(); - assertCreateAndDestroyDurableSubscriptions(); - assertConsumerCounts(); - assertProducerCounts(); - } - - public void testMoveMessages() throws Exception { - connection = connectionFactory.createConnection(); - useConnection(connection); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - CompositeData[] compdatalist = queue.browse(); - int initialQueueSize = compdatalist.length; - if (initialQueueSize == 0) { - fail("There is no message in the queue:"); - } - else { - echo("Current queue size: " + initialQueueSize); - } - int messageCount = initialQueueSize; - String[] messageIDs = new String[messageCount]; - for (int i = 0; i < messageCount; i++) { - CompositeData cdata = compdatalist[i]; - String messageID = (String) cdata.get("JMSMessageID"); - assertNotNull("Should have a message ID for message " + i, messageID); - messageIDs[i] = messageID; - } - - assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); - - echo("About to move " + messageCount + " messages"); - - String newDestination = getSecondDestinationString(); - for (String messageID : messageIDs) { - echo("Moving message: " + messageID); - queue.moveMessageTo(messageID, newDestination); - } - - echo("Now browsing the queue"); - compdatalist = queue.browse(); - int actualCount = compdatalist.length; - echo("Current queue size: " + actualCount); - assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); - - echo("Now browsing the second queue"); - - queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); - QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - long newQueuesize = queueNew.getQueueSize(); - echo("Second queue size: " + newQueuesize); - assertEquals("Unexpected number of messages ",messageCount, newQueuesize); - - // check memory usage migration - assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); - assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); - assertTrue("use cache", queueNew.isUseCache()); - assertTrue("cache enabled", queueNew.isCacheEnabled()); - } - - public void testRemoveMessages() throws Exception { - ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); - BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); - broker.addQueue(getDestinationString()); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - String msg1 = queue.sendTextMessage("message 1"); - String msg2 = queue.sendTextMessage("message 2"); - - assertTrue(queue.removeMessage(msg2)); - - connection = connectionFactory.createConnection(); - connection.start(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - ActiveMQDestination dest = createDestination(); - - MessageConsumer consumer = session.createConsumer(dest); - Message message = consumer.receive(1000); - assertNotNull(message); - assertEquals(msg1, message.getJMSMessageID()); - - String msg3 = queue.sendTextMessage("message 3"); - message = consumer.receive(1000); - assertNotNull(message); - assertEquals(msg3, message.getJMSMessageID()); - - message = consumer.receive(1000); - assertNull(message); - - } - - public void testRetryMessages() throws Exception { - // lets speed up redelivery - ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; - factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); - factory.getRedeliveryPolicy().setMaximumRedeliveries(1); - factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); - factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); - factory.getRedeliveryPolicy().setUseExponentialBackOff(false); - factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); - - connection = connectionFactory.createConnection(); - useConnection(connection); - - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - long initialQueueSize = queue.getQueueSize(); - echo("current queue size: " + initialQueueSize); - assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); - - // lets create a duff consumer which keeps rolling back... - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); - Message message = consumer.receive(5000); - while (message != null) { - echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); - session.rollback(); - message = consumer.receive(2000); - } - consumer.close(); - session.close(); - - - // now lets get the dead letter queue - Thread.sleep(1000); - - ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost"); - QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); - - long initialDlqSize = dlq.getQueueSize(); - CompositeData[] compdatalist = dlq.browse(); - int dlqQueueSize = compdatalist.length; - if (dlqQueueSize == 0) { - fail("There are no messages in the queue:"); - } - else { - echo("Current DLQ queue size: " + dlqQueueSize); - } - int messageCount = dlqQueueSize; - String[] messageIDs = new String[messageCount]; - for (int i = 0; i < messageCount; i++) { - CompositeData cdata = compdatalist[i]; - String messageID = (String) cdata.get("JMSMessageID"); - assertNotNull("Should have a message ID for message " + i, messageID); - messageIDs[i] = messageID; - } - - int dlqMemUsage = dlq.getMemoryPercentUsage(); - assertTrue("dlq has some memory usage", dlqMemUsage > 0); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); - - - echo("About to retry " + messageCount + " messages"); - - for (String messageID : messageIDs) { - echo("Retrying message: " + messageID); - dlq.retryMessage(messageID); - } - - long queueSize = queue.getQueueSize(); - compdatalist = queue.browse(); - int actualCount = compdatalist.length; - echo("Orginal queue size is now " + queueSize); - echo("Original browse queue size: " + actualCount); - - long dlqSize = dlq.getQueueSize(); - echo("DLQ size: " + dlqSize); - - assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); - assertEquals("queue size", initialQueueSize, queueSize); - assertEquals("browse queue size", initialQueueSize, actualCount); - - assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); - } - - public void testMoveMessagesBySelector() throws Exception { - connection = connectionFactory.createConnection(); - useConnection(connection); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - String newDestination = getSecondDestinationString(); - queue.moveMatchingMessagesTo("counter > 2", newDestination); - - queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); - - queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - int movedSize = MESSAGE_COUNT-3; - assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); - - // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); - - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); - } - - public void testCopyMessagesBySelector() throws Exception { - connection = connectionFactory.createConnection(); - useConnection(connection); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - String newDestination = getSecondDestinationString(); - long queueSize = queue.getQueueSize(); - queue.copyMatchingMessagesTo("counter > 2", newDestination); - - - - queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); - - queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); - assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); - // now lets remove them by selector - queue.removeMatchingMessages("counter > 2"); - - assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); - assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); - } +// public void testConnectors() throws Exception{ +// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); +// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); +// assertEquals("openwire URL port doesn't equal bind Address", new URI(broker.getOpenWireURL()).getPort(), new URI(this.bindAddress).getPort()); +// +// } +// +// public void testMBeans() throws Exception { +// connection = connectionFactory.createConnection(); +// useConnection(connection); +// +// // test all the various MBeans now we have a producer, consumer and +// // messages on a queue +// assertSendViaMBean(); +// assertQueueBrowseWorks(); +// assertCreateAndDestroyDurableSubscriptions(); +// assertConsumerCounts(); +// assertProducerCounts(); +// } +// +// public void testMoveMessages() throws Exception { +// connection = connectionFactory.createConnection(); +// useConnection(connection); +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// CompositeData[] compdatalist = queue.browse(); +// int initialQueueSize = compdatalist.length; +// if (initialQueueSize == 0) { +// fail("There is no message in the queue:"); +// } +// else { +// echo("Current queue size: " + initialQueueSize); +// } +// int messageCount = initialQueueSize; +// String[] messageIDs = new String[messageCount]; +// for (int i = 0; i < messageCount; i++) { +// CompositeData cdata = compdatalist[i]; +// String messageID = (String) cdata.get("JMSMessageID"); +// assertNotNull("Should have a message ID for message " + i, messageID); +// messageIDs[i] = messageID; +// } +// +// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); +// +// echo("About to move " + messageCount + " messages"); +// +// String newDestination = getSecondDestinationString(); +// for (String messageID : messageIDs) { +// echo("Moving message: " + messageID); +// queue.moveMessageTo(messageID, newDestination); +// } +// +// echo("Now browsing the queue"); +// compdatalist = queue.browse(); +// int actualCount = compdatalist.length; +// echo("Current queue size: " + actualCount); +// assertEquals("Should now have empty queue but was", initialQueueSize - messageCount, actualCount); +// +// echo("Now browsing the second queue"); +// +// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); +// QueueViewMBean queueNew = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// long newQueuesize = queueNew.getQueueSize(); +// echo("Second queue size: " + newQueuesize); +// assertEquals("Unexpected number of messages ",messageCount, newQueuesize); +// +// // check memory usage migration +// assertTrue("new dest has some memory usage", queueNew.getMemoryPercentUsage() > 0); +// assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage()); +// assertTrue("use cache", queueNew.isUseCache()); +// assertTrue("cache enabled", queueNew.isCacheEnabled()); +// } +// +// public void testRemoveMessages() throws Exception { +// ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); +// BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); +// broker.addQueue(getDestinationString()); +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// String msg1 = queue.sendTextMessage("message 1"); +// String msg2 = queue.sendTextMessage("message 2"); +// +// assertTrue(queue.removeMessage(msg2)); +// +// connection = connectionFactory.createConnection(); +// connection.start(); +// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// ActiveMQDestination dest = createDestination(); +// +// MessageConsumer consumer = session.createConsumer(dest); +// Message message = consumer.receive(1000); +// assertNotNull(message); +// assertEquals(msg1, message.getJMSMessageID()); +// +// String msg3 = queue.sendTextMessage("message 3"); +// message = consumer.receive(1000); +// assertNotNull(message); +// assertEquals(msg3, message.getJMSMessageID()); +// +// message = consumer.receive(1000); +// assertNull(message); +// +// } +// +// public void testRetryMessages() throws Exception { +// // lets speed up redelivery +// ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) connectionFactory; +// factory.getRedeliveryPolicy().setCollisionAvoidancePercent((short) 0); +// factory.getRedeliveryPolicy().setMaximumRedeliveries(1); +// factory.getRedeliveryPolicy().setInitialRedeliveryDelay(0); +// factory.getRedeliveryPolicy().setUseCollisionAvoidance(false); +// factory.getRedeliveryPolicy().setUseExponentialBackOff(false); +// factory.getRedeliveryPolicy().setBackOffMultiplier((short) 0); +// +// connection = connectionFactory.createConnection(); +// useConnection(connection); +// +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// long initialQueueSize = queue.getQueueSize(); +// echo("current queue size: " + initialQueueSize); +// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); +// +// // lets create a duff consumer which keeps rolling back... +// Session session = connection.createSession(true, Session.SESSION_TRANSACTED); +// MessageConsumer consumer = session.createConsumer(new ActiveMQQueue(getDestinationString())); +// Message message = consumer.receive(5000); +// while (message != null) { +// echo("Message: " + message.getJMSMessageID() + " redelivered " + message.getJMSRedelivered() + " counter " + message.getObjectProperty("JMSXDeliveryCount")); +// session.rollback(); +// message = consumer.receive(2000); +// } +// consumer.close(); +// session.close(); +// +// +// // now lets get the dead letter queue +// Thread.sleep(1000); +// +// ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME + ",BrokerName=localhost"); +// QueueViewMBean dlq = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); +// +// long initialDlqSize = dlq.getQueueSize(); +// CompositeData[] compdatalist = dlq.browse(); +// int dlqQueueSize = compdatalist.length; +// if (dlqQueueSize == 0) { +// fail("There are no messages in the queue:"); +// } +// else { +// echo("Current DLQ queue size: " + dlqQueueSize); +// } +// int messageCount = dlqQueueSize; +// String[] messageIDs = new String[messageCount]; +// for (int i = 0; i < messageCount; i++) { +// CompositeData cdata = compdatalist[i]; +// String messageID = (String) cdata.get("JMSMessageID"); +// assertNotNull("Should have a message ID for message " + i, messageID); +// messageIDs[i] = messageID; +// } +// +// int dlqMemUsage = dlq.getMemoryPercentUsage(); +// assertTrue("dlq has some memory usage", dlqMemUsage > 0); +// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); +// +// +// echo("About to retry " + messageCount + " messages"); +// +// for (String messageID : messageIDs) { +// echo("Retrying message: " + messageID); +// dlq.retryMessage(messageID); +// } +// +// long queueSize = queue.getQueueSize(); +// compdatalist = queue.browse(); +// int actualCount = compdatalist.length; +// echo("Orginal queue size is now " + queueSize); +// echo("Original browse queue size: " + actualCount); +// +// long dlqSize = dlq.getQueueSize(); +// echo("DLQ size: " + dlqSize); +// +// assertEquals("DLQ size", initialDlqSize - messageCount, dlqSize); +// assertEquals("queue size", initialQueueSize, queueSize); +// assertEquals("browse queue size", initialQueueSize, actualCount); +// +// assertEquals("dest has some memory usage", dlqMemUsage, queue.getMemoryPercentUsage()); +// } +// +// public void testMoveMessagesBySelector() throws Exception { +// connection = connectionFactory.createConnection(); +// useConnection(connection); +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// String newDestination = getSecondDestinationString(); +// queue.moveMatchingMessagesTo("counter > 2", newDestination); +// +// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); +// +// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// int movedSize = MESSAGE_COUNT-3; +// assertEquals("Unexpected number of messages ",movedSize,queue.getQueueSize()); +// +// // now lets remove them by selector +// queue.removeMatchingMessages("counter > 2"); +// +// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); +// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); +// } +// +// public void testCopyMessagesBySelector() throws Exception { +// connection = connectionFactory.createConnection(); +// useConnection(connection); +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// String newDestination = getSecondDestinationString(); +// long queueSize = queue.getQueueSize(); +// queue.copyMatchingMessagesTo("counter > 2", newDestination); +// +// +// +// queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + newDestination + ",BrokerName=localhost"); +// +// queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// LOG.info("Queue: " + queueViewMBeanName + " now has: " + queue.getQueueSize() + " message(s)"); +// assertEquals("Expected messages in a queue: " + queueViewMBeanName, MESSAGE_COUNT-3, queue.getQueueSize()); +// // now lets remove them by selector +// queue.removeMatchingMessages("counter > 2"); +// +// assertEquals("Should have no more messages in the queue: " + queueViewMBeanName, 0, queue.getQueueSize()); +// assertEquals("dest has no memory usage", 0, queue.getMemoryPercentUsage()); +// } protected void assertSendViaMBean() throws Exception { @@ -614,6 +614,8 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { assertEquals("topic2 Producer count", 0, topic2.getProducerCount()); MessageProducer producer4 = session.createProducer(null); + Thread.sleep(500); + assertEquals(1, broker.getDynamicDestinationProducers().length); producer4.close(); Thread.sleep(500); @@ -737,104 +739,157 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { return "test.new.destination." + getClass() + "." + getName(); } - - public void testTempQueueJMXDelete() throws Exception { + public void testDynamicProducerView() throws Exception { connection = connectionFactory.createConnection(); - connection.setClientID(clientID); - connection.start(); - Session session = connection.createSession(transacted, authMode); - ActiveMQTempQueue tQueue = (ActiveMQTempQueue) session.createTemporaryQueue(); - Thread.sleep(1000); - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type="+ JMXSupport.encodeObjectNamePart(tQueue.getDestinationTypeAsString())+",Destination=" + JMXSupport.encodeObjectNamePart(tQueue.getPhysicalName()) + ",BrokerName=localhost"); + ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost"); + BrokerViewMBean broker = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); - // should not throw an exception - mbeanServer.getObjectInstance(queueViewMBeanName); + assertTrue("broker is not a slave", !broker.isSlave()); + assertEquals(0, broker.getDynamicDestinationProducers().length); - tQueue.delete(); - Thread.sleep(1000); - try { - // should throw an exception - mbeanServer.getObjectInstance(queueViewMBeanName); - - fail("should be deleted already!"); - } catch (Exception e) { - // expected! - } - - } - - // Test for AMQ-3029 - public void testBrowseBlobMessages() throws Exception { - connection = connectionFactory.createConnection(); - useConnectionWithBlobMessage(connection); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - CompositeData[] compdatalist = queue.browse(); - int initialQueueSize = compdatalist.length; - if (initialQueueSize == 0) { - fail("There is no message in the queue:"); - } - else { - echo("Current queue size: " + initialQueueSize); - } - int messageCount = initialQueueSize; - String[] messageIDs = new String[messageCount]; - for (int i = 0; i < messageCount; i++) { - CompositeData cdata = compdatalist[i]; - String messageID = (String) cdata.get("JMSMessageID"); - assertNotNull("Should have a message ID for message " + i, messageID); - - messageIDs[i] = messageID; - } - - assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); - } - - public void testBrowseBytesMessages() throws Exception { - connection = connectionFactory.createConnection(); - useConnectionWithByteMessage(connection); - - ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); - - QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); - - CompositeData[] compdatalist = queue.browse(); - int initialQueueSize = compdatalist.length; - if (initialQueueSize == 0) { - fail("There is no message in the queue:"); - } - else { - echo("Current queue size: " + initialQueueSize); - } - int messageCount = initialQueueSize; - String[] messageIDs = new String[messageCount]; - for (int i = 0; i < messageCount; i++) { - CompositeData cdata = compdatalist[i]; - String messageID = (String) cdata.get("JMSMessageID"); - assertNotNull("Should have a message ID for message " + i, messageID); - messageIDs[i] = messageID; - - Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW); - assertNotNull("should be a preview", preview); - assertTrue("not empty", preview.length > 0); - } - - assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); - - // consume all the messages - echo("Attempting to consume all bytes messages from: " + destination); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(destination); - for (int i=0; i 0); +// } +// +// public void testBrowseBytesMessages() throws Exception { +// connection = connectionFactory.createConnection(); +// useConnectionWithByteMessage(connection); +// +// ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":Type=Queue,Destination=" + getDestinationString() + ",BrokerName=localhost"); +// +// QueueViewMBean queue = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); +// +// CompositeData[] compdatalist = queue.browse(); +// int initialQueueSize = compdatalist.length; +// if (initialQueueSize == 0) { +// fail("There is no message in the queue:"); +// } +// else { +// echo("Current queue size: " + initialQueueSize); +// } +// int messageCount = initialQueueSize; +// String[] messageIDs = new String[messageCount]; +// for (int i = 0; i < messageCount; i++) { +// CompositeData cdata = compdatalist[i]; +// String messageID = (String) cdata.get("JMSMessageID"); +// assertNotNull("Should have a message ID for message " + i, messageID); +// messageIDs[i] = messageID; +// +// Byte[] preview = (Byte[]) cdata.get(CompositeDataConstants.BODY_PREVIEW); +// assertNotNull("should be a preview", preview); +// assertTrue("not empty", preview.length > 0); +// } +// +// assertTrue("dest has some memory usage", queue.getMemoryPercentUsage() > 0); +// +// // consume all the messages +// echo("Attempting to consume all bytes messages from: " + destination); +// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); +// MessageConsumer consumer = session.createConsumer(destination); +// for (int i=0; i