mirror of https://github.com/apache/activemq.git
merge -c 835715 Fix for http://issues.apache.org/activemq/browse/AMQ-2439: IllegalState issue.
Tightening synchronization in DemandForwardingBridge to avoid subscription clean up and ack processing race condition. git-svn-id: https://svn.apache.org/repos/asf/activemq/branches/activemq-5.3@835920 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9bd267ed8d
commit
df8d3d4a63
|
@ -114,7 +114,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
protected ActiveMQDestination[] durableDestinations;
|
||||
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByLocalId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
|
||||
protected final ConcurrentHashMap<ConsumerId, DemandSubscription> subscriptionMapByRemoteId = new ConcurrentHashMap<ConsumerId, DemandSubscription>();
|
||||
protected final BrokerId localBrokerPath[] = new BrokerId[] {null};
|
||||
protected final BrokerId localBrokerPath[] = new BrokerId[] { null };
|
||||
protected CountDownLatch startedLatch = new CountDownLatch(2);
|
||||
protected CountDownLatch localStartedLatch = new CountDownLatch(1);
|
||||
protected CountDownLatch remoteBrokerNameKnownLatch = new CountDownLatch(1);
|
||||
|
@ -131,7 +131,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
private BrokerInfo localBrokerInfo;
|
||||
private BrokerInfo remoteBrokerInfo;
|
||||
|
||||
|
||||
private AtomicBoolean started = new AtomicBoolean();
|
||||
private TransportConnection duplexInitiatingConnection;
|
||||
private BrokerService brokerService = null;
|
||||
|
@ -155,7 +154,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
localBroker.setTransportListener(new DefaultTransportListener() {
|
||||
|
||||
public void onCommand(Object o) {
|
||||
Command command = (Command)o;
|
||||
Command command = (Command) o;
|
||||
serviceLocalCommand(command);
|
||||
}
|
||||
|
||||
|
@ -166,7 +165,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
remoteBroker.setTransportListener(new TransportListener() {
|
||||
|
||||
public void onCommand(Object o) {
|
||||
Command command = (Command)o;
|
||||
Command command = (Command) o;
|
||||
serviceRemoteCommand(command);
|
||||
}
|
||||
|
||||
|
@ -290,10 +289,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
localConnectionInfo.setPassword(configuration.getPassword());
|
||||
Transport originalTransport = remoteBroker;
|
||||
while (originalTransport instanceof TransportFilter) {
|
||||
originalTransport = ((TransportFilter)originalTransport).getNext();
|
||||
originalTransport = ((TransportFilter) originalTransport).getNext();
|
||||
}
|
||||
if (originalTransport instanceof SslTransport) {
|
||||
X509Certificate[] peerCerts = ((SslTransport)originalTransport).getPeerCertificates();
|
||||
X509Certificate[] peerCerts = ((SslTransport) originalTransport).getPeerCertificates();
|
||||
localConnectionInfo.setTransportContext(peerCerts);
|
||||
}
|
||||
localBroker.oneway(localConnectionInfo);
|
||||
|
@ -388,7 +387,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
}
|
||||
});
|
||||
if( !sendShutdown.await(10, TimeUnit.SECONDS) ) {
|
||||
if (!sendShutdown.await(10, TimeUnit.SECONDS)) {
|
||||
LOG.info("Network Could not shutdown in a timely manner");
|
||||
}
|
||||
} finally {
|
||||
|
@ -429,7 +428,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
if (command.isMessageDispatch()) {
|
||||
waitStarted();
|
||||
MessageDispatch md = (MessageDispatch)command;
|
||||
MessageDispatch md = (MessageDispatch) command;
|
||||
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
|
||||
demandConsumerDispatched++;
|
||||
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
|
||||
|
@ -438,7 +437,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
} else if (command.isBrokerInfo()) {
|
||||
lastConnectSucceeded.set(true);
|
||||
remoteBrokerInfo = (BrokerInfo)command;
|
||||
remoteBrokerInfo = (BrokerInfo) command;
|
||||
Properties props = MarshallingSupport.stringToProperties(remoteBrokerInfo.getNetworkProperties());
|
||||
try {
|
||||
IntrospectionSupport.getProperties(configuration, props, null);
|
||||
|
@ -463,12 +462,12 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
// Let the local broker know the remote broker's ID.
|
||||
localBroker.oneway(command);
|
||||
} else if (command.getClass() == ConnectionError.class) {
|
||||
ConnectionError ce = (ConnectionError)command;
|
||||
ConnectionError ce = (ConnectionError) command;
|
||||
serviceRemoteException(ce.getException());
|
||||
} else {
|
||||
if (isDuplex()) {
|
||||
if (command.isMessage()) {
|
||||
ActiveMQMessage message = (ActiveMQMessage)command;
|
||||
ActiveMQMessage message = (ActiveMQMessage) command;
|
||||
if (AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())) {
|
||||
serviceRemoteConsumerAdvisory(message.getDataStructure());
|
||||
} else {
|
||||
|
@ -496,16 +495,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
if (started.get()) {
|
||||
if (!addConsumerInfo((ConsumerInfo) command)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Ignoring ConsumerInfo: "+ command);
|
||||
LOG.debug("Ignoring ConsumerInfo: " + command);
|
||||
}
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Adding ConsumerInfo: "+ command);
|
||||
LOG.trace("Adding ConsumerInfo: " + command);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// received a subscription whilst stopping
|
||||
LOG.warn("Stopping - ignoring ConsumerInfo: "+ command);
|
||||
LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -538,7 +537,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
final int networkTTL = configuration.getNetworkTTL();
|
||||
if (data.getClass() == ConsumerInfo.class) {
|
||||
// Create a new local subscription
|
||||
ConsumerInfo info = (ConsumerInfo)data;
|
||||
ConsumerInfo info = (ConsumerInfo) data;
|
||||
BrokerId[] path = info.getBrokerPath();
|
||||
|
||||
if (path != null && path.length >= networkTTL) {
|
||||
|
@ -564,7 +563,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
// in a cyclic network there can be multiple bridges per broker that can propagate
|
||||
// a network subscription so there is a need to synchronise on a shared entity
|
||||
synchronized(brokerService.getVmConnectorURI()) {
|
||||
synchronized (brokerService.getVmConnectorURI()) {
|
||||
if (addConsumerInfo(info)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(configuration.getBrokerName() + " bridging sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
|
||||
|
@ -578,7 +577,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
} else if (data.getClass() == DestinationInfo.class) {
|
||||
// It's a destination info - we want to pass up
|
||||
// information about temporary destinations
|
||||
DestinationInfo destInfo = (DestinationInfo)data;
|
||||
DestinationInfo destInfo = (DestinationInfo) data;
|
||||
BrokerId[] path = destInfo.getBrokerPath();
|
||||
if (path != null && path.length >= networkTTL) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -597,7 +596,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
destInfo.setConnectionId(localConnectionInfo.getConnectionId());
|
||||
if (destInfo.getDestination() instanceof ActiveMQTempDestination) {
|
||||
// re-set connection id so comes from here
|
||||
ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destInfo.getDestination();
|
||||
ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destInfo.getDestination();
|
||||
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
|
||||
}
|
||||
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(), getRemoteBrokerPath()));
|
||||
|
@ -606,7 +605,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
localBroker.oneway(destInfo);
|
||||
} else if (data.getClass() == RemoveInfo.class) {
|
||||
ConsumerId id = (ConsumerId)((RemoveInfo)data).getObjectId();
|
||||
ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
|
||||
removeDemandSubscription(id);
|
||||
}
|
||||
}
|
||||
|
@ -673,17 +672,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
if (command.isMessageDispatch()) {
|
||||
enqueueCounter.incrementAndGet();
|
||||
final MessageDispatch md = (MessageDispatch)command;
|
||||
final MessageDispatch md = (MessageDispatch) command;
|
||||
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||
if (sub != null && md.getMessage()!=null) {
|
||||
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
||||
// See if this consumer's brokerPath tells us it came from the broker at the other end
|
||||
// of the bridge. I think we should be making this decision based on the message's
|
||||
// broker bread crumbs and not the consumer's? However, the message's broker bread
|
||||
// crumbs are null, which is another matter.
|
||||
boolean cameFromRemote = false;
|
||||
Object consumerInfo = md.getMessage().getDataStructure();
|
||||
if( consumerInfo != null && (consumerInfo instanceof ConsumerInfo) )
|
||||
cameFromRemote = contains( ((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());
|
||||
if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo))
|
||||
cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
|
||||
|
||||
Message message = configureMessage(md);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -697,16 +696,21 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
// by bridging it using an async send (small chance
|
||||
// of message loss).
|
||||
|
||||
try {
|
||||
// Don't send it off to the remote if it originally came from the remote.
|
||||
if( !cameFromRemote ) {
|
||||
if (!cameFromRemote) {
|
||||
remoteBroker.oneway(message);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Message not forwarded on to remote, because message came from remote");
|
||||
}
|
||||
}
|
||||
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
} finally {
|
||||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
|
||||
} else {
|
||||
|
||||
|
@ -719,7 +723,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
Response response = future.getResult();
|
||||
if (response.isException()) {
|
||||
ExceptionResponse er = (ExceptionResponse)response;
|
||||
ExceptionResponse er = (ExceptionResponse) response;
|
||||
serviceLocalException(er.getException());
|
||||
} else {
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
|
@ -735,7 +739,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
};
|
||||
|
||||
remoteBroker.asyncRequest(message, callback);
|
||||
sub.incrementOutstandingResponses();
|
||||
}
|
||||
|
||||
} else {
|
||||
|
@ -744,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
}
|
||||
} else if (command.isBrokerInfo()) {
|
||||
localBrokerInfo = (BrokerInfo)command;
|
||||
localBrokerInfo = (BrokerInfo) command;
|
||||
serviceLocalBrokerInfo(command);
|
||||
} else if (command.isShutdownInfo()) {
|
||||
LOG.info(configuration.getBrokerName() + " Shutting down");
|
||||
|
@ -757,7 +760,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
stop();
|
||||
}
|
||||
} else if (command.getClass() == ConnectionError.class) {
|
||||
ConnectionError ce = (ConnectionError)command;
|
||||
ConnectionError ce = (ConnectionError) command;
|
||||
serviceLocalException(ce.getException());
|
||||
} else {
|
||||
switch (command.getDataStructureType()) {
|
||||
|
@ -768,7 +771,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
}
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("Caught an exception processing local command",e);
|
||||
LOG.warn("Caught an exception processing local command", e);
|
||||
serviceLocalException(e);
|
||||
}
|
||||
}
|
||||
|
@ -883,7 +886,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
protected BrokerId[] appendToBrokerPath(BrokerId[] brokerPath, BrokerId idToAppend) {
|
||||
if (brokerPath == null || brokerPath.length == 0) {
|
||||
return new BrokerId[] {idToAppend};
|
||||
return new BrokerId[] { idToAppend };
|
||||
}
|
||||
BrokerId rc[] = new BrokerId[brokerPath.length + 1];
|
||||
System.arraycopy(brokerPath, 0, rc, 0, brokerPath.length);
|
||||
|
@ -972,7 +975,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
addRemoteBrokerToBrokerPath(info);
|
||||
DemandSubscription sub = createDemandSubscription(info);
|
||||
if (sub != null) {
|
||||
if (duplicateSuppressionIsRequired(sub) ) {
|
||||
if (duplicateSuppressionIsRequired(sub)) {
|
||||
undoMapRegistration(sub);
|
||||
} else {
|
||||
addSubscription(sub);
|
||||
|
@ -1014,7 +1017,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
return suppress;
|
||||
}
|
||||
|
||||
|
||||
private boolean hasLowerPriority(Subscription existingSub, ConsumerInfo candidateInfo) {
|
||||
boolean suppress = false;
|
||||
|
||||
|
@ -1037,14 +1039,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
+ candidateInfo.getNetworkConsumerIds());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: "+ existingSub, e);
|
||||
LOG.error("Failed to remove duplicated sub as a result of sub with higher priority, sub: " + existingSub, e);
|
||||
}
|
||||
}
|
||||
return suppress;
|
||||
}
|
||||
|
||||
private void removeDuplicateSubscription(Subscription existingSub) throws IOException {
|
||||
for (NetworkConnector connector: brokerService.getNetworkConnectors()) {
|
||||
for (NetworkConnector connector : brokerService.getNetworkConnectors()) {
|
||||
if (connector.removeDemandSubscription(existingSub.getConsumerInfo().getConsumerId())) {
|
||||
break;
|
||||
}
|
||||
|
@ -1075,14 +1077,13 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
return doCreateDemandSubscription(info);
|
||||
}
|
||||
|
||||
|
||||
protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info) throws IOException {
|
||||
DemandSubscription result = new DemandSubscription(info);
|
||||
result.getLocalInfo().setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId()));
|
||||
if (info.getDestination().isTemporary()) {
|
||||
// reset the local connection Id
|
||||
|
||||
ActiveMQTempDestination dest = (ActiveMQTempDestination)result.getLocalInfo().getDestination();
|
||||
ActiveMQTempDestination dest = (ActiveMQTempDestination) result.getLocalInfo().getDestination();
|
||||
dest.setConnectionId(localConnectionInfo.getConnectionId().toString());
|
||||
}
|
||||
|
||||
|
@ -1101,7 +1102,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
return result;
|
||||
}
|
||||
|
||||
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination){
|
||||
final protected DemandSubscription createDemandSubscription(ActiveMQDestination destination) {
|
||||
ConsumerInfo info = new ConsumerInfo();
|
||||
info.setDestination(destination);
|
||||
// the remote info held by the DemandSubscription holds the original
|
||||
|
@ -1113,7 +1114,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
result = createDemandSubscription(info);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create DemandSubscription ",e);
|
||||
LOG.error("Failed to create DemandSubscription ", e);
|
||||
}
|
||||
if (result != null) {
|
||||
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
|
||||
|
@ -1132,7 +1133,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
|
||||
}
|
||||
|
||||
|
||||
protected void removeDemandSubscription(ConsumerId id) throws IOException {
|
||||
DemandSubscription sub = subscriptionMapByRemoteId.remove(id);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
@ -1178,7 +1178,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
protected abstract void serviceRemoteBrokerInfo(Command command) throws IOException;
|
||||
|
||||
|
||||
protected abstract BrokerId[] getRemoteBrokerPath();
|
||||
|
||||
public void setNetworkBridgeListener(NetworkBridgeListener listener) {
|
||||
|
|
|
@ -37,6 +37,7 @@ public class DemandSubscription {
|
|||
private final ConsumerInfo remoteInfo;
|
||||
private final ConsumerInfo localInfo;
|
||||
private Set<ConsumerId> remoteSubsIds = new CopyOnWriteArraySet<ConsumerId>();
|
||||
|
||||
private AtomicInteger dispatched = new AtomicInteger(0);
|
||||
private AtomicBoolean activeWaiter = new AtomicBoolean();
|
||||
|
||||
|
@ -81,7 +82,6 @@ public class DemandSubscription {
|
|||
return localInfo;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return Returns the remoteInfo.
|
||||
*/
|
||||
|
@ -111,13 +111,18 @@ public class DemandSubscription {
|
|||
|
||||
public void decrementOutstandingResponses() {
|
||||
if (dispatched.decrementAndGet() == 0 && activeWaiter.get()) {
|
||||
synchronized(activeWaiter) {
|
||||
synchronized (activeWaiter) {
|
||||
activeWaiter.notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void incrementOutstandingResponses() {
|
||||
public boolean incrementOutstandingResponses() {
|
||||
dispatched.incrementAndGet();
|
||||
if (activeWaiter.get()) {
|
||||
decrementOutstandingResponses();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -61,16 +61,23 @@ public class ResponseCorrelator extends TransportFilter {
|
|||
}
|
||||
|
||||
public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
|
||||
Command command = (Command)o;
|
||||
Command command = (Command) o;
|
||||
command.setCommandId(sequenceGenerator.getNextSequenceId());
|
||||
command.setResponseRequired(true);
|
||||
FutureResponse future = new FutureResponse(responseCallback);
|
||||
IOException priorError = null;
|
||||
synchronized (requestMap) {
|
||||
if( this.error !=null ) {
|
||||
throw error;
|
||||
}
|
||||
priorError = this.error;
|
||||
if (priorError == null) {
|
||||
requestMap.put(new Integer(command.getCommandId()), future);
|
||||
}
|
||||
}
|
||||
|
||||
if (priorError != null) {
|
||||
future.set(new ExceptionResponse(priorError));
|
||||
throw priorError;
|
||||
}
|
||||
|
||||
next.oneway(command);
|
||||
return future;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue