diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 90dfe97b42..499fb4bbaf 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -40,6 +40,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException; @@ -299,11 +300,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } if (response instanceof ExceptionResponse) { + Throwable cause = ((ExceptionResponse)response).getException(); if (!responseRequired) { - Throwable cause = ((ExceptionResponse) response).getException(); serviceException(cause); response = null; } + // If there was an exception when processing ConnectionInfo we should + // stop the connection to prevent dangling sockets + if (command instanceof ConnectionInfo) { + delayedStop(2000, cause.getMessage(), cause); + } } if (responseRequired) { @@ -640,13 +646,29 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } try { - protocolManager.removeConnection(this.getConnectionInfo(), me); + if (this.getConnectionInfo() != null) { + protocolManager.removeConnection(this.getConnectionInfo(), me); + } } catch (InvalidClientIDException e) { ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e); } shutdown(true); } + private void delayedStop(final int waitTimeMillis, final String reason, Throwable cause) { + if (waitTimeMillis > 0) { + try { + protocolManager.getScheduledPool().schedule(() -> { + fail(new ActiveMQException(reason, cause, ActiveMQExceptionType.GENERIC_EXCEPTION), reason); + ActiveMQServerLogger.LOGGER.warn("Stopping " + transportConnection.getRemoteAddress() + "because " + + reason); + }, waitTimeMillis, TimeUnit.MILLISECONDS); + } catch (Throwable t) { + ActiveMQServerLogger.LOGGER.warn("Cannot stop connection. This exception will be ignored.", t); + } + } + } + public void setAdvisorySession(AMQSession amqSession) { this.advisorySession = amqSession; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ConnectionErrorSocketCloseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ConnectionErrorSocketCloseTest.java new file mode 100644 index 0000000000..1c58bcce3d --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ConnectionErrorSocketCloseTest.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.Connection; + +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Test; + + +public class ConnectionErrorSocketCloseTest extends BasicOpenWireTest { + + @Override + protected void createFactories() { + super.createFactories(); + factory.setClientID("id"); + } + + //We want to make sure that the socket will be closed if there as an error on broker.addConnection + //even if the client doesn't close the connection to prevent dangling open sockets + @Test(timeout = 60000) + public void testDuplicateClientIdCloseConnection() throws Exception { + connection.start(); + Wait.waitFor(() -> getActiveMQServer().getRemotingService().getConnections().size() == 1, 10000, 500); + + try (Connection con = factory.createConnection()) { + // Try and create second connection the second should fail because of a + // duplicate clientId + try { + // Should fail because of previously started connection with same + // client Id + con.start(); + fail("Should have exception"); + } catch (Exception e) { + e.printStackTrace(); + } + + // after 2 seconds the second connection should be terminated by the + // broker because of the exception + assertTrue(Wait.waitFor(() -> getActiveMQServer().getRemotingService().getConnections().size() == 1, 10000, 500)); + } + } + + @SuppressWarnings("deprecation") + private ActiveMQServer getActiveMQServer() { + return jmsServer.getActiveMQServer(); + } + +}