Remove blocking TCP clients and servers (#22639)

This commit removes the option to use the blocking variants of the TCP
transport server, TCP transport client, or http server.
This commit is contained in:
Tim Brooks 2017-01-16 18:38:51 -06:00 committed by GitHub
parent ebd38e2a6a
commit 16a76d9bc0
11 changed files with 15 additions and 161 deletions

View File

@ -60,12 +60,6 @@ public class NetworkService extends AbstractComponent {
Setting.byteSizeSetting("network.tcp.send_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_RECEIVE_BUFFER_SIZE =
Setting.byteSizeSetting("network.tcp.receive_buffer_size", new ByteSizeValue(-1), Property.NodeScope);
public static final Setting<Boolean> TCP_BLOCKING =
Setting.boolSetting("network.tcp.blocking", false, Property.NodeScope);
public static final Setting<Boolean> TCP_BLOCKING_SERVER =
Setting.boolSetting("network.tcp.blocking_server", TCP_BLOCKING, Property.NodeScope);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT =
Setting.boolSetting("network.tcp.blocking_client", TCP_BLOCKING, Property.NodeScope);
public static final Setting<TimeValue> TCP_CONNECT_TIMEOUT =
Setting.timeSetting("network.tcp.connect_timeout", new TimeValue(30, TimeUnit.SECONDS), Property.NodeScope);
}

View File

@ -273,7 +273,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
TcpTransport.CONNECTIONS_PER_NODE_STATE,
TcpTransport.CONNECTIONS_PER_NODE_PING,
TcpTransport.PING_SCHEDULE,
TcpTransport.TCP_BLOCKING_CLIENT,
TcpTransport.TCP_CONNECT_TIMEOUT,
NetworkService.NETWORK_SERVER,
TcpTransport.TCP_NO_DELAY,
@ -281,7 +280,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
TcpTransport.TCP_REUSE_ADDRESS,
TcpTransport.TCP_SEND_BUFFER_SIZE,
TcpTransport.TCP_RECEIVE_BUFFER_SIZE,
TcpTransport.TCP_BLOCKING_SERVER,
NetworkService.GLOBAL_NETWORK_HOST_SETTING,
NetworkService.GLOBAL_NETWORK_BINDHOST_SETTING,
NetworkService.GLOBAL_NETWORK_PUBLISHHOST_SETTING,
@ -290,9 +288,6 @@ public final class ClusterSettings extends AbstractScopedSettings {
NetworkService.TcpSettings.TCP_REUSE_ADDRESS,
NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE,
NetworkService.TcpSettings.TCP_RECEIVE_BUFFER_SIZE,
NetworkService.TcpSettings.TCP_BLOCKING,
NetworkService.TcpSettings.TCP_BLOCKING_SERVER,
NetworkService.TcpSettings.TCP_BLOCKING_CLIENT,
NetworkService.TcpSettings.TCP_CONNECT_TIMEOUT,
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,

View File

@ -28,7 +28,6 @@ import org.elasticsearch.rest.RestRequest;
public interface HttpServerTransport extends LifecycleComponent {
String HTTP_SERVER_WORKER_THREAD_NAME_PREFIX = "http_server_worker";
String HTTP_SERVER_BOSS_THREAD_NAME_PREFIX = "http_server_boss";
BoundTransportAddress boundAddress();

View File

@ -112,8 +112,6 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent implements Transport {
public static final String TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX = "transport_server_worker";
public static final String TRANSPORT_SERVER_BOSS_THREAD_NAME_PREFIX = "transport_server_boss";
public static final String TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX = "transport_client_worker";
public static final String TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX = "transport_client_boss";
// the scheduled internal ping interval setting, defaults to disabled (-1)
@ -137,10 +135,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
boolSetting("transport.tcp.keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_REUSE_ADDRESS =
boolSetting("transport.tcp.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_BLOCKING_CLIENT =
boolSetting("transport.tcp.blocking_client", NetworkService.TcpSettings.TCP_BLOCKING_CLIENT, Setting.Property.NodeScope);
public static final Setting<Boolean> TCP_BLOCKING_SERVER =
boolSetting("transport.tcp.blocking_server", NetworkService.TcpSettings.TCP_BLOCKING_SERVER, Setting.Property.NodeScope);
public static final Setting<ByteSizeValue> TCP_SEND_BUFFER_SIZE =
Setting.byteSizeSetting("transport.tcp.send_buffer_size", NetworkService.TcpSettings.TCP_SEND_BUFFER_SIZE,
Setting.Property.NodeScope);
@ -150,7 +144,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9);
private static final int PING_DATA_SIZE = -1;
protected final boolean blockingClient;
private final CircuitBreakerService circuitBreakerService;
// package visibility for tests
protected final ScheduledPing scheduledPing;
@ -194,7 +187,6 @@ public abstract class TcpTransport<Channel> extends AbstractLifecycleComponent i
this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings);
this.networkService = networkService;
this.transportName = transportName;
this.blockingClient = TCP_BLOCKING_CLIENT.get(settings);
defaultConnectionProfile = buildDefaultConnectionProfile(settings);
}

View File

@ -37,11 +37,8 @@ public enum Transports {
public static final boolean isTransportThread(Thread t) {
final String threadName = t.getName();
for (String s : Arrays.asList(
HttpServerTransport.HTTP_SERVER_BOSS_THREAD_NAME_PREFIX,
HttpServerTransport.HTTP_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_SERVER_BOSS_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_SERVER_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX,
TcpTransport.TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX,
TEST_MOCK_TRANSPORT_THREAD_PREFIX)) {
if (threadName.contains(s)) {

View File

@ -20,3 +20,10 @@ recognized anymore.
The `default` `index.store.type` has been removed. If you were using it, we
advise that you simply remove it from your index settings and Elasticsearch
will use the best `store` implementation for your operating system.
==== Network settings
The blocking TCP client, blocking TCP server, and blocking HTTP server have been removed.
As a consequence, the `network.tcp.blocking_server`, `network.tcp.blocking_client`,
`network.tcp.blocking`,`transport.tcp.blocking_client`, `transport.tcp.blocking_server`,
and `http.tcp.blocking_server` settings are not recognized anymore.

View File

@ -32,7 +32,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
@ -78,7 +77,6 @@ import org.elasticsearch.transport.BindTransportException;
import org.elasticsearch.transport.netty4.Netty4OpenChannelsHandler;
import org.elasticsearch.transport.netty4.Netty4Utils;
import org.elasticsearch.transport.netty4.channel.PrivilegedNioServerSocketChannel;
import org.elasticsearch.transport.netty4.channel.PrivilegedOioServerSocketChannel;
import java.io.IOException;
import java.net.InetAddress;
@ -135,8 +133,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
boolSetting("http.tcp_no_delay", NetworkService.TcpSettings.TCP_NO_DELAY, Property.NodeScope, Property.Shared);
public static final Setting<Boolean> SETTING_HTTP_TCP_KEEP_ALIVE =
boolSetting("http.tcp.keep_alive", NetworkService.TcpSettings.TCP_KEEP_ALIVE, Property.NodeScope, Property.Shared);
public static final Setting<Boolean> SETTING_HTTP_TCP_BLOCKING_SERVER =
boolSetting("http.tcp.blocking_server", NetworkService.TcpSettings.TCP_BLOCKING_SERVER, Property.NodeScope, Property.Shared);
public static final Setting<Boolean> SETTING_HTTP_TCP_REUSE_ADDRESS =
boolSetting("http.tcp.reuse_address", NetworkService.TcpSettings.TCP_REUSE_ADDRESS, Property.NodeScope, Property.Shared);
@ -174,8 +170,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
protected final int workerCount;
protected final boolean blockingServer;
protected final boolean pipelining;
protected final int pipeliningMaxEvents;
@ -240,7 +234,6 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
this.maxCumulationBufferCapacity = SETTING_HTTP_NETTY_MAX_CUMULATION_BUFFER_CAPACITY.get(settings);
this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
this.workerCount = SETTING_HTTP_WORKER_COUNT.get(settings);
this.blockingServer = SETTING_HTTP_TCP_BLOCKING_SERVER.get(settings);
this.port = SETTING_HTTP_PORT.get(settings);
this.bindHosts = SETTING_HTTP_BIND_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
this.publishHosts = SETTING_HTTP_PUBLISH_HOST.get(settings).toArray(Strings.EMPTY_ARRAY);
@ -293,15 +286,10 @@ public class Netty4HttpServerTransport extends AbstractLifecycleComponent implem
this.serverOpenChannels = new Netty4OpenChannelsHandler(logger);
serverBootstrap = new ServerBootstrap();
if (blockingServer) {
serverBootstrap.group(new OioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.channel(PrivilegedOioServerSocketChannel.class);
} else {
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.channel(PrivilegedNioServerSocketChannel.class);
}
serverBootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings,
HTTP_SERVER_WORKER_THREAD_NAME_PREFIX)));
serverBootstrap.channel(PrivilegedNioServerSocketChannel.class);
serverBootstrap.childHandler(configureServerChannelHandler());

View File

@ -58,7 +58,6 @@ public class Netty4Plugin extends Plugin implements NetworkPlugin {
Netty4HttpServerTransport.SETTING_HTTP_WORKER_COUNT,
Netty4HttpServerTransport.SETTING_HTTP_TCP_NO_DELAY,
Netty4HttpServerTransport.SETTING_HTTP_TCP_KEEP_ALIVE,
Netty4HttpServerTransport.SETTING_HTTP_TCP_BLOCKING_SERVER,
Netty4HttpServerTransport.SETTING_HTTP_TCP_REUSE_ADDRESS,
Netty4HttpServerTransport.SETTING_HTTP_TCP_SEND_BUFFER_SIZE,
Netty4HttpServerTransport.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE,

View File

@ -32,7 +32,6 @@ import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.util.concurrent.Future;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
@ -65,8 +64,6 @@ import org.elasticsearch.transport.TransportServiceAdapter;
import org.elasticsearch.transport.TransportSettings;
import org.elasticsearch.transport.netty4.channel.PrivilegedNioServerSocketChannel;
import org.elasticsearch.transport.netty4.channel.PrivilegedNioSocketChannel;
import org.elasticsearch.transport.netty4.channel.PrivilegedOioServerSocketChannel;
import org.elasticsearch.transport.netty4.channel.PrivilegedOioSocketChannel;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -193,13 +190,8 @@ public class Netty4Transport extends TcpTransport<Channel> {
private Bootstrap createBootstrap() {
final Bootstrap bootstrap = new Bootstrap();
if (TCP_BLOCKING_CLIENT.get(settings)) {
bootstrap.group(new OioEventLoopGroup(1, daemonThreadFactory(settings, TRANSPORT_CLIENT_WORKER_THREAD_NAME_PREFIX)));
bootstrap.channel(PrivilegedOioSocketChannel.class);
} else {
bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
bootstrap.channel(PrivilegedNioSocketChannel.class);
}
bootstrap.group(new NioEventLoopGroup(workerCount, daemonThreadFactory(settings, TRANSPORT_CLIENT_BOSS_THREAD_NAME_PREFIX)));
bootstrap.channel(PrivilegedNioSocketChannel.class);
bootstrap.handler(getClientChannelInitializer());
@ -282,13 +274,8 @@ public class Netty4Transport extends TcpTransport<Channel> {
final ServerBootstrap serverBootstrap = new ServerBootstrap();
if (TCP_BLOCKING_SERVER.get(settings)) {
serverBootstrap.group(new OioEventLoopGroup(workerCount, workerFactory));
serverBootstrap.channel(PrivilegedOioServerSocketChannel.class);
} else {
serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory));
serverBootstrap.channel(PrivilegedNioServerSocketChannel.class);
}
serverBootstrap.group(new NioEventLoopGroup(workerCount, workerFactory));
serverBootstrap.channel(PrivilegedNioServerSocketChannel.class);
serverBootstrap.childHandler(getServerChannelInitializer(name, settings));

View File

@ -1,50 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4.channel;
import io.netty.channel.socket.oio.OioServerSocketChannel;
import org.elasticsearch.SpecialPermission;
import java.net.ServerSocket;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.List;
/**
* Wraps netty calls to {@link ServerSocket#accept()} in {@link AccessController#doPrivileged(PrivilegedAction)} blocks.
* This is necessary to limit {@link java.net.SocketPermission} to the transport module.
*/
public class PrivilegedOioServerSocketChannel extends OioServerSocketChannel {
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(new SpecialPermission());
}
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<Integer>) () -> super.doReadMessages(buf));
} catch (PrivilegedActionException e) {
throw (Exception) e.getCause();
}
}
}

View File

@ -1,54 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty4.channel;
import io.netty.channel.socket.oio.OioSocketChannel;
import org.elasticsearch.SpecialPermission;
import java.net.SocketAddress;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
/**
* Wraps netty calls to {@link java.net.Socket#connect(SocketAddress)} in
* {@link AccessController#doPrivileged(PrivilegedAction)} blocks. This is necessary to limit
* {@link java.net.SocketPermission} to the transport module.
*/
public class PrivilegedOioSocketChannel extends OioSocketChannel {
@Override
protected void doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
sm.checkPermission(new SpecialPermission());
}
try {
AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
super.doConnect(remoteAddress, localAddress);
return null;
});
} catch (PrivilegedActionException e) {
throw (Exception) e.getCause();
}
super.doConnect(remoteAddress, localAddress);
}
}