diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index c307a277af..39fa1964d3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -243,6 +243,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); + server.getRemotingService().addConnectionEntry(connection, entry); protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler())); @@ -304,7 +305,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, } private static void uninstallMirrorController(AMQPMirrorBrokerConnectionElement replicaConfig, ActiveMQServer server) { - + // TODO implement this as part of https://issues.apache.org/jira/browse/ARTEMIS-2965 } /** The reason this method is static is the following: @@ -498,6 +499,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, @Override public void connectionDestroyed(Object connectionID) { + server.getRemotingService().removeConnection(connectionID); redoConnection(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java index 90382e49e3..aad8884e49 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnectionManager.java @@ -70,7 +70,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon for (AMQPBrokerConnectConfiguration config : amqpConnectionsConfig) { NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true); - protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, null, null, null); + protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, config.getTransportConfigurations().get(0).getExtraParams(), null, null); NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(config.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager)); bridgesConnector.start(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java index ba6e3901db..92f44f06b7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/RemotingService.java @@ -25,9 +25,11 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.security.ActiveMQPrincipal; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry; import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Acceptor; +import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.utils.ReusableLatch; public interface RemotingService { @@ -123,4 +125,6 @@ public interface RemotingService { void destroyAcceptor(String name) throws Exception; void loadProtocolServices(List protocolServices); + + void addConnectionEntry(Connection connection, ConnectionEntry entry); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java index 9e914a8176..6a1c0f3ed9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/server/impl/RemotingServiceImpl.java @@ -562,11 +562,16 @@ public class RemotingServiceImpl implements RemotingService, ServerConnectionLif logger.trace("Connection created " + connection); } - connections.put(connection.getID(), entry); + addConnectionEntry(connection, entry); connectionCountLatch.countUp(); totalConnectionCount.incrementAndGet(); } + @Override + public void addConnectionEntry(Connection connection, ConnectionEntry entry) { + connections.put(connection.getID(), entry); + } + @Override public void connectionDestroyed(final Object connectionID) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java index 085c5701d5..e4b7fbeaa2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPBridgeTest.java @@ -54,17 +54,6 @@ public class AMQPBridgeTest extends AmqpClientTestSupport { return createServer(AMQP_PORT, false); } - @Test - public void testsSimpleConnect() throws Exception { - server.start(); - server_2 = createServer(AMQP_PORT_2, false); - - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:" + AMQP_PORT); - server_2.getConfiguration().addAMQPConnection(amqpConnection); - - server_2.start(); - } - @Test public void testSimpleTransferPush() throws Exception { internalTransferPush("TEST", false); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java index a510bbe13f..369f0300bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/QpidDispatchPeerTest.java @@ -67,7 +67,9 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { @Override protected ActiveMQServer createServer() throws Exception { - return createServer(AMQP_PORT, false); + ActiveMQServer server = createServer(AMQP_PORT, false); + server.getConfiguration().setNetworkCheckPeriod(100); + return server; } @Before @@ -81,25 +83,35 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { qpidProcess.kill(); } + @Test(timeout = 60_000) + public void testWithMatchingDifferentNamesOnQueueKill() throws Exception { + internalMultipleQueues(true, true, true); + } + @Test(timeout = 60_000) public void testWithMatchingDifferentNamesOnQueue() throws Exception { - internalMultipleQueues(true, true); + internalMultipleQueues(true, true, false); } @Test(timeout = 60_000) public void testWithMatching() throws Exception { - internalMultipleQueues(true, false); + internalMultipleQueues(true, false, false); } @Test(timeout = 60_000) public void testwithQueueName() throws Exception { - internalMultipleQueues(false, true); + internalMultipleQueues(false, false, false); } - private void internalMultipleQueues(boolean useMatching, boolean distinctNaming) throws Exception { + @Test(timeout = 60_000) + public void testwithQueueNameDistinctName() throws Exception { + internalMultipleQueues(false, true, false); + } + + private void internalMultipleQueues(boolean useMatching, boolean distinctNaming, boolean kill) throws Exception { final int numberOfMessages = 100; final int numberOfQueues = 10; - AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622").setRetryInterval(10).setReconnectAttempts(-1); + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("test", "tcp://localhost:24622?amqpIdleTimeout=1000").setRetryInterval(10).setReconnectAttempts(-1); if (useMatching) { amqpConnection.addElement(new AMQPBrokerConnectionElement().setMatchAddress("queue.#").setType(AMQPBrokerConnectionAddressType.PEER)); } else { @@ -118,7 +130,7 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { ConnectionFactory factoryProducer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); Connection connection = null; - connection = createConnectionDumbRetry(factoryProducer, connection); + connection = createConnectionDumbRetry(factoryProducer); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queue = session.createQueue("queue.test" + dest); @@ -135,10 +147,14 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { connection.close(); } + if (kill) { + stopQpidRouter(); + startQpidRouter(); + } for (int dest = 0; dest < numberOfQueues; dest++) { ConnectionFactory factoryConsumer = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:24622"); - Connection connectionConsumer = factoryConsumer.createConnection(); + Connection connectionConsumer = createConnectionDumbRetry(factoryConsumer); Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue queueConsumer = sessionConsumer.createQueue("queue.test" + dest); MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer); @@ -167,7 +183,6 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { org.apache.activemq.artemis.core.server.Queue testQueueOnServer = server.locateQueue(createQueueName(dest, distinctNaming)); Wait.assertEquals(0, testQueueOnServer::getMessageCount); } - } private String createQueueName(int i, boolean useDistinctName) { @@ -178,18 +193,16 @@ public class QpidDispatchPeerTest extends AmqpClientTestSupport { } } - private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer, - Connection connection) throws InterruptedException { + private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException { for (int i = 0; i < 100; i++) { try { // Some retry - connection = factoryProducer.createConnection(); - break; + return factoryProducer.createConnection(); } catch (Exception e) { Thread.sleep(10); } } - return connection; + return null; } }