This closes #97 first message management

This commit is contained in:
jbertram 2015-04-22 17:01:59 -05:00
commit 16137eabc7
7 changed files with 115 additions and 3 deletions

View File

@ -92,6 +92,16 @@ public interface QueueControl
*/
String getFirstMessageAsJSON() throws Exception;
/**
* Returns the timestamp of the first message in milliseconds.
*/
Long getFirstMessageTimestamp() throws Exception;
/**
* Returns the age of the first message in milliseconds.
*/
Long getFirstMessageAge() throws Exception;
/**
* Returns the expiry address associated to this queue.
*/

View File

@ -55,6 +55,21 @@ public interface JMSQueueControl extends DestinationControl
*/
String getSelector();
/**
* Returns the first message on the queue as JSON
*/
String getFirstMessageAsJSON() throws Exception;
/**
* Returns the timestamp of the first message in milliseconds.
*/
Long getFirstMessageTimestamp() throws Exception;
/**
* Returns the age of the first message in milliseconds.
*/
Long getFirstMessageAge() throws Exception;
// Operations ----------------------------------------------------
/**

View File

@ -146,6 +146,21 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
return coreQueueControl.getExpiryAddress();
}
public String getFirstMessageAsJSON() throws Exception
{
return coreQueueControl.getFirstMessageAsJSON();
}
public Long getFirstMessageTimestamp() throws Exception
{
return coreQueueControl.getFirstMessageTimestamp();
}
public Long getFirstMessageAge() throws Exception
{
return coreQueueControl.getFirstMessageAge();
}
@Override
public void addBinding(String binding) throws Exception
{

View File

@ -19,6 +19,7 @@ package org.apache.activemq.core.management.impl;
import javax.management.MBeanOperationInfo;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -486,7 +487,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
}
}
public String getFirstMessageAsJSON() throws Exception
protected Map<String, Object>[] getFirstMessage() throws Exception
{
checkStarted();
@ -505,7 +506,7 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
Message message = ref.getMessage();
messages.add(message.toMap());
}
return toJSON(messages.toArray(new Map[1])).toString();
return messages.toArray(new Map[1]);
}
finally
{
@ -519,6 +520,37 @@ public class QueueControlImpl extends AbstractControl implements QueueControl
}
public String getFirstMessageAsJSON() throws Exception
{
return toJSON(getFirstMessage()).toString();
}
public Long getFirstMessageTimestamp() throws Exception
{
Map<String, Object>[] _message = getFirstMessage();
if (_message == null || _message.length == 0 || _message[0] == null)
{
return null;
}
Map<String, Object> message = _message[0];
if (!message.containsKey("timestamp"))
{
return null;
}
return (Long)message.get("timestamp");
}
public Long getFirstMessageAge() throws Exception
{
Long firstMessageTimestamp = getFirstMessageTimestamp();
if (firstMessageTimestamp == null)
{
return null;
}
long now = new Date().getTime();
return now - firstMessageTimestamp.longValue();
}
public long countMessages(final String filterStr) throws Exception
{
checkStarted();

View File

@ -159,6 +159,21 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
return (String)proxy.retrieveAttributeValue("expiryAddress");
}
public String getFirstMessageAsJSON() throws Exception
{
return (String)proxy.retrieveAttributeValue("firstMessageAsJSON");
}
public Long getFirstMessageTimestamp() throws Exception
{
return (Long)proxy.retrieveAttributeValue("firstMessageTimestamp");
}
public Long getFirstMessageAge() throws Exception
{
return (Long)proxy.retrieveAttributeValue("firstMessageAge");
}
public long getMessageCount()
{
return ((Number)proxy.retrieveAttributeValue("messageCount")).longValue();
@ -351,4 +366,4 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest
// Inner classes -------------------------------------------------
}
}

View File

@ -285,11 +285,21 @@ public class QueueControlTest extends ManagementTestBase
// It's empty, so it's supposed to be like this
assertEquals("[{}]", queueControl.getFirstMessageAsJSON());
long beforeSend = System.currentTimeMillis();
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(false).putStringProperty("x", "valueX").putStringProperty("y", "valueY"));
System.out.println("first:" + queueControl.getFirstMessageAsJSON());
long firstMessageTimestamp = queueControl.getFirstMessageTimestamp();
System.out.println("first message timestamp: " + firstMessageTimestamp);
assertTrue(beforeSend <= firstMessageTimestamp);
assertTrue(firstMessageTimestamp <= System.currentTimeMillis());
long firstMessageAge = queueControl.getFirstMessageAge();
System.out.println("first message age: " + firstMessageAge);
assertTrue(firstMessageAge <= (System.currentTimeMillis() - firstMessageTimestamp));
session.deleteQueue(queue);
}

View File

@ -182,6 +182,21 @@ public class QueueControlUsingCoreTest extends QueueControlTest
return (String) proxy.invokeOperation("getFirstMessageAsJSON");
}
/**
* Returns the timestamp of the first message in milliseconds.
*/
public Long getFirstMessageTimestamp() throws Exception
{
return (Long) proxy.invokeOperation("getFirstMessageTimestamp");
}
/**
* Returns the age of the first message in milliseconds.
*/
public Long getFirstMessageAge() throws Exception
{
return (Long) proxy.invokeOperation("getFirstMessageAge");
}
public String listMessageCounterHistoryAsHTML() throws Exception
{