Transport: Add a dedicated ping channel

Today, we have low/med/high channel groups in our transport layer. The high channel is used both to publish the cluster state and to send ping requests, so the overhead of publishing a large cluster state can sometimes interfere with ping requests.

Introduce a new, dedicated ping channel (with size 1) that only handles ping requests.
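
As a rough illustration (not part of the diff below), a caller that wants its pings routed over the new channel would build its request options along these lines; the 30s timeout is only a placeholder:

    // Sketch: route a ping-style request over the dedicated ping channel.
    // TransportRequestOptions and TimeValue already exist in the codebase; the concrete timeout is made up.
    TransportRequestOptions pingOptions = TransportRequestOptions.options()
            .withPingType()                                // new: select the dedicated ping channel
            .withTimeout(TimeValue.timeValueSeconds(30));  // placeholder timeout
    // Fault detection then hands these options to TransportService#sendRequest,
    // exactly as the changes below do with options().withPingType().withTimeout(pingRetryTimeout).
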
closes #3362
Shay Banon 2013-07-22 10:29:57 +02:00
parent e2961c0c7a
commit 7a9350c9a1
4 changed files with 55 additions and 14 deletions


@ -266,7 +266,7 @@ public class MasterFaultDetection extends AbstractComponent {
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
return;
}
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout),
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<MasterPingResponseResponse>() {
@Override
public MasterPingResponseResponse newInstance() {
@ -324,7 +324,7 @@ public class MasterFaultDetection extends AbstractComponent {
notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout), this);
transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), this);
}
}
}


@ -200,7 +200,7 @@ public class NodesFaultDetection extends AbstractComponent {
if (!running) {
return;
}
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout),
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withPingType().withTimeout(pingRetryTimeout),
new BaseTransportResponseHandler<PingResponse>() {
@Override
public PingResponse newInstance() {
@ -248,7 +248,7 @@ public class NodesFaultDetection extends AbstractComponent {
} else {
// resend the request, not reschedule, rely on send timeout
transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()),
options().withHighType().withTimeout(pingRetryTimeout), this);
options().withPingType().withTimeout(pingRetryTimeout), this);
}
}
}


@ -36,7 +36,8 @@ public class TransportRequestOptions {
public static enum Type {
LOW,
MED,
HIGH;
HIGH,
PING;
public static Type fromString(String type) {
if ("low".equalsIgnoreCase(type)) {
@ -45,6 +46,8 @@ public class TransportRequestOptions {
return MED;
} else if ("high".equalsIgnoreCase(type)) {
return HIGH;
} else if ("ping".equalsIgnoreCase(type)) {
return PING;
} else {
throw new ElasticSearchIllegalArgumentException("failed to match transport type for [" + type + "]");
}
@ -77,7 +80,16 @@ public class TransportRequestOptions {
}
/**
* A request that requires very low latency. Usually reserved for ping requests with very small payload.
* A request that requires very low latency.
*/
public TransportRequestOptions withPingType() {
this.type = Type.PING;
return this;
}
/**
* A channel reserved for high prio requests.
*/
public TransportRequestOptions withHighType() {
this.type = Type.HIGH;
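
For reference, a self-contained sketch of the extended type parsing above; plain Java, with IllegalArgumentException standing in for ElasticSearchIllegalArgumentException so it runs on its own:

    // Standalone mirror of TransportRequestOptions.Type with the new PING member.
    public class TypeParsingSketch {
        enum Type { LOW, MED, HIGH, PING }

        static Type fromString(String type) {
            if ("low".equalsIgnoreCase(type)) return Type.LOW;
            if ("med".equalsIgnoreCase(type)) return Type.MED;
            if ("high".equalsIgnoreCase(type)) return Type.HIGH;
            if ("ping".equalsIgnoreCase(type)) return Type.PING; // new in this change
            throw new IllegalArgumentException("failed to match transport type for [" + type + "]");
        }

        public static void main(String[] args) {
            System.out.println(fromString("ping")); // PING
            System.out.println(fromString("HIGH")); // HIGH (matching is case-insensitive)
        }
    }
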


@ -82,9 +82,10 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.new
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
/**
* There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or
* There are 4 types of connections per node, low/med/high/ping. Low is for batch oriented APIs (like recovery or
* batch) with high payload that will cause regular requests (like search or single index) to take
* longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD).
* longer. Med is for the typical search / single doc index. And High is for things like cluster state. Ping is reserved for
* sending out ping requests to other nodes.
*/
public class NettyTransport extends AbstractLifecycleComponent<Transport> implements Transport {
@ -124,6 +125,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
final int connectionsPerNodeLow;
final int connectionsPerNodeMed;
final int connectionsPerNodeHigh;
final int connectionsPerNodePing;
final ByteSizeValue maxCumulationBufferCapacity;
final int maxCompositeBufferComponents;
@ -191,6 +193,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", settings.getAsInt("transport.connections_per_node.low", 2));
this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6));
this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1));
this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1));
this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null);
this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1);
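
As a side note, the new connection count can be tuned like the existing low/med/high ones. A hedged sketch, assuming the usual ImmutableSettings builder from this codebase; the value 2 is only an example, the default stays 1:

    // Hypothetical node settings raising the dedicated ping connection count.
    Settings pingTuned = ImmutableSettings.settingsBuilder()
            .put("transport.connections_per_node.ping", 2) // example value; default is 1
            .build();
    // NettyTransport reads it back with settings.getAsInt("transport.connections_per_node.ping", 1),
    // exactly as the constructor change above shows.
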
@ -211,8 +214,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
}
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, receivePredictorMin, receivePredictorMax);
logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}], receive_predictor[{}->{}]",
workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, connectionsPerNodePing, receivePredictorMin, receivePredictorMax);
}
public Settings settings() {
@ -605,7 +608,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
if (light) {
nodeChannels = connectToChannelsLight(node);
} else {
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]);
nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh], new Channel[connectionsPerNodePing]);
try {
connectToChannels(nodeChannels, node);
} catch (Exception e) {
@ -646,13 +649,14 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Channel[] channels = new Channel[1];
channels[0] = connect.getChannel();
channels[0].getCloseFuture().addListener(new ChannelCloseListener(node));
return new NodeChannels(channels, channels, channels);
return new NodeChannels(channels, channels, channels, channels);
}
private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) {
ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length];
ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length];
ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length];
ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length];
InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address();
for (int i = 0; i < connectLow.length; i++) {
connectLow[i] = clientBootstrap.connect(address);
@ -663,6 +667,9 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
for (int i = 0; i < connectHigh.length; i++) {
connectHigh[i] = clientBootstrap.connect(address);
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i] = clientBootstrap.connect(address);
}
try {
for (int i = 0; i < connectLow.length; i++) {
@ -692,6 +699,15 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
for (int i = 0; i < connectPing.length; i++) {
connectPing[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5));
if (!connectPing[i].isSuccess()) {
throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectPing[i].getCause());
}
nodeChannels.ping[i] = connectPing[i].getChannel();
nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node));
}
if (nodeChannels.low.length == 0) {
if (nodeChannels.med.length > 0) {
nodeChannels.low = nodeChannels.med;
@ -713,6 +729,13 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
nodeChannels.high = nodeChannels.low;
}
}
if (nodeChannels.ping.length == 0) {
if (nodeChannels.high.length > 0) {
nodeChannels.ping = nodeChannels.high;
} else {
nodeChannels.ping = nodeChannels.med;
}
}
} catch (RuntimeException e) {
// clean the futures
for (ChannelFuture future : ImmutableList.<ChannelFuture>builder().add(connectLow).add(connectMed).add(connectHigh).build()) {
@ -821,15 +844,18 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
private final AtomicInteger medCounter = new AtomicInteger();
private Channel[] high;
private final AtomicInteger highCounter = new AtomicInteger();
private Channel[] ping;
private final AtomicInteger pingCounter = new AtomicInteger();
public NodeChannels(Channel[] low, Channel[] med, Channel[] high) {
public NodeChannels(Channel[] low, Channel[] med, Channel[] high, Channel[] ping) {
this.low = low;
this.med = med;
this.high = high;
this.ping = ping;
}
public boolean hasChannel(Channel channel) {
return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high);
return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high) || hasChannel(channel, ping);
}
private boolean hasChannel(Channel channel, Channel[] channels) {
@ -846,6 +872,8 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
return med[Math.abs(medCounter.incrementAndGet()) % med.length];
} else if (type == TransportRequestOptions.Type.HIGH) {
return high[Math.abs(highCounter.incrementAndGet()) % high.length];
} else if (type == TransportRequestOptions.Type.PING) {
return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length];
} else {
return low[Math.abs(lowCounter.incrementAndGet()) % low.length];
}
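
The ping group reuses the same counter-based round-robin as the other channel groups. A tiny standalone sketch of that selection pattern (the channel names are made up):

    // Minimal illustration of the Math.abs(counter.incrementAndGet()) % length rotation.
    import java.util.concurrent.atomic.AtomicInteger;

    public class RoundRobinSketch {
        static final String[] pingChannels = {"ping-0", "ping-1", "ping-2"}; // pretend channels
        static final AtomicInteger pingCounter = new AtomicInteger();

        static String nextPingChannel() {
            // Cycles through the array as the counter grows, like NodeChannels.channel() above.
            return pingChannels[Math.abs(pingCounter.incrementAndGet()) % pingChannels.length];
        }

        public static void main(String[] args) {
            for (int i = 0; i < 5; i++) {
                System.out.println(nextPingChannel()); // ping-1, ping-2, ping-0, ping-1, ping-2
            }
        }
    }
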
@ -856,6 +884,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
closeChannelsAndWait(low, futures);
closeChannelsAndWait(med, futures);
closeChannelsAndWait(high, futures);
closeChannelsAndWait(ping, futures);
for (ChannelFuture future : futures) {
future.awaitUninterruptibly();
}