From 6faffd690e6c15eef755d12baae88171ec4ad68b Mon Sep 17 00:00:00 2001 From: Stanislav Knot Date: Tue, 3 Oct 2017 13:57:19 +0200 Subject: [PATCH] ARTEMIS-1420 enforce timeout on network client handshake --- .../impl/netty/TransportConstants.java | 5 ++ .../core/protocol/ProtocolHandler.java | 23 ++++++ .../core/server/ActiveMQServerLogger.java | 4 + .../amqp/AmqpClientTestSupport.java | 1 - .../integration/amqp/AmqpSendReceiveTest.java | 25 +++--- tests/unit-tests/pom.xml | 12 +++ .../impl/netty/NettyHandshakeTimeoutTest.java | 79 +++++++++++++++++++ 7 files changed, 135 insertions(+), 14 deletions(-) create mode 100644 tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 9041348498..5d86aaafa0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -259,6 +259,10 @@ public class TransportConstants { public static final int DEFAULT_STOMP_MAX_FRAME_PAYLOAD_LENGTH = 65536; + public static final String HANDSHAKE_TIMEOUT = "handshake-timeout"; + + public static final int DEFAULT_HANDSHAKE_TIMEOUT = 10; + static { Set allowableAcceptorKeys = new HashSet<>(); allowableAcceptorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME); @@ -350,6 +354,7 @@ public class TransportConstants { allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT); allowableConnectorKeys.add(TransportConstants.USE_DEFAULT_SSL_CONTEXT_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.HANDSHAKE_TIMEOUT); ALLOWABLE_CONNECTOR_KEYS = Collections.unmodifiableSet(allowableConnectorKeys); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java index dba2ed5682..39d07e095a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import io.netty.buffer.ByteBuf; @@ -38,6 +39,7 @@ import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.remoting.impl.netty.ConnectionCreator; import org.apache.activemq.artemis.core.remoting.impl.netty.HttpAcceptorHandler; import org.apache.activemq.artemis.core.remoting.impl.netty.HttpKeepAliveRunnable; @@ -99,9 +101,25 @@ public class ProtocolHandler { private final boolean httpEnabled; + private ScheduledFuture timeoutFuture; + + private int handshakeTimeout; + + ProtocolDecoder(boolean http, boolean httpEnabled) { this.http = http; this.httpEnabled = httpEnabled; + this.handshakeTimeout = ConfigurationHelper.getIntProperty(TransportConstants.HANDSHAKE_TIMEOUT, TransportConstants.DEFAULT_HANDSHAKE_TIMEOUT, nettyAcceptor.getConfiguration()); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + if (handshakeTimeout > 0) { + timeoutFuture = scheduledThreadPool.schedule( () -> { + ActiveMQServerLogger.LOGGER.handshakeTimeout(handshakeTimeout); + ctx.channel().close(); + }, handshakeTimeout, TimeUnit.SECONDS); + } } @Override @@ -136,6 +154,11 @@ public class ProtocolHandler { return; } + if (handshakeTimeout > 0) { + timeoutFuture.cancel(true); + timeoutFuture = null; + } + final int magic1 = in.getUnsignedByte(in.readerIndex()); final int magic2 = in.getUnsignedByte(in.readerIndex() + 1); if (http && isHttp(magic1, magic2)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java index 83ff12e798..743ff78c68 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServerLogger.java @@ -1908,4 +1908,8 @@ public interface ActiveMQServerLogger extends BasicLogger { @LogMessage(level = Logger.Level.ERROR) @Message(id = 224087, value = "Error announcing backup: backupServerLocator is null. {0}", format = Message.Format.MESSAGE_FORMAT) void errorAnnouncingBackup(String backupManager); + + @LogMessage(level = Logger.Level.ERROR) + @Message(id = 224088, value = "Timeout ({0} seconds) while handshaking has occurred.", format = Message.Format.MESSAGE_FORMAT) + void handshakeTimeout(int timeout); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java index 60b9b74b21..054e715c5e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpClientTestSupport.java @@ -185,7 +185,6 @@ public class AmqpClientTestSupport extends AmqpTestSupport { HashMap params = new HashMap<>(); params.put(TransportConstants.PORT_PROP_NAME, String.valueOf(port)); params.put(TransportConstants.PROTOCOLS_PROP_NAME, getConfiguredProtocols()); - HashMap amqpParams = new HashMap<>(); configureAMQPAcceptorParameters(amqpParams); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 5c16dfb3f0..467ae50337 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -16,19 +16,6 @@ */ package org.apache.activemq.artemis.tests.integration.amqp; -import static org.apache.activemq.transport.amqp.AmqpSupport.contains; - -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import javax.jms.Topic; - import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; @@ -49,6 +36,18 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.Topic; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.apache.activemq.transport.amqp.AmqpSupport.contains; + /** * Test basic send and receive scenarios using only AMQP sender and receiver links. */ diff --git a/tests/unit-tests/pom.xml b/tests/unit-tests/pom.xml index 6de60d0906..d30cfec510 100644 --- a/tests/unit-tests/pom.xml +++ b/tests/unit-tests/pom.xml @@ -111,6 +111,18 @@ ${project.version} test + + org.apache.activemq + artemis-junit + 2.4.0-SNAPSHOT + test + + + org.apache.activemq.tests + artemis-test-support + 2.4.0-SNAPSHOT + test + diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java new file mode 100644 index 0000000000..309897903b --- /dev/null +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyHandshakeTimeoutTest.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; + +import io.netty.buffer.ByteBuf; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.junit.Wait; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.transport.amqp.client.transport.NettyTransport; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportFactory; +import org.apache.activemq.transport.amqp.client.transport.NettyTransportListener; +import org.junit.Test; + +import java.net.URI; +import java.util.HashMap; +import java.util.concurrent.TimeUnit; + +public class NettyHandshakeTimeoutTest extends ActiveMQTestBase { + + protected ActiveMQServer server; + private Configuration conf; + + @Test + public void testHandshakeTimeout() throws Exception { + int handshakeTimeout = 3; + + setUp(); + ActiveMQTestBase.checkFreePort(TransportConstants.DEFAULT_PORT); + HashMap params = new HashMap<>(); + params.put(TransportConstants.HANDSHAKE_TIMEOUT, handshakeTimeout); + + conf = createDefaultInVMConfig().clearAcceptorConfigurations().addAcceptorConfiguration(new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params)); + server = addServer(ActiveMQServers.newActiveMQServer(conf, false)); + server.start(); + NettyTransport transport = NettyTransportFactory.createTransport(new URI("tcp://127.0.0.1:61616")); + transport.setTransportListener(new NettyTransportListener() { + @Override + public void onData(ByteBuf incoming) { + + } + + @Override + public void onTransportClosed() { + } + + @Override + public void onTransportError(Throwable cause) { + } + + }); + + try { + transport.connect(); + assertTrue("Connection should be closed now", Wait.waitFor(() -> !transport.isConnected(), TimeUnit.SECONDS.toMillis(handshakeTimeout + 1))); + } finally { + transport.close(); + tearDown(); + } + } +}