diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index e677e1d9d5..4fb9066187 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -98,6 +98,7 @@ import io.netty.handler.proxy.ProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.ssl.SslHandler; +import io.netty.resolver.NoopAddressResolverGroup; import io.netty.util.AttributeKey; import io.netty.util.ResourceLeakDetector; import io.netty.util.ResourceLeakDetector.Level; @@ -210,6 +211,8 @@ public class NettyConnector extends AbstractConnector { private String proxyPassword; + private boolean proxyRemoteDNS; + private boolean useServlet; private String host; @@ -354,6 +357,8 @@ public class NettyConnector extends AbstractConnector { proxyUsername = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_USERNAME_PROP_NAME, TransportConstants.DEFAULT_PROXY_USERNAME, configuration); proxyPassword = ConfigurationHelper.getStringProperty(TransportConstants.PROXY_PASSWORD_PROP_NAME, TransportConstants.DEFAULT_PROXY_PASSWORD, configuration); + + proxyRemoteDNS = ConfigurationHelper.getBooleanProperty(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME, TransportConstants.DEFAULT_PROXY_REMOTE_DNS, configuration); } remotingThreads = ConfigurationHelper.getIntProperty(TransportConstants.NIO_REMOTING_THREADS_PROPNAME, -1, configuration); @@ -564,7 +569,7 @@ public class NettyConnector extends AbstractConnector { public void initChannel(Channel channel) throws Exception { final ChannelPipeline pipeline = channel.pipeline(); - if (proxyEnabled && !isTargetLocalHost()) { + if (proxyEnabled && (proxyRemoteDNS || !isTargetLocalHost())) { InetSocketAddress proxyAddress = new InetSocketAddress(proxyHost, proxyPort); ProxyHandler proxyHandler; switch (proxyVersion) { @@ -581,6 +586,10 @@ public class NettyConnector extends AbstractConnector { channel.pipeline().addLast(proxyHandler); logger.debug("Using a SOCKS proxy at " + proxyHost + ":" + proxyPort); + + if (proxyRemoteDNS) { + bootstrap.resolver(NoopAddressResolverGroup.INSTANCE); + } } if (sslEnabled && !useServlet) { @@ -800,7 +809,12 @@ public class NettyConnector extends AbstractConnector { return null; } - InetSocketAddress remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port); + InetSocketAddress remoteDestination; + if (proxyEnabled && proxyRemoteDNS) { + remoteDestination = InetSocketAddress.createUnresolved(IPV6Util.stripBracketsAndZoneID(host), port); + } else { + remoteDestination = new InetSocketAddress(IPV6Util.stripBracketsAndZoneID(host), port); + } logger.debug("Remote destination: " + remoteDestination); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index 728d6b9427..018b2bad93 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -181,6 +181,8 @@ public class TransportConstants { public static final String PROXY_PASSWORD_PROP_NAME = "socksPassword"; + public static final String PROXY_REMOTE_DNS_PROP_NAME = "socksRemoteDNS"; + public static final boolean DEFAULT_SSL_ENABLED = false; public static final String DEFAULT_SSL_KRB5_CONFIG = null; @@ -340,6 +342,8 @@ public class TransportConstants { public static final String DEFAULT_PROXY_PASSWORD = null; + public static final boolean DEFAULT_PROXY_REMOTE_DNS = false; + private static int parseDefaultVariable(String variableName, int defaultValue) { try { String variable = System.getProperty(TransportConstants.class.getName() + "." + variableName); @@ -461,6 +465,7 @@ public class TransportConstants { allowableConnectorKeys.add(TransportConstants.PROXY_VERSION_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PROXY_USERNAME_PROP_NAME); allowableConnectorKeys.add(TransportConstants.PROXY_PASSWORD_PROP_NAME); + allowableConnectorKeys.add(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropMaskPassword()); allowableConnectorKeys.add(ActiveMQDefaultConfiguration.getPropPasswordCodec()); allowableConnectorKeys.add(TransportConstants.NETTY_CONNECT_TIMEOUT); diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java index 67eed00776..6f4a8a2c8d 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java @@ -16,10 +16,6 @@ */ package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; -import java.net.InetAddress; -import java.net.NetworkInterface; -import java.net.SocketException; -import java.util.Enumeration; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -27,7 +23,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import io.netty.channel.ChannelPipeline; -import io.netty.handler.proxy.Socks5ProxyHandler; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; @@ -44,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.junit.Assert; -import org.junit.Assume; import org.junit.Before; import org.junit.Test; @@ -399,114 +393,4 @@ public class NettyConnectorTest extends ActiveMQTestBase { scheduledThreadPool.shutdownNow(); } } - - @Test - public void testSocksProxyHandlerAdded() throws Exception { - InetAddress address = getNonLoopbackAddress(); - Assume.assumeTrue("Cannot find non-loopback address", address != null); - - BufferHandler handler = (connectionID, buffer) -> { - }; - Map params = new HashMap<>(); - - params.put(TransportConstants.HOST_PROP_NAME, address.getHostAddress()); - params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true); - params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); - - ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { - @Override - public void connectionException(final Object connectionID, final ActiveMQException me) { - } - - @Override - public void connectionDestroyed(final Object connectionID) { - } - - @Override - public void connectionCreated(final ActiveMQComponent component, - final Connection connection, - final ClientProtocolManager protocol) { - } - - @Override - public void connectionReadyForWrites(Object connectionID, boolean ready) { - } - }; - - NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory())); - - connector.start(); - Assert.assertTrue(connector.isStarted()); - - ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); - Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class)); - - connector.close(); - Assert.assertFalse(connector.isStarted()); - } - - private InetAddress getNonLoopbackAddress() throws SocketException { - Enumeration n = NetworkInterface.getNetworkInterfaces(); - InetAddress addr = null; - for (; n.hasMoreElements(); ) { - NetworkInterface e = n.nextElement(); - Enumeration a = e.getInetAddresses(); - boolean found = false; - for (; a.hasMoreElements(); ) { - addr = a.nextElement(); - if (!addr.isLoopbackAddress()) { - found = true; - break; - } - } - if (found) { - break; - } - } - return addr; - } - - @Test - public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception { - BufferHandler handler = new BufferHandler() { - @Override - public void bufferReceived(final Object connectionID, final ActiveMQBuffer buffer) { - } - }; - Map params = new HashMap<>(); - params.put(TransportConstants.HOST_PROP_NAME, "localhost"); - params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true); - params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); - - ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { - @Override - public void connectionException(final Object connectionID, final ActiveMQException me) { - } - - @Override - public void connectionDestroyed(final Object connectionID) { - } - - @Override - public void connectionCreated(final ActiveMQComponent component, - final Connection connection, - final ClientProtocolManager protocol) { - } - - @Override - public void connectionReadyForWrites(Object connectionID, boolean ready) { - } - }; - - NettyConnector connector = new NettyConnector(params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory())); - - connector.start(); - Assert.assertTrue(connector.isStarted()); - - ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); - Assert.assertNull(pipeline.get(Socks5ProxyHandler.class)); - - connector.close(); - Assert.assertFalse(connector.isStarted()); - } } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java new file mode 100644 index 0000000000..a0ce33ae0a --- /dev/null +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/SocksProxyTest.java @@ -0,0 +1,283 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.unit.core.remoting.impl.netty; + +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.net.SocketException; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.proxy.Socks5ProxyHandler; +import io.netty.resolver.AddressResolverGroup; +import io.netty.resolver.NoopAddressResolverGroup; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; +import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; +import org.junit.After; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Test; + +public class SocksProxyTest extends ActiveMQTestBase { + + private static final int SOCKS_PORT = 1080; + + private ExecutorService closeExecutor; + private ExecutorService threadPool; + private ScheduledExecutorService scheduledThreadPool; + + private NioEventLoopGroup bossGroup; + private NioEventLoopGroup workerGroup; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + closeExecutor = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + threadPool = Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()); + scheduledThreadPool = Executors.newScheduledThreadPool(5, ActiveMQThreadFactory.defaultThreadFactory()); + + startSocksProxy(); + } + + @Override + @After + public void tearDown() throws Exception { + closeExecutor.shutdownNow(); + threadPool.shutdownNow(); + scheduledThreadPool.shutdownNow(); + + stopSocksProxy(); + + super.tearDown(); + } + + @Test + public void testSocksProxyHandlerAdded() throws Exception { + InetAddress address = getNonLoopbackAddress(); + Assume.assumeTrue("Cannot find non-loopback address", address != null); + + BufferHandler handler = (connectionID, buffer) -> { + }; + + Map params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, address.getHostAddress()); + params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true); + params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); + + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { + @Override + public void connectionException(final Object connectionID, final ActiveMQException me) { + } + + @Override + public void connectionDestroyed(final Object connectionID) { + } + + @Override + public void connectionCreated(final ActiveMQComponent component, + final Connection connection, + final ClientProtocolManager protocol) { + } + + @Override + public void connectionReadyForWrites(Object connectionID, boolean ready) { + } + }; + + NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + + connector.start(); + Assert.assertTrue(connector.isStarted()); + + ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); + Assert.assertNotNull(pipeline.get(Socks5ProxyHandler.class)); + + connector.close(); + Assert.assertFalse(connector.isStarted()); + } + + private InetAddress getNonLoopbackAddress() throws SocketException { + Enumeration n = NetworkInterface.getNetworkInterfaces(); + InetAddress addr = null; + for (; n.hasMoreElements(); ) { + NetworkInterface e = n.nextElement(); + Enumeration a = e.getInetAddresses(); + boolean found = false; + for (; a.hasMoreElements(); ) { + addr = a.nextElement(); + if (!addr.isLoopbackAddress()) { + found = true; + break; + } + } + if (found) { + break; + } + } + return addr; + } + + @Test + public void testSocksProxyHandlerNotAddedForLocalhost() throws Exception { + BufferHandler handler = (connectionID, buffer) -> { + }; + + Map params = new HashMap<>(); + params.put(TransportConstants.HOST_PROP_NAME, "localhost"); + params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true); + params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); + + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { + @Override + public void connectionException(final Object connectionID, final ActiveMQException me) { + } + + @Override + public void connectionDestroyed(final Object connectionID) { + } + + @Override + public void connectionCreated(final ActiveMQComponent component, + final Connection connection, + final ClientProtocolManager protocol) { + } + + @Override + public void connectionReadyForWrites(Object connectionID, boolean ready) { + } + }; + + NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + + connector.start(); + Assert.assertTrue(connector.isStarted()); + + ChannelPipeline pipeline = connector.getBootStrap().register().await().channel().pipeline(); + Assert.assertNull(pipeline.get(Socks5ProxyHandler.class)); + + connector.close(); + Assert.assertFalse(connector.isStarted()); + } + + @Test + public void testSocks5hSupport() throws Exception { + BufferHandler handler = (connectionID, buffer) -> { + }; + Map params = new HashMap<>(); + + params.put(TransportConstants.HOST_PROP_NAME, "only-resolvable-on-proxy"); + params.put(TransportConstants.PROXY_ENABLED_PROP_NAME, true); + params.put(TransportConstants.PROXY_HOST_PROP_NAME, "localhost"); + params.put(TransportConstants.PROXY_PORT_PROP_NAME, SOCKS_PORT); + params.put(TransportConstants.PROXY_REMOTE_DNS_PROP_NAME, true); + + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { + @Override + public void connectionException(final Object connectionID, final ActiveMQException me) { + } + + @Override + public void connectionDestroyed(final Object connectionID) { + } + + @Override + public void connectionCreated(final ActiveMQComponent component, + final Connection connection, + final ClientProtocolManager protocol) { + } + + @Override + public void connectionReadyForWrites(Object connectionID, boolean ready) { + } + }; + + NettyConnector connector = new NettyConnector(params, handler, listener, closeExecutor, threadPool, scheduledThreadPool); + + connector.start(); + Assert.assertTrue(connector.isStarted()); + + connector.getBootStrap().register().await().channel().pipeline(); + + AddressResolverGroup resolver = connector.getBootStrap().config().resolver(); + Assert.assertSame(resolver, NoopAddressResolverGroup.INSTANCE); + + Connection connection = connector.createConnection(future -> { + future.awaitUninterruptibly(); + Assert.assertTrue(future.isSuccess()); + + Socks5ProxyHandler socks5Handler = future.channel().pipeline().get(Socks5ProxyHandler.class); + Assert.assertNotNull(socks5Handler); + + InetSocketAddress remoteAddress = (InetSocketAddress)socks5Handler.destinationAddress(); + Assert.assertTrue(remoteAddress.isUnresolved()); + }); + Assert.assertNotNull(connection); + + Assert.assertTrue(connection.isOpen()); + connection.close(); + Assert.assertFalse(connection.isOpen()); + + connector.close(); + Assert.assertFalse(connector.isStarted()); + } + + private void startSocksProxy() throws Exception { + bossGroup = new NioEventLoopGroup(); + workerGroup = new NioEventLoopGroup(); + + ServerBootstrap b = new ServerBootstrap(); + b.group(bossGroup, workerGroup); + b.channel(NioServerSocketChannel.class); + b.childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // We can further configure SOCKS, but have to assume Netty is doing the right thing, + // we just need something listening on the port to make the initial connection + } + }); + + b.bind(SOCKS_PORT).sync(); + } + + private void stopSocksProxy() { + bossGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + workerGroup.shutdownGracefully(0, 0, TimeUnit.SECONDS); + } +}