diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSQueueControl.java b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSQueueControl.java
index 8e9b270f21..7aa2912137 100644
--- a/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSQueueControl.java
+++ b/activemq-jms-client/src/main/java/org/apache/activemq/api/jms/management/JMSQueueControl.java
@@ -275,4 +275,35 @@ public interface JMSQueueControl extends DestinationControl
*/
void flushExecutor();
+ /**
+ * Lists all the messages scheduled for delivery for this queue.
+ *
+ * 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
+ */
+ @Operation(desc = "List the messages scheduled for delivery", impact = MBeanOperationInfo.INFO)
+ Map[] listScheduledMessages() throws Exception;
+
+ /**
+ * Lists all the messages scheduled for delivery for this queue using JSON serialization.
+ */
+ @Operation(desc = "List the messages scheduled for delivery and returns them using JSON", impact = MBeanOperationInfo.INFO)
+ String listScheduledMessagesAsJSON() throws Exception;
+
+ /**
+ * Lists all the messages being deliver per consumer.
+ *
+ * The Map's key is a toString representation for the consumer. Each consumer will then return a Map[] same way is returned by {@link #listScheduledMessages()}
+ */
+ @Operation(desc = "List all messages being delivered per consumer")
+ Map[]> listDeliveringMessages() throws Exception;
+
+ /**
+ * Executes a conversion of {@link #listDeliveringMessages()} to JSON
+ *
+ * @return
+ * @throws Exception
+ */
+ @Operation(desc = "list all messages being delivered per consumer using JSON form")
+ String listDeliveringMessagesAsJSON() throws Exception;
+
}
diff --git a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSQueueControlImpl.java b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSQueueControlImpl.java
index 2b7a4dcfef..6ae572bdc9 100644
--- a/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSQueueControlImpl.java
+++ b/activemq-jms-server/src/main/java/org/apache/activemq/jms/management/impl/JMSQueueControlImpl.java
@@ -18,6 +18,7 @@ package org.apache.activemq.jms.management.impl;
import javax.management.MBeanInfo;
import javax.management.StandardMBean;
+import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.api.core.ActiveMQException;
@@ -183,16 +184,7 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
Map[] coreMessages = coreQueueControl.listMessages(filter);
- Map[] jmsMessages = new Map[coreMessages.length];
-
- int i = 0;
-
- for (Map coreMessage : coreMessages)
- {
- Map jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
- jmsMessages[i++] = jmsMessage;
- }
- return jmsMessages;
+ return toJMSMap(coreMessages);
}
catch (ActiveMQException e)
{
@@ -200,6 +192,64 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
}
}
+ private Map[] toJMSMap(Map[] coreMessages)
+ {
+ Map[] jmsMessages = new Map[coreMessages.length];
+
+ int i = 0;
+
+ for (Map coreMessage : coreMessages)
+ {
+ Map jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
+ jmsMessages[i++] = jmsMessage;
+ }
+ return jmsMessages;
+ }
+
+ @Override
+ public Map[] listScheduledMessages() throws Exception
+ {
+ Map[] coreMessages = coreQueueControl.listScheduledMessages();
+
+ return toJMSMap(coreMessages);
+ }
+
+ @Override
+ public String listScheduledMessagesAsJSON() throws Exception
+ {
+ return coreQueueControl.listScheduledMessagesAsJSON();
+ }
+
+ @Override
+ public Map[]> listDeliveringMessages() throws Exception
+ {
+ try
+ {
+ Map[]> returnMap = new HashMap[]>();
+
+
+ // the workingMap from the queue-control
+ Map[]> workingMap = coreQueueControl.listDeliveringMessages();
+
+ for (Map.Entry[]> entry : workingMap.entrySet())
+ {
+ returnMap.put(entry.getKey(), toJMSMap(entry.getValue()));
+ }
+
+ return returnMap;
+ }
+ catch (ActiveMQException e)
+ {
+ throw new IllegalStateException(e.getMessage());
+ }
+ }
+
+ @Override
+ public String listDeliveringMessagesAsJSON() throws Exception
+ {
+ return coreQueueControl.listDeliveringMessagesAsJSON();
+ }
+
public String listMessagesAsJSON(final String filter) throws Exception
{
return JMSQueueControlImpl.toJSON(listMessages(filter));
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java
index 5587b27b09..6ee29241a6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/ConsumerStuckTest.java
@@ -134,16 +134,14 @@ public class ConsumerStuckTest extends ServiceTestBase
long timeout = System.currentTimeMillis() + 20000;
- while (System.currentTimeMillis() < timeout && server.getSessions().size() != 0)
+ long timeStart = System.currentTimeMillis();
+
+ while (timeout > System.currentTimeMillis() && server.getSessions().size() != 0 && server.getConnectionCount() != 0)
{
Thread.sleep(10);
}
- System.out.println("Size = " + server.getConnectionCount());
-
- System.out.println("sessions = " + server.getSessions().size());
-
-
+ System.out.println("Time = " + System.currentTimeMillis() + " time diff = " + (System.currentTimeMillis() - timeStart) + ", connections Size = " + server.getConnectionCount() + " sessions = " + server.getSessions().size());
if (server.getSessions().size() != 0)
{
@@ -151,14 +149,16 @@ public class ConsumerStuckTest extends ServiceTestBase
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
}
+ System.out.println("Size = " + server.getConnectionCount());
- timeout = System.currentTimeMillis() + 20000;
+ System.out.println("sessions = " + server.getSessions().size());
- while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0)
+
+ if (server.getSessions().size() != 0)
{
- Thread.sleep(10);
+ System.out.println(threadDump("Thread dump"));
+ fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
}
-
assertEquals(0, server.getConnectionCount());
}
finally
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlTest.java
index 5f345e0136..effeb65fff 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlTest.java
@@ -52,6 +52,7 @@ import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQDestination;
+import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQQueue;
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.jms.server.management.JMSNotificationType;
@@ -157,6 +158,56 @@ public class JMSQueueControlTest extends ManagementTestBase
Assert.assertEquals(0, data.length);
}
+ @Test
+ public void testListDeliveringMessages() throws Exception
+ {
+ JMSQueueControl queueControl = createManagementControl();
+
+ Assert.assertEquals(0, queueControl.getMessageCount());
+
+ String[] ids = JMSUtil.sendMessages(queue, 20);
+
+ ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory)ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
+ new TransportConfiguration(InVMConnectorFactory.class.getName()));
+
+ Connection conn = cf.createConnection();
+ conn.start();
+ Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+
+ for (int i = 0; i < 20; i++)
+ {
+ Assert.assertNotNull(consumer.receive(5000));
+ }
+
+ Assert.assertEquals(20, queueControl.getMessageCount());
+
+ Map[]> deliverings = queueControl.listDeliveringMessages();
+
+ // Just one consumer.. so just one queue
+ Assert.assertEquals(1, deliverings.size());
+
+
+ for (Map.Entry[]> deliveryEntry : deliverings.entrySet())
+ {
+ System.out.println("Key:" + deliveryEntry.getKey());
+
+ for (int i = 0; i < 20; i++)
+ {
+ Assert.assertEquals(ids[i], deliveryEntry.getValue()[i].get("JMSMessageID").toString());
+ }
+ }
+
+
+
+ session.rollback();
+ session.close();
+
+ JMSUtil.consumeMessages(20, queue);
+ }
+
@Test
public void testListMessagesAsJSONWithNullFilter() throws Exception
{
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
index 3921cb145b..9adc930776 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
@@ -32,6 +32,8 @@ import org.apache.activemq.api.jms.management.JMSQueueControl;
import org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.jms.client.ActiveMQQueue;
+import org.junit.Ignore;
+import org.junit.Test;
/**
*
@@ -82,6 +84,15 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
super.tearDown();
}
+ @Ignore
+ @Override
+ @Test
+ public void testListDeliveringMessages() throws Exception
+ {
+ // I'm not implementing the required proxy for this test on this JMS test
+ }
+
+
@Override
protected JMSQueueControl createManagementControl() throws Exception
{
@@ -197,6 +208,30 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
return (String)proxy.invokeOperation("listMessageCounterHistory");
}
+ @Override
+ public Map[] listScheduledMessages() throws Exception
+ {
+ return null;
+ }
+
+ @Override
+ public String listScheduledMessagesAsJSON() throws Exception
+ {
+ return null;
+ }
+
+ @Override
+ public Map[]> listDeliveringMessages() throws Exception
+ {
+ return null;
+ }
+
+ @Override
+ public String listDeliveringMessagesAsJSON() throws Exception
+ {
+ return null;
+ }
+
public String listMessageCounterHistoryAsHTML() throws Exception
{
return (String)proxy.invokeOperation("listMessageCounterHistoryAsHTML");
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java
index cdf90f7bd8..3902470953 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/ra/ActiveMQMessageHandlerTest.java
@@ -99,7 +99,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(15);
- MultipleEndpoints endpoint = new MultipleEndpoints(latch, false);
+ MultipleEndpoints endpoint = new MultipleEndpoints(latch, null, false);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@@ -132,7 +132,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(SIZE);
- MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
+ CountDownLatch latchDone = new CountDownLatch(SIZE);
+ MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@@ -148,6 +149,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ latchDone.await(5, TimeUnit.SECONDS);
+
assertEquals(SIZE, endpoint.messages.intValue());
assertEquals(0, endpoint.interrupted.intValue());
@@ -169,7 +172,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(SIZE);
- MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
+ CountDownLatch latchDone = new CountDownLatch(SIZE);
+ MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
@@ -185,6 +189,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+ latchDone.await(5, TimeUnit.SECONDS);
+
assertEquals(SIZE, endpoint.messages.intValue());
//half onmessage interrupted
assertEquals(SIZE / 2, endpoint.interrupted.intValue());
@@ -856,14 +862,16 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
class MultipleEndpoints extends DummyMessageEndpoint
{
private final CountDownLatch latch;
+ private final CountDownLatch latchDone;
private final boolean pause;
AtomicInteger messages = new AtomicInteger(0);
AtomicInteger interrupted = new AtomicInteger(0);
- public MultipleEndpoints(CountDownLatch latch, boolean pause)
+ public MultipleEndpoints(CountDownLatch latch, CountDownLatch latchDone, boolean pause)
{
super(latch);
this.latch = latch;
+ this.latchDone = latchDone;
this.pause = pause;
}
@@ -888,17 +896,27 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
@Override
public void onMessage(Message message)
{
- latch.countDown();
- if (pause && messages.getAndIncrement() % 2 == 0)
+ try
{
- try
+ latch.countDown();
+ if (pause && messages.getAndIncrement() % 2 == 0)
{
- IntegrationTestLogger.LOGGER.info("pausing for 2 secs");
- Thread.sleep(2000);
+ try
+ {
+ System.out.println("pausing for 2 secs");
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ interrupted.incrementAndGet();
+ }
}
- catch (InterruptedException e)
+ }
+ finally
+ {
+ if (latchDone != null)
{
- interrupted.getAndIncrement();
+ latchDone.countDown();
}
}
}