diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java index cc4407c88e..feb19f5275 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/ActiveMQChannelHandler.java @@ -37,13 +37,13 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler { private final BufferHandler handler; - private final BaseConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; volatile boolean active; protected ActiveMQChannelHandler(final ChannelGroup group, final BufferHandler handler, - final BaseConnectionLifeCycleListener listener) { + final BaseConnectionLifeCycleListener listener) { this.group = group; this.handler = handler; this.listener = listener; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java index 2181643de7..2c03ed6eb9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnection.java @@ -53,7 +53,7 @@ public class NettyConnection implements Connection { private static final int DEFAULT_WAIT_MILLIS = 10_000; protected final Channel channel; - private final BaseConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private final boolean directDeliver; private final Map configuration; /** @@ -81,7 +81,7 @@ public class NettyConnection implements Connection { public NettyConnection(final Map configuration, final Channel channel, - final BaseConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, boolean batchingEnabled, boolean directDeliver) { this.configuration = configuration; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index ebe97ecdf6..b856421681 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.artemis.core.remoting.impl.netty; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLParameters; import java.io.IOException; import java.net.ConnectException; import java.net.Inet6Address; @@ -29,14 +26,12 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; import java.nio.charset.StandardCharsets; -import java.security.AccessController; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.security.PrivilegedAction; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -45,6 +40,10 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLParameters; + import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -66,14 +65,12 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.base64.Base64; -import io.netty.handler.codec.http.Cookie; -import io.netty.handler.codec.http.CookieDecoder; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpObject; import io.netty.handler.codec.http.HttpObjectAggregator; @@ -84,17 +81,19 @@ import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.codec.http.cookie.ClientCookieEncoder; +import io.netty.handler.codec.http.cookie.ClientCookieDecoder; +import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; import io.netty.util.ResourceLeakDetector; +import io.netty.util.ResourceLeakDetector.Level; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GlobalEventExecutor; + import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; -import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager; import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport; import org.apache.activemq.artemis.core.server.ActiveMQComponent; @@ -142,7 +141,7 @@ public class NettyConnector extends AbstractConnector { static { // Disable resource leak detection for performance reasons by default - ResourceLeakDetector.setEnabled(false); + ResourceLeakDetector.setLevel(Level.DISABLED); // Set default Configuration Map config = new HashMap<>(); @@ -161,7 +160,7 @@ public class NettyConnector extends AbstractConnector { private final BufferHandler handler; - private final BaseConnectionLifeCycleListener listener; + private final BaseConnectionLifeCycleListener listener; private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED; @@ -251,7 +250,7 @@ public class NettyConnector extends AbstractConnector { // Public -------------------------------------------------------- public NettyConnector(final Map configuration, final BufferHandler handler, - final BaseConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool) { @@ -260,7 +259,7 @@ public class NettyConnector extends AbstractConnector { public NettyConnector(final Map configuration, final BufferHandler handler, - final BaseConnectionLifeCycleListener listener, + final BaseConnectionLifeCycleListener listener, final Executor closeExecutor, final Executor threadPool, final ScheduledExecutorService scheduledThreadPool, @@ -697,9 +696,9 @@ public class NettyConnector extends AbstractConnector { } URI uri = new URI(scheme, null, IPV6Util.encloseHost(host), port, null, null, null); HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath()); - request.headers().set(HttpHeaders.Names.HOST, host); - request.headers().set(HttpHeaders.Names.UPGRADE, ACTIVEMQ_REMOTING); - request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE); + request.headers().set(HttpHeaderNames.HOST, host); + request.headers().set(HttpHeaderNames.UPGRADE, ACTIVEMQ_REMOTING); + request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.UPGRADE); final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration); if (serverName != null) { request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName); @@ -799,7 +798,7 @@ public class NettyConnector extends AbstractConnector { } if (msg instanceof HttpResponse) { HttpResponse response = (HttpResponse) msg; - if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { + if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) { String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); @@ -894,10 +893,12 @@ public class NettyConnector extends AbstractConnector { public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception { FullHttpResponse response = (FullHttpResponse) msg; if (httpRequiresSessionId && !active) { - Set cookieMap = CookieDecoder.decode(response.headers().get(HttpHeaders.Names.SET_COOKIE)); - for (Cookie cookie : cookieMap) { - if (cookie.getName().equals("JSESSIONID")) { - this.cookie = ClientCookieEncoder.LAX.encode(cookie); + final List setCookieHeaderValues = response.headers().getAll(HttpHeaderNames.SET_COOKIE); + for (String setCookieHeaderValue : setCookieHeaderValues) { + final Cookie cookie = ClientCookieDecoder.LAX.decode(setCookieHeaderValue); + if ("JSESSIONID".equals(cookie.name())) { + this.cookie = setCookieHeaderValue; + break; } } active = true; @@ -922,11 +923,11 @@ public class NettyConnector extends AbstractConnector { ByteBuf buf = (ByteBuf) msg; FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf); - httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); + httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host); if (cookie != null) { - httpRequest.headers().add(HttpHeaders.Names.COOKIE, cookie); + httpRequest.headers().add(HttpHeaderNames.COOKIE, cookie); } - httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes())); + httpRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buf.readableBytes())); ctx.write(httpRequest, promise); lastSendTime = System.currentTimeMillis(); } else { @@ -949,7 +950,7 @@ public class NettyConnector extends AbstractConnector { if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) { FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url); - httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host); + httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host); waitingGet = true; channel.writeAndFlush(httpRequest); } @@ -978,7 +979,9 @@ public class NettyConnector extends AbstractConnector { if (connections.putIfAbsent(connection.getID(), connection) != null) { throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID()); } - listener.connectionCreated(component, connection, protocol); + @SuppressWarnings("unchecked") + final BaseConnectionLifeCycleListener clientListener = (BaseConnectionLifeCycleListener) listener; + clientListener.connectionCreated(component, connection, protocol); } @Override @@ -1093,16 +1096,6 @@ public class NettyConnector extends AbstractConnector { SharedEventLoopGroup.forceShutdown(); } - private static ClassLoader getThisClassLoader() { - return AccessController.doPrivileged(new PrivilegedAction() { - @Override - public ClassLoader run() { - return ClientSessionFactoryImpl.class.getClassLoader(); - } - }); - - } - private static String base64(byte[] data) { ByteBuf encodedData = Unpooled.wrappedBuffer(data); ByteBuf encoded = Base64.encode(encodedData); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java index 0af54de134..a6f935c1fe 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java @@ -30,6 +30,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.ImmediateEventExecutor; import io.netty.util.concurrent.Promise; + import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; @@ -47,7 +48,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup { public static synchronized void forceShutdown() { if (instance != null) { - instance.shutdown(); + instance.forEach(executor -> executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS)); instance.channelFactoryCount.set(0); instance = null; } @@ -55,7 +56,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup { public static synchronized SharedEventLoopGroup getInstance(Function eventLoopGroupSupplier) { if (instance != null) { - ScheduledFuture f = instance.shutdown.getAndSet(null); + ScheduledFuture f = instance.shutdown.getAndSet(null); if (f != null) { f.cancel(false); } @@ -92,7 +93,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup { Future future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); future.addListener(new FutureListener() { @Override - public void operationComplete(Future future) throws Exception { + public void operationComplete(Future future) throws Exception { if (future.isSuccess()) { terminationPromise.setSuccess(null); } else { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index f99b25a32a..3801e7e1c8 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -28,8 +28,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.junit.Assert; import org.junit.Test; @@ -76,12 +77,12 @@ public class NettyConnectionTest extends ActiveMQTestBase { return new EmbeddedChannel(new ChannelInboundHandlerAdapter()); } - class MyListener implements ConnectionLifeCycleListener { + class MyListener implements ClientConnectionLifeCycleListener { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java index 34b9abb820..5b60ef0777 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyConnectorTest.java @@ -26,8 +26,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; +import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener; +import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.ActiveMQThreadFactory; import org.junit.Assert; @@ -43,7 +44,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { } }; Map params = new HashMap<>(); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -55,7 +56,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override @@ -79,7 +80,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { } }; Map params = new HashMap<>(); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -91,7 +92,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override @@ -129,7 +130,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password"); params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path"); params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password"); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -141,7 +142,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override @@ -175,7 +176,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password"); params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path"); params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password"); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -187,7 +188,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override @@ -223,7 +224,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { Map params = new HashMap<>(); params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); params.put(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME, "myBadCipherSuite"); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -235,7 +236,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override @@ -262,7 +263,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { Map params = new HashMap<>(); params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); params.put(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME, "myBadProtocol"); - ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() { + ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() { @Override public void connectionException(final Object connectionID, final ActiveMQException me) { } @@ -274,7 +275,7 @@ public class NettyConnectorTest extends ActiveMQTestBase { @Override public void connectionCreated(final ActiveMQComponent component, final Connection connection, - final String protocol) { + final ClientProtocolManager protocol) { } @Override