This closes #1220
This commit is contained in:
commit
e22f77d8ca
|
@ -37,13 +37,13 @@ public class ActiveMQChannelHandler extends ChannelDuplexHandler {
|
||||||
|
|
||||||
private final BufferHandler handler;
|
private final BufferHandler handler;
|
||||||
|
|
||||||
private final BaseConnectionLifeCycleListener listener;
|
private final BaseConnectionLifeCycleListener<?> listener;
|
||||||
|
|
||||||
volatile boolean active;
|
volatile boolean active;
|
||||||
|
|
||||||
protected ActiveMQChannelHandler(final ChannelGroup group,
|
protected ActiveMQChannelHandler(final ChannelGroup group,
|
||||||
final BufferHandler handler,
|
final BufferHandler handler,
|
||||||
final BaseConnectionLifeCycleListener listener) {
|
final BaseConnectionLifeCycleListener<?> listener) {
|
||||||
this.group = group;
|
this.group = group;
|
||||||
this.handler = handler;
|
this.handler = handler;
|
||||||
this.listener = listener;
|
this.listener = listener;
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class NettyConnection implements Connection {
|
||||||
private static final int DEFAULT_WAIT_MILLIS = 10_000;
|
private static final int DEFAULT_WAIT_MILLIS = 10_000;
|
||||||
|
|
||||||
protected final Channel channel;
|
protected final Channel channel;
|
||||||
private final BaseConnectionLifeCycleListener listener;
|
private final BaseConnectionLifeCycleListener<?> listener;
|
||||||
private final boolean directDeliver;
|
private final boolean directDeliver;
|
||||||
private final Map<String, Object> configuration;
|
private final Map<String, Object> configuration;
|
||||||
/**
|
/**
|
||||||
|
@ -81,7 +81,7 @@ public class NettyConnection implements Connection {
|
||||||
|
|
||||||
public NettyConnection(final Map<String, Object> configuration,
|
public NettyConnection(final Map<String, Object> configuration,
|
||||||
final Channel channel,
|
final Channel channel,
|
||||||
final BaseConnectionLifeCycleListener listener,
|
final BaseConnectionLifeCycleListener<?> listener,
|
||||||
boolean batchingEnabled,
|
boolean batchingEnabled,
|
||||||
boolean directDeliver) {
|
boolean directDeliver) {
|
||||||
this.configuration = configuration;
|
this.configuration = configuration;
|
||||||
|
|
|
@ -16,9 +16,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.activemq.artemis.core.remoting.impl.netty;
|
package org.apache.activemq.artemis.core.remoting.impl.netty;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import javax.net.ssl.SSLEngine;
|
|
||||||
import javax.net.ssl.SSLParameters;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.net.Inet6Address;
|
import java.net.Inet6Address;
|
||||||
|
@ -29,14 +26,12 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.security.AccessController;
|
|
||||||
import java.security.MessageDigest;
|
import java.security.MessageDigest;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
import java.security.PrivilegedAction;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
@ -45,6 +40,10 @@ import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.net.ssl.SSLContext;
|
||||||
|
import javax.net.ssl.SSLEngine;
|
||||||
|
import javax.net.ssl.SSLParameters;
|
||||||
|
|
||||||
import io.netty.bootstrap.Bootstrap;
|
import io.netty.bootstrap.Bootstrap;
|
||||||
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBuf;
|
||||||
import io.netty.buffer.Unpooled;
|
import io.netty.buffer.Unpooled;
|
||||||
|
@ -66,14 +65,12 @@ import io.netty.channel.group.DefaultChannelGroup;
|
||||||
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
||||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||||
import io.netty.handler.codec.base64.Base64;
|
import io.netty.handler.codec.base64.Base64;
|
||||||
import io.netty.handler.codec.http.Cookie;
|
|
||||||
import io.netty.handler.codec.http.CookieDecoder;
|
|
||||||
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
import io.netty.handler.codec.http.DefaultFullHttpRequest;
|
||||||
import io.netty.handler.codec.http.DefaultHttpRequest;
|
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||||
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
||||||
import io.netty.handler.codec.http.FullHttpResponse;
|
import io.netty.handler.codec.http.FullHttpResponse;
|
||||||
import io.netty.handler.codec.http.HttpClientCodec;
|
import io.netty.handler.codec.http.HttpClientCodec;
|
||||||
import io.netty.handler.codec.http.HttpHeaders;
|
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||||
import io.netty.handler.codec.http.HttpMethod;
|
import io.netty.handler.codec.http.HttpMethod;
|
||||||
import io.netty.handler.codec.http.HttpObject;
|
import io.netty.handler.codec.http.HttpObject;
|
||||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||||
|
@ -84,17 +81,19 @@ import io.netty.handler.codec.http.HttpResponseDecoder;
|
||||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||||
import io.netty.handler.codec.http.HttpVersion;
|
import io.netty.handler.codec.http.HttpVersion;
|
||||||
import io.netty.handler.codec.http.LastHttpContent;
|
import io.netty.handler.codec.http.LastHttpContent;
|
||||||
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
|
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
|
||||||
|
import io.netty.handler.codec.http.cookie.Cookie;
|
||||||
import io.netty.handler.ssl.SslHandler;
|
import io.netty.handler.ssl.SslHandler;
|
||||||
import io.netty.util.AttributeKey;
|
import io.netty.util.AttributeKey;
|
||||||
import io.netty.util.ResourceLeakDetector;
|
import io.netty.util.ResourceLeakDetector;
|
||||||
|
import io.netty.util.ResourceLeakDetector.Level;
|
||||||
import io.netty.util.concurrent.Future;
|
import io.netty.util.concurrent.Future;
|
||||||
import io.netty.util.concurrent.GlobalEventExecutor;
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
|
||||||
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
|
import org.apache.activemq.artemis.core.protocol.core.impl.ActiveMQClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
import org.apache.activemq.artemis.core.remoting.impl.ssl.SSLSupport;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
@ -142,7 +141,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
static {
|
static {
|
||||||
// Disable resource leak detection for performance reasons by default
|
// Disable resource leak detection for performance reasons by default
|
||||||
ResourceLeakDetector.setEnabled(false);
|
ResourceLeakDetector.setLevel(Level.DISABLED);
|
||||||
|
|
||||||
// Set default Configuration
|
// Set default Configuration
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
|
@ -161,7 +160,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
private final BufferHandler handler;
|
private final BufferHandler handler;
|
||||||
|
|
||||||
private final BaseConnectionLifeCycleListener listener;
|
private final BaseConnectionLifeCycleListener<?> listener;
|
||||||
|
|
||||||
private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED;
|
private boolean sslEnabled = TransportConstants.DEFAULT_SSL_ENABLED;
|
||||||
|
|
||||||
|
@ -251,7 +250,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
// Public --------------------------------------------------------
|
// Public --------------------------------------------------------
|
||||||
public NettyConnector(final Map<String, Object> configuration,
|
public NettyConnector(final Map<String, Object> configuration,
|
||||||
final BufferHandler handler,
|
final BufferHandler handler,
|
||||||
final BaseConnectionLifeCycleListener listener,
|
final BaseConnectionLifeCycleListener<?> listener,
|
||||||
final Executor closeExecutor,
|
final Executor closeExecutor,
|
||||||
final Executor threadPool,
|
final Executor threadPool,
|
||||||
final ScheduledExecutorService scheduledThreadPool) {
|
final ScheduledExecutorService scheduledThreadPool) {
|
||||||
|
@ -260,7 +259,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
public NettyConnector(final Map<String, Object> configuration,
|
public NettyConnector(final Map<String, Object> configuration,
|
||||||
final BufferHandler handler,
|
final BufferHandler handler,
|
||||||
final BaseConnectionLifeCycleListener listener,
|
final BaseConnectionLifeCycleListener<?> listener,
|
||||||
final Executor closeExecutor,
|
final Executor closeExecutor,
|
||||||
final Executor threadPool,
|
final Executor threadPool,
|
||||||
final ScheduledExecutorService scheduledThreadPool,
|
final ScheduledExecutorService scheduledThreadPool,
|
||||||
|
@ -697,9 +696,9 @@ public class NettyConnector extends AbstractConnector {
|
||||||
}
|
}
|
||||||
URI uri = new URI(scheme, null, IPV6Util.encloseHost(host), port, null, null, null);
|
URI uri = new URI(scheme, null, IPV6Util.encloseHost(host), port, null, null, null);
|
||||||
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
|
HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.getRawPath());
|
||||||
request.headers().set(HttpHeaders.Names.HOST, host);
|
request.headers().set(HttpHeaderNames.HOST, host);
|
||||||
request.headers().set(HttpHeaders.Names.UPGRADE, ACTIVEMQ_REMOTING);
|
request.headers().set(HttpHeaderNames.UPGRADE, ACTIVEMQ_REMOTING);
|
||||||
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
|
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderNames.UPGRADE);
|
||||||
final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
|
final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
|
||||||
if (serverName != null) {
|
if (serverName != null) {
|
||||||
request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName);
|
request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName);
|
||||||
|
@ -799,7 +798,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
}
|
}
|
||||||
if (msg instanceof HttpResponse) {
|
if (msg instanceof HttpResponse) {
|
||||||
HttpResponse response = (HttpResponse) msg;
|
HttpResponse response = (HttpResponse) msg;
|
||||||
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
|
if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
|
||||||
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
|
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
|
||||||
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
|
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
|
||||||
|
|
||||||
|
@ -894,10 +893,12 @@ public class NettyConnector extends AbstractConnector {
|
||||||
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
|
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
|
||||||
FullHttpResponse response = (FullHttpResponse) msg;
|
FullHttpResponse response = (FullHttpResponse) msg;
|
||||||
if (httpRequiresSessionId && !active) {
|
if (httpRequiresSessionId && !active) {
|
||||||
Set<Cookie> cookieMap = CookieDecoder.decode(response.headers().get(HttpHeaders.Names.SET_COOKIE));
|
final List<String> setCookieHeaderValues = response.headers().getAll(HttpHeaderNames.SET_COOKIE);
|
||||||
for (Cookie cookie : cookieMap) {
|
for (String setCookieHeaderValue : setCookieHeaderValues) {
|
||||||
if (cookie.getName().equals("JSESSIONID")) {
|
final Cookie cookie = ClientCookieDecoder.LAX.decode(setCookieHeaderValue);
|
||||||
this.cookie = ClientCookieEncoder.LAX.encode(cookie);
|
if ("JSESSIONID".equals(cookie.name())) {
|
||||||
|
this.cookie = setCookieHeaderValue;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
active = true;
|
active = true;
|
||||||
|
@ -922,11 +923,11 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
ByteBuf buf = (ByteBuf) msg;
|
ByteBuf buf = (ByteBuf) msg;
|
||||||
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf);
|
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, url, buf);
|
||||||
httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host);
|
httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host);
|
||||||
if (cookie != null) {
|
if (cookie != null) {
|
||||||
httpRequest.headers().add(HttpHeaders.Names.COOKIE, cookie);
|
httpRequest.headers().add(HttpHeaderNames.COOKIE, cookie);
|
||||||
}
|
}
|
||||||
httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
|
httpRequest.headers().add(HttpHeaderNames.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
|
||||||
ctx.write(httpRequest, promise);
|
ctx.write(httpRequest, promise);
|
||||||
lastSendTime = System.currentTimeMillis();
|
lastSendTime = System.currentTimeMillis();
|
||||||
} else {
|
} else {
|
||||||
|
@ -949,7 +950,7 @@ public class NettyConnector extends AbstractConnector {
|
||||||
|
|
||||||
if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) {
|
if (!waitingGet && System.currentTimeMillis() > lastSendTime + httpMaxClientIdleTime) {
|
||||||
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
|
FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, url);
|
||||||
httpRequest.headers().add(HttpHeaders.Names.HOST, NettyConnector.this.host);
|
httpRequest.headers().add(HttpHeaderNames.HOST, NettyConnector.this.host);
|
||||||
waitingGet = true;
|
waitingGet = true;
|
||||||
channel.writeAndFlush(httpRequest);
|
channel.writeAndFlush(httpRequest);
|
||||||
}
|
}
|
||||||
|
@ -978,7 +979,9 @@ public class NettyConnector extends AbstractConnector {
|
||||||
if (connections.putIfAbsent(connection.getID(), connection) != null) {
|
if (connections.putIfAbsent(connection.getID(), connection) != null) {
|
||||||
throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
|
throw ActiveMQClientMessageBundle.BUNDLE.connectionExists(connection.getID());
|
||||||
}
|
}
|
||||||
listener.connectionCreated(component, connection, protocol);
|
@SuppressWarnings("unchecked")
|
||||||
|
final BaseConnectionLifeCycleListener<ClientProtocolManager> clientListener = (BaseConnectionLifeCycleListener<ClientProtocolManager>) listener;
|
||||||
|
clientListener.connectionCreated(component, connection, protocol);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1093,16 +1096,6 @@ public class NettyConnector extends AbstractConnector {
|
||||||
SharedEventLoopGroup.forceShutdown();
|
SharedEventLoopGroup.forceShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ClassLoader getThisClassLoader() {
|
|
||||||
return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
|
|
||||||
@Override
|
|
||||||
public ClassLoader run() {
|
|
||||||
return ClientSessionFactoryImpl.class.getClassLoader();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String base64(byte[] data) {
|
private static String base64(byte[] data) {
|
||||||
ByteBuf encodedData = Unpooled.wrappedBuffer(data);
|
ByteBuf encodedData = Unpooled.wrappedBuffer(data);
|
||||||
ByteBuf encoded = Base64.encode(encodedData);
|
ByteBuf encoded = Base64.encode(encodedData);
|
||||||
|
|
|
@ -30,6 +30,7 @@ import io.netty.util.concurrent.Future;
|
||||||
import io.netty.util.concurrent.FutureListener;
|
import io.netty.util.concurrent.FutureListener;
|
||||||
import io.netty.util.concurrent.ImmediateEventExecutor;
|
import io.netty.util.concurrent.ImmediateEventExecutor;
|
||||||
import io.netty.util.concurrent.Promise;
|
import io.netty.util.concurrent.Promise;
|
||||||
|
|
||||||
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryImpl;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
|
|
||||||
|
@ -47,7 +48,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup {
|
||||||
|
|
||||||
public static synchronized void forceShutdown() {
|
public static synchronized void forceShutdown() {
|
||||||
if (instance != null) {
|
if (instance != null) {
|
||||||
instance.shutdown();
|
instance.forEach(executor -> executor.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS));
|
||||||
instance.channelFactoryCount.set(0);
|
instance.channelFactoryCount.set(0);
|
||||||
instance = null;
|
instance = null;
|
||||||
}
|
}
|
||||||
|
@ -55,7 +56,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup {
|
||||||
|
|
||||||
public static synchronized SharedEventLoopGroup getInstance(Function<ThreadFactory, EventLoopGroup> eventLoopGroupSupplier) {
|
public static synchronized SharedEventLoopGroup getInstance(Function<ThreadFactory, EventLoopGroup> eventLoopGroupSupplier) {
|
||||||
if (instance != null) {
|
if (instance != null) {
|
||||||
ScheduledFuture f = instance.shutdown.getAndSet(null);
|
ScheduledFuture<?> f = instance.shutdown.getAndSet(null);
|
||||||
if (f != null) {
|
if (f != null) {
|
||||||
f.cancel(false);
|
f.cancel(false);
|
||||||
}
|
}
|
||||||
|
@ -92,7 +93,7 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup {
|
||||||
Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
|
Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
|
||||||
future.addListener(new FutureListener<Object>() {
|
future.addListener(new FutureListener<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public void operationComplete(Future future) throws Exception {
|
public void operationComplete(Future<Object> future) throws Exception {
|
||||||
if (future.isSuccess()) {
|
if (future.isSuccess()) {
|
||||||
terminationPromise.setSuccess(null);
|
terminationPromise.setSuccess(null);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -28,8 +28,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
|
||||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -76,12 +77,12 @@ public class NettyConnectionTest extends ActiveMQTestBase {
|
||||||
return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
return new EmbeddedChannel(new ChannelInboundHandlerAdapter());
|
||||||
}
|
}
|
||||||
|
|
||||||
class MyListener implements ConnectionLifeCycleListener {
|
class MyListener implements ClientConnectionLifeCycleListener {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,8 +26,9 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ClientConnectionLifeCycleListener;
|
||||||
|
import org.apache.activemq.artemis.spi.core.remoting.ClientProtocolManager;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||||
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
|
|
||||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||||
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -43,7 +44,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -55,7 +56,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -79,7 +80,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -91,7 +92,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -129,7 +130,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
||||||
params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path");
|
params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path");
|
||||||
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password");
|
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password");
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -141,7 +142,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -175,7 +176,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "bad password");
|
||||||
params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path");
|
params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "bad path");
|
||||||
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password");
|
params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "bad password");
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -187,7 +188,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -223,7 +224,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
|
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
|
||||||
params.put(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME, "myBadCipherSuite");
|
params.put(TransportConstants.ENABLED_CIPHER_SUITES_PROP_NAME, "myBadCipherSuite");
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -235,7 +236,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -262,7 +263,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
Map<String, Object> params = new HashMap<>();
|
Map<String, Object> params = new HashMap<>();
|
||||||
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
|
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
|
||||||
params.put(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME, "myBadProtocol");
|
params.put(TransportConstants.ENABLED_PROTOCOLS_PROP_NAME, "myBadProtocol");
|
||||||
ConnectionLifeCycleListener listener = new ConnectionLifeCycleListener() {
|
ClientConnectionLifeCycleListener listener = new ClientConnectionLifeCycleListener() {
|
||||||
@Override
|
@Override
|
||||||
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
public void connectionException(final Object connectionID, final ActiveMQException me) {
|
||||||
}
|
}
|
||||||
|
@ -274,7 +275,7 @@ public class NettyConnectorTest extends ActiveMQTestBase {
|
||||||
@Override
|
@Override
|
||||||
public void connectionCreated(final ActiveMQComponent component,
|
public void connectionCreated(final ActiveMQComponent component,
|
||||||
final Connection connection,
|
final Connection connection,
|
||||||
final String protocol) {
|
final ClientProtocolManager protocol) {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue