diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java index 3616a9bab2..0c4edb233f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java @@ -18,7 +18,6 @@ package org.apache.activemq.broker; import java.io.IOException; import java.net.URI; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.activemq.broker.ft.MasterBroker; import org.apache.activemq.broker.region.ConnectionStatistics; import org.apache.activemq.broker.region.RegionBroker; @@ -1152,7 +1150,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor { // now turn duplex off this side info.setDuplexConnection(false); duplexBridge.setCreatedByDuplex(true); - duplexBridge.duplexStart(brokerInfo, info); + duplexBridge.duplexStart(this,brokerInfo, info); LOG.info("Created Duplex Bridge back to " + info.getBrokerName()); return null; diff --git a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index b6bde2c0b9..26767467ae 100644 --- a/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -28,7 +28,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.activemq.Service; import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.TransportConnection; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTempDestination; @@ -120,6 +122,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { private AtomicBoolean started = new AtomicBoolean(); + private TransportConnection duplexInitiatingConnection; public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) { this.configuration = configuration; @@ -127,9 +130,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { this.remoteBroker = remoteBroker; } - public void duplexStart(BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { + public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception { this.localBrokerInfo = localBrokerInfo; this.remoteBrokerInfo = remoteBrokerInfo; + this.duplexInitiatingConnection = connection; start(); serviceRemoteCommand(remoteBrokerInfo); } @@ -381,7 +385,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { LOG.debug("The remote Exception was: " + error, error); ASYNC_TASKS.execute(new Runnable() { public void run() { - ServiceSupport.dispose(DemandForwardingBridgeSupport.this); + ServiceSupport.dispose(getControllingService()); } }); fireBridgeFailed(); @@ -533,13 +537,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge { LOG.debug("The local Exception was:" + error, error); ASYNC_TASKS.execute(new Runnable() { public void run() { - ServiceSupport.dispose(DemandForwardingBridgeSupport.this); + ServiceSupport.dispose(getControllingService()); } }); fireBridgeFailed(); } } + protected Service getControllingService() { + return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this; + } + protected void addSubscription(DemandSubscription sub) throws IOException { if (sub != null) { localBroker.oneway(sub.getLocalInfo()); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java index ef0b72f837..6814e04d1e 100755 --- a/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/RecoveryBrokerTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.command.XATransactionId; /** * Used to simulate the recovery that occurs when a broker shuts down. @@ -235,7 +236,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport { Message m = receiveMessage(connection); assertNull(m); } - + public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception { ActiveMQDestination destination = new ActiveMQTopic("TEST"); @@ -403,6 +404,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport { assertNoMessagesLeft(connection); } + public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception { @@ -456,6 +458,8 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport { Message m = receiveMessage(connection); assertNull(m); } + + public void testQueuePersistentUncommitedAcksLostOnRestart() throws Exception { @@ -512,6 +516,62 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport { assertNoMessagesLeft(connection); } + + public void testQueuePersistentXAUncommitedAcksLostOnRestart() throws Exception { + int NUMBER = 100; + ActiveMQDestination destination = new ActiveMQQueue("TEST"); + + // Setup the producer and send the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < NUMBER; i++) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + connection.send(message); + } + + // Setup the consumer and receive the message. + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // Begin the transaction. + XATransactionId txid = createXATransaction(sessionInfo); + connection.send(createBeginTransaction(connectionInfo, txid)); + for (int i = 0; i < NUMBER; i++) { + Message m = receiveMessage(connection); + assertNotNull(m); + MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE); + ack.setTransactionId(txid); + connection.send(ack); + } + // Don't commit + + // restart the broker. + restartBroker(); + + // Setup the consumer and receive the message. + connection = createConnection(); + connectionInfo = createConnectionInfo(); + sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // All messages should be re-delivered. + for (int i = 0; i < NUMBER; i++) { + Message m = receiveMessage(connection); + assertNotNull(m); + } + + assertNoMessagesLeft(connection); + } public static Test suite() { return suite(RecoveryBrokerTest.class); diff --git a/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java new file mode 100644 index 0000000000..c08f3848f3 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -0,0 +1,132 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.util.Set; + +import javax.management.MBeanServerConnection; +import javax.management.ObjectInstance; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerService; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +public class DuplexNetworkMBeanTest extends TestCase { + + protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class); + protected final int numRestarts = 2; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker"); + broker.addConnector("tcp://localhost:61617"); + + return broker; + } + + protected BrokerService createNetworkedBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("networkedBroker"); + broker.addConnector("tcp://localhost:62617"); + NetworkConnector networkConnector = broker.addNetworkConnector("static://tcp://localhost:61617"); + networkConnector.setDuplex(true); + return broker; + } + + public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception { + BrokerService broker = createBroker(); + broker.start(); + assertEquals(1, countMbeans(broker, "Connector")); + assertEquals(0, countMbeans(broker, "Connection")); + BrokerService networkedBroker = null; + for (int i=0; i 0) { + Thread.sleep(100); + } + mbeans = mbsc.queryMBeans(beanName, null); + } while (mbeans.isEmpty() && expiryTime > System.currentTimeMillis()); + return mbeans.size(); + } +}