Introduce http and tcp server channels (#31446)

Historically in TcpTransport server channels were represented by the
same channel interface as socket channels. This was necessary as
TcpTransport was parameterized by the channel type. This commit
introduces TcpServerChannel and HttpServerChannel classes. Additionally,
it adds the implementations for the various transports. This allows
server channels to have unique functionality and not implement the
methods they do not support (such as send and getRemoteAddress).

Additionally, with the introduction of HttpServerChannel this commit
extracts some of the storing and closing channel work to the abstract
http server transport.
This commit is contained in:
Tim Brooks 2018-06-20 16:34:56 -06:00 committed by GitHub
parent 8bfb9aadd9
commit 9ab1325953
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 501 additions and 314 deletions

View File

@ -29,8 +29,6 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.http.HttpPipelinedRequest;
import org.elasticsearch.transport.netty4.Netty4Utils;
import static org.elasticsearch.http.netty4.Netty4HttpServerTransport.HTTP_CHANNEL_KEY;
@ChannelHandler.Sharable
class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest<FullHttpRequest>> {
@ -42,7 +40,7 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest<FullHttpRequest> msg) throws Exception {
Netty4HttpChannel channel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
FullHttpRequest request = msg.getRequest();
try {
@ -77,12 +75,11 @@ class Netty4HttpRequestHandler extends SimpleChannelInboundHandler<HttpPipelined
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
Netty4HttpChannel httpChannel = ctx.channel().attr(HTTP_CHANNEL_KEY).get();
Netty4HttpChannel channel = ctx.channel().attr(Netty4HttpServerTransport.HTTP_CHANNEL_KEY).get();
if (cause instanceof Error) {
serverTransport.onException(httpChannel, new Exception(cause));
serverTransport.onException(channel, new Exception(cause));
} else {
serverTransport.onException(httpChannel, (Exception) cause);
serverTransport.onException(channel, (Exception) cause);
}
}
}

View File

@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
public class Netty4HttpServerChannel implements HttpServerChannel {
private final Channel channel;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4HttpServerChannel(Channel channel) {
this.channel = channel;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public void close() {
channel.close();
}
@Override
public String toString() {
return "Netty4HttpChannel{localAddress=" + getLocalAddress() + "}";
}
}

View File

@ -23,6 +23,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -42,22 +43,19 @@ import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfig;
import org.elasticsearch.http.netty4.cors.Netty4CorsConfigBuilder;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
@ -65,14 +63,9 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
@ -154,12 +147,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
private final int pipeliningMaxEvents;
private final boolean tcpNoDelay;
private final boolean tcpKeepAlive;
private final boolean reuseAddress;
private final ByteSizeValue tcpSendBufferSize;
private final ByteSizeValue tcpReceiveBufferSize;
private final RecvByteBufAllocator recvByteBufAllocator;
private final int readTimeoutMillis;
@ -167,8 +154,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
protected volatile ServerBootstrap serverBootstrap;
protected final List<Channel> serverChannels = new ArrayList<>();
private final Netty4CorsConfig corsConfig;
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
@ -184,11 +169,6 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
this.tcpNoDelay = SETTING_HTTP_TCP_NO_DELAY.get(settings);
this.tcpKeepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
this.reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
this.tcpSendBufferSize = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings);
this.tcpReceiveBufferSize = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings);
this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
ByteSizeValue receivePredictor = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
@ -217,6 +197,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(configureServerChannelHandler());
serverBootstrap.handler(new ServerChannelExceptionHandler(this));
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, SETTING_HTTP_TCP_KEEP_ALIVE.get(settings));
@ -238,10 +219,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
serverBootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
serverBootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
bindServer();
success = true;
} finally {
if (success == false) {
@ -284,78 +262,29 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected TransportAddress bindAddress(final InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (serverChannels) {
ChannelFuture future = serverBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)).sync();
serverChannels.add(future.channel());
boundSocket.set((InetSocketAddress) future.channel().localAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
ChannelFuture future = serverBootstrap.bind(socketAddress).sync();
Channel channel = future.channel();
Netty4HttpServerChannel httpServerChannel = new Netty4HttpServerChannel(channel);
channel.attr(HTTP_SERVER_CHANNEL_KEY).set(httpServerChannel);
return httpServerChannel;
}
@Override
protected void doStop() {
synchronized (serverChannels) {
if (!serverChannels.isEmpty()) {
try {
Netty4Utils.closeChannels(serverChannels);
} catch (IOException e) {
logger.trace("exception while closing channels", e);
} finally {
serverChannels.clear();
}
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
protected void stopInternal() {
if (serverBootstrap != null) {
serverBootstrap.config().group().shutdownGracefully(0, 5, TimeUnit.SECONDS).awaitUninterruptibly();
serverBootstrap = null;
}
}
@Override
protected void doClose() {
}
@Override
public HttpStats stats() {
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
@Override
protected void onException(HttpChannel channel, Exception cause) {
if (cause instanceof ReadTimeoutException) {
if (logger.isTraceEnabled()) {
logger.trace("Http read timeout {}", channel);
}
CloseableChannel.closeChannel(channel);;
CloseableChannel.closeChannel(channel);
} else {
super.onException(channel, cause);
}
@ -366,6 +295,7 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel");
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {
@ -413,4 +343,24 @@ public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
}
}
@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
private final Netty4HttpServerTransport transport;
private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) {
this.transport = transport;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
transport.onServerException(httpServerChannel, new Exception(cause));
} else {
transport.onServerException(httpServerChannel, (Exception) cause);
}
}
}
}

View File

@ -24,6 +24,8 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.transport.TcpHeader;
import org.elasticsearch.transport.Transports;
@ -36,11 +38,9 @@ import org.elasticsearch.transport.Transports;
final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
private final Netty4Transport transport;
private final String profileName;
Netty4MessageChannelHandler(Netty4Transport transport, String profileName) {
Netty4MessageChannelHandler(Netty4Transport transport) {
this.transport = transport;
this.profileName = profileName;
}
@Override
@ -58,7 +58,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
// netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
// buffer, or in the cumulative buffer, which is cleaned each time so it could be bigger than the actual size
BytesReference reference = Netty4Utils.toBytesReference(buffer, remainingMessageSize);
Attribute<NettyTcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
Attribute<Netty4TcpChannel> channelAttribute = channel.attr(Netty4Transport.CHANNEL_KEY);
transport.messageReceived(reference, channelAttribute.get());
} finally {
// Set the expected position of the buffer, no matter what happened
@ -69,7 +69,13 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Netty4Utils.maybeDie(cause);
transport.exceptionCaught(ctx, cause);
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable newCause = unwrapped != null ? unwrapped : cause;
Netty4TcpChannel tcpChannel = ctx.channel().attr(Netty4Transport.CHANNEL_KEY).get();
if (newCause instanceof Error) {
transport.onException(tcpChannel, new Exception(newCause));
} else {
transport.onException(tcpChannel, (Exception) newCause);
}
}
}

View File

@ -30,13 +30,13 @@ import org.elasticsearch.transport.TransportException;
import java.net.InetSocketAddress;
public class NettyTcpChannel implements TcpChannel {
public class Netty4TcpChannel implements TcpChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
NettyTcpChannel(Channel channel, String profile) {
Netty4TcpChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
@ -118,7 +118,7 @@ public class NettyTcpChannel implements TcpChannel {
@Override
public String toString() {
return "NettyTcpChannel{" +
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
", remoteAddress=" + channel.remoteAddress() +
'}';

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4;
import io.netty.channel.Channel;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.TcpServerChannel;
import java.net.InetSocketAddress;
public class Netty4TcpServerChannel implements TcpServerChannel {
private final Channel channel;
private final String profile;
private final CompletableContext<Void> closeContext = new CompletableContext<>();
Netty4TcpServerChannel(Channel channel, String profile) {
this.channel = channel;
this.profile = profile;
this.channel.closeFuture().addListener(f -> {
if (f.isSuccess()) {
closeContext.complete(null);
} else {
Throwable cause = f.cause();
if (cause instanceof Error) {
Netty4Utils.maybeDie(cause);
closeContext.completeExceptionally(new Exception(cause));
} else {
closeContext.completeExceptionally((Exception) cause);
}
}
});
}
@Override
public String getProfile() {
return profile;
}
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
@Override
public void close() {
channel.close();
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
closeContext.addListener(ActionListener.toBiConsumer(listener));
}
@Override
public boolean isOpen() {
return channel.isOpen();
}
@Override
public String toString() {
return "Netty4TcpChannel{" +
"localAddress=" + getLocalAddress() +
'}';
}
}

View File

@ -25,6 +25,7 @@ import io.netty.channel.AdaptiveRecvByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@ -37,8 +38,6 @@ import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.collect.Tuple;
@ -196,6 +195,7 @@ public class Netty4Transport extends TcpTransport {
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name));
serverBootstrap.handler(new ServerChannelExceptionHandler());
serverBootstrap.childOption(ChannelOption.TCP_NODELAY, profileSettings.tcpNoDelay);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, profileSettings.tcpKeepAlive);
@ -226,17 +226,11 @@ public class Netty4Transport extends TcpTransport {
return new ClientChannelInitializer();
}
static final AttributeKey<NettyTcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
protected final void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
final Throwable unwrapped = ExceptionsHelper.unwrap(cause, ElasticsearchException.class);
final Throwable t = unwrapped != null ? unwrapped : cause;
Channel channel = ctx.channel();
onException(channel.attr(CHANNEL_KEY).get(), t instanceof Exception ? (Exception) t : new ElasticsearchException(t));
}
static final AttributeKey<Netty4TcpChannel> CHANNEL_KEY = AttributeKey.newInstance("es-channel");
static final AttributeKey<Netty4TcpServerChannel> SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-server-channel");
@Override
protected NettyTcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
protected Netty4TcpChannel initiateChannel(InetSocketAddress address, ActionListener<Void> listener) throws IOException {
ChannelFuture channelFuture = bootstrap.connect(address);
Channel channel = channelFuture.channel();
if (channel == null) {
@ -245,7 +239,7 @@ public class Netty4Transport extends TcpTransport {
}
addClosedExceptionLogger(channel);
NettyTcpChannel nettyChannel = new NettyTcpChannel(channel, "default");
Netty4TcpChannel nettyChannel = new Netty4TcpChannel(channel, "default");
channel.attr(CHANNEL_KEY).set(nettyChannel);
channelFuture.addListener(f -> {
@ -266,10 +260,10 @@ public class Netty4Transport extends TcpTransport {
}
@Override
protected NettyTcpChannel bind(String name, InetSocketAddress address) {
protected Netty4TcpServerChannel bind(String name, InetSocketAddress address) {
Channel channel = serverBootstraps.get(name).bind(address).syncUninterruptibly().channel();
NettyTcpChannel esChannel = new NettyTcpChannel(channel, name);
channel.attr(CHANNEL_KEY).set(esChannel);
Netty4TcpServerChannel esChannel = new Netty4TcpServerChannel(channel, name);
channel.attr(SERVER_CHANNEL_KEY).set(esChannel);
return esChannel;
}
@ -310,7 +304,7 @@ public class Netty4Transport extends TcpTransport {
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
// using a dot as a prefix means this cannot come from any settings parsed
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, ".client"));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
}
@Override
@ -331,11 +325,11 @@ public class Netty4Transport extends TcpTransport {
@Override
protected void initChannel(Channel ch) throws Exception {
addClosedExceptionLogger(ch);
NettyTcpChannel nettyTcpChannel = new NettyTcpChannel(ch, name);
Netty4TcpChannel nettyTcpChannel = new Netty4TcpChannel(ch, name);
ch.attr(CHANNEL_KEY).set(nettyTcpChannel);
ch.pipeline().addLast("logging", new ESLoggingHandler());
ch.pipeline().addLast("size", new Netty4SizeHeaderFrameDecoder());
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this, name));
ch.pipeline().addLast("dispatcher", new Netty4MessageChannelHandler(Netty4Transport.this));
serverAcceptedChannel(nettyTcpChannel);
}
@ -353,4 +347,19 @@ public class Netty4Transport extends TcpTransport {
}
});
}
@ChannelHandler.Sharable
private class ServerChannelExceptionHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
Netty4Utils.maybeDie(cause);
Netty4TcpServerChannel serverChannel = ctx.channel().attr(SERVER_CHANNEL_KEY).get();
if (cause instanceof Error) {
onServerException(serverChannel, new Exception(cause));
} else {
onServerException(serverChannel, (Exception) cause);
}
}
}
}

View File

@ -70,7 +70,7 @@ public class Netty4SizeHeaderFrameDecoderTests extends ESTestCase {
nettyTransport.start();
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();
TransportAddress transportAddress = (TransportAddress) randomFrom(boundAddresses);
TransportAddress transportAddress = randomFrom(boundAddresses);
port = transportAddress.address().getPort();
host = transportAddress.address().getAddress();
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.nio.NioServerSocketChannel;
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
public class NioHttpServerChannel extends NioServerSocketChannel implements HttpServerChannel {
NioHttpServerChannel(ServerSocketChannel serverSocketChannel) throws IOException {
super(serverSocketChannel);
}
@Override
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override
public String toString() {
return "NioHttpServerChannel{localAddress=" + getLocalAddress() + "}";
}
}

View File

@ -21,40 +21,29 @@ package org.elasticsearch.http.nio;
import io.netty.handler.codec.http.HttpMethod;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.BindHttpException;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.HttpServerTransport;
import org.elasticsearch.http.HttpStats;
import org.elasticsearch.http.nio.cors.NioCorsConfig;
import org.elasticsearch.http.nio.cors.NioCorsConfigBuilder;
import org.elasticsearch.nio.BytesChannelContext;
import org.elasticsearch.nio.ChannelFactory;
import org.elasticsearch.nio.EventHandler;
import org.elasticsearch.nio.InboundChannelBuffer;
import org.elasticsearch.nio.NioChannel;
import org.elasticsearch.nio.NioGroup;
import org.elasticsearch.nio.NioSelector;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.nio.SocketChannelContext;
@ -62,18 +51,11 @@ import org.elasticsearch.rest.RestUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.regex.Pattern;
@ -113,7 +95,6 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
private final int tcpSendBufferSize;
private final int tcpReceiveBufferSize;
private final Set<NioServerSocketChannel> serverChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private NioGroup nioGroup;
private HttpChannelFactory channelFactory;
private final NioCorsConfig corsConfig;
@ -156,12 +137,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
daemonThreadFactory(this.settings, HTTP_SERVER_WORKER_THREAD_NAME_PREFIX), workerCount,
(s) -> new EventHandler(this::onNonChannelException, s));
channelFactory = new HttpChannelFactory();
this.boundAddress = createBoundHttpAddress();
if (logger.isInfoEnabled()) {
logger.info("{}", boundAddress);
}
bindServer();
success = true;
} catch (IOException e) {
throw new ElasticsearchException(e);
@ -173,26 +149,7 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected void doStop() {
synchronized (serverChannels) {
if (serverChannels.isEmpty() == false) {
try {
closeChannels(new ArrayList<>(serverChannels));
} catch (Exception e) {
logger.error("unexpected exception while closing http server channels", e);
}
serverChannels.clear();
}
}
// TODO: Move all of channel closing to abstract class once server channels are handled
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
protected void stopInternal() {
try {
nioGroup.close();
} catch (Exception e) {
@ -201,40 +158,8 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
}
@Override
protected void doClose() throws IOException {
}
@Override
protected TransportAddress bindAddress(InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (serverChannels) {
InetSocketAddress address = new InetSocketAddress(hostAddress, portNumber);
NioServerSocketChannel channel = nioGroup.bindServerChannel(address, channelFactory);
serverChannels.add(channel);
boundSocket.set(channel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (success == false) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
}
@Override
public HttpStats stats() {
return new HttpStats(serverChannels.size(), totalChannelsAccepted.get());
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws IOException {
return nioGroup.bindServerChannel(socketAddress, channelFactory);
}
static NioCorsConfig buildCorsConfig(Settings settings) {
@ -269,33 +194,11 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
.build();
}
private void closeChannels(List<NioChannel> channels) {
List<ActionFuture<Void>> futures = new ArrayList<>(channels.size());
for (NioChannel channel : channels) {
PlainActionFuture<Void> future = PlainActionFuture.newFuture();
channel.addCloseListener(ActionListener.toBiConsumer(future));
futures.add(future);
channel.close();
}
List<RuntimeException> closeExceptions = new ArrayList<>();
for (ActionFuture<Void> f : futures) {
try {
f.actionGet();
} catch (RuntimeException e) {
closeExceptions.add(e);
}
}
ExceptionsHelper.rethrowAndSuppress(closeExceptions);
}
private void acceptChannel(NioSocketChannel socketChannel) {
super.serverAcceptedChannel((HttpChannel) socketChannel);
}
private class HttpChannelFactory extends ChannelFactory<NioServerSocketChannel, NioHttpChannel> {
private class HttpChannelFactory extends ChannelFactory<NioHttpServerChannel, NioHttpChannel> {
private HttpChannelFactory() {
super(new RawChannelFactory(tcpNoDelay, tcpKeepAlive, reuseAddress, tcpSendBufferSize, tcpReceiveBufferSize));
@ -303,29 +206,28 @@ public class NioHttpServerTransport extends AbstractHttpServerTransport {
@Override
public NioHttpChannel createChannel(NioSelector selector, SocketChannel channel) throws IOException {
NioHttpChannel nioChannel = new NioHttpChannel(channel);
NioHttpChannel httpChannel = new NioHttpChannel(channel);
java.util.function.Supplier<InboundChannelBuffer.Page> pageSupplier = () -> {
Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
return new InboundChannelBuffer.Page(ByteBuffer.wrap(bytes.v()), bytes::close);
};
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(nioChannel,NioHttpServerTransport.this,
HttpReadWriteHandler httpReadWritePipeline = new HttpReadWriteHandler(httpChannel,NioHttpServerTransport.this,
handlingSettings, corsConfig);
Consumer<Exception> exceptionHandler = (e) -> onException(nioChannel, e);
SocketChannelContext context = new BytesChannelContext(nioChannel, selector, exceptionHandler, httpReadWritePipeline,
Consumer<Exception> exceptionHandler = (e) -> onException(httpChannel, e);
SocketChannelContext context = new BytesChannelContext(httpChannel, selector, exceptionHandler, httpReadWritePipeline,
new InboundChannelBuffer(pageSupplier));
nioChannel.setContext(context);
return nioChannel;
httpChannel.setContext(context);
return httpChannel;
}
@Override
public NioServerSocketChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioServerSocketChannel nioChannel = new NioServerSocketChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
public NioHttpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioHttpServerChannel httpServerChannel = new NioHttpServerChannel(channel);
Consumer<Exception> exceptionHandler = (e) -> onServerException(httpServerChannel, e);
Consumer<NioSocketChannel> acceptor = NioHttpServerTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);
return nioChannel;
ServerChannelContext context = new ServerChannelContext(httpServerChannel, this, selector, acceptor, exceptionHandler);
httpServerChannel.setContext(context);
return httpServerChannel;
}
}

View File

@ -20,19 +20,17 @@
package org.elasticsearch.transport.nio;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioServerSocketChannel;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
/**
* This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpChannel}
* This is an implementation of {@link NioServerSocketChannel} that adheres to the {@link TcpServerChannel}
* interface. As it is a server socket, setting SO_LINGER and sending messages is not supported.
*/
public class NioTcpServerChannel extends NioServerSocketChannel implements TcpChannel {
public class NioTcpServerChannel extends NioServerSocketChannel implements TcpServerChannel {
private final String profile;
@ -41,21 +39,6 @@ public class NioTcpServerChannel extends NioServerSocketChannel implements TcpCh
this.profile = profile;
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
throw new UnsupportedOperationException("Cannot send a message to a server channel.");
}
@Override
public void setSoLinger(int value) throws IOException {
throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel.");
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
}
@Override
public void close() {
getContext().closeChannel();

View File

@ -19,7 +19,6 @@
package org.elasticsearch.transport.nio;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -176,8 +175,7 @@ public class NioTransport extends TcpTransport {
@Override
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
Consumer<NioSocketChannel> acceptor = NioTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.BoundTransportAddress;
@ -53,6 +54,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_BIND_HOST;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
@ -74,9 +76,10 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
private final String[] bindHosts;
private final String[] publishHosts;
protected final AtomicLong totalChannelsAccepted = new AtomicLong();
protected final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected volatile BoundTransportAddress boundAddress;
private volatile BoundTransportAddress boundAddress;
private final AtomicLong totalChannelsAccepted = new AtomicLong();
private final Set<HttpChannel> httpChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final Set<HttpServerChannel> httpServerChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
protected AbstractHttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Dispatcher dispatcher) {
@ -116,7 +119,12 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
return new HttpInfo(boundTransportAddress, maxContentLength.getBytes());
}
protected BoundTransportAddress createBoundHttpAddress() {
@Override
public HttpStats stats() {
return new HttpStats(httpChannels.size(), totalChannelsAccepted.get());
}
protected void bindServer() {
// Bind and start to accept incoming connections.
InetAddress hostAddresses[];
try {
@ -138,11 +146,71 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
}
final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress);
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);
return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new TransportAddress(publishAddress));
TransportAddress publishAddress = new TransportAddress(new InetSocketAddress(publishInetAddress, publishPort));
this.boundAddress = new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), publishAddress);
logger.info("{}", boundAddress);
}
protected abstract TransportAddress bindAddress(InetAddress hostAddress);
private TransportAddress bindAddress(final InetAddress hostAddress) {
final AtomicReference<Exception> lastException = new AtomicReference<>();
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = port.iterate(portNumber -> {
try {
synchronized (httpServerChannels) {
HttpServerChannel httpServerChannel = bind(new InetSocketAddress(hostAddress, portNumber));
httpServerChannels.add(httpServerChannel);
boundSocket.set(httpServerChannel.getLocalAddress());
}
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
});
if (!success) {
throw new BindHttpException("Failed to bind to [" + port.getPortRangeString() + "]", lastException.get());
}
if (logger.isDebugEnabled()) {
logger.debug("Bound http to address {{}}", NetworkAddress.format(boundSocket.get()));
}
return new TransportAddress(boundSocket.get());
}
protected abstract HttpServerChannel bind(InetSocketAddress hostAddress) throws Exception;
@Override
protected void doStop() {
synchronized (httpServerChannels) {
if (httpServerChannels.isEmpty() == false) {
try {
CloseableChannel.closeChannels(new ArrayList<>(httpServerChannels), true);
} catch (Exception e) {
logger.warn("exception while closing channels", e);
} finally {
httpServerChannels.clear();
}
}
}
try {
CloseableChannel.closeChannels(new ArrayList<>(httpChannels), true);
} catch (Exception e) {
logger.warn("unexpected exception while closing http channels", e);
}
httpChannels.clear();
stopInternal();
}
@Override
protected void doClose() {
}
/**
* Called to tear down internal resources
*/
protected abstract void stopInternal();
// package private for tests
static int resolvePublishPort(Settings settings, List<TransportAddress> boundAddresses, InetAddress publishInetAddress) {
@ -202,14 +270,18 @@ public abstract class AbstractHttpServerTransport extends AbstractLifecycleCompo
}
}
protected void onServerException(HttpServerChannel channel, Exception e) {
logger.error(new ParameterizedMessage("exception from http server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
* @param exception the exception
*/
protected void onNonChannelException(Exception exception) {
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", Thread.currentThread().getName()),
exception);
String threadName = Thread.currentThread().getName();
logger.warn(new ParameterizedMessage("exception caught on transport layer [thread={}]", threadName), exception);
}
protected void serverAcceptedChannel(HttpChannel httpChannel) {

View File

@ -0,0 +1,34 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http;
import org.elasticsearch.common.network.CloseableChannel;
import java.net.InetSocketAddress;
public interface HttpServerChannel extends CloseableChannel {
/**
* Returns the local address for this channel.
*
* @return the local address of this channel.
*/
InetSocketAddress getLocalAddress();
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
import org.elasticsearch.common.network.CloseableChannel;
import java.net.InetSocketAddress;
/**
* This is a tcp channel representing a server channel listening for new connections. It is the server
* channel abstraction used by the {@link TcpTransport} and {@link TransportService}. All tcp transport
* implementations must return server channels that adhere to the required method contracts.
*/
public interface TcpServerChannel extends CloseableChannel {
/**
* This returns the profile for this channel.
*/
String getProfile();
/**
* Returns the local address for this channel.
*
* @return the local address of this channel.
*/
InetSocketAddress getLocalAddress();
}

View File

@ -21,9 +21,6 @@ package org.elasticsearch.transport;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.hppc.IntSet;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionFuture;
@ -31,6 +28,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
@ -52,6 +50,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.network.NetworkAddress;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
@ -68,6 +67,7 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.rest.RestStatus;
@ -210,7 +210,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
private final ConcurrentMap<String, BoundTransportAddress> profileBoundAddresses = newConcurrentMap();
// node id to actual channel
private final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private final Map<String, List<TcpChannel>> serverChannels = newConcurrentMap();
private final Map<String, List<TcpServerChannel>> serverChannels = newConcurrentMap();
private final Set<TcpChannel> acceptedChannels = Collections.newSetFromMap(new ConcurrentHashMap<>());
private final KeyedLock<String> connectionLock = new KeyedLock<>();
@ -792,9 +792,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
final AtomicReference<InetSocketAddress> boundSocket = new AtomicReference<>();
boolean success = portsRange.iterate(portNumber -> {
try {
TcpChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
TcpServerChannel channel = bind(name, new InetSocketAddress(hostAddress, portNumber));
synchronized (serverChannels) {
List<TcpChannel> list = serverChannels.get(name);
List<TcpServerChannel> list = serverChannels.get(name);
if (list == null) {
list = new ArrayList<>();
serverChannels.put(name, list);
@ -957,9 +957,9 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
closeLock.writeLock().lock();
try {
// first stop to accept any incoming connections so nobody can connect to this transport
for (Map.Entry<String, List<TcpChannel>> entry : serverChannels.entrySet()) {
for (Map.Entry<String, List<TcpServerChannel>> entry : serverChannels.entrySet()) {
String profile = entry.getKey();
List<TcpChannel> channels = entry.getValue();
List<TcpServerChannel> channels = entry.getValue();
ActionListener<Void> closeFailLogger = ActionListener.wrap(c -> {},
e -> logger.warn(() -> new ParameterizedMessage("Error closing serverChannel for profile [{}]", profile), e));
channels.forEach(c -> c.addCloseListener(closeFailLogger));
@ -999,7 +999,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
protected void onException(TcpChannel channel, Exception e) {
public void onException(TcpChannel channel, Exception e) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);
@ -1049,6 +1049,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
}
}
protected void onServerException(TcpServerChannel channel, Exception e) {
logger.error(new ParameterizedMessage("exception from server channel caught on transport layer [channel={}]", channel), e);
}
/**
* Exception handler for exceptions that are not associated with a specific channel.
*
@ -1072,7 +1076,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
* @param name the profile name
* @param address the address to bind to
*/
protected abstract TcpChannel bind(String name, InetSocketAddress address) throws IOException;
protected abstract TcpServerChannel bind(String name, InetSocketAddress address) throws IOException;
/**
* Initiate a single tcp socket channel.
@ -1087,8 +1091,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
/**
* Called to tear down internal resources
*/
protected void stopInternal() {
}
protected abstract void stopInternal();
public boolean canCompress(TransportRequest request) {
return compress && (!(request instanceof BytesTransportRequest));

View File

@ -35,8 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
@ -128,8 +127,9 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
try (AbstractHttpServerTransport transport =
new AbstractHttpServerTransport(Settings.EMPTY, networkService, bigArrays, threadPool, xContentRegistry(), dispatcher) {
@Override
protected TransportAddress bindAddress(InetAddress hostAddress) {
protected HttpServerChannel bind(InetSocketAddress hostAddress) {
return null;
}
@ -139,12 +139,7 @@ public class AbstractHttpServerTransportTests extends ESTestCase {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() throws IOException {
protected void stopInternal() {
}

View File

@ -193,6 +193,10 @@ public class TcpTransportTests extends ESTestCase {
return new FakeChannel(messageCaptor);
}
@Override
protected void stopInternal() {
}
@Override
public NodeChannels getConnection(DiscoveryNode node) {
int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections();
@ -237,7 +241,7 @@ public class TcpTransportTests extends ESTestCase {
}
}
private static final class FakeChannel implements TcpChannel {
private static final class FakeChannel implements TcpChannel, TcpServerChannel {
private final AtomicReference<BytesReference> messageCaptor;

View File

@ -225,7 +225,7 @@ public class MockTcpTransport extends TcpTransport {
socket.setReuseAddress(TCP_REUSE_ADDRESS.get(settings));
}
public final class MockChannel implements Closeable, TcpChannel {
public final class MockChannel implements Closeable, TcpChannel, TcpServerChannel {
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final InetSocketAddress localAddress;
private final ServerSocket serverSocket;

View File

@ -41,6 +41,7 @@ import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.nio.ServerChannelContext;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.transport.Transports;
@ -191,7 +192,7 @@ public class MockNioTransport extends TcpTransport {
}
}
private static class MockServerChannel extends NioServerSocketChannel implements TcpChannel {
private static class MockServerChannel extends NioServerSocketChannel implements TcpServerChannel {
private final String profile;
@ -215,21 +216,6 @@ public class MockNioTransport extends TcpTransport {
public void addCloseListener(ActionListener<Void> listener) {
addCloseListener(ActionListener.toBiConsumer(listener));
}
@Override
public void setSoLinger(int value) throws IOException {
throw new UnsupportedOperationException("Cannot set SO_LINGER on a server channel.");
}
@Override
public InetSocketAddress getRemoteAddress() {
return null;
}
@Override
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
throw new UnsupportedOperationException("Cannot send a message to a server channel.");
}
}
private static class MockSocketChannel extends NioSocketChannel implements TcpChannel {

View File

@ -109,7 +109,7 @@ public class SecurityNetty4Transport extends Netty4Transport {
}
@Override
protected void onException(TcpChannel channel, Exception e) {
public void onException(TcpChannel channel, Exception e) {
if (!lifecycle.started()) {
// just close and ignore - we are already stopped and just need to make sure we release all resources
CloseableChannel.closeChannel(channel);

View File

@ -24,7 +24,7 @@ import org.elasticsearch.transport.TcpTransportChannel;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.netty4.NettyTcpChannel;
import org.elasticsearch.transport.netty4.Netty4TcpChannel;
import org.elasticsearch.xpack.core.security.SecurityContext;
import org.elasticsearch.xpack.core.security.authc.Authentication;
import org.elasticsearch.xpack.core.security.user.KibanaUser;
@ -116,8 +116,8 @@ public interface ServerTransportFilter {
}
if (extractClientCert && (unwrappedChannel instanceof TcpTransportChannel) &&
((TcpTransportChannel) unwrappedChannel).getChannel() instanceof NettyTcpChannel) {
Channel channel = ((NettyTcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel();
((TcpTransportChannel) unwrappedChannel).getChannel() instanceof Netty4TcpChannel) {
Channel channel = ((Netty4TcpChannel) ((TcpTransportChannel) unwrappedChannel).getChannel()).getLowLevelChannel();
SslHandler sslHandler = channel.pipeline().get(SslHandler.class);
if (channel.isOpen()) {
assert sslHandler != null : "channel [" + channel + "] did not have a ssl handler. pipeline " + channel.pipeline();

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.security.transport.nio;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.recycler.Recycler;
@ -131,9 +130,8 @@ public class SecurityNioTransport extends NioTransport {
@Override
public NioTcpServerChannel createServerChannel(NioSelector selector, ServerSocketChannel channel) throws IOException {
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);
Consumer<Exception> exceptionHandler = (e) -> logger.error(() ->
new ParameterizedMessage("exception from server channel caught on transport layer [{}]", channel), e);
NioTcpServerChannel nioChannel = new NioTcpServerChannel(profileName, channel);;
Consumer<Exception> exceptionHandler = (e) -> onServerException(nioChannel, e);
Consumer<NioSocketChannel> acceptor = SecurityNioTransport.this::acceptChannel;
ServerChannelContext context = new ServerChannelContext(nioChannel, this, selector, acceptor, exceptionHandler);
nioChannel.setContext(context);