merge PR #272 - Open wire protocol fixes
This commit is contained in:
commit
e2e73ffa07
|
@ -84,7 +84,7 @@ public class AMQConnectorImpl implements AMQConnector
|
|||
public boolean isAllowLinkStealing()
|
||||
{
|
||||
// TODO Auto-generated method stub
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -447,8 +447,11 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
@Override
|
||||
public void fail(ActiveMQException me)
|
||||
{
|
||||
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(),
|
||||
me.getType());
|
||||
if (me != null)
|
||||
{
|
||||
ActiveMQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());
|
||||
}
|
||||
|
||||
// Then call the listeners
|
||||
callFailureListeners(me);
|
||||
|
||||
|
@ -1465,8 +1468,10 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor
|
|||
|| (context.isNetworkConnection() && this.acceptorUsed
|
||||
.isAuditNetworkProducers()))
|
||||
{
|
||||
result.setLastStoredSequenceId(protocolManager
|
||||
.getPersistenceAdapter().getLastProducerSequenceId(id));
|
||||
if (protocolManager.getPersistenceAdapter() != null)
|
||||
{
|
||||
result.setLastStoredSequenceId(protocolManager.getPersistenceAdapter().getLastProducerSequenceId(id));
|
||||
}
|
||||
}
|
||||
SessionState ss = state.getSessionState(id.getParentId());
|
||||
if (ss != null)
|
||||
|
|
|
@ -232,6 +232,13 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>
|
|||
{
|
||||
case CommandTypes.CONNECTION_INFO:
|
||||
break;
|
||||
case CommandTypes.CONNECTION_CONTROL:
|
||||
/** The ConnectionControl packet sent from client informs the broker that is capable of supporting dynamic
|
||||
* failover and load balancing. These features are not yet implemented for Artemis OpenWire. Instead we
|
||||
* simply drop the packet. See: ACTIVEMQ6-108 */
|
||||
break;
|
||||
case CommandTypes.CONSUMER_CONTROL:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException("Cannot handle command: " + command);
|
||||
}
|
||||
|
|
|
@ -292,6 +292,15 @@ public class AMQSession implements SessionCallback
|
|||
for (ActiveMQDestination dest : actualDestinations)
|
||||
{
|
||||
ServerMessageImpl coreMsg = new ServerMessageImpl(-1, 1024);
|
||||
|
||||
/* ActiveMQ failover transport will attempt to reconnect after connection failure. Any sent messages that did
|
||||
* not receive acks will be resent. (ActiveMQ broker handles this by returning a last sequence id received to
|
||||
* the client). To handle this in Artemis we use a duplicate ID cache. To do this we check to see if the
|
||||
* message comes from failover connection. If so we add a DUPLICATE_ID to handle duplicates after a resend. */
|
||||
if (producerExchange.getConnectionContext().isFaultTolerant() && !messageSend.getProperties().containsKey(ServerMessage.HDR_DUPLICATE_DETECTION_ID))
|
||||
{
|
||||
coreMsg.putStringProperty(ServerMessage.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString());
|
||||
}
|
||||
OpenWireMessageConverter.toCoreMessage(coreMsg, messageSend, connection.getMarshaller());
|
||||
SimpleString address = OpenWireUtil.toCoreAddress(dest);
|
||||
coreMsg.setAddress(address);
|
||||
|
|
|
@ -96,7 +96,10 @@ public class BasicOpenWireTest extends OpenWireTestBase
|
|||
while (iterQueues.hasNext())
|
||||
{
|
||||
SimpleString coreQ = iterQueues.next();
|
||||
if (server.locateQueue(coreQ) != null)
|
||||
{
|
||||
this.server.destroyQueue(coreQ);
|
||||
}
|
||||
System.out.println("Destroyed queue: " + coreQ);
|
||||
}
|
||||
testQueues.clear();
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.junit.Rule;
|
|||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class SimpleOpenWireTest extends BasicOpenWireTest
|
||||
{
|
||||
@Rule
|
||||
|
@ -376,4 +378,44 @@ public class SimpleOpenWireTest extends BasicOpenWireTest
|
|||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailoverTransportReconnect() throws Exception
|
||||
{
|
||||
Connection exConn = null;
|
||||
|
||||
try
|
||||
{
|
||||
String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
|
||||
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
|
||||
|
||||
Queue queue = new ActiveMQQueue(durableQueueName);
|
||||
|
||||
exConn = exFact.createConnection();
|
||||
exConn.start();
|
||||
|
||||
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer messageProducer = session.createProducer(queue);
|
||||
messageProducer.send(session.createTextMessage("Test"));
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
assertNotNull(consumer.receive(5000));
|
||||
|
||||
server.stop();
|
||||
Thread.sleep(3000);
|
||||
|
||||
server.start();
|
||||
server.waitForActivation(10, TimeUnit.SECONDS);
|
||||
|
||||
messageProducer.send(session.createTextMessage("Test2"));
|
||||
assertNotNull(consumer.receive(5000));
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (exConn != null)
|
||||
{
|
||||
exConn.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue