diff --git a/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java b/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java index 42d0c22508b..e442f61061a 100644 --- a/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java +++ b/core/src/main/java/org/elasticsearch/action/bulk/BulkAction.java @@ -47,9 +47,9 @@ public class BulkAction extends Action() { + transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler() { @Override public NodeResponse newInstance() { return newNodeResponse(); diff --git a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 8c9b7d67609..56befbb9b84 100644 --- a/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/core/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -359,7 +359,7 @@ public class TransportClientNodesService extends AbstractComponent { try { LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME, headers.applyTo(new LivenessRequest()), - TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(), new FutureTransportResponseHandler() { @Override public LivenessResponse newInstance() { @@ -430,7 +430,7 @@ public class TransportClientNodesService extends AbstractComponent { } transportService.sendRequest(listedNode, ClusterStateAction.NAME, headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)), - TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), + TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(), new BaseTransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java index 8e337dd90c4..8333b967c2f 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/MasterFaultDetection.java @@ -39,8 +39,6 @@ import java.io.IOException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import static org.elasticsearch.transport.TransportRequestOptions.options; - /** * A fault detection that pings the master periodically to see if its alive. */ @@ -222,7 +220,7 @@ public class MasterFaultDetection extends FaultDetection { return; } final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName); - final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build(); transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler() { @Override diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java index 5619b58dc53..53081f55d21 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/fd/NodesFaultDetection.java @@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArrayList; import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap; -import static org.elasticsearch.transport.TransportRequestOptions.options; /** * A fault detection of multiple nodes. @@ -189,7 +188,7 @@ public class NodesFaultDetection extends FaultDetection { return; } final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion); - final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout); + final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build(); transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler() { @Override public PingResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java index c9c4d298cc4..99feb4b7f72 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ping/unicast/UnicastZenPing.java @@ -437,7 +437,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent implemen private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) { logger.trace("[{}] sending to {}", id, nodeToSend); - transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler() { + transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), new BaseTransportResponseHandler() { @Override public UnicastPingResponse newInstance() { diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java index a8c29523011..93d457d7382 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/publish/PublishClusterStateAction.java @@ -248,7 +248,7 @@ public class PublishClusterStateAction extends AbstractComponent { // -> no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout // -> no need to compress, we already compressed the bytes - TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build(); transportService.sendRequest(node, SEND_ACTION_NAME, new BytesTransportRequest(bytes, node.version()), options, @@ -282,7 +282,7 @@ public class PublishClusterStateAction extends AbstractComponent { private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) { try { logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node); - TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE); + TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(); // no need to put a timeout on the options here, because we want the response to eventually be received // and not log an error if it arrives after the timeout transportService.sendRequest(node, COMMIT_ACTION_NAME, diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 6ace3c6b433..8d9e212fb31 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -109,10 +109,11 @@ public class RecoverySourceHandler { this.shardId = this.request.shardId().id(); this.response = new RecoveryResponse(); - this.requestOptions = TransportRequestOptions.options() + this.requestOptions = TransportRequestOptions.builder() .withCompress(recoverySettings.compress()) .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionTimeout()); + .withTimeout(recoverySettings.internalActionTimeout()) + .build(); } @@ -244,7 +245,7 @@ public class RecoverySourceHandler { response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, translogView.totalOperations()); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); }); // How many bytes we've copied since we last called RateLimiter.pause @@ -263,7 +264,7 @@ public class RecoverySourceHandler { try { transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } catch (RemoteTransportException remoteException) { final IOException corruptIndexException; @@ -332,7 +333,7 @@ public class RecoverySourceHandler { // garbage collection (not the JVM's GC!) of tombstone deletes transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG, new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -390,7 +391,7 @@ public class RecoverySourceHandler { // during this time transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE, new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()), + TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); } }); @@ -431,10 +432,11 @@ public class RecoverySourceHandler { throw new ElasticsearchException("failed to get next operation from translog", ex); } - final TransportRequestOptions recoveryOptions = TransportRequestOptions.options() + final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder() .withCompress(recoverySettings.compress()) .withType(TransportRequestOptions.Type.RECOVERY) - .withTimeout(recoverySettings.internalActionLongTimeout()); + .withTimeout(recoverySettings.internalActionLongTimeout()) + .build(); if (operation == null) { logger.trace("[{}][{}] no translog operations to send to {}", diff --git a/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java b/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java index 0d92d00f144..879d6aec661 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportRequestOptions.java @@ -21,64 +21,16 @@ package org.elasticsearch.transport; import org.elasticsearch.common.unit.TimeValue; -/** - * - */ public class TransportRequestOptions { - public static final TransportRequestOptions EMPTY = options(); + private final TimeValue timeout; + private final boolean compress; + private final Type type; - public static TransportRequestOptions options() { - return new TransportRequestOptions(); - } - - public static enum Type { - RECOVERY, - BULK, - REG, - STATE, - PING; - - public static Type fromString(String type) { - if ("bulk".equalsIgnoreCase(type)) { - return BULK; - } else if ("reg".equalsIgnoreCase(type)) { - return REG; - } else if ("state".equalsIgnoreCase(type)) { - return STATE; - } else if ("recovery".equalsIgnoreCase(type)) { - return RECOVERY; - } else if ("ping".equalsIgnoreCase(type)) { - return PING; - } else { - throw new IllegalArgumentException("failed to match transport type for [" + type + "]"); - } - } - } - - private TimeValue timeout; - - private boolean compress; - - private Type type = Type.REG; - - public TransportRequestOptions withTimeout(long timeout) { - return withTimeout(TimeValue.timeValueMillis(timeout)); - } - - public TransportRequestOptions withTimeout(TimeValue timeout) { + private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) { this.timeout = timeout; - return this; - } - - public TransportRequestOptions withCompress(boolean compress) { this.compress = compress; - return this; - } - - public TransportRequestOptions withType(Type type) { this.type = type; - return this; } public TimeValue timeout() { @@ -92,4 +44,57 @@ public class TransportRequestOptions { public Type type() { return this.type; } + + public static final TransportRequestOptions EMPTY = new TransportRequestOptions.Builder().build(); + + public enum Type { + RECOVERY, + BULK, + REG, + STATE, + PING + } + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(TransportRequestOptions options) { + return new Builder() + .withTimeout(options.timeout) + .withCompress(options.compress) + .withType(options.type()); + } + + public static class Builder { + private TimeValue timeout; + private boolean compress; + private Type type = Type.REG; + + private Builder() { + } + + public Builder withTimeout(long timeout) { + return withTimeout(TimeValue.timeValueMillis(timeout)); + } + + public Builder withTimeout(TimeValue timeout) { + this.timeout = timeout; + return this; + } + + public Builder withCompress(boolean compress) { + this.compress = compress; + return this; + } + + public Builder withType(Type type) { + this.type = type; + return this; + } + + public TransportRequestOptions build() { + return new TransportRequestOptions(timeout, compress, type); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java b/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java index 32dbf528b74..eb163641749 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportResponseOptions.java @@ -24,20 +24,37 @@ package org.elasticsearch.transport; */ public class TransportResponseOptions { - public static final TransportResponseOptions EMPTY = options(); + private final boolean compress; - public static TransportResponseOptions options() { - return new TransportResponseOptions(); - } - - private boolean compress; - - public TransportResponseOptions withCompress(boolean compress) { + private TransportResponseOptions(boolean compress) { this.compress = compress; - return this; } public boolean compress() { return this.compress; } + + public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build(); + + public static Builder builder() { + return new Builder(); + } + + public static Builder builder(TransportResponseOptions options) { + return new Builder() + .withCompress(options.compress); + } + + public static class Builder { + private boolean compress; + + public Builder withCompress(boolean compress) { + this.compress = compress; + return this; + } + + public TransportResponseOptions build() { + return new TransportResponseOptions(compress); + } + } } diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java index b98957a38d9..e67904a6db2 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransport.java @@ -812,7 +812,7 @@ public class NettyTransport extends AbstractLifecycleComponent implem Channel targetChannel = nodeChannel(node, options); if (compress) { - options.withCompress(true); + options = TransportRequestOptions.builder(options).withCompress(true).build(); } byte status = 0; diff --git a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java index e601d8016d2..fe3a941f665 100644 --- a/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java +++ b/core/src/main/java/org/elasticsearch/transport/netty/NettyTransportChannel.java @@ -78,7 +78,7 @@ public class NettyTransportChannel implements TransportChannel { @Override public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException { if (transport.compress) { - options.withCompress(true); + options = TransportResponseOptions.builder(options).withCompress(transport.compress).build(); } byte status = 0; diff --git a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java index d8a518e3ea0..553ef0c6ac6 100644 --- a/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java +++ b/core/src/test/java/org/elasticsearch/benchmark/transport/BenchmarkNettyLargeMessages.java @@ -38,8 +38,6 @@ import org.elasticsearch.transport.netty.NettyTransport; import java.net.InetAddress; import java.util.concurrent.CountDownLatch; -import static org.elasticsearch.transport.TransportRequestOptions.options; - /** * */ @@ -85,7 +83,7 @@ public class BenchmarkNettyLargeMessages { public void run() { for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload); - transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(bigNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build(), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); @@ -117,7 +115,7 @@ public class BenchmarkNettyLargeMessages { for (int i = 0; i < 1; i++) { BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES); long start = System.currentTimeMillis(); - transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler() { + transportServiceClient.submitRequest(smallNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(), new BaseTransportResponseHandler() { @Override public BenchmarkMessageResponse newInstance() { return new BenchmarkMessageResponse(); diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java index 89b39f43020..3f5a91960fc 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientNodesServiceTests.java @@ -129,7 +129,7 @@ public class TransportClientNodesServiceTests extends ESTestCase { throw new IllegalArgumentException(); } - iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions(), new BaseTransportResponseHandler() { + iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler() { @Override public TestResponse newInstance() { return new TestResponse(); diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index fc72edfcb3b..becb61666da 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.emptyMap; -import static org.elasticsearch.transport.TransportRequestOptions.options; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; @@ -171,7 +170,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -256,7 +255,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build()); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -265,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { return TransportResponse.Empty.INSTANCE; @@ -303,7 +302,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); try { - channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true)); + channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build()); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -312,7 +311,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -421,7 +420,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }); TransportFuture foobar = serviceB.submitRequest(nodeA, "foobar", - new StringMessageRequest(""), options(), EmptyTransportResponseHandler.INSTANCE_SAME); + new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME); latch2.countDown(); try { foobar.txGet(); @@ -448,7 +447,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse", - new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -500,7 +499,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); final CountDownLatch latch = new CountDownLatch(1); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("300ms"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -536,7 +535,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest(counter + "ms"), options().withTimeout(3000), new BaseTransportResponseHandler() { + new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -621,7 +620,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { tracer.reset(4); boolean timeout = randomBoolean(); - TransportRequestOptions options = timeout ? new TransportRequestOptions().withTimeout(1) : TransportRequestOptions.EMPTY; + TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY; serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler); requestCompleted.acquire(); tracer.expectedEvents.get().await(); @@ -1107,7 +1106,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceB.addUnresponsiveRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler() { + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 9634af7d393..afb4d1d75fc 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -81,7 +81,7 @@ public class NettyScheduledPingTests extends ESTestCase { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options()); + channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY); } catch (IOException e) { e.printStackTrace(); assertThat(e.getMessage(), false, equalTo(true)); @@ -93,7 +93,7 @@ public class NettyScheduledPingTests extends ESTestCase { int rounds = scaledRandomIntBetween(100, 5000); for (int i = 0; i < rounds; i++) { serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler() { + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { return TransportResponse.Empty.INSTANCE;