close connections when the connector is stopped.
This commit is contained in:
Timothy Bish 2014-07-08 16:20:21 -04:00
parent e62e90abaf
commit ccf4b9f34f
1 changed files with 24 additions and 7 deletions

View File

@ -29,7 +29,6 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.QueueConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
@ -73,20 +72,21 @@ public abstract class JmsConnector implements Service {
private ReconnectionPolicy policy = new ReconnectionPolicy();
protected ThreadPoolExecutor connectionSerivce;
private List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private final List<DestinationBridge> inboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private final List<DestinationBridge> outboundBridges = new CopyOnWriteArrayList<DestinationBridge>();
private String name;
private static LRUCache<Destination, DestinationBridge> createLRUCache() {
return new LRUCache<Destination, DestinationBridge>() {
private static final long serialVersionUID = -7446792754185879286L;
@Override
protected boolean removeEldestEntry(Map.Entry<Destination, DestinationBridge> enty) {
if (size() > maxCacheSize) {
Iterator<Map.Entry<Destination, DestinationBridge>> iter = entrySet().iterator();
Map.Entry<Destination, DestinationBridge> lru = iter.next();
remove(lru.getKey());
DestinationBridge bridge = (DestinationBridge)lru.getValue();
DestinationBridge bridge = lru.getValue();
try {
bridge.stop();
LOG.info("Expired bridge: {}", bridge);
@ -151,6 +151,7 @@ public abstract class JmsConnector implements Service {
return true;
}
@Override
public void start() throws Exception {
if (started.compareAndSet(false, true)) {
init();
@ -164,12 +165,27 @@ public abstract class JmsConnector implements Service {
}
}
@Override
public void stop() throws Exception {
if (started.compareAndSet(true, false)) {
ThreadPoolUtils.shutdown(connectionSerivce);
connectionSerivce = null;
if (foreignConnection.get() != null) {
try {
foreignConnection.get().close();
} catch (Exception e) {
}
}
if (localConnection.get() != null) {
try {
localConnection.get().close();
} catch (Exception e) {
}
}
for (DestinationBridge bridge : inboundBridges) {
bridge.stop();
}
@ -480,7 +496,7 @@ public abstract class JmsConnector implements Service {
// TODO - How do we handle the re-wiring of replyToBridges in this case.
replyToBridges.clear();
if (this.foreignConnection.compareAndSet((QueueConnection)connection, null)) {
if (this.foreignConnection.compareAndSet(connection, null)) {
// Stop the inbound bridges when the foreign connection is dropped since
// the bridge has no consumer and needs to be restarted once a new connection
@ -505,7 +521,7 @@ public abstract class JmsConnector implements Service {
}
});
} else if (this.localConnection.compareAndSet((QueueConnection)connection, null)) {
} else if (this.localConnection.compareAndSet(connection, null)) {
// Stop the outbound bridges when the local connection is dropped since
// the bridge has no consumer and needs to be restarted once a new connection
@ -614,7 +630,8 @@ public abstract class JmsConnector implements Service {
this.failed.set(true);
}
private ThreadFactory factory = new ThreadFactory() {
private final ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "JmsConnector Async Connection Task: ");
thread.setDaemon(true);