ACTIVEMQ6-73 Adding missing methods
https://issues.apache.org/jira/browse/ACTIVEMQ6-73 https://bugzilla.redhat.com/show_bug.cgi?id=1174152 this is adding missing methods on the Management interface as originally raised at https://bugzilla.redhat.com/show_bug.cgi?id=1174152 by RedHat
This commit is contained in:
parent
2ae6f36e74
commit
0eb1e332d3
|
@ -275,4 +275,35 @@ public interface JMSQueueControl extends DestinationControl
|
|||
*/
|
||||
void flushExecutor();
|
||||
|
||||
/**
|
||||
* Lists all the messages scheduled for delivery for this queue.
|
||||
* <br>
|
||||
* 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
|
||||
*/
|
||||
@Operation(desc = "List the messages scheduled for delivery", impact = MBeanOperationInfo.INFO)
|
||||
Map<String, Object>[] listScheduledMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Lists all the messages scheduled for delivery for this queue using JSON serialization.
|
||||
*/
|
||||
@Operation(desc = "List the messages scheduled for delivery and returns them using JSON", impact = MBeanOperationInfo.INFO)
|
||||
String listScheduledMessagesAsJSON() throws Exception;
|
||||
|
||||
/**
|
||||
* Lists all the messages being deliver per consumer.
|
||||
* <br>
|
||||
* The Map's key is a toString representation for the consumer. Each consumer will then return a Map<String,Object>[] same way is returned by {@link #listScheduledMessages()}
|
||||
*/
|
||||
@Operation(desc = "List all messages being delivered per consumer")
|
||||
Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception;
|
||||
|
||||
/**
|
||||
* Executes a conversion of {@link #listDeliveringMessages()} to JSON
|
||||
*
|
||||
* @return
|
||||
* @throws Exception
|
||||
*/
|
||||
@Operation(desc = "list all messages being delivered per consumer using JSON form")
|
||||
String listDeliveringMessagesAsJSON() throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.jms.management.impl;
|
|||
|
||||
import javax.management.MBeanInfo;
|
||||
import javax.management.StandardMBean;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.activemq.api.core.ActiveMQException;
|
||||
|
@ -183,16 +184,7 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
String filter = JMSQueueControlImpl.createFilterFromJMSSelector(filterStr);
|
||||
Map<String, Object>[] coreMessages = coreQueueControl.listMessages(filter);
|
||||
|
||||
Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
|
||||
|
||||
int i = 0;
|
||||
|
||||
for (Map<String, Object> coreMessage : coreMessages)
|
||||
{
|
||||
Map<String, Object> jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
|
||||
jmsMessages[i++] = jmsMessage;
|
||||
}
|
||||
return jmsMessages;
|
||||
return toJMSMap(coreMessages);
|
||||
}
|
||||
catch (ActiveMQException e)
|
||||
{
|
||||
|
@ -200,6 +192,64 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
|
|||
}
|
||||
}
|
||||
|
||||
private Map<String, Object>[] toJMSMap(Map<String, Object>[] coreMessages)
|
||||
{
|
||||
Map<String, Object>[] jmsMessages = new Map[coreMessages.length];
|
||||
|
||||
int i = 0;
|
||||
|
||||
for (Map<String, Object> coreMessage : coreMessages)
|
||||
{
|
||||
Map<String, Object> jmsMessage = ActiveMQMessage.coreMaptoJMSMap(coreMessage);
|
||||
jmsMessages[i++] = jmsMessage;
|
||||
}
|
||||
return jmsMessages;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object>[] listScheduledMessages() throws Exception
|
||||
{
|
||||
Map<String, Object>[] coreMessages = coreQueueControl.listScheduledMessages();
|
||||
|
||||
return toJMSMap(coreMessages);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String listScheduledMessagesAsJSON() throws Exception
|
||||
{
|
||||
return coreQueueControl.listScheduledMessagesAsJSON();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception
|
||||
{
|
||||
try
|
||||
{
|
||||
Map<String, Map<String, Object>[]> returnMap = new HashMap<String, Map<String, Object>[]>();
|
||||
|
||||
|
||||
// the workingMap from the queue-control
|
||||
Map<String, Map<String, Object>[]> workingMap = coreQueueControl.listDeliveringMessages();
|
||||
|
||||
for (Map.Entry<String, Map<String, Object>[]> entry : workingMap.entrySet())
|
||||
{
|
||||
returnMap.put(entry.getKey(), toJMSMap(entry.getValue()));
|
||||
}
|
||||
|
||||
return returnMap;
|
||||
}
|
||||
catch (ActiveMQException e)
|
||||
{
|
||||
throw new IllegalStateException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String listDeliveringMessagesAsJSON() throws Exception
|
||||
{
|
||||
return coreQueueControl.listDeliveringMessagesAsJSON();
|
||||
}
|
||||
|
||||
public String listMessagesAsJSON(final String filter) throws Exception
|
||||
{
|
||||
return JMSQueueControlImpl.toJSON(listMessages(filter));
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy;
|
|||
import org.apache.activemq.core.settings.impl.AddressSettings;
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQDestination;
|
||||
import org.apache.activemq.jms.client.ActiveMQJMSConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQQueue;
|
||||
import org.apache.activemq.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.jms.server.management.JMSNotificationType;
|
||||
|
@ -157,6 +158,56 @@ public class JMSQueueControlTest extends ManagementTestBase
|
|||
Assert.assertEquals(0, data.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListDeliveringMessages() throws Exception
|
||||
{
|
||||
JMSQueueControl queueControl = createManagementControl();
|
||||
|
||||
Assert.assertEquals(0, queueControl.getMessageCount());
|
||||
|
||||
String[] ids = JMSUtil.sendMessages(queue, 20);
|
||||
|
||||
ActiveMQJMSConnectionFactory cf = (ActiveMQJMSConnectionFactory)ActiveMQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF,
|
||||
new TransportConfiguration(InVMConnectorFactory.class.getName()));
|
||||
|
||||
Connection conn = cf.createConnection();
|
||||
conn.start();
|
||||
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
Assert.assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
|
||||
Assert.assertEquals(20, queueControl.getMessageCount());
|
||||
|
||||
Map<String, Map<String, Object>[]> deliverings = queueControl.listDeliveringMessages();
|
||||
|
||||
// Just one consumer.. so just one queue
|
||||
Assert.assertEquals(1, deliverings.size());
|
||||
|
||||
|
||||
for (Map.Entry<String, Map<String, Object>[]> deliveryEntry : deliverings.entrySet())
|
||||
{
|
||||
System.out.println("Key:" + deliveryEntry.getKey());
|
||||
|
||||
for (int i = 0; i < 20; i++)
|
||||
{
|
||||
Assert.assertEquals(ids[i], deliveryEntry.getValue()[i].get("JMSMessageID").toString());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
session.rollback();
|
||||
session.close();
|
||||
|
||||
JMSUtil.consumeMessages(20, queue);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testListMessagesAsJSONWithNullFilter() throws Exception
|
||||
{
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.activemq.api.jms.management.JMSQueueControl;
|
|||
import org.apache.activemq.core.remoting.impl.invm.InVMConnectorFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.jms.client.ActiveMQQueue;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -82,6 +84,15 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
|
|||
super.tearDown();
|
||||
}
|
||||
|
||||
@Ignore
|
||||
@Override
|
||||
@Test
|
||||
public void testListDeliveringMessages() throws Exception
|
||||
{
|
||||
// I'm not implementing the required proxy for this test on this JMS test
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected JMSQueueControl createManagementControl() throws Exception
|
||||
{
|
||||
|
@ -197,6 +208,30 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
|
|||
return (String)proxy.invokeOperation("listMessageCounterHistory");
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object>[] listScheduledMessages() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String listScheduledMessagesAsJSON() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Map<String, Object>[]> listDeliveringMessages() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String listDeliveringMessagesAsJSON() throws Exception
|
||||
{
|
||||
return null;
|
||||
}
|
||||
|
||||
public String listMessageCounterHistoryAsHTML() throws Exception
|
||||
{
|
||||
return (String)proxy.invokeOperation("listMessageCounterHistoryAsHTML");
|
||||
|
|
Loading…
Reference in New Issue