Transport: Increase the default transport.tcp.connect_timeout from 1s to 30s, also add `network.tcp.connect_timeout` to conform with other common network settings, closes #576.

This commit is contained in:
kimchy 2010-12-23 14:20:11 +02:00
parent 5c338b7af2
commit 6dcc04b59c
3 changed files with 75 additions and 14 deletions

View File

@ -26,12 +26,14 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
/**
* @author kimchy (shay.banon)
@ -53,9 +55,11 @@ public class NetworkService extends AbstractComponent {
public static final String TCP_BLOCKING = "network.tcp.blocking";
public static final String TCP_BLOCKING_SERVER = "network.tcp.blocking_server";
public static final String TCP_BLOCKING_CLIENT = "network.tcp.blocking_client";
public static final String TCP_CONNECT_TIMEOUT = "network.tcp.connect_timeout";
public static ByteSizeValue TCP_DEFAULT_SEND_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB);
public static ByteSizeValue TCP_DEFAULT_RECEIVE_BUFFER_SIZE = new ByteSizeValue(32, ByteSizeUnit.KB);
public static TimeValue TCP_DEFAULT_CONNECT_TIMEOUT = new TimeValue(30, TimeUnit.SECONDS);
}
public static interface CustomNameResolver {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableList;
import org.elasticsearch.common.collect.Lists;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
@ -68,7 +69,6 @@ import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.common.network.NetworkService.TcpSettings.*;
import static org.elasticsearch.common.settings.ImmutableSettings.Builder.*;
import static org.elasticsearch.common.transport.NetworkExceptionHelper.*;
import static org.elasticsearch.common.unit.TimeValue.*;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
import static org.elasticsearch.common.util.concurrent.EsExecutors.*;
@ -159,7 +159,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.bindHost = componentSettings.get("bind_host");
this.publishHost = componentSettings.get("publish_host");
this.compress = settings.getAsBoolean("transport.tcp.compress", false);
this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", timeValueSeconds(1)));
this.connectTimeout = componentSettings.getAsTime("connect_timeout", settings.getAsTime("transport.tcp.connect_timeout", settings.getAsTime(TCP_CONNECT_TIMEOUT, TCP_DEFAULT_CONNECT_TIMEOUT)));
this.tcpNoDelay = componentSettings.getAsBoolean("tcp_no_delay", settings.getAsBoolean(TCP_NO_DELAY, true));
this.tcpKeepAlive = componentSettings.getAsBoolean("tcp_keep_alive", settings.getAsBoolean(TCP_KEEP_ALIVE, true));
this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress()));
@ -449,9 +449,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
try {
connectToChannels(nodeChannels.high, node);
connectToChannels(nodeChannels.med, node);
connectToChannels(nodeChannels.low, node);
connectToChannels(nodeChannels, node);
} catch (Exception e) {
nodeChannels.close();
throw e;
@ -464,21 +462,68 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
}
}
transportServiceAdapter.raiseNodeConnected(node);
} catch (ConnectTransportException e) {
throw e;
} catch (Exception e) {
throw new ConnectTransportException(node, "General node connection failure", e);
}
}
private void connectToChannels(Channel[] channels, DiscoveryNode node) {
for (int i = 0; i < channels.length; i++) {
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
ChannelFuture connectFuture = clientBootstrap.connect(address);
connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25));
if (!connectFuture.isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause());
private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length];
ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length];
ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
for (int i = 0; i < connectLow.length; i++) {
connectLow[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectMed.length; i++) {
connectMed[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectHigh.length; i++) {
connectHigh[i] = clientBootstrap.connect(address);
}
try {
for (int i = 0; i < connectLow.length; i++) {
connectLow[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectLow[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectLow[i].getCause());
}
nodeChannels.low[i] = connectLow[i].getChannel();
nodeChannels.low[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
channels[i] = connectFuture.getChannel();
channels[i].getCloseFuture().addListener(new ChannelCloseListener(node));
for (int i = 0; i < connectMed.length; i++) {
connectMed[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectMed[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectMed[i].getCause());
}
nodeChannels.med[i] = connectMed[i].getChannel();
nodeChannels.med[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectHigh.length; i++) {
connectHigh[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectHigh[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectHigh[i].getCause());
}
nodeChannels.high[i] = connectHigh[i].getChannel();
nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectLow).add(connectMed).add(connectHigh).build()) {
future.cancel();
if (future.getChannel() != null && future.getChannel().isOpen()) {
try {
future.getChannel().close();
} catch (Exception e1) {
// ignore
}
}
}
throw e;
}
}

View File

@ -20,7 +20,9 @@
package org.elasticsearch.transport.netty;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.AbstractSimpleTransportTests;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportService;
import org.testng.annotations.Test;
@ -36,4 +38,14 @@ public class SimpleNettyTransportTests extends AbstractSimpleTransportTests {
serviceB = new TransportService(settingsBuilder().put("name", "B").build(), new NettyTransport(settingsBuilder().put("name", "B").build(), threadPool), threadPool, timerService).start();
serviceBNode = new DiscoveryNode("B", serviceB.boundAddress().publishAddress());
}
@Test public void testConnectException() {
try {
serviceA.connectToNode(new DiscoveryNode("C", new InetSocketTransportAddress("localhost", 9876)));
assert false;
} catch (ConnectTransportException e) {
// e.printStackTrace();
// all is well
}
}
}