ARTEMIS-1727 - Make sure transport is stopped on failed OpenWire
connection To prevent a socket from hanging open by a bad client the broker should make sure to stop the transport if a connection attempt fails by an OpenWire client
This commit is contained in:
parent
95b7438e7a
commit
29250466ae
|
@ -40,6 +40,7 @@ import org.apache.activemq.advisory.AdvisorySupport;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQAddressExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
|
||||
|
@ -299,11 +300,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
|
||||
if (response instanceof ExceptionResponse) {
|
||||
Throwable cause = ((ExceptionResponse)response).getException();
|
||||
if (!responseRequired) {
|
||||
Throwable cause = ((ExceptionResponse) response).getException();
|
||||
serviceException(cause);
|
||||
response = null;
|
||||
}
|
||||
// If there was an exception when processing ConnectionInfo we should
|
||||
// stop the connection to prevent dangling sockets
|
||||
if (command instanceof ConnectionInfo) {
|
||||
delayedStop(2000, cause.getMessage(), cause);
|
||||
}
|
||||
}
|
||||
|
||||
if (responseRequired) {
|
||||
|
@ -640,13 +646,29 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
}
|
||||
try {
|
||||
if (this.getConnectionInfo() != null) {
|
||||
protocolManager.removeConnection(this.getConnectionInfo(), me);
|
||||
}
|
||||
} catch (InvalidClientIDException e) {
|
||||
ActiveMQServerLogger.LOGGER.warn("Couldn't close connection because invalid clientID", e);
|
||||
}
|
||||
shutdown(true);
|
||||
}
|
||||
|
||||
private void delayedStop(final int waitTimeMillis, final String reason, Throwable cause) {
|
||||
if (waitTimeMillis > 0) {
|
||||
try {
|
||||
protocolManager.getScheduledPool().schedule(() -> {
|
||||
fail(new ActiveMQException(reason, cause, ActiveMQExceptionType.GENERIC_EXCEPTION), reason);
|
||||
ActiveMQServerLogger.LOGGER.warn("Stopping " + transportConnection.getRemoteAddress() + "because " +
|
||||
reason);
|
||||
}, waitTimeMillis, TimeUnit.MILLISECONDS);
|
||||
} catch (Throwable t) {
|
||||
ActiveMQServerLogger.LOGGER.warn("Cannot stop connection. This exception will be ignored.", t);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void setAdvisorySession(AMQSession amqSession) {
|
||||
this.advisorySession = amqSession;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,65 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.openwire.amq;
|
||||
|
||||
import javax.jms.Connection;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
|
||||
import org.apache.activemq.artemis.tests.util.Wait;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
public class ConnectionErrorSocketCloseTest extends BasicOpenWireTest {
|
||||
|
||||
@Override
|
||||
protected void createFactories() {
|
||||
super.createFactories();
|
||||
factory.setClientID("id");
|
||||
}
|
||||
|
||||
//We want to make sure that the socket will be closed if there as an error on broker.addConnection
|
||||
//even if the client doesn't close the connection to prevent dangling open sockets
|
||||
@Test(timeout = 60000)
|
||||
public void testDuplicateClientIdCloseConnection() throws Exception {
|
||||
connection.start();
|
||||
Wait.waitFor(() -> getActiveMQServer().getRemotingService().getConnections().size() == 1, 10000, 500);
|
||||
|
||||
try (Connection con = factory.createConnection()) {
|
||||
// Try and create second connection the second should fail because of a
|
||||
// duplicate clientId
|
||||
try {
|
||||
// Should fail because of previously started connection with same
|
||||
// client Id
|
||||
con.start();
|
||||
fail("Should have exception");
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
|
||||
// after 2 seconds the second connection should be terminated by the
|
||||
// broker because of the exception
|
||||
assertTrue(Wait.waitFor(() -> getActiveMQServer().getRemotingService().getConnections().size() == 1, 10000, 500));
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("deprecation")
|
||||
private ActiveMQServer getActiveMQServer() {
|
||||
return jmsServer.getActiveMQServer();
|
||||
}
|
||||
|
||||
}
|
Loading…
Reference in New Issue