diff --git a/src/main/java/org/elasticsearch/action/bulk/BulkAction.java b/src/main/java/org/elasticsearch/action/bulk/BulkAction.java index 07bc5293a7d..786016927ac 100644 --- a/src/main/java/org/elasticsearch/action/bulk/BulkAction.java +++ b/src/main/java/org/elasticsearch/action/bulk/BulkAction.java @@ -48,7 +48,7 @@ public class BulkAction extends Action() { @Override public NodesInfoResponse newInstance() { @@ -378,7 +378,7 @@ public class TransportClientNodesService extends AbstractComponent { transportService.sendRequest(listedNode, ClusterStateAction.NAME, Requests.clusterStateRequest() .filterAll().filterNodes(false).local(true), - TransportRequestOptions.options().withHighType().withTimeout(pingTimeout), + TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new BaseTransportResponseHandler() { @Override diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 9cea6350971..f9ef19e1f29 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -266,7 +266,7 @@ public class MasterFaultDetection extends AbstractComponent { threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this); return; } - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public MasterPingResponseResponse newInstance() { @@ -324,7 +324,7 @@ public class MasterFaultDetection extends AbstractComponent { notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withPingType().withTimeout(pingRetryTimeout), this); + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index e3544d3c87b..fcaf51ef566 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -200,7 +200,7 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withPingType().withTimeout(pingRetryTimeout), + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { @@ -248,7 +248,7 @@ public class NodesFaultDetection extends AbstractComponent { } else { // resend the request, not reschedule, rely on send timeout transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), - options().withPingType().withTimeout(pingRetryTimeout), this); + options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout), this); } } } diff --git a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index 13536a1aa3f..93fae6a0113 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -80,7 +80,7 @@ public class PublishClusterStateAction extends AbstractComponent { } public void publish(ClusterState clusterState, final Discovery.AckListener ackListener) { - publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size()-1, ackListener)); + publish(clusterState, new AckClusterStatePublishResponseHandler(clusterState.nodes().size() - 1, ackListener)); } private void publish(ClusterState clusterState, final ClusterStatePublishResponseHandler publishResponseHandler) { @@ -112,7 +112,7 @@ public class PublishClusterStateAction extends AbstractComponent { } } try { - TransportRequestOptions options = TransportRequestOptions.options().withHighType().withCompress(false); + TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, PublishClusterStateRequestHandler.ACTION, diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 37aff3d7a0c..6a4f9fdeb46 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -200,7 +200,7 @@ public class RecoverySource extends AbstractComponent { indexInput.readBytes(buf, 0, toRead, false); BytesArray content = new BytesArray(buf, 0, toRead); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), name, position, len, md.checksum(), content), - TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType().withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } } catch (Throwable e) { @@ -299,7 +299,7 @@ public class RecoverySource extends AbstractComponent { } RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; size = 0; operations.clear(); @@ -308,7 +308,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.recoveryId(), request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType().withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionLongTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } return totalOperations; } diff --git a/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index d20e7382b37..297644c2f14 100644 --- a/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -34,18 +34,21 @@ public class TransportRequestOptions { } public static enum Type { - LOW, - MED, - HIGH, + RECOVERY, + BULK, + REG, + STATE, PING; public static Type fromString(String type) { - if ("low".equalsIgnoreCase(type)) { - return LOW; - } else if ("med".equalsIgnoreCase(type)) { - return MED; - } else if ("high".equalsIgnoreCase(type)) { - return HIGH; + if ("bulk".equalsIgnoreCase(type)) { + return BULK; + } else if ("reg".equalsIgnoreCase(type)) { + return REG; + } else if ("state".equalsIgnoreCase(type)) { + return STATE; + } else if ("recovery".equalsIgnoreCase(type)) { + return RECOVERY; } else if ("ping".equalsIgnoreCase(type)) { return PING; } else { @@ -58,7 +61,7 @@ public class TransportRequestOptions { private boolean compress; - private Type type = Type.MED; + private Type type = Type.REG; public TransportRequestOptions withTimeout(long timeout) { return withTimeout(TimeValue.timeValueMillis(timeout)); @@ -79,39 +82,6 @@ public class TransportRequestOptions { return this; } - /** - * A request that requires very low latency. - */ - public TransportRequestOptions withPingType() { - this.type = Type.PING; - return this; - } - - - /** - * A channel reserved for high prio requests. - */ - public TransportRequestOptions withHighType() { - this.type = Type.HIGH; - return this; - } - - /** - * The typical requests flows go through this one. - */ - public TransportRequestOptions withMedType() { - this.type = Type.MED; - return this; - } - - /** - * Batch oriented (big payload) based requests use this one. - */ - public TransportRequestOptions withLowType() { - this.type = Type.LOW; - return this; - } - public TimeValue timeout() { return this.timeout; } diff --git a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index dc7e3ac9285..275bff900d1 100644 --- a/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -22,6 +22,7 @@ package org.elasticsearch.transport.netty; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.Version; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -126,9 +127,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ByteSizeValue tcpReceiveBufferSize; final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; - final int connectionsPerNodeLow; - final int connectionsPerNodeMed; - final int connectionsPerNodeHigh; + final int connectionsPerNodeRecovery; + final int connectionsPerNodeBulk; + final int connectionsPerNodeReg; + final int connectionsPerNodeState; final int connectionsPerNodePing; final ByteSizeValue maxCumulationBufferCapacity; @@ -183,11 +185,23 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); - this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", settings.getAsInt("transport.connections_per_node.low", 2)); - this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.med", settings.getAsInt("transport.connections_per_node.med", 6)); - this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.high", 1)); + this.connectionsPerNodeRecovery = componentSettings.getAsInt("connections_per_node.recovery", settings.getAsInt("transport.connections_per_node.recovery", 2)); + this.connectionsPerNodeBulk = componentSettings.getAsInt("connections_per_node.bulk", settings.getAsInt("transport.connections_per_node.bulk", 3)); + this.connectionsPerNodeReg = componentSettings.getAsInt("connections_per_node.reg", settings.getAsInt("transport.connections_per_node.reg", 6)); + this.connectionsPerNodeState = componentSettings.getAsInt("connections_per_node.high", settings.getAsInt("transport.connections_per_node.state", 1)); this.connectionsPerNodePing = componentSettings.getAsInt("connections_per_node.ping", settings.getAsInt("transport.connections_per_node.ping", 1)); + // we want to have at least 1 for reg/state/ping + if (this.connectionsPerNodeReg == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.reg] to 0"); + } + if (this.connectionsPerNodePing == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.ping] to 0"); + } + if (this.connectionsPerNodeState == 0) { + throw new ElasticSearchIllegalArgumentException("can't set [connection_per_node.state] to 0"); + } + this.maxCumulationBufferCapacity = componentSettings.getAsBytesSize("max_cumulation_buffer_capacity", null); this.maxCompositeBufferComponents = componentSettings.getAsInt("max_composite_buffer_components", -1); @@ -207,8 +221,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes()); } - logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}], receive_predictor[{}->{}]", - workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeLow, connectionsPerNodeMed, connectionsPerNodeHigh, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); + logger.debug("using worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], connect_timeout[{}], connections_per_node[{}/{}/{}/{}/{}], receive_predictor[{}->{}]", + workerCount, port, bindHost, publishHost, compress, connectTimeout, connectionsPerNodeRecovery, connectionsPerNodeBulk, connectionsPerNodeReg, connectionsPerNodeState, connectionsPerNodePing, receivePredictorMin, receivePredictorMax); } public Settings settings() { @@ -603,7 +617,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (light) { nodeChannels = connectToChannelsLight(node); } else { - nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh], new Channel[connectionsPerNodePing]); + nodeChannels = new NodeChannels(new Channel[connectionsPerNodeRecovery], new Channel[connectionsPerNodeBulk], new Channel[connectionsPerNodeReg], new Channel[connectionsPerNodeState], new Channel[connectionsPerNodePing]); try { connectToChannels(nodeChannels, node); } catch (Exception e) { @@ -646,54 +660,67 @@ public class NettyTransport extends AbstractLifecycleComponent implem Channel[] channels = new Channel[1]; channels[0] = connect.getChannel(); channels[0].getCloseFuture().addListener(new ChannelCloseListener(node)); - return new NodeChannels(channels, channels, channels, channels); + return new NodeChannels(channels, channels, channels, channels, channels); } private void connectToChannels(NodeChannels nodeChannels, DiscoveryNode node) { - ChannelFuture[] connectLow = new ChannelFuture[nodeChannels.low.length]; - ChannelFuture[] connectMed = new ChannelFuture[nodeChannels.med.length]; - ChannelFuture[] connectHigh = new ChannelFuture[nodeChannels.high.length]; + ChannelFuture[] connectRecovery = new ChannelFuture[nodeChannels.recovery.length]; + ChannelFuture[] connectBulk = new ChannelFuture[nodeChannels.bulk.length]; + ChannelFuture[] connectReg = new ChannelFuture[nodeChannels.reg.length]; + ChannelFuture[] connectState = new ChannelFuture[nodeChannels.state.length]; ChannelFuture[] connectPing = new ChannelFuture[nodeChannels.ping.length]; InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); - for (int i = 0; i < connectLow.length; i++) { - connectLow[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectRecovery.length; i++) { + connectRecovery[i] = clientBootstrap.connect(address); } - for (int i = 0; i < connectMed.length; i++) { - connectMed[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectBulk.length; i++) { + connectBulk[i] = clientBootstrap.connect(address); } - for (int i = 0; i < connectHigh.length; i++) { - connectHigh[i] = clientBootstrap.connect(address); + for (int i = 0; i < connectReg.length; i++) { + connectReg[i] = clientBootstrap.connect(address); + } + for (int i = 0; i < connectState.length; i++) { + connectState[i] = clientBootstrap.connect(address); } for (int i = 0; i < connectPing.length; i++) { connectPing[i] = clientBootstrap.connect(address); } try { - for (int i = 0; i < connectLow.length; i++) { - connectLow[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectLow[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectLow[i].getCause()); + for (int i = 0; i < connectRecovery.length; i++) { + connectRecovery[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectRecovery[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectRecovery[i].getCause()); } - nodeChannels.low[i] = connectLow[i].getChannel(); - nodeChannels.low[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.recovery[i] = connectRecovery[i].getChannel(); + nodeChannels.recovery[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - for (int i = 0; i < connectMed.length; i++) { - connectMed[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectMed[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectMed[i].getCause()); + for (int i = 0; i < connectBulk.length; i++) { + connectBulk[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectBulk[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectBulk[i].getCause()); } - nodeChannels.med[i] = connectMed[i].getChannel(); - nodeChannels.med[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.bulk[i] = connectBulk[i].getChannel(); + nodeChannels.bulk[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - for (int i = 0; i < connectHigh.length; i++) { - connectHigh[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); - if (!connectHigh[i].isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectHigh[i].getCause()); + for (int i = 0; i < connectReg.length; i++) { + connectReg[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectReg[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectReg[i].getCause()); } - nodeChannels.high[i] = connectHigh[i].getChannel(); - nodeChannels.high[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + nodeChannels.reg[i] = connectReg[i].getChannel(); + nodeChannels.reg[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + } + + for (int i = 0; i < connectState.length; i++) { + connectState[i].awaitUninterruptibly((long) (connectTimeout.millis() * 1.5)); + if (!connectState[i].isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectState[i].getCause()); + } + nodeChannels.state[i] = connectState[i].getChannel(); + nodeChannels.state[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } for (int i = 0; i < connectPing.length; i++) { @@ -705,37 +732,19 @@ public class NettyTransport extends AbstractLifecycleComponent implem nodeChannels.ping[i].getCloseFuture().addListener(new ChannelCloseListener(node)); } - if (nodeChannels.low.length == 0) { - if (nodeChannels.med.length > 0) { - nodeChannels.low = nodeChannels.med; + if (nodeChannels.recovery.length == 0) { + if (nodeChannels.bulk.length > 0) { + nodeChannels.recovery = nodeChannels.bulk; } else { - nodeChannels.low = nodeChannels.high; + nodeChannels.recovery = nodeChannels.reg; } } - if (nodeChannels.med.length == 0) { - if (nodeChannels.high.length > 0) { - nodeChannels.med = nodeChannels.high; - } else { - nodeChannels.med = nodeChannels.low; - } - } - if (nodeChannels.high.length == 0) { - if (nodeChannels.med.length > 0) { - nodeChannels.high = nodeChannels.med; - } else { - nodeChannels.high = nodeChannels.low; - } - } - if (nodeChannels.ping.length == 0) { - if (nodeChannels.high.length > 0) { - nodeChannels.ping = nodeChannels.high; - } else { - nodeChannels.ping = nodeChannels.med; - } + if (nodeChannels.bulk.length == 0) { + nodeChannels.bulk = nodeChannels.reg; } } catch (RuntimeException e) { // clean the futures - for (ChannelFuture future : ImmutableList.builder().add(connectLow).add(connectMed).add(connectHigh).build()) { + for (ChannelFuture future : ImmutableList.builder().add(connectRecovery).add(connectBulk).add(connectReg).add(connectState).add(connectPing).build()) { future.cancel(); if (future.getChannel() != null && future.getChannel().isOpen()) { try { @@ -843,24 +852,27 @@ public class NettyTransport extends AbstractLifecycleComponent implem public static class NodeChannels { - private Channel[] low; - private final AtomicInteger lowCounter = new AtomicInteger(); - private Channel[] med; - private final AtomicInteger medCounter = new AtomicInteger(); - private Channel[] high; - private final AtomicInteger highCounter = new AtomicInteger(); + private Channel[] recovery; + private final AtomicInteger recoveryCounter = new AtomicInteger(); + private Channel[] bulk; + private final AtomicInteger bulkCounter = new AtomicInteger(); + private Channel[] reg; + private final AtomicInteger regCounter = new AtomicInteger(); + private Channel[] state; + private final AtomicInteger stateCounter = new AtomicInteger(); private Channel[] ping; private final AtomicInteger pingCounter = new AtomicInteger(); - public NodeChannels(Channel[] low, Channel[] med, Channel[] high, Channel[] ping) { - this.low = low; - this.med = med; - this.high = high; + public NodeChannels(Channel[] recovery, Channel[] bulk, Channel[] reg, Channel[] state, Channel[] ping) { + this.recovery = recovery; + this.bulk = bulk; + this.reg = reg; + this.state = state; this.ping = ping; } public boolean hasChannel(Channel channel) { - return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high) || hasChannel(channel, ping); + return hasChannel(channel, recovery) || hasChannel(channel, bulk) || hasChannel(channel, reg) || hasChannel(channel, state) || hasChannel(channel, ping); } private boolean hasChannel(Channel channel, Channel[] channels) { @@ -873,22 +885,27 @@ public class NettyTransport extends AbstractLifecycleComponent implem } public Channel channel(TransportRequestOptions.Type type) { - if (type == TransportRequestOptions.Type.MED) { - return med[Math.abs(medCounter.incrementAndGet()) % med.length]; - } else if (type == TransportRequestOptions.Type.HIGH) { - return high[Math.abs(highCounter.incrementAndGet()) % high.length]; + if (type == TransportRequestOptions.Type.REG) { + return reg[Math.abs(regCounter.incrementAndGet()) % reg.length]; + } else if (type == TransportRequestOptions.Type.STATE) { + return state[Math.abs(stateCounter.incrementAndGet()) % state.length]; } else if (type == TransportRequestOptions.Type.PING) { return ping[Math.abs(pingCounter.incrementAndGet()) % ping.length]; + } else if (type == TransportRequestOptions.Type.BULK) { + return bulk[Math.abs(bulkCounter.incrementAndGet()) % bulk.length]; + } else if (type == TransportRequestOptions.Type.RECOVERY) { + return recovery[Math.abs(recoveryCounter.incrementAndGet()) % recovery.length]; } else { - return low[Math.abs(lowCounter.incrementAndGet()) % low.length]; + throw new ElasticSearchIllegalArgumentException("no type channel for [" + type + "]"); } } public synchronized void close() { List futures = new ArrayList(); - closeChannelsAndWait(low, futures); - closeChannelsAndWait(med, futures); - closeChannelsAndWait(high, futures); + closeChannelsAndWait(recovery, futures); + closeChannelsAndWait(bulk, futures); + closeChannelsAndWait(reg, futures); + closeChannelsAndWait(state, futures); closeChannelsAndWait(ping, futures); for (ChannelFuture future : futures) { future.awaitUninterruptibly(); diff --git a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index bc7c46bb575..3b246eded80 100644 --- a/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -87,7 +87,7 @@ public class BenchmarkNettyLargeMessages { public void run() { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); @@ -119,7 +119,7 @@ public class BenchmarkNettyLargeMessages { for (int i = 0; i < 1; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES); long start = System.currentTimeMillis(); - transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse();