- Queues can now be browsed, messages deleted, or queue can be purged.
- Added initial hooks to expose subscriptions for JMX managment.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@375654 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-02-07 18:34:21 +00:00
parent db747b49d9
commit e6f5d3026e
13 changed files with 625 additions and 41 deletions

View File

@ -16,12 +16,16 @@
*/
package org.apache.activemq.broker.jmx;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -35,6 +39,17 @@ public class ManagedQueueRegion extends QueueRegion {
regionBroker = broker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
return sub;
}
protected void destroySubscription(Subscription sub) {
regionBroker.unregisterSubscription(sub);
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
regionBroker.register(destination, rc);

View File

@ -18,8 +18,11 @@ package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.memory.UsageManager;
@ -68,9 +71,15 @@ public class ManagedRegionBroker extends RegionBroker {
map.put("Destination", JMXSupport.encodeObjectNamePart(destName.getPhysicalName()));
ObjectName destObjectName= new ObjectName(brokerObjectName.getDomain(), map);
DestinationViewMBean view = new DestinationView(destination);
Object view;
if( destination instanceof Queue ) {
view = new QueueView((Queue) destination);
} else {
view = new TopicView((Topic) destination);
}
mbeanServer.registerMBean(view, destObjectName);
}
public void unregister(ActiveMQDestination destName) throws Throwable {
@ -82,4 +91,12 @@ public class ManagedRegionBroker extends RegionBroker {
mbeanServer.unregisterMBean(destObjectName);
}
public void registerSubscription(Subscription sub) {
// TODO: Use this to expose subscriptions to the JMX bus for management
}
public void unregisterSubscription(Subscription sub) {
// TODO: Use this to expose subscriptions to the JMX bus for management
}
}

View File

@ -16,24 +16,37 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.Broker;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempQueueRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory) {
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
this.regionBroker = regionBroker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
return sub;
}
protected void destroySubscription(Subscription sub) {
regionBroker.unregisterSubscription(sub);
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);

View File

@ -16,12 +16,15 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.Broker;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempTopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -33,6 +36,17 @@ public class ManagedTempTopicRegion extends TempTopicRegion {
super(regionBroker,destinationStatistics, memoryManager, taskRunnerFactory);
this.regionBroker = regionBroker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
return sub;
}
protected void destroySubscription(Subscription sub) {
regionBroker.unregisterSubscription(sub);
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);

View File

@ -16,12 +16,16 @@
*/
package org.apache.activemq.broker.jmx;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
@ -35,6 +39,17 @@ public class ManagedTopicRegion extends TopicRegion {
regionBroker = broker;
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
Subscription sub = super.createSubscription(context, info);
regionBroker.registerSubscription(sub);
return sub;
}
protected void destroySubscription(Subscription sub) {
regionBroker.unregisterSubscription(sub);
super.destroySubscription(sub);
}
protected Destination createDestination(ActiveMQDestination destination) throws Throwable {
Destination rc = super.createDestination(destination);
regionBroker.register(destination, rc);

View File

@ -0,0 +1,257 @@
package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.management.openmbean.ArrayType;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.SimpleType;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.Message;
public class OpenTypeSupport {
interface OpenTypeFactory {
CompositeType getCompositeType() throws OpenDataException;
Map getFields( Object o ) throws OpenDataException;
}
private static final HashMap openTypeFactories = new HashMap();
abstract static class AbstractOpenTypeFactory implements OpenTypeFactory {
private CompositeType compositeType;
ArrayList itemNamesList = new ArrayList();
ArrayList itemDescriptionsList = new ArrayList();
ArrayList itemTypesList = new ArrayList();
public CompositeType getCompositeType() throws OpenDataException {
if( compositeType == null ) {
init();
compositeType = createCompositeType();
}
return compositeType;
}
protected void init() throws OpenDataException {
}
protected CompositeType createCompositeType() throws OpenDataException {
String[] itemNames = (String[]) itemNamesList.toArray(new String[itemNamesList.size()]);
String[] itemDescriptions = (String[]) itemDescriptionsList.toArray(new String[itemDescriptionsList.size()]);
OpenType[] itemTypes = (OpenType[]) itemTypesList.toArray(new OpenType[itemTypesList.size()]);
return new CompositeType(getTypeName(), getDescription(), itemNames, itemDescriptions, itemTypes);
}
abstract protected String getTypeName();
protected void addItem(String name, String description, OpenType type) {
itemNamesList.add(name);
itemDescriptionsList.add(description);
itemTypesList.add(type);
}
protected String getDescription() {
return getTypeName();
}
public Map getFields(Object o) throws OpenDataException {
HashMap rc = new HashMap();
return rc;
}
}
static class MessageOpenTypeFactory extends AbstractOpenTypeFactory {
protected String getTypeName() {
return ActiveMQMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
addItem("JMSCorrelationID", "JMSCorrelationID", SimpleType.STRING);
addItem("JMSDestination", "JMSDestination", SimpleType.STRING);
addItem("JMSMessageID", "JMSMessageID", SimpleType.STRING);
addItem("JMSReplyTo", "JMSReplyTo", SimpleType.STRING);
addItem("JMSType", "JMSType", SimpleType.STRING);
addItem("JMSDeliveryMode", "JMSDeliveryMode", SimpleType.STRING);
addItem("JMSExpiration", "JMSExpiration", SimpleType.LONG);
addItem("JMSPriority", "JMSPriority", SimpleType.INTEGER);
addItem("JMSRedelivered", "JMSRedelivered", SimpleType.BOOLEAN);
addItem("JMSTimestamp", "JMSTimestamp", SimpleType.DATE);
addItem("Properties", "Properties", SimpleType.STRING);
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQMessage m = (ActiveMQMessage) o;
Map rc = super.getFields(o);
rc.put("JMSCorrelationID", m.getJMSCorrelationID());
rc.put("JMSDestination", ""+m.getJMSDestination());
rc.put("JMSMessageID", m.getJMSMessageID());
rc.put("JMSReplyTo", ""+m.getJMSReplyTo());
rc.put("JMSType", m.getJMSType());
rc.put("JMSDeliveryMode", m.getJMSDeliveryMode()==DeliveryMode.PERSISTENT ? "PERSISTENT" : "NON-PERSISTENT");
rc.put("JMSExpiration", new Long(m.getJMSExpiration()));
rc.put("JMSPriority", new Integer(m.getJMSPriority()));
rc.put("JMSRedelivered", new Boolean(m.getJMSRedelivered()));
rc.put("JMSTimestamp", new Date(m.getJMSTimestamp()));
try {
rc.put("Properties", ""+m.getProperties());
} catch (IOException e) {
rc.put("Properties", "");
}
return rc;
}
}
static class ByteMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected String getTypeName() {
return ActiveMQBytesMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
addItem("BodyLength", "Body length", SimpleType.LONG);
addItem("BodyPreview", "Body preview", new ArrayType(1,SimpleType.BYTE));
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQBytesMessage m = (ActiveMQBytesMessage) o;
Map rc = super.getFields(o);
long length=0;
try {
length = m.getBodyLength();
rc.put("BodyLength", new Long(length));
} catch (JMSException e) {
rc.put("BodyLength", new Long(0));
}
try {
byte preview[] = new byte[ (int)Math.min(length, 255) ];
m.readBytes(preview);
rc.put("BodyPreview", preview);
} catch (JMSException e) {
rc.put("BodyPreview", new byte[]{});
}
return rc;
}
}
static class MapMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected String getTypeName() {
return ActiveMQMapMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
addItem("ContentMap", "Content map", SimpleType.STRING);
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQMapMessage m = (ActiveMQMapMessage) o;
Map rc = super.getFields(o);
long length=0;
try {
rc.put("ContentMap", ""+m.getContentMap());
} catch (JMSException e) {
rc.put("ContentMap", "");
}
return rc;
}
}
static class ObjectMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected String getTypeName() {
return ActiveMQObjectMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQObjectMessage m = (ActiveMQObjectMessage) o;
Map rc = super.getFields(o);
return rc;
}
}
static class StreamMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected String getTypeName() {
return ActiveMQStreamMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQStreamMessage m = (ActiveMQStreamMessage) o;
Map rc = super.getFields(o);
return rc;
}
}
static class TextMessageOpenTypeFactory extends MessageOpenTypeFactory {
protected String getTypeName() {
return ActiveMQTextMessage.class.getName();
}
protected void init() throws OpenDataException {
super.init();
addItem("Text", "Text", SimpleType.STRING);
}
public Map getFields(Object o) throws OpenDataException {
ActiveMQTextMessage m = (ActiveMQTextMessage) o;
Map rc = super.getFields(o);
try {
rc.put("Text", ""+m.getText());
} catch (JMSException e) {
rc.put("Text", "");
}
return rc;
}
}
static {
openTypeFactories.put(ActiveMQMessage.class, new MessageOpenTypeFactory());
openTypeFactories.put(ActiveMQBytesMessage.class, new ByteMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQMapMessage.class, new MapMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQObjectMessage.class, new ObjectMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQStreamMessage.class, new StreamMessageOpenTypeFactory());
openTypeFactories.put(ActiveMQTextMessage.class, new TextMessageOpenTypeFactory());
}
public static OpenTypeFactory getFactory(Class clazz) throws OpenDataException {
return (OpenTypeFactory) openTypeFactories.get(clazz);
}
public static CompositeData convert(Message message) throws OpenDataException {
OpenTypeFactory f = getFactory(message.getClass());
if( f == null )
throw new OpenDataException("Cannot create a CompositeData for type: "+message.getClass().getName());
CompositeType ct = f.getCompositeType();
Map fields = f.getFields(message);
return new CompositeDataSupport(ct, fields);
}
}

View File

@ -0,0 +1,113 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.Message;
public class QueueView implements QueueViewMBean {
private final Queue destination;
public QueueView(Queue destination) {
this.destination = destination;
}
public void gc() {
destination.gc();
}
public void resetStatistics() {
destination.getDestinationStatistics().reset();
}
public long getEnqueueCount() {
return destination.getDestinationStatistics().getEnqueues().getCount();
}
public long getDequeueCount() {
return destination.getDestinationStatistics().getDequeues().getCount();
}
public long getConsumerCount() {
return destination.getDestinationStatistics().getConsumers().getCount();
}
public long getMessages() {
return destination.getDestinationStatistics().getMessages().getCount();
}
public long getMessagesCached() {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
public CompositeData[] browse() throws OpenDataException {
Message[] messages = destination.browse();
CompositeData c[] = new CompositeData[messages.length];
for (int i = 0; i < c.length; i++) {
try {
System.out.println(messages[i].getMessageId());
c[i] = OpenTypeSupport.convert(messages[i]);
} catch (Throwable e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return c;
}
public TabularData browseAsTable() throws OpenDataException {
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
Message[] messages = destination.browse();
CompositeType ct = factory.getCompositeType();
TabularType tt = new TabularType("MessageList", "MessageList", ct, new String[]{"JMSMessageID"});
TabularDataSupport rc = new TabularDataSupport(tt);
for (int i = 0; i < messages.length; i++) {
System.out.println(messages[i].getMessageId());
rc.put(new CompositeDataSupport(ct, factory.getFields(messages[i])));
}
return rc;
}
public CompositeData getMessage(String messageId) throws OpenDataException {
Message rc = destination.getMessage(messageId);
if( rc ==null )
return null;
return OpenTypeSupport.convert(rc);
}
public void removeMessage(String messageId) {
destination.removeMessage(messageId);
}
public void purge() {
destination.purge();
}
}

View File

@ -0,0 +1,42 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
public interface QueueViewMBean {
public void gc();
public void resetStatistics();
public long getEnqueueCount();
public long getDequeueCount();
public long getConsumerCount();
public long getMessages();
public long getMessagesCached();
public CompositeData[] browse() throws OpenDataException;
public TabularData browseAsTable() throws OpenDataException;
public CompositeData getMessage(String messageId) throws OpenDataException;
public void removeMessage(String messageId);
public void purge();
}

View File

@ -16,13 +16,13 @@
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Topic;
public class DestinationView implements DestinationViewMBean {
public class TopicView implements TopicViewMBean {
private final Destination destination;
private final Topic destination;
public DestinationView(Destination destination) {
public TopicView(Topic destination) {
this.destination = destination;
}

View File

@ -17,7 +17,7 @@
package org.apache.activemq.broker.jmx;
public interface DestinationViewMBean {
public interface TopicViewMBean {
public void gc();
public void resetStatistics();

View File

@ -170,6 +170,11 @@ abstract public class AbstractRegion implements Region {
Destination dest = (Destination) iter.next();
dest.removeSubscription(context, sub);
}
destroySubscription(sub);
}
protected void destroySubscription(Subscription sub) {
}
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Throwable {

View File

@ -16,7 +16,11 @@
*/
package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupHashBucket;
@ -41,10 +45,7 @@ import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
@ -139,7 +140,7 @@ public class Queue implements Destination {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
if (node.isDropped() ) {
if (node.isDropped()) {
continue;
}
@ -148,15 +149,13 @@ public class Queue implements Destination {
if (sub.matches(node, msgContext)) {
sub.add(node);
}
}
catch (IOException e) {
} catch (IOException e) {
log.warn("Could not load message: " + e, e);
}
}
}
}
finally {
} finally {
msgContext.clear();
dispatchValve.turnOn();
}
@ -193,17 +192,18 @@ public class Queue implements Destination {
MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
try {
msgContext.setDestination(destination);
for (Iterator iter = messages.iterator(); iter.hasNext();) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
if (node.isDropped() ) {
IndirectMessageReference node = (IndirectMessageReference) iter.next();
if (node.isDropped()) {
continue;
}
String groupID = node.getGroupID();
// Re-deliver all messages that the sub locked
if (node.getLockOwner() == sub || wasExclusiveOwner || (groupID != null && ownedGroups.contains(groupID))) {
if (node.getLockOwner() == sub || wasExclusiveOwner
|| (groupID != null && ownedGroups.contains(groupID))) {
node.incrementRedeliveryCounter();
node.unlock();
msgContext.setMessageReference(node);
@ -216,8 +216,7 @@ public class Queue implements Destination {
}
}
}
finally {
} finally {
dispatchValve.turnOn();
}
@ -225,9 +224,9 @@ public class Queue implements Destination {
public void send(final ConnectionContext context, final Message message) throws Throwable {
if( context.isProducerFlowControl() )
if (context.isProducerFlowControl())
usageManager.waitForSpace();
message.setRegionDestination(this);
if (store != null && message.isPersistent())
@ -242,8 +241,7 @@ public class Queue implements Destination {
dispatch(context, node, message);
}
});
}
else {
} else {
dispatch(context, node, message);
}
} finally {
@ -274,7 +272,7 @@ public class Queue implements Destination {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
// Remove dropped messages from the queue.
IndirectMessageReference node = (IndirectMessageReference) iter.next();
if (node.isDropped()) {
if (node.isDropped()) {
garbageSize--;
iter.remove();
continue;
@ -283,7 +281,8 @@ public class Queue implements Destination {
}
}
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException {
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (store != null && node.isPersistent()) {
store.removeMessage(context, ack);
}
@ -291,15 +290,16 @@ public class Queue implements Destination {
public Message loadMessage(MessageId messageId) throws IOException {
Message msg = store.getMessage(messageId);
if( msg!=null ) {
if (msg != null) {
msg.setRegionDestination(this);
}
return msg;
}
public String toString() {
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage()
+ "%, size=" + messages.size() + ", in flight groups=" + messageGroupOwners;
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size()
+ ", memory=" + usageManager.getPercentUsage() + "%, size=" + messages.size() + ", in flight groups="
+ messageGroupOwners;
}
public void start() throws Exception {
@ -324,7 +324,7 @@ public class Queue implements Destination {
public MessageGroupMap getMessageGroupOwners() {
if (messageGroupOwners == null) {
messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount );
messageGroupOwners = new MessageGroupHashBucket(messageGroupHashBucketCount);
}
return messageGroupOwners;
}
@ -352,7 +352,6 @@ public class Queue implements Destination {
public void setMessageGroupHashBucketCount(int messageGroupHashBucketCount) {
this.messageGroupHashBucketCount = messageGroupHashBucketCount;
}
// Implementation methods
// -------------------------------------------------------------------------
@ -370,7 +369,7 @@ public class Queue implements Destination {
messages.add(node);
}
synchronized(consumers) {
synchronized (consumers) {
if (consumers.isEmpty()) {
log.debug("No subscriptions registered, will not dispatch message at this time.");
return;
@ -381,8 +380,7 @@ public class Queue implements Destination {
msgContext.setMessageReference(node);
dispatchPolicy.dispatch(context, node, msgContext, consumers);
}
finally {
} finally {
msgContext.clear();
dispatchValve.decrement();
}
@ -405,4 +403,93 @@ public class Queue implements Destination {
return store;
}
public Message[] browse() {
ArrayList l = new ArrayList();
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
MessageReference r = (MessageReference) iter.next();
try {
Message m = r.getMessage();
if (m != null) {
l.add(m);
}
} finally {
r.decrementReferenceCount();
}
} catch (IOException e) {
}
}
}
return (Message[]) l.toArray(new Message[l.size()]);
}
public void removeMessage(String messageId) {
synchronized (messages) {
ConnectionContext c = new ConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
IndirectMessageReference r = (IndirectMessageReference) iter.next();
if (messageId.equals(r.getMessageId().toString())) {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r);
r.drop();
dropEvent();
}
} catch (IOException e) {
}
}
}
}
public Message getMessage(String messageId) {
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
MessageReference r = (MessageReference) iter.next();
if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount();
try {
Message m = r.getMessage();
if (m != null) {
return m;
}
} finally {
r.decrementReferenceCount();
}
break;
}
} catch (IOException e) {
}
}
}
return null;
}
public void purge() {
synchronized (messages) {
ConnectionContext c = new ConnectionContext();
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
IndirectMessageReference r = (IndirectMessageReference) iter.next();
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
ack.setDestination(destination);
ack.setMessageID(r.getMessageId());
acknowledge(c, null, ack, r);
r.drop();
dropEvent();
} catch (IOException e) {
}
}
}
}
}

View File

@ -24,6 +24,7 @@ import java.io.OutputStream;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
@ -661,4 +662,9 @@ public class ActiveMQMapMessage extends ActiveMQMessage implements MapMessage {
"theTable = " + map +
" }";
}
public Map getContentMap() throws JMSException {
initializeReading();
return map;
}
}