resolve: https://issues.apache.org/activemq/browse/AMQ-2904 - fix issue with network connector failover recreating consumer in pull mode, add test. related to: https://issues.apache.org/activemq/browse/AMQ-2579

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@993382 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-09-07 14:40:58 +00:00
parent 444356b24f
commit 4c4f0d18ce
4 changed files with 73 additions and 14 deletions

View File

@ -1872,6 +1872,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount()); LOG.debug("transport interrupted, dispatchers: " + transportInterruptionProcessingComplete.getCount());
} }
signalInterruptionProcessingNeeded();
for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) { for (Iterator<ActiveMQSession> i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = i.next(); ActiveMQSession s = i.next();
s.clearMessagesInProgress(); s.clearMessagesInProgress();
@ -2318,6 +2320,17 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
} }
} }
private void signalInterruptionProcessingNeeded() {
FailoverTransport failoverTransport = transport.narrow(FailoverTransport.class);
if (failoverTransport != null) {
failoverTransport.getStateTracker().transportInterrupted(this.getConnectionInfo().getConnectionId());
if (LOG.isDebugEnabled()) {
LOG.debug("notified failover transport (" + failoverTransport
+ ") of pending interruption processing for: " + this.getConnectionInfo().getConnectionId());
}
}
}
/* /*
* specify the amount of time in milliseconds that a consumer with a transaction pending recovery * specify the amount of time in milliseconds that a consumer with a transaction pending recovery
* will wait to receive re dispatched messages. * will wait to receive re dispatched messages.

View File

@ -674,8 +674,9 @@ public class ConnectionStateTracker extends CommandVisitorAdapter {
} }
} }
public void transportInterrupted() { public void transportInterrupted(ConnectionId connectionId) {
for (ConnectionState connectionState : connectionStates.values()) { ConnectionState connectionState = connectionStates.get(connectionId);
if (connectionState != null) {
connectionState.setConnectionInterruptProcessingComplete(false); connectionState.setConnectionInterruptProcessingComplete(false);
} }
} }

View File

@ -233,8 +233,6 @@ public class FailoverTransport implements CompositeTransport {
connectedTransportURI = null; connectedTransportURI = null;
connected = false; connected = false;
stateTracker.transportInterrupted();
// notify before any reconnect attempt so ack state can be // notify before any reconnect attempt so ack state can be
// whacked // whacked
if (transportListener != null) { if (transportListener != null) {

View File

@ -51,6 +51,8 @@ public class FailoverStaticNetworkTest {
protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception { protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setUseJmx(true);
broker.getManagementContext().setCreateConnector(false);
broker.setSslContext(sslContext); broker.setSslContext(sslContext);
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);
broker.setBrokerName("Broker_" + listenPort); broker.setBrokerName("Broker_" + listenPort);
@ -68,14 +70,14 @@ public class FailoverStaticNetworkTest {
} }
@Before @Before
public void init() throws Exception { public void setUp() throws Exception {
KeyManager[] km = SslBrokerServiceTest.getKeyManager(); KeyManager[] km = SslBrokerServiceTest.getKeyManager();
TrustManager[] tm = SslBrokerServiceTest.getTrustManager(); TrustManager[] tm = SslBrokerServiceTest.getTrustManager();
sslContext = new SslContext(km, tm, null); sslContext = new SslContext(km, tm, null);
} }
@After @After
public void cleanup() throws Exception { public void tearDown() throws Exception {
brokerB.stop(); brokerB.stop();
brokerB.waitUntilStopped(); brokerB.waitUntilStopped();
@ -83,6 +85,44 @@ public class FailoverStaticNetworkTest {
brokerA.waitUntilStopped(); brokerA.waitUntilStopped();
} }
@Test
public void testSendReceiveAfterReconnect() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617"});
brokerB.start();
doTestNetworkSendReceive();
LOG.info("stopping brokerA");
brokerA.stop();
brokerA.waitUntilStopped();
LOG.info("restarting brokerA");
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
doTestNetworkSendReceive();
}
@Test
public void testSendReceiveFailover() throws Exception {
brokerA = createBroker("tcp", "61617", null);
brokerA.start();
brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
brokerB.start();
doTestNetworkSendReceive();
LOG.info("stopping brokerA");
brokerA.stop();
brokerA.waitUntilStopped();
LOG.info("restarting brokerA");
brokerA = createBroker("tcp", "63617", null);
brokerA.start();
doTestNetworkSendReceive();
}
/** /**
* networked broker started after target so first connect attempt succeeds * networked broker started after target so first connect attempt succeeds
* start order is important * start order is important
@ -95,7 +135,7 @@ public class FailoverStaticNetworkTest {
brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"}); brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
brokerB.start(); brokerB.start();
testNetworkSendReceive(); doTestNetworkSendReceive();
} }
@Test @Test
@ -106,14 +146,14 @@ public class FailoverStaticNetworkTest {
brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"}); brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
brokerB.start(); brokerB.start();
testNetworkSendReceive(); doTestNetworkSendReceive();
} }
private void testNetworkSendReceive() throws Exception, JMSException { private void doTestNetworkSendReceive() throws Exception, JMSException {
LOG.info("Creating Consumer on the networked broker ..."); LOG.info("Creating Consumer on the networked brokerA ...");
SslContext.setCurrentSslContext(sslContext); SslContext.setCurrentSslContext(sslContext);
// Create a consumer on brokerB // Create a consumer on brokerA
ConnectionFactory consFactory = createConnectionFactory(brokerA); ConnectionFactory consFactory = createConnectionFactory(brokerA);
Connection consConn = consFactory.createConnection(); Connection consConn = consFactory.createConnection();
consConn.start(); consConn.start();
@ -121,13 +161,20 @@ public class FailoverStaticNetworkTest {
ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME); ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
final MessageConsumer consumer = consSession.createConsumer(destination); final MessageConsumer consumer = consSession.createConsumer(destination);
LOG.info("publishing to brokerB");
sendMessageTo(destination, brokerB); sendMessageTo(destination, brokerB);
assertTrue("consumer got message", Wait.waitFor(new Wait.Condition() { boolean gotMessage = Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception { public boolean isSatisified() throws Exception {
return consumer.receive(1000) != null; return consumer.receive(1000) != null;
} }
})); });
try {
consConn.close();
} catch (JMSException ignored) {
}
assertTrue("consumer on A got message", gotMessage);
} }
private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception { private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {