git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@675314 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-07-09 19:14:35 +00:00
parent 730aef4d1e
commit f4012c5098
4 changed files with 205 additions and 7 deletions

View File

@ -18,7 +18,6 @@ package org.apache.activemq.broker;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@ -1152,7 +1150,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
duplexBridge.duplexStart(brokerInfo, info);
duplexBridge.duplexStart(this,brokerInfo, info);
LOG.info("Created Duplex Bridge back to " + info.getBrokerName());
return null;

View File

@ -28,7 +28,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -120,6 +122,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
private AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
public DemandForwardingBridgeSupport(NetworkBridgeConfiguration configuration, Transport localBroker, Transport remoteBroker) {
this.configuration = configuration;
@ -127,9 +130,10 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
this.remoteBroker = remoteBroker;
}
public void duplexStart(BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
public void duplexStart(TransportConnection connection, BrokerInfo localBrokerInfo, BrokerInfo remoteBrokerInfo) throws Exception {
this.localBrokerInfo = localBrokerInfo;
this.remoteBrokerInfo = remoteBrokerInfo;
this.duplexInitiatingConnection = connection;
start();
serviceRemoteCommand(remoteBrokerInfo);
}
@ -381,7 +385,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
LOG.debug("The remote Exception was: " + error, error);
ASYNC_TASKS.execute(new Runnable() {
public void run() {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed();
@ -533,13 +537,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge {
LOG.debug("The local Exception was:" + error, error);
ASYNC_TASKS.execute(new Runnable() {
public void run() {
ServiceSupport.dispose(DemandForwardingBridgeSupport.this);
ServiceSupport.dispose(getControllingService());
}
});
fireBridgeFailed();
}
}
protected Service getControllingService() {
return duplexInitiatingConnection != null ? duplexInitiatingConnection : DemandForwardingBridgeSupport.this;
}
protected void addSubscription(DemandSubscription sub) throws IOException {
if (sub != null) {
localBroker.oneway(sub.getLocalInfo());

View File

@ -33,6 +33,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.XATransactionId;
/**
* Used to simulate the recovery that occurs when a broker shuts down.
@ -235,7 +236,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
Message m = receiveMessage(connection);
assertNull(m);
}
public void testTopicDurableConsumerHoldsPersistentMessageAfterRestart() throws Exception {
ActiveMQDestination destination = new ActiveMQTopic("TEST");
@ -403,6 +404,7 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
assertNoMessagesLeft(connection);
}
public void testQueuePersistentCommitedAcksNotLostOnRestart() throws Exception {
@ -456,6 +458,8 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
Message m = receiveMessage(connection);
assertNull(m);
}
public void testQueuePersistentUncommitedAcksLostOnRestart() throws Exception {
@ -512,6 +516,62 @@ public class RecoveryBrokerTest extends BrokerRestartTestSupport {
assertNoMessagesLeft(connection);
}
public void testQueuePersistentXAUncommitedAcksLostOnRestart() throws Exception {
int NUMBER = 100;
ActiveMQDestination destination = new ActiveMQQueue("TEST");
// Setup the producer and send the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < NUMBER; i++) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
connection.send(message);
}
// Setup the consumer and receive the message.
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// Begin the transaction.
XATransactionId txid = createXATransaction(sessionInfo);
connection.send(createBeginTransaction(connectionInfo, txid));
for (int i = 0; i < NUMBER; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
MessageAck ack = createAck(consumerInfo, m, 1, MessageAck.STANDARD_ACK_TYPE);
ack.setTransactionId(txid);
connection.send(ack);
}
// Don't commit
// restart the broker.
restartBroker();
// Setup the consumer and receive the message.
connection = createConnection();
connectionInfo = createConnectionInfo();
sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// All messages should be re-delivered.
for (int i = 0; i < NUMBER; i++) {
Message m = receiveMessage(connection);
assertNotNull(m);
}
assertNoMessagesLeft(connection);
}
public static Test suite() {
return suite(RecoveryBrokerTest.class);

View File

@ -0,0 +1,132 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import java.util.Set;
import javax.management.MBeanServerConnection;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class DuplexNetworkMBeanTest extends TestCase {
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
protected final int numRestarts = 2;
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("broker");
broker.addConnector("tcp://localhost:61617");
return broker;
}
protected BrokerService createNetworkedBroker() throws Exception {
BrokerService broker = new BrokerService();
broker.setBrokerName("networkedBroker");
broker.addConnector("tcp://localhost:62617");
NetworkConnector networkConnector = broker.addNetworkConnector("static://tcp://localhost:61617");
networkConnector.setDuplex(true);
return broker;
}
public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
BrokerService broker = createBroker();
broker.start();
assertEquals(1, countMbeans(broker, "Connector"));
assertEquals(0, countMbeans(broker, "Connection"));
BrokerService networkedBroker = null;
for (int i=0; i<numRestarts; i++) {
networkedBroker = createNetworkedBroker();
networkedBroker.start();
assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 2000));
assertEquals(1, countMbeans(broker, "Connection"));
networkedBroker.stop();
networkedBroker.waitUntilStopped();
assertEquals(0, countMbeans(networkedBroker, "stopped"));
}
assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
assertEquals(0, countMbeans(networkedBroker, "Connector"));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
assertEquals(1, countMbeans(broker, "Connector"));
broker.stop();
}
public void testMbeanPresenceOnBrokerRestart() throws Exception {
BrokerService networkedBroker = createNetworkedBroker();
networkedBroker.start();
assertEquals(1, countMbeans(networkedBroker, "Connector"));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
BrokerService broker = null;
for (int i=0; i<numRestarts; i++) {
broker = createBroker();
broker.start();
assertEquals(1, countMbeans(networkedBroker, "NetworkBridge", 5000));
assertEquals(1, countMbeans(broker, "Connection"));
broker.stop();
broker.waitUntilStopped();
}
assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
assertEquals(1, countMbeans(networkedBroker, "Connector"));
assertEquals(0, countMbeans(networkedBroker, "Connection"));
assertEquals(0, countMbeans(broker, "Connection"));
networkedBroker.stop();
}
private int countMbeans(BrokerService broker, String type) throws Exception {
return countMbeans(broker, type, 0);
}
private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
final long expiryTime = System.currentTimeMillis() + timeout;
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi");
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
Set all = mbsc.queryMBeans(null, null);
LOG.info("MBean total=" + all.size());
for (Object o : all) {
ObjectInstance bean = (ObjectInstance)o;
LOG.info(bean.getObjectName());
}
ObjectName beanName = new ObjectName("org.apache.activemq:BrokerName="
+ broker.getBrokerName() + ",Type=" + type +",*");
Set mbeans = null;
do {
if (timeout > 0) {
Thread.sleep(100);
}
mbeans = mbsc.queryMBeans(beanName, null);
} while (mbeans.isEmpty() && expiryTime > System.currentTimeMillis());
return mbeans.size();
}
}