https://issues.apache.org/jira/browse/AMQ-4607 - add network connector consumerTTL and messageTTL - split effect of networkTTL - allows a message many hops in a mesh while consumer demand is not repeatildy replicated. Rollback cursor aduit on forward so a message can be redispatched on redelivery. Additional test to verify multiple hops back to origin. Allow infinite ttl or hops with -1 https://issues.apache.org/jira/browse/AMQ-2180.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1497716 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2013-06-28 10:32:40 +00:00
parent 61ff570261
commit a95c6dba1b
21 changed files with 562 additions and 73 deletions

View File

@ -38,8 +38,12 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
return connector.getName();
}
public int getNetworkTTL() {
return connector.getNetworkTTL();
public int getMessageTTL() {
return connector.getMessageTTL();
}
public int getConsumerTTL() {
return connector.getConsumerTTL();
}
public int getPrefetchSize() {
@ -98,8 +102,12 @@ public class NetworkConnectorView implements NetworkConnectorViewMBean {
connector.setDynamicOnly(dynamicOnly);
}
public void setNetworkTTL(int networkTTL) {
connector.setNetworkTTL(networkTTL);
public void setMessageTTL(int messageTTL) {
connector.setMessageTTL(messageTTL);
}
public void setConsumerTTL(int consumerTTL) {
connector.setConsumerTTL(consumerTTL);
}
public void setPassword(String password) {

View File

@ -22,7 +22,9 @@ public interface NetworkConnectorViewMBean extends Service {
String getName();
int getNetworkTTL();
int getMessageTTL();
int getConsumerTTL();
int getPrefetchSize();
@ -52,7 +54,9 @@ public interface NetworkConnectorViewMBean extends Service {
void setDynamicOnly(boolean dynamicOnly);
void setNetworkTTL(int networkTTL);
void setMessageTTL(int messageTTL);
void setConsumerTTL(int consumerTTL);
void setPassword(String password);

View File

@ -1790,7 +1790,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
});
}
}
if (ack.isPoisonAck()) {
if (ack.isPoisonAck() || (sub != null && sub.getConsumerInfo().isNetworkSubscription())) {
// message gone to DLQ, is ok to allow redelivery
messagesLock.writeLock().lock();
try{

View File

@ -107,7 +107,7 @@ public class AbortSlowConsumerStrategy implements SlowConsumerStrategy, Runnable
try {
LOG.info("aborting "
+ (abortSubscriberConnection ? "connection" : "consumer")
+ ", slow consumer: " + entry.getKey().getConsumerInfo().getConsumerId());
+ ", slow consumer: " + entry.getKey());
final Connection connection = connectionContext.getConnection();
if (connection != null) {

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.network;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.broker.region.Destination;
@ -31,7 +32,7 @@ import org.slf4j.LoggerFactory;
/**
* implement conditional behavior for queue consumers, allows replaying back to
* origin if no consumers are present on the local broker after a configurable
* delay, irrespective of the networkTTL Also allows rate limiting of messages
* delay, irrespective of the TTL. Also allows rate limiting of messages
* through the network, useful for static includes
*
* @org.apache.xbean.XBean
@ -44,10 +45,11 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
int rateDuration = 1000;
@Override
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
ConditionalNetworkBridgeFilter filter = new ConditionalNetworkBridgeFilter();
filter.setNetworkBrokerId(remoteBrokerPath[0]);
filter.setNetworkTTL(networkTimeToLive);
filter.setMessageTTL(messageTTL);
filter.setConsumerTTL(consumerTTL);
filter.setAllowReplayWhenNoConsumers(isReplayWhenNoConsumers());
filter.setRateLimit(getRateLimit());
filter.setRateDuration(getRateDuration());
@ -104,9 +106,15 @@ public class ConditionalNetworkBridgeFilterFactory implements NetworkBridgeFilte
// potential replay back to origin
match = allowReplayWhenNoConsumers && hasNoLocalConsumers(message, mec) && hasNotJustArrived(message);
if (match && LOG.isTraceEnabled()) {
LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin in the absence of a local consumer");
if (LOG.isTraceEnabled()) {
if (match) {
LOG.trace("Replaying [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin in the absence of a local consumer");
} else {
LOG.trace("Suppressing replay of [" + message.getMessageId() + "] for [" + message.getDestination()
+ "] back to origin " + Arrays.asList(message.getBrokerPath()));
}
}
} else {

View File

@ -80,8 +80,7 @@ public class ConduitBridge extends DemandForwardingBridge {
DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
if (filter.matches(info.getDestination())) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " matched (add interest) to exsting sub for: " +
ds.getRemoteInfo() + " with sub: " + info.getConsumerId());
LOG.debug(configuration.getBrokerName() + " " + info + " with ids" + info.getNetworkConsumerIds() + " matched (add interest) " + ds);
}
// add the interest in the subscription
if (checkPaths(info.getBrokerPath(), ds.getRemoteInfo().getBrokerPath())) {
@ -105,7 +104,7 @@ public class ConduitBridge extends DemandForwardingBridge {
for (DemandSubscription ds : subscriptionMapByLocalId.values()) {
if (ds.remove(id)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing interest in sub on " + localBroker + " from " + remoteBrokerName + " : sub: " + id + " existing matched sub: " + ds.getRemoteInfo());
LOG.debug(configuration.getBrokerName() + " on " + localBroker + " from " + remoteBrokerName + " removed interest for: " + id + " from " + ds);
}
}
if (ds.isEmpty()) {
@ -116,7 +115,7 @@ public class ConduitBridge extends DemandForwardingBridge {
for (DemandSubscription ds : tmpList) {
removeSubscription(ds);
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " removing sub on " + localBroker + " from " + remoteBrokerName + " : " + ds.getRemoteInfo());
LOG.debug(configuration.getBrokerName() + " on " + localBroker + " from " + remoteBrokerName + " removed " + ds);
}
}
}

View File

@ -27,7 +27,7 @@ import org.apache.activemq.command.NetworkBridgeFilter;
* @org.apache.xbean.XBean
*/
public class DefaultNetworkBridgeFilterFactory implements NetworkBridgeFilterFactory {
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive) {
return new NetworkBridgeFilter(info, remoteBrokerPath[0], networkTimeToLive);
public NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL) {
return new NetworkBridgeFilter(info, remoteBrokerPath[0], messageTTL, consumerTTL);
}
}

View File

@ -503,6 +503,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// set our properties
Properties props = new Properties();
IntrospectionSupport.getProperties(configuration, props, null);
props.remove("networkTTL");
String str = MarshallingSupport.propertiesToString(props);
brokerInfo.setNetworkProperties(str);
brokerInfo.setBrokerId(this.localBrokerId);
@ -634,15 +635,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
case ConsumerInfo.DATA_STRUCTURE_TYPE:
localStartedLatch.await();
if (started.get()) {
if (!addConsumerInfo((ConsumerInfo) command)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring ConsumerInfo: " + command);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Adding ConsumerInfo: " + command);
}
}
addConsumerInfo((ConsumerInfo) command);
} else {
// received a subscription whilst stopping
LOG.warn("Stopping - ignoring ConsumerInfo: " + command);
@ -691,7 +684,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
private void serviceRemoteConsumerAdvisory(DataStructure data) throws IOException {
final int networkTTL = configuration.getNetworkTTL();
final int networkTTL = configuration.getConsumerTTL();
if (data.getClass() == ConsumerInfo.class) {
// Create a new local subscription
ConsumerInfo info = (ConsumerInfo) data;
@ -704,7 +697,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
return;
}
if (path != null && path.length >= networkTTL) {
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName + ", restricted to " + networkTTL
+ " network hops only : " + info);
@ -732,29 +725,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// in a cyclic network there can be multiple bridges per broker that can propagate
// a network subscription so there is a need to synchronize on a shared entity
synchronized (brokerService.getVmConnectorURI()) {
if (addConsumerInfo(info)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " bridged sub on " + localBroker + " from " + remoteBrokerName + " : " + info);
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring sub from " + remoteBrokerName
+ " as already subscribed to matching destination : " + info);
}
}
addConsumerInfo(info);
}
} else if (data.getClass() == DestinationInfo.class) {
// It's a destination info - we want to pass up information about temporary destinations
final DestinationInfo destInfo = (DestinationInfo) data;
BrokerId[] path = destInfo.getBrokerPath();
if (path != null && path.length >= networkTTL) {
if (path != null && networkTTL > -1 && path.length >= networkTTL) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " restricted to " + networkTTL + " network hops only");
}
return;
}
if (contains(destInfo.getBrokerPath(), localBrokerPath[0])) {
// Ignore this consumer as it's a consumer we locally sent to the broker.
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " Ignoring destination " + destInfo + " already routed through this broker once");
}
@ -958,7 +941,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
if (suppressMessageDispatch(md, sub)) {
if (LOG.isDebugEnabled()) {
LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName
+ " because message came from there or fails networkTTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+ " because message came from there or fails TTL, brokerPath: " + Arrays.toString(md.getMessage().getBrokerPath())
+ ", message: " + md.getMessage());
}
// still ack as it may be durable
@ -1165,8 +1148,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}
}
protected boolean addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
boolean consumerAdded = false;
protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws IOException {
ConsumerInfo info = consumerInfo.copy();
addRemoteBrokerToBrokerPath(info);
DemandSubscription sub = createDemandSubscription(info);
@ -1178,10 +1160,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
sub.getDurableRemoteSubs().add(new SubscriptionInfo(sub.getRemoteInfo().getClientId(), consumerInfo.getSubscriptionName()));
}
addSubscription(sub);
consumerAdded = true;
LOG.debug(configuration.getBrokerName() + " new demand subscription: " + sub);
}
}
return consumerAdded;
}
private void undoMapRegistration(DemandSubscription sub) {
@ -1421,7 +1402,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
filterFactory = entry.getNetworkBridgeFilterFactory();
}
}
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getNetworkTTL());
return filterFactory.create(info, getRemoteBrokerPath(), configuration.getMessageTTL(), configuration.getConsumerTTL());
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) throws IOException {

View File

@ -52,6 +52,11 @@ public class DemandSubscription {
remoteSubsIds.add(info.getConsumerId());
}
@Override
public String toString() {
return "DemandSub{" + localInfo.getConsumerId() + ",remotes:" + remoteSubsIds + "}";
}
/**
* Increment the consumers associated with this subscription
*

View File

@ -32,6 +32,9 @@ import org.slf4j.LoggerFactory;
public class DurableConduitBridge extends ConduitBridge {
private static final Logger LOG = LoggerFactory.getLogger(DurableConduitBridge.class);
public String toString() {
return "DurableConduitBridge";
}
/**
* Constructor
*

View File

@ -302,6 +302,8 @@ public class LdapNetworkConnector extends NetworkConnector implements NamespaceC
connector.setDynamicOnly(isDynamicOnly());
connector.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
connector.setNetworkTTL(getNetworkTTL());
connector.setConsumerTTL(getConsumerTTL());
connector.setMessageTTL(getMessageTTL());
connector.setConduitSubscriptions(isConduitSubscriptions());
connector.setExcludedDestinations(getExcludedDestinations());
connector.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());

View File

@ -40,6 +40,9 @@ public class NetworkBridgeConfiguration {
private boolean bridgeTempDestinations = true;
private int prefetchSize = 1000;
private int networkTTL = 1;
private int consumerTTL = networkTTL;
private int messageTTL = networkTTL;
private String brokerName = "localhost";
private String brokerURL = "";
private String userName;
@ -170,6 +173,8 @@ public class NetworkBridgeConfiguration {
*/
public void setNetworkTTL(int networkTTL) {
this.networkTTL = networkTTL;
setConsumerTTL(networkTTL);
setMessageTTL(networkTTL);
}
/**
@ -394,4 +399,20 @@ public class NetworkBridgeConfiguration {
public void setAdvisoryForFailedForward(boolean advisoryForFailedForward) {
this.advisoryForFailedForward = advisoryForFailedForward;
}
public void setConsumerTTL(int consumerTTL) {
this.consumerTTL = consumerTTL;
}
public int getConsumerTTL() {
return consumerTTL;
}
public void setMessageTTL(int messageTTL) {
this.messageTTL = messageTTL;
}
public int getMessageTTL() {
return messageTTL;
}
}

View File

@ -24,5 +24,5 @@ import org.apache.activemq.command.NetworkBridgeFilter;
public interface NetworkBridgeFilterFactory {
// create a dispatch filter for network consumers, default impl will not send a message back to
// its origin to prevent looping, the down side is that messages can get stuck
NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int networkTimeToLive);
NetworkBridgeFilter create(ConsumerInfo info, BrokerId[] remoteBrokerPath, int messageTTL, int consumerTTL);
}

View File

@ -276,13 +276,13 @@
</tasks>
</configuration>
<dependencies>
<dependency>
<!-- not needed on osx; dependency>
<groupId>com.sun</groupId>
<artifactId>tools</artifactId>
<version>1.6.0</version>
<scope>system</scope>
<systemPath>${java.home}/../lib/tools.jar</systemPath>
</dependency>
</dependency -->
</dependencies>
</plugin>
</plugins>

View File

@ -2033,7 +2033,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
} catch (JMSException e) {
LOG.warn("Exception closing consumer", e);
}
LOG.warn("Closed consumer on Command");
LOG.warn("Closed consumer on Command, " + id);
break;
}
}

View File

@ -28,7 +28,7 @@ import java.util.Arrays;
/**
* @openwire:marshaller code="91"
*
*
*/
public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
@ -36,15 +36,17 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
static final Logger LOG = LoggerFactory.getLogger(NetworkBridgeFilter.class);
protected BrokerId networkBrokerId;
protected int networkTTL;
protected int messageTTL;
protected int consumerTTL;
transient ConsumerInfo consumerInfo;
public NetworkBridgeFilter() {
}
public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int networkTTL) {
public NetworkBridgeFilter(ConsumerInfo consumerInfo, BrokerId networkBrokerId, int messageTTL, int consumerTTL) {
this.networkBrokerId = networkBrokerId;
this.networkTTL = networkTTL;
this.messageTTL = messageTTL;
this.consumerTTL = consumerTTL;
this.consumerInfo = consumerInfo;
}
@ -86,9 +88,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
if (hops >= networkTTL) {
if (messageTTL > -1 && hops >= messageTTL) {
if (LOG.isTraceEnabled()) {
LOG.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
LOG.trace("Message restricted to " + messageTTL + " network hops ignoring: " + message);
}
return false;
}
@ -103,9 +105,9 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
} else if ( message.getDataStructure() != null && message.getDataStructure().getDataStructureType() == CommandTypes.CONSUMER_INFO) {
ConsumerInfo info = (ConsumerInfo)message.getDataStructure();
hops = info.getBrokerPath() == null ? 0 : info.getBrokerPath().length;
if (hops >= networkTTL) {
if (consumerTTL > -1 && hops >= consumerTTL) {
if (LOG.isTraceEnabled()) {
LOG.trace("ConsumerInfo advisory restricted to " + networkTTL + " network hops ignoring: " + message);
LOG.trace("ConsumerInfo advisory restricted to " + consumerTTL + " network hops ignoring: " + message);
}
return false;
}
@ -132,15 +134,15 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
return false;
}
/**
* @openwire:property version=1
*/
// keep for backward compat with older
// wire formats
public int getNetworkTTL() {
return networkTTL;
return messageTTL;
}
public void setNetworkTTL(int networkTTL) {
this.networkTTL = networkTTL;
messageTTL = networkTTL;
consumerTTL = networkTTL;
}
/**
@ -154,4 +156,25 @@ public class NetworkBridgeFilter implements DataStructure, BooleanExpression {
this.networkBrokerId = remoteBrokerPath;
}
public void setMessageTTL(int messageTTL) {
this.messageTTL = messageTTL;
}
/**
* @openwire:property version=10
*/
public int getMessageTTL() {
return this.messageTTL;
}
public void setConsumerTTL(int consumerTTL) {
this.consumerTTL = consumerTTL;
}
/**
* @openwire:property version=10
*/
public int getConsumerTTL() {
return this.consumerTTL;
}
}

View File

@ -66,8 +66,9 @@ public class NetworkBridgeFilterMarshaller extends BaseDataStreamMarshaller {
super.tightUnmarshal(wireFormat, o, dataIn, bs);
NetworkBridgeFilter info = (NetworkBridgeFilter)o;
info.setNetworkTTL(dataIn.readInt());
info.setNetworkBrokerId((org.apache.activemq.command.BrokerId) tightUnmarsalCachedObject(wireFormat, dataIn, bs));
info.setMessageTTL(dataIn.readInt());
info.setConsumerTTL(dataIn.readInt());
}
@ -82,7 +83,7 @@ public class NetworkBridgeFilterMarshaller extends BaseDataStreamMarshaller {
int rc = super.tightMarshal1(wireFormat, o, bs);
rc += tightMarshalCachedObject1(wireFormat, (DataStructure)info.getNetworkBrokerId(), bs);
return rc + 4;
return rc + 8;
}
/**
@ -96,8 +97,9 @@ public class NetworkBridgeFilterMarshaller extends BaseDataStreamMarshaller {
super.tightMarshal2(wireFormat, o, dataOut, bs);
NetworkBridgeFilter info = (NetworkBridgeFilter)o;
dataOut.writeInt(info.getNetworkTTL());
tightMarshalCachedObject2(wireFormat, (DataStructure)info.getNetworkBrokerId(), dataOut, bs);
dataOut.writeInt(info.getMessageTTL());
dataOut.writeInt(info.getConsumerTTL());
}
@ -112,8 +114,9 @@ public class NetworkBridgeFilterMarshaller extends BaseDataStreamMarshaller {
super.looseUnmarshal(wireFormat, o, dataIn);
NetworkBridgeFilter info = (NetworkBridgeFilter)o;
info.setNetworkTTL(dataIn.readInt());
info.setNetworkBrokerId((org.apache.activemq.command.BrokerId) looseUnmarsalCachedObject(wireFormat, dataIn));
info.setMessageTTL(dataIn.readInt());
info.setConsumerTTL(dataIn.readInt());
}
@ -126,8 +129,9 @@ public class NetworkBridgeFilterMarshaller extends BaseDataStreamMarshaller {
NetworkBridgeFilter info = (NetworkBridgeFilter)o;
super.looseMarshal(wireFormat, o, dataOut);
dataOut.writeInt(info.getNetworkTTL());
looseMarshalCachedObject(wireFormat, (DataStructure)info.getNetworkBrokerId(), dataOut);
dataOut.writeInt(info.getMessageTTL());
dataOut.writeInt(info.getConsumerTTL());
}
}

View File

@ -389,7 +389,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
}
return null;
}
protected void assertConsumersConnect(String brokerName, Destination destination, final int count, long timeout) throws Exception {
BrokerItem brokerItem = brokers.get(brokerName);
Connection conn = brokerItem.createConnection();
@ -528,6 +528,7 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
this.broker = broker;
factory = new ActiveMQConnectionFactory(broker.getVmConnectorURI());
factory.setConnectionIDPrefix(broker.getBrokerName());
consumers = Collections.synchronizedMap(new HashMap<MessageConsumer, MessageIdList>());
connections = Collections.synchronizedList(new ArrayList<Connection>());
allMessages.setVerbose(verbose);

View File

@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import junit.framework.Test;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4607Test.class);
public static final int BROKER_COUNT = 3;
public static final int CONSUMER_COUNT = 1;
public static final int MESSAGE_COUNT = 0;
public static final boolean CONDUIT = true;
public static final int TIMEOUT = 20000;
public boolean duplex = true;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
}
assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
unhandeledExceptions.isEmpty());
}
public NetworkConnector bridge(String from, String to) throws Exception {
NetworkConnector networkConnector = bridgeBrokers(from, to, true, -1, CONDUIT);
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setConsumerTTL(1);
networkConnector.setDuplex(duplex);
return networkConnector;
}
public static Test suite() {
return suite(AMQ4607Test.class);
}
public void initCombos() {
addCombinationValues("duplex", new Boolean[]{Boolean.TRUE, Boolean.FALSE});
}
public void testMigratingConsumer() throws Exception {
bridge("Broker0", "Broker1");
if (!duplex) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!duplex) bridge("Broker2", "Broker1");
bridge("Broker0", "Broker2");
if (!duplex) bridge("Broker2", "Broker0");
startAllBrokers();
this.waitForBridgeFormation();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("Broker0", dest, 1);
for (int i=0; i< BROKER_COUNT; i++) {
MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
for (int J = 0; J < BROKER_COUNT; J++) {
assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
}
assertNoUnhandeledExceptions();
assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
messageConsumer.close();
LOG.info("Check for no consumers..");
for (int J = 0; J < BROKER_COUNT; J++) {
assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
}
}
// now consume the message
final String brokerId = "Broker2";
MessageConsumer messageConsumer = createConsumer(brokerId, dest);
assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
}
}));
messageConsumer.close();
}
public void testMigratingConsumerFullCircle() throws Exception {
bridge("Broker0", "Broker1");
if (!duplex) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!duplex) bridge("Broker2", "Broker1");
bridge("Broker0", "Broker2");
if (!duplex) bridge("Broker2", "Broker0");
// allow full loop, immediate replay back to 0 from 2
ConditionalNetworkBridgeFilterFactory conditionalNetworkBridgeFilterFactory = new ConditionalNetworkBridgeFilterFactory();
conditionalNetworkBridgeFilterFactory.setReplayDelay(0);
conditionalNetworkBridgeFilterFactory.setReplayWhenNoConsumers(true);
brokers.get("Broker2").broker.getDestinationPolicy().getDefaultEntry().setNetworkBridgeFilterFactory(conditionalNetworkBridgeFilterFactory);
startAllBrokers();
this.waitForBridgeFormation();
Destination dest = createDestination("TEST.FOO", false);
sendMessages("Broker0", dest, 1);
for (int i=0; i< BROKER_COUNT; i++) {
MessageConsumer messageConsumer = createConsumer("Broker" + i, dest, "DoNotConsume = 'true'");
for (int J = 0; J < BROKER_COUNT; J++) {
assertExactConsumersConnect("Broker" + J, dest, CONSUMER_COUNT, TIMEOUT);
}
assertNoUnhandeledExceptions();
// validate the message has been forwarded
assertExactMessageCount("Broker" + i, dest, 1, TIMEOUT);
messageConsumer.close();
LOG.info("Check for no consumers..");
for (int J = 0; J < BROKER_COUNT; J++) {
assertExactConsumersConnect("Broker" + J, dest, 0, TIMEOUT);
}
}
// now consume the message from the origin
LOG.info("Consume from origin...");
final String brokerId = "Broker0";
MessageConsumer messageConsumer = createConsumer(brokerId, dest);
assertTrue("Consumed ok", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return brokers.get(brokerId).allMessages.getMessageIds().size() == 1;
}
}));
messageConsumer.close();
}
protected void assertExactMessageCount(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
final QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
assertTrue("Excepected queue depth: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
long currentCount = queueViewMBean.getQueueSize();
LOG.info("On " + brokerName + " current queue size for " + queueViewMBean + ", " + currentCount);
if (count != currentCount) {
LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
}
return currentCount == count;
}
}, timeout));
}
protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
long currentCount = queueViewMBean.getConsumerCount();
LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
if (count != currentCount) {
LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
}
return currentCount == count;
} catch (Exception e) {
LOG.warn("Unexpected: " + e, e);
return false;
}
}
}, timeout));
}
public void setUp() throws Exception {
super.setUp();
unhandeledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers
for (int i = 0; i < BROKER_COUNT; i++) {
createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true"));
}
consumerMap = new LinkedHashMap<String, MessageConsumer>();
}
@Override
protected void configureBroker(BrokerService brokerService) {
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policyEntry);
brokerService.setDestinationPolicy(policyMap);
}
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandeledExceptions) {
unhandeledExceptions.put(t,e);
}
}
}

View File

@ -324,7 +324,7 @@ public abstract class DataFileGeneratorTestSupport extends TestSupport {
}
protected BooleanExpression createBooleanExpression(String string) {
return new NetworkBridgeFilter(null, new BrokerId(string), 10);
return new NetworkBridgeFilter(null, new BrokerId(string), 10, 10);
}
}

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTestSupport implements UncaughtExceptionHandler {
public static final int BROKER_COUNT = 3;
public static final int CONSUMER_COUNT = 1;
public static final int MESSAGE_COUNT = 0;
public static final boolean DUPLEX = false;
public static final boolean CONDUIT = true;
// NETWORK_TTL=4 is problematic for consumer/demand propagation
// needs setConsumerTTL=1 to override
public static final int NETWORK_TTL = 4;
private static final Logger LOG = LoggerFactory.getLogger(VerifyNetworkConsumersDisconnectTest.class);
public static final int TIMEOUT = 30000;
protected Map<String, MessageConsumer> consumerMap;
Map<Thread, Throwable> unhandeledExceptions = new HashMap<Thread, Throwable>();
private void assertNoUnhandeledExceptions() {
for( Entry<Thread, Throwable> e: unhandeledExceptions.entrySet()) {
LOG.error("Thread:" + e.getKey() + " Had unexpected: " + e.getValue());
}
assertTrue("There are no unhandelled exceptions, see: log for detail on: " + unhandeledExceptions,
unhandeledExceptions.isEmpty());
}
public NetworkConnector bridge(String from, String to) throws Exception {
NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT);
networkConnector.setSuppressDuplicateQueueSubscriptions(true);
networkConnector.setDecreaseNetworkConsumerPriority(true);
networkConnector.setDuplex(DUPLEX);
// infinite ttl for messages in a mesh
networkConnector.setMessageTTL(-1);
// one hop for consumers in a mesh
networkConnector.setConsumerTTL(1);
return networkConnector;
}
public void testQueueAllConnected() throws Exception {
bridge("Broker0", "Broker1");
if (!DUPLEX) bridge("Broker1", "Broker0");
bridge("Broker1", "Broker2");
if (!DUPLEX) bridge("Broker2", "Broker1");
startAllBrokers();
this.waitForBridgeFormation();
Destination dest = createDestination("TEST.FOO", false);
// Setup consumers
for (int i = 0; i < BROKER_COUNT; i++) {
consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest));
}
assertExactConsumersConnect("Broker0", dest, 2, TIMEOUT);
assertExactConsumersConnect("Broker2", dest, 2, TIMEOUT);
// piggy in the middle
assertExactConsumersConnect("Broker1", dest, 3, TIMEOUT);
assertNoUnhandeledExceptions();
LOG.info("Complate the mesh - 0->2");
// shorter route
NetworkConnector nc = bridge("Broker0", "Broker2");
nc.setBrokerName("Broker0");
nc.start();
if (!DUPLEX) {
LOG.info("... complate the mesh - 2->0");
nc = bridge("Broker2", "Broker0");
nc.setBrokerName("Broker2");
nc.start();
}
// wait for consumers to get propagated
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, dest, 3, TIMEOUT);
}
// reverse order close
consumerMap.get("Consumer:" + 2 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 1 + ":0").close();
TimeUnit.SECONDS.sleep(1);
consumerMap.get("Consumer:" + 0 + ":0").close();
LOG.info("Check for no consumers..");
for (int i = 0; i < BROKER_COUNT; i++) {
assertExactConsumersConnect("Broker" + i, dest, 0, TIMEOUT);
}
}
protected void assertExactConsumersConnect(final String brokerName, Destination destination, final int count, long timeout) throws Exception {
final ManagementContext context = brokers.get(brokerName).broker.getManagementContext();
assertTrue("Excepected consumers count: " + count + " on: " + brokerName, Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
try {
QueueViewMBean queueViewMBean = (QueueViewMBean) context.newProxyInstance(brokers.get(brokerName).broker.getAdminView().getQueues()[0], QueueViewMBean.class, false);
long currentCount = queueViewMBean.getConsumerCount();
LOG.info("On " + brokerName + " current consumer count for " + queueViewMBean + ", " + currentCount);
if (count != currentCount) {
LOG.info("Sub IDs: " + Arrays.asList(queueViewMBean.getSubscriptions()));
}
return currentCount == count;
} catch (Exception e) {
LOG.warn(": ", e);
return false;
}
}
}, timeout));
}
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
unhandeledExceptions.clear();
Thread.setDefaultUncaughtExceptionHandler(this);
// Setup n brokers
for (int i = 0; i < BROKER_COUNT; i++) {
createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i));
}
consumerMap = new LinkedHashMap<String, MessageConsumer>();
}
@Override
protected void configureBroker(BrokerService brokerService) {
PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setExpireMessagesPeriod(0);
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(policyEntry);
brokerService.setDestinationPolicy(policyMap);
}
public void uncaughtException(Thread t, Throwable e) {
synchronized(unhandeledExceptions) {
unhandeledExceptions.put(t,e);
}
}
}