merge #73 - a few fixes coming from former branches
This commit is contained in:
commit
28db9f9849
|
@ -275,4 +275,35 @@ public interface JMSQueueControl extends DestinationControl
|
||||||
*/
|
*/
|
||||||
void flushExecutor();
|
void flushExecutor();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lists all the messages scheduled for delivery for this queue.
|
||||||
|
* <br>
|
||||||
|
* 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
|
||||||
|
*/
|
||||||
|
@Operation(desc = "List the messages scheduled for delivery", impact = MBeanOperationInfo.INFO)
|
||||||
|
Map<String, Object>[] listScheduledMessages() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lists all the messages scheduled for delivery for this queue using JSON serialization.
|
||||||
|
*/
|
||||||
|
@Operation(desc = "List the messages scheduled for delivery and returns them using JSON", impact = MBeanOperationInfo.INFO)
|
||||||
|
String listScheduledMessagesAsJSON() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Lists all the messages being deliver per consumer.
|
||||||
|
* <br>
|
||||||
|
* The Map's key is a toString representation for the consumer. Each consumer will then return a Map<String,Object>[] same way is returned by {@link #listScheduledMessages()}
|
||||||
|
*/
|
||||||
|
@Operation(desc = "List all messages being delivered per consumer")
|
||||||
|
Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes a conversion of {@link #listDeliveringMessages()} to JSON
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Operation(desc = "list all messages being delivered per consumer using JSON form")
|
||||||
|
String listDeliveringMessagesAsJSON() throws Exception;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.jms.management.impl;
|
||||||
|
|
||||||
import javax.management.MBeanInfo;
|
import javax.management.MBeanInfo;
|
||||||
import javax.management.StandardMBean;
|
import javax.management.StandardMBean;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.activemq.api.core.ActiveMQException;
|
import org.apache.activemq.api.core.ActiveMQException;
|
||||||
|
@ -183,6 +184,16 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
||||||
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
|
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
|
||||||
Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
|
Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
|
||||||
|
|
||||||
|
return toJMSMap(coreMessages);
|
||||||
|
}
|
||||||
|
catch (ActiveMQException e)
|
||||||
|
{
|
||||||
|
throw new IllegalStateException(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Map<String, Object>[] toJMSMap(Map<String, Object>[] coreMessages)
|
||||||
|
{
|
||||||
Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
|
Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
|
||||||
|
|
||||||
int i = 0;
|
int i = 0;
|
||||||
|
@ -194,12 +205,51 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
||||||
}
|
}
|
||||||
return jmsMessages;
|
return jmsMessages;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object>[] listScheduledMessages() throws Exception
|
||||||
|
{
|
||||||
|
Map<String, Object>[] coreMessages = coreQueueControl.listScheduledMessages();
|
||||||
|
|
||||||
|
return toJMSMap(coreMessages);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String listScheduledMessagesAsJSON() throws Exception
|
||||||
|
{
|
||||||
|
return coreQueueControl.listScheduledMessagesAsJSON();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
Map<String, Map<String, Object>[]> returnMap = new HashMap<String, Map<String, Object>[]>();
|
||||||
|
|
||||||
|
|
||||||
|
// the workingMap from the queue-control
|
||||||
|
Map<String, Map<String, Object>[]> workingMap = coreQueueControl.listDeliveringMessages();
|
||||||
|
|
||||||
|
for (Map.Entry<String, Map<String, Object>[]> entry : workingMap.entrySet())
|
||||||
|
{
|
||||||
|
returnMap.put(entry.getKey(), toJMSMap(entry.getValue()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return returnMap;
|
||||||
|
}
|
||||||
catch (ActiveMQException e)
|
catch (ActiveMQException e)
|
||||||
{
|
{
|
||||||
throw new IllegalStateException(e.getMessage());
|
throw new IllegalStateException(e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String listDeliveringMessagesAsJSON() throws Exception
|
||||||
|
{
|
||||||
|
return coreQueueControl.listDeliveringMessagesAsJSON();
|
||||||
|
}
|
||||||
|
|
||||||
public String listMessagesAsJSON(final String filter) throws Exception
|
public String listMessagesAsJSON(final String filter) throws Exception
|
||||||
{
|
{
|
||||||
return JMSQueueControlImpl.toJSON(listMessages(filter));
|
return JMSQueueControlImpl.toJSON(listMessages(filter));
|
||||||
|
|
|
@ -134,16 +134,14 @@ public class ConsumerStuckTest extends ServiceTestBase
|
||||||
|
|
||||||
long timeout = System.currentTimeMillis() + 20000;
|
long timeout = System.currentTimeMillis() + 20000;
|
||||||
|
|
||||||
while (System.currentTimeMillis() < timeout && server.getSessions().size() != 0)
|
long timeStart = System.currentTimeMillis();
|
||||||
|
|
||||||
|
while (timeout > System.currentTimeMillis() && server.getSessions().size() != 0 && server.getConnectionCount() != 0)
|
||||||
{
|
{
|
||||||
Thread.sleep(10);
|
Thread.sleep(10);
|
||||||
}
|
}
|
||||||
|
|
||||||
System.out.println("Size = " + server.getConnectionCount());
|
System.out.println("Time = " + System.currentTimeMillis() + " time diff = " + (System.currentTimeMillis() - timeStart) + ", connections Size = " + server.getConnectionCount() + " sessions = " + server.getSessions().size());
|
||||||
|
|
||||||
System.out.println("sessions = " + server.getSessions().size());
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (server.getSessions().size() != 0)
|
if (server.getSessions().size() != 0)
|
||||||
{
|
{
|
||||||
|
@ -151,14 +149,16 @@ public class ConsumerStuckTest extends ServiceTestBase
|
||||||
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
|
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
System.out.println("Size = " + server.getConnectionCount());
|
||||||
|
|
||||||
timeout = System.currentTimeMillis() + 20000;
|
System.out.println("sessions = " + server.getSessions().size());
|
||||||
|
|
||||||
while (System.currentTimeMillis() < timeout && server.getConnectionCount() != 0)
|
|
||||||
|
if (server.getSessions().size() != 0)
|
||||||
{
|
{
|
||||||
Thread.sleep(10);
|
System.out.println(threadDump("Thread dump"));
|
||||||
|
fail("The cleanup wasn't able to finish cleaning the session. It's probably stuck, look at the thread dump generated by the test for more information");
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEquals(0, server.getConnectionCount());
|
assertEquals(0, server.getConnectionCount());
|
||||||
}
|
}
|
||||||
finally
|
finally
|
||||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
|
||||||
import org.apache.activemq.core.settings.impl.AddressSettings;
|
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.jms.client.ActiveMQDestination;
|
import org.apache.activemq.jms.client.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
|
||||||
import org.apache.activemq.jms.client.ActiveMQQueue;
|
import org.apache.activemq.jms.client.ActiveMQQueue;
|
||||||
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
|
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
|
||||||
import org.apache.activemq.jms.server.management.JMSNotificationType;
|
import org.apache.activemq.jms.server.management.JMSNotificationType;
|
||||||
|
@ -157,6 +158,56 @@ public class JMSQueueControlTest extends ManagementTestBase
|
||||||
Assert.assertEquals(0, data.length);
|
Assert.assertEquals(0, data.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testListDeliveringMessages() throws Exception
|
||||||
|
{
|
||||||
|
JMSQueueControl queueControl = createManagementControl();
|
||||||
|
|
||||||
|
Assert.assertEquals(0, queueControl.getMessageCount());
|
||||||
|
|
||||||
|
String[] ids = JMSUtil.sendMessages(queue, 20);
|
||||||
|
|
||||||
|
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory)ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
|
||||||
|
new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||||
|
|
||||||
|
Connection conn = cf.createConnection();
|
||||||
|
conn.start();
|
||||||
|
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
|
||||||
|
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++)
|
||||||
|
{
|
||||||
|
Assert.assertNotNull(consumer.receive(5000));
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(20, queueControl.getMessageCount());
|
||||||
|
|
||||||
|
Map<String, Map<String, Object>[]> deliverings = queueControl.listDeliveringMessages();
|
||||||
|
|
||||||
|
// Just one consumer.. so just one queue
|
||||||
|
Assert.assertEquals(1, deliverings.size());
|
||||||
|
|
||||||
|
|
||||||
|
for (Map.Entry<String, Map<String, Object>[]> deliveryEntry : deliverings.entrySet())
|
||||||
|
{
|
||||||
|
System.out.println("Key:" + deliveryEntry.getKey());
|
||||||
|
|
||||||
|
for (int i = 0; i < 20; i++)
|
||||||
|
{
|
||||||
|
Assert.assertEquals(ids[i], deliveryEntry.getValue()[i].get("JMSMessageID").toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
session.rollback();
|
||||||
|
session.close();
|
||||||
|
|
||||||
|
JMSUtil.consumeMessages(20, queue);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testListMessagesAsJSONWithNullFilter() throws Exception
|
public void testListMessagesAsJSONWithNullFilter() throws Exception
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.activemq.api.jms.management.JMSQueueControl;
|
||||||
import org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory;
|
import org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory;
|
||||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.jms.client.ActiveMQQueue;
|
import org.apache.activemq.jms.client.ActiveMQQueue;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -82,6 +84,15 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
|
||||||
super.tearDown();
|
super.tearDown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testListDeliveringMessages() throws Exception
|
||||||
|
{
|
||||||
|
// I'm not implementing the required proxy for this test on this JMS test
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected JMSQueueControl createManagementControl() throws Exception
|
protected JMSQueueControl createManagementControl() throws Exception
|
||||||
{
|
{
|
||||||
|
@ -197,6 +208,30 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
|
||||||
return (String)proxy.invokeOperation("listMessageCounterHistory");
|
return (String)proxy.invokeOperation("listMessageCounterHistory");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Object>[] listScheduledMessages() throws Exception
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String listScheduledMessagesAsJSON() throws Exception
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String listDeliveringMessagesAsJSON() throws Exception
|
||||||
|
{
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
public String listMessageCounterHistoryAsHTML() throws Exception
|
public String listMessageCounterHistoryAsHTML() throws Exception
|
||||||
{
|
{
|
||||||
return (String)proxy.invokeOperation("listMessageCounterHistoryAsHTML");
|
return (String)proxy.invokeOperation("listMessageCounterHistoryAsHTML");
|
||||||
|
|
|
@ -99,7 +99,7 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
spec.setDestination(MDBQUEUE);
|
spec.setDestination(MDBQUEUE);
|
||||||
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
||||||
CountDownLatch latch = new CountDownLatch(15);
|
CountDownLatch latch = new CountDownLatch(15);
|
||||||
MultipleEndpoints endpoint = new MultipleEndpoints(latch, false);
|
MultipleEndpoints endpoint = new MultipleEndpoints(latch, null, false);
|
||||||
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
||||||
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
||||||
ClientSession session = locator.createSessionFactory().createSession();
|
ClientSession session = locator.createSessionFactory().createSession();
|
||||||
|
@ -132,7 +132,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
spec.setDestination(MDBQUEUE);
|
spec.setDestination(MDBQUEUE);
|
||||||
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
||||||
CountDownLatch latch = new CountDownLatch(SIZE);
|
CountDownLatch latch = new CountDownLatch(SIZE);
|
||||||
MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
|
CountDownLatch latchDone = new CountDownLatch(SIZE);
|
||||||
|
MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
|
||||||
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
||||||
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
||||||
ClientSession session = locator.createSessionFactory().createSession();
|
ClientSession session = locator.createSessionFactory().createSession();
|
||||||
|
@ -148,6 +149,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
|
|
||||||
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||||
|
|
||||||
|
latchDone.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
assertEquals(SIZE, endpoint.messages.intValue());
|
assertEquals(SIZE, endpoint.messages.intValue());
|
||||||
assertEquals(0, endpoint.interrupted.intValue());
|
assertEquals(0, endpoint.interrupted.intValue());
|
||||||
|
|
||||||
|
@ -169,7 +172,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
spec.setDestination(MDBQUEUE);
|
spec.setDestination(MDBQUEUE);
|
||||||
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
|
||||||
CountDownLatch latch = new CountDownLatch(SIZE);
|
CountDownLatch latch = new CountDownLatch(SIZE);
|
||||||
MultipleEndpoints endpoint = new MultipleEndpoints(latch, true);
|
CountDownLatch latchDone = new CountDownLatch(SIZE);
|
||||||
|
MultipleEndpoints endpoint = new MultipleEndpoints(latch, latchDone, true);
|
||||||
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, false);
|
||||||
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
qResourceAdapter.endpointActivation(endpointFactory, spec);
|
||||||
ClientSession session = locator.createSessionFactory().createSession();
|
ClientSession session = locator.createSessionFactory().createSession();
|
||||||
|
@ -185,6 +189,8 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
|
|
||||||
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
|
||||||
|
|
||||||
|
latchDone.await(5, TimeUnit.SECONDS);
|
||||||
|
|
||||||
assertEquals(SIZE, endpoint.messages.intValue());
|
assertEquals(SIZE, endpoint.messages.intValue());
|
||||||
//half onmessage interrupted
|
//half onmessage interrupted
|
||||||
assertEquals(SIZE / 2, endpoint.interrupted.intValue());
|
assertEquals(SIZE / 2, endpoint.interrupted.intValue());
|
||||||
|
@ -856,14 +862,16 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
class MultipleEndpoints extends DummyMessageEndpoint
|
class MultipleEndpoints extends DummyMessageEndpoint
|
||||||
{
|
{
|
||||||
private final CountDownLatch latch;
|
private final CountDownLatch latch;
|
||||||
|
private final CountDownLatch latchDone;
|
||||||
private final boolean pause;
|
private final boolean pause;
|
||||||
AtomicInteger messages = new AtomicInteger(0);
|
AtomicInteger messages = new AtomicInteger(0);
|
||||||
AtomicInteger interrupted = new AtomicInteger(0);
|
AtomicInteger interrupted = new AtomicInteger(0);
|
||||||
|
|
||||||
public MultipleEndpoints(CountDownLatch latch, boolean pause)
|
public MultipleEndpoints(CountDownLatch latch, CountDownLatch latchDone, boolean pause)
|
||||||
{
|
{
|
||||||
super(latch);
|
super(latch);
|
||||||
this.latch = latch;
|
this.latch = latch;
|
||||||
|
this.latchDone = latchDone;
|
||||||
this.pause = pause;
|
this.pause = pause;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -887,18 +895,28 @@ public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onMessage(Message message)
|
public void onMessage(Message message)
|
||||||
|
{
|
||||||
|
try
|
||||||
{
|
{
|
||||||
latch.countDown();
|
latch.countDown();
|
||||||
if (pause && messages.getAndIncrement() % 2 == 0)
|
if (pause && messages.getAndIncrement() % 2 == 0)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
IntegrationTestLogger.LOGGER.info("pausing for 2 secs");
|
System.out.println("pausing for 2 secs");
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
}
|
}
|
||||||
catch (InterruptedException e)
|
catch (InterruptedException e)
|
||||||
{
|
{
|
||||||
interrupted.getAndIncrement();
|
interrupted.incrementAndGet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (latchDone != null)
|
||||||
|
{
|
||||||
|
latchDone.countDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue