From 35b573ff24c1462de77252f53cd60b4dcab23eda Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 23 Oct 2013 15:55:27 -0700 Subject: [PATCH] Transport: Have a separate channel for recovery Have a separate channel for recovery, so it won't overflow the "low" channel which is also used for bulk indexing. Also, rename the channel names to be more descriptive. Change low to bulk (for bulk based operations, currently just bulk indexing), med to reg (for "regular" operations), and high to state (for state based communication). The new channel for recovery will be named recovery, and the ping channel will remain the same. closes #3954 --- .../elasticsearch/action/bulk/BulkAction.java | 2 +- .../TransportClientNodesService.java | 4 +- .../zen/fd/MasterFaultDetection.java | 4 +- .../discovery/zen/fd/NodesFaultDetection.java | 4 +- .../publish/PublishClusterStateAction.java | 4 +- .../indices/recovery/RecoverySource.java | 6 +- .../transport/TransportRequestOptions.java | 56 ++---- .../transport/netty/NettyTransport.java | 179 ++++++++++-------- .../BenchmarkNettyLargeMessages.java | 4 +- 9 files changed, 125 insertions(+), 138 deletions(-) diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkAction.java b/src/main/java/org/elasticsearch/action/bulk/BulkAction.java index 07bc5293a7d..786016927ac 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkAction.java @@ -48,7 +48,7 @@ public class BulkAction extends Action() { @Override public NodesInfoResponse newInstance() { @@ -378,7 +378,7 @@ public class TransportClientNodesService extends AbstractComponent { transportService.sendRequest(listedNode, ClusterStateAction.NAME, Requests.clusterStateRequest() .filterAll().filterNodes(false).local(true), - TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), + TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new BaseTransportResponseHandler() { @Override diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 9cea6350971..f9ef19e1f29 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -266,7 +266,7 @@ public class MasterFaultDetection extends AbstractComponent { threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); return; } - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public MasterPingResponseResponse newInstance() { @@ -324,7 +324,7 @@ public class MasterFaultDetection extends AbstractComponent { notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), this); + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index e3544d3c87b..fcaf51ef566 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -200,7 +200,7 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withPingType().withTimeout(pingRetryTimeout), + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { @@ -248,7 +248,7 @@ public class NodesFaultDetection extends AbstractComponent { } else { // resend the request, not reschedule, rely on send timeout transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), - options().withPingType().withTimeout(pingRetryTimeout), this); + options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 13536a1aa3f..93fae6a0113 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -80,7 +80,7 @@ public class PublishClusterStateAction extends AbstractComponent { } public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { - publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size()-1, ackListener)); + publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener)); } private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { @@ -112,7 +112,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } try { - TransportRequestOptions options = TransportRequestOptions.options().withHighType().withCompress(false); + TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 37aff3d7a0c..6a4f9fdeb46 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -200,7 +200,7 @@ public class RecoverySource extends AbstractComponent { indexInput.readBytes(buf, 0, toRead, false); BytesArray content = new BytesArray(buf, 0, toRead); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content), - TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } } catch (Throwable e) { @@ -299,7 +299,7 @@ public class RecoverySource extends AbstractComponent { } RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; size = 0; operations.clear(); @@ -308,7 +308,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; } diff --git a/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index d20e7382b37..297644c2f14 100644 --- a/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -34,18 +34,21 @@ public class TransportRequestOptions { } public static enum Type { - LOW, - MED, - HIGH, + RECOVERY, + BULK, + REG, + STATE, PING; public static Type fromString(String type) { - if ("low".equalsIgnoreCase(type)) { - return LOW; - } else if ("med".equalsIgnoreCase(type)) { - return MED; - } else if ("high".equalsIgnoreCase(type)) { - return HIGH; + if ("bulk".equalsIgnoreCase(type)) { + return BULK; + } else if ("reg".equalsIgnoreCase(type)) { + return REG; + } else if ("state".equalsIgnoreCase(type)) { + return STATE; + } else if ("recovery".equalsIgnoreCase(type)) { + return RECOVERY; } else if ("ping".equalsIgnoreCase(type)) { return PING; } else { @@ -58,7 +61,7 @@ public class TransportRequestOptions { private boolean compress; - private Type type = Type.MED; + private Type type = Type.REG; public TransportRequestOptions withTimeout(long timeout) { return withTimeout(TimeValue.timeValueMillis(timeout)); @@ -79,39 +82,6 @@ public class TransportRequestOptions { return this; } - /** - * A request that requires very low latency. - */ - public TransportRequestOptions withPingType() { - this.type = Type.PING; - return this; - } - - - /** - * A channel reserved for high prio requests. - */ - public TransportRequestOptions withHighType() { - this.type = Type.HIGH; - return this; - } - - /** - * The typical requests flows go through this one. - */ - public TransportRequestOptions withMedType() { - this.type = Type.MED; - return this; - } - - /** - * Batch oriented (big payload) based requests use this one. - */ - public TransportRequestOptions withLowType() { - this.type = Type.LOW; - return this; - } - public TimeValue timeout() { return this.timeout; } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index dc7e3ac9285..275bff900d1 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -126,9 +127,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ByteSizeValue tcpReceiveBufferSize; final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; - final int connectionsPerNodeLow; - final int connectionsPerNodeMed; - final int connectionsPerNodeHigh; + final int connectionsPerNodeRecovery; + final int connectionsPerNodeBulk; + final int connectionsPerNodeReg; + final int connectionsPerNodeState; final int connectionsPerNodePing; final ByteSizeValue maxCumulationBufferCapacity; @@ -183,11 +185,23 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); - this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", settings.getAsInt("transport.connections_per_node.low", 2)); - this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6)); - this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1)); + this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2)); + this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3)); + this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6)); + this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1)); this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1)); + // we want to have at least 1 for reg/state/ping + if (this.connectionsPerNodeReg == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.reg] to 0"); + } + if (this.connectionsPerNodePing == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.ping] to 0"); + } + if (this.connectionsPerNodeState == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.state] to 0"); + } + this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); @@ -207,8 +221,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); } - logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}], receive_predictor[{}->{}]", - workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); + logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", + workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); } public Settings settings() { @@ -603,7 +617,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (light) { nodeChannels = connectToChannelsLight(node); } else { - nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh], new Channel[connectionsPerNodePing]); + nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); try { connectToChannels(nodeChannels, node); } catch (Exception e) { @@ -646,54 +660,67 @@ public class NettyTransport extends AbstractLifecycleComponent implem Channel[] channels = new Channel[1]; channels[0] = connect.getChannel(); channels[0].getCloseFuture().addListener(new ChannelCloseListener(node)); - return new NodeChannels(channels, channels, channels, channels); + return new NodeChannels(channels, channels, channels, channels, channels); } private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { - ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length]; - ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length]; - ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length]; + ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; + ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; + ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; + ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); - for (int i = 0; i < connectLow.length; i++) { - connectLow[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectRecovery.length; i++) { + connectRecovery[i] = clientBootstrap.connect(address); } - for (int i = 0; i < connectMed.length; i++) { - connectMed[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectBulk.length; i++) { + connectBulk[i] = clientBootstrap.connect(address); } - for (int i = 0; i < connectHigh.length; i++) { - connectHigh[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectReg.length; i++) { + connectReg[i] = clientBootstrap.connect(address); + } + for (int i = 0; i < connectState.length; i++) { + connectState[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectPing.length; i++) { connectPing[i] = clientBootstrap.connect(address); } try { - for (int i = 0; i < connectLow.length; i++) { - connectLow[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectLow[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectLow[i].getCause()); + for (int i = 0; i < connectRecovery.length; i++) { + connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectRecovery[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause()); } - nodeChannels.low[i] = connectLow[i].getChannel(); - nodeChannels.low[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.recovery[i] = connectRecovery[i].getChannel(); + nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - for (int i = 0; i < connectMed.length; i++) { - connectMed[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectMed[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectMed[i].getCause()); + for (int i = 0; i < connectBulk.length; i++) { + connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectBulk[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause()); } - nodeChannels.med[i] = connectMed[i].getChannel(); - nodeChannels.med[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.bulk[i] = connectBulk[i].getChannel(); + nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - for (int i = 0; i < connectHigh.length; i++) { - connectHigh[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectHigh[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectHigh[i].getCause()); + for (int i = 0; i < connectReg.length; i++) { + connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectReg[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause()); } - nodeChannels.high[i] = connectHigh[i].getChannel(); - nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.reg[i] = connectReg[i].getChannel(); + nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + } + + for (int i = 0; i < connectState.length; i++) { + connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectState[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause()); + } + nodeChannels.state[i] = connectState[i].getChannel(); + nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectPing.length; i++) { @@ -705,37 +732,19 @@ public class NettyTransport extends AbstractLifecycleComponent implem nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - if (nodeChannels.low.length == 0) { - if (nodeChannels.med.length > 0) { - nodeChannels.low = nodeChannels.med; + if (nodeChannels.recovery.length == 0) { + if (nodeChannels.bulk.length > 0) { + nodeChannels.recovery = nodeChannels.bulk; } else { - nodeChannels.low = nodeChannels.high; + nodeChannels.recovery = nodeChannels.reg; } } - if (nodeChannels.med.length == 0) { - if (nodeChannels.high.length > 0) { - nodeChannels.med = nodeChannels.high; - } else { - nodeChannels.med = nodeChannels.low; - } - } - if (nodeChannels.high.length == 0) { - if (nodeChannels.med.length > 0) { - nodeChannels.high = nodeChannels.med; - } else { - nodeChannels.high = nodeChannels.low; - } - } - if (nodeChannels.ping.length == 0) { - if (nodeChannels.high.length > 0) { - nodeChannels.ping = nodeChannels.high; - } else { - nodeChannels.ping = nodeChannels.med; - } + if (nodeChannels.bulk.length == 0) { + nodeChannels.bulk = nodeChannels.reg; } } catch (RuntimeException e) { // clean the futures - for (ChannelFuture future : ImmutableList.builder().add(connectLow).add(connectMed).add(connectHigh).build()) { + for (ChannelFuture future : ImmutableList.builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) { future.cancel(); if (future.getChannel() != null && future.getChannel().isOpen()) { try { @@ -843,24 +852,27 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static class NodeChannels { - private Channel[] low; - private final AtomicInteger lowCounter = new AtomicInteger(); - private Channel[] med; - private final AtomicInteger medCounter = new AtomicInteger(); - private Channel[] high; - private final AtomicInteger highCounter = new AtomicInteger(); + private Channel[] recovery; + private final AtomicInteger recoveryCounter = new AtomicInteger(); + private Channel[] bulk; + private final AtomicInteger bulkCounter = new AtomicInteger(); + private Channel[] reg; + private final AtomicInteger regCounter = new AtomicInteger(); + private Channel[] state; + private final AtomicInteger stateCounter = new AtomicInteger(); private Channel[] ping; private final AtomicInteger pingCounter = new AtomicInteger(); - public NodeChannels(Channel[] low, Channel[] med, Channel[] high, Channel[] ping) { - this.low = low; - this.med = med; - this.high = high; + public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) { + this.recovery = recovery; + this.bulk = bulk; + this.reg = reg; + this.state = state; this.ping = ping; } public boolean hasChannel(Channel channel) { - return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high) || hasChannel(channel, ping); + return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping); } private boolean hasChannel(Channel channel, Channel[] channels) { @@ -873,22 +885,27 @@ public class NettyTransport extends AbstractLifecycleComponent implem } public Channel channel(TransportRequestOptions.Type type) { - if (type == TransportRequestOptions.Type.MED) { - return med[Math.abs(medCounter.incrementAndGet()) % med.length]; - } else if (type == TransportRequestOptions.Type.HIGH) { - return high[Math.abs(highCounter.incrementAndGet()) % high.length]; + if (type == TransportRequestOptions.Type.REG) { + return reg[Math.abs(regCounter.incrementAndGet()) % reg.length]; + } else if (type == TransportRequestOptions.Type.STATE) { + return state[Math.abs(stateCounter.incrementAndGet()) % state.length]; } else if (type == TransportRequestOptions.Type.PING) { return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length]; + } else if (type == TransportRequestOptions.Type.BULK) { + return bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length]; + } else if (type == TransportRequestOptions.Type.RECOVERY) { + return recovery[Math.abs(recoveryCounter.incrementAndGet()) % recovery.length]; } else { - return low[Math.abs(lowCounter.incrementAndGet()) % low.length]; + throw new ElasticSearchIllegalArgumentException("no type channel for [" + type + "]"); } } public synchronized void close() { List futures = new ArrayList(); - closeChannelsAndWait(low, futures); - closeChannelsAndWait(med, futures); - closeChannelsAndWait(high, futures); + closeChannelsAndWait(recovery, futures); + closeChannelsAndWait(bulk, futures); + closeChannelsAndWait(reg, futures); + closeChannelsAndWait(state, futures); closeChannelsAndWait(ping, futures); for (ChannelFuture future : futures) { future.awaitUninterruptibly(); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index bc7c46bb575..3b246eded80 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -87,7 +87,7 @@ public class BenchmarkNettyLargeMessages { public void run() { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); @@ -119,7 +119,7 @@ public class BenchmarkNettyLargeMessages { for (int i = 0; i < 1; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES); long start = System.currentTimeMillis(); - transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse();