Security Netty 4 HTTP server implementation

This commit is a work-in-progress commit on a Netty 4-based HTTP server
implementation.

Original commit: elastic/x-pack-elasticsearch@705a202574
This commit is contained in:
Jason Tedor 2016-08-02 14:11:34 -04:00
parent af16eec512
commit 8fa06fbab7
16 changed files with 320 additions and 15 deletions

View File

@ -12,6 +12,7 @@ import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.XPackPlugin;
import java.nio.charset.StandardCharsets;
@ -48,7 +49,7 @@ public class LicenseServiceClusterTests extends AbstractLicensesIntegrationTestC
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, MockNetty3Plugin.class);
return Arrays.asList(XPackPlugin.class, MockNetty3Plugin.class, MockNetty4Plugin.class);
}
@Override

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.agent.AgentService;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
@ -52,6 +53,7 @@ public class MonitoringSettingsTests extends MonitoringIntegTestCase {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
@ -49,6 +50,7 @@ public class MonitoringSettingsFilterTests extends MonitoringIntegTestCase {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}
@ -82,4 +84,5 @@ public class MonitoringSettingsFilterTests extends MonitoringIntegTestCase {
private void assertNullSetting(Map<String, Object> settings, String setting) {
assertThat(extractValue(setting, settings), nullValue());
}
}

View File

@ -120,6 +120,7 @@ import org.elasticsearch.xpack.security.transport.SecurityTransportModule;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3HttpServerTransport;
import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4HttpServerTransport;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.security.user.AnonymousUser;
import org.joda.time.DateTime;
@ -335,8 +336,8 @@ public class Security implements ActionPlugin, IngestPlugin {
public Settings additionalSettings() {
if (enabled == false) {
return Settings.builder()
.put(NetworkModule.HTTP_TYPE_KEY, "netty3")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty3")
.put(NetworkModule.HTTP_TYPE_KEY, "netty4")
.put(NetworkModule.TRANSPORT_TYPE_KEY, "netty4")
.build();
}
@ -348,8 +349,9 @@ public class Security implements ActionPlugin, IngestPlugin {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME + "4");
settingsBuilder.put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, Security.NAME);
settingsBuilder.put(NetworkModule.HTTP_TYPE_SETTING.getKey(), Security.NAME);
SecurityNetty3HttpServerTransport.overrideSettings(settingsBuilder, settings);
settingsBuilder.put(NetworkModule.HTTP_TYPE_SETTING.getKey(), Security.NAME + "4");
// nocommit
SecurityNetty4HttpServerTransport.overrideSettings(settingsBuilder, settings);
addUserSettings(settings, settingsBuilder);
addTribeSettings(settings, settingsBuilder);
return settingsBuilder.build();
@ -513,6 +515,7 @@ public class Security implements ActionPlugin, IngestPlugin {
module.registerTransport(Security.NAME + "4", SecurityNetty4Transport.class);
module.registerTransportService(Security.NAME, SecurityServerTransportService.class);
module.registerHttpTransport(Security.NAME, SecurityNetty3HttpServerTransport.class);
module.registerHttpTransport(Security.NAME + "4", SecurityNetty4HttpServerTransport.class);
}
}

View File

@ -0,0 +1,125 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport.netty4;
import org.elasticsearch.common.logging.ESLogger;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.DownstreamMessageEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.handler.ssl.SslHandler;
import java.util.LinkedList;
import java.util.Queue;
/**
* Netty requires that nothing be written to the channel prior to the handshake. Writing before the handshake
* completes, results in odd SSLExceptions being thrown. Channel writes can happen from any thread that
* can access the channel and Netty does not provide a way to ensure the handshake has occurred before the
* application writes to the channel. This handler will queue up writes until the handshake has occurred and
* then will pass the writes through the pipeline. After all writes have been completed, this handler removes
* itself from the pipeline.
*
* NOTE: This class assumes that the transport will not use a closed channel again or attempt to reconnect, which
* is the way that Netty3Transport currently works
*/
public class HandshakeWaitingHandler extends SimpleChannelHandler {
private final ESLogger logger;
private boolean handshaken = false;
private Queue<MessageEvent> pendingWrites = new LinkedList<>();
/**
* @param logger We pass a context aware logger here (logger that is aware of the node name &amp; env)
*/
public HandshakeWaitingHandler(ESLogger logger) {
this.logger = logger;
}
@Override
public void channelConnected(final ChannelHandlerContext ctx, final ChannelStateEvent e) throws Exception {
SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class);
final ChannelFuture handshakeFuture = sslHandler.handshake();
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (handshakeFuture.isSuccess()) {
logger.debug("SSL/TLS handshake completed for channel");
// We synchronize here to allow all pending writes to be processed prior to any writes coming from
// another thread
synchronized (HandshakeWaitingHandler.this) {
handshaken = true;
while (!pendingWrites.isEmpty()) {
MessageEvent event = pendingWrites.remove();
ctx.sendDownstream(event);
}
ctx.getPipeline().remove(HandshakeWaitingHandler.class);
}
ctx.sendUpstream(e);
} else {
Throwable cause = handshakeFuture.getCause();
if (logger.isDebugEnabled()) {
logger.debug("SSL/TLS handshake failed, closing channel: {}", cause, cause.getMessage());
} else {
logger.error("SSL/TLS handshake failed, closing channel: {}", cause.getMessage());
}
synchronized (HandshakeWaitingHandler.this) {
// Set failure on the futures of each message so that listeners are called
while (!pendingWrites.isEmpty()) {
DownstreamMessageEvent event = (DownstreamMessageEvent) pendingWrites.remove();
event.getFuture().setFailure(cause);
}
// Some writes may be waiting to acquire lock, if so the SetFailureOnAddQueue will set
// failure on their futures
pendingWrites = new SetFailureOnAddQueue(cause);
handshakeFuture.getChannel().close();
}
}
}
});
}
@Override
public synchronized void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
// Writes can come from any thread so we need to ensure that we do not let any through
// until handshake has completed
if (!handshaken) {
pendingWrites.add(e);
return;
}
ctx.sendDownstream(e);
}
synchronized boolean hasPendingWrites() {
return !pendingWrites.isEmpty();
}
private static class SetFailureOnAddQueue extends LinkedList<MessageEvent> {
private final Throwable cause;
SetFailureOnAddQueue(Throwable cause) {
super();
this.cause = cause;
}
@Override
public boolean add(MessageEvent messageEvent) {
DownstreamMessageEvent event = (DownstreamMessageEvent) messageEvent;
event.getFuture().setFailure(cause);
return false;
}
}
}

View File

@ -5,27 +5,28 @@
*/
package org.elasticsearch.xpack.security.transport.netty4;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ipfilter.AbstractRemoteAddressFilter;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import java.net.InetSocketAddress;
/**
*
*/
public class IPFilterNetty4Handler extends AbstractRemoteAddressFilter<InetSocketAddress> {
@ChannelHandler.Sharable
class IpFilterRemoteAddressFilter extends AbstractRemoteAddressFilter<InetSocketAddress> {
private final IPFilter filter;
private final String profile;
public IPFilterNetty4Handler(IPFilter filter, String profile) {
IpFilterRemoteAddressFilter(final IPFilter filter, final String profile) {
this.filter = filter;
this.profile = profile;
}
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress inetSocketAddress) throws Exception {
return filter.accept(profile, inetSocketAddress.getAddress());
protected boolean accept(final ChannelHandlerContext ctx, final InetSocketAddress remoteAddress) throws Exception {
// at this stage no auth has happened, so we do not have any principal anyway
return filter.accept(profile, remoteAddress.getAddress());
}
}

View File

@ -0,0 +1,136 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport.netty4;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.http.netty4.Netty4HttpServerTransport;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.ssl.ServerSSLService;
import org.elasticsearch.xpack.security.transport.SSLClientAuth;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import javax.net.ssl.SSLEngine;
import java.util.List;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_COMPRESSION;
import static org.elasticsearch.xpack.security.Security.setting;
import static org.elasticsearch.xpack.security.transport.SSLExceptionHelper.isCloseDuringHandshakeException;
import static org.elasticsearch.xpack.security.transport.SSLExceptionHelper.isNotSslRecordException;
public class SecurityNetty4HttpServerTransport extends Netty4HttpServerTransport {
public static final boolean SSL_DEFAULT = false;
public static final String CLIENT_AUTH_DEFAULT = SSLClientAuth.NO.name();
public static final Setting<Boolean> DEPRECATED_SSL_SETTING =
Setting.boolSetting(setting("http.ssl"), SSL_DEFAULT, Property.NodeScope, Property.Deprecated);
public static final Setting<Boolean> SSL_SETTING =
Setting.boolSetting(setting("http.ssl.enabled"), DEPRECATED_SSL_SETTING, Property.NodeScope);
public static final Setting<SSLClientAuth> CLIENT_AUTH_SETTING =
new Setting<>(setting("http.ssl.client.auth"), CLIENT_AUTH_DEFAULT, SSLClientAuth::parse, Property.NodeScope);
private final IPFilter ipFilter;
private final ServerSSLService sslService;
private final boolean ssl;
private final Settings sslSettings;
@Inject
public SecurityNetty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, IPFilter ipFilter,
ServerSSLService sslService, ThreadPool threadPool) {
super(settings, networkService, bigArrays, threadPool);
this.ipFilter = ipFilter;
this.ssl = SSL_SETTING.get(settings);
this.sslService = sslService;
if (ssl) {
sslSettings = settings.getByPrefix(setting("http.ssl."));
} else {
sslSettings = Settings.EMPTY;
}
}
@Override
protected void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (!lifecycle.started()) {
return;
}
if (isNotSslRecordException(cause)) {
if (logger.isTraceEnabled()) {
logger.trace("received plaintext http traffic on a https channel, closing connection {}", cause, ctx.channel());
} else {
logger.warn("received plaintext http traffic on a https channel, closing connection {}", ctx.channel());
}
ctx.channel().close();
} else if (isCloseDuringHandshakeException(cause)) {
if (logger.isTraceEnabled()) {
logger.trace("connection {} closed during handshake", cause, ctx.channel());
} else {
logger.warn("connection {} closed during handshake", ctx.channel());
}
ctx.channel().close();
} else {
super.exceptionCaught(ctx, cause);
}
}
@Override
protected void doStart() {
super.doStart();
ipFilter.setBoundHttpTransportAddress(this.boundAddress());
}
@Override
public ChannelHandler configureServerChannelHandler() {
return new HttpSslChannelHandler(this);
}
private class HttpSslChannelHandler extends HttpChannelHandler {
private final SSLClientAuth clientAuth;
HttpSslChannelHandler(Netty4HttpServerTransport transport) {
super(transport, detailedErrorsEnabled, threadPool.getThreadContext());
clientAuth = CLIENT_AUTH_SETTING.get(settings);
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
super.initChannel(ch);
if (ssl) {
final SSLEngine engine = sslService.createSSLEngine(sslSettings);
engine.setUseClientMode(false);
clientAuth.configure(engine);
ch.pipeline().addFirst("ssl", new SslHandler(engine));
}
ch.pipeline().addFirst("ip_filter", new IpFilterRemoteAddressFilter(ipFilter, IPFilter.HTTP_PROFILE_NAME));
}
}
public static void addSettings(List<Setting<?>> settings) {
settings.add(SSL_SETTING);
settings.add(CLIENT_AUTH_SETTING);
settings.add(DEPRECATED_SSL_SETTING);
}
public static void overrideSettings(Settings.Builder settingsBuilder, Settings settings) {
if (SSL_SETTING.get(settings) && SETTING_HTTP_COMPRESSION.exists(settings) == false) {
settingsBuilder.put(SETTING_HTTP_COMPRESSION.getKey(), false);
}
}
}

View File

@ -124,7 +124,7 @@ public class SecurityNetty4Transport extends Netty4Transport {
ch.pipeline().addFirst(new SslHandler(serverEngine));
}
if (authenticator != null) {
ch.pipeline().addFirst(new IPFilterNetty4Handler(authenticator, name));
ch.pipeline().addFirst(new IpFilterRemoteAddressFilter(authenticator, name));
}
}
}
@ -180,6 +180,7 @@ public class SecurityNetty4Transport extends Netty4Transport {
return hostname;
}
}
static boolean profileSSL(Settings profileSettings, boolean defaultSSL) {
if (PROFILE_SSL_SETTING.exists(profileSettings)) {
return PROFILE_SSL_SETTING.get(profileSettings);
@ -196,4 +197,5 @@ public class SecurityNetty4Transport extends Netty4Transport {
}
return clientAuth;
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
import org.junit.Before;
@ -138,6 +139,7 @@ public class IndexPrivilegeTests extends AbstractPrivilegeTestCase {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.test.SecuritySettingsSource;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.XPackTransportClient;
import org.elasticsearch.xpack.security.Security;
import org.elasticsearch.xpack.security.authc.support.UsernamePasswordToken;
@ -102,6 +103,7 @@ public class LicensingTests extends SecurityIntegTestCase {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.discovery.DiscoveryModule;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase.Scope;
@ -161,7 +162,7 @@ public class SecuritySettingsSource extends ClusterDiscoveryConfiguration.Unicas
@Override
public Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(xpackPluginClass(), MockNetty3Plugin.class);
return Arrays.asList(xpackPluginClass(), MockNetty3Plugin.class, MockNetty4Plugin.class);
}
@Override

View File

@ -0,0 +1,20 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.transport.Netty3Plugin;
import org.elasticsearch.transport.Netty4Plugin;
public final class MockNetty4Plugin extends Netty4Plugin {
// see Netty4Plugin.... this runs without the permission from the netty4 module so it will fail since reindex can't set the property
// to make it still work we disable that check for pseudo integ tests
public MockNetty4Plugin(Settings settings) {
super(Settings.builder().put(settings).put("netty.assert.buglevel", false).build());
}
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.client.ResponseException;
import org.elasticsearch.common.network.NetworkModule;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.monitoring.Monitoring;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.security.Security;
@ -51,7 +52,7 @@ public class WatcherPluginDisableTests extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(XPackPlugin.class, MockNetty3Plugin.class);
return Arrays.asList(XPackPlugin.class, MockNetty3Plugin.class, MockNetty4Plugin.class);
}
@Override

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.watcher.input.http.HttpInput;
import org.elasticsearch.xpack.common.http.HttpRequestTemplate;
import org.elasticsearch.xpack.common.http.auth.basic.BasicAuth;
@ -48,6 +49,7 @@ public class ChainIntegrationTests extends AbstractWatcherIntegrationTestCase {
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}

View File

@ -13,6 +13,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.watcher.client.WatcherClient;
import org.elasticsearch.xpack.watcher.condition.compare.CompareCondition;
import org.elasticsearch.xpack.watcher.history.HistoryStore;
@ -55,6 +56,7 @@ public class HttpInputIntegrationTests extends AbstractWatcherIntegrationTestCas
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.xpack.MockNetty3Plugin;
import org.elasticsearch.xpack.MockNetty4Plugin;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.elasticsearch.xpack.security.authc.support.SecuredString;
import org.elasticsearch.xpack.watcher.test.AbstractWatcherIntegrationTestCase;
@ -55,6 +56,7 @@ public class WatcherSettingsFilterTests extends AbstractWatcherIntegrationTestCa
protected Collection<Class<? extends Plugin>> nodePlugins() {
ArrayList<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
plugins.add(MockNetty3Plugin.class); // for http
plugins.add(MockNetty4Plugin.class); // for http
return plugins;
}