Support reconnect on OpenWire failover transport

This commit is contained in:
Martyn Taylor 2015-05-18 15:17:40 +01:00
parent 2de3bfa9f1
commit f14b3353f4
4 changed files with 64 additions and 5 deletions

View File

@ -447,8 +447,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
@Override
public void fail(ActiveMQException me)
{
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(),
me.getType());
if (me != null)
{
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
}
// Then call the listeners
callFailureListeners(me);
@ -1465,8 +1468,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|| (context.isNetworkConnection() && this.acceptorUsed
.isAuditNetworkProducers()))
{
result.setLastStoredSequenceId(protocolManager
.getPersistenceAdapter().getLastProducerSequenceId(id));
if (protocolManager.getPersistenceAdapter() != null)
{
result.setLastStoredSequenceId(protocolManager.getPersistenceAdapter().getLastProducerSequenceId(id));
}
}
SessionState ss = state.getSessionState(id.getParentId());
if (ss != null)

View File

@ -292,6 +292,15 @@ public class AMQSession implements SessionCallback
for (ActiveMQDestination dest : actualDestinations)
{
ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(ServerMessage.HDR_DUPLICATE_DETECTION_ID))
{
coreMsg.putStringProperty(ServerMessage.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
}
OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
SimpleString address = OpenWireUtil.toCoreAddress(dest);
coreMsg.setAddress(address);

View File

@ -96,7 +96,10 @@ public class BasicOpenWireTest extends OpenWireTestBase
while (iterQueues.hasNext())
{
SimpleString coreQ = iterQueues.next();
this.server.destroyQueue(coreQ);
if (server.locateQueue(coreQ) != null)
{
this.server.destroyQueue(coreQ);
}
System.out.println("Destroyed queue: " + coreQ);
}
testQueues.clear();

View File

@ -37,6 +37,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.concurrent.TimeUnit;
public class SimpleOpenWireTest extends BasicOpenWireTest
{
@Rule
@ -376,4 +378,44 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
}
}
@Test
public void testFailoverTransportReconnect() throws Exception
{
Connection exConn = null;
try
{
String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
Queue queue = new ActiveMQQueue(durableQueueName);
exConn = exFact.createConnection();
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.send(session.createTextMessage("Test"));
MessageConsumer consumer = session.createConsumer(queue);
assertNotNull(consumer.receive(5000));
server.stop();
Thread.sleep(3000);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);
messageProducer.send(session.createTextMessage("Test2"));
assertNotNull(consumer.receive(5000));
}
finally
{
if (exConn != null)
{
exConn.close();
}
}
}
}