Added more operations to the JMX beans.

- broker view can now create and destroy destinations
 - queue view can now copy messages to other destinations.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@382532 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-02 22:11:25 +00:00
parent 1229c23bc4
commit ed5d43b04f
12 changed files with 122 additions and 28 deletions

View File

@ -866,6 +866,7 @@ public class BrokerService implements Service {
if (isUseJmx()) {
ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
managedBroker.setContextBroker(broker);
BrokerViewMBean view = new BrokerView(managedBroker, getMemoryManager());
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
ObjectName objectName = getBrokerObjectName();

View File

@ -17,8 +17,11 @@
package org.apache.activemq.broker.jmx;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
public class BrokerView implements BrokerViewMBean {
@ -120,5 +123,27 @@ public class BrokerView implements BrokerViewMBean {
public ObjectName[] getInactiveDurableTopicSubscribers(){
return broker.getInactiveDurableTopicSubscribers();
}
public void addTopic(String name) throws Throwable {
broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name));
}
public void addQueue(String name) throws Throwable {
broker.addDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name));
}
public void removeTopic(String name) throws Throwable {
broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQTopic(name), 1000);
}
public void removeQueue(String name) throws Throwable {
broker.removeDestination(getConnectionContext(broker.getContextBroker()), new ActiveMQQueue(name), 1000);
}
static public ConnectionContext getConnectionContext(Broker broker) {
ConnectionContext context = new ConnectionContext();
context.setBroker(broker);
return context;
}
}

View File

@ -50,5 +50,9 @@ public interface BrokerViewMBean extends Service {
public ObjectName[] getTemporaryTopicSubscribers();
public ObjectName[] getTemporaryQueueSubscribers();
public void addTopic(String name) throws Throwable;
public void addQueue(String name) throws Throwable;
public void removeTopic(String name) throws Throwable;
public void removeQueue(String name) throws Throwable;
}

View File

@ -20,6 +20,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.command.ActiveMQMessage;
@ -27,8 +28,10 @@ import org.apache.activemq.command.Message;
public class DestinationView {
protected final Destination destination;
protected final ManagedRegionBroker broker;
public DestinationView(Destination destination){
public DestinationView(ManagedRegionBroker broker, Destination destination){
this.broker = broker;
this.destination=destination;
}

View File

@ -16,13 +16,13 @@ package org.apache.activemq.broker.jmx;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
@ -32,11 +32,12 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker;
@ -58,8 +59,8 @@ import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
public class ManagedRegionBroker extends RegionBroker{
private static final Log log=LogFactory.getLog(ManagedRegionBroker.class);
private final MBeanServer mbeanServer;
@ -76,6 +77,9 @@ public class ManagedRegionBroker extends RegionBroker{
private final Map temporaryTopicSubscribers=new ConcurrentHashMap();
private final Map subscriptionKeys = new ConcurrentHashMap();
private final Map subscriptionMap = new ConcurrentHashMap();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService,MBeanServer mbeanServer,ObjectName brokerObjectName,
TaskRunnerFactory taskRunnerFactory,UsageManager memoryManager,PersistenceAdapter adapter,
@ -119,9 +123,9 @@ public class ManagedRegionBroker extends RegionBroker{
ObjectName destObjectName=new ObjectName(brokerObjectName.getDomain(),map);
DestinationView view;
if(destination instanceof Queue){
view=new QueueView((Queue) destination);
view=new QueueView(this, (Queue) destination);
}else{
view=new TopicView((Topic) destination);
view=new TopicView(this, (Topic) destination);
}
registerDestination(destObjectName,destName,view);
}catch(Exception e){
@ -387,4 +391,12 @@ public class ManagedRegionBroker extends RegionBroker{
Set set = inactiveDurableTopicSubscribers.keySet();
return (ObjectName[])set.toArray(new ObjectName[set.size()]);
}
public Broker getContextBroker() {
return contextBroker;
}
public void setContextBroker(Broker contextBroker) {
this.contextBroker = contextBroker;
}
}

View File

@ -16,10 +16,11 @@ package org.apache.activemq.broker.jmx;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.OpenDataException;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
public class QueueView extends DestinationView implements QueueViewMBean{
public QueueView(Queue destination){
super(destination);
public QueueView(ManagedRegionBroker broker, Queue destination){
super(broker, destination);
}
public CompositeData getMessage(String messageId) throws OpenDataException{
@ -36,4 +37,8 @@ public class QueueView extends DestinationView implements QueueViewMBean{
public void purge(){
((Queue) destination).purge();
}
public boolean copyMessageTo(String messageId, String destinationName) throws Throwable {
return ((Queue) destination).copyMessageTo(BrokerView.getConnectionContext(broker.getContextBroker()), messageId, ActiveMQDestination.createDestination(destinationName, ActiveMQDestination.QUEUE_TYPE));
}
}

View File

@ -39,4 +39,5 @@ public interface QueueViewMBean {
public void removeMessage(String messageId);
public void purge();
public boolean copyMessageTo(String messageId, String destinationName) throws Throwable;
}

View File

@ -16,7 +16,7 @@ package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.region.Topic;
public class TopicView extends DestinationView implements TopicViewMBean{
public TopicView(Topic destination){
super(destination);
public TopicView(ManagedRegionBroker broker, Topic destination){
super(broker, destination);
}
}

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import javax.jms.InvalidSelectorException;
@ -168,23 +169,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
if(message.getOriginalDestination()!=null)
message.setOriginalDestination(message.getDestination());
if(message.getOriginalTransactionId()!=null)
message.setOriginalTransactionId(message.getTransactionId());
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message
.getDestination());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
message.evictMarshlledForm();
boolean originalFlowControl=context.isProducerFlowControl();
try{
context.setProducerFlowControl(false);
context.getBroker().send(context,message);
}finally{
context.setProducerFlowControl(originalFlowControl);
}
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context, message, deadLetterDestination);
}
}finally{
node.decrementReferenceCount();

View File

@ -42,6 +42,7 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -500,6 +501,28 @@ public class Queue implements Destination {
}
}
public boolean copyMessageTo(ConnectionContext context, String messageId, ActiveMQDestination dest) throws Throwable {
synchronized (messages) {
for (Iterator iter = messages.iterator(); iter.hasNext();) {
try {
MessageReference r = (MessageReference) iter.next();
if (messageId.equals(r.getMessageId().toString())) {
r.incrementReferenceCount();
try {
Message m = r.getMessage();
BrokerSupport.resend(context, m, dest);
} finally {
r.decrementReferenceCount();
}
break;
}
} catch (IOException e) {
}
}
}
return false;
}
}

View File

@ -0,0 +1,32 @@
package org.apache.activemq.util;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
public class BrokerSupport {
/**
* @param context
* @param message
* @param deadLetterDestination
* @throws Throwable
*/
static public void resend(final ConnectionContext context, Message message, ActiveMQDestination deadLetterDestination) throws Throwable {
if(message.getOriginalDestination()!=null)
message.setOriginalDestination(message.getDestination());
if(message.getOriginalTransactionId()!=null)
message.setOriginalTransactionId(message.getTransactionId());
message.setDestination(deadLetterDestination);
message.setTransactionId(null);
message.evictMarshlledForm();
boolean originalFlowControl=context.isProducerFlowControl();
try{
context.setProducerFlowControl(false);
context.getBroker().send(context,message);
}finally{
context.setProducerFlowControl(originalFlowControl);
}
}
}

View File

@ -1,7 +1,7 @@
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out
log4j.rootLogger=DEBUG, stdout
log4j.logger.org.apache.activemq.spring=WARN