allow to configure tcp network modules (transport, memcached and http) to use blocking io (just enabling it to do some performance tests)

This commit is contained in:
kimchy 2010-06-29 21:07:38 +03:00
parent 01ff81fa89
commit 3f6cd46736
8 changed files with 64 additions and 75 deletions

View File

@ -48,6 +48,9 @@ public class NetworkService extends AbstractComponent {
public static final String TCP_REUSE_ADDRESS = "network.tcp.reuse_address";
public static final String TCP_SEND_BUFFER_SIZE = "network.tcp.send_buffer_size";
public static final String TCP_RECEIVE_BUFFER_SIZE = "network.tcp.receive_buffer_size";
public static final String TCP_BLOCKING = "network.tcp.blocking";
public static final String TCP_BLOCKING_SERVER = "network.tcp.blocking_server";
public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client";
}
public static interface CustomNameResolver {

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.bootstrap.ServerBootstrap;
import org.elasticsearch.common.netty.channel.*;
import org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.elasticsearch.common.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.elasticsearch.common.netty.handler.codec.http.HttpChunkAggregator;
import org.elasticsearch.common.netty.handler.codec.http.HttpRequestDecoder;
import org.elasticsearch.common.netty.handler.codec.http.HttpResponseEncoder;
@ -73,6 +74,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
private final int workerCount;
private final boolean blockingServer;
private final String port;
private final String bindHost;
@ -104,6 +107,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
this.networkService = networkService;
ByteSizeValue maxContentLength = componentSettings.getAsBytesSize("max_content_length", settings.getAsBytesSize("http.max_content_length", new ByteSizeValue(100, ByteSizeUnit.MB)));
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("http.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("http.port", "9200-9300"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
@ -128,10 +132,17 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
@Override protected void doStart() throws ElasticSearchException {
this.serverOpenChannels = new OpenChannelsHandler();
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "httpBoss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "httpIoWorker")),
workerCount));
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker"))
));
} else {
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "http_server_worker")),
workerCount));
}
final HttpRequestHandler requestHandler = new HttpRequestHandler(this);

View File

@ -37,6 +37,8 @@ import org.elasticsearch.common.netty.buffer.ChannelBuffers;
import org.elasticsearch.common.netty.channel.*;
import org.elasticsearch.common.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.elasticsearch.common.netty.channel.socket.oio.OioClientSocketChannelFactory;
import org.elasticsearch.common.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.elasticsearch.common.netty.logging.InternalLogger;
import org.elasticsearch.common.netty.logging.InternalLoggerFactory;
import org.elasticsearch.common.network.NetworkService;
@ -88,6 +90,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final int workerCount;
final boolean blockingServer;
final boolean blockingClient;
final String port;
final String bindHost;
@ -138,6 +144,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("transport.tcp.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.blockingClient = componentSettings.getAsBoolean("transport.tcp.blocking_client", componentSettings.getAsBoolean(TCP_BLOCKING_CLIENT, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("transport.tcp.port", "9300-9400"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
@ -166,10 +174,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override protected void doStart() throws ElasticSearchException {
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transportClientBoss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transportClientIoWorker")),
workerCount));
if (blockingClient) {
clientBootstrap = new ClientBootstrap(new OioClientSocketChannelFactory(Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker"))));
} else {
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_client_worker")),
workerCount));
}
ChannelPipelineFactory clientPipelineFactory = new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
@ -201,10 +213,17 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
serverOpenChannels = new OpenChannelsHandler();
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transportServerBoss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transportServerIoWorker")),
workerCount));
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker"))
));
} else {
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "transport_server_worker")),
workerCount));
}
ChannelPipelineFactory serverPipelineFactory = new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

View File

@ -54,6 +54,7 @@ public class BenchmarkNettyClient {
Settings settings = ImmutableSettings.settingsBuilder()
.put("network.server", false)
.put("network.tcp.blocking", false)
.build();
final ThreadPool threadPool = new CachedThreadPool(settings);

View File

@ -54,6 +54,7 @@ public class BenchmarkNettyClientBlocking {
Settings settings = ImmutableSettings.settingsBuilder()
.put("network.server", false)
.put("network.tcp.blocking", false)
.build();
final ThreadPool threadPool = new CachedThreadPool(settings);

View File

@ -39,6 +39,7 @@ public class BenchmarkNettyServer {
Settings settings = ImmutableSettings.settingsBuilder()
.put("transport.netty.port", 9999)
.put("network.tcp.blocking", false)
.build();
final ThreadPool threadPool = new CachedThreadPool(settings);

View File

@ -22,12 +22,14 @@ package org.elasticsearch.memcached.netty;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.netty.OpenChannelsHandler;
import org.elasticsearch.common.netty.bootstrap.ServerBootstrap;
import org.elasticsearch.common.netty.channel.Channel;
import org.elasticsearch.common.netty.channel.ChannelPipeline;
import org.elasticsearch.common.netty.channel.ChannelPipelineFactory;
import org.elasticsearch.common.netty.channel.Channels;
import org.elasticsearch.common.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.elasticsearch.common.netty.channel.socket.oio.OioServerSocketChannelFactory;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.network.NetworkUtils;
import org.elasticsearch.common.settings.Settings;
@ -60,6 +62,8 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
private final int workerCount;
private final boolean blockingServer;
private final String port;
private final String bindHost;
@ -90,6 +94,7 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
this.networkService = networkService;
this.workerCount = componentSettings.getAsInt("worker_count", Runtime.getRuntime().availableProcessors());
this.blockingServer = componentSettings.getAsBoolean("memcached.blocking_server", componentSettings.getAsBoolean(TCP_BLOCKING_SERVER, componentSettings.getAsBoolean(TCP_BLOCKING, false)));
this.port = componentSettings.get("port", settings.get("memcached.port", "11211-11311"));
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
@ -107,10 +112,17 @@ public class NettyMemcachedServerTransport extends AbstractLifecycleComponent<Me
@Override protected void doStart() throws ElasticSearchException {
this.serverOpenChannels = new OpenChannelsHandler();
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedBoss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcachedIoWorker")),
workerCount));
if (blockingServer) {
serverBootstrap = new ServerBootstrap(new OioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcached_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcached_server_worker"))
));
} else {
serverBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcached_server_boss")),
Executors.newCachedThreadPool(daemonThreadFactory(settings, "memcached_server_worker")),
workerCount));
}
ChannelPipelineFactory pipelineFactory = new ChannelPipelineFactory() {
@Override public ChannelPipeline getPipeline() throws Exception {

View File

@ -1,59 +0,0 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.memcached.netty;
import org.elasticsearch.common.netty.channel.*;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.Set;
/**
* @author kimchy (shay.banon)
*/
@ChannelHandler.Sharable
public class OpenChannelsHandler implements ChannelUpstreamHandler {
private Set<Channel> openChannels = ConcurrentCollections.newConcurrentSet();
private final ChannelFutureListener remover = new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
openChannels.remove(future.getChannel());
}
};
@Override public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof ChannelStateEvent) {
ChannelStateEvent evt = (ChannelStateEvent) e;
if (evt.getState() == ChannelState.OPEN) {
boolean added = openChannels.add(ctx.getChannel());
if (added) {
ctx.getChannel().getCloseFuture().addListener(remover);
}
}
}
ctx.sendUpstream(e);
}
public void close() {
for (Channel channel : openChannels) {
channel.close().awaitUninterruptibly();
}
}
}