From 1a01d137317fd1d2e7ce202034ac3a5eed11b389 Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Fri, 7 Mar 2008 09:06:30 +0000 Subject: [PATCH] Detect clients fail over - that haven't removed themselves from a brokee git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634589 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/advisory/AdvisoryBroker.java | 2 +- .../broker/cluster/ConnectionSplitBroker.java | 22 +++++++++++++++---- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 593022476c..b86a785ce9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -79,7 +79,7 @@ public class AdvisoryBroker extends BrokerFilter { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { Subscription answer = next.addConsumer(context, info); - + // Don't advise advisory topics. if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) { ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java index 1ab06b8cda..7ad2be2984 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.broker.cluster; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -31,6 +32,8 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerFilter; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.ConnectionId; @@ -50,14 +53,14 @@ import org.apache.commons.logging.LogFactory; public class ConnectionSplitBroker extends BrokerFilter implements MessageListener{ private static final Log LOG = LogFactory.getLog(ConnectionSplitBroker.class); private Connection connection; - private Map clientMap = new ConcurrentHashMap(); + private Map clientMap = new ConcurrentHashMap(); public ConnectionSplitBroker(Broker next) { super(next); } public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { if (info != null){ - clientMap.put(info.getConnectionId(),info); + clientMap.put(info.getConnectionId(),context); } super.addConnection(context, info); } @@ -77,6 +80,7 @@ public class ConnectionSplitBroker extends BrokerFilter implements MessageListen fac.setWarnAboutUnstartedConnectionTimeout(10000); fac.setWatchTopicAdvisories(false); fac.setAlwaysSessionAsync(true); + fac.setClientID(getBrokerId().toString()+":" + getBrokerName() + ":ConnectionSplitBroker"); connection = fac.createConnection(); connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -101,8 +105,18 @@ public class ConnectionSplitBroker extends BrokerFilter implements MessageListen String brokerId=null; try { brokerId = message.getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID); - if (brokerId != null && brokerId.equals(getBrokerId().getValue())) { - + if (brokerId != null && !brokerId.equals(getBrokerId().getValue())) { + //see if it already exits + ConnectionContext old = clientMap.remove(info.getConnectionId()); + if (old != null && old.getConnection() != null) { + String str = "connectionId=" + old.getConnectionId() +",clientId="+old.getClientId(); + LOG.warn("Removing stale connection: " + str); + try { + old.getConnection().stop(); + } catch (Exception e) { + LOG.error("Failed to remove stale connection: " + str); + } + } } } catch (JMSException e) { LOG.warn("Failed to get message property "+ AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID,e);