This commit is contained in:
rajdavies 2013-12-04 19:00:54 +00:00
parent 02ef9445dd
commit 489f929686
8 changed files with 312 additions and 3 deletions

View File

@ -173,6 +173,21 @@ public class BrokerMBeanSupport {
return new ObjectName(connectorName.getDomain(), map);
}
public static ObjectName createNetworkOutBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
String str = networkName.toString();
str += ",direction=outbound" + createDestinationProperties(destination);
return new ObjectName(str);
}
public static ObjectName createNetworkInBoundDestinationObjectName(ObjectName networkName, ActiveMQDestination destination) throws MalformedObjectNameException {
String str = networkName.toString();
str += ",direction=inbound" + createDestinationProperties(destination);
return new ObjectName(str);
}
public static ObjectName createProxyConnectorName(ObjectName brokerObjectName, String type, String name) throws MalformedObjectNameException {
return createProxyConnectorName(brokerObjectName.toString(), type, name);
}

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
import org.apache.activemq.management.TimeStatisticImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NetworkDestinationView implements NetworkDestinationViewMBean {
private static final Logger LOG = LoggerFactory.getLogger(NetworkDestinationViewMBean.class);
private TimeStatisticImpl timeStatistic = new TimeStatisticImpl("networkEnqueue","network messages enqueued");
private final String name;
private long lastTime = -1;
public NetworkDestinationView(String name){
this.name = name;
}
/**
* Returns the name of this destination
*/
@Override
public String getName() {
return name;
}
/**
* Resets the managment counters.
*/
@Override
public void resetStatistics() {
timeStatistic.reset();
lastTime = -1;
}
@Override
public long getCount() {
return timeStatistic.getCount();
}
@Override
public double getRate() {
return timeStatistic.getAveragePerSecond();
}
public void messageSent(){
long currentTime = System.currentTimeMillis();
long time = 0;
if (lastTime < 0){
time = 0;
lastTime = currentTime;
}else{
time = currentTime-lastTime;
}
timeStatistic.addTime(time);
lastTime=currentTime;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.jmx;
public interface NetworkDestinationViewMBean {
/**
* Returns the name of this destination
*/
@MBeanInfo("Name of this destination.")
String getName();
/**
* Resets the managment counters.
*/
@MBeanInfo("Resets statistics.")
void resetStatistics();
/**
* Returns the number of messages that have been sent to the destination.
*
* @return The number of messages that have been sent to the destination.
*/
@MBeanInfo("Number of messages that have been sent to the destination.")
long getCount();
/**
* Returns the rate of messages that have been sent to the destination.
*
* @return The rate of messages that have been sent to the destination.
*/
@MBeanInfo("rate of messages sent across the network destination.")
double getRate();
}

View File

@ -607,6 +607,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else {
duplexInboundLocalBroker.oneway(message);
}
serviceInboundMessage(message);
}
} else {
switch (command.getDataStructureType()) {
@ -985,6 +986,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
sub.decrementOutstandingResponses();
}
}
serviceOutbound(message);
} else {
LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
}
@ -1612,4 +1614,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected void serviceOutbound(Message message){
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null){
l.onOutboundMessage(this,message);
}
}
protected void serviceInboundMessage(Message message){
NetworkBridgeListener l = this.networkBridgeListener;
if (l != null){
l.onInboundMessage(this,message);
}
}
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.NetworkDestinationView;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MBeanBridgeDestination {
private static final Logger LOG = LoggerFactory.getLogger(MBeanBridgeDestination.class);
private final BrokerService brokerService;
private final NetworkBridge bridge;
private Map<ActiveMQDestination, ObjectName> destinationObjectNameMap = new ConcurrentHashMap<ActiveMQDestination, ObjectName>();
private Map<ActiveMQDestination, NetworkDestinationView> outboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
private Map<ActiveMQDestination, NetworkDestinationView> inboundDestinationViewMap = new ConcurrentHashMap<ActiveMQDestination, NetworkDestinationView>();
public MBeanBridgeDestination(BrokerService brokerService, NetworkBridge bridge) {
this.brokerService = brokerService;
this.bridge = bridge;
}
public void onOutboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
NetworkDestinationView networkDestinationView = outboundDestinationViewMap.get(destination);
if (networkDestinationView == null) {
synchronized (destinationObjectNameMap) {
if (!destinationObjectNameMap.containsKey(destination)) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkOutBoundDestinationObjectName(bridgeObjectName, destination);
networkDestinationView = new NetworkDestinationView(destination.getPhysicalName());
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
destinationObjectNameMap.put(destination, objectName);
outboundDestinationViewMap.put(destination, networkDestinationView);
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
}
}
}
networkDestinationView.messageSent();
}
public void onInboundMessage(Message message) {
ActiveMQDestination destination = message.getDestination();
NetworkDestinationView networkDestinationView = inboundDestinationViewMap.get(destination);
if (networkDestinationView == null) {
synchronized (destinationObjectNameMap) {
if (!destinationObjectNameMap.containsKey(destination)) {
ObjectName bridgeObjectName = bridge.getMbeanObjectName();
try {
ObjectName objectName = BrokerMBeanSupport.createNetworkInBoundDestinationObjectName(bridgeObjectName, destination);
networkDestinationView= new NetworkDestinationView(destination.getPhysicalName());
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), networkDestinationView, objectName);
destinationObjectNameMap.put(destination, objectName);
inboundDestinationViewMap.put(destination, networkDestinationView);
} catch (Exception e) {
LOG.warn("Failed to register " + destination, e);
}
}
}
}
networkDestinationView.messageSent();
}
public void close() {
if (!brokerService.isUseJmx()) {
return;
}
for (ObjectName objectName : destinationObjectNameMap.values()) {
try {
if (objectName != null) {
brokerService.getManagementContext().unregisterMBean(objectName);
}
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
destinationObjectNameMap.clear();
outboundDestinationViewMap.clear();
inboundDestinationViewMap.clear();
}
}

View File

@ -16,14 +16,17 @@
*/
package org.apache.activemq.network;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.AnnotatedMBean;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.NetworkBridgeView;
import org.apache.activemq.broker.jmx.NetworkBridgeViewMBean;
import org.apache.activemq.command.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +37,7 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
BrokerService brokerService;
ObjectName connectorName;
boolean createdByDuplex = false;
private Map<NetworkBridge,MBeanBridgeDestination> destinationObjectNameMap = new ConcurrentHashMap<NetworkBridge,MBeanBridgeDestination>();
public MBeanNetworkListener(BrokerService brokerService, ObjectName connectorName) {
this.brokerService = brokerService;
this.connectorName = connectorName;
@ -55,6 +58,8 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
ObjectName objectName = createNetworkBridgeObjectName(bridge);
AnnotatedMBean.registerMBean(brokerService.getManagementContext(), view, objectName);
bridge.setMbeanObjectName(objectName);
MBeanBridgeDestination mBeanBridgeDestination = new MBeanBridgeDestination(brokerService,bridge);
destinationObjectNameMap.put(bridge,mBeanBridgeDestination);
LOG.debug("registered: {} as: {}", bridge, objectName);
} catch (Throwable e) {
LOG.debug("Network bridge could not be registered in JMX: {}", e.getMessage(), e);
@ -71,11 +76,17 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
if (objectName != null) {
brokerService.getManagementContext().unregisterMBean(objectName);
}
MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.remove(bridge);
if (mBeanBridgeDestination != null){
mBeanBridgeDestination.close();
}
} catch (Throwable e) {
LOG.debug("Network bridge could not be unregistered in JMX: {}", e.getMessage(), e);
}
}
protected ObjectName createNetworkBridgeObjectName(NetworkBridge bridge) throws MalformedObjectNameException {
return BrokerMBeanSupport.createNetworkBridgeObjectName(connectorName, bridge.getRemoteAddress());
}
@ -83,4 +94,23 @@ public class MBeanNetworkListener implements NetworkBridgeListener {
public void setCreatedByDuplex(boolean createdByDuplex) {
this.createdByDuplex = createdByDuplex;
}
@Override
public void onOutboundMessage(NetworkBridge bridge,Message message) {
MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge);
if (mBeanBridgeDestination != null){
mBeanBridgeDestination.onOutboundMessage(message);
}
}
@Override
public void onInboundMessage(NetworkBridge bridge,Message message) {
MBeanBridgeDestination mBeanBridgeDestination = destinationObjectNameMap.get(bridge);
if (mBeanBridgeDestination != null){
mBeanBridgeDestination.onInboundMessage(message);
}
}
}

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.network;
import org.apache.activemq.command.Message;
/**
* called when a bridge fails
*
@ -38,4 +40,18 @@ public interface NetworkBridgeListener {
*/
void onStop(NetworkBridge bridge);
/**
* Called when message forwarded over the network
* @param bridge
* @param message
*/
void onOutboundMessage (NetworkBridge bridge,Message message);
/**
* Called for when a message arrives over the network
* @param bridge
* @param message
*/
void onInboundMessage (NetworkBridge bridge,Message message);
}

View File

@ -154,7 +154,7 @@ public class SizeStatisticImpl extends StatisticImpl{
buffer.append(Long.toString(minSize));
buffer.append(" totalSize: ");
buffer.append(Long.toString(totalSize));
buffer.append(" averageTime: ");
buffer.append(" averageSize: ");
buffer.append(Double.toString(getAverageSize()));
buffer.append(" averageTimeExMinMax: ");
buffer.append(Double.toString(getAveragePerSecondExcludingMinMax()));