resolve https://issues.apache.org/activemq/browse/AMQ-2691 - ensure suppressed durable subs are acked so that message can be removed and not recovered at a later date. Tidy up network bridge creation so that durables are supported by default and allow dynamicOnly to determine if durables are auto bridged. Avoid duplicate mbeans for durable subs on restart and allow active to reflect status of a sub

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@932342 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-04-09 10:36:00 +00:00
parent b6eef83ef4
commit 05f82a9b6d
9 changed files with 291 additions and 85 deletions

View File

@ -29,6 +29,7 @@ import org.apache.activemq.broker.region.Region;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
@ -343,9 +344,11 @@ public class ManagedRegionBroker extends RegionBroker {
if (infos != null) { if (infos != null) {
for (int i = 0; i < infos.length; i++) { for (int i = 0; i < infos.length; i++) {
SubscriptionInfo info = infos[i]; SubscriptionInfo info = infos[i];
LOG.debug("Restoring durable subscription: " + info);
SubscriptionKey key = new SubscriptionKey(info); SubscriptionKey key = new SubscriptionKey(info);
subscriptions.put(key, info); if (!alreadyKnown(key)) {
LOG.debug("Restoring durable subscription mbean: " + info);
subscriptions.put(key, info);
}
} }
} }
} }
@ -359,6 +362,15 @@ public class ManagedRegionBroker extends RegionBroker {
} }
} }
private boolean alreadyKnown(SubscriptionKey key) {
boolean known = false;
known = ((TopicRegion) getTopicRegion()).durableSubscriptionExists(key);
if (LOG.isTraceEnabled()) {
LOG.trace("Sub with key: " + key + ", " + (known ? "": "not") + " already registered");
}
return known;
}
protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) { protected void addInactiveSubscription(SubscriptionKey key, SubscriptionInfo info) {
Hashtable map = brokerObjectName.getKeyPropertyList(); Hashtable map = brokerObjectName.getKeyPropertyList();
try { try {

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.broker.jmx; package org.apache.activemq.broker.jmx;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.TopicSubscription; import org.apache.activemq.broker.region.TopicSubscription;
/** /**
@ -58,5 +59,15 @@ public class TopicSubscriptionView extends SubscriptionView implements TopicSubs
} }
} }
@Override
public boolean isActive() {
if (subscription instanceof DurableTopicSubscription) {
return ((DurableTopicSubscription)subscription).isActive();
} else {
return super.isActive();
}
}
} }

View File

@ -283,6 +283,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
MessageId messageId = node.getMessageId(); MessageId messageId = node.getMessageId();
if (ack.getLastMessageId().equals(messageId)) { if (ack.getLastMessageId().equals(messageId)) {
// this should never be within a transaction // this should never be within a transaction
dequeueCounter++;
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
destination = node.getRegionDestination(); destination = node.getRegionDestination();
acknowledge(context, ack, node); acknowledge(context, ack, node);

View File

@ -170,7 +170,7 @@ public class TopicRegion extends AbstractRegion {
for (int i = 0; i < infos.length; i++) { for (int i = 0; i < infos.length; i++) {
SubscriptionInfo info = infos[i]; SubscriptionInfo info = infos[i];
LOG.debug("Restoring durable subscription: " + infos); LOG.debug("Restoring durable subscription: " + info);
SubscriptionKey key = new SubscriptionKey(info); SubscriptionKey key = new SubscriptionKey(info);
// A single durable sub may be subscribing to multiple topics. // A single durable sub may be subscribing to multiple topics.
@ -313,4 +313,8 @@ public class TopicRegion extends AbstractRegion {
this.keepDurableSubsActive = keepDurableSubsActive; this.keepDurableSubsActive = keepDurableSubsActive;
} }
public boolean durableSubscriptionExists(SubscriptionKey key) {
return this.durableSubscriptions.containsKey(key);
}
} }

View File

@ -30,6 +30,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.Service; import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
@ -65,6 +66,7 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo; import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter; import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.transport.DefaultTransportListener; import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.FutureResponse; import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.ResponseCallback; import org.apache.activemq.transport.ResponseCallback;
@ -88,9 +90,9 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision$ * @version $Revision$
*/ */
public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware { public abstract class DemandForwardingBridgeSupport implements NetworkBridge, BrokerServiceAware {
private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class); private static final Log LOG = LogFactory.getLog(DemandForwardingBridge.class);
private static final ThreadPoolExecutor ASYNC_TASKS; private static final ThreadPoolExecutor ASYNC_TASKS;
protected static final String DURABLE_SUB_PREFIX = "NC-DS_";
protected final Transport localBroker; protected final Transport localBroker;
protected final Transport remoteBroker; protected final Transport remoteBroker;
protected final IdGenerator idGenerator = new IdGenerator(); protected final IdGenerator idGenerator = new IdGenerator();
@ -677,14 +679,19 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
final MessageDispatch md = (MessageDispatch) command; final MessageDispatch md = (MessageDispatch) command;
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId()); final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) { if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's if (originallyCameFromRemote(md, sub)) {
// broker bread crumbs and not the consumer's? However, the message's broker bread if (LOG.isDebugEnabled()) {
// crumbs are null, which is another matter. LOG.debug(configuration.getBrokerName() + " message not forwarded to " + remoteBrokerName + " because message came from there or fails networkTTL: " + md.getMessage());
boolean cameFromRemote = false; }
Object consumerInfo = md.getMessage().getDataStructure(); // still ack as it may be durable
if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) try {
cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId()); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
} finally {
sub.decrementOutstandingResponses();
}
return;
}
Message message = configureMessage(md); Message message = configureMessage(md);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -697,17 +704,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
// send, we will preserve that QOS // send, we will preserve that QOS
// by bridging it using an async send (small chance // by bridging it using an async send (small chance
// of message loss). // of message loss).
try { try {
// Don't send it off to the remote if it originally came from the remote. remoteBroker.oneway(message);
if (!cameFromRemote) {
remoteBroker.oneway(message);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Message not forwarded on to remote, because message came from remote");
}
}
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();
} finally { } finally {
@ -730,7 +728,6 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} else { } else {
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1)); localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet(); dequeueCounter.incrementAndGet();
} }
} catch (IOException e) { } catch (IOException e) {
serviceLocalException(e); serviceLocalException(e);
@ -741,8 +738,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
}; };
remoteBroker.asyncRequest(message, callback); remoteBroker.asyncRequest(message, callback);
}
}
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage()); LOG.debug("No subscription registered with this network bridge for consumerId " + md.getConsumerId() + " for message: " + md.getMessage());
@ -779,6 +776,27 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
} }
} }
private boolean originallyCameFromRemote(MessageDispatch md, DemandSubscription sub) throws Exception {
// See if this consumer's brokerPath tells us it came from the broker at the other end
// of the bridge. I think we should be making this decision based on the message's
// broker bread crumbs and not the consumer's? However, the message's broker bread
// crumbs are null, which is another matter.
boolean cameFromRemote = false;
Object consumerInfo = md.getMessage().getDataStructure();
if (consumerInfo != null && (consumerInfo instanceof ConsumerInfo)) {
cameFromRemote = contains(((ConsumerInfo) consumerInfo).getBrokerPath(), remoteBrokerInfo.getBrokerId());
}
// for durable subs, suppression via filter leaves dangling acks so we need to
// check here and allow the ack irrespective
if (!cameFromRemote && sub.getLocalInfo().isDurable()) {
MessageEvaluationContext messageEvalContext = new MessageEvaluationContext();
messageEvalContext.setMessageReference(md.getMessage());
cameFromRemote = !createNetworkBridgeFilter(null).matches(messageEvalContext);
}
return cameFromRemote;
}
/** /**
* @return Returns the dynamicallyIncludedDestinations. * @return Returns the dynamicallyIncludedDestinations.
*/ */
@ -1130,9 +1148,14 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub); subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(), sub);
subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub); subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(), sub);
// This works for now since we use a VM connection to the local broker. if (!info.isDurable()) {
// may need to change if we ever subscribe to a remote broker. // This works for now since we use a VM connection to the local broker.
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info)); // may need to change if we ever subscribe to a remote broker.
sub.getLocalInfo().setAdditionalPredicate(createNetworkBridgeFilter(info));
} else {
// need to ack this message if it is ignored as it is durable so
// we check before we send. see: originallyCameFromRemote()
}
} }
protected void removeDemandSubscription(ConsumerId id) throws IOException { protected void removeDemandSubscription(ConsumerId id) throws IOException {

View File

@ -54,7 +54,7 @@ public class DurableConduitBridge extends ConduitBridge {
*/ */
protected void setupStaticDestinations() { protected void setupStaticDestinations() {
super.setupStaticDestinations(); super.setupStaticDestinations();
ActiveMQDestination[] dests = durableDestinations; ActiveMQDestination[] dests = configuration.isDynamicOnly() ? null : durableDestinations;
if (dests != null) { if (dests != null) {
for (int i = 0; i < dests.length; i++) { for (int i = 0; i < dests.length; i++) {
ActiveMQDestination dest = dests[i]; ActiveMQDestination dest = dests[i];
@ -96,7 +96,7 @@ public class DurableConduitBridge extends ConduitBridge {
} }
protected String getSubscriberName(ActiveMQDestination dest) { protected String getSubscriberName(ActiveMQDestination dest) {
String subscriberName = configuration.getBrokerName() + "_" + dest.getPhysicalName(); String subscriberName = DURABLE_SUB_PREFIX + configuration.getBrokerName() + "_" + dest.getPhysicalName();
return subscriberName; return subscriberName;
} }

View File

@ -55,11 +55,8 @@ public final class NetworkBridgeFactory {
final NetworkBridgeListener listener) { final NetworkBridgeListener listener) {
DemandForwardingBridge result = null; DemandForwardingBridge result = null;
if (configuration.isConduitSubscriptions()) { if (configuration.isConduitSubscriptions()) {
if (configuration.isDynamicOnly()) { // dynamicOnly determines whether durables are auto bridged
result = new ConduitBridge(configuration, localTransport, remoteTransport); result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
} else {
result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
}
} else { } else {
result = new DemandForwardingBridge(configuration, localTransport, remoteTransport); result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
} }

View File

@ -28,8 +28,11 @@ import javax.jms.Session;
import org.apache.activemq.JmsMultipleBrokersTestSupport; import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.jmx.BrokerView; import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AMQ2439Test extends JmsMultipleBrokersTestSupport { public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
private static final Log LOG = LogFactory.getLog(AMQ2439Test.class);
Destination dest; Destination dest;
@ -45,6 +48,7 @@ public class AMQ2439Test extends JmsMultipleBrokersTestSupport {
assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() { assertTrue("dequeue is correct", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
LOG.info("dequeue count (want 1000), is : " + brokerView.getTotalDequeueCount());
return 1000 == brokerView.getTotalDequeueCount(); return 1000 == brokerView.getTotalDequeueCount();
} }
})); }));

View File

@ -19,11 +19,18 @@ package org.apache.activemq.network;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeNotNull; import static org.junit.Assume.assumeNotNull;
import java.io.File;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TopicSubscriber;
import javax.management.InstanceNotFoundException; import javax.management.InstanceNotFoundException;
import javax.management.MBeanServerConnection; import javax.management.MBeanServerConnection;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -35,6 +42,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -44,28 +54,54 @@ public class NetworkBrokerDetachTest {
private final static String BROKER_NAME = "broker"; private final static String BROKER_NAME = "broker";
private final static String REM_BROKER_NAME = "networkedBroker"; private final static String REM_BROKER_NAME = "networkedBroker";
private final static String QUEUE_NAME = "testQ"; private final static String DESTINATION_NAME = "testQ";
private final static int NUM_CONSUMERS = 1; private final static int NUM_CONSUMERS = 1;
protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class); protected static final Log LOG = LogFactory.getLog(NetworkBrokerDetachTest.class);
protected final int numRestarts = 3; protected final int numRestarts = 3;
protected final int networkTTL = 2;
protected final boolean dynamicOnly = false;
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName(BROKER_NAME); broker.setBrokerName(BROKER_NAME);
configureBroker(broker);
broker.addConnector("tcp://localhost:61617"); broker.addConnector("tcp://localhost:61617");
NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"); NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
networkConnector.setDuplex(false); configureNetworkConnector(networkConnector);
return broker; return broker;
} }
protected BrokerService createNetworkedBroker() throws Exception { protected BrokerService createNetworkedBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setBrokerName(REM_BROKER_NAME); broker.setBrokerName(REM_BROKER_NAME);
configureBroker(broker);
broker.getManagementContext().setCreateConnector(false);
broker.addConnector("tcp://localhost:62617"); broker.addConnector("tcp://localhost:62617");
NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
configureNetworkConnector(networkConnector);
return broker; return broker;
} }
private void configureNetworkConnector(NetworkConnector networkConnector) {
networkConnector.setDuplex(false);
networkConnector.setNetworkTTL(networkTTL);
networkConnector.setDynamicOnly(dynamicOnly);
}
// variants for each store....
private void configureBroker(BrokerService broker) throws Exception {
//KahaPersistenceAdapter persistenceAdapter = new KahaPersistenceAdapter();
//persistenceAdapter.setDirectory(new File("target/activemq-data/kaha/" + broker.getBrokerName() + "/NetworBrokerDetatchTest"));
//broker.setPersistenceAdapter(persistenceAdapter);
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/NetworBrokerDetatchTest"));
broker.setPersistenceAdapter(persistenceAdapter);
// default AMQ
}
@Test @Test
public void testNetworkedBrokerDetach() throws Exception { public void testNetworkedBrokerDetach() throws Exception {
BrokerService broker = createBroker(); BrokerService broker = createBroker();
@ -79,29 +115,13 @@ public class NetworkBrokerDetachTest {
ConnectionFactory consFactory = createConnectionFactory(networkedBroker); ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
Connection consConn = consFactory.createConnection(); Connection consConn = consFactory.createConnection();
Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
for(int i=0; i<NUM_CONSUMERS; i++) { for(int i=0; i<NUM_CONSUMERS; i++) {
consSession.createConsumer(consSession.createQueue(QUEUE_NAME)); consSession.createConsumer(destination);
} }
assertTrue("got expected consumer count from mbean within time limit", Wait.waitFor(new Wait.Condition() { assertTrue("got expected consumer count from mbean within time limit",
verifyConsumerCount(1, destination, BROKER_NAME));
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
if (consumers != null) {
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
if (1L == ((Long)consumers).longValue()) {
result = true;
}
}
}
return result;
}
}));
LOG.info("Stopping Consumer on the networked broker ..."); LOG.info("Stopping Consumer on the networked broker ...");
@ -109,24 +129,7 @@ public class NetworkBrokerDetachTest {
consConn.close(); consConn.close();
// We should have 0 consumer for the queue on the local broker // We should have 0 consumer for the queue on the local broker
assertTrue("got expected 0 count from mbean within time limit", Wait.waitFor(new Wait.Condition() { assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, BROKER_NAME));
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, "Queue", "Destination=" + QUEUE_NAME, "ConsumerCount");
if (consumers != null) {
LOG.info("Consumers for " + QUEUE_NAME + " on " + BROKER_NAME + " : " + consumers);
if (0L == ((Long)consumers).longValue()) {
result = true;
}
}
}
return result;
}
}));
networkedBroker.stop(); networkedBroker.stop();
networkedBroker.waitUntilStopped(); networkedBroker.waitUntilStopped();
@ -134,6 +137,106 @@ public class NetworkBrokerDetachTest {
broker.waitUntilStopped(); broker.waitUntilStopped();
} }
@Test
public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
BrokerService brokerOne = createBroker();
brokerOne.setDeleteAllMessagesOnStartup(true);
brokerOne.start();
BrokerService brokerTwo = createNetworkedBroker();
brokerTwo.setDeleteAllMessagesOnStartup(true);
brokerTwo.start();
final AtomicInteger count = new AtomicInteger(0);
MessageListener counter = new MessageListener() {
public void onMessage(Message message) {
count.incrementAndGet();
}
};
LOG.info("Creating durable consumer on each broker ...");
ActiveMQTopic destination = registerDurableConsumer(brokerTwo, counter);
registerDurableConsumer(brokerOne, counter);
assertTrue("got expected consumer count from local broker mbean within time limit",
verifyConsumerCount(2, destination, BROKER_NAME));
assertTrue("got expected consumer count from network broker mbean within time limit",
verifyConsumerCount(2, destination, REM_BROKER_NAME));
sendMessageTo(destination, brokerOne);
assertTrue("Got one message on each", verifyMessageCount(2, count));
LOG.info("Stopping brokerTwo...");
brokerTwo.stop();
brokerTwo.waitUntilStopped();
LOG.info("restarting broker Two...");
brokerTwo = createNetworkedBroker();
brokerTwo.start();
LOG.info("Recreating durable Consumer on the broker after restart...");
registerDurableConsumer(brokerTwo, counter);
// give advisories a chance to percolate
TimeUnit.SECONDS.sleep(5);
sendMessageTo(destination, brokerOne);
// expect similar after restart
assertTrue("got expected consumer count from local broker mbean within time limit",
verifyConsumerCount(2, destination, BROKER_NAME));
// a durable sub is auto bridged on restart unless dynamicOnly=true
assertTrue("got expected consumer count from network broker mbean within time limit",
verifyConsumerCount(2, destination, REM_BROKER_NAME));
assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, BROKER_NAME));
assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, REM_BROKER_NAME));
assertTrue("Got two more messages after restart", verifyMessageCount(4, count));
TimeUnit.SECONDS.sleep(1);
assertTrue("still Got just two more messages", verifyMessageCount(4, count));
brokerTwo.stop();
brokerTwo.waitUntilStopped();
brokerOne.stop();
brokerOne.waitUntilStopped();
}
private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception {
return Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return i == count.get();
}
});
}
private ActiveMQTopic registerDurableConsumer(
BrokerService brokerService, MessageListener listener) throws Exception {
ConnectionFactory factory = createConnectionFactory(brokerService);
Connection connection = factory.createConnection();
connection.setClientID("DurableOne");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(DESTINATION_NAME);
// unique to a broker
TopicSubscriber sub = session.createDurableSubscriber(destination, "SubOne" + brokerService.getBrokerName());
sub.setMessageListener(listener);
return destination;
}
private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
ConnectionFactory factory = createConnectionFactory(brokerService);
Connection conn = factory.createConnection();
conn.start();
Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createProducer(destination).send(session.createTextMessage("Hi"));
conn.close();
}
protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception { protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception {
String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString(); String url = ((TransportConnector) broker.getTransportConnectors().get(0)).getServer().getConnectURI().toString();
@ -154,6 +257,46 @@ public class NetworkBrokerDetachTest {
} }
// JMX Helper Methods // JMX Helper Methods
private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final String brokerName) throws Exception {
return Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
// We should have 1 consumer for the queue on the local broker
Object consumers = getAttribute(mbsc, brokerName, destination.isQueue() ? "Queue" : "Topic", "Destination=" + destination.getPhysicalName(), "ConsumerCount");
if (consumers != null) {
LOG.info("Consumers for " + destination.getPhysicalName() + " on " + brokerName + " : " + consumers);
if (expectedCount == ((Long)consumers).longValue()) {
result = true;
}
}
}
return result;
}
});
}
private boolean verifyDurableConsumerCount(final long expectedCount, final String brokerName) throws Exception {
return Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
boolean result = false;
MBeanServerConnection mbsc = getMBeanServerConnection();
if (mbsc != null) {
Set subs = getMbeans(mbsc, brokerName, "Subscription", "active=false,*");
if (subs != null) {
LOG.info("inactive durable subs on " + brokerName + " : " + subs);
if (expectedCount == subs.size()) {
result = true;
}
}
}
return result;
}
});
}
private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException { private MBeanServerConnection getMBeanServerConnection() throws MalformedURLException {
final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"); final JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
@ -171,10 +314,21 @@ public class NetworkBrokerDetachTest {
return mbsc; return mbsc;
} }
private Object getAttribute(MBeanServerConnection mbsc, String type, String pattern, String attrName) throws Exception {
private Set getMbeans(MBeanServerConnection mbsc, String brokerName, String type, String pattern) throws Exception {
Set obj = null;
try {
obj = mbsc.queryMBeans(getObjectName(brokerName, type, pattern), null);
} catch (InstanceNotFoundException ignored) {
LOG.warn("getAttribute ex: " + ignored);
}
return obj;
}
private Object getAttribute(MBeanServerConnection mbsc, String brokerName, String type, String pattern, String attrName) throws Exception {
Object obj = null; Object obj = null;
try { try {
obj = mbsc.getAttribute(getObjectName(BROKER_NAME, type, pattern), attrName); obj = mbsc.getAttribute(getObjectName(brokerName, type, pattern), attrName);
} catch (InstanceNotFoundException ignored) { } catch (InstanceNotFoundException ignored) {
LOG.warn("getAttribute ex: " + ignored); LOG.warn("getAttribute ex: " + ignored);
} }