mirror of https://github.com/apache/activemq.git
Moving BrokerSubscriptionInfo processing into a new thread to prevent a
deadlock of the network bridge on startup
(cherry picked from commit b9cb02ae54
)
This commit is contained in:
parent
b0ee133eaf
commit
2ea6b00eef
|
@ -163,6 +163,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
protected BrokerService brokerService = null;
|
||||
private ObjectName mbeanObjectName;
|
||||
private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();
|
||||
//Use a new executor for processing BrokerSubscriptionInfo so we don't block other threads
|
||||
private final ExecutorService syncExecutor = Executors.newSingleThreadExecutor();
|
||||
private Transport duplexInboundLocalBroker = null;
|
||||
private ProducerInfo duplexInboundLocalProducerInfo;
|
||||
|
||||
|
@ -295,6 +297,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
List<Runnable> pendingTasks = serialExecutor.shutdownNow();
|
||||
LOG.info("pending tasks on stop {}", pendingTasks);
|
||||
}
|
||||
//Shutdown the syncExecutor, call countDown to make sure a thread can
|
||||
//terminate if it is waiting
|
||||
staticDestinationsLatch.countDown();
|
||||
syncExecutor.shutdown();
|
||||
if (!syncExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
||||
List<Runnable> pendingTasks = syncExecutor.shutdownNow();
|
||||
LOG.info("pending tasks on stop {}", pendingTasks);
|
||||
}
|
||||
localBroker.oneway(new ShutdownInfo());
|
||||
remoteBroker.oneway(new ShutdownInfo());
|
||||
} catch (Throwable e) {
|
||||
|
@ -648,34 +658,52 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
} else if (command.isBrokerInfo()) {
|
||||
futureRemoteBrokerInfo.set((BrokerInfo) command);
|
||||
} else if (command instanceof BrokerSubscriptionInfo) {
|
||||
staticDestinationsLatch.await();
|
||||
BrokerSubscriptionInfo subInfo = (BrokerSubscriptionInfo) command;
|
||||
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
|
||||
this.brokerService.getBrokerName(), subInfo.getBrokerName());
|
||||
final BrokerSubscriptionInfo brokerSubscriptionInfo = (BrokerSubscriptionInfo) command;
|
||||
|
||||
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
|
||||
&& !configuration.isDynamicOnly()) {
|
||||
if (started.get()) {
|
||||
if (subInfo.getSubscriptionInfos() != null) {
|
||||
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
|
||||
//re-add any process any non-NC consumers that match the
|
||||
//dynamicallyIncludedDestinations list
|
||||
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
|
||||
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
|
||||
serviceRemoteConsumerAdvisory(info);
|
||||
//Start in a new thread so we don't block the transport waiting for staticDestinations
|
||||
syncExecutor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
staticDestinationsLatch.await();
|
||||
//Make sure after the countDown of staticDestinationsLatch we aren't stopping
|
||||
if (!disposed.get()) {
|
||||
BrokerSubscriptionInfo subInfo = brokerSubscriptionInfo;
|
||||
LOG.debug("Received Remote BrokerSubscriptionInfo on {} from {}",
|
||||
brokerService.getBrokerName(), subInfo.getBrokerName());
|
||||
|
||||
if (configuration.isSyncDurableSubs() && configuration.isConduitSubscriptions()
|
||||
&& !configuration.isDynamicOnly()) {
|
||||
if (started.get()) {
|
||||
if (subInfo.getSubscriptionInfos() != null) {
|
||||
for (ConsumerInfo info : subInfo.getSubscriptionInfos()) {
|
||||
//re-add any process any non-NC consumers that match the
|
||||
//dynamicallyIncludedDestinations list
|
||||
if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
|
||||
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) {
|
||||
serviceRemoteConsumerAdvisory(info);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//After re-added, clean up any empty durables
|
||||
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
|
||||
DemandSubscription ds = i.next();
|
||||
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
|
||||
cleanupDurableSub(ds, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//After re-added, clean up any empty durables
|
||||
for (Iterator<DemandSubscription> i = subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
|
||||
DemandSubscription ds = i.next();
|
||||
if (NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, ds.getLocalInfo().getDestination())) {
|
||||
cleanupDurableSub(ds, i);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Error processing BrokerSubscriptionInfo: {}", e.getMessage(), e);
|
||||
LOG.debug(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
} else if (command.getClass() == ConnectionError.class) {
|
||||
ConnectionError ce = (ConnectionError) command;
|
||||
serviceRemoteException(ce.getException());
|
||||
|
|
Loading…
Reference in New Issue