Tcp Transport: Connection pool between nodes and different connection types, closes #564.

This commit is contained in:
kimchy 2010-12-15 20:11:10 +02:00
parent 085066ed19
commit d6bab1a892
9 changed files with 294 additions and 46 deletions

View File

@ -291,8 +291,7 @@ public class TransportBulkAction extends BaseAction<BulkRequest, BulkResponse> {
}
@Override public boolean spawn() {
// no need to spawn, since in the doExecute we always execute with threaded operation set to true
return false;
return true; // spawn, we do some work here...
}
}
}

View File

@ -74,7 +74,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
@Override protected TransportRequestOptions transportOptions() {
return TransportRequestOptions.options().withCompress(true);
// low type since we don't want the large bulk requests to cause high latency on typical requests
return TransportRequestOptions.options().withCompress(true).withLowType();
}
@Override protected BulkShardRequest newRequestInstance() {

View File

@ -42,6 +42,6 @@ public class ClientTransportBulkAction extends BaseClientTransportAction<BulkReq
}
@Override protected TransportRequestOptions options() {
return TransportRequestOptions.options().withCompress(true);
return TransportRequestOptions.options().withLowType().withCompress(true);
}
}

View File

@ -258,7 +258,7 @@ public class MasterFaultDetection extends AbstractComponent {
threadPool.schedule(MasterPinger.this, pingInterval);
return;
}
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout),
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override public MasterPingResponseResponse newInstance() {
return new MasterPingResponseResponse();
@ -296,7 +296,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}

View File

@ -198,7 +198,7 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout),
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override public PingResponse newInstance() {
return new PingResponse();
@ -232,7 +232,8 @@ public class NodesFaultDetection extends AbstractComponent {
}
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
}
}
}

View File

@ -167,7 +167,7 @@ public class RecoverySource extends AbstractComponent {
long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead;
}
indexInput.close();
@ -258,7 +258,7 @@ public class RecoverySource extends AbstractComponent {
totalOperations++;
if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0;
operations = Lists.newArrayList();
}
@ -266,7 +266,7 @@ public class RecoverySource extends AbstractComponent {
// send the leftover
if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet();
}
return totalOperations;
}

View File

@ -32,10 +32,18 @@ public class TransportRequestOptions {
return new TransportRequestOptions();
}
public static enum Type {
LOW,
MED,
HIGH
}
private TimeValue timeout;
private boolean compress;
private Type type = Type.MED;
public TransportRequestOptions withTimeout(long timeout) {
return withTimeout(TimeValue.timeValueMillis(timeout));
}
@ -50,6 +58,35 @@ public class TransportRequestOptions {
return this;
}
public TransportRequestOptions withType(Type type) {
this.type = type;
return this;
}
/**
* A request that requires very low latency. Usually reserved for ping requests with very small payload.
*/
public TransportRequestOptions withHighType() {
this.type = Type.HIGH;
return this;
}
/**
* The typical requests flows go through this one.
*/
public TransportRequestOptions withMedType() {
this.type = Type.MED;
return this;
}
/**
* Batch oriented (big payload) based requests use this one.
*/
public TransportRequestOptions withLowType() {
this.type = Type.LOW;
return this;
}
public TimeValue timeout() {
return this.timeout;
}
@ -57,4 +94,8 @@ public class TransportRequestOptions {
public boolean compress() {
return this.compress;
}
public Type type() {
return this.type;
}
}

View File

@ -62,6 +62,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
@ -72,6 +73,10 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
/**
* There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or
* batch) with high payload that will cause regular request. (like search or single index) to take
* longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD).
*
* @author kimchy (shay.banon)
*/
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
@ -112,6 +117,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final ByteSizeValue tcpReceiveBufferSize;
final int connectionsPerNodeLow;
final int connectionsPerNodeMed;
final int connectionsPerNodeHigh;
private final ThreadPool threadPool;
private volatile OpenChannelsHandler serverOpenChannels;
@ -121,7 +130,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private volatile ServerBootstrap serverBootstrap;
// node id to actual channel
final ConcurrentMap<DiscoveryNode, Channel> connectedNodes = newConcurrentMap();
final ConcurrentMap<DiscoveryNode, NodeChannels> connectedNodes = newConcurrentMap();
private volatile Channel serverChannel;
@ -156,6 +165,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE));
this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE));
this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", 2);
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.low", 7);
this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.low", 1);
}
public Settings settings() {
@ -309,10 +321,10 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
serverBootstrap = null;
}
for (Iterator<Channel> it = connectedNodes.values().iterator(); it.hasNext();) {
Channel channel = it.next();
for (Iterator<NodeChannels> it = connectedNodes.values().iterator(); it.hasNext();) {
NodeChannels nodeChannels = it.next();
it.remove();
closeChannel(channel);
nodeChannels.close();
}
if (clientBootstrap != null) {
@ -369,8 +381,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (isCloseConnectionException(e.getCause())) {
// disconnect the node
Channel channel = ctx.getChannel();
for (Map.Entry<DiscoveryNode, Channel> entry : connectedNodes.entrySet()) {
if (entry.getValue().equals(channel)) {
for (Map.Entry<DiscoveryNode, NodeChannels> entry : connectedNodes.entrySet()) {
if (entry.getValue().hasChannel(channel)) {
disconnectFromNode(entry.getKey());
}
}
@ -388,7 +400,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
@Override public <T extends Streamable> void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException {
Channel targetChannel = nodeChannel(node);
Channel targetChannel = nodeChannel(node, options);
if (compress) {
options.withCompress(true);
@ -420,30 +432,32 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (!lifecycle.started()) {
throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport");
}
try {
if (node == null) {
throw new ConnectTransportException(node, "Can't connect to a null node");
}
Channel channel = connectedNodes.get(node);
if (channel != null) {
try {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
synchronized (this) {
// recheck here, within the sync block (we cache connections, so we don't care about this single sync block)
channel = connectedNodes.get(node);
if (channel != null) {
nodeChannels = connectedNodes.get(node);
if (nodeChannels != null) {
return;
}
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
try {
connectToChannels(nodeChannels.high, node);
connectToChannels(nodeChannels.med, node);
connectToChannels(nodeChannels.low, node);
} catch (Exception e) {
nodeChannels.close();
throw e;
}
channel = connectFuture.getChannel();
channel.getCloseFuture().addListener(new ChannelCloseListener(node));
connectedNodes.put(node, channel);
connectedNodes.put(node, nodeChannels);
if (logger.isDebugEnabled()) {
logger.debug("Connected to node [{}]", node);
@ -455,11 +469,24 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
private void connectToChannels(Channel[] channels, DiscoveryNode node) {
for (int i = 0; i < channels.length; i++) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
}
channels[i] = connectFuture.getChannel();
channels[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
}
@Override public void disconnectFromNode(DiscoveryNode node) {
Channel channel = connectedNodes.remove(node);
if (channel != null) {
NodeChannels nodeChannels = connectedNodes.remove(node);
if (nodeChannels != null) {
try {
closeChannel(channel);
nodeChannels.close();
} finally {
logger.debug("Disconnected from [{}]", node);
transportServiceAdapter.raiseNodeDisconnected(node);
@ -467,18 +494,12 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException {
Channel channel = connectedNodes.get(node);
if (channel == null) {
private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException {
NodeChannels nodeChannels = connectedNodes.get(node);
if (nodeChannels == null) {
throw new NodeNotConnectedException(node, "Node not connected");
}
return channel;
}
private void closeChannel(Channel channel) {
if (channel.isOpen()) {
channel.close().awaitUninterruptibly();
}
return nodeChannels.channel(options.type());
}
private class ChannelCloseListener implements ChannelFutureListener {
@ -493,4 +514,61 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
disconnectFromNode(node);
}
}
public static class NodeChannels {
private Channel[] low;
private final AtomicInteger lowCounter = new AtomicInteger();
private Channel[] med;
private final AtomicInteger medCounter = new AtomicInteger();
private Channel[] high;
private final AtomicInteger highCounter = new AtomicInteger();
public NodeChannels(Channel[] low, Channel[] med, Channel[] high) {
this.low = low;
this.med = med;
this.high = high;
}
public boolean hasChannel(Channel channel) {
return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high);
}
private boolean hasChannel(Channel channel, Channel[] channels) {
for (Channel channel1 : channels) {
if (channel.equals(channel1)) {
return true;
}
}
return false;
}
public Channel channel(TransportRequestOptions.Type type) {
if (type == TransportRequestOptions.Type.MED) {
return med[Math.abs(medCounter.incrementAndGet()) % med.length];
} else if (type == TransportRequestOptions.Type.HIGH) {
return high[Math.abs(highCounter.incrementAndGet()) % high.length];
} else {
return low[Math.abs(lowCounter.incrementAndGet()) % low.length];
}
}
public void close() {
closeChannels(low);
closeChannels(med);
closeChannels(high);
}
private void closeChannels(Channel[] channels) {
for (Channel channel : channels) {
try {
if (channel != null && channel.isOpen()) {
channel.close();
}
} catch (Exception e) {
//ignore
}
}
}
}
}

View File

@ -0,0 +1,128 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport.netty.benchmark;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Bytes;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.cached.CachedThreadPool;
import org.elasticsearch.timer.TimerService;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.netty.NettyTransport;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.transport.TransportRequestOptions.*;
/**
* @author kimchy (shay.banon)
*/
public class BenchmarkNettyLargeMessages {
public static void main(String[] args) throws InterruptedException {
final ByteSizeValue payloadSize = new ByteSizeValue(10, ByteSizeUnit.MB);
final int NUMBER_OF_ITERATIONS = 100000;
final int NUMBER_OF_CLIENTS = 5;
final byte[] payload = new byte[(int) payloadSize.bytes()];
Settings settings = ImmutableSettings.settingsBuilder()
.build();
final ThreadPool threadPool = new CachedThreadPool(settings);
final TimerService timerService = new TimerService(settings, threadPool);
final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start();
final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300));
// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300));
final DiscoveryNode smallNode = bigNode;
transportServiceClient.connectToNode(bigNode);
transportServiceClient.connectToNode(smallNode);
transportServiceServer.registerHandler("benchmark", new BaseTransportRequestHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
}
@Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception {
channel.sendResponse(request);
}
@Override public boolean spawn() {
return true;
}
});
final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CLIENTS);
for (int i = 0; i < NUMBER_OF_CLIENTS; i++) {
new Thread(new Runnable() {
@Override public void run() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
BenchmarkMessage message = new BenchmarkMessage(1, payload);
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
}
@Override public void handleResponse(BenchmarkMessage response) {
}
@Override public void handleException(TransportException exp) {
exp.printStackTrace();
}
}).txGet();
}
latch.countDown();
}
}).start();
}
new Thread(new Runnable() {
@Override public void run() {
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
BenchmarkMessage message = new BenchmarkMessage(2, Bytes.EMPTY_ARRAY);
long start = System.currentTimeMillis();
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler<BenchmarkMessage>() {
@Override public BenchmarkMessage newInstance() {
return new BenchmarkMessage();
}
@Override public void handleResponse(BenchmarkMessage response) {
}
@Override public void handleException(TransportException exp) {
exp.printStackTrace();
}
}).txGet();
long took = System.currentTimeMillis() - start;
System.out.println("Took " + took + "ms");
}
}
}).start();
latch.await();
}
}