diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java index d852a3aab1..bab287f045 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java @@ -21,6 +21,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.balancing.RedirectHandler; import org.apache.activemq.artemis.utils.ConfigurationHelper; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; @@ -44,7 +45,10 @@ public class AMQPRedirectHandler extends RedirectHandler { ErrorCondition error = new ErrorCondition(); error.setCondition(ConnectionError.CONNECTION_FORCED); error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo())); - context.getProtonConnection().setCondition(error); + + Connection protonConnection = context.getProtonConnection(); + protonConnection.setCondition(error); + addConnectionOpenFailureHint(protonConnection); } @Override @@ -55,10 +59,20 @@ public class AMQPRedirectHandler extends RedirectHandler { ErrorCondition error = new ErrorCondition(); error.setCondition(ConnectionError.REDIRECT); error.setDescription(String.format("Connection redirected to %s:%d by broker balancer %s", host, port, context.getConnection().getTransportConnection().getRedirectTo())); - Map info = new HashMap(); + Map info = new HashMap<>(); info.put(AmqpSupport.NETWORK_HOST, host); info.put(AmqpSupport.PORT, port); error.setInfo(info); - context.getProtonConnection().setCondition(error); + + Connection protonConnection = context.getProtonConnection(); + protonConnection.setCondition(error); + addConnectionOpenFailureHint(protonConnection); + } + + private void addConnectionOpenFailureHint(Connection connection) { + Map connProps = new HashMap<>(); + connProps.put(AmqpSupport.CONNECTION_OPEN_FAILED, true); + + connection.setProperties(connProps); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 3da8f878c6..f6189d0425 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -68,8 +68,8 @@ public class AmqpSupport { public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); - static final Symbol NETWORK_HOST = Symbol.valueOf("network-host"); - static final Symbol PORT = Symbol.valueOf("port"); + public static final Symbol NETWORK_HOST = Symbol.valueOf("network-host"); + public static final Symbol PORT = Symbol.valueOf("port"); static final Symbol SCHEME = Symbol.valueOf("scheme"); static final Symbol HOSTNAME = Symbol.valueOf("hostname"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java new file mode 100644 index 0000000000..e3768486e7 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.balancing; + +import java.net.URI; +import java.util.Map; + +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpValidator; +import org.apache.qpid.proton.amqp.transport.ConnectionError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Connection; +import org.junit.Test; + +/** + * Note: The primary balancer tests for AMQP clients are in e.g {@link RedirectTest} along with those for other protocols. + * + * This class only adds some additional validations that are AMQP-specific. + */ +public class AmqpRedirectTest extends BalancingTestBase { + + @Test + public void testBalancerRejectionDueToOfflineTargetPool() throws Exception { + setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false); + setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false); + + // Zero quorum size to avoid the quorum delay, given it will never be satisfied + setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 0, 1); + + // Only start the balancer, so it can never become ready to redirect. + startServers(0); + + URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT); + AmqpClient client = new AmqpClient(uri, "admin", "admin"); + + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getName()); + + connection.setStateInspector(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) { + markAsInvalid("Broker did not set connection establishment failed hint"); + } + } + + @Override + public void inspectClosedResource(Connection connection) { + ErrorCondition remoteError = connection.getRemoteCondition(); + if (remoteError == null || remoteError.getCondition() == null) { + markAsInvalid("Broker did not add error condition for connection"); + return; + } + + if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) { + markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED); + return; + } + + String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + " is not ready to redirect"; + String actualDescription = remoteError.getDescription(); + if (!expectedDescription.equals(actualDescription)) { + markAsInvalid("Broker did not set description as expected, was: " + actualDescription); + return; + } + } + }); + + try { + connection.connect(); + fail("Expected connection to fail, without redirect"); + } catch (Exception e) { + // Expected + } + + connection.getStateInspector().assertValid(); + connection.close(); + + stopServers(0); + } + + @Test + public void testBalancerRedirectDetails() throws Exception { + setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false); + setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false); + + setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1); + + startServers(0, 1); + + URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT); + AmqpClient client = new AmqpClient(uri, "admin", "admin"); + + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getName()); + + connection.setStateInspector(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) { + markAsInvalid("Broker did not set connection establishment failed hint"); + } + } + + @Override + public void inspectClosedResource(Connection connection) { + ErrorCondition remoteError = connection.getRemoteCondition(); + if (remoteError == null || remoteError.getCondition() == null) { + markAsInvalid("Broker did not add error condition for connection"); + return; + } + + if (!remoteError.getCondition().equals(ConnectionError.REDIRECT)) { + markAsInvalid("Broker did not set condition to " + ConnectionError.REDIRECT); + return; + } + + Integer redirectPort = TransportConstants.DEFAULT_PORT + 1; + + String expectedDescription = "Connection redirected to localhost:" + redirectPort + " by broker balancer " + BROKER_BALANCER_NAME; + String actualDescription = remoteError.getDescription(); + if (!expectedDescription.equals(actualDescription)) { + markAsInvalid("Broker did not set description as expected, was: " + actualDescription); + return; + } + + // Validate the info map contains expected redirect info + Map infoMap = remoteError.getInfo(); + if (infoMap == null) { + markAsInvalid("Broker did not set an info map on condition with redirect details"); + return; + } + + if (!infoMap.containsKey(AmqpSupport.NETWORK_HOST)) { + markAsInvalid("Info map does not contain key " + AmqpSupport.NETWORK_HOST); + return; + } else { + Object value = infoMap.get(AmqpSupport.NETWORK_HOST); + if (!"localhost".equals(value)) { + markAsInvalid("Info map does not contain expected network-host value, was: " + value); + return; + } + } + + if (!infoMap.containsKey(AmqpSupport.PORT)) { + markAsInvalid("Info map does not contain key " + AmqpSupport.PORT); + return; + } else { + Object value = infoMap.get(AmqpSupport.PORT); + if (value == null || !redirectPort.equals(value)) { + markAsInvalid("Info map does not contain expected port value, was: " + value); + return; + } + } + } + }); + + try { + connection.connect(); + fail("Expected connection to fail, with redirect"); + } catch (Exception e) { + // Expected + } + + connection.getStateInspector().assertValid(); + connection.close(); + + stopServers(0, 1); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java index 185c552e1c..4c18af3f93 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/RedirectTest.java @@ -183,7 +183,7 @@ public class RedirectTest extends BalancingTestBase { queueControls[node] = (QueueControl)getServer(node).getManagementService() .getResource(ResourceNames.QUEUE + queueName); - Assert.assertEquals(0, queueControls[node].countMessages()); + Assert.assertEquals("Unexpected messagecount for node " + node, 0, queueControls[node].countMessages()); } @@ -229,7 +229,7 @@ public class RedirectTest extends BalancingTestBase { } for (int node : nodes) { - Assert.assertEquals(0, queueControls[node].countMessages()); + Assert.assertEquals("Unexpected message count for node " + node, 0, queueControls[node].countMessages()); } stopServers(nodes); @@ -266,8 +266,8 @@ public class RedirectTest extends BalancingTestBase { QueueControl queueControl1 = (QueueControl)getServer(1).getManagementService() .getResource(ResourceNames.QUEUE + queueName); - Assert.assertEquals(0, queueControl0.countMessages()); - Assert.assertEquals(0, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages()); ConnectionFactory connectionFactory0 = createFactory(protocol, false, TransportConstants.DEFAULT_HOST, TransportConstants.DEFAULT_PORT + 0, null, "admin", "admin"); @@ -302,8 +302,8 @@ public class RedirectTest extends BalancingTestBase { } } - Assert.assertEquals(0, queueControl0.countMessages()); - Assert.assertEquals(0, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages()); stopServers(0, 1); } @@ -339,9 +339,9 @@ public class RedirectTest extends BalancingTestBase { QueueControl queueControl2 = (QueueControl)getServer(2).getManagementService() .getResource(ResourceNames.QUEUE + queueName); - Assert.assertEquals(0, queueControl0.countMessages()); - Assert.assertEquals(0, queueControl1.countMessages()); - Assert.assertEquals(0, queueControl2.countMessages()); + Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages()); int failedNode; ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST, @@ -370,9 +370,9 @@ public class RedirectTest extends BalancingTestBase { startServers(failedNode); - Assert.assertEquals(0, queueControl0.countMessages()); - Assert.assertEquals(1, queueControl1.countMessages()); - Assert.assertEquals(1, queueControl2.countMessages()); + Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages()); try (Connection connection = connectionFactory.createConnection()) { connection.start(); @@ -385,13 +385,13 @@ public class RedirectTest extends BalancingTestBase { } } - Assert.assertEquals(0, queueControl0.countMessages()); + Assert.assertEquals("Unexpected message count for node 0", 0, queueControl0.countMessages()); if (failedNode == 1) { - Assert.assertEquals(1, queueControl1.countMessages()); - Assert.assertEquals(0, queueControl2.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 1, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 2", 0, queueControl2.countMessages()); } else { - Assert.assertEquals(0, queueControl1.countMessages()); - Assert.assertEquals(1, queueControl2.countMessages()); + Assert.assertEquals("Unexpected message count for node 1", 0, queueControl1.countMessages()); + Assert.assertEquals("Unexpected message count for node 2", 1, queueControl2.countMessages()); } stopServers(0, 1, 2);