[AMQ-6907] add selectorAware option to conditionalNetworkBridgeFilterFactory such that replay back to origin can happen if there are no matching local consumers

This commit is contained in:
gtully 2018-02-28 16:07:52 +00:00
parent efaf9cd77e
commit 82c9f9531e
2 changed files with 125 additions and 4 deletions

View File

@ -43,6 +43,7 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
int replayDelay = 0;
int rateLimit = 0;
int rateDuration = 1000;
private boolean selectorAware = false;
@Override
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
@ -54,6 +55,7 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
filter.setRateLimit(getRateLimit());
filter.setRateDuration(getRateDuration());
filter.setReplayDelay(getReplayDelay());
filter.setSelectorAware(isSelectorAware());
return filter;
}
@ -89,6 +91,14 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
this.replayDelay = replayDelay;
}
public void setSelectorAware(boolean selectorAware) {
this.selectorAware = selectorAware;
}
public boolean isSelectorAware() {
return selectorAware;
}
private static class ConditionalNetworkBridgeFilter extends NetworkBridgeFilter {
final static Logger LOG = LoggerFactory.getLogger(ConditionalNetworkBridgeFilter.class);
private int rateLimit;
@ -98,6 +108,7 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
private int matchCount;
private long rateDurationEnd;
private boolean selectorAware = false;
@Override
protected boolean matchesForwardingFilter(Message message, final MessageEvaluationContext mec) {
@ -136,10 +147,23 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
List<Subscription> consumers = regionDestination.getConsumers();
for (Subscription sub : consumers) {
if (!sub.getConsumerInfo().isNetworkSubscription() && !sub.getConsumerInfo().isBrowser()) {
if (!isSelectorAware()) {
LOG.trace("Not replaying [{}] for [{}] to origin due to existing local consumer: {}", new Object[]{
message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
});
return false;
} else {
try {
if (sub.matches(message, mec)) {
LOG.trace("Not replaying [{}] for [{}] to origin due to existing selector matching local consumer: {}", new Object[]{
message.getMessageId(), message.getDestination(), sub.getConsumerInfo()
});
return false;
}
} catch (Exception ignored) {}
}
}
}
return true;
@ -172,5 +196,13 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
public void setAllowReplayWhenNoConsumers(boolean allowReplayWhenNoConsumers) {
this.allowReplayWhenNoConsumers = allowReplayWhenNoConsumers;
}
public void setSelectorAware(boolean selectorAware) {
this.selectorAware = selectorAware;
}
public boolean isSelectorAware() {
return selectorAware;
}
}
}

View File

@ -178,6 +178,95 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug
}
public void testMigratingConsumerSelectorAwareTrue() throws Exception {
bridge("Broker0", "Broker1");
if (!duplex) bridge("Broker1", "Broker0");
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
conditionalNetworkBridgeFilterFactory.setSelectorAware(true);
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
startAllBrokers();
this.waitForBridgeFormation();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("Broker0", dest, 1);
assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, "DoNotConsume = 'true'");
assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
// now consume the message
final String brokerId = "Broker0";
MessageConsumer messageConsumer = createConsumer(brokerId, dest);
assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
}
}));
messageConsumer.close();
}
public void testMigratingConsumerSelectorAwareFalse() throws Exception {
bridge("Broker0", "Broker1");
if (!duplex) bridge("Broker1", "Broker0");
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
conditionalNetworkBridgeFilterFactory.setSelectorAware(false);
brokers.get("Broker1").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
startAllBrokers();
this.waitForBridgeFormation();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("Broker0", dest, 1);
assertExactMessageCount("Broker0", dest, 1, TIMEOUT);
MessageConsumer messageConsumerNoMatch = createConsumer("Broker1", dest, "DoNotConsume = 'true'");
assertExactConsumersConnect("Broker0", dest, 1, TIMEOUT);
assertExactConsumersConnect("Broker1", dest, 1, TIMEOUT);
assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
// now try consume the message
final String brokerId = "Broker0";
MessageConsumer messageConsumer = createConsumer(brokerId, dest);
assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
assertExactConsumersConnect("Broker1", dest, 2, TIMEOUT);
assertExactMessageCount("Broker1", dest, 1, TIMEOUT);
assertExactMessageCount("Broker0", dest, 0, TIMEOUT);
assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokers.get(brokerId).allMessages.getMessageIds().size() == 0;
}
}));
messageConsumer.close();
}
protected void assertExactMessageCount(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);