Transport options should be immutable
This commit changes TransportRequestOptions and TransportResponseOptions to be immutable. This is to address an issue where the empty options were being mutated permanently altering their state. Making these objects immutable is just good, clean coding.
This commit is contained in:
parent
55d5da485f
commit
3acd7800cb
|
@ -47,9 +47,9 @@ public class BulkAction extends Action<BulkRequest, BulkResponse, BulkRequestBui
|
|||
|
||||
@Override
|
||||
public TransportRequestOptions transportOptions(Settings settings) {
|
||||
return TransportRequestOptions.options()
|
||||
return TransportRequestOptions.builder()
|
||||
.withType(TransportRequestOptions.Type.BULK)
|
||||
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
|
||||
);
|
||||
).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,11 +126,11 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
});
|
||||
return;
|
||||
}
|
||||
TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
|
||||
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
|
||||
if (request.timeout() != null) {
|
||||
transportRequestOptions.withTimeout(request.timeout());
|
||||
builder.withTimeout(request.timeout());
|
||||
}
|
||||
transportRequestOptions.withCompress(transportCompress());
|
||||
builder.withCompress(transportCompress());
|
||||
for (int i = 0; i < nodesIds.length; i++) {
|
||||
final String nodeId = nodesIds[i];
|
||||
final int idx = i;
|
||||
|
@ -145,7 +145,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
|
||||
} else {
|
||||
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
|
||||
@Override
|
||||
public NodeResponse newInstance() {
|
||||
return newNodeResponse();
|
||||
|
|
|
@ -359,7 +359,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
try {
|
||||
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
|
||||
headers.applyTo(new LivenessRequest()),
|
||||
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
|
||||
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
|
||||
new FutureTransportResponseHandler<LivenessResponse>() {
|
||||
@Override
|
||||
public LivenessResponse newInstance() {
|
||||
|
@ -430,7 +430,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
}
|
||||
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
|
||||
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
|
||||
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
|
||||
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
|
||||
new BaseTransportResponseHandler<ClusterStateResponse>() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,8 +39,6 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
* A fault detection that pings the master periodically to see if its alive.
|
||||
*/
|
||||
|
@ -222,7 +220,7 @@ public class MasterFaultDetection extends FaultDetection {
|
|||
return;
|
||||
}
|
||||
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
|
||||
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
|
||||
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
|
||||
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
* A fault detection of multiple nodes.
|
||||
|
@ -189,7 +188,7 @@ public class NodesFaultDetection extends FaultDetection {
|
|||
return;
|
||||
}
|
||||
final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion);
|
||||
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
|
||||
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
|
||||
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler<PingResponse>() {
|
||||
@Override
|
||||
public PingResponse newInstance() {
|
||||
|
|
|
@ -437,7 +437,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||
|
||||
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
|
||||
logger.trace("[{}] sending to {}", id, nodeToSend);
|
||||
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
|
||||
@Override
|
||||
public UnicastPingResponse newInstance() {
|
||||
|
|
|
@ -248,7 +248,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
// -> no need to put a timeout on the options here, because we want the response to eventually be received
|
||||
// and not log an error if it arrives after the timeout
|
||||
// -> no need to compress, we already compressed the bytes
|
||||
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
|
||||
transportService.sendRequest(node, SEND_ACTION_NAME,
|
||||
new BytesTransportRequest(bytes, node.version()),
|
||||
options,
|
||||
|
@ -282,7 +282,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
|
||||
try {
|
||||
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
|
||||
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE);
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
|
||||
// no need to put a timeout on the options here, because we want the response to eventually be received
|
||||
// and not log an error if it arrives after the timeout
|
||||
transportService.sendRequest(node, COMMIT_ACTION_NAME,
|
||||
|
|
|
@ -109,10 +109,11 @@ public class RecoverySourceHandler {
|
|||
this.shardId = this.request.shardId().id();
|
||||
|
||||
this.response = new RecoveryResponse();
|
||||
this.requestOptions = TransportRequestOptions.options()
|
||||
this.requestOptions = TransportRequestOptions.builder()
|
||||
.withCompress(recoverySettings.compress())
|
||||
.withType(TransportRequestOptions.Type.RECOVERY)
|
||||
.withTimeout(recoverySettings.internalActionTimeout());
|
||||
.withTimeout(recoverySettings.internalActionTimeout())
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
@ -244,7 +245,7 @@ public class RecoverySourceHandler {
|
|||
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
|
||||
translogView.totalOperations());
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
});
|
||||
// How many bytes we've copied since we last called RateLimiter.pause
|
||||
|
@ -263,7 +264,7 @@ public class RecoverySourceHandler {
|
|||
try {
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
|
||||
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
} catch (RemoteTransportException remoteException) {
|
||||
final IOException corruptIndexException;
|
||||
|
@ -332,7 +333,7 @@ public class RecoverySourceHandler {
|
|||
// garbage collection (not the JVM's GC!) of tombstone deletes
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
|
||||
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -390,7 +391,7 @@ public class RecoverySourceHandler {
|
|||
// during this time
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
|
||||
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
}
|
||||
});
|
||||
|
@ -431,10 +432,11 @@ public class RecoverySourceHandler {
|
|||
throw new ElasticsearchException("failed to get next operation from translog", ex);
|
||||
}
|
||||
|
||||
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
|
||||
final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
|
||||
.withCompress(recoverySettings.compress())
|
||||
.withType(TransportRequestOptions.Type.RECOVERY)
|
||||
.withTimeout(recoverySettings.internalActionLongTimeout());
|
||||
.withTimeout(recoverySettings.internalActionLongTimeout())
|
||||
.build();
|
||||
|
||||
if (operation == null) {
|
||||
logger.trace("[{}][{}] no translog operations to send to {}",
|
||||
|
|
|
@ -21,64 +21,16 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class TransportRequestOptions {
|
||||
|
||||
public static final TransportRequestOptions EMPTY = options();
|
||||
private final TimeValue timeout;
|
||||
private final boolean compress;
|
||||
private final Type type;
|
||||
|
||||
public static TransportRequestOptions options() {
|
||||
return new TransportRequestOptions();
|
||||
}
|
||||
|
||||
public static enum Type {
|
||||
RECOVERY,
|
||||
BULK,
|
||||
REG,
|
||||
STATE,
|
||||
PING;
|
||||
|
||||
public static Type fromString(String type) {
|
||||
if ("bulk".equalsIgnoreCase(type)) {
|
||||
return BULK;
|
||||
} else if ("reg".equalsIgnoreCase(type)) {
|
||||
return REG;
|
||||
} else if ("state".equalsIgnoreCase(type)) {
|
||||
return STATE;
|
||||
} else if ("recovery".equalsIgnoreCase(type)) {
|
||||
return RECOVERY;
|
||||
} else if ("ping".equalsIgnoreCase(type)) {
|
||||
return PING;
|
||||
} else {
|
||||
throw new IllegalArgumentException("failed to match transport type for [" + type + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TimeValue timeout;
|
||||
|
||||
private boolean compress;
|
||||
|
||||
private Type type = Type.REG;
|
||||
|
||||
public TransportRequestOptions withTimeout(long timeout) {
|
||||
return withTimeout(TimeValue.timeValueMillis(timeout));
|
||||
}
|
||||
|
||||
public TransportRequestOptions withTimeout(TimeValue timeout) {
|
||||
private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions withType(Type type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
|
@ -92,4 +44,57 @@ public class TransportRequestOptions {
|
|||
public Type type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public static final TransportRequestOptions EMPTY = new TransportRequestOptions.Builder().build();
|
||||
|
||||
public enum Type {
|
||||
RECOVERY,
|
||||
BULK,
|
||||
REG,
|
||||
STATE,
|
||||
PING
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static Builder builder(TransportRequestOptions options) {
|
||||
return new Builder()
|
||||
.withTimeout(options.timeout)
|
||||
.withCompress(options.compress)
|
||||
.withType(options.type());
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private TimeValue timeout;
|
||||
private boolean compress;
|
||||
private Type type = Type.REG;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
public Builder withTimeout(long timeout) {
|
||||
return withTimeout(TimeValue.timeValueMillis(timeout));
|
||||
}
|
||||
|
||||
public Builder withTimeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withType(Type type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions build() {
|
||||
return new TransportRequestOptions(timeout, compress, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,20 +24,37 @@ package org.elasticsearch.transport;
|
|||
*/
|
||||
public class TransportResponseOptions {
|
||||
|
||||
public static final TransportResponseOptions EMPTY = options();
|
||||
private final boolean compress;
|
||||
|
||||
public static TransportResponseOptions options() {
|
||||
return new TransportResponseOptions();
|
||||
}
|
||||
|
||||
private boolean compress;
|
||||
|
||||
public TransportResponseOptions withCompress(boolean compress) {
|
||||
private TransportResponseOptions(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean compress() {
|
||||
return this.compress;
|
||||
}
|
||||
|
||||
public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build();
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static Builder builder(TransportResponseOptions options) {
|
||||
return new Builder()
|
||||
.withCompress(options.compress);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private boolean compress;
|
||||
|
||||
public Builder withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportResponseOptions build() {
|
||||
return new TransportResponseOptions(compress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -812,7 +812,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
Channel targetChannel = nodeChannel(node, options);
|
||||
|
||||
if (compress) {
|
||||
options.withCompress(true);
|
||||
options = TransportRequestOptions.builder(options).withCompress(true).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
|
|
|
@ -78,7 +78,7 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
@Override
|
||||
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
if (transport.compress) {
|
||||
options.withCompress(true);
|
||||
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
|
|
|
@ -38,8 +38,6 @@ import org.elasticsearch.transport.netty.NettyTransport;
|
|||
import java.net.InetAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -85,7 +83,7 @@ public class BenchmarkNettyLargeMessages {
|
|||
public void run() {
|
||||
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
|
||||
BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload);
|
||||
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
transportServiceClient.submitRequest(bigNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
@Override
|
||||
public BenchmarkMessageResponse newInstance() {
|
||||
return new BenchmarkMessageResponse();
|
||||
|
@ -117,7 +115,7 @@ public class BenchmarkNettyLargeMessages {
|
|||
for (int i = 0; i < 1; i++) {
|
||||
BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES);
|
||||
long start = System.currentTimeMillis();
|
||||
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
transportServiceClient.submitRequest(smallNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
@Override
|
||||
public BenchmarkMessageResponse newInstance() {
|
||||
return new BenchmarkMessageResponse();
|
||||
|
|
|
@ -129,7 +129,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
|
|||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions(), new BaseTransportResponseHandler<TestResponse>() {
|
||||
iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler<TestResponse>() {
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
|
|
|
@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -171,7 +170,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -256,7 +255,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true));
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -265,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
@ -303,7 +302,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true));
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -312,7 +311,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -421,7 +420,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
});
|
||||
TransportFuture<TransportResponse.Empty> foobar = serviceB.submitRequest(nodeA, "foobar",
|
||||
new StringMessageRequest(""), options(), EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
latch2.countDown();
|
||||
try {
|
||||
foobar.txGet();
|
||||
|
@ -448,7 +447,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
|
||||
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -500,7 +499,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("300ms"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -536,7 +535,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
final int counter = i;
|
||||
// now, try and send another request, this times, with a short timeout
|
||||
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest(counter + "ms"), options().withTimeout(3000), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -621,7 +620,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
tracer.reset(4);
|
||||
boolean timeout = randomBoolean();
|
||||
TransportRequestOptions options = timeout ? new TransportRequestOptions().withTimeout(1) : TransportRequestOptions.EMPTY;
|
||||
TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY;
|
||||
serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler);
|
||||
requestCompleted.acquire();
|
||||
tracer.expectedEvents.get().await();
|
||||
|
@ -1107,7 +1106,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
serviceB.addUnresponsiveRule(serviceA);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
|
|
@ -81,7 +81,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options());
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -93,7 +93,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
int rounds = scaledRandomIntBetween(100, 5000);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
|
Loading…
Reference in New Issue