https://issues.apache.org/jira/browse/AMQ-6331 - Honor excluded destinations in static routing on network connectors

This commit is contained in:
Dejan Bosanac 2016-06-21 15:07:30 +02:00
parent 27d955501f
commit 1faa4afa90
2 changed files with 73 additions and 50 deletions

View File

@ -1009,45 +1009,46 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
return; return;
} }
if (isPermissableDestination(md.getDestination())) {
if (message.isPersistent() || configuration.isAlwaysSyncSend()) {
if (message.isPersistent() || configuration.isAlwaysSyncSend()) { // The message was not sent using async send, so we should only
// ack the local broker when we get confirmation that the remote
// The message was not sent using async send, so we should only // broker has received the message.
// ack the local broker when we get confirmation that the remote remoteBroker.asyncRequest(message, new ResponseCallback() {
// broker has received the message. @Override
remoteBroker.asyncRequest(message, new ResponseCallback() { public void onCompletion(FutureResponse future) {
@Override
public void onCompletion(FutureResponse future) {
try { try {
Response response = future.getResult(); Response response = future.getResult();
if (response.isException()) { if (response.isException()) {
ExceptionResponse er = (ExceptionResponse) response; ExceptionResponse er = (ExceptionResponse) response;
serviceLocalException(md, er.getException()); serviceLocalException(md, er.getException());
} else { } else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment(); networkBridgeStatistics.getDequeues().increment();
} }
} catch (IOException e) { } catch (IOException e) {
serviceLocalException(md, e); serviceLocalException(md, e);
} finally { } finally {
sub.decrementOutstandingResponses(); sub.decrementOutstandingResponses();
} }
} }
}); });
} else { } else {
// If the message was originally sent using async send, we will // If the message was originally sent using async send, we will
// preserve that QOS by bridging it using an async send (small chance // preserve that QOS by bridging it using an async send (small chance
// of message loss). // of message loss).
try { try {
remoteBroker.oneway(message); remoteBroker.oneway(message);
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
networkBridgeStatistics.getDequeues().increment(); networkBridgeStatistics.getDequeues().increment();
} finally { } finally {
sub.decrementOutstandingResponses(); sub.decrementOutstandingResponses();
} }
}
serviceOutbound(message);
} }
serviceOutbound(message);
} else { } else {
LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage()); LOG.debug("No subscription registered with this network bridge for consumerId: {} for message: {}", md.getConsumerId(), md.getMessage());
} }
@ -1132,17 +1133,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
ActiveMQDestination[] dests = staticallyIncludedDestinations; ActiveMQDestination[] dests = excludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
dests = excludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) { for (ActiveMQDestination dest : dests) {
DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest); DestinationFilter exclusionFilter = DestinationFilter.parseFilter(dest);
@ -1152,6 +1143,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
dests = staticallyIncludedDestinations;
if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) {
DestinationFilter inclusionFilter = DestinationFilter.parseFilter(dest);
if (dest != null && inclusionFilter.matches(destination) && dest.getDestinationType() == destination.getDestinationType()) {
return true;
}
}
}
dests = dynamicallyIncludedDestinations; dests = dynamicallyIncludedDestinations;
if (dests != null && dests.length > 0) { if (dests != null && dests.length > 0) {
for (ActiveMQDestination dest : dests) { for (ActiveMQDestination dest : dests) {
@ -1173,14 +1174,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
ActiveMQDestination[] dests = staticallyIncludedDestinations; ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null) { if (dests != null) {
for (ActiveMQDestination dest : dests) { for (ActiveMQDestination dest : dests) {
DemandSubscription sub = createDemandSubscription(dest); if (isPermissableDestination(dest)) {
sub.setStaticallyIncluded(true); DemandSubscription sub = createDemandSubscription(dest);
try { sub.setStaticallyIncluded(true);
addSubscription(sub); try {
} catch (IOException e) { addSubscription(sub);
LOG.error("Failed to add static destination {}", dest, e); } catch (IOException e) {
LOG.error("Failed to add static destination {}", dest, e);
}
LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} else {
LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest);
} }
LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest);
} }
} }
} }

View File

@ -117,6 +117,24 @@ public class DemandForwardingBridgeFilterTest extends NetworkTestSupport {
assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE); assertReceiveNoMessageOn("OTHER.T2", ActiveMQDestination.TOPIC_TYPE);
} }
public void testExcludeStaticDestinations() throws Exception {
NetworkBridgeConfiguration configuration = getDefaultBridgeConfiguration();
configuration.setExcludedDestinations(Arrays.asList(ActiveMQDestination.createDestination("TEST.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination("OTHER.X1", ActiveMQDestination.QUEUE_TYPE)));
configuration.setStaticallyIncludedDestinations(Arrays.asList(ActiveMQDestination.createDestination(
"TEST.>", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
"OTHER.X1", ActiveMQDestination.QUEUE_TYPE), ActiveMQDestination.createDestination(
"OTHER.X2", ActiveMQDestination.QUEUE_TYPE)));
configureAndStartBridge(configuration);
assertReceiveNoMessageOn("TEST.X1", ActiveMQDestination.QUEUE_TYPE);
assertReceiveMessageOn("TEST.X2", ActiveMQDestination.QUEUE_TYPE);
assertReceiveNoMessageOn("OTHER.X1", ActiveMQDestination.QUEUE_TYPE);
assertReceiveMessageOn("OTHER.X2", ActiveMQDestination.QUEUE_TYPE);
}
private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception, private void assertReceiveMessageOn(String destinationName, byte destinationType) throws Exception,
InterruptedException { InterruptedException {