From 3acd7800cb7023f1fa221cd3e5406ef5e649596d Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Fri, 13 Nov 2015 22:59:49 -0500 Subject: [PATCH] Transport options should be immutable This commit changes TransportRequestOptions and TransportResponseOptions to be immutable. This is to address an issue where the empty options were being mutated permanently altering their state. Making these objects immutable is just good, clean coding. --- .../elasticsearch/action/bulk/BulkAction.java | 4 +- .../support/nodes/TransportNodesAction.java | 8 +- .../TransportClientNodesService.java | 4 +- .../zen/fd/MasterFaultDetection.java | 4 +- .../discovery/zen/fd/NodesFaultDetection.java | 3 +- .../zen/ping/unicast/UnicastZenPing.java | 2 +- .../publish/PublishClusterStateAction.java | 4 +- .../recovery/RecoverySourceHandler.java | 18 +-- .../transport/TransportRequestOptions.java | 109 +++++++++--------- .../transport/TransportResponseOptions.java | 35 ++++-- .../transport/netty/NettyTransport.java | 2 +- .../netty/NettyTransportChannel.java | 2 +- .../BenchmarkNettyLargeMessages.java | 6 +- .../TransportClientNodesServiceTests.java | 2 +- .../AbstractSimpleTransportTestCase.java | 23 ++-- .../netty/NettyScheduledPingTests.java | 4 +- 16 files changed, 124 insertions(+), 106 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java index 42d0c22508b..e442f61061a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java @@ -47,9 +47,9 @@ public class BulkAction extends Action() { + transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler() { @Override public NodeResponse newInstance() { return newNodeResponse(); diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 8c9b7d67609..56befbb9b84 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -359,7 +359,7 @@ public class TransportClientNodesService extends AbstractComponent { try { LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()), - TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(), new FutureTransportResponseHandler() { @Override public LivenessResponse newInstance() { @@ -430,7 +430,7 @@ public class TransportClientNodesService extends AbstractComponent { } transportService.sendRequest(listedNode, ClusterStateAction.NAME, headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)), - TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(), new BaseTransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 8e337dd90c4..8333b967c2f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -39,8 +39,6 @@ import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.transport.TransportRequestOptions.options; - /** * A fault detection that pings the master periodically to see if its alive. */ @@ -222,7 +220,7 @@ public class MasterFaultDetection extends FaultDetection { return; } final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); - final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build(); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 5619b58dc53..53081f55d21 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.transport.TransportRequestOptions.options; /** * A fault detection of multiple nodes. @@ -189,7 +188,7 @@ public class NodesFaultDetection extends FaultDetection { return; } final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion); - final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index c9c4d298cc4..99feb4b7f72 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -437,7 +437,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); - transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { + transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), new BaseTransportResponseHandler() { @Override public UnicastPingResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index a8c29523011..93d457d7382 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -248,7 +248,7 @@ public class PublishClusterStateAction extends AbstractComponent { // -> no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout // -> no need to compress, we already compressed the bytes - TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build(); transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.version()), options, @@ -282,7 +282,7 @@ public class PublishClusterStateAction extends AbstractComponent { private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) { try { logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); - TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, COMMIT_ACTION_NAME, diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6ace3c6b433..8d9e212fb31 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -109,10 +109,11 @@ public class RecoverySourceHandler { this.shardId = this.request.shardId().id(); this.response = new RecoveryResponse(); - this.requestOptions = TransportRequestOptions.options() + this.requestOptions = TransportRequestOptions.builder() .withCompress(recoverySettings.compress()) .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()); + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); } @@ -244,7 +245,7 @@ public class RecoverySourceHandler { response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, translogView.totalOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); // How many bytes we've copied since we last called RateLimiter.pause @@ -263,7 +264,7 @@ public class RecoverySourceHandler { try { transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } catch (RemoteTransportException remoteException) { final IOException corruptIndexException; @@ -332,7 +333,7 @@ public class RecoverySourceHandler { // garbage collection (not the JVM's GC!) of tombstone deletes transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -390,7 +391,7 @@ public class RecoverySourceHandler { // during this time transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -431,10 +432,11 @@ public class RecoverySourceHandler { throw new ElasticsearchException("failed to get next operation from translog", ex); } - final TransportRequestOptions recoveryOptions = TransportRequestOptions.options() + final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder() .withCompress(recoverySettings.compress()) .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionLongTimeout()); + .withTimeout(recoverySettings.internalActionLongTimeout()) + .build(); if (operation == null) { logger.trace("[{}][{}] no translog operations to send to {}", diff --git a/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index 0d92d00f144..879d6aec661 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -21,64 +21,16 @@ package org.elasticsearch.transport; import org.elasticsearch.common.unit.TimeValue; -/** - * - */ public class TransportRequestOptions { - public static final TransportRequestOptions EMPTY = options(); + private final TimeValue timeout; + private final boolean compress; + private final Type type; - public static TransportRequestOptions options() { - return new TransportRequestOptions(); - } - - public static enum Type { - RECOVERY, - BULK, - REG, - STATE, - PING; - - public static Type fromString(String type) { - if ("bulk".equalsIgnoreCase(type)) { - return BULK; - } else if ("reg".equalsIgnoreCase(type)) { - return REG; - } else if ("state".equalsIgnoreCase(type)) { - return STATE; - } else if ("recovery".equalsIgnoreCase(type)) { - return RECOVERY; - } else if ("ping".equalsIgnoreCase(type)) { - return PING; - } else { - throw new IllegalArgumentException("failed to match transport type for [" + type + "]"); - } - } - } - - private TimeValue timeout; - - private boolean compress; - - private Type type = Type.REG; - - public TransportRequestOptions withTimeout(long timeout) { - return withTimeout(TimeValue.timeValueMillis(timeout)); - } - - public TransportRequestOptions withTimeout(TimeValue timeout) { + private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) { this.timeout = timeout; - return this; - } - - public TransportRequestOptions withCompress(boolean compress) { this.compress = compress; - return this; - } - - public TransportRequestOptions withType(Type type) { this.type = type; - return this; } public TimeValue timeout() { @@ -92,4 +44,57 @@ public class TransportRequestOptions { public Type type() { return this.type; } + + public static final TransportRequestOptions EMPTY = new TransportRequestOptions.Builder().build(); + + public enum Type { + RECOVERY, + BULK, + REG, + STATE, + PING + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(TransportRequestOptions options) { + return new Builder() + .withTimeout(options.timeout) + .withCompress(options.compress) + .withType(options.type()); + } + + public static class Builder { + private TimeValue timeout; + private boolean compress; + private Type type = Type.REG; + + private Builder() { + } + + public Builder withTimeout(long timeout) { + return withTimeout(TimeValue.timeValueMillis(timeout)); + } + + public Builder withTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + public Builder withCompress(boolean compress) { + this.compress = compress; + return this; + } + + public Builder withType(Type type) { + this.type = type; + return this; + } + + public TransportRequestOptions build() { + return new TransportRequestOptions(timeout, compress, type); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java index 32dbf528b74..eb163641749 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java @@ -24,20 +24,37 @@ package org.elasticsearch.transport; */ public class TransportResponseOptions { - public static final TransportResponseOptions EMPTY = options(); + private final boolean compress; - public static TransportResponseOptions options() { - return new TransportResponseOptions(); - } - - private boolean compress; - - public TransportResponseOptions withCompress(boolean compress) { + private TransportResponseOptions(boolean compress) { this.compress = compress; - return this; } public boolean compress() { return this.compress; } + + public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build(); + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(TransportResponseOptions options) { + return new Builder() + .withCompress(options.compress); + } + + public static class Builder { + private boolean compress; + + public Builder withCompress(boolean compress) { + this.compress = compress; + return this; + } + + public TransportResponseOptions build() { + return new TransportResponseOptions(compress); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index b98957a38d9..e67904a6db2 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -812,7 +812,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem Channel targetChannel = nodeChannel(node, options); if (compress) { - options.withCompress(true); + options = TransportRequestOptions.builder(options).withCompress(true).build(); } byte status = 0; diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index e601d8016d2..fe3a941f665 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -78,7 +78,7 @@ public class NettyTransportChannel implements TransportChannel { @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { if (transport.compress) { - options.withCompress(true); + options = TransportResponseOptions.builder(options).withCompress(transport.compress).build(); } byte status = 0; diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index d8a518e3ea0..553ef0c6ac6 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -38,8 +38,6 @@ import org.elasticsearch.transport.netty.NettyTransport; import java.net.InetAddress; import java.util.concurrent.CountDownLatch; -import static org.elasticsearch.transport.TransportRequestOptions.options; - /** * */ @@ -85,7 +83,7 @@ public class BenchmarkNettyLargeMessages { public void run() { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(bigNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build(), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); @@ -117,7 +115,7 @@ public class BenchmarkNettyLargeMessages { for (int i = 0; i < 1; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES); long start = System.currentTimeMillis(); - transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(smallNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 89b39f43020..3f5a91960fc 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -129,7 +129,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { throw new IllegalArgumentException(); } - iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions(), new BaseTransportResponseHandler() { + iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler() { @Override public TestResponse newInstance() { return new TestResponse(); diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index fc72edfcb3b..becb61666da 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; -import static org.elasticsearch.transport.TransportRequestOptions.options; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -171,7 +170,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -256,7 +255,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build()); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -265,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { return TransportResponse.Empty.INSTANCE; @@ -303,7 +302,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { - channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build()); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -312,7 +311,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -421,7 +420,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }); TransportFuture foobar = serviceB.submitRequest(nodeA, "foobar", - new StringMessageRequest(""), options(), EmptyTransportResponseHandler.INSTANCE_SAME); + new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME); latch2.countDown(); try { foobar.txGet(); @@ -448,7 +447,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse", - new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -500,7 +499,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); final CountDownLatch latch = new CountDownLatch(1); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("300ms"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -536,7 +535,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest(counter + "ms"), options().withTimeout(3000), new BaseTransportResponseHandler() { + new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -621,7 +620,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { tracer.reset(4); boolean timeout = randomBoolean(); - TransportRequestOptions options = timeout ? new TransportRequestOptions().withTimeout(1) : TransportRequestOptions.EMPTY; + TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY; serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler); requestCompleted.acquire(); tracer.expectedEvents.get().await(); @@ -1107,7 +1106,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceB.addUnresponsiveRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 9634af7d393..afb4d1d75fc 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -81,7 +81,7 @@ public class NettyScheduledPingTests extends ESTestCase { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options()); + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -93,7 +93,7 @@ public class NettyScheduledPingTests extends ESTestCase { int rounds = scaledRandomIntBetween(100, 5000); for (int i = 0; i < rounds; i++) { serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler() { + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { return TransportResponse.Empty.INSTANCE;