Add foundation for monitoring split brokers from a clients view point

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634488 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-07 00:59:02 +00:00
parent cf55702288
commit 88fbb05af5
4 changed files with 168 additions and 20 deletions

View File

@ -83,10 +83,9 @@ public class AdvisoryBroker extends BrokerFilter {
// Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
//consumers.put(info.getConsumerId(), info);
fireConsumerAdvisory(context,info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
// for this newly added consumer.
if (AdvisorySupport.isConnectionAdvisoryTopic(info.getDestination())) {
@ -128,6 +127,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
}
consumers.put(info.getConsumerId(), info);
return answer;
}
@ -251,6 +251,9 @@ public class AdvisoryBroker extends BrokerFilter {
protected void fireAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_NAME, getBrokerName());
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
@ -268,6 +271,7 @@ public class AdvisoryBroker extends BrokerFilter {
}
}
advisoryMessage.setIntProperty("consumerCount", count);
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}

View File

@ -40,8 +40,9 @@ public final class AdvisorySupport {
public static final String NO_TOPIC_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Topic.";
public static final String NO_QUEUE_CONSUMERS_TOPIC_PREFIX = ADVISORY_TOPIC_PREFIX + "NoConsumer.Queue.";
public static final String AGENT_TOPIC = "ActiveMQ.Agent";
public static final String ADIVSORY_MESSAGE_TYPE = "Advisory";
public static final String MSG_PROPERTY_ORIGIN_BROKER_ID="originBrokerId";
public static final String MSG_PROPERTY_ORIGIN_BROKER_NAME="originBrokerName";
public static final ActiveMQTopic TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC = new ActiveMQTopic(TEMP_QUEUE_ADVISORY_TOPIC + "," + TEMP_TOPIC_ADVISORY_TOPIC);
private static final ActiveMQTopic AGENT_TOPIC_DESTINATION = new ActiveMQTopic(AGENT_TOPIC);

View File

@ -38,6 +38,7 @@ import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.ConnectorView;
@ -159,11 +160,13 @@ public class BrokerService implements Service {
private boolean useLocalHostBrokerName;
private CountDownLatch stoppedLatch = new CountDownLatch(1);
private boolean supportFailOver;
private boolean clustered;
private Broker regionBroker;
private int producerSystemUsagePortion = 60;
private int consumerSystemUsagePortion = 40;
private boolean splitSystemUsageForProducersConsumers;
private boolean monitorConnectionSplits;
private int taskRunnerPriority = Thread.NORM_PRIORITY;
private boolean dedicatedTaskRunner;
static {
@ -757,7 +760,7 @@ public class BrokerService implements Service {
public TaskRunnerFactory getTaskRunnerFactory() {
if (taskRunnerFactory == null) {
taskRunnerFactory = new TaskRunnerFactory();
taskRunnerFactory = new TaskRunnerFactory("BrokerService",getTaskRunnerPriority(),true,1000,isDedicatedTaskRunner());
}
return taskRunnerFactory;
}
@ -1216,20 +1219,6 @@ public class BrokerService implements Service {
this.supportFailOver = supportFailOver;
}
/**
* @return the clustered
*/
public boolean isClustered() {
return this.clustered;
}
/**
* @param clustered the clustered to set
*/
public void setClustered(boolean clustered) {
this.clustered = clustered;
}
/**
* Looks up and lazily creates if necessary the destination for the given JMS name
*/
@ -1261,7 +1250,30 @@ public class BrokerService implements Service {
boolean splitSystemUsageForProducersConsumers) {
this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
}
public boolean isMonitorConnectionSplits() {
return monitorConnectionSplits;
}
public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
this.monitorConnectionSplits = monitorConnectionSplits;
}
public int getTaskRunnerPriority() {
return taskRunnerPriority;
}
public void setTaskRunnerPriority(int taskRunnerPriority) {
this.taskRunnerPriority = taskRunnerPriority;
}
public boolean isDedicatedTaskRunner() {
return dedicatedTaskRunner;
}
public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
this.dedicatedTaskRunner = dedicatedTaskRunner;
}
//
// Implementation methods
// -------------------------------------------------------------------------
/**
@ -1579,6 +1591,9 @@ public class BrokerService implements Service {
if (isPopulateJMSXUserID()) {
broker = new UserIDBroker(broker);
}
if (isMonitorConnectionSplits()){
broker = new ConnectionSplitBroker(broker);
}
if (plugins != null) {
for (int i = 0; i < plugins.length; i++) {
BrokerPlugin plugin = plugins[i];
@ -1844,4 +1859,4 @@ public class BrokerService implements Service {
public void setRegionBroker(Broker regionBroker) {
this.regionBroker = regionBroker;
}
}
}

View File

@ -0,0 +1,128 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.cluster;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Monitors for client connections that may fail to another
* broker - but this broker isn't aware they've gone.
* Can occur with network glitches or client error
*
* @version $Revision$
*/
public class ConnectionSplitBroker extends BrokerFilter implements MessageListener{
private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class);
private Connection connection;
private Map <ConnectionId,ConnectionInfo>clientMap = new ConcurrentHashMap<ConnectionId,ConnectionInfo>();
public ConnectionSplitBroker(Broker next) {
super(next);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
if (info != null){
clientMap.put(info.getConnectionId(),info);
}
super.addConnection(context, info);
}
public void removeConnection(ConnectionContext context, ConnectionInfo info, Throwable error)
throws Exception {
if (info != null){
clientMap.remove(info.getConnectionId());
}
super.removeConnection(context, info, error);
}
public void start() throws Exception{
super.start();
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(getBrokerService().getVmConnectorURI());
fac.setCloseTimeout(1);
fac.setWarnAboutUnstartedConnectionTimeout(10000);
fac.setWatchTopicAdvisories(false);
fac.setAlwaysSessionAsync(true);
connection = fac.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(AdvisorySupport.getConnectionAdvisoryTopic());
consumer.setMessageListener(this);
}
public synchronized void stop() throws Exception{
if (connection != null){
connection.stop();
connection = null;
}
super.stop();
}
public void onMessage(javax.jms.Message m) {
ActiveMQMessage message = (ActiveMQMessage) m;
DataStructure o = message.getDataStructure();
if (o != null && o.getClass() == ConnectionInfo.class) {
ConnectionInfo info = (ConnectionInfo) o;
String brokerId=null;
try {
brokerId = message.getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID);
if (brokerId != null && brokerId.equals(getBrokerId().getValue())) {
}
} catch (JMSException e) {
LOG.warn("Failed to get message property "+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,e);
}
}
}
protected boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
if (brokerPath != null) {
for (int i = 0; i < brokerPath.length; i++) {
if (brokerId.equals(brokerPath[i])) {
return true;
}
}
}
return false;
}
}