mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@812514 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
876ddef594
commit
6fc3744c73
|
@ -16,6 +16,8 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.net.URI;
|
||||
|
@ -346,4 +348,13 @@ public class BrokerView implements BrokerViewMBean {
|
|||
URI answer = brokerService.getVmConnectorURI();
|
||||
return answer != null ? answer.toString() : "";
|
||||
}
|
||||
|
||||
public String getDataDirectory() {
|
||||
File file = brokerService.getDataDirectoryFile();
|
||||
try {
|
||||
return file != null ? file.getCanonicalPath():"";
|
||||
} catch (IOException e) {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -238,4 +238,7 @@ public interface BrokerViewMBean extends Service {
|
|||
@MBeanInfo("The url of the VM connector")
|
||||
String getVMURL();
|
||||
|
||||
@MBeanInfo("The location of the data directory")
|
||||
public String getDataDirectory();
|
||||
|
||||
}
|
||||
|
|
|
@ -16,31 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.broker.jmx;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.io.IOException;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.Queue;
|
||||
import org.apache.activemq.broker.region.Subscription;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
|
@ -51,6 +29,25 @@ import org.apache.activemq.filter.MessageEvaluationContext;
|
|||
import org.apache.activemq.selector.SelectorParser;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.InvalidSelectorException;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import javax.management.MalformedObjectNameException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.openmbean.CompositeData;
|
||||
import javax.management.openmbean.CompositeDataSupport;
|
||||
import javax.management.openmbean.CompositeType;
|
||||
import javax.management.openmbean.OpenDataException;
|
||||
import javax.management.openmbean.TabularData;
|
||||
import javax.management.openmbean.TabularDataSupport;
|
||||
import javax.management.openmbean.TabularType;
|
||||
|
||||
public class DestinationView implements DestinationViewMBean {
|
||||
private static final Log LOG = LogFactory.getLog(DestinationViewMBean.class);
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
|
||||
/**
|
||||
* @author Filip Hanik
|
||||
* @org.apache.xbean.XBean element="discardingDLQBrokerPlugin"
|
||||
* @version 1.0
|
||||
*/
|
||||
public class DiscardingDLQBrokerPlugin implements BrokerPlugin {
|
||||
|
|
|
@ -28,11 +28,15 @@ import org.apache.commons.logging.LogFactory;
|
|||
*
|
||||
* Useful, if you have set the broker usage policy to process ONLY persistent or ONLY non-persistent
|
||||
* messages.
|
||||
* @org.apache.xbean.XBean element="forcePersistencyModeBrokerPlugin"
|
||||
*/
|
||||
public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin {
|
||||
public static Log log = LogFactory.getLog(ForcePersistencyModeBrokerPlugin.class);
|
||||
private static Log LOG = LogFactory.getLog(ForcePersistencyModeBrokerPlugin.class);
|
||||
private boolean persistenceFlag = false;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*/
|
||||
public ForcePersistencyModeBrokerPlugin() {
|
||||
}
|
||||
|
||||
|
@ -46,7 +50,7 @@ public class ForcePersistencyModeBrokerPlugin implements BrokerPlugin {
|
|||
public Broker installPlugin(Broker broker) throws Exception{
|
||||
ForcePersistencyModeBroker pB = new ForcePersistencyModeBroker(broker);
|
||||
pB.setPersistenceFlag(isPersistenceForced());
|
||||
log.info("Installing ForcePersistencyModeBroker plugin: persistency enforced=" + pB.isPersistent());
|
||||
LOG.info("Installing ForcePersistencyModeBroker plugin: persistency enforced=" + pB.isPersistent());
|
||||
return pB;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,195 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.plugin;
|
||||
|
||||
import org.apache.activemq.advisory.AdvisorySupport;
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerFilter;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.DestinationStatistics;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQMapMessage;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.state.ProducerState;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
import org.apache.activemq.util.LongSequenceGenerator;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.io.File;
|
||||
import java.net.URI;
|
||||
import java.util.Set;
|
||||
/**
|
||||
* A StatisticsBroker You can retrieve a Map Message for a Destination - or
|
||||
* Broker containing statistics as key-value pairs The message must contain a
|
||||
* replyTo Destination - else its ignored
|
||||
*
|
||||
*/
|
||||
public class StatisticsBroker extends BrokerFilter {
|
||||
private static Log LOG = LogFactory.getLog(StatisticsBroker.class);
|
||||
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
|
||||
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
|
||||
private static final IdGenerator ID_GENERATOR = new IdGenerator();
|
||||
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
|
||||
protected final ProducerId advisoryProducerId = new ProducerId();
|
||||
|
||||
/**
|
||||
*
|
||||
* Constructor
|
||||
*
|
||||
* @param next
|
||||
*/
|
||||
public StatisticsBroker(Broker next) {
|
||||
super(next);
|
||||
this.advisoryProducerId.setConnectionId(ID_GENERATOR.generateId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the persistence mode
|
||||
*
|
||||
* @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
|
||||
* org.apache.activemq.command.Message)
|
||||
*/
|
||||
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
|
||||
ActiveMQDestination msgDest = messageSend.getDestination();
|
||||
ActiveMQDestination replyTo = messageSend.getReplyTo();
|
||||
if (replyTo != null) {
|
||||
String physicalName = msgDest.getPhysicalName();
|
||||
boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
|
||||
STATS_DESTINATION_PREFIX.length());
|
||||
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
|
||||
.length());
|
||||
if (destStats) {
|
||||
String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
|
||||
ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
|
||||
Set<Destination> set = getDestinations(queryDest);
|
||||
for (Destination dest : set) {
|
||||
DestinationStatistics stats = dest.getDestinationStatistics();
|
||||
if (stats != null) {
|
||||
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
|
||||
statsMessage.setString("destinationName", dest.getActiveMQDestination().toString());
|
||||
statsMessage.setLong("size", stats.getMessages().getCount());
|
||||
statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
|
||||
statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
|
||||
statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
|
||||
statsMessage.setLong("expiredCount", stats.getExpired().getCount());
|
||||
statsMessage.setLong("inflightCount", stats.getInflight().getCount());
|
||||
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
|
||||
statsMessage.setInt("memoryPercentUsage", dest.getMemoryUsage().getPercentUsage());
|
||||
statsMessage.setLong("memoryUsage", dest.getMemoryUsage().getUsage());
|
||||
statsMessage.setLong("memoryLimit", dest.getMemoryUsage().getLimit());
|
||||
statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
|
||||
statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
|
||||
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
|
||||
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
|
||||
statsMessage.setLong("producerCount", stats.getProducers().getCount());
|
||||
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
|
||||
}
|
||||
}
|
||||
} else if (brokerStats) {
|
||||
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
|
||||
BrokerService brokerService = getBrokerService();
|
||||
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
|
||||
SystemUsage systemUsage = brokerService.getSystemUsage();
|
||||
DestinationStatistics stats = regionBroker.getDestinationStatistics();
|
||||
statsMessage.setString("brokerName", regionBroker.getBrokerName());
|
||||
statsMessage.setString("brokerId", regionBroker.getBrokerId().toString());
|
||||
statsMessage.setLong("size", stats.getMessages().getCount());
|
||||
statsMessage.setLong("enqueueCount", stats.getEnqueues().getCount());
|
||||
statsMessage.setLong("dequeueCount", stats.getDequeues().getCount());
|
||||
statsMessage.setLong("dispatchCount", stats.getDispatched().getCount());
|
||||
statsMessage.setLong("expiredCount", stats.getExpired().getCount());
|
||||
statsMessage.setLong("inflightCount", stats.getInflight().getCount());
|
||||
statsMessage.setLong("messagesCached", stats.getMessagesCached().getCount());
|
||||
statsMessage.setInt("memoryPercentUsage", systemUsage.getMemoryUsage().getPercentUsage());
|
||||
statsMessage.setLong("memoryUsage", systemUsage.getMemoryUsage().getUsage());
|
||||
statsMessage.setLong("memoryLimit", systemUsage.getMemoryUsage().getLimit());
|
||||
statsMessage.setInt("storePercentUsage", systemUsage.getStoreUsage().getPercentUsage());
|
||||
statsMessage.setLong("storeUsage", systemUsage.getStoreUsage().getUsage());
|
||||
statsMessage.setLong("storeLimit", systemUsage.getStoreUsage().getLimit());
|
||||
statsMessage.setInt("tempPercentUsage", systemUsage.getTempUsage().getPercentUsage());
|
||||
statsMessage.setLong("tempUsage", systemUsage.getTempUsage().getUsage());
|
||||
statsMessage.setLong("tempLimit", systemUsage.getTempUsage().getLimit());
|
||||
statsMessage.setDouble("averageEnqueueTime", stats.getProcessTime().getAverageTime());
|
||||
statsMessage.setDouble("maxEnqueueTime", stats.getProcessTime().getMaxTime());
|
||||
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
|
||||
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
|
||||
statsMessage.setLong("producerCount", stats.getProducers().getCount());
|
||||
String answer = brokerService.getTransportConnectorURIsAsMap().get("tcp");
|
||||
answer = answer != null ? answer : "";
|
||||
statsMessage.setString("openwire", answer);
|
||||
answer = brokerService.getTransportConnectorURIsAsMap().get("stomp");
|
||||
answer = answer != null ? answer : "";
|
||||
statsMessage.setString("stomp", answer);
|
||||
answer = brokerService.getTransportConnectorURIsAsMap().get("ssl");
|
||||
answer = answer != null ? answer : "";
|
||||
statsMessage.setString("ssl", answer);
|
||||
answer = brokerService.getTransportConnectorURIsAsMap().get("stomp+ssl");
|
||||
answer = answer != null ? answer : "";
|
||||
statsMessage.setString("stomp+ssl", answer);
|
||||
URI uri = brokerService.getVmConnectorURI();
|
||||
answer = uri != null ? uri.toString() : "";
|
||||
statsMessage.setString("vm", answer);
|
||||
File file = brokerService.getDataDirectoryFile();
|
||||
answer = file != null ? file.getCanonicalPath() : "";
|
||||
statsMessage.setString("dataDirectory", answer);
|
||||
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
|
||||
} else {
|
||||
super.send(producerExchange, messageSend);
|
||||
}
|
||||
} else {
|
||||
super.send(producerExchange, messageSend);
|
||||
}
|
||||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
super.start();
|
||||
LOG.info("Starting StatisticsBroker");
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
super.stop();
|
||||
}
|
||||
|
||||
protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
|
||||
throws Exception {
|
||||
msg.setPersistent(false);
|
||||
msg.setType(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
|
||||
msg.setMessageId(new MessageId(this.advisoryProducerId, this.messageIdGenerator.getNextSequenceId()));
|
||||
msg.setDestination(replyTo);
|
||||
msg.setResponseRequired(false);
|
||||
msg.setProducerId(this.advisoryProducerId);
|
||||
boolean originalFlowControl = context.isProducerFlowControl();
|
||||
final ProducerBrokerExchange producerExchange = new ProducerBrokerExchange();
|
||||
producerExchange.setConnectionContext(context);
|
||||
producerExchange.setMutable(true);
|
||||
producerExchange.setProducerState(new ProducerState(new ProducerInfo()));
|
||||
try {
|
||||
context.setProducerFlowControl(false);
|
||||
this.next.send(producerExchange, msg);
|
||||
} finally {
|
||||
context.setProducerFlowControl(originalFlowControl);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.plugin;
|
||||
|
||||
import org.apache.activemq.broker.Broker;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* A StatisticsBrokerPlugin
|
||||
* @org.apache.xbean.XBean element="statisticsBrokerPlugin"
|
||||
*
|
||||
*/
|
||||
public class StatisticsBrokerPlugin implements BrokerPlugin {
|
||||
private static Log LOG = LogFactory.getLog(StatisticsBrokerPlugin.class);
|
||||
/**
|
||||
* @param broker
|
||||
* @return the plug-in
|
||||
* @throws Exception
|
||||
* @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
|
||||
*/
|
||||
public Broker installPlugin(Broker broker) throws Exception {
|
||||
StatisticsBroker answer = new StatisticsBroker(broker);
|
||||
LOG.info("Installing StaticsBroker");
|
||||
return answer;
|
||||
}
|
||||
}
|
|
@ -267,3 +267,11 @@ vmQueueCursor = org.apache.activemq.broker.region.policy.VMPendingQueueMessageSt
|
|||
|
||||
xaConnectionFactory = org.apache.activemq.spring.ActiveMQXAConnectionFactory
|
||||
|
||||
statisticsBrokerPlugin = org.apache.activemq.plugin.StatisticsBrokerPlugin
|
||||
|
||||
forcePersistencyModeBrokerPlugin = org.apache.activemq.plugin.ForcePersistencyModeBrokerPlugin
|
||||
|
||||
discardingDLQBrokerPlugin = org.apache.activemq.plugin.DiscardingDLQBrokerPlugin
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -569,7 +569,7 @@ other brokers in a federated network
|
|||
<xs:element name='plugins' minOccurs='0' maxOccurs='1'>
|
||||
<xs:annotation>
|
||||
<xs:documentation><![CDATA[
|
||||
Sets a number of broker plugins to install such as for security
|
||||
Sets a number of broker plugins to install to extend the Broker functionality - such as for security
|
||||
authentication or authorization
|
||||
]]></xs:documentation>
|
||||
</xs:annotation>
|
||||
|
@ -586,6 +586,9 @@ authentication or authorization
|
|||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:element ref='tns:statisticsBrokerPlugin'/>
|
||||
<xs:element ref='tns:discardingDLQBrokerPlugin'/>
|
||||
<xs:element ref='tns:forcePersistencyModeBrokerPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -617,6 +620,9 @@ other brokers in a federated network
|
|||
<xs:element ref='tns:timeStampingBrokerPlugin'/>
|
||||
<xs:element ref='tns:udpTraceBrokerPlugin'/>
|
||||
<xs:element ref='tns:traceBrokerPathPlugin'/>
|
||||
<xs:element ref='tns:statisticsBrokerPlugin'/>
|
||||
<xs:element ref='tns:discardingDLQBrokerPlugin'/>
|
||||
<xs:element ref='tns:forcePersistencyModeBrokerPlugin'/>
|
||||
<xs:any namespace='##other'/>
|
||||
</xs:choice>
|
||||
</xs:complexType>
|
||||
|
@ -5889,5 +5895,42 @@ warning if the user forgets. To disable the warning just set the value to <
|
|||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
<!-- element for type: org.apache.activemq.plugin.DiscardingDLQBrokerPlugin -->
|
||||
<xs:element name='discardingDLQBrokerPlugin'>
|
||||
<xs:annotation>
|
||||
<xs:documentation><![CDATA[
|
||||
Filter Plugin for DLQ
|
||||
]]></xs:documentation>
|
||||
</xs:annotation>
|
||||
<xs:complexType>
|
||||
<xs:attribute name='dropTemporaryTopics' type='xs:boolean'/>
|
||||
<xs:attribute name='dropTemporaryQueues' type='xs:boolean'/>
|
||||
<xs:attribute name='dropAll' type='xs:boolean'/>
|
||||
<xs:attribute name='dropOnly' type='xs:string'/>
|
||||
<xs:attribute name='reportInterval' type='xs:integer'/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
<!-- element for type: org.apache.activemq.plugin.ForcePersistencyModeBrokerPlugin -->
|
||||
<xs:element name='forcePersistencyModeBrokerPlugin'>
|
||||
<xs:annotation>
|
||||
<xs:documentation><![CDATA[
|
||||
A helper plugin to force a mode of persistence on messages
|
||||
]]></xs:documentation>
|
||||
</xs:annotation>
|
||||
<xs:complexType>
|
||||
<xs:attribute name='persistenceFlag' type='xs:boolean'/>
|
||||
</xs:complexType>
|
||||
</xs:element>
|
||||
|
||||
<!-- element for type: org.apache.activemq.plugin.StatisticsBrokerPlugin -->
|
||||
<xs:element name='statisticsBrokerPlugin'>
|
||||
<xs:annotation>
|
||||
<xs:documentation><![CDATA[
|
||||
Enable statistic messages
|
||||
]]></xs:documentation>
|
||||
</xs:annotation>
|
||||
</xs:element>
|
||||
|
||||
|
||||
</xs:schema>
|
||||
|
|
|
@ -0,0 +1,128 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.plugin;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerPlugin;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import java.net.URI;
|
||||
import java.util.Enumeration;
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.MapMessage;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
/**
|
||||
* A BrokerStatisticsPluginTest
|
||||
* A testcase for https://issues.apache.org/activemq/browse/AMQ-2379
|
||||
*
|
||||
*/
|
||||
public class BrokerStatisticsPluginTest extends TestCase{
|
||||
private static final Log LOG = LogFactory.getLog(BrokerStatisticsPluginTest.class);
|
||||
|
||||
private Connection connection;
|
||||
private BrokerService broker;
|
||||
|
||||
public void testBrokerStats() throws Exception{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue replyTo = session.createTemporaryQueue();
|
||||
MessageConsumer consumer = session.createConsumer(replyTo);
|
||||
Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
|
||||
MessageProducer producer = session.createProducer(query);
|
||||
Message msg = session.createMessage();
|
||||
msg.setJMSReplyTo(replyTo);
|
||||
producer.send(msg);
|
||||
MapMessage reply = (MapMessage) consumer.receive(10*1000);
|
||||
assertNotNull(reply);
|
||||
assertTrue(reply.getMapNames().hasMoreElements());
|
||||
/*
|
||||
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
|
||||
String name = e.nextElement().toString();
|
||||
System.err.println(name+"="+reply.getObject(name));
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
}
|
||||
|
||||
public void testDestinationStats() throws Exception{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue replyTo = session.createTemporaryQueue();
|
||||
MessageConsumer consumer = session.createConsumer(replyTo);
|
||||
Queue testQueue = session.createQueue("Test.Queue");
|
||||
MessageProducer producer = session.createProducer(null);
|
||||
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName());
|
||||
Message msg = session.createMessage();
|
||||
|
||||
producer.send(testQueue,msg);
|
||||
|
||||
msg.setJMSReplyTo(replyTo);
|
||||
producer.send(query,msg);
|
||||
MapMessage reply = (MapMessage) consumer.receive();
|
||||
assertNotNull(reply);
|
||||
assertTrue(reply.getMapNames().hasMoreElements());
|
||||
/*
|
||||
for (Enumeration e = reply.getMapNames();e.hasMoreElements();) {
|
||||
String name = e.nextElement().toString();
|
||||
System.err.println(name+"="+reply.getObject(name));
|
||||
}
|
||||
*/
|
||||
|
||||
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
broker = createBroker();
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp"));
|
||||
connection = factory.createConnection();
|
||||
connection.start();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception{
|
||||
if (this.connection != null) {
|
||||
this.connection.close();
|
||||
}
|
||||
if (this.broker!=null) {
|
||||
this.broker.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
//return createBroker("org/apache/activemq/plugin/statistics-plugin-broker.xml");
|
||||
BrokerService answer = new BrokerService();
|
||||
BrokerPlugin[] plugins = new BrokerPlugin[1];
|
||||
plugins[0] = new StatisticsBrokerPlugin();
|
||||
answer.setPlugins(plugins);
|
||||
answer.setDeleteAllMessagesOnStartup(true);
|
||||
answer.addConnector("tcp://localhost:0");
|
||||
answer.start();
|
||||
return answer;
|
||||
}
|
||||
|
||||
protected BrokerService createBroker(String uri) throws Exception {
|
||||
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
|
||||
return BrokerFactory.createBroker(new URI("xbean:" + uri));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
|
||||
<!-- this file can only be parsed using the xbean-spring library -->
|
||||
<!-- START SNIPPET: example -->
|
||||
<beans>
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" >
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:0"/>
|
||||
</transportConnectors>
|
||||
<plugins>
|
||||
<statisticsBrokerPlugin/>
|
||||
</plugins>
|
||||
|
||||
|
||||
</broker>
|
||||
|
||||
</beans>
|
Loading…
Reference in New Issue