ARTEMIS-3520: set the Open failure hint when balancer redirects or refuses AMQP connection, add lower level test for balancer related redirect/refusal protocol behaviour

This commit is contained in:
Robbie Gemmell 2021-10-06 18:01:59 +01:00 committed by Gary Tully
parent a5b5a504e0
commit d7f37ae313
4 changed files with 228 additions and 22 deletions

View File

@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
@ -44,7 +45,10 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
context.getProtonConnection().setCondition(error);
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
addConnectionOpenFailureHint(protonConnection);
}
@Override
@ -55,10 +59,20 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.REDIRECT);
error.setDescription(String.format("Connection redirected to %s:%d by broker balancer %s", host, port, context.getConnection().getTransportConnection().getRedirectTo()));
Map info = new HashMap();
Map<Symbol, Object> info = new HashMap<>();
info.put(AmqpSupport.NETWORK_HOST, host);
info.put(AmqpSupport.PORT, port);
error.setInfo(info);
context.getProtonConnection().setCondition(error);
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
addConnectionOpenFailureHint(protonConnection);
}
private void addConnectionOpenFailureHint(Connection connection) {
Map<Symbol, Object> connProps = new HashMap<>();
connProps.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);
connection.setProperties(connProps);
}
}

View File

@ -68,8 +68,8 @@ public class AmqpSupport {
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
static final Symbol PORT = Symbol.valueOf("port");
public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
public static final Symbol PORT = Symbol.valueOf("port");
static final Symbol SCHEME = Symbol.valueOf("scheme");
static final Symbol HOSTNAME = Symbol.valueOf("hostname");

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.balancing;
import java.net.URI;
import java.util.Map;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpValidator;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
/**
* Note: The primary balancer tests for AMQP clients are in e.g {@link RedirectTest} along with those for other protocols.
*
* This class only adds some additional validations that are AMQP-specific.
*/
public class AmqpRedirectTest extends BalancingTestBase {
@Test
public void testBalancerRejectionDueToOfflineTargetPool() throws Exception {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
// Zero quorum size to avoid the quorum delay, given it will never be satisfied
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 0, 1);
// Only start the balancer, so it can never become ready to redirect.
startServers(0);
URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
AmqpClient client = new AmqpClient(uri, "admin", "admin");
AmqpConnection connection = client.createConnection();
connection.setContainerId(getName());
connection.setStateInspector(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
markAsInvalid("Broker did not set connection establishment failed hint");
}
}
@Override
public void inspectClosedResource(Connection connection) {
ErrorCondition remoteError = connection.getRemoteCondition();
if (remoteError == null || remoteError.getCondition() == null) {
markAsInvalid("Broker did not add error condition for connection");
return;
}
if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) {
markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED);
return;
}
String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + " is not ready to redirect";
String actualDescription = remoteError.getDescription();
if (!expectedDescription.equals(actualDescription)) {
markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
return;
}
}
});
try {
connection.connect();
fail("Expected connection to fail, without redirect");
} catch (Exception e) {
// Expected
}
connection.getStateInspector().assertValid();
connection.close();
stopServers(0);
}
@Test
public void testBalancerRedirectDetails() throws Exception {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1);
startServers(0, 1);
URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
AmqpClient client = new AmqpClient(uri, "admin", "admin");
AmqpConnection connection = client.createConnection();
connection.setContainerId(getName());
connection.setStateInspector(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
markAsInvalid("Broker did not set connection establishment failed hint");
}
}
@Override
public void inspectClosedResource(Connection connection) {
ErrorCondition remoteError = connection.getRemoteCondition();
if (remoteError == null || remoteError.getCondition() == null) {
markAsInvalid("Broker did not add error condition for connection");
return;
}
if (!remoteError.getCondition().equals(ConnectionError.REDIRECT)) {
markAsInvalid("Broker did not set condition to " + ConnectionError.REDIRECT);
return;
}
Integer redirectPort = TransportConstants.DEFAULT_PORT + 1;
String expectedDescription = "Connection redirected to localhost:" + redirectPort + " by broker balancer " + BROKER_BALANCER_NAME;
String actualDescription = remoteError.getDescription();
if (!expectedDescription.equals(actualDescription)) {
markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
return;
}
// Validate the info map contains expected redirect info
Map<?, ?> infoMap = remoteError.getInfo();
if (infoMap == null) {
markAsInvalid("Broker did not set an info map on condition with redirect details");
return;
}
if (!infoMap.containsKey(AmqpSupport.NETWORK_HOST)) {
markAsInvalid("Info map does not contain key " + AmqpSupport.NETWORK_HOST);
return;
} else {
Object value = infoMap.get(AmqpSupport.NETWORK_HOST);
if (!"localhost".equals(value)) {
markAsInvalid("Info map does not contain expected network-host value, was: " + value);
return;
}
}
if (!infoMap.containsKey(AmqpSupport.PORT)) {
markAsInvalid("Info map does not contain key " + AmqpSupport.PORT);
return;
} else {
Object value = infoMap.get(AmqpSupport.PORT);
if (value == null || !redirectPort.equals(value)) {
markAsInvalid("Info map does not contain expected port value, was: " + value);
return;
}
}
}
});
try {
connection.connect();
fail("Expected connection to fail, with redirect");
} catch (Exception e) {
// Expected
}
connection.getStateInspector().assertValid();
connection.close();
stopServers(0, 1);
}
}

View File

@ -183,7 +183,7 @@ public class RedirectTest extends BalancingTestBase {
queueControls[node] = (QueueControl)getServer(node).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(0, queueControls[node].countMessages());
Assert.assertEquals("Unexpected messagecount for node " + node, 0, queueControls[node].countMessages());
}
@ -229,7 +229,7 @@ public class RedirectTest extends BalancingTestBase {
}
for (int node : nodes) {
Assert.assertEquals(0, queueControls[node].countMessages());
Assert.assertEquals("Unexpected message count for node " + node, 0, queueControls[node].countMessages());
}
stopServers(nodes);
@ -266,8 +266,8 @@ public class RedirectTest extends BalancingTestBase {
QueueControl queueControl1 = (QueueControl)getServer(1).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(0, queueControl0.countMessages());
Assert.assertEquals(0, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
ConnectionFactory connectionFactory0 = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin");
@ -302,8 +302,8 @@ public class RedirectTest extends BalancingTestBase {
}
}
Assert.assertEquals(0, queueControl0.countMessages());
Assert.assertEquals(0, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
stopServers(0, 1);
}
@ -339,9 +339,9 @@ public class RedirectTest extends BalancingTestBase {
QueueControl queueControl2 = (QueueControl)getServer(2).getManagementService()
.getResource(ResourceNames.QUEUE + queueName);
Assert.assertEquals(0, queueControl0.countMessages());
Assert.assertEquals(0, queueControl1.countMessages());
Assert.assertEquals(0, queueControl2.countMessages());
Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
int failedNode;
ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
@ -370,9 +370,9 @@ public class RedirectTest extends BalancingTestBase {
startServers(failedNode);
Assert.assertEquals(0, queueControl0.countMessages());
Assert.assertEquals(1, queueControl1.countMessages());
Assert.assertEquals(1, queueControl2.countMessages());
Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
@ -385,13 +385,13 @@ public class RedirectTest extends BalancingTestBase {
}
}
Assert.assertEquals(0, queueControl0.countMessages());
Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages());
if (failedNode == 1) {
Assert.assertEquals(1, queueControl1.countMessages());
Assert.assertEquals(0, queueControl2.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages());
} else {
Assert.assertEquals(0, queueControl1.countMessages());
Assert.assertEquals(1, queueControl2.countMessages());
Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages());
Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages());
}
stopServers(0, 1, 2);