Moving BrokerSubscriptionInfo processing into a new thread to prevent a
deadlock of the network bridge on startup
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-10-24 07:51:34 -04:00
parent 1a811b72dd
commit b9cb02ae54
1 changed files with 51 additions and 23 deletions

View File

@ -163,6 +163,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
protected BrokerService brokerService = null;
private ObjectName mbeanObjectName;
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
//Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
private Transport duplexInboundLocalBroker = null;
private ProducerInfo duplexInboundLocalProducerInfo;
@ -295,6 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
LOG.info("pending tasks on stop {}", pendingTasks);
}
//Shutdown the syncExecutor, call countDown to make sure a thread can
//terminate if it is waiting
staticDestinationsLatch.countDown();
syncExecutor.shutdown();
if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
List<Runnable> pendingTasks = syncExecutor.shutdownNow();
LOG.info("pending tasks on stop {}", pendingTasks);
}
localBroker.oneway(new ShutdownInfo());
remoteBroker.oneway(new ShutdownInfo());
} catch (Throwable e) {
@ -648,34 +658,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else if (command.isBrokerInfo()) {
futureRemoteBrokerInfo.set((BrokerInfo) command);
} else if (command instanceof BrokerSubscriptionInfo) {
staticDestinationsLatch.await();
BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
this.brokerService.getBrokerName(), subInfo.getBrokerName());
final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly()) {
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
//Start in a new thread so we don't block the transport waiting for staticDestinations
syncExecutor.execute(new Runnable() {
@Override
public void run() {
try {
staticDestinationsLatch.await();
//Make sure after the countDown of staticDestinationsLatch we aren't stopping
if (!disposed.get()) {
BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
brokerService.getBrokerName(), subInfo.getBrokerName());
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
&& !configuration.isDynamicOnly()) {
if (started.get()) {
if (subInfo.getSubscriptionInfos() != null) {
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
//re-add any process any non-NC consumers that match the
//dynamicallyIncludedDestinations list
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
serviceRemoteConsumerAdvisory(info);
}
}
}
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
}
}
}
}
}
//After re-added, clean up any empty durables
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
DemandSubscription ds = i.next();
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
cleanupDurableSub(ds, i);
}
} catch (Exception e) {
LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
LOG.debug(e.getMessage(), e);
}
}
}
});
} else if (command.getClass() == ConnectionError.class) {
ConnectionError ce = (ConnectionError) command;
serviceRemoteException(ce.getException());