Added two new properties for configuration to a network bridge,
advisoryPrefetchSize and advisoryAckPercentage.  By default
advisoryPrefetchSize is set to 0, which is disabled, and will use the
prefetchSize value unless otherwise set.  Also added validation to
prefetchSize to make sure it is greater than 0 as 0 is not allowed.
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-04-27 14:07:33 +00:00
parent 4d6f4d7475
commit 297eadf746
5 changed files with 296 additions and 3 deletions

View File

@ -26,102 +26,137 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
this.connector = connector;
}
@Override
public void start() throws Exception {
connector.start();
}
@Override
public void stop() throws Exception {
connector.stop();
}
@Override
public String getName() {
return connector.getName();
}
@Override
public int getMessageTTL() {
return connector.getMessageTTL();
}
@Override
public int getConsumerTTL() {
return connector.getConsumerTTL();
}
@Override
public int getPrefetchSize() {
return connector.getPrefetchSize();
}
@Override
public int getAdvisoryPrefetchSize() {
return connector.getAdvisoryPrefetchSize();
}
@Override
public String getUserName() {
return connector.getUserName();
}
@Override
public boolean isBridgeTempDestinations() {
return connector.isBridgeTempDestinations();
}
@Override
public boolean isConduitSubscriptions() {
return connector.isConduitSubscriptions();
}
@Override
public boolean isDecreaseNetworkConsumerPriority() {
return connector.isDecreaseNetworkConsumerPriority();
}
@Override
public boolean isDispatchAsync() {
return connector.isDispatchAsync();
}
@Override
public boolean isDynamicOnly() {
return connector.isDynamicOnly();
}
@Override
public boolean isDuplex() {
return connector.isDuplex();
}
@Override
public boolean isSuppressDuplicateQueueSubscriptions() {
return connector.isSuppressDuplicateQueueSubscriptions();
}
@Override
public boolean isSuppressDuplicateTopicSubscriptions() {
return connector.isSuppressDuplicateTopicSubscriptions();
}
@Override
public void setBridgeTempDestinations(boolean bridgeTempDestinations) {
connector.setBridgeTempDestinations(bridgeTempDestinations);
}
@Override
public void setConduitSubscriptions(boolean conduitSubscriptions) {
connector.setConduitSubscriptions(conduitSubscriptions);
}
@Override
public void setDispatchAsync(boolean dispatchAsync) {
connector.setDispatchAsync(dispatchAsync);
}
@Override
public void setDynamicOnly(boolean dynamicOnly) {
connector.setDynamicOnly(dynamicOnly);
}
@Override
public void setMessageTTL(int messageTTL) {
connector.setMessageTTL(messageTTL);
}
@Override
public void setConsumerTTL(int consumerTTL) {
connector.setConsumerTTL(consumerTTL);
}
@Override
public void setPassword(String password) {
connector.setPassword(password);
}
@Override
public void setPrefetchSize(int prefetchSize) {
connector.setPrefetchSize(prefetchSize);
}
@Override
public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
connector.setAdvisoryPrefetchSize(advisoryPrefetchSize);
}
@Override
public void setUserName(String userName) {
connector.setUserName(userName);
}
@Override
public String getPassword() {
String pw = connector.getPassword();
// Hide the password for security reasons.
@ -131,14 +166,17 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
return pw;
}
@Override
public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
connector.setDecreaseNetworkConsumerPriority(decreaseNetworkConsumerPriority);
}
@Override
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
connector.setSuppressDuplicateQueueSubscriptions(val);
}
@Override
public void setSuppressDuplicateTopicSubscriptions(boolean val) {
connector.setSuppressDuplicateTopicSubscriptions(val);
}

View File

@ -28,6 +28,13 @@ public interface NetworkConnectorViewMBean extends Service {
int getPrefetchSize();
/**
* @return Advisory prefetch setting.
*/
@MBeanInfo("The prefetch setting for the advisory message consumer. If set to <= 0 then this setting is disabled "
+ "and the prefetchSize attribute is used instead for configuring the advisory consumer.")
int getAdvisoryPrefetchSize();
String getUserName();
boolean isBridgeTempDestinations();
@ -62,6 +69,8 @@ public interface NetworkConnectorViewMBean extends Service {
void setPrefetchSize(int prefetchSize);
void setAdvisoryPrefetchSize(int advisoryPrefetchSize);
void setUserName(String userName);
String getPassword();

View File

@ -570,7 +570,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
advisoryTopic += "," + AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC;
}
demandConsumerInfo.setDestination(new ActiveMQTopic(advisoryTopic));
demandConsumerInfo.setPrefetchSize(configuration.getPrefetchSize());
configureConsumerPrefetch(demandConsumerInfo);
remoteBroker.oneway(demandConsumerInfo);
}
startedLatch.countDown();
@ -726,7 +726,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
private void ackAdvisory(Message message) throws IOException {
demandConsumerDispatched++;
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() * .75)) {
if (demandConsumerDispatched > (demandConsumerInfo.getPrefetchSize() *
(configuration.getAdvisoryAckPercentage() / 100f))) {
MessageAck ack = new MessageAck(message, MessageAck.STANDARD_ACK_TYPE, demandConsumerDispatched);
ack.setConsumerId(demandConsumerInfo.getConsumerId());
remoteBroker.oneway(ack);
@ -1364,7 +1365,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else {
sub.getLocalInfo().setDispatchAsync(configuration.isDispatchAsync());
}
sub.getLocalInfo().setPrefetchSize(configuration.getPrefetchSize());
configureConsumerPrefetch(sub.getLocalInfo());
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
@ -1720,4 +1721,15 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return -1;
}
protected void configureConsumerPrefetch(ConsumerInfo consumerInfo) {
//If a consumer on an advisory topic and advisoryPrefetchSize has been explicitly
//set then use it, else default to the prefetchSize setting
if (AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination()) &&
configuration.getAdvisoryPrefetchSize() > 0) {
consumerInfo.setPrefetchSize(configuration.getAdvisoryPrefetchSize());
} else {
consumerInfo.setPrefetchSize(configuration.getPrefetchSize());
}
}
}

View File

@ -37,6 +37,12 @@ public class NetworkBridgeConfiguration {
private boolean duplex;
private boolean bridgeTempDestinations = true;
private int prefetchSize = 1000;
/**
* By default set to 0, which is disabled and prefetchSize value will be
* used instead.
*/
private int advisoryPrefetchSize = 0;
private int advisoryAckPercentage = 75;
private int networkTTL = 1;
private int consumerTTL = networkTTL;
private int messageTTL = networkTTL;
@ -205,9 +211,43 @@ public class NetworkBridgeConfiguration {
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setPrefetchSize(int prefetchSize) {
if (prefetchSize < 1) {
throw new IllegalArgumentException("prefetchSize must be > 0"
+ " because network consumers do not poll for messages.");
}
this.prefetchSize = prefetchSize;
}
public int getAdvisoryPrefetchSize() {
return advisoryPrefetchSize;
}
/**
* Prefetch size for advisory consumers. Just like prefetchSize, if set, this
* value must be greater than 0 because network consumers do not poll for messages.
* Setting this to 0 or less means this value is disabled and prefetchSize will be
* used instead.
*
* @param advisoryPrefetchSize
*/
public void setAdvisoryPrefetchSize(int advisoryPrefetchSize) {
this.advisoryPrefetchSize = advisoryPrefetchSize;
}
public int getAdvisoryAckPercentage() {
return advisoryAckPercentage;
}
/**
* @param advisoryAckPercentage the percentage of the advisory prefetch size
* value that can be dispatched before an ack will be sent, defaults to 75
* which means that when the number of received messages is greater than 75% of
* the prefetch size an ack will be sent back
*/
public void setAdvisoryAckPercentage(int advisoryAckPercentage) {
this.advisoryAckPercentage = advisoryAckPercentage;
}
/**
* @return the userName
*/

View File

@ -18,7 +18,9 @@ package org.apache.activemq.usecases;
import java.net.URI;
import java.util.Arrays;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
@ -39,6 +41,7 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AdvisoryViaNetworkTest.class);
@Override
protected BrokerService createBroker(String brokerName) throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
@ -76,6 +79,197 @@ public class AdvisoryViaNetworkTest extends JmsMultipleBrokersTestSupport {
messagesB.assertMessagesReceived(2);
}
/**
* Test that explicitly setting advisoryPrefetchSize works for advisory topics
* on a network connector
*
* @throws Exception
*/
public void testAdvisoryPrefetchSize() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
createBroker("A");
BrokerService brokerB = createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.addStaticallyIncludedDestination(topic1);
networkBridge.setDuplex(true);
networkBridge.setAdvisoryPrefetchSize(10);
networkBridge.setPrefetchSize(1);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
//verify that brokerB's advisory prefetch is 10 but normal topic prefetch is 1
assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
//both advisory messages are not acked yet because of optimized acks
assertDeqInflight(0, 2);
}
/**
* Test that explicitly setting advisoryPrefetchSize to 1 works for advisory topics
* on a network connector
*
* @throws Exception
*/
public void testAdvisoryPrefetchSize1() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
createBroker("A");
BrokerService brokerB = createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.addStaticallyIncludedDestination(topic1);
networkBridge.setDuplex(true);
networkBridge.setAdvisoryPrefetchSize(1);
networkBridge.setPrefetchSize(10);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
//verify that brokerB's advisory prefetch is 1 but normal topic prefetch is 10
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
assertDeqInflight(2, 0);
}
/**
* Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
* for backwards compatibility
*
* @throws Exception
*/
public void testAdvisoryPrefetchSizeNotSet() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
createBroker("A");
BrokerService brokerB = createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.addStaticallyIncludedDestination(topic1);
networkBridge.setDuplex(true);
networkBridge.setPrefetchSize(10);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
//verify that both consumers have a prefetch of 10
assertEquals(10, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(10, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
assertDeqInflight(0, 2);
}
/**
* Test that if advisoryPrefetchSize isn't set then prefetchSize is used instead
* for backwards compatibility (test when set to 1)
*
* @throws Exception
*/
public void testPrefetchSize1() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
ActiveMQTopic topic1 = new ActiveMQTopic("A.FOO");
createBroker("A");
BrokerService brokerB = createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.setDuplex(true);
networkBridge.setPrefetchSize(1);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
createConsumer("A", topic1);
createConsumer("A", new ActiveMQTopic("A.FOO2"));
//verify that both consumers have a prefetch of 1
assertEquals(1, brokerB.getDestination(advisoryTopic).getConsumers().get(0).getPrefetchSize());
assertEquals(1, brokerB.getDestination(topic1).getConsumers().get(0).getPrefetchSize());
assertDeqInflight(2, 0);
}
/**
* Test configuring the advisoryAckPercentage works with advisoryPrefetchSize
* @throws Exception
*/
public void testAdvisoryPrefetchSizePercent() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
createBroker("A");
createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.setDuplex(true);
networkBridge.setAdvisoryPrefetchSize(10);
networkBridge.setAdvisoryAckPercentage(65);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
for (int i = 0; i < 10; i++) {
createConsumer("A", new ActiveMQTopic("A.FOO"));
}
assertDeqInflight(7, 3);
}
/**
* Test configuring the advisoryAckPercentage works when only prefetchSize exists
* and is applied against that instead for advisory consumers
*
* @throws Exception
*/
public void testPrefetchSizePercent() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Consumer.Topic.A.>");
createBroker("A");
createBroker("B");
NetworkConnector networkBridge = bridgeBrokers("A", "B");
networkBridge.addStaticallyIncludedDestination(advisoryTopic);
networkBridge.setDuplex(true);
networkBridge.setPrefetchSize(10);
networkBridge.setAdvisoryAckPercentage(65);
startAllBrokers();
verifyPeerBrokerInfo(brokers.get("A"), 1);
for (int i = 0; i < 10; i++) {
createConsumer("A", new ActiveMQTopic("A.FOO"));
}
assertDeqInflight(7, 3);
}
private void assertDeqInflight(final int dequeue, final int inflight) throws Exception {
assertTrue("deq and inflight as expected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
RegionBroker regionBroker = (RegionBroker) brokers.get("A").broker.getRegionBroker();
LOG.info("A Deq:" + regionBroker.getDestinationStatistics().getDequeues().getCount());
LOG.info("A Inflight:" + regionBroker.getDestinationStatistics().getInflight().getCount());
return regionBroker.getDestinationStatistics().getDequeues().getCount() == dequeue
&& regionBroker.getDestinationStatistics().getInflight().getCount() == inflight;
}
}));
}
public void testAdvisoryForwardingDuplexNC() throws Exception {
ActiveMQTopic advisoryTopic = new ActiveMQTopic("ActiveMQ.Advisory.Producer.Topic.FOO");