Netty 4 transport working

Original commit: elastic/x-pack-elasticsearch@081e68c087
This commit is contained in:
jaymode 2016-08-02 13:42:48 -04:00
parent b498fd32a2
commit b525891212
5 changed files with 233 additions and 2 deletions

View File

@ -28,6 +28,7 @@ licenseHeaders {
dependencies {
// security deps
compile project(path: ':modules:transport-netty3', configuration: 'runtime')
compile project(path: ':modules:transport-netty4', configuration: 'runtime')
compile 'dk.brics.automaton:automaton:1.11-8'
compile 'com.unboundid:unboundid-ldapsdk:3.1.1'
compile 'org.bouncycastle:bcprov-jdk15on:1.54'

View File

@ -120,6 +120,7 @@ import org.elasticsearch.xpack.security.transport.SecurityTransportModule;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3HttpServerTransport;
import org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4Transport;
import org.elasticsearch.xpack.security.user.AnonymousUser;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
@ -345,7 +346,7 @@ public class Security implements ActionPlugin, IngestPlugin {
// pkg private for testing
static Settings additionalSettings(Settings settings) {
Settings.Builder settingsBuilder = Settings.builder();
settingsBuilder.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME);
settingsBuilder.put(NetworkModule.TRANSPORT_TYPE_KEY, Security.NAME + "4");
settingsBuilder.put(NetworkModule.TRANSPORT_SERVICE_TYPE_KEY, Security.NAME);
settingsBuilder.put(NetworkModule.HTTP_TYPE_SETTING.getKey(), Security.NAME);
SecurityNetty3HttpServerTransport.overrideSettings(settingsBuilder, settings);
@ -501,6 +502,7 @@ public class Security implements ActionPlugin, IngestPlugin {
if (transportClientMode) {
if (enabled) {
module.registerTransport(Security.NAME, SecurityNetty3Transport.class);
module.registerTransport(Security.NAME + "4", SecurityNetty4Transport.class);
module.registerTransportService(Security.NAME, SecurityClientTransportService.class);
}
return;
@ -508,6 +510,7 @@ public class Security implements ActionPlugin, IngestPlugin {
if (enabled) {
module.registerTransport(Security.NAME, SecurityNetty3Transport.class);
module.registerTransport(Security.NAME + "4", SecurityNetty4Transport.class);
module.registerTransportService(Security.NAME, SecurityServerTransportService.class);
module.registerHttpTransport(Security.NAME, SecurityNetty3HttpServerTransport.class);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.xpack.security.transport.netty4.SecurityNetty4Transport;
import java.util.Collections;
import java.util.HashMap;
@ -110,7 +111,7 @@ public class SecurityServerTransportService extends TransportService {
}
protected Map<String, ServerTransportFilter> initializeProfileFilters() {
if (!(transport instanceof SecurityNetty3Transport)) {
if ((transport instanceof SecurityNetty3Transport) == false && (transport instanceof SecurityNetty4Transport) == false) {
return Collections.<String, ServerTransportFilter>singletonMap(TransportSettings.DEFAULT_PROFILE,
new ServerTransportFilter.NodeProfile(authcService, authzService, actionMapper, threadPool.getThreadContext(), false));
}

View File

@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport.netty4;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.ipfilter.AbstractRemoteAddressFilter;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import java.net.InetSocketAddress;
/**
*
*/
public class IPFilterNetty4Handler extends AbstractRemoteAddressFilter<InetSocketAddress> {
private final IPFilter filter;
private final String profile;
public IPFilterNetty4Handler(IPFilter filter, String profile) {
this.filter = filter;
this.profile = profile;
}
@Override
protected boolean accept(ChannelHandlerContext ctx, InetSocketAddress inetSocketAddress) throws Exception {
return filter.accept(profile, inetSocketAddress.getAddress());
}
}

View File

@ -0,0 +1,195 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.transport.netty4;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Transport;
import org.elasticsearch.xpack.security.ssl.ClientSSLService;
import org.elasticsearch.xpack.security.ssl.ServerSSLService;
import org.elasticsearch.xpack.security.transport.SSLClientAuth;
import org.elasticsearch.xpack.security.transport.filter.IPFilter;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.xpack.security.Security.settingPrefix;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.CLIENT_AUTH_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.DEPRECATED_PROFILE_SSL_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.HOSTNAME_VERIFICATION_RESOLVE_NAME_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.HOSTNAME_VERIFICATION_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.PROFILE_CLIENT_AUTH_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.PROFILE_SSL_SETTING;
import static org.elasticsearch.xpack.security.transport.netty3.SecurityNetty3Transport.SSL_SETTING;
/**
*
*/
public class SecurityNetty4Transport extends Netty4Transport {
private final ServerSSLService serverSslService;
private final ClientSSLService clientSSLService;
@Nullable private final IPFilter authenticator;
private final SSLClientAuth clientAuth;
private final boolean ssl;
@Inject
public SecurityNetty4Transport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService,
@Nullable IPFilter authenticator, @Nullable ServerSSLService serverSSLService,
ClientSSLService clientSSLService) {
super(settings, threadPool, networkService, bigArrays, namedWriteableRegistry, circuitBreakerService);
this.authenticator = authenticator;
this.ssl = SSL_SETTING.get(settings);
this.clientAuth = CLIENT_AUTH_SETTING.get(settings);
this.serverSslService = serverSSLService;
this.clientSSLService = clientSSLService;
}
@Override
protected void doStart() {
super.doStart();
if (authenticator != null) {
authenticator.setBoundTransportAddress(boundAddress(), profileBoundAddresses());
}
}
@Override
protected ChannelInitializer<SocketChannel> getServerChannelInitializer(String name, Settings settings) {
return new SecurityServerChannelInitializer(name, settings);
}
@Override
protected ChannelInitializer<SocketChannel> getClientChannelInitializer() {
return new SecurityClientChannelInitializer();
}
@Override
protected void onAfterChannelsConnected(NodeChannels nodeChannels) {
for (Channel channel : nodeChannels.allChannels) {
SslHandler handler = channel.pipeline().get(SslHandler.class);
if (handler != null) {
handler.handshakeFuture().awaitUninterruptibly(30L, TimeUnit.SECONDS);
if (!handler.handshakeFuture().isSuccess()) {
throw new ElasticsearchException("handshake failed for channel [{}]", channel);
}
}
}
}
class SecurityServerChannelInitializer extends ServerChannelInitializer {
private final boolean sslEnabled;
protected SecurityServerChannelInitializer(String name, Settings settings) {
super(name, settings);
this.sslEnabled = profileSSL(settings, ssl);
}
@Override
protected void initChannel(SocketChannel ch) throws Exception {
super.initChannel(ch);
if (sslEnabled) {
Settings securityProfileSettings = settings.getByPrefix(settingPrefix());
SSLEngine serverEngine = serverSslService.createSSLEngine(securityProfileSettings);
serverEngine.setUseClientMode(false);
final SSLClientAuth profileClientAuth = profileClientAuth(settings, clientAuth);
profileClientAuth.configure(serverEngine);
ch.pipeline().addFirst(new SslHandler(serverEngine));
}
if (authenticator != null) {
ch.pipeline().addFirst(new IPFilterNetty4Handler(authenticator, name));
}
}
}
class SecurityClientChannelInitializer extends ClientChannelInitializer {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
super.initChannel(ch);
if (ssl) {
ch.pipeline().addFirst(new ClientSslHandlerInitializer());
}
}
}
private class ClientSslHandlerInitializer extends ChannelOutboundHandlerAdapter {
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
final SSLEngine sslEngine;
if (HOSTNAME_VERIFICATION_SETTING.get(settings)) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) remoteAddress;
sslEngine = clientSSLService.createSSLEngine(Settings.EMPTY, getHostname(inetSocketAddress), inetSocketAddress.getPort());
// By default, a SSLEngine will not perform hostname verification. In order to perform hostname verification
// we need to specify a EndpointIdentificationAlgorithm. We use the HTTPS algorithm to prevent against
// man in the middle attacks for transport connections
SSLParameters parameters = new SSLParameters();
parameters.setEndpointIdentificationAlgorithm("HTTPS");
sslEngine.setSSLParameters(parameters);
} else {
sslEngine = clientSSLService.createSSLEngine();
}
sslEngine.setUseClientMode(true);
ctx.pipeline().replace(this, "ssl", new SslHandler(sslEngine));
super.connect(ctx, remoteAddress, localAddress, promise);
}
@SuppressForbidden(reason = "need to use getHostName to resolve DNS name for SSL connections and hostname verification")
private String getHostname(InetSocketAddress inetSocketAddress) {
String hostname;
if (HOSTNAME_VERIFICATION_RESOLVE_NAME_SETTING.get(settings)) {
hostname = inetSocketAddress.getHostName();
} else {
hostname = inetSocketAddress.getHostString();
}
if (logger.isTraceEnabled()) {
logger.trace("resolved hostname [{}] for address [{}] to be used in ssl hostname verification", hostname,
inetSocketAddress);
}
return hostname;
}
}
static boolean profileSSL(Settings profileSettings, boolean defaultSSL) {
if (PROFILE_SSL_SETTING.exists(profileSettings)) {
return PROFILE_SSL_SETTING.get(profileSettings);
} else if (DEPRECATED_PROFILE_SSL_SETTING.exists(profileSettings)) {
return DEPRECATED_PROFILE_SSL_SETTING.get(profileSettings);
} else {
return defaultSSL;
}
}
static SSLClientAuth profileClientAuth(Settings settings, SSLClientAuth clientAuth) {
if (PROFILE_CLIENT_AUTH_SETTING.exists(settings)) {
return PROFILE_CLIENT_AUTH_SETTING.get(settings);
}
return clientAuth;
}
}