From f14b3353f46859dcaa8af3dc10621662214a4f68 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Mon, 18 May 2015 15:17:40 +0100 Subject: [PATCH] Support reconnect on OpenWire failover transport --- .../protocol/openwire/OpenWireConnection.java | 13 ++++-- .../protocol/openwire/amq/AMQSession.java | 9 ++++ .../openwire/BasicOpenWireTest.java | 5 ++- .../openwire/SimpleOpenWireTest.java | 42 +++++++++++++++++++ 4 files changed, 64 insertions(+), 5 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index b7258c3c3c..5ff77e043e 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -447,8 +447,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor @Override public void fail(ActiveMQException me) { - ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), - me.getType()); + if (me != null) + { + ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); + } + // Then call the listeners callFailureListeners(me); @@ -1465,8 +1468,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor || (context.isNetworkConnection() && this.acceptorUsed .isAuditNetworkProducers())) { - result.setLastStoredSequenceId(protocolManager - .getPersistenceAdapter().getLastProducerSequenceId(id)); + if (protocolManager.getPersistenceAdapter() != null) + { + result.setLastStoredSequenceId(protocolManager.getPersistenceAdapter().getLastProducerSequenceId(id)); + } } SessionState ss = state.getSessionState(id.getParentId()); if (ss != null) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 57f39a8d01..521262800b 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -292,6 +292,15 @@ public class AMQSession implements SessionCallback for (ActiveMQDestination dest : actualDestinations) { ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024); + + /* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did + * not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to + * the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the + * message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */ + if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(ServerMessage.HDR_DUPLICATE_DETECTION_ID)) + { + coreMsg.putStringProperty(ServerMessage.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); + } OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller()); SimpleString address = OpenWireUtil.toCoreAddress(dest); coreMsg.setAddress(address); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index 6ca20fe489..cb11d00374 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -96,7 +96,10 @@ public class BasicOpenWireTest extends OpenWireTestBase while (iterQueues.hasNext()) { SimpleString coreQ = iterQueues.next(); - this.server.destroyQueue(coreQ); + if (server.locateQueue(coreQ) != null) + { + this.server.destroyQueue(coreQ); + } System.out.println("Destroyed queue: " + coreQ); } testQueues.clear(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java index 442e9c7548..5bb5fd67e5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java @@ -37,6 +37,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; +import java.util.concurrent.TimeUnit; + public class SimpleOpenWireTest extends BasicOpenWireTest { @Rule @@ -376,4 +378,44 @@ public class SimpleOpenWireTest extends BasicOpenWireTest } } + + @Test + public void testFailoverTransportReconnect() throws Exception + { + Connection exConn = null; + + try + { + String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")"; + ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString); + + Queue queue = new ActiveMQQueue(durableQueueName); + + exConn = exFact.createConnection(); + exConn.start(); + + Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer messageProducer = session.createProducer(queue); + messageProducer.send(session.createTextMessage("Test")); + + MessageConsumer consumer = session.createConsumer(queue); + assertNotNull(consumer.receive(5000)); + + server.stop(); + Thread.sleep(3000); + + server.start(); + server.waitForActivation(10, TimeUnit.SECONDS); + + messageProducer.send(session.createTextMessage("Test2")); + assertNotNull(consumer.receive(5000)); + } + finally + { + if (exConn != null) + { + exConn.close(); + } + } + } }