mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3887 - have local dispatch do safe wait till remove broker setup is complete such that producer is fully created
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1375459 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7e206615b8
commit
3fa927554c
|
@ -405,7 +405,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (!disposed.get()) {
|
if (!disposed.get()) {
|
||||||
try {
|
try {
|
||||||
if (command.isMessageDispatch()) {
|
if (command.isMessageDispatch()) {
|
||||||
waitStarted();
|
safeWaitUntilStarted();
|
||||||
MessageDispatch md = (MessageDispatch) command;
|
MessageDispatch md = (MessageDispatch) command;
|
||||||
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
|
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
|
||||||
ackAdvisory(md.getMessage());
|
ackAdvisory(md.getMessage());
|
||||||
|
@ -686,6 +686,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (!disposed.get()) {
|
if (!disposed.get()) {
|
||||||
try {
|
try {
|
||||||
if (command.isMessageDispatch()) {
|
if (command.isMessageDispatch()) {
|
||||||
|
safeWaitUntilStarted();
|
||||||
enqueueCounter.incrementAndGet();
|
enqueueCounter.incrementAndGet();
|
||||||
final MessageDispatch md = (MessageDispatch) command;
|
final MessageDispatch md = (MessageDispatch) command;
|
||||||
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||||
|
@ -1211,10 +1212,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
return removeDone;
|
return removeDone;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void waitStarted() throws InterruptedException {
|
|
||||||
startedLatch.await();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Performs a timed wait on the started latch and then checks for disposed before performing
|
* Performs a timed wait on the started latch and then checks for disposed before performing
|
||||||
* another wait each time the the started wait times out.
|
* another wait each time the the started wait times out.
|
||||||
|
@ -1229,11 +1226,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void clearDownSubscriptions() {
|
|
||||||
subscriptionMapByLocalId.clear();
|
|
||||||
subscriptionMapByRemoteId.clear();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
|
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info) throws IOException {
|
||||||
NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
|
NetworkBridgeFilterFactory filterFactory = defaultFilterFactory;
|
||||||
if (brokerService != null && brokerService.getDestinationPolicy() != null) {
|
if (brokerService != null && brokerService.getDestinationPolicy() != null) {
|
||||||
|
@ -1252,7 +1244,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
|
LOG.trace(configuration.getBrokerName() + " disconnecting local loop back connection for: " + remoteBrokerName + ", with id:" + remoteBrokerId);
|
||||||
}
|
}
|
||||||
waitStarted();
|
safeWaitUntilStarted();
|
||||||
ServiceSupport.dispose(this);
|
ServiceSupport.dispose(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue