diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java index 392dcd1b0db..9c0dc143343 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportBulkAction.java @@ -291,8 +291,7 @@ public class TransportBulkAction extends BaseAction { } @Override public boolean spawn() { - // no need to spawn, since in the doExecute we always execute with threaded operation set to true - return false; + return true; // spawn, we do some work here... } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 8cfc1269003..af3cec7bf48 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -74,7 +74,8 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } @Override protected TransportRequestOptions transportOptions() { - return TransportRequestOptions.options().withCompress(true); + // low type since we don't want the large bulk requests to cause high latency on typical requests + return TransportRequestOptions.options().withCompress(true).withLowType(); } @Override protected BulkShardRequest newRequestInstance() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/bulk/ClientTransportBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/bulk/ClientTransportBulkAction.java index 67f64fd298e..0b7b40774f7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/bulk/ClientTransportBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/bulk/ClientTransportBulkAction.java @@ -42,6 +42,6 @@ public class ClientTransportBulkAction extends BaseClientTransportAction() { @Override public MasterPingResponseResponse newInstance() { return new MasterPingResponseResponse(); @@ -296,7 +296,7 @@ public class MasterFaultDetection extends AbstractComponent { notifyMasterFailure(masterToPing, "failed to ping, tried [" + pingRetryCount + "] times, each with maximum [" + pingRetryTimeout + "] timeout"); } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withTimeout(pingRetryTimeout), this); + transportService.sendRequest(masterToPing, MasterPingRequestHandler.ACTION, new MasterPingRequest(nodesProvider.nodes().localNode().id(), masterToPing.id()), options().withHighType().withTimeout(pingRetryTimeout), this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index a0625d01270..76c60c43644 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -198,7 +198,7 @@ public class NodesFaultDetection extends AbstractComponent { if (!running) { return; } - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withHighType().withTimeout(pingRetryTimeout), new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { return new PingResponse(); @@ -232,7 +232,8 @@ public class NodesFaultDetection extends AbstractComponent { } } else { // resend the request, not reschedule, rely on send timeout - transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), options().withTimeout(pingRetryTimeout), this); + transportService.sendRequest(node, PingRequestHandler.ACTION, new PingRequest(node.id()), + options().withHighType().withTimeout(pingRetryTimeout), this); } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 0dc990a2552..314f08f79cb 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -167,7 +167,7 @@ public class RecoverySource extends AbstractComponent { long position = indexInput.getFilePointer(); indexInput.readBytes(buf, 0, toRead, false); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead), - TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); readCount += toRead; } indexInput.close(); @@ -258,7 +258,7 @@ public class RecoverySource extends AbstractComponent { totalOperations++; if (++counter == translogBatchSize) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); counter = 0; operations = Lists.newArrayList(); } @@ -266,7 +266,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress).withLowType(), VoidTransportResponseHandler.INSTANCE).txGet(); } return totalOperations; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index a21813bdf21..17e3e6d4c11 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -32,10 +32,18 @@ public class TransportRequestOptions { return new TransportRequestOptions(); } + public static enum Type { + LOW, + MED, + HIGH + } + private TimeValue timeout; private boolean compress; + private Type type = Type.MED; + public TransportRequestOptions withTimeout(long timeout) { return withTimeout(TimeValue.timeValueMillis(timeout)); } @@ -50,6 +58,35 @@ public class TransportRequestOptions { return this; } + public TransportRequestOptions withType(Type type) { + this.type = type; + return this; + } + + /** + * A request that requires very low latency. Usually reserved for ping requests with very small payload. + */ + public TransportRequestOptions withHighType() { + this.type = Type.HIGH; + return this; + } + + /** + * The typical requests flows go through this one. + */ + public TransportRequestOptions withMedType() { + this.type = Type.MED; + return this; + } + + /** + * Batch oriented (big payload) based requests use this one. + */ + public TransportRequestOptions withLowType() { + this.type = Type.LOW; + return this; + } + public TimeValue timeout() { return this.timeout; } @@ -57,4 +94,8 @@ public class TransportRequestOptions { public boolean compress() { return this.compress; } + + public Type type() { + return this.type; + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index 8db96a09ac9..4b95d077657 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -62,6 +62,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.common.network.NetworkService.TcpSettings.*; @@ -72,6 +73,10 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*; import static org.elasticsearch.common.util.concurrent.EsExecutors.*; /** + * There are 3 types of connections per node, low/med/high. Low if for batch oriented APIs (like recovery or + * batch) with high payload that will cause regular request. (like search or single index) to take + * longer. Med is for the typical search / single doc index. And High is for ping type requests (like FD). + * * @author kimchy (shay.banon) */ public class NettyTransport extends AbstractLifecycleComponent implements Transport { @@ -112,6 +117,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem final ByteSizeValue tcpReceiveBufferSize; + final int connectionsPerNodeLow; + final int connectionsPerNodeMed; + final int connectionsPerNodeHigh; + private final ThreadPool threadPool; private volatile OpenChannelsHandler serverOpenChannels; @@ -121,7 +130,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem private volatile ServerBootstrap serverBootstrap; // node id to actual channel - final ConcurrentMap connectedNodes = newConcurrentMap(); + final ConcurrentMap connectedNodes = newConcurrentMap(); private volatile Channel serverChannel; @@ -156,6 +165,9 @@ public class NettyTransport extends AbstractLifecycleComponent implem this.reuseAddress = componentSettings.getAsBoolean("reuse_address", settings.getAsBoolean(TCP_REUSE_ADDRESS, NetworkUtils.defaultReuseAddress())); this.tcpSendBufferSize = componentSettings.getAsBytesSize("tcp_send_buffer_size", settings.getAsBytesSize(TCP_SEND_BUFFER_SIZE, TCP_DEFAULT_SEND_BUFFER_SIZE)); this.tcpReceiveBufferSize = componentSettings.getAsBytesSize("tcp_receive_buffer_size", settings.getAsBytesSize(TCP_RECEIVE_BUFFER_SIZE, TCP_DEFAULT_RECEIVE_BUFFER_SIZE)); + this.connectionsPerNodeLow = componentSettings.getAsInt("connections_per_node.low", 2); + this.connectionsPerNodeMed = componentSettings.getAsInt("connections_per_node.low", 7); + this.connectionsPerNodeHigh = componentSettings.getAsInt("connections_per_node.low", 1); } public Settings settings() { @@ -309,10 +321,10 @@ public class NettyTransport extends AbstractLifecycleComponent implem serverBootstrap = null; } - for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { - Channel channel = it.next(); + for (Iterator it = connectedNodes.values().iterator(); it.hasNext();) { + NodeChannels nodeChannels = it.next(); it.remove(); - closeChannel(channel); + nodeChannels.close(); } if (clientBootstrap != null) { @@ -369,8 +381,8 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (isCloseConnectionException(e.getCause())) { // disconnect the node Channel channel = ctx.getChannel(); - for (Map.Entry entry : connectedNodes.entrySet()) { - if (entry.getValue().equals(channel)) { + for (Map.Entry entry : connectedNodes.entrySet()) { + if (entry.getValue().hasChannel(channel)) { disconnectFromNode(entry.getKey()); } } @@ -388,7 +400,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem } @Override public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final Streamable message, TransportRequestOptions options) throws IOException, TransportException { - Channel targetChannel = nodeChannel(node); + Channel targetChannel = nodeChannel(node, options); if (compress) { options.withCompress(true); @@ -420,30 +432,32 @@ public class NettyTransport extends AbstractLifecycleComponent implem if (!lifecycle.started()) { throw new ElasticSearchIllegalStateException("Can't add nodes to a stopped transport"); } + if (node == null) { + throw new ConnectTransportException(node, "Can't connect to a null node"); + } try { - if (node == null) { - throw new ConnectTransportException(node, "Can't connect to a null node"); - } - Channel channel = connectedNodes.get(node); - if (channel != null) { + NodeChannels nodeChannels = connectedNodes.get(node); + if (nodeChannels != null) { return; } synchronized (this) { // recheck here, within the sync block (we cache connections, so we don't care about this single sync block) - channel = connectedNodes.get(node); - if (channel != null) { + nodeChannels = connectedNodes.get(node); + if (nodeChannels != null) { return; } - InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); - ChannelFuture connectFuture = clientBootstrap.connect(address); - connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25)); - if (!connectFuture.isSuccess()) { - throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause()); + nodeChannels = new NodeChannels(new Channel[connectionsPerNodeLow], new Channel[connectionsPerNodeMed], new Channel[connectionsPerNodeHigh]); + try { + connectToChannels(nodeChannels.high, node); + connectToChannels(nodeChannels.med, node); + connectToChannels(nodeChannels.low, node); + } catch (Exception e) { + nodeChannels.close(); + throw e; } - channel = connectFuture.getChannel(); - channel.getCloseFuture().addListener(new ChannelCloseListener(node)); - connectedNodes.put(node, channel); + + connectedNodes.put(node, nodeChannels); if (logger.isDebugEnabled()) { logger.debug("Connected to node [{}]", node); @@ -455,11 +469,24 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } + private void connectToChannels(Channel[] channels, DiscoveryNode node) { + for (int i = 0; i < channels.length; i++) { + InetSocketAddress address = ((InetSocketTransportAddress) node.address()).address(); + ChannelFuture connectFuture = clientBootstrap.connect(address); + connectFuture.awaitUninterruptibly((long) (connectTimeout.millis() * 1.25)); + if (!connectFuture.isSuccess()) { + throw new ConnectTransportException(node, "connect_timeout[" + connectTimeout + "]", connectFuture.getCause()); + } + channels[i] = connectFuture.getChannel(); + channels[i].getCloseFuture().addListener(new ChannelCloseListener(node)); + } + } + @Override public void disconnectFromNode(DiscoveryNode node) { - Channel channel = connectedNodes.remove(node); - if (channel != null) { + NodeChannels nodeChannels = connectedNodes.remove(node); + if (nodeChannels != null) { try { - closeChannel(channel); + nodeChannels.close(); } finally { logger.debug("Disconnected from [{}]", node); transportServiceAdapter.raiseNodeDisconnected(node); @@ -467,18 +494,12 @@ public class NettyTransport extends AbstractLifecycleComponent implem } } - private Channel nodeChannel(DiscoveryNode node) throws ConnectTransportException { - Channel channel = connectedNodes.get(node); - if (channel == null) { + private Channel nodeChannel(DiscoveryNode node, TransportRequestOptions options) throws ConnectTransportException { + NodeChannels nodeChannels = connectedNodes.get(node); + if (nodeChannels == null) { throw new NodeNotConnectedException(node, "Node not connected"); } - return channel; - } - - private void closeChannel(Channel channel) { - if (channel.isOpen()) { - channel.close().awaitUninterruptibly(); - } + return nodeChannels.channel(options.type()); } private class ChannelCloseListener implements ChannelFutureListener { @@ -493,4 +514,61 @@ public class NettyTransport extends AbstractLifecycleComponent implem disconnectFromNode(node); } } + + public static class NodeChannels { + + private Channel[] low; + private final AtomicInteger lowCounter = new AtomicInteger(); + private Channel[] med; + private final AtomicInteger medCounter = new AtomicInteger(); + private Channel[] high; + private final AtomicInteger highCounter = new AtomicInteger(); + + public NodeChannels(Channel[] low, Channel[] med, Channel[] high) { + this.low = low; + this.med = med; + this.high = high; + } + + public boolean hasChannel(Channel channel) { + return hasChannel(channel, low) || hasChannel(channel, med) || hasChannel(channel, high); + } + + private boolean hasChannel(Channel channel, Channel[] channels) { + for (Channel channel1 : channels) { + if (channel.equals(channel1)) { + return true; + } + } + return false; + } + + public Channel channel(TransportRequestOptions.Type type) { + if (type == TransportRequestOptions.Type.MED) { + return med[Math.abs(medCounter.incrementAndGet()) % med.length]; + } else if (type == TransportRequestOptions.Type.HIGH) { + return high[Math.abs(highCounter.incrementAndGet()) % high.length]; + } else { + return low[Math.abs(lowCounter.incrementAndGet()) % low.length]; + } + } + + public void close() { + closeChannels(low); + closeChannels(med); + closeChannels(high); + } + + private void closeChannels(Channel[] channels) { + for (Channel channel : channels) { + try { + if (channel != null && channel.isOpen()) { + channel.close(); + } + } catch (Exception e) { + //ignore + } + } + } + } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyLargeMessages.java b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyLargeMessages.java new file mode 100644 index 00000000000..d01390f96c3 --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/transport/netty/benchmark/BenchmarkNettyLargeMessages.java @@ -0,0 +1,128 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.transport.netty.benchmark; + +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Bytes; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.threadpool.cached.CachedThreadPool; +import org.elasticsearch.timer.TimerService; +import org.elasticsearch.transport.*; +import org.elasticsearch.transport.netty.NettyTransport; + +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.transport.TransportRequestOptions.*; + +/** + * @author kimchy (shay.banon) + */ +public class BenchmarkNettyLargeMessages { + + public static void main(String[] args) throws InterruptedException { + final ByteSizeValue payloadSize = new ByteSizeValue(10, ByteSizeUnit.MB); + final int NUMBER_OF_ITERATIONS = 100000; + final int NUMBER_OF_CLIENTS = 5; + final byte[] payload = new byte[(int) payloadSize.bytes()]; + + Settings settings = ImmutableSettings.settingsBuilder() + .build(); + + final ThreadPool threadPool = new CachedThreadPool(settings); + final TimerService timerService = new TimerService(settings, threadPool); + final TransportService transportServiceServer = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start(); + final TransportService transportServiceClient = new TransportService(new NettyTransport(settings, threadPool), threadPool, timerService).start(); + + final DiscoveryNode bigNode = new DiscoveryNode("big", new InetSocketTransportAddress("localhost", 9300)); +// final DiscoveryNode smallNode = new DiscoveryNode("small", new InetSocketTransportAddress("localhost", 9300)); + final DiscoveryNode smallNode = bigNode; + + transportServiceClient.connectToNode(bigNode); + transportServiceClient.connectToNode(smallNode); + + transportServiceServer.registerHandler("benchmark", new BaseTransportRequestHandler() { + @Override public BenchmarkMessage newInstance() { + return new BenchmarkMessage(); + } + + @Override public void messageReceived(BenchmarkMessage request, TransportChannel channel) throws Exception { + channel.sendResponse(request); + } + + @Override public boolean spawn() { + return true; + } + }); + + final CountDownLatch latch = new CountDownLatch(NUMBER_OF_CLIENTS); + for (int i = 0; i < NUMBER_OF_CLIENTS; i++) { + new Thread(new Runnable() { + @Override public void run() { + for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { + BenchmarkMessage message = new BenchmarkMessage(1, payload); + transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withLowType(), new BaseTransportResponseHandler() { + @Override public BenchmarkMessage newInstance() { + return new BenchmarkMessage(); + } + + @Override public void handleResponse(BenchmarkMessage response) { + } + + @Override public void handleException(TransportException exp) { + exp.printStackTrace(); + } + }).txGet(); + } + latch.countDown(); + } + }).start(); + } + + new Thread(new Runnable() { + @Override public void run() { + for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { + BenchmarkMessage message = new BenchmarkMessage(2, Bytes.EMPTY_ARRAY); + long start = System.currentTimeMillis(); + transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withHighType(), new BaseTransportResponseHandler() { + @Override public BenchmarkMessage newInstance() { + return new BenchmarkMessage(); + } + + @Override public void handleResponse(BenchmarkMessage response) { + } + + @Override public void handleException(TransportException exp) { + exp.printStackTrace(); + } + }).txGet(); + long took = System.currentTimeMillis() - start; + System.out.println("Took " + took + "ms"); + } + } + }).start(); + + latch.await(); + } +}