From ccf4b9f34f0fe68c3e0ae3acc790f58299f1db99 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Tue, 8 Jul 2014 16:20:21 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5262 close connections when the connector is stopped. --- .../activemq/network/jms/JmsConnector.java | 31 ++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java index afeb88a94e..6ddc6c5e30 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/jms/JmsConnector.java @@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.QueueConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; @@ -73,20 +72,21 @@ public abstract class JmsConnector implements Service { private ReconnectionPolicy policy = new ReconnectionPolicy(); protected ThreadPoolExecutor connectionSerivce; - private List inboundBridges = new CopyOnWriteArrayList(); - private List outboundBridges = new CopyOnWriteArrayList(); + private final List inboundBridges = new CopyOnWriteArrayList(); + private final List outboundBridges = new CopyOnWriteArrayList(); private String name; private static LRUCache createLRUCache() { return new LRUCache() { private static final long serialVersionUID = -7446792754185879286L; + @Override protected boolean removeEldestEntry(Map.Entry enty) { if (size() > maxCacheSize) { Iterator> iter = entrySet().iterator(); Map.Entry lru = iter.next(); remove(lru.getKey()); - DestinationBridge bridge = (DestinationBridge)lru.getValue(); + DestinationBridge bridge = lru.getValue(); try { bridge.stop(); LOG.info("Expired bridge: {}", bridge); @@ -151,6 +151,7 @@ public abstract class JmsConnector implements Service { return true; } + @Override public void start() throws Exception { if (started.compareAndSet(false, true)) { init(); @@ -164,12 +165,27 @@ public abstract class JmsConnector implements Service { } } + @Override public void stop() throws Exception { if (started.compareAndSet(true, false)) { ThreadPoolUtils.shutdown(connectionSerivce); connectionSerivce = null; + if (foreignConnection.get() != null) { + try { + foreignConnection.get().close(); + } catch (Exception e) { + } + } + + if (localConnection.get() != null) { + try { + localConnection.get().close(); + } catch (Exception e) { + } + } + for (DestinationBridge bridge : inboundBridges) { bridge.stop(); } @@ -480,7 +496,7 @@ public abstract class JmsConnector implements Service { // TODO - How do we handle the re-wiring of replyToBridges in this case. replyToBridges.clear(); - if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) { + if (this.foreignConnection.compareAndSet(connection, null)) { // Stop the inbound bridges when the foreign connection is dropped since // the bridge has no consumer and needs to be restarted once a new connection @@ -505,7 +521,7 @@ public abstract class JmsConnector implements Service { } }); - } else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) { + } else if (this.localConnection.compareAndSet(connection, null)) { // Stop the outbound bridges when the local connection is dropped since // the bridge has no consumer and needs to be restarted once a new connection @@ -614,7 +630,8 @@ public abstract class JmsConnector implements Service { this.failed.set(true); } - private ThreadFactory factory = new ThreadFactory() { + private final ThreadFactory factory = new ThreadFactory() { + @Override public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: "); thread.setDaemon(true);