Merge pull request #14760 from jasontedor/immutable-transport-options
Transport options should be immutable
This commit is contained in:
commit
c09103c35b
|
@ -47,9 +47,9 @@ public class BulkAction extends Action<BulkRequest, BulkResponse, BulkRequestBui
|
|||
|
||||
@Override
|
||||
public TransportRequestOptions transportOptions(Settings settings) {
|
||||
return TransportRequestOptions.options()
|
||||
return TransportRequestOptions.builder()
|
||||
.withType(TransportRequestOptions.Type.BULK)
|
||||
.withCompress(settings.getAsBoolean("action.bulk.compress", true)
|
||||
);
|
||||
).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -126,11 +126,11 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
});
|
||||
return;
|
||||
}
|
||||
TransportRequestOptions transportRequestOptions = TransportRequestOptions.options();
|
||||
TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
|
||||
if (request.timeout() != null) {
|
||||
transportRequestOptions.withTimeout(request.timeout());
|
||||
builder.withTimeout(request.timeout());
|
||||
}
|
||||
transportRequestOptions.withCompress(transportCompress());
|
||||
builder.withCompress(transportCompress());
|
||||
for (int i = 0; i < nodesIds.length; i++) {
|
||||
final String nodeId = nodesIds[i];
|
||||
final int idx = i;
|
||||
|
@ -145,7 +145,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
|||
onFailure(idx, nodeId, new NodeShouldNotConnectException(clusterService.localNode(), node));
|
||||
} else {
|
||||
NodeRequest nodeRequest = newNodeRequest(nodeId, request);
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
|
||||
transportService.sendRequest(node, transportNodeAction, nodeRequest, builder.build(), new BaseTransportResponseHandler<NodeResponse>() {
|
||||
@Override
|
||||
public NodeResponse newInstance() {
|
||||
return newNodeResponse();
|
||||
|
|
|
@ -359,7 +359,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
try {
|
||||
LivenessResponse livenessResponse = transportService.submitRequest(listedNode, TransportLivenessAction.NAME,
|
||||
headers.applyTo(new LivenessRequest()),
|
||||
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
|
||||
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
|
||||
new FutureTransportResponseHandler<LivenessResponse>() {
|
||||
@Override
|
||||
public LivenessResponse newInstance() {
|
||||
|
@ -430,7 +430,7 @@ public class TransportClientNodesService extends AbstractComponent {
|
|||
}
|
||||
transportService.sendRequest(listedNode, ClusterStateAction.NAME,
|
||||
headers.applyTo(Requests.clusterStateRequest().clear().nodes(true).local(true)),
|
||||
TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout),
|
||||
TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout).build(),
|
||||
new BaseTransportResponseHandler<ClusterStateResponse>() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -39,8 +39,6 @@ import java.io.IOException;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
* A fault detection that pings the master periodically to see if its alive.
|
||||
*/
|
||||
|
@ -222,7 +220,7 @@ public class MasterFaultDetection extends FaultDetection {
|
|||
return;
|
||||
}
|
||||
final MasterPingRequest request = new MasterPingRequest(clusterService.localNode().id(), masterToPing.id(), clusterName);
|
||||
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
|
||||
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
|
||||
transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, new BaseTransportResponseHandler<MasterPingResponseResponse>() {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
* A fault detection of multiple nodes.
|
||||
|
@ -189,7 +188,7 @@ public class NodesFaultDetection extends FaultDetection {
|
|||
return;
|
||||
}
|
||||
final PingRequest pingRequest = new PingRequest(node.id(), clusterName, localNode, clusterStateVersion);
|
||||
final TransportRequestOptions options = options().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout);
|
||||
final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING).withTimeout(pingRetryTimeout).build();
|
||||
transportService.sendRequest(node, PING_ACTION_NAME, pingRequest, options, new BaseTransportResponseHandler<PingResponse>() {
|
||||
@Override
|
||||
public PingResponse newInstance() {
|
||||
|
|
|
@ -437,7 +437,7 @@ public class UnicastZenPing extends AbstractLifecycleComponent<ZenPing> implemen
|
|||
|
||||
private void sendPingRequestToNode(final int id, final TimeValue timeout, final UnicastPingRequest pingRequest, final CountDownLatch latch, final DiscoveryNode node, final DiscoveryNode nodeToSend) {
|
||||
logger.trace("[{}] sending to {}", id, nodeToSend);
|
||||
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.options().withTimeout((long) (timeout.millis() * 1.25)), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
transportService.sendRequest(nodeToSend, ACTION_NAME, pingRequest, TransportRequestOptions.builder().withTimeout((long) (timeout.millis() * 1.25)).build(), new BaseTransportResponseHandler<UnicastPingResponse>() {
|
||||
|
||||
@Override
|
||||
public UnicastPingResponse newInstance() {
|
||||
|
|
|
@ -248,7 +248,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
// -> no need to put a timeout on the options here, because we want the response to eventually be received
|
||||
// and not log an error if it arrives after the timeout
|
||||
// -> no need to compress, we already compressed the bytes
|
||||
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withCompress(false);
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
|
||||
transportService.sendRequest(node, SEND_ACTION_NAME,
|
||||
new BytesTransportRequest(bytes, node.version()),
|
||||
options,
|
||||
|
@ -282,7 +282,7 @@ public class PublishClusterStateAction extends AbstractComponent {
|
|||
private void sendCommitToNode(final DiscoveryNode node, final ClusterState clusterState, final SendingController sendingController) {
|
||||
try {
|
||||
logger.trace("sending commit for cluster state (uuid: [{}], version [{}]) to [{}]", clusterState.stateUUID(), clusterState.version(), node);
|
||||
TransportRequestOptions options = TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE);
|
||||
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build();
|
||||
// no need to put a timeout on the options here, because we want the response to eventually be received
|
||||
// and not log an error if it arrives after the timeout
|
||||
transportService.sendRequest(node, COMMIT_ACTION_NAME,
|
||||
|
|
|
@ -109,10 +109,11 @@ public class RecoverySourceHandler {
|
|||
this.shardId = this.request.shardId().id();
|
||||
|
||||
this.response = new RecoveryResponse();
|
||||
this.requestOptions = TransportRequestOptions.options()
|
||||
this.requestOptions = TransportRequestOptions.builder()
|
||||
.withCompress(recoverySettings.compress())
|
||||
.withType(TransportRequestOptions.Type.RECOVERY)
|
||||
.withTimeout(recoverySettings.internalActionTimeout());
|
||||
.withTimeout(recoverySettings.internalActionTimeout())
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
@ -244,7 +245,7 @@ public class RecoverySourceHandler {
|
|||
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
|
||||
translogView.totalOperations());
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
});
|
||||
// How many bytes we've copied since we last called RateLimiter.pause
|
||||
|
@ -263,7 +264,7 @@ public class RecoverySourceHandler {
|
|||
try {
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
|
||||
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
} catch (RemoteTransportException remoteException) {
|
||||
final IOException corruptIndexException;
|
||||
|
@ -332,7 +333,7 @@ public class RecoverySourceHandler {
|
|||
// garbage collection (not the JVM's GC!) of tombstone deletes
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.PREPARE_TRANSLOG,
|
||||
new RecoveryPrepareForTranslogOperationsRequest(request.recoveryId(), request.shardId(), translogView.totalOperations()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -390,7 +391,7 @@ public class RecoverySourceHandler {
|
|||
// during this time
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FINALIZE,
|
||||
new RecoveryFinalizeRecoveryRequest(request.recoveryId(), request.shardId()),
|
||||
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionLongTimeout()),
|
||||
TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionLongTimeout()).build(),
|
||||
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
}
|
||||
});
|
||||
|
@ -431,10 +432,11 @@ public class RecoverySourceHandler {
|
|||
throw new ElasticsearchException("failed to get next operation from translog", ex);
|
||||
}
|
||||
|
||||
final TransportRequestOptions recoveryOptions = TransportRequestOptions.options()
|
||||
final TransportRequestOptions recoveryOptions = TransportRequestOptions.builder()
|
||||
.withCompress(recoverySettings.compress())
|
||||
.withType(TransportRequestOptions.Type.RECOVERY)
|
||||
.withTimeout(recoverySettings.internalActionLongTimeout());
|
||||
.withTimeout(recoverySettings.internalActionLongTimeout())
|
||||
.build();
|
||||
|
||||
if (operation == null) {
|
||||
logger.trace("[{}][{}] no translog operations to send to {}",
|
||||
|
|
|
@ -21,64 +21,16 @@ package org.elasticsearch.transport;
|
|||
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class TransportRequestOptions {
|
||||
|
||||
public static final TransportRequestOptions EMPTY = options();
|
||||
private final TimeValue timeout;
|
||||
private final boolean compress;
|
||||
private final Type type;
|
||||
|
||||
public static TransportRequestOptions options() {
|
||||
return new TransportRequestOptions();
|
||||
}
|
||||
|
||||
public static enum Type {
|
||||
RECOVERY,
|
||||
BULK,
|
||||
REG,
|
||||
STATE,
|
||||
PING;
|
||||
|
||||
public static Type fromString(String type) {
|
||||
if ("bulk".equalsIgnoreCase(type)) {
|
||||
return BULK;
|
||||
} else if ("reg".equalsIgnoreCase(type)) {
|
||||
return REG;
|
||||
} else if ("state".equalsIgnoreCase(type)) {
|
||||
return STATE;
|
||||
} else if ("recovery".equalsIgnoreCase(type)) {
|
||||
return RECOVERY;
|
||||
} else if ("ping".equalsIgnoreCase(type)) {
|
||||
return PING;
|
||||
} else {
|
||||
throw new IllegalArgumentException("failed to match transport type for [" + type + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TimeValue timeout;
|
||||
|
||||
private boolean compress;
|
||||
|
||||
private Type type = Type.REG;
|
||||
|
||||
public TransportRequestOptions withTimeout(long timeout) {
|
||||
return withTimeout(TimeValue.timeValueMillis(timeout));
|
||||
}
|
||||
|
||||
public TransportRequestOptions withTimeout(TimeValue timeout) {
|
||||
private TransportRequestOptions(TimeValue timeout, boolean compress, Type type) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions withType(Type type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TimeValue timeout() {
|
||||
|
@ -92,4 +44,57 @@ public class TransportRequestOptions {
|
|||
public Type type() {
|
||||
return this.type;
|
||||
}
|
||||
|
||||
public static final TransportRequestOptions EMPTY = new TransportRequestOptions.Builder().build();
|
||||
|
||||
public enum Type {
|
||||
RECOVERY,
|
||||
BULK,
|
||||
REG,
|
||||
STATE,
|
||||
PING
|
||||
}
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static Builder builder(TransportRequestOptions options) {
|
||||
return new Builder()
|
||||
.withTimeout(options.timeout)
|
||||
.withCompress(options.compress)
|
||||
.withType(options.type());
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private TimeValue timeout;
|
||||
private boolean compress;
|
||||
private Type type = Type.REG;
|
||||
|
||||
private Builder() {
|
||||
}
|
||||
|
||||
public Builder withTimeout(long timeout) {
|
||||
return withTimeout(TimeValue.timeValueMillis(timeout));
|
||||
}
|
||||
|
||||
public Builder withTimeout(TimeValue timeout) {
|
||||
this.timeout = timeout;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Builder withType(Type type) {
|
||||
this.type = type;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportRequestOptions build() {
|
||||
return new TransportRequestOptions(timeout, compress, type);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,20 +24,37 @@ package org.elasticsearch.transport;
|
|||
*/
|
||||
public class TransportResponseOptions {
|
||||
|
||||
public static final TransportResponseOptions EMPTY = options();
|
||||
private final boolean compress;
|
||||
|
||||
public static TransportResponseOptions options() {
|
||||
return new TransportResponseOptions();
|
||||
}
|
||||
|
||||
private boolean compress;
|
||||
|
||||
public TransportResponseOptions withCompress(boolean compress) {
|
||||
private TransportResponseOptions(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean compress() {
|
||||
return this.compress;
|
||||
}
|
||||
|
||||
public static final TransportResponseOptions EMPTY = TransportResponseOptions.builder().build();
|
||||
|
||||
public static Builder builder() {
|
||||
return new Builder();
|
||||
}
|
||||
|
||||
public static Builder builder(TransportResponseOptions options) {
|
||||
return new Builder()
|
||||
.withCompress(options.compress);
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
private boolean compress;
|
||||
|
||||
public Builder withCompress(boolean compress) {
|
||||
this.compress = compress;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TransportResponseOptions build() {
|
||||
return new TransportResponseOptions(compress);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -812,7 +812,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
|
|||
Channel targetChannel = nodeChannel(node, options);
|
||||
|
||||
if (compress) {
|
||||
options.withCompress(true);
|
||||
options = TransportRequestOptions.builder(options).withCompress(true).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
|
|
|
@ -78,7 +78,7 @@ public class NettyTransportChannel implements TransportChannel {
|
|||
@Override
|
||||
public void sendResponse(TransportResponse response, TransportResponseOptions options) throws IOException {
|
||||
if (transport.compress) {
|
||||
options.withCompress(true);
|
||||
options = TransportResponseOptions.builder(options).withCompress(transport.compress).build();
|
||||
}
|
||||
|
||||
byte status = 0;
|
||||
|
|
|
@ -38,8 +38,6 @@ import org.elasticsearch.transport.netty.NettyTransport;
|
|||
import java.net.InetAddress;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
|
@ -85,7 +83,7 @@ public class BenchmarkNettyLargeMessages {
|
|||
public void run() {
|
||||
for (int i = 0; i < NUMBER_OF_ITERATIONS; i++) {
|
||||
BenchmarkMessageRequest message = new BenchmarkMessageRequest(1, payload);
|
||||
transportServiceClient.submitRequest(bigNode, "benchmark", message, options().withType(TransportRequestOptions.Type.BULK), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
transportServiceClient.submitRequest(bigNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK).build(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
@Override
|
||||
public BenchmarkMessageResponse newInstance() {
|
||||
return new BenchmarkMessageResponse();
|
||||
|
@ -117,7 +115,7 @@ public class BenchmarkNettyLargeMessages {
|
|||
for (int i = 0; i < 1; i++) {
|
||||
BenchmarkMessageRequest message = new BenchmarkMessageRequest(2, BytesRef.EMPTY_BYTES);
|
||||
long start = System.currentTimeMillis();
|
||||
transportServiceClient.submitRequest(smallNode, "benchmark", message, options().withType(TransportRequestOptions.Type.STATE), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
transportServiceClient.submitRequest(smallNode, "benchmark", message, TransportRequestOptions.builder().withType(TransportRequestOptions.Type.STATE).build(), new BaseTransportResponseHandler<BenchmarkMessageResponse>() {
|
||||
@Override
|
||||
public BenchmarkMessageResponse newInstance() {
|
||||
return new BenchmarkMessageResponse();
|
||||
|
|
|
@ -129,7 +129,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
|
|||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
iteration.transportService.sendRequest(node, "action", new TestRequest(), new TransportRequestOptions(), new BaseTransportResponseHandler<TestResponse>() {
|
||||
iteration.transportService.sendRequest(node, "action", new TestRequest(), TransportRequestOptions.EMPTY, new BaseTransportResponseHandler<TestResponse>() {
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
|
|
|
@ -41,7 +41,6 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static org.elasticsearch.transport.TransportRequestOptions.options;
|
||||
import static org.hamcrest.Matchers.endsWith;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
|
@ -171,7 +170,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -256,7 +255,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options().withCompress(true));
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -265,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
@ -303,7 +302,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.options().withCompress(true));
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -312,7 +311,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -421,7 +420,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
});
|
||||
TransportFuture<TransportResponse.Empty> foobar = serviceB.submitRequest(nodeA, "foobar",
|
||||
new StringMessageRequest(""), options(), EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
latch2.countDown();
|
||||
try {
|
||||
foobar.txGet();
|
||||
|
@ -448,7 +447,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
|
||||
new StringMessageRequest("moshe"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -500,7 +499,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest("300ms"), options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("300ms"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -536,7 +535,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
final int counter = i;
|
||||
// now, try and send another request, this times, with a short timeout
|
||||
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest(counter + "ms"), options().withTimeout(3000), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -621,7 +620,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
tracer.reset(4);
|
||||
boolean timeout = randomBoolean();
|
||||
TransportRequestOptions options = timeout ? new TransportRequestOptions().withTimeout(1) : TransportRequestOptions.EMPTY;
|
||||
TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY;
|
||||
serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler);
|
||||
requestCompleted.acquire();
|
||||
tracer.expectedEvents.get().await();
|
||||
|
@ -1107,7 +1106,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
serviceB.addUnresponsiveRule(serviceA);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.options().withTimeout(100), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
|
|
@ -81,7 +81,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.options());
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
|
||||
} catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
|
@ -93,7 +93,7 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
int rounds = scaledRandomIntBetween(100, 5000);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.options().withCompress(randomBoolean()), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
|
Loading…
Reference in New Issue