diff --git a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java index b34f50de0f0..70b5b846099 100644 --- a/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java +++ b/modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Transport.java @@ -169,9 +169,8 @@ public class Netty4Transport extends TcpTransport { private void createServerBootstrap(ProfileSettings profileSettings, NioEventLoopGroup eventLoopGroup) { String name = profileSettings.profileName; if (logger.isDebugEnabled()) { - logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], compress[{}], " - + "receive_predictor[{}->{}]", - name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, compress, + logger.debug("using profile[{}], worker_count[{}], port[{}], bind_host[{}], publish_host[{}], receive_predictor[{}->{}]", + name, workerCount, profileSettings.portOrRange, profileSettings.bindHosts, profileSettings.publishHosts, receivePredictorMin, receivePredictorMax); } diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java index bae0cb7cef9..0125b2c45c0 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/Netty4ScheduledPingTests.java @@ -38,7 +38,6 @@ import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportResponseHandler; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import java.io.IOException; @@ -91,7 +90,7 @@ public class Netty4ScheduledPingTests extends ESTestCase { serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, (request, channel, task) -> { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (IOException e) { logger.error("Unexpected failure", e); fail(e.getMessage()); diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java index 4c651c31bee..006fbae6c42 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/SimpleNetty4TransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; @@ -36,6 +35,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -58,9 +58,10 @@ public class SimpleNetty4TransportTests extends AbstractSimpleTransportTestCase BigArrays.NON_RECYCLING_INSTANCE, namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); } diff --git a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java index 8fc1dd04dd7..70acc2d1482 100644 --- a/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java +++ b/plugins/transport-nio/src/test/java/org/elasticsearch/transport/nio/SimpleNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -37,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -62,9 +62,10 @@ public class SimpleNioTransportTests extends AbstractSimpleTransportTestCase { new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); } diff --git a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java index d6183655fa2..4fd03d86d95 100644 --- a/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java +++ b/server/src/main/java/org/elasticsearch/transport/ConnectionProfile.java @@ -38,40 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger; */ public final class ConnectionProfile { - /** - * Builds a connection profile that is dedicated to a single channel type. Use this - * when opening single use connections - */ - public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, - @Nullable TimeValue connectTimeout, - @Nullable TimeValue handshakeTimeout) { - Builder builder = new Builder(); - builder.addConnections(1, channelType); - final EnumSet otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class); - otherTypes.remove(channelType); - builder.addConnections(0, otherTypes.stream().toArray(TransportRequestOptions.Type[]::new)); - if (connectTimeout != null) { - builder.setConnectTimeout(connectTimeout); - } - if (handshakeTimeout != null) { - builder.setHandshakeTimeout(handshakeTimeout); - } - return builder.build(); - } - - private final List handles; - private final int numConnections; - private final TimeValue connectTimeout; - private final TimeValue handshakeTimeout; - - private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, - TimeValue handshakeTimeout) { - this.handles = handles; - this.numConnections = numConnections; - this.connectTimeout = connectTimeout; - this.handshakeTimeout = handshakeTimeout; - } - /** * takes a {@link ConnectionProfile} resolves it to a fully specified (i.e., no nulls) profile */ @@ -79,7 +45,8 @@ public final class ConnectionProfile { Objects.requireNonNull(fallbackProfile); if (profile == null) { return fallbackProfile; - } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null) { + } else if (profile.getConnectTimeout() != null && profile.getHandshakeTimeout() != null + && profile.getCompressionEnabled() != null) { return profile; } else { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(profile); @@ -89,6 +56,9 @@ public final class ConnectionProfile { if (profile.getHandshakeTimeout() == null) { builder.setHandshakeTimeout(fallbackProfile.getHandshakeTimeout()); } + if (profile.getCompressionEnabled() == null) { + builder.setCompressionEnabled(fallbackProfile.getCompressionEnabled()); + } return builder.build(); } } @@ -108,6 +78,7 @@ public final class ConnectionProfile { Builder builder = new Builder(); builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); + builder.setCompressionEnabled(Transport.TRANSPORT_TCP_COMPRESS.get(settings)); builder.addConnections(connectionsPerNodeBulk, TransportRequestOptions.Type.BULK); builder.addConnections(connectionsPerNodePing, TransportRequestOptions.Type.PING); // if we are not master eligible we don't need a dedicated channel to publish the state @@ -118,13 +89,77 @@ public final class ConnectionProfile { return builder.build(); } + /** + * Builds a connection profile that is dedicated to a single channel type. Use this + * when opening single use connections + */ + public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType) { + return buildSingleChannelProfile(channelType, null, null, null); + } + + /** + * Builds a connection profile that is dedicated to a single channel type. Allows passing compression + * settings. + */ + public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, boolean compressionEnabled) { + return buildSingleChannelProfile(channelType, null, null, compressionEnabled); + } + + /** + * Builds a connection profile that is dedicated to a single channel type. Allows passing connection and + * handshake timeouts. + */ + public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout, + @Nullable TimeValue handshakeTimeout) { + return buildSingleChannelProfile(channelType, connectTimeout, handshakeTimeout, null); + } + + /** + * Builds a connection profile that is dedicated to a single channel type. Allows passing connection and + * handshake timeouts and compression settings. + */ + public static ConnectionProfile buildSingleChannelProfile(TransportRequestOptions.Type channelType, @Nullable TimeValue connectTimeout, + @Nullable TimeValue handshakeTimeout, @Nullable Boolean compressionEnabled) { + Builder builder = new Builder(); + builder.addConnections(1, channelType); + final EnumSet otherTypes = EnumSet.allOf(TransportRequestOptions.Type.class); + otherTypes.remove(channelType); + builder.addConnections(0, otherTypes.toArray(new TransportRequestOptions.Type[0])); + if (connectTimeout != null) { + builder.setConnectTimeout(connectTimeout); + } + if (handshakeTimeout != null) { + builder.setHandshakeTimeout(handshakeTimeout); + } + if (compressionEnabled != null) { + builder.setCompressionEnabled(compressionEnabled); + } + return builder.build(); + } + + private final List handles; + private final int numConnections; + private final TimeValue connectTimeout; + private final TimeValue handshakeTimeout; + private final Boolean compressionEnabled; + + private ConnectionProfile(List handles, int numConnections, TimeValue connectTimeout, + TimeValue handshakeTimeout, Boolean compressionEnabled) { + this.handles = handles; + this.numConnections = numConnections; + this.connectTimeout = connectTimeout; + this.handshakeTimeout = handshakeTimeout; + this.compressionEnabled = compressionEnabled; + } + /** * A builder to build a new {@link ConnectionProfile} */ public static class Builder { private final List handles = new ArrayList<>(); private final Set addedTypes = EnumSet.noneOf(TransportRequestOptions.Type.class); - private int offset = 0; + private int numConnections = 0; + private Boolean compressionEnabled; private TimeValue connectTimeout; private TimeValue handshakeTimeout; @@ -135,10 +170,11 @@ public final class ConnectionProfile { /** copy constructor, using another profile as a base */ public Builder(ConnectionProfile source) { handles.addAll(source.getHandles()); - offset = source.getNumConnections(); + numConnections = source.getNumConnections(); handles.forEach(th -> addedTypes.addAll(th.types)); connectTimeout = source.getConnectTimeout(); handshakeTimeout = source.getHandshakeTimeout(); + compressionEnabled = source.getCompressionEnabled(); } /** * Sets a connect timeout for this connection profile @@ -160,6 +196,13 @@ public final class ConnectionProfile { this.handshakeTimeout = handshakeTimeout; } + /** + * Sets compression enabled for this connection profile + */ + public void setCompressionEnabled(boolean compressionEnabled) { + this.compressionEnabled = compressionEnabled; + } + /** * Adds a number of connections for one or more types. Each type can only be added once. * @param numConnections the number of connections to use in the pool for the given connection types @@ -175,8 +218,8 @@ public final class ConnectionProfile { } } addedTypes.addAll(Arrays.asList(types)); - handles.add(new ConnectionTypeHandle(offset, numConnections, EnumSet.copyOf(Arrays.asList(types)))); - offset += numConnections; + handles.add(new ConnectionTypeHandle(this.numConnections, numConnections, EnumSet.copyOf(Arrays.asList(types)))); + this.numConnections += numConnections; } /** @@ -189,7 +232,8 @@ public final class ConnectionProfile { if (types.isEmpty() == false) { throw new IllegalStateException("not all types are added for this connection profile - missing types: " + types); } - return new ConnectionProfile(Collections.unmodifiableList(handles), offset, connectTimeout, handshakeTimeout); + return new ConnectionProfile(Collections.unmodifiableList(handles), numConnections, connectTimeout, handshakeTimeout, + compressionEnabled); } } @@ -208,6 +252,14 @@ public final class ConnectionProfile { return handshakeTimeout; } + /** + * Returns boolean indicating if compression is enabled or null if no explicit compression + * is set on this profile. + */ + public Boolean getCompressionEnabled() { + return compressionEnabled; + } + /** * Returns the total number of connections for this profile */ diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java index 9f53a42646b..c1961b266cb 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterConnection.java @@ -66,6 +66,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; +import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_COMPRESS; + /** * Represents a connection to a single remote cluster. In contrast to a local cluster a remote cluster is not joined such that the * current node is part of the cluster and it won't receive cluster state updates from the remote cluster. Remote clusters are also not @@ -86,6 +88,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo private final ConnectionProfile remoteProfile; private final ConnectedNodes connectedNodes; private final String clusterAlias; + private final boolean compress; private final int maxNumRemoteConnections; private final Predicate nodePredicate; private final ThreadPool threadPool; @@ -108,12 +111,13 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo * @param proxyAddress the proxy address */ RemoteClusterConnection(Settings settings, String clusterAlias, List> seedNodes, - TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, - Predicate nodePredicate, String proxyAddress) { + TransportService transportService, ConnectionManager connectionManager, int maxNumRemoteConnections, + Predicate nodePredicate, String proxyAddress) { this.transportService = transportService; this.maxNumRemoteConnections = maxNumRemoteConnections; this.nodePredicate = nodePredicate; this.clusterAlias = clusterAlias; + this.compress = REMOTE_CLUSTER_COMPRESS.getConcreteSettingForNamespace(clusterAlias).get(settings); ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); builder.setConnectTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); builder.setHandshakeTimeout(TransportService.TCP_CONNECT_TIMEOUT.get(settings)); @@ -122,6 +126,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo TransportRequestOptions.Type.BULK, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); + builder.setCompressionEnabled(compress); remoteProfile = builder.build(); connectedNodes = new ConnectedNodes(clusterAlias); this.seedNodes = Collections.unmodifiableList(seedNodes); @@ -471,8 +476,8 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo }); } - private void collectRemoteNodes(Iterator> seedNodes, - final TransportService transportService, final ConnectionManager manager, ActionListener listener) { + private void collectRemoteNodes(Iterator> seedNodes, final TransportService transportService, + final ConnectionManager manager, ActionListener listener) { if (Thread.currentThread().isInterrupted()) { listener.onFailure(new InterruptedException("remote connect thread got interrupted")); } @@ -483,8 +488,9 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo logger.debug("[{}] opening connection to seed node: [{}] proxy address: [{}]", clusterAlias, seedNode, proxyAddress); final TransportService.HandshakeResponse handshakeResponse; - Transport.Connection connection = manager.openConnection(seedNode, - ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null)); + ConnectionProfile profile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, + compress); + Transport.Connection connection = manager.openConnection(seedNode, profile); boolean success = false; try { try { diff --git a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java index 08f08207eae..de0e550f2f7 100644 --- a/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java +++ b/server/src/main/java/org/elasticsearch/transport/RemoteClusterService.java @@ -173,6 +173,12 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl key -> timeSetting(key, TcpTransport.PING_SCHEDULE, Setting.Property.NodeScope), REMOTE_CLUSTERS_SEEDS); + public static final Setting.AffixSetting REMOTE_CLUSTER_COMPRESS = Setting.affixKeySetting( + "cluster.remote.", + "transport.compress", + key -> boolSetting(key, Transport.TRANSPORT_TCP_COMPRESS, Setting.Property.NodeScope), + REMOTE_CLUSTERS_SEEDS); + private static final Predicate DEFAULT_NODE_PREDICATE = (node) -> Version.CURRENT.isCompatible(node.getVersion()) && (node.isMasterNode() == false || node.isDataNode() || node.isIngestNode()); diff --git a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java index 53400207eab..aa659906019 100644 --- a/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TaskTransportChannel.java @@ -54,12 +54,6 @@ public class TaskTransportChannel implements TransportChannel { channel.sendResponse(response); } - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - endTask(); - channel.sendResponse(response, options); - } - @Override public void sendResponse(Exception exception) throws IOException { endTask(); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java index eedd064bca7..545055fadf0 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransport.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransport.java @@ -164,8 +164,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements key -> intSetting(key, -1, -1, Setting.Property.NodeScope)); // This is the number of bytes necessary to read the message size - public static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; - public static final int PING_DATA_SIZE = -1; + private static final int BYTES_NEEDED_FOR_MESSAGE_SIZE = TcpHeader.MARKER_BYTES_SIZE + TcpHeader.MESSAGE_LENGTH_SIZE; + private static final int PING_DATA_SIZE = -1; protected final CounterMetric successfulPings = new CounterMetric(); protected final CounterMetric failedPings = new CounterMetric(); private static final long NINETY_PER_HEAP_SIZE = (long) (JvmInfo.jvmInfo().getMem().getHeapMax().getBytes() * 0.9); @@ -194,7 +194,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements // this lock is here to make sure we close this transport and disconnect all the client nodes // connections while no connect operations is going on private final ReadWriteLock closeLock = new ReentrantReadWriteLock(); - protected final boolean compress; + protected final boolean compressResponses; private volatile BoundTransportAddress boundAddress; private final String transportName; @@ -218,7 +218,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements this.bigArrays = bigArrays; this.circuitBreakerService = circuitBreakerService; this.namedWriteableRegistry = namedWriteableRegistry; - this.compress = Transport.TRANSPORT_TCP_COMPRESS.get(settings); + this.compressResponses = Transport.TRANSPORT_TCP_COMPRESS.get(settings); this.networkService = networkService; this.transportName = transportName; this.transportLogger = new TransportLogger(); @@ -284,6 +284,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements private final List channels; private final DiscoveryNode node; private final Version version; + private final boolean compress; private final AtomicBoolean isClosing = new AtomicBoolean(false); NodeChannels(DiscoveryNode node, List channels, ConnectionProfile connectionProfile, Version handshakeVersion) { @@ -297,6 +298,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements typeMapping.put(type, handle); } version = handshakeVersion; + compress = connectionProfile.getCompressionEnabled(); } @Override @@ -384,6 +386,10 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements throw new NodeNotConnectedException(node, "connection already closed"); } TcpChannel channel = channel(options.type()); + + if (compress) { + options = TransportRequestOptions.builder(options).withCompress(true).build(); + } sendRequestToChannel(this.node, channel, requestId, action, request, options, getVersion(), (byte) 0); } } @@ -573,8 +579,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } // package private for tests - public static int resolvePublishPort(ProfileSettings profileSettings, List boundAddresses, - InetAddress publishInetAddress) { + static int resolvePublishPort(ProfileSettings profileSettings, List boundAddresses, + InetAddress publishInetAddress) { int publishPort = profileSettings.publishPort; // if port not explicitly provided, search for port of address in boundAddresses that matches publishInetAddress @@ -811,16 +817,13 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements */ protected abstract void stopInternal(); - public boolean canCompress(TransportRequest request) { - return compress && (!(request instanceof BytesTransportRequest)); + private boolean canCompress(TransportRequest request) { + return request instanceof BytesTransportRequest == false; } private void sendRequestToChannel(final DiscoveryNode node, final TcpChannel channel, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options, Version channelVersion, byte status) throws IOException, TransportException { - if (compress) { - options = TransportRequestOptions.builder(options).withCompress(true).build(); - } // only compress if asked and the request is not bytes. Otherwise only // the header part is compressed, and the "body" can't be extracted as compressed @@ -935,10 +938,11 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements final String action, TransportResponseOptions options, byte status) throws IOException { - if (compress) { + if (compressResponses && options.compress() == false) { options = TransportResponseOptions.builder(options).withCompress(true).build(); } - status = TransportStatus.setResponse(status); // TODO share some code with sendRequest + + status = TransportStatus.setResponse(status); ReleasableBytesStreamOutput bStream = new ReleasableBytesStreamOutput(bigArrays); CompressibleBytesOutputStream stream = new CompressibleBytesOutputStream(bStream, options.compress()); boolean addedReleaseListener = false; @@ -1159,7 +1163,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements */ public static class HttpOnTransportException extends ElasticsearchException { - public HttpOnTransportException(String msg) { + private HttpOnTransportException(String msg) { super(msg); } @@ -1346,7 +1350,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements getInFlightRequestBreaker().addWithoutBreaking(messageLengthBytes); } transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, - messageLengthBytes); + messageLengthBytes, TransportStatus.isCompress(status)); final TransportRequest request = reg.newRequest(stream); request.remoteAddress(new TransportAddress(remoteAddress)); // in case we throw an exception, i.e. when the limit is hit, we don't want to verify @@ -1356,8 +1360,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } catch (Exception e) { // the circuit breaker tripped if (transportChannel == null) { - transportChannel = - new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, profileName, 0); + transportChannel = new TcpTransportChannel(this, channel, transportName, action, requestId, version, features, + profileName, 0, TransportStatus.isCompress(status)); } try { transportChannel.sendResponse(e); @@ -1416,8 +1420,8 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements } } - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { - handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, timeout, listener); + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, ActionListener listener) { + handshaker.sendHandshake(responseHandlers.newRequestId(), node, channel, profile.getHandshakeTimeout(), listener); } final int getNumPendingHandshakes() { @@ -1629,7 +1633,7 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements if (countDown.countDown()) { final TcpChannel handshakeChannel = channels.get(0); try { - executeHandshake(node, handshakeChannel, connectionProfile.getHandshakeTimeout(), new ActionListener() { + executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener() { @Override public void onResponse(Version version) { NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version); diff --git a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java index dbaacb7bd87..732e04bcd54 100644 --- a/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TcpTransportChannel.java @@ -26,6 +26,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; public final class TcpTransportChannel implements TransportChannel { + + private final AtomicBoolean released = new AtomicBoolean(); private final TcpTransport transport; private final Version version; private final Set features; @@ -33,12 +35,12 @@ public final class TcpTransportChannel implements TransportChannel { private final long requestId; private final String profileName; private final long reservedBytes; - private final AtomicBoolean released = new AtomicBoolean(); private final String channelType; private final TcpChannel channel; + private final boolean compressResponse; TcpTransportChannel(TcpTransport transport, TcpChannel channel, String channelType, String action, long requestId, Version version, - Set features, String profileName, long reservedBytes) { + Set features, String profileName, long reservedBytes, boolean compressResponse) { this.version = version; this.features = features; this.channel = channel; @@ -48,6 +50,7 @@ public final class TcpTransportChannel implements TransportChannel { this.profileName = profileName; this.reservedBytes = reservedBytes; this.channelType = channelType; + this.compressResponse = compressResponse; } @Override @@ -57,12 +60,13 @@ public final class TcpTransportChannel implements TransportChannel { @Override public void sendResponse(TransportResponse response) throws IOException { - sendResponse(response, TransportResponseOptions.EMPTY); - } - - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { try { + TransportResponseOptions options; + if (compressResponse) { + options = TransportResponseOptions.builder().withCompress(true).build(); + } else { + options = TransportResponseOptions.EMPTY; + } transport.sendResponse(version, features, channel, response, requestId, action, options); } finally { release(false); diff --git a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java index 7aeddfc9223..17e538f04c4 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportChannel.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportChannel.java @@ -34,8 +34,6 @@ public interface TransportChannel { void sendResponse(TransportResponse response) throws IOException; - void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException; - void sendResponse(Exception exception) throws IOException; /** diff --git a/server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java index a36793ed5d8..b704c10ef39 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java @@ -38,8 +38,7 @@ public class TransportResponseOptions { } public static Builder builder(TransportResponseOptions options) { - return new Builder() - .withCompress(options.compress); + return new Builder().withCompress(options.compress); } public static class Builder { diff --git a/server/src/main/java/org/elasticsearch/transport/TransportService.java b/server/src/main/java/org/elasticsearch/transport/TransportService.java index c2ae982b3dc..38e8c01cbcf 100644 --- a/server/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/server/src/main/java/org/elasticsearch/transport/TransportService.java @@ -1169,12 +1169,7 @@ public class TransportService extends AbstractLifecycleComponent implements Tran @Override public void sendResponse(TransportResponse response) throws IOException { - sendResponse(response, TransportResponseOptions.EMPTY); - } - - @Override - public void sendResponse(final TransportResponse response, TransportResponseOptions options) throws IOException { - service.onResponseSent(requestId, action, response, options); + service.onResponseSent(requestId, action, response, TransportResponseOptions.EMPTY); final TransportResponseHandler handler = service.responseHandlers.onResponseReceived(requestId, service); // ignore if its null, the service logs it if (handler != null) { diff --git a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java index 20383d8cf88..3d881307023 100644 --- a/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java @@ -58,7 +58,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.AfterClass; @@ -487,10 +486,6 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase { capturedResponse = response; } - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - } - @Override public void sendResponse(Exception exception) throws IOException { } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index cbe490c87ae..aeda5f1c3fa 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -80,7 +80,6 @@ import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.hamcrest.Matcher; import org.junit.After; @@ -1248,11 +1247,6 @@ public class TransportReplicationActionTests extends ESTestCase { listener.onResponse(((TestResponse) response)); } - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - listener.onResponse(((TestResponse) response)); - } - @Override public void sendResponse(Exception exception) throws IOException { listener.onFailure(exception); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java index 23b118ebbed..fd81996a74f 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/PublishClusterStateActionTests.java @@ -55,7 +55,6 @@ import org.elasticsearch.transport.BytesTransportRequest; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportConnectionListener; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; @@ -921,12 +920,6 @@ public class PublishClusterStateActionTests extends ESTestCase { assertThat(error.get(), nullValue()); } - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - this.response.set(response); - assertThat(error.get(), nullValue()); - } - @Override public void sendResponse(Exception exception) throws IOException { this.error.set(exception); diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index b82ddde7eca..cbd609282f4 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -54,7 +54,6 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportResponse; -import org.elasticsearch.transport.TransportResponseOptions; import org.elasticsearch.transport.TransportService; import java.io.Closeable; @@ -393,11 +392,6 @@ public class ZenDiscoveryUnitTests extends ESTestCase { sendResponse.set(true); } - @Override - public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { - - } - @Override public void sendResponse(Exception exception) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java index 3dc9e0aece7..03ecf65737d 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionManagerTests.java @@ -35,7 +35,6 @@ import java.net.InetAddress; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -55,7 +54,7 @@ public class ConnectionManagerTests extends ESTestCase { transport = mock(Transport.class); connectionManager = new ConnectionManager(settings, transport, threadPool); TimeValue oneSecond = new TimeValue(1000); - connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond); + connectionProfile = ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, oneSecond, oneSecond, false); } @After @@ -63,73 +62,6 @@ public class ConnectionManagerTests extends ESTestCase { threadPool.shutdown(); } - public void testConnectionProfileResolve() { - final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); - assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile)); - - final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE); - builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING); - - final boolean connectionTimeoutSet = randomBoolean(); - if (connectionTimeoutSet) { - builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); - } - final boolean connectionHandshakeSet = randomBoolean(); - if (connectionHandshakeSet) { - builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); - } - - final ConnectionProfile profile = builder.build(); - final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile); - assertNotEquals(resolved, defaultProfile); - assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections())); - assertThat(resolved.getHandles(), equalTo(profile.getHandles())); - - assertThat(resolved.getConnectTimeout(), - equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout())); - assertThat(resolved.getHandshakeTimeout(), - equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout())); - } - - public void testDefaultConnectionProfile() { - ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); - assertEquals(13, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); - assertEquals(12, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); - assertEquals(11, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - - profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) - .put("node.master", false).build()); - assertEquals(10, profile.getNumConnections()); - assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); - assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); - assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); - assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); - } - public void testConnectAndDisconnect() { AtomicInteger nodeConnectedCount = new AtomicInteger(); AtomicInteger nodeDisconnectedCount = new AtomicInteger(); diff --git a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java index 318ee78ece5..8d053f7ade6 100644 --- a/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java +++ b/server/src/test/java/org/elasticsearch/transport/ConnectionProfileTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.transport; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; @@ -27,19 +28,26 @@ import java.util.Collections; import java.util.EnumSet; import java.util.List; +import static org.hamcrest.Matchers.equalTo; + public class ConnectionProfileTests extends ESTestCase { public void testBuildConnectionProfile() { ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); TimeValue connectTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); - TimeValue handshaketTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); + TimeValue handshakeTimeout = TimeValue.timeValueMillis(randomIntBetween(1, 10)); + boolean compressionEnabled = randomBoolean(); final boolean setConnectTimeout = randomBoolean(); if (setConnectTimeout) { builder.setConnectTimeout(connectTimeout); } final boolean setHandshakeTimeout = randomBoolean(); if (setHandshakeTimeout) { - builder.setHandshakeTimeout(handshaketTimeout); + builder.setHandshakeTimeout(handshakeTimeout); + } + final boolean setCompress = randomBoolean(); + if (setCompress) { + builder.setCompressionEnabled(compressionEnabled); } builder.addConnections(1, TransportRequestOptions.Type.BULK); builder.addConnections(2, TransportRequestOptions.Type.STATE, TransportRequestOptions.Type.RECOVERY); @@ -63,11 +71,17 @@ public class ConnectionProfileTests extends ESTestCase { } if (setHandshakeTimeout) { - assertEquals(handshaketTimeout, build.getHandshakeTimeout()); + assertEquals(handshakeTimeout, build.getHandshakeTimeout()); } else { assertNull(build.getHandshakeTimeout()); } + if (setCompress) { + assertEquals(compressionEnabled, build.getCompressionEnabled()); + } else { + assertNull(build.getCompressionEnabled()); + } + List list = new ArrayList<>(10); for (int i = 0; i < 10; i++) { list.add(i); @@ -126,4 +140,81 @@ public class ConnectionProfileTests extends ESTestCase { assertEquals(Integer.valueOf(0), build.getHandles().get(0).getChannel(array)); expectThrows(IllegalStateException.class, () -> build.getHandles().get(1).getChannel(array)); } + + public void testConnectionProfileResolve() { + final ConnectionProfile defaultProfile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); + assertEquals(defaultProfile, ConnectionProfile.resolveConnectionProfile(null, defaultProfile)); + + final ConnectionProfile.Builder builder = new ConnectionProfile.Builder(); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.BULK); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.RECOVERY); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.REG); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.STATE); + builder.addConnections(randomIntBetween(0, 5), TransportRequestOptions.Type.PING); + + final boolean connectionTimeoutSet = randomBoolean(); + if (connectionTimeoutSet) { + builder.setConnectTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } + final boolean connectionHandshakeSet = randomBoolean(); + if (connectionHandshakeSet) { + builder.setHandshakeTimeout(TimeValue.timeValueMillis(randomNonNegativeLong())); + } + + final boolean connectionCompressSet = randomBoolean(); + if (connectionCompressSet) { + builder.setCompressionEnabled(randomBoolean()); + } + + final ConnectionProfile profile = builder.build(); + final ConnectionProfile resolved = ConnectionProfile.resolveConnectionProfile(profile, defaultProfile); + assertNotEquals(resolved, defaultProfile); + assertThat(resolved.getNumConnections(), equalTo(profile.getNumConnections())); + assertThat(resolved.getHandles(), equalTo(profile.getHandles())); + + assertThat(resolved.getConnectTimeout(), + equalTo(connectionTimeoutSet ? profile.getConnectTimeout() : defaultProfile.getConnectTimeout())); + assertThat(resolved.getHandshakeTimeout(), + equalTo(connectionHandshakeSet ? profile.getHandshakeTimeout() : defaultProfile.getHandshakeTimeout())); + assertThat(resolved.getCompressionEnabled(), + equalTo(connectionCompressSet ? profile.getCompressionEnabled() : defaultProfile.getCompressionEnabled())); + } + + public void testDefaultConnectionProfile() { + ConnectionProfile profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.EMPTY); + assertEquals(13, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getConnectTimeout()); + assertEquals(TransportService.TCP_CONNECT_TIMEOUT.get(Settings.EMPTY), profile.getHandshakeTimeout()); + assertEquals(Transport.TRANSPORT_TCP_COMPRESS.get(Settings.EMPTY), profile.getCompressionEnabled()); + + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.master", false).build()); + assertEquals(12, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(2, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false).build()); + assertEquals(11, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + + profile = ConnectionProfile.buildDefaultConnectionProfile(Settings.builder().put("node.data", false) + .put("node.master", false).build()); + assertEquals(10, profile.getNumConnections()); + assertEquals(1, profile.getNumConnectionsPerType(TransportRequestOptions.Type.PING)); + assertEquals(6, profile.getNumConnectionsPerType(TransportRequestOptions.Type.REG)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.STATE)); + assertEquals(0, profile.getNumConnectionsPerType(TransportRequestOptions.Type.RECOVERY)); + assertEquals(3, profile.getNumConnectionsPerType(TransportRequestOptions.Type.BULK)); + } } diff --git a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java index b9ce7d3be37..f3e4ef46a71 100644 --- a/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java +++ b/server/src/test/java/org/elasticsearch/transport/TcpTransportTests.java @@ -187,8 +187,7 @@ public class TcpTransportTests extends ESTestCase { ThreadPool threadPool = new TestThreadPool(TcpTransportTests.class.getName()); AtomicReference messageCaptor = new AtomicReference<>(); try { - TcpTransport transport = new TcpTransport( - "test", Settings.builder().put("transport.tcp.compress", compressed).build(), Version.CURRENT, threadPool, + TcpTransport transport = new TcpTransport("test", Settings.EMPTY, Version.CURRENT, threadPool, new BigArrays(new PageCacheRecycler(Settings.EMPTY), null), null, null, null) { @Override @@ -207,17 +206,24 @@ public class TcpTransportTests extends ESTestCase { @Override public NodeChannels openConnection(DiscoveryNode node, ConnectionProfile connectionProfile) { - int numConnections = MockTcpTransport.LIGHT_PROFILE.getNumConnections(); + assertTrue(connectionProfile.getCompressionEnabled()); + int numConnections = connectionProfile.getNumConnections(); ArrayList fakeChannels = new ArrayList<>(numConnections); for (int i = 0; i < numConnections; ++i) { fakeChannels.add(new FakeChannel(messageCaptor)); } - return new NodeChannels(node, fakeChannels, MockTcpTransport.LIGHT_PROFILE, Version.CURRENT); + return new NodeChannels(node, fakeChannels, connectionProfile, Version.CURRENT); } }; DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(), Version.CURRENT); - Transport.Connection connection = transport.openConnection(node, null); + ConnectionProfile.Builder profileBuilder = new ConnectionProfile.Builder(MockTcpTransport.LIGHT_PROFILE); + if (compressed) { + profileBuilder.setCompressionEnabled(true); + } else { + profileBuilder.setCompressionEnabled(false); + } + Transport.Connection connection = transport.openConnection(node, profileBuilder.build()); connection.sendRequest(42, "foobar", request, TransportRequestOptions.EMPTY); BytesReference reference = messageCaptor.get(); diff --git a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 2015bbf353d..d858b080c49 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -499,106 +499,117 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVoidMessageCompressed() { - serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, - (request, channel, task) -> { - try { - TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build(); - channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); - } - }); + try (MockTransportService serviceC = build(Settings.EMPTY, CURRENT_VERSION, null, true)) { + serviceC.start(); + serviceC.acceptIncomingRequests(); - TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), - new TransportResponseHandler() { - @Override - public TransportResponse.Empty read(StreamInput in) { - return TransportResponse.Empty.INSTANCE; - } - - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - - @Override - public void handleResponse(TransportResponse.Empty response) { - } - - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); - - try { - TransportResponse.Empty message = res.get(); - assertThat(message, notNullValue()); - } catch (Exception e) { - assertThat(e.getMessage(), false, equalTo(true)); - } - } - - public void testHelloWorldCompressed() { - serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, - new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) { - assertThat("moshe", equalTo(request.message)); + serviceA.registerRequestHandler("internal:sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + (request, channel, task) -> { try { - TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build(); - channel.sendResponse(new StringMessageResponse("hello " + request.message), responseOptions); + channel.sendResponse(TransportResponse.Empty.INSTANCE); } catch (IOException e) { logger.error("Unexpected failure", e); fail(e.getMessage()); } - } - }); + }); - TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), - new TransportResponseHandler() { - @Override - public StringMessageResponse read(StreamInput in) throws IOException { - return new StringMessageResponse(in); - } + Settings settingsWithCompress = Settings.builder().put(Transport.TRANSPORT_TCP_COMPRESS.getKey(), true).build(); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress); + serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile); - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } - @Override - public void handleResponse(StringMessageResponse response) { - assertThat("hello moshe", equalTo(response.message)); - } + TransportFuture res = serviceC.submitRequest(nodeA, "internal:sayHello", + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), + new TransportResponseHandler() { + @Override + public TransportResponse.Empty read(StreamInput in) { + return TransportResponse.Empty.INSTANCE; + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - try { - StringMessageResponse message = res.get(); - assertThat("hello moshe", equalTo(message.message)); - } catch (Exception e) { - assertThat(e.getMessage(), false, equalTo(true)); + @Override + public void handleResponse(TransportResponse.Empty response) { + } + + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); + + try { + TransportResponse.Empty message = res.get(); + assertThat(message, notNullValue()); + } catch (Exception e) { + assertThat(e.getMessage(), false, equalTo(true)); + } + } + } + + public void testHelloWorldCompressed() throws IOException { + try (MockTransportService serviceC = build(Settings.EMPTY, CURRENT_VERSION, null, true)) { + serviceC.start(); + serviceC.acceptIncomingRequests(); + + serviceA.registerRequestHandler("internal:sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + (request, channel, task) -> { + assertThat("moshe", equalTo(request.message)); + try { + channel.sendResponse(new StringMessageResponse("hello " + request.message)); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } + }); + + Settings settingsWithCompress = Settings.builder().put(Transport.TRANSPORT_TCP_COMPRESS.getKey(), true).build(); + ConnectionProfile connectionProfile = ConnectionProfile.buildDefaultConnectionProfile(settingsWithCompress); + serviceC.connectToNode(serviceA.getLocalDiscoNode(), connectionProfile); + + TransportFuture res = serviceC.submitRequest(nodeA, "internal:sayHello", + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), + new TransportResponseHandler() { + @Override + public StringMessageResponse read(StreamInput in) throws IOException { + return new StringMessageResponse(in); + } + + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } + + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("hello moshe", equalTo(response.message)); + } + + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); + + try { + StringMessageResponse message = res.get(); + assertThat("hello moshe", equalTo(message.message)); + } catch (Exception e) { + assertThat(e.getMessage(), false, equalTo(true)); + } } } public void testErrorMessage() { serviceA.registerRequestHandler("internal:sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, - new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception { - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } + (request, channel, task) -> { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); }); TransportFuture res = serviceB.submitRequest(nodeA, "internal:sayHelloException", @@ -2028,10 +2039,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), connectionProfile)) { - PlainActionFuture listener = PlainActionFuture.newFuture(); - originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING), - TimeValue.timeValueSeconds(10), listener); - assertEquals(listener.actionGet(), Version.CURRENT); + assertEquals(connection.getVersion(), Version.CURRENT); } } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java index 2fddb42d570..d1977da9795 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/MockTcpTransport.java @@ -211,6 +211,7 @@ public class MockTcpTransport extends TcpTransport { } builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); builder.setConnectTimeout(connectionProfile.getConnectTimeout()); + builder.setCompressionEnabled(connectionProfile.getCompressionEnabled()); return builder.build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java index 0fb61142073..a98be373b2a 100644 --- a/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java @@ -154,6 +154,7 @@ public class MockNioTransport extends TcpTransport { } builder.setHandshakeTimeout(connectionProfile.getHandshakeTimeout()); builder.setConnectTimeout(connectionProfile.getConnectTimeout()); + builder.setCompressionEnabled(connectionProfile.getCompressionEnabled()); return builder.build(); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java index 1e5c6092687..5c7fdc10649 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/MockTcpTransportTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; @@ -41,9 +40,10 @@ public class MockTcpTransportTests extends AbstractSimpleTransportTestCase { new NoneCircuitBreakerService(), namedWriteableRegistry, new NetworkService(Collections.emptyList()), version) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); } diff --git a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java index c6ba13d4ca7..622b6b4657c 100644 --- a/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java +++ b/test/framework/src/test/java/org/elasticsearch/transport/nio/SimpleMockNioTransportTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -37,6 +36,7 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractSimpleTransportTestCase; import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -62,9 +62,10 @@ public class SimpleMockNioTransportTests extends AbstractSimpleTransportTestCase new MockPageCacheRecycler(settings), namedWriteableRegistry, new NoneCircuitBreakerService()) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java index 3b98bc8aa5f..a716d955bbe 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/AbstractSimpleSecurityTransportTestCase.java @@ -5,9 +5,7 @@ */ package org.elasticsearch.xpack.security.transport; -import java.util.concurrent.atomic.AtomicBoolean; import org.elasticsearch.Version; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.stream.OutputStreamStreamOutput; @@ -15,7 +13,6 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.MockSecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.node.Node; import org.elasticsearch.test.transport.MockTransportService; @@ -24,7 +21,6 @@ import org.elasticsearch.transport.BindTransportException; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpTransport; -import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.common.socket.SocketAccess; import org.elasticsearch.xpack.core.ssl.SSLConfiguration; @@ -39,6 +35,7 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.nio.file.Path; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; @@ -107,7 +104,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi } @Override - public void testTcpHandshake() throws InterruptedException { + public void testTcpHandshake() { assumeTrue("only tcp transport has a handshake method", serviceA.getOriginalTransport() instanceof TcpTransport); TcpTransport originalTransport = (TcpTransport) serviceA.getOriginalTransport(); @@ -116,10 +113,7 @@ public abstract class AbstractSimpleSecurityTransportTestCase extends AbstractSi TcpTransport.NodeChannels connection = originalTransport.openConnection( new DiscoveryNode("TS_TPC", "TS_TPC", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0), connectionProfile)) { - PlainActionFuture listener = PlainActionFuture.newFuture(); - originalTransport.executeHandshake(connection.getNode(), connection.channel(TransportRequestOptions.Type.PING), - TimeValue.timeValueSeconds(10), listener); - assertEquals(listener.actionGet(), Version.CURRENT); + assertEquals(connection.getVersion(), Version.CURRENT); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java index 3e4fbee5197..22d53241b45 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/ServerTransportFilterIntegrationTests.java @@ -165,7 +165,7 @@ public class ServerTransportFilterIntegrationTests extends SecurityIntegTestCase node.start(); TransportService instance = node.injector().getInstance(TransportService.class); try (Transport.Connection connection = instance.openConnection(new DiscoveryNode("theNode", transportAddress, Version.CURRENT), - ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null))) { + ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG))) { // handshake should be ok final DiscoveryNode handshake = instance.handshake(connection, 10000); assertEquals(transport.boundAddress().publishAddress(), handshake.getAddress()); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java index 8c4dcf9e2fa..ec85c41e6b1 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/netty4/SimpleSecurityNetty4ServerTransportTests.java @@ -20,7 +20,6 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; @@ -77,9 +76,10 @@ public class SimpleSecurityNetty4ServerTransportTests extends AbstractSimpleSecu new NoneCircuitBreakerService(), null, createSSLService(settings1)) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java index 5f336e2e5d3..3ccd6c5f7e3 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/transport/nio/SimpleSecurityNioTransportTests.java @@ -12,12 +12,12 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ConnectionProfile; import org.elasticsearch.transport.TcpChannel; import org.elasticsearch.transport.TcpTransport; import org.elasticsearch.transport.Transport; @@ -39,9 +39,10 @@ public class SimpleSecurityNioTransportTests extends AbstractSimpleSecurityTrans createSSLService(settings1)) { @Override - public void executeHandshake(DiscoveryNode node, TcpChannel channel, TimeValue timeout, ActionListener listener) { + public void executeHandshake(DiscoveryNode node, TcpChannel channel, ConnectionProfile profile, + ActionListener listener) { if (doHandshake) { - super.executeHandshake(node, channel, timeout, listener); + super.executeHandshake(node, channel, profile, listener); } else { listener.onResponse(version.minimumCompatibilityVersion()); }