ARTEMIS-1920 AMQP throw NPE if can't find a backup server

This commit is contained in:
Clebert Suconic 2018-06-08 16:27:42 -04:00
parent 22f3f02aea
commit de0747a9a4
3 changed files with 89 additions and 2 deletions

View File

@ -291,7 +291,9 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null);
if (clusterConnection != null) {
TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString());
return member.toBackupURI();
if (member != null) {
return member.toBackupURI();
}
}
return null;
}

View File

@ -258,7 +258,7 @@ public final class BindingsImpl implements Bindings {
if (entry.getValue() instanceof RemoteQueueBinding) {
RemoteQueueBinding remoteQueueBinding = (RemoteQueueBinding) entry.getValue();
if (remoteQueueBinding.getRemoteQueueID() == id) {
message.putBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
message.putExtraBytesProperty(Message.HDR_ROUTE_TO_IDS, ByteBuffer.allocate(8).putLong(remoteQueueBinding.getID()).array());
}
}
}

View File

@ -26,6 +26,7 @@ import javax.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
@ -37,7 +38,9 @@ import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImp
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.junit.Wait;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
@ -282,6 +285,88 @@ public class ProtocolsMessageLoadBalancingTest extends ClusterTestBase {
connection.close();
}
@Test
public void testRestartConnection() throws Exception {
startServers(MessageLoadBalancingType.STRICT);
System.out.println("connections " + servers[1].getRemotingService().getConnections().size());
Wait.assertEquals(3, () -> servers[1].getRemotingService().getConnections().size());
Wait.assertEquals(3, () -> servers[0].getRemotingService().getConnections().size());
RemotingConnection[] connectionsServer1 = servers[1].getRemotingService().getConnections().toArray(new RemotingConnection[3]);
RemotingConnection[] connectionsServer0 = servers[0].getRemotingService().getConnections().toArray(new RemotingConnection[3]);
ConnectionFactory[] factory = new ConnectionFactory[NUMBER_OF_SERVERS];
Connection[] connection = new Connection[NUMBER_OF_SERVERS];
Session[] session = new Session[NUMBER_OF_SERVERS];
MessageConsumer[] consumer = new MessageConsumer[NUMBER_OF_SERVERS];
// this will pre create consumers to make sure messages are distributed evenly without redistribution
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
factory[node] = getJmsConnectionFactory(node);
connection[node] = factory[node].createConnection();
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
}
waitForBindings(0, "queues.0", 1, 1, true);
waitForBindings(1, "queues.0", 1, 1, true);
waitForBindings(0, "queues.0", 1, 1, false);
waitForBindings(1, "queues.0", 1, 1, false);
for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections()) {
remotingConnection.fail(new ActiveMQException("forcing failure"));
}
for (RemotingConnection remotingConnection : servers[1].getRemotingService().getConnections()) {
remotingConnection.fail(new ActiveMQException("forcing failure"));
}
// this is to allow reconnects
Thread.sleep(500);
// this will pre create consumers to make sure messages are distributed evenly without redistribution
for (int node = 0; node < NUMBER_OF_SERVERS; node++) {
try {
connection[node].close();
} catch (Throwable e) {
e.printStackTrace();
}
factory[node] = getJmsConnectionFactory(node);
connection[node] = factory[node].createConnection();
session[node] = connection[node].createSession(false, Session.AUTO_ACKNOWLEDGE);
consumer[node] = session[node].createConsumer(session[node].createQueue(queueName.toString()));
}
waitForBindings(0, "queues.0", 1, 1, true);
waitForBindings(1, "queues.0", 1, 1, true);
waitForBindings(0, "queues.0", 1, 1, false);
waitForBindings(1, "queues.0", 1, 1, false);
System.out.println("connections " + servers[1].getRemotingService().getConnections().size());
// sending Messages.. they should be load balanced
{
ConnectionFactory cf = getJmsConnectionFactory(0);
Connection cn = cf.createConnection();
Session sn = cn.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer pd = sn.createProducer(sn.createQueue(queueName.toString()));
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
pd.send(sn.createTextMessage("hello " + i));
}
cn.close();
}
receiveMessages(connection[0], consumer[0], NUMBER_OF_MESSAGES / 2, true);
receiveMessages(connection[1], consumer[1], NUMBER_OF_MESSAGES / 2, true);
}
private void receiveMessages(Connection connection,
MessageConsumer messageConsumer,
int messageCount,