To support the bean renaming effort this pulls together the various code that create all our MBean names and puts them into one location which could later add supporting methods to create MBean queries for the unit tests etc.  This should make it simpler to iron out the naming as we go.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1430908 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-01-09 15:48:15 +00:00
parent 881c1b7005
commit 63a660ac04
9 changed files with 368 additions and 144 deletions

View File

@ -55,6 +55,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSuppurt;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
@ -109,7 +110,6 @@ import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.InetAddressUtil;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ThreadPoolUtils;
import org.apache.activemq.util.TimeUtils;
@ -1992,9 +1992,7 @@ public class BrokerService implements Service {
}
private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=clientConnectors,connectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createConnectorName(getBrokerObjectName(), "clientConnectors", connector.getName());
}
protected void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
@ -2009,15 +2007,11 @@ public class BrokerService implements Service {
}
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=networkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
}
public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=duplexNetworkConnectors,networkConnectorName="+ JMXSupport.encodeObjectNamePart(transport);
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createNetworkConnectorName(getBrokerObjectName(), "duplexNetworkConnectors", transport.toString());
}
protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
@ -2034,9 +2028,7 @@ public class BrokerService implements Service {
protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
ProxyConnectorView view = new ProxyConnectorView(connector);
try {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=proxyConnectors,proxyConnectorName="+ JMXSupport.encodeObjectNamePart(connector.getName());
ObjectName objectName = new ObjectName(objectNameStr);
ObjectName objectName = BrokerMBeanSuppurt.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
@ -2046,9 +2038,7 @@ public class BrokerService implements Service {
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
JmsConnectorView view = new JmsConnectorView(connector);
try {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",connector=jmsConnectors,JmsConnectors="+ JMXSupport.encodeObjectNamePart(connector.getName());
ObjectName objectName = new ObjectName(objectNameStr);
ObjectName objectName = BrokerMBeanSuppurt.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
@ -2167,9 +2157,7 @@ public class BrokerService implements Service {
if (isUseJmx()) {
JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
try {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",service=JobScheduler,name=JMS";
ObjectName objectName = new ObjectName(objectNameStr);
ObjectName objectName = BrokerMBeanSuppurt.createJobSchedulerServiceName(getBrokerObjectName());
AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
this.adminView.setJMSJobScheduler(objectName);
} catch (Throwable e) {
@ -2182,9 +2170,7 @@ public class BrokerService implements Service {
if (isUseJmx()) {
HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
try {
String objectNameStr = getBrokerObjectName().toString();
objectNameStr += ",service=Health";
ObjectName objectName = new ObjectName(objectNameStr);
ObjectName objectName = BrokerMBeanSuppurt.createHealthServiceName(getBrokerObjectName());
AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
@ -2235,9 +2221,7 @@ public class BrokerService implements Service {
}
protected ObjectName createBrokerObjectName() throws MalformedObjectNameException {
String objectNameStr = getManagementContext().getJmxDomainName() + ":type=Broker,brokerName=";
objectNameStr += JMXSupport.encodeObjectNamePart(getBrokerName());
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createBrokerObjectName(getManagementContext().getJmxDomainName(), getBrokerName());
}
protected TransportConnector createTransportConnector(URI brokerURI) throws Exception {

View File

@ -0,0 +1,252 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import java.util.Hashtable;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.region.policy.AbortSlowConsumerStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.util.JMXSupport;
public class BrokerMBeanSuppurt {
// MBean Name Creation
public static ObjectName createBrokerObjectName(String jmxDomainName, String brokerName) throws MalformedObjectNameException {
String objectNameStr = jmxDomainName + ":type=Broker,brokerName=";
objectNameStr += JMXSupport.encodeObjectNamePart(brokerName);
return new ObjectName(objectNameStr);
}
public static ObjectName createDestinationName(ObjectName brokerObjectName, ActiveMQDestination destination) throws MalformedObjectNameException {
return createDestinationName(brokerObjectName.toString(), destination);
}
public static ObjectName createDestinationName(String brokerObjectName, ActiveMQDestination destination) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += createDestinationProperties(destination);
return new ObjectName(objectNameStr);
}
public static ObjectName createDestinationName(String brokerObjectName, String type, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += createDestinationProperties(type, name);
return new ObjectName(objectNameStr);
}
private static String createDestinationProperties(ActiveMQDestination destination){
String result = "";
if (destination != null){
result = createDestinationProperties(destination.getDestinationTypeAsString(), destination.getPhysicalName());
}
return result;
}
private static String createDestinationProperties(String type, String name){
return ",destinationType="+ JMXSupport.encodeObjectNamePart(type) +
",destinationName=" + JMXSupport.encodeObjectNamePart(name);
}
public static ObjectName createSubscriptionName(ObjectName brokerObjectName, String connectionClientId, ConsumerInfo info) throws MalformedObjectNameException {
return createSubscriptionName(brokerObjectName.toString(), connectionClientId, info);
}
public static ObjectName createSubscriptionName(String brokerObjectName, String connectionClientId, ConsumerInfo info) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += createDestinationProperties(info.getDestination()) + ",endpoint=Consumer";
objectNameStr += ",clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
objectNameStr += ",consumerId=";
if (info.isDurable()){
objectNameStr += "Durable(" + JMXSupport.encodeObjectNamePart(connectionClientId + ":" + info.getSubscriptionName()) +")";
} else {
objectNameStr += JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
}
return new ObjectName(objectNameStr);
}
public static ObjectName createProducerName(ObjectName brokerObjectName, String connectionClientId, ProducerInfo info) throws MalformedObjectNameException {
return createProducerName(brokerObjectName.toString(), connectionClientId, info);
}
public static ObjectName createProducerName(String brokerObjectName, String connectionClientId, ProducerInfo producerInfo) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
if (producerInfo.getDestination() == null) {
objectNameStr += ",endpoint=dynamicProducer";
} else {
objectNameStr += createDestinationProperties(producerInfo.getDestination()) + ",endpoint=Producer";
}
objectNameStr += ",clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
objectNameStr += ",producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
return new ObjectName(objectNameStr);
}
public static ObjectName createXATransactionName(ObjectName brokerObjectName, XATransaction transaction) throws MalformedObjectNameException {
return createXATransactionName(brokerObjectName.toString(), transaction);
}
public static ObjectName createXATransactionName(String brokerObjectName, XATransaction transaction) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += "," + "transactionType=RecoveredXaTransaction";
objectNameStr += "," + "Xid=" + JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString());
return new ObjectName(objectNameStr);
}
public static ObjectName createAbortSlowConsumerStrategyName(ObjectName brokerObjectName, AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
return createAbortSlowConsumerStrategyName(brokerObjectName.toString(), strategy);
}
public static ObjectName createAbortSlowConsumerStrategyName(String brokerObjectName, AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName());
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
return createConnectorName(brokerObjectName.toString(), type, name);
}
public static ObjectName createConnectorName(String brokerObjectName, String type, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",connector=" + type + ",connectorName="+ JMXSupport.encodeObjectNamePart(name);
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createNetworkConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
return createNetworkConnectorName(brokerObjectName.toString(), type, name);
}
public static ObjectName createNetworkConnectorName(String brokerObjectName, String type, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",connector=" + type + ",networkConnectorName="+ JMXSupport.encodeObjectNamePart(name);
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createConnectionViewByAddressName(ObjectName connectorName, String type, String address) throws MalformedObjectNameException {
String objectNameStr = connectorName.toString();
objectNameStr += ",connectionViewType=" + JMXSupport.encodeObjectNamePart(type);
objectNameStr += ",connectionName="+ JMXSupport.encodeObjectNamePart(address);
return new ObjectName(objectNameStr);
}
public static ObjectName createConnectionViewByClientIdName(ObjectName connectorName, String clientId) throws MalformedObjectNameException {
String objectNameStr = connectorName.toString();
objectNameStr += ",connectionName="+JMXSupport.encodeObjectNamePart(clientId);
return new ObjectName(objectNameStr);
}
public static ObjectName createNetworkBridgeObjectName(ObjectName connectorName, String remoteAddress) throws MalformedObjectNameException {
Hashtable<String, String> map = new Hashtable<String, String>(connectorName.getKeyPropertyList());
map.put("networkBridge", JMXSupport.encodeObjectNamePart(remoteAddress));
return new ObjectName(connectorName.getDomain(), map);
}
public static ObjectName createProxyConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
return createProxyConnectorName(brokerObjectName.toString(), type, name);
}
public static ObjectName createProxyConnectorName(String brokerObjectName, String type, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",connector=" + type + ",proxyConnectorName="+ JMXSupport.encodeObjectNamePart(name);
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createJmsConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
return createJmsConnectorName(brokerObjectName.toString(), type, name);
}
public static ObjectName createJmsConnectorName(String brokerObjectName, String type, String name) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",connector=" + type + ",JmsConnectors="+ JMXSupport.encodeObjectNamePart(name);
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createJobSchedulerServiceName(ObjectName brokerObjectName) throws MalformedObjectNameException {
return createJobSchedulerServiceName(brokerObjectName.toString());
}
public static ObjectName createJobSchedulerServiceName(String brokerObjectName) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",service=JobScheduler,name=JMS";
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public static ObjectName createHealthServiceName(ObjectName brokerObjectName) throws MalformedObjectNameException {
return createHealthServiceName(brokerObjectName.toString());
}
public static ObjectName createHealthServiceName(String brokerObjectName) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName;
objectNameStr += ",service=Health";
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
// MBean Query Creation
public static ObjectName createConnectionQuery(String jmxDomainName, String brokerName, String clientId) throws MalformedObjectNameException {
return new ObjectName(jmxDomainName + ":type=Broker,brokerName="
+ JMXSupport.encodeObjectNamePart(brokerName) + ","
+ "connector=*," + "connectorName=*,"
+ "connectionName=" + JMXSupport.encodeObjectNamePart(clientId));
}
public static ObjectName createConsumerQueury(String jmxDomainName, String clientId) throws MalformedObjectNameException {
return createConsumerQueury(jmxDomainName, null, clientId);
}
public static ObjectName createConsumerQueury(String jmxDomainName, String brokerName, String clientId) throws MalformedObjectNameException {
return new ObjectName(jmxDomainName + ":type=Broker,brokerName="
+ (brokerName != null ? brokerName : "*") + ","
+ "destinationType=*,destinationName=*,"
+ "endpoint=Consumer,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "consumerId=*");
}
public static ObjectName createProducerQueury(String jmxDomainName, String clientId) throws MalformedObjectNameException {
return createProducerQueury(jmxDomainName, null, clientId);
}
public static ObjectName createProducerQueury(String jmxDomainName, String brokerName, String clientId) throws MalformedObjectNameException {
return new ObjectName(jmxDomainName + ":type=Broker,brokerName="
+ (brokerName != null ? brokerName : "*") + ","
+ "destinationType=*,destinationName=*,"
+ "endpoint=Producer,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "producerId=*");
}
}

View File

@ -23,7 +23,6 @@ import javax.management.ObjectName;
import org.apache.activemq.broker.Connection;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
public class ConnectionView implements ConnectionViewMBean {
@ -149,11 +148,7 @@ public class ConnectionView implements ConnectionViewMBean {
private ObjectName createConsumerQueury(String clientId) throws IOException {
try {
return new ObjectName(managementContext.getJmxDomainName() + ":type=Broker,brokerName=*,"
+ "destinationType=*,destinationName=*,"
+ "endpoint=Consumer,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "consumerId=*");
return BrokerMBeanSuppurt.createConsumerQueury(managementContext.getJmxDomainName(), clientId);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
@ -161,11 +156,7 @@ public class ConnectionView implements ConnectionViewMBean {
private ObjectName createProducerQueury(String clientId) throws IOException {
try {
return new ObjectName(managementContext.getJmxDomainName() + ":type=Broker,brokerName=*,"
+ "destinationType=*,destinationName=*,"
+ "endpoint=Producer,"
+ "clientId=" + JMXSupport.encodeObjectNamePart(clientId) + ","
+ "producerId=*");
return BrokerMBeanSuppurt.createProducerQueury(managementContext.getJmxDomainName(), clientId);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}

View File

@ -69,38 +69,47 @@ public class DestinationView implements DestinationViewMBean {
destination.gc();
}
@Override
public String getName() {
return destination.getName();
}
@Override
public void resetStatistics() {
destination.getDestinationStatistics().reset();
}
@Override
public long getEnqueueCount() {
return destination.getDestinationStatistics().getEnqueues().getCount();
}
@Override
public long getDequeueCount() {
return destination.getDestinationStatistics().getDequeues().getCount();
}
@Override
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();
}
@Override
public long getInFlightCount() {
return destination.getDestinationStatistics().getInflight().getCount();
}
@Override
public long getExpiredCount() {
return destination.getDestinationStatistics().getExpired().getCount();
}
@Override
public long getConsumerCount() {
return destination.getDestinationStatistics().getConsumers().getCount();
}
@Override
public long getQueueSize() {
return destination.getDestinationStatistics().getMessages().getCount();
}
@ -109,38 +118,47 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getMessagesCached().getCount();
}
@Override
public int getMemoryPercentUsage() {
return destination.getMemoryUsage().getPercentUsage();
}
@Override
public long getMemoryUsageByteCount() {
return destination.getMemoryUsage().getUsage();
}
@Override
public long getMemoryLimit() {
return destination.getMemoryUsage().getLimit();
}
@Override
public void setMemoryLimit(long limit) {
destination.getMemoryUsage().setLimit(limit);
}
@Override
public double getAverageEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getAverageTime();
}
@Override
public long getMaxEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMaxTime();
}
@Override
public long getMinEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMinTime();
}
@Override
public boolean isPrioritizedMessages() {
return destination.isPrioritizedMessages();
}
@Override
public CompositeData[] browse() throws OpenDataException {
try {
return browse(null);
@ -150,6 +168,7 @@ public class DestinationView implements DestinationViewMBean {
}
}
@Override
public CompositeData[] browse(String selector) throws OpenDataException, InvalidSelectorException {
Message[] messages = destination.browse();
ArrayList<CompositeData> c = new ArrayList<CompositeData>();
@ -187,6 +206,7 @@ public class DestinationView implements DestinationViewMBean {
/**
* Browses the current destination returning a list of messages
*/
@Override
public List<Object> browseMessages() throws InvalidSelectorException {
return browseMessages(null);
}
@ -195,6 +215,7 @@ public class DestinationView implements DestinationViewMBean {
* Browses the current destination with the given selector returning a list
* of messages
*/
@Override
public List<Object> browseMessages(String selector) throws InvalidSelectorException {
Message[] messages = destination.browse();
ArrayList<Object> answer = new ArrayList<Object>();
@ -223,6 +244,7 @@ public class DestinationView implements DestinationViewMBean {
return answer;
}
@Override
public TabularData browseAsTable() throws OpenDataException {
try {
return browseAsTable(null);
@ -231,6 +253,7 @@ public class DestinationView implements DestinationViewMBean {
}
}
@Override
public TabularData browseAsTable(String selector) throws OpenDataException, InvalidSelectorException {
OpenTypeFactory factory = OpenTypeSupport.getFactory(ActiveMQMessage.class);
Message[] messages = destination.browse();
@ -260,18 +283,22 @@ public class DestinationView implements DestinationViewMBean {
return rc;
}
@Override
public String sendTextMessage(String body) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body);
}
@Override
public String sendTextMessage(Map headers, String body) throws Exception {
return sendTextMessage(headers, body, null, null);
}
@Override
public String sendTextMessage(String body, String user, String password) throws Exception {
return sendTextMessage(Collections.EMPTY_MAP, body, user, password);
}
@Override
public String sendTextMessage(Map headers, String body, String userName, String password) throws Exception {
String brokerUrl = "vm://" + broker.getBrokerName();
@ -305,10 +332,12 @@ public class DestinationView implements DestinationViewMBean {
}
@Override
public int getMaxAuditDepth() {
return destination.getMaxAuditDepth();
}
@Override
public int getMaxProducersToAudit() {
return destination.getMaxProducersToAudit();
}
@ -321,38 +350,47 @@ public class DestinationView implements DestinationViewMBean {
destination.setEnableAudit(enableAudit);
}
@Override
public void setMaxAuditDepth(int maxAuditDepth) {
destination.setMaxAuditDepth(maxAuditDepth);
}
@Override
public void setMaxProducersToAudit(int maxProducersToAudit) {
destination.setMaxProducersToAudit(maxProducersToAudit);
}
@Override
public float getMemoryUsagePortion() {
return destination.getMemoryUsage().getUsagePortion();
}
@Override
public long getProducerCount() {
return destination.getDestinationStatistics().getProducers().getCount();
}
@Override
public boolean isProducerFlowControl() {
return destination.isProducerFlowControl();
}
@Override
public void setMemoryUsagePortion(float value) {
destination.getMemoryUsage().setUsagePortion(value);
}
@Override
public void setProducerFlowControl(boolean producerFlowControl) {
destination.setProducerFlowControl(producerFlowControl);
}
@Override
public boolean isAlwaysRetroactive() {
return destination.isAlwaysRetroactive();
}
@Override
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
destination.setAlwaysRetroactive(alwaysRetroactive);
}
@ -365,6 +403,7 @@ public class DestinationView implements DestinationViewMBean {
* @param blockedProducerWarningInterval the interval at which warning about
* blocked producers will be triggered.
*/
@Override
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
destination.setBlockedProducerWarningInterval(blockedProducerWarningInterval);
}
@ -374,39 +413,45 @@ public class DestinationView implements DestinationViewMBean {
* @return the interval at which warning about blocked producers will be
* triggered.
*/
@Override
public long getBlockedProducerWarningInterval() {
return destination.getBlockedProducerWarningInterval();
}
@Override
public int getMaxPageSize() {
return destination.getMaxPageSize();
}
@Override
public void setMaxPageSize(int pageSize) {
destination.setMaxPageSize(pageSize);
}
@Override
public boolean isUseCache() {
return destination.isUseCache();
}
@Override
public void setUseCache(boolean value) {
destination.setUseCache(value);
}
@Override
public ObjectName[] getSubscriptions() throws IOException, MalformedObjectNameException {
List<Subscription> subscriptions = destination.getConsumers();
ObjectName[] answer = new ObjectName[subscriptions.size()];
ObjectName objectName = broker.getBrokerService().getBrokerObjectName();
ObjectName brokerObjectName = broker.getBrokerService().getBrokerObjectName();
int index = 0;
for (Subscription subscription : subscriptions) {
String connectionClientId = subscription.getContext().getClientId();
String objectNameStr = ManagedRegionBroker.getSubscriptionObjectName(subscription.getConsumerInfo(), connectionClientId, objectName);
answer[index++] = new ObjectName(objectNameStr);
answer[index++] = BrokerMBeanSuppurt.createSubscriptionName(brokerObjectName, connectionClientId, subscription.getConsumerInfo());
}
return answer;
}
@Override
public ObjectName getSlowConsumerStrategy() throws IOException, MalformedObjectNameException {
ObjectName result = null;
SlowConsumerStrategy strategy = destination.getSlowConsumerStrategy();
@ -416,6 +461,7 @@ public class DestinationView implements DestinationViewMBean {
return result;
}
@Override
public String getOptions() {
Map<String, String> options = destination.getActiveMQDestination().getOptions();
String optionsString = "";

View File

@ -72,7 +72,6 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transaction.XATransaction;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
import org.slf4j.Logger;
@ -162,7 +161,7 @@ public class ManagedRegionBroker extends RegionBroker {
public void register(ActiveMQDestination destName, Destination destination) {
// TODO refactor to allow views for custom destinations
try {
ObjectName objectName = createObjectName(destName);
ObjectName objectName = BrokerMBeanSuppurt.createDestinationName(brokerObjectName, destName);
DestinationView view;
if (destination instanceof Queue) {
view = new QueueView(this, (Queue)destination);
@ -182,7 +181,7 @@ public class ManagedRegionBroker extends RegionBroker {
public void unregister(ActiveMQDestination destName) {
try {
ObjectName objectName = createObjectName(destName);
ObjectName objectName = BrokerMBeanSuppurt.createDestinationName(brokerObjectName, destName);
unregisterDestination(objectName);
} catch (Exception e) {
LOG.error("Failed to unregister " + destName, e);
@ -191,11 +190,10 @@ public class ManagedRegionBroker extends RegionBroker {
public ObjectName registerSubscription(ConnectionContext context, Subscription sub) {
String connectionClientId = context.getClientId();
ObjectName brokerJmxObjectName = brokerObjectName;
String objectNameStr = getSubscriptionObjectName(sub.getConsumerInfo(), connectionClientId, brokerJmxObjectName);
SubscriptionKey key = new SubscriptionKey(context.getClientId(), sub.getConsumerInfo().getSubscriptionName());
try {
ObjectName objectName = new ObjectName(objectNameStr);
ObjectName objectName = BrokerMBeanSuppurt.createSubscriptionName(brokerObjectName, connectionClientId, sub.getConsumerInfo());
SubscriptionView view;
if (sub.getConsumerInfo().getConsumerId().getConnectionId().equals("OFFLINE")) {
// add offline subscribers to inactive list
@ -226,19 +224,6 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
public static String getSubscriptionObjectName(ConsumerInfo info, String connectionClientId, ObjectName brokerJmxObjectName) {
String objectNameStr = brokerJmxObjectName.toString();
objectNameStr += getDestinationType(info.getDestination()) + ",endpoint=Consumer";
objectNameStr += ",clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
objectNameStr += ",consumerId=";
if (info.isDurable()){
objectNameStr += "Durable(" + JMXSupport.encodeObjectNamePart(connectionClientId + ":" + info.getSubscriptionName()) +")";
} else {
objectNameStr += JMXSupport.encodeObjectNamePart(info.getConsumerId().toString());
}
return objectNameStr;
}
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
Subscription sub = super.addConsumer(context, info);
@ -263,11 +248,10 @@ public class ManagedRegionBroker extends RegionBroker {
}
@Override
public void addProducer(ConnectionContext context, ProducerInfo info)
throws Exception {
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
super.addProducer(context, info);
String connectionClientId = context.getClientId();
ObjectName objectName = createObjectName(info, connectionClientId);
ObjectName objectName = BrokerMBeanSuppurt.createProducerName(brokerObjectName, context.getClientId(), info);
String userName = brokerService.isPopulateUserNameInMBeans() ? context.getUserName() : null;
ProducerView view = new ProducerView(info, connectionClientId, userName, this);
registerProducer(objectName, info.getDestination(), view);
@ -275,7 +259,7 @@ public class ManagedRegionBroker extends RegionBroker {
@Override
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
ObjectName objectName = createObjectName(info, context.getClientId());
ObjectName objectName = BrokerMBeanSuppurt.createProducerName(brokerObjectName, context.getClientId(), info);
unregisterProducer(objectName);
super.removeProducer(context, info);
}
@ -285,7 +269,7 @@ public class ManagedRegionBroker extends RegionBroker {
if (exchange != null && exchange.getProducerState() != null && exchange.getProducerState().getInfo() != null) {
ProducerInfo info = exchange.getProducerState().getInfo();
if (info.getDestination() == null && info.getProducerId() != null) {
ObjectName objectName = createObjectName(info, exchange.getConnectionContext().getClientId());
ObjectName objectName = BrokerMBeanSuppurt.createProducerName(brokerObjectName, exchange.getConnectionContext().getClientId(), info);
ProducerView view = this.dynamicDestinationProducers.get(objectName);
if (view != null) {
ActiveMQDestination dest = message.getDestination();
@ -524,7 +508,7 @@ public class ManagedRegionBroker extends RegionBroker {
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info, Subscription subscription) {
try {
ConsumerInfo offlineConsumerInfo = subscription != null ? subscription.getConsumerInfo() : ((TopicRegion)getTopicRegion()).createInactiveConsumerInfo(info);
ObjectName objectName = new ObjectName(getSubscriptionObjectName(offlineConsumerInfo, info.getClientId(), brokerObjectName));
ObjectName objectName = BrokerMBeanSuppurt.createSubscriptionName(brokerObjectName, info.getClientId(), offlineConsumerInfo);
SubscriptionView view = new InactiveDurableSubscriptionView(this, key.getClientId(), info, subscription);
try {
@ -689,39 +673,10 @@ public class ManagedRegionBroker extends RegionBroker {
this.contextBroker = contextBroker;
}
protected ObjectName createObjectName(ActiveMQDestination destination) throws MalformedObjectNameException {
// Build the object name for the destination
String objectNameStr = brokerObjectName.toString();
objectNameStr += getDestinationType(destination);
return new ObjectName(objectNameStr);
}
protected static String getDestinationType(ActiveMQDestination destination){
String result = "";
if (destination != null){
result = ",destinationType="+ JMXSupport.encodeObjectNamePart(destination.getDestinationTypeAsString()) + ",destinationName=" + JMXSupport.encodeObjectNamePart(destination.getPhysicalName());
}
return result;
}
protected ObjectName createObjectName(ProducerInfo producerInfo, String connectionClientId) throws MalformedObjectNameException {
String objectNameStr = brokerObjectName.toString();
if (producerInfo.getDestination() == null) {
objectNameStr += ",endpoint=dynamicProducer";
} else {
objectNameStr += getDestinationType(producerInfo.getDestination()) + ",endpoint=Producer";
}
objectNameStr += ",clientId=" + JMXSupport.encodeObjectNamePart(connectionClientId);
objectNameStr += ",producerId=" + JMXSupport.encodeObjectNamePart(producerInfo.getProducerId().toString());
return new ObjectName(objectNameStr);
}
public ObjectName registerSlowConsumerStrategy(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException {
ObjectName objectName = null;
try {
objectName = createObjectName(strategy);
objectName = BrokerMBeanSuppurt.createAbortSlowConsumerStrategyName(brokerObjectName, strategy);
if (!registeredMBeans.contains(objectName)) {
AbortSlowConsumerStrategyView view = new AbortSlowConsumerStrategyView(this, strategy);
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
@ -734,17 +689,9 @@ public class ManagedRegionBroker extends RegionBroker {
return objectName;
}
protected ObjectName createObjectName(XATransaction transaction) throws MalformedObjectNameException {
ObjectName objectName = new ObjectName(brokerObjectName.toString()
+ "," + "transactionType=RecoveredXaTransaction"
+ "," + "Xid="
+ JMXSupport.encodeObjectNamePart(transaction.getTransactionId().toString()));
return objectName;
}
public void registerRecoveredTransactionMBean(XATransaction transaction) {
try {
ObjectName objectName = createObjectName(transaction);
ObjectName objectName = BrokerMBeanSuppurt.createXATransactionName(brokerObjectName, transaction);
if (!registeredMBeans.contains(objectName)) {
RecoveredXATransactionView view = new RecoveredXATransactionView(this, transaction);
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, objectName);
@ -758,7 +705,7 @@ public class ManagedRegionBroker extends RegionBroker {
public void unregister(XATransaction transaction) {
try {
ObjectName objectName = createObjectName(transaction);
ObjectName objectName = BrokerMBeanSuppurt.createXATransactionName(brokerObjectName, transaction);
if (registeredMBeans.remove(objectName)) {
try {
managementContext.unregisterMBean(objectName);
@ -772,13 +719,6 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
private ObjectName createObjectName(AbortSlowConsumerStrategy strategy) throws MalformedObjectNameException{
String objectNameStr = this.brokerObjectName.toString();
objectNameStr += ",Service=SlowConsumerStrategy,InstanceName="+ JMXSupport.encodeObjectNamePart(strategy.getName());
ObjectName objectName = new ObjectName(objectNameStr);
return objectName;
}
public ObjectName getSubscriberObjectName(Subscription key) {
return subscriptionMap.get(key);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.broker.jmx;
import java.io.IOException;
import javax.management.ObjectName;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
@ -27,7 +28,6 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -39,7 +39,7 @@ public class ManagedTransportConnection extends TransportConnection {
private final ManagementContext managementContext;
private final ObjectName connectorName;
private ConnectionViewMBean mbean;
private final ConnectionViewMBean mbean;
private ObjectName byClientIdName;
private ObjectName byAddressName;
@ -74,6 +74,7 @@ public class ManagedTransportConnection extends TransportConnection {
super.stopAsync();
}
@Override
public Response processAddConnection(ConnectionInfo info) throws Exception {
Response answer = super.processAddConnection(info);
String clientId = info.getClientId();
@ -115,10 +116,7 @@ public class ManagedTransportConnection extends TransportConnection {
protected ObjectName createByAddressObjectName(String type, String value) throws IOException {
try {
String objectNameStr = connectorName.toString();
objectNameStr += ",connectionViewType=" + JMXSupport.encodeObjectNamePart(type);
objectNameStr += ",connectionName="+JMXSupport.encodeObjectNamePart(value);
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createConnectionViewByAddressName(connectorName, type, value);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
@ -126,12 +124,9 @@ public class ManagedTransportConnection extends TransportConnection {
protected ObjectName createByClientIdObjectName(String value) throws IOException {
try {
String objectNameStr = connectorName.toString();
objectNameStr += ",connectionName="+JMXSupport.encodeObjectNamePart(value);
return new ObjectName(objectNameStr);
return BrokerMBeanSuppurt.createConnectionViewByClientIdName(connectorName, value);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
}
}

View File

@ -31,7 +31,6 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMXSupport;
/**
*
@ -56,6 +55,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the clientId
*/
@Override
public String getClientId() {
return clientId;
}
@ -63,6 +63,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @returns the ObjectName of the Connection that created this subscription
*/
@Override
public ObjectName getConnection() {
ObjectName result = null;
@ -89,10 +90,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
try {
return new ObjectName(ctx.getJmxDomainName() + ":type=Broker,brokerName="
+ JMXSupport.encodeObjectNamePart(brokerName) + ","
+ "connector=*," + "connectorName=*,"
+ "connectionName=" + JMXSupport.encodeObjectNamePart(clientId));
return BrokerMBeanSuppurt.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
} catch (Throwable e) {
throw IOExceptionSupport.create(e);
}
@ -101,6 +99,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the id of the Connection the Subscription is on
*/
@Override
public String getConnectionId() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -112,6 +111,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the id of the Session the subscription is on
*/
@Override
public long getSessionId() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -123,6 +123,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the id of the Subscription
*/
@Override
public long getSubcriptionId() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -134,6 +135,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the destination name
*/
@Override
public String getDestinationName() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -143,6 +145,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
return "NOTSET";
}
@Override
public String getSelector() {
if (subscription != null) {
return subscription.getSelector();
@ -150,6 +153,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
return null;
}
@Override
public void setSelector(String selector) throws InvalidSelectorException, UnsupportedOperationException {
if (subscription != null) {
subscription.setSelector(selector);
@ -161,6 +165,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return true if the destination is a Queue
*/
@Override
public boolean isDestinationQueue() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -173,6 +178,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return true of the destination is a Topic
*/
@Override
public boolean isDestinationTopic() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -185,6 +191,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return true if the destination is temporary
*/
@Override
public boolean isDestinationTemporary() {
ConsumerInfo info = getConsumerInfo();
if (info != null) {
@ -197,6 +204,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return true if the subscriber is active
*/
@Override
public boolean isActive() {
return true;
}
@ -214,6 +222,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return whether or not the subscriber is retroactive or not
*/
@Override
public boolean isRetroactive() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isRetroactive() : false;
@ -222,6 +231,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return whether or not the subscriber is an exclusive consumer
*/
@Override
public boolean isExclusive() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isExclusive() : false;
@ -230,6 +240,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return whether or not the subscriber is durable (persistent)
*/
@Override
public boolean isDurable() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isDurable() : false;
@ -238,6 +249,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return whether or not the subscriber ignores local messages
*/
@Override
public boolean isNoLocal() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.isNoLocal() : false;
@ -249,6 +261,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* perform eviction of messages for slow consumers on non-durable
* topics.
*/
@Override
public int getMaximumPendingMessageLimit() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getMaximumPendingMessageLimit() : 0;
@ -257,6 +270,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return the consumer priority
*/
@Override
public byte getPriority() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getPriority() : 0;
@ -266,6 +280,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
* @return the name of the consumer which is only used for durable
* consumers.
*/
@Override
public String getSubcriptionName() {
ConsumerInfo info = getConsumerInfo();
return info != null ? info.getSubscriptionName() : null;
@ -274,6 +289,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return number of messages pending delivery
*/
@Override
public int getPendingQueueSize() {
return subscription != null ? subscription.getPendingQueueSize() : 0;
}
@ -281,10 +297,12 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return number of messages dispatched
*/
@Override
public int getDispatchedQueueSize() {
return subscription != null ? subscription.getDispatchedQueueSize() : 0;
}
@Override
public int getMessageCountAwaitingAcknowledge() {
return getDispatchedQueueSize();
}
@ -292,6 +310,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return number of messages that matched the subscription
*/
@Override
public long getDispatchedCounter() {
return subscription != null ? subscription.getDispatchedCounter() : 0;
}
@ -299,6 +318,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return number of messages that matched the subscription
*/
@Override
public long getEnqueueCounter() {
return subscription != null ? subscription.getEnqueueCounter() : 0;
}
@ -306,6 +326,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return number of messages queued by the client
*/
@Override
public long getDequeueCounter() {
return subscription != null ? subscription.getDequeueCounter() : 0;
}
@ -317,16 +338,19 @@ public class SubscriptionView implements SubscriptionViewMBean {
/**
* @return pretty print
*/
@Override
public String toString() {
return "SubscriptionView: " + getClientId() + ":" + getConnectionId();
}
/**
*/
@Override
public int getPrefetchSize() {
return subscription != null ? subscription.getPrefetchSize() : 0;
}
@Override
public boolean isMatchingQueue(String queueName) {
if (isDestinationQueue()) {
return matchesDestination(new ActiveMQQueue(queueName));
@ -334,6 +358,7 @@ public class SubscriptionView implements SubscriptionViewMBean {
return false;
}
@Override
public boolean isMatchingTopic(String topicName) {
if (isDestinationTopic()) {
return matchesDestination(new ActiveMQTopic(topicName));

View File

@ -16,18 +16,14 @@
*/
package org.apache.activemq.network;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Map;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSuppurt;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.util.JMXSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -83,9 +79,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
}
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
Hashtable<String, String> map = new Hashtable<String, String>(connectorName.getKeyPropertyList());
map.put("networkBridge", JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress()));
return new ObjectName(connectorName.getDomain(), map);
return BrokerMBeanSuppurt.createNetworkBridgeObjectName(connectorName, bridge.getRemoteAddress());
}
public void setCreatedByDuplex(boolean createdByDuplex) {

View File

@ -19,13 +19,10 @@ package org.apache.activemq.network;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
@ -33,13 +30,13 @@ import javax.management.ObjectName;
import org.apache.activemq.Service;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSuppurt;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.ServiceSupport;
import org.slf4j.Logger;
@ -57,10 +54,12 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
protected ServiceSupport serviceSupport = new ServiceSupport() {
@Override
protected void doStart() throws Exception {
handleStart();
}
@Override
protected void doStop(ServiceStopper stopper) throws Exception {
handleStop(stopper);
}
@ -145,7 +144,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
}
ActiveMQDestination[] dest = new ActiveMQDestination[topics.size()];
dest = (ActiveMQDestination[])topics.toArray(dest);
dest = topics.toArray(dest);
result.setDurableDestinations(dest);
}
return result;
@ -155,10 +154,12 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
return TransportFactory.connect(localURI);
}
@Override
public void start() throws Exception {
serviceSupport.start();
}
@Override
public void stop() throws Exception {
serviceSupport.stop();
}
@ -228,11 +229,7 @@ public abstract class NetworkConnector extends NetworkBridgeConfiguration implem
}
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
ObjectName connectorName = getObjectName();
Map<String, String> map = new HashMap<String, String>(connectorName.getKeyPropertyList());
return new ObjectName(connectorName.getDomain() + ":" + "BrokerName=" + JMXSupport.encodeObjectNamePart((String)map.get("BrokerName")) + "," + "Type=NetworkBridge,"
+ "NetworkConnectorName=" + JMXSupport.encodeObjectNamePart((String)map.get("NetworkConnectorName")) + "," + "Name="
+ JMXSupport.encodeObjectNamePart(JMXSupport.encodeObjectNamePart(bridge.getRemoteAddress())));
return BrokerMBeanSuppurt.createNetworkBridgeObjectName(getObjectName(), bridge.getRemoteAddress());
}
// ask all the bridges as we can't know to which this consumer is tied