Adjust line-length of transport related classes to coding standard
This commit is contained in:
parent
c3f4eb36e3
commit
3688629e11
|
@ -404,10 +404,6 @@
|
|||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]PrimaryShardAllocator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReplicaShardAllocator.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]TransportNodesListGatewayMetaState.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]HttpTransportSettings.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]HttpRequestHandler.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpChannel.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerTransport.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]AlreadyExpiredException.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" />
|
||||
|
@ -808,12 +804,6 @@
|
|||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotShardsService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]snapshots[/\\]SnapshotsService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPool.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]PlainTransportFuture.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]RequestHandlerRegistry.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]Transport.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]TransportChannelResponseHandler.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]TransportService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]netty[/\\]NettyTransport.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]queries[/\\]BlendedTermQueryTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]apache[/\\]lucene[/\\]search[/\\]postingshighlight[/\\]CustomPostingsHighlighterTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ESExceptionTests.java" checks="LineLength" />
|
||||
|
@ -876,9 +866,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]bwcompat[/\\]RecoveryWithUnsupportedIndicesIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]bwcompat[/\\]RestoreBackwardsCompatIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]AbstractClientHeadersTestCase.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]FailAndRetryMockTransport.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]TransportClientIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]client[/\\]transport[/\\]TransportClientRetryIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterHealthIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterInfoServiceIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]ClusterModuleTests.java" checks="LineLength" />
|
||||
|
@ -977,7 +964,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]rounding[/\\]TimeZoneRoundingTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]settings[/\\]ScopedSettingsTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]settings[/\\]SettingTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]transport[/\\]BoundTransportAddressTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]unit[/\\]DistanceUnitTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]unit[/\\]FuzzinessTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]util[/\\]BigArraysTests.java" checks="LineLength" />
|
||||
|
@ -1023,8 +1009,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]gateway[/\\]ReusePeerRecoverySharedTest.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]get[/\\]GetActionIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerPipeliningTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyPipeliningDisabledIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyPipeliningEnabledIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexModuleTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexServiceTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexWithShadowReplicasIT.java" checks="LineLength" />
|
||||
|
@ -1305,15 +1289,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]ThreadPoolSerializationTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]threadpool[/\\]UpdateThreadPoolSettingsTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]timestamp[/\\]SimpleTimestampIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]AbstractSimpleTransportTestCase.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]ActionNamesIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]ContextAndHeaderTransportIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]NettySizeHeaderFrameDecoderTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]local[/\\]SimpleLocalTransportTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]netty[/\\]NettyScheduledPingTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]netty[/\\]NettyTransportIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]netty[/\\]NettyTransportMultiPortIntegrationIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]netty[/\\]NettyTransportMultiPortTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]ttl[/\\]SimpleTTLIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]update[/\\]UpdateIT.java" checks="LineLength" />
|
||||
|
@ -1503,9 +1478,6 @@
|
|||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]rest[/\\]support[/\\]FileUtils.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]store[/\\]MockFSDirectoryService.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]store[/\\]MockFSIndexStore.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]transport[/\\]AssertingLocalTransport.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]transport[/\\]CapturingTransport.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]transport[/\\]MockTransportService.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]rest[/\\]test[/\\]FileUtilsTests.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]rest[/\\]test[/\\]JsonPathTests.java" checks="LineLength" />
|
||||
<suppress files="test[/\\]framework[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]test[/\\]rest[/\\]test[/\\]RestTestParserTests.java" checks="LineLength" />
|
||||
|
@ -1516,7 +1488,6 @@
|
|||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]settings[/\\]SettingsModule.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]rest[/\\]action[/\\]admin[/\\]indices[/\\]settings[/\\]RestGetSettingsAction.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]tribe[/\\]TribeService.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]transport[/\\]TransportModuleTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]search[/\\]sort[/\\]GeoDistanceSortBuilderIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]rest[/\\]CorsNotSetIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]settings[/\\]SettingsModuleTests.java" checks="LineLength" />
|
||||
|
|
|
@ -36,13 +36,13 @@ public final class HttpTransportSettings {
|
|||
public static final Setting<Boolean> SETTING_CORS_ENABLED =
|
||||
Setting.boolSetting("http.cors.enabled", false, Property.NodeScope);
|
||||
public static final Setting<String> SETTING_CORS_ALLOW_ORIGIN =
|
||||
new Setting<String>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope);
|
||||
new Setting<>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_CORS_MAX_AGE =
|
||||
Setting.intSetting("http.cors.max-age", 1728000, Property.NodeScope);
|
||||
public static final Setting<String> SETTING_CORS_ALLOW_METHODS =
|
||||
new Setting<String>("http.cors.allow-methods", "OPTIONS, HEAD, GET, POST, PUT, DELETE", (value) -> value, Property.NodeScope);
|
||||
new Setting<>("http.cors.allow-methods", "OPTIONS, HEAD, GET, POST, PUT, DELETE", (value) -> value, Property.NodeScope);
|
||||
public static final Setting<String> SETTING_CORS_ALLOW_HEADERS =
|
||||
new Setting<String>("http.cors.allow-headers", "X-Requested-With, Content-Type, Content-Length", (value) -> value, Property.NodeScope);
|
||||
new Setting<>("http.cors.allow-headers", "X-Requested-With, Content-Type, Content-Length", (value) -> value, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_CORS_ALLOW_CREDENTIALS =
|
||||
Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_PIPELINING =
|
||||
|
@ -61,7 +61,7 @@ public final class HttpTransportSettings {
|
|||
listSetting("http.bind_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope);
|
||||
|
||||
public static final Setting<PortsRange> SETTING_HTTP_PORT =
|
||||
new Setting<PortsRange>("http.port", "9200-9300", PortsRange::new, Property.NodeScope);
|
||||
new Setting<>("http.port", "9200-9300", PortsRange::new, Property.NodeScope);
|
||||
public static final Setting<Integer> SETTING_HTTP_PUBLISH_PORT =
|
||||
Setting.intSetting("http.publish_port", -1, -1, Property.NodeScope);
|
||||
public static final Setting<Boolean> SETTING_HTTP_DETAILED_ERRORS_ENABLED =
|
||||
|
|
|
@ -118,7 +118,8 @@ public final class NettyHttpChannel extends AbstractRestChannel {
|
|||
ChannelFuture future;
|
||||
|
||||
if (orderedUpstreamMessageEvent != null) {
|
||||
OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp);
|
||||
OrderedDownstreamChannelEvent downstreamChannelEvent =
|
||||
new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp);
|
||||
future = downstreamChannelEvent.getFuture();
|
||||
channel.getPipeline().sendDownstream(downstreamChannelEvent);
|
||||
} else {
|
||||
|
|
|
@ -249,7 +249,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
|||
if (receivePredictorMax.bytes() == receivePredictorMin.bytes()) {
|
||||
receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory((int) receivePredictorMax.bytes());
|
||||
} else {
|
||||
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory((int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||
receiveBufferSizePredictorFactory = new AdaptiveReceiveBufferSizePredictorFactory(
|
||||
(int) receivePredictorMin.bytes(), (int) receivePredictorMin.bytes(), (int) receivePredictorMax.bytes());
|
||||
}
|
||||
|
||||
this.compression = SETTING_HTTP_COMPRESSION.get(settings);
|
||||
|
@ -265,8 +266,9 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
|||
}
|
||||
this.maxContentLength = maxContentLength;
|
||||
|
||||
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]",
|
||||
maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents);
|
||||
logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
|
||||
"receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]", maxChunkSize, maxHeaderSize, maxInitialLineLength,
|
||||
this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents);
|
||||
}
|
||||
|
||||
public Settings settings() {
|
||||
|
@ -335,7 +337,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent<HttpSer
|
|||
|
||||
final int publishPort = resolvePublishPort(settings, boundAddresses, publishInetAddress);
|
||||
final InetSocketAddress publishAddress = new InetSocketAddress(publishInetAddress, publishPort);
|
||||
return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[boundAddresses.size()]), new InetSocketTransportAddress(publishAddress));
|
||||
return new BoundTransportAddress(boundAddresses.toArray(new TransportAddress[0]), new InetSocketTransportAddress(publishAddress));
|
||||
}
|
||||
|
||||
// package private for tests
|
||||
|
|
|
@ -30,7 +30,8 @@ import java.util.concurrent.TimeoutException;
|
|||
/**
|
||||
*
|
||||
*/
|
||||
public class PlainTransportFuture<V extends TransportResponse> extends BaseFuture<V> implements TransportFuture<V>, TransportResponseHandler<V> {
|
||||
public class PlainTransportFuture<V extends TransportResponse> extends BaseFuture<V>
|
||||
implements TransportFuture<V>, TransportResponseHandler<V> {
|
||||
|
||||
private final TransportResponseHandler<V> handler;
|
||||
|
||||
|
|
|
@ -37,7 +37,8 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
|
|||
private final Supplier<Request> requestFactory;
|
||||
private final TaskManager taskManager;
|
||||
|
||||
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager, TransportRequestHandler<Request> handler, String executor, boolean forceExecution) {
|
||||
public RequestHandlerRegistry(String action, Supplier<Request> requestFactory, TaskManager taskManager,
|
||||
TransportRequestHandler<Request> handler, String executor, boolean forceExecution) {
|
||||
this.action = action;
|
||||
this.requestFactory = requestFactory;
|
||||
assert newRequest() != null;
|
||||
|
|
|
@ -47,7 +47,7 @@ public interface Transport extends LifecycleComponent<Transport> {
|
|||
|
||||
/**
|
||||
* Further profile bound addresses
|
||||
* @return Should return null if transport does not support profiles, otherwise a map with name of profile and its bound transport address
|
||||
* @return <code>null</code> iff profiles are unsupported, otherwise a map with name of profile and its bound transport address
|
||||
*/
|
||||
Map<String, BoundTransportAddress> profileBoundAddresses();
|
||||
|
||||
|
|
|
@ -33,7 +33,9 @@ public abstract class TransportChannelResponseHandler<T extends TransportRespons
|
|||
/**
|
||||
* Convenience method for delegating an empty response to the provided transport channel
|
||||
*/
|
||||
public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) {
|
||||
public static TransportChannelResponseHandler<TransportResponse.Empty> emptyResponseHandler(ESLogger logger,
|
||||
TransportChannel channel,
|
||||
String extraInfoOnError) {
|
||||
return new TransportChannelResponseHandler<TransportResponse.Empty>(logger, channel, extraInfoOnError) {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
|
@ -45,7 +47,10 @@ public abstract class TransportChannelResponseHandler<T extends TransportRespons
|
|||
/**
|
||||
* Convenience method for delegating a response provided by supplier to the provided transport channel
|
||||
*/
|
||||
public static <T extends TransportResponse> TransportChannelResponseHandler responseHandler(ESLogger logger, Supplier<T> responseSupplier, TransportChannel channel, String extraInfoOnError) {
|
||||
public static <T extends TransportResponse> TransportChannelResponseHandler responseHandler(ESLogger logger,
|
||||
Supplier<T> responseSupplier,
|
||||
TransportChannel channel,
|
||||
String extraInfoOnError) {
|
||||
return new TransportChannelResponseHandler<T>(logger, channel, extraInfoOnError) {
|
||||
@Override
|
||||
public T newInstance() {
|
||||
|
|
|
@ -84,12 +84,13 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
|
||||
// An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they
|
||||
// do show up, we can print more descriptive information about them
|
||||
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry eldest) {
|
||||
return size() > 100;
|
||||
}
|
||||
});
|
||||
final Map<Long, TimeoutInfoHolder> timeoutInfoHandlers =
|
||||
Collections.synchronizedMap(new LinkedHashMap<Long, TimeoutInfoHolder>(100, .75F, true) {
|
||||
@Override
|
||||
protected boolean removeEldestEntry(Map.Entry eldest) {
|
||||
return size() > 100;
|
||||
}
|
||||
});
|
||||
|
||||
private final TransportService.Adapter adapter;
|
||||
|
||||
|
@ -203,7 +204,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
@Override
|
||||
public void doRun() {
|
||||
holderToNotify.handler().handleException(new TransportException("transport stopped, action: " + holderToNotify.action()));
|
||||
TransportException ex = new TransportException("transport stopped, action: " + holderToNotify.action());
|
||||
holderToNotify.handler().handleException(ex);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -238,7 +240,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
|
||||
public TransportStats stats() {
|
||||
return new TransportStats(transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum());
|
||||
return new TransportStats(
|
||||
transport.serverOpen(), adapter.rxMetric.count(), adapter.rxMetric.sum(), adapter.txMetric.count(), adapter.txMetric.sum());
|
||||
}
|
||||
|
||||
public BoundTransportAddress boundAddress() {
|
||||
|
@ -288,7 +291,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
|
||||
public <T extends TransportResponse> TransportFuture<T> submitRequest(DiscoveryNode node, String action, TransportRequest request,
|
||||
TransportRequestOptions options, TransportResponseHandler<T> handler) throws TransportException {
|
||||
TransportRequestOptions options,
|
||||
TransportResponseHandler<T> handler) throws TransportException {
|
||||
PlainTransportFuture<T> futureHandler = new PlainTransportFuture<>(handler);
|
||||
sendRequest(node, action, request, options, futureHandler);
|
||||
return futureHandler;
|
||||
|
@ -313,10 +317,12 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
} else {
|
||||
timeoutHandler = new TimeoutHandler(requestId);
|
||||
}
|
||||
clientHandlers.put(requestId, new RequestHolder<>(new ContextRestoreResponseHandler<T>(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler));
|
||||
TransportResponseHandler<T> responseHandler =
|
||||
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler);
|
||||
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler));
|
||||
if (lifecycle.stoppedOrClosed()) {
|
||||
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller.
|
||||
// it will only notify if the toStop code hasn't done the work yet.
|
||||
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
|
||||
// the caller. It will only notify if the toStop code hasn't done the work yet.
|
||||
throw new TransportException("TransportService is closed stopped can't send request");
|
||||
}
|
||||
if (timeoutHandler != null) {
|
||||
|
@ -432,7 +438,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
* @param executor The executor the request handling will be executed on
|
||||
* @param handler The handler itself that implements the request handling
|
||||
*/
|
||||
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor, TransportRequestHandler<Request> handler) {
|
||||
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> requestFactory, String executor,
|
||||
TransportRequestHandler<Request> handler) {
|
||||
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false);
|
||||
registerRequestHandler(reg);
|
||||
}
|
||||
|
@ -446,7 +453,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
* @param forceExecution Force execution on the executor queue and never reject it
|
||||
* @param handler The handler itself that implements the request handling
|
||||
*/
|
||||
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request, String executor, boolean forceExecution, TransportRequestHandler<Request> handler) {
|
||||
public <Request extends TransportRequest> void registerRequestHandler(String action, Supplier<Request> request,
|
||||
String executor, boolean forceExecution,
|
||||
TransportRequestHandler<Request> handler) {
|
||||
RequestHandlerRegistry<Request> reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution);
|
||||
registerRequestHandler(reg);
|
||||
}
|
||||
|
@ -487,7 +496,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) {
|
||||
public void onRequestSent(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) {
|
||||
if (traceEnabled() && shouldTraceAction(action)) {
|
||||
traceRequestSent(node, requestId, action, options);
|
||||
}
|
||||
|
@ -555,7 +565,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
TimeoutInfoHolder timeoutInfoHolder = timeoutInfoHandlers.remove(requestId);
|
||||
if (timeoutInfoHolder != null) {
|
||||
long time = System.currentTimeMillis();
|
||||
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(), timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
|
||||
logger.warn("Received response for a request that has timed out, sent [{}ms] ago, timed out [{}ms] ago, " +
|
||||
"action [{}], node [{}], id [{}]", time - timeoutInfoHolder.sentTime(), time - timeoutInfoHolder.timeoutTime(),
|
||||
timeoutInfoHolder.action(), timeoutInfoHolder.node(), requestId);
|
||||
action = timeoutInfoHolder.action();
|
||||
sourceNode = timeoutInfoHolder.node();
|
||||
} else {
|
||||
|
@ -665,7 +677,9 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
final RequestHolder removedHolder = clientHandlers.remove(requestId);
|
||||
if (removedHolder != null) {
|
||||
assert removedHolder == holder : "two different holder instances for request [" + requestId + "]";
|
||||
removedHolder.handler().handleException(new ReceiveTimeoutTransportException(holder.node(), holder.action(), "request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
|
||||
removedHolder.handler().handleException(
|
||||
new ReceiveTimeoutTransportException(holder.node(), holder.action(),
|
||||
"request_id [" + requestId + "] timed out after [" + (timeoutTime - sentTime) + "ms]"));
|
||||
} else {
|
||||
// response was processed, remove timeout info.
|
||||
timeoutInfoHandlers.remove(requestId);
|
||||
|
@ -678,7 +692,8 @@ public class TransportService extends AbstractLifecycleComponent<TransportServic
|
|||
* to make sure this doesn't run.
|
||||
*/
|
||||
public void cancel() {
|
||||
assert clientHandlers.get(requestId) == null : "cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
|
||||
assert clientHandlers.get(requestId) == null :
|
||||
"cancel must be called after the requestId [" + requestId + "] has been removed from clientHandlers";
|
||||
FutureUtils.cancel(future);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -63,7 +63,8 @@ abstract class FailAndRetryMockTransport<Response extends TransportResponse> imp
|
|||
|
||||
@Override
|
||||
@SuppressWarnings("unchecked")
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws IOException, TransportException {
|
||||
|
||||
//we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info
|
||||
if (connectMode) {
|
||||
|
|
|
@ -65,8 +65,10 @@ public class TransportClientIT extends ESIntegTestCase {
|
|||
try {
|
||||
TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress();
|
||||
client.addTransportAddress(transportAddress);
|
||||
assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1)); // since we force transport clients there has to be one node started that we connect to.
|
||||
for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) { // connected nodes have updated version
|
||||
// since we force transport clients there has to be one node started that we connect to.
|
||||
assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1));
|
||||
// connected nodes have updated version
|
||||
for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) {
|
||||
assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT));
|
||||
}
|
||||
|
||||
|
|
|
@ -58,9 +58,9 @@ public class TransportClientRetryIT extends ESIntegTestCase {
|
|||
.put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING.getKey(), true)
|
||||
.put(Environment.PATH_HOME_SETTING.getKey(), createTempDir());
|
||||
|
||||
try (TransportClient transportClient = TransportClient.builder().settings(builder.build()).build()) {
|
||||
transportClient.addTransportAddresses(addresses);
|
||||
assertThat(transportClient.connectedNodes().size(), equalTo(internalCluster().size()));
|
||||
try (TransportClient client = TransportClient.builder().settings(builder.build()).build()) {
|
||||
client.addTransportAddresses(addresses);
|
||||
assertThat(client.connectedNodes().size(), equalTo(internalCluster().size()));
|
||||
|
||||
int size = cluster().size();
|
||||
//kill all nodes one by one, leaving a single master/data node at the end of the loop
|
||||
|
@ -71,14 +71,14 @@ public class TransportClientRetryIT extends ESIntegTestCase {
|
|||
ClusterState clusterState;
|
||||
//use both variants of execute method: with and without listener
|
||||
if (randomBoolean()) {
|
||||
clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState();
|
||||
clusterState = client.admin().cluster().state(clusterStateRequest).get().getState();
|
||||
} else {
|
||||
PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(transportClient.threadPool());
|
||||
transportClient.admin().cluster().state(clusterStateRequest, future);
|
||||
PlainListenableActionFuture<ClusterStateResponse> future = new PlainListenableActionFuture<>(client.threadPool());
|
||||
client.admin().cluster().state(clusterStateRequest, future);
|
||||
clusterState = future.get().getState();
|
||||
}
|
||||
assertThat(clusterState.nodes().getSize(), greaterThanOrEqualTo(size - j));
|
||||
assertThat(transportClient.connectedNodes().size(), greaterThanOrEqualTo(size - j));
|
||||
assertThat(client.connectedNodes().size(), greaterThanOrEqualTo(size - j));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,7 +44,8 @@ public class BoundTransportAddressTests extends ESTestCase {
|
|||
for (InetAddress address : inetAddresses) {
|
||||
transportAddressList.add(new InetSocketTransportAddress(address, randomIntBetween(9200, 9299)));
|
||||
}
|
||||
final BoundTransportAddress transportAddress = new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0));
|
||||
final BoundTransportAddress transportAddress =
|
||||
new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0));
|
||||
assertThat(transportAddress.boundAddresses().length, equalTo(transportAddressList.size()));
|
||||
|
||||
// serialize
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.http.netty;
|
|||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
|
@ -28,7 +29,6 @@ import org.elasticsearch.test.ESIntegTestCase.Scope;
|
|||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
@ -44,19 +44,24 @@ import static org.hamcrest.Matchers.hasSize;
|
|||
public class NettyPipeliningDisabledIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NetworkModule.HTTP_ENABLED.getKey(), true).put("http.pipelining", false).build();
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
|
||||
.put("http.pipelining", false)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception {
|
||||
ensureGreen();
|
||||
List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/");
|
||||
String[] requests = new String[] {"/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"};
|
||||
|
||||
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
|
||||
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
|
||||
assertThat(responses, hasSize(requests.size()));
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests);
|
||||
assertThat(responses, hasSize(requests.length));
|
||||
|
||||
List<String> opaqueIds = new ArrayList<>(returnOpaqueIds(responses));
|
||||
|
||||
|
|
|
@ -21,15 +21,14 @@ package org.elasticsearch.http.netty;
|
|||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.http.HttpServerTransport;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
|
||||
import org.elasticsearch.test.ESIntegTestCase.Scope;
|
||||
import org.jboss.netty.handler.codec.http.HttpResponse;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
|
||||
import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds;
|
||||
|
@ -41,17 +40,22 @@ import static org.hamcrest.Matchers.is;
|
|||
public class NettyPipeliningEnabledIT extends ESIntegTestCase {
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NetworkModule.HTTP_ENABLED.getKey(), true).put("http.pipelining", true).build();
|
||||
return Settings.builder()
|
||||
.put(super.nodeSettings(nodeOrdinal))
|
||||
.put(NetworkModule.HTTP_ENABLED.getKey(), true)
|
||||
.put("http.pipelining", true)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void testThatNettyHttpServerSupportsPipelining() throws Exception {
|
||||
List<String> requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/");
|
||||
String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"};
|
||||
|
||||
HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class);
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses());
|
||||
TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses();
|
||||
InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
|
||||
|
||||
try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) {
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{}));
|
||||
Collection<HttpResponse> responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests);
|
||||
assertThat(responses, hasSize(5));
|
||||
|
||||
Collection<String> opaqueIds = returnOpaqueIds(responses);
|
||||
|
@ -62,7 +66,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase {
|
|||
private void assertOpaqueIdsInOrder(Collection<String> opaqueIds) {
|
||||
// check if opaque ids are monotonically increasing
|
||||
int i = 0;
|
||||
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [" + opaqueIds + "]");
|
||||
String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds);
|
||||
for (String opaqueId : opaqueIds) {
|
||||
assertThat(msg, opaqueId, is(String.valueOf(i++)));
|
||||
}
|
||||
|
|
|
@ -73,13 +73,21 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
super.setUp();
|
||||
threadPool = new ThreadPool(getClass().getName());
|
||||
serviceA = build(
|
||||
Settings.builder().put("name", "TS_A", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
|
||||
Settings.builder()
|
||||
.put("name", "TS_A")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0, new NamedWriteableRegistry()
|
||||
);
|
||||
serviceA.acceptIncomingRequests();
|
||||
nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
serviceB = build(
|
||||
Settings.builder().put("name", "TS_B", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
|
||||
Settings.builder()
|
||||
.put("name", "TS_B")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version1, new NamedWriteableRegistry()
|
||||
);
|
||||
serviceB.acceptIncomingRequests();
|
||||
|
@ -131,7 +139,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testHelloWorld() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
|
@ -175,8 +184,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
assertThat(e.getMessage(), false, equalTo(true));
|
||||
}
|
||||
|
||||
res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"),
|
||||
TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -225,7 +234,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
});
|
||||
final Object context = new Object();
|
||||
final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0]));
|
||||
BaseTransportResponseHandler<StringMessageResponse> baseTransportResponseHandler = new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
TransportResponseHandler<StringMessageResponse> responseHandler = new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
|
@ -255,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
threadPool.getThreadContext().putHeader("test.ping.user", "ping_user");
|
||||
threadPool.getThreadContext().putTransient("my_private_context", context);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "ping_pong", ping, baseTransportResponseHandler);
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "ping_pong", ping, responseHandler);
|
||||
|
||||
StringMessageResponse message = res.get();
|
||||
assertThat("pong", equalTo(message.message));
|
||||
|
@ -273,16 +282,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
serviceA.disconnectFromNode(nodeA);
|
||||
}
|
||||
final AtomicReference<Exception> exception = new AtomicReference<>();
|
||||
serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse(request.message));
|
||||
} catch (IOException e) {
|
||||
exception.set(e);
|
||||
serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse(request.message));
|
||||
} catch (IOException e) {
|
||||
exception.set(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
final AtomicReference<String> responseString = new AtomicReference<>();
|
||||
final CountDownLatch responseLatch = new CountDownLatch(1);
|
||||
serviceA.sendRequest(nodeA, "localNode", new StringMessageRequest("test"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
|
@ -314,40 +324,43 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testVoidMessageCompressed() {
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
}
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
|
||||
new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
TransportResponse.Empty message = res.get();
|
||||
|
@ -360,42 +373,45 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testHelloWorldCompressed() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build());
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message), responseOptions);
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
|
||||
new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
StringMessageResponse message = res.get();
|
||||
|
@ -408,12 +424,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testErrorMessage() {
|
||||
serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloException",
|
||||
|
@ -470,18 +487,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
public void testNotifyOnShutdown() throws Exception {
|
||||
final CountDownLatch latch2 = new CountDownLatch(1);
|
||||
|
||||
serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
try {
|
||||
latch2.await();
|
||||
logger.info("Stop ServiceB now");
|
||||
serviceB.stop();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
try {
|
||||
latch2.await();
|
||||
logger.info("Stop ServiceB now");
|
||||
serviceB.stop();
|
||||
} catch (Exception e) {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
TransportFuture<TransportResponse.Empty> foobar = serviceB.submitRequest(nodeA, "foobar",
|
||||
new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
latch2.countDown();
|
||||
|
@ -495,42 +513,38 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception {
|
||||
serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
// don't send back a response
|
||||
// try {
|
||||
// channel.sendResponse(new StringMessage("hello " + request.message));
|
||||
// } catch (IOException e) {
|
||||
// e.printStackTrace();
|
||||
// assertThat(e.getMessage(), false, equalTo(true));
|
||||
// }
|
||||
}
|
||||
});
|
||||
serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
// don't send back a response
|
||||
}
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
StringMessageResponse message = res.txGet();
|
||||
|
@ -544,48 +558,50 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testTimeoutSendExceptionWithDelayedResponse() throws Exception {
|
||||
CountDownLatch doneLatch = new CountDownLatch(1);
|
||||
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep");
|
||||
try {
|
||||
doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep");
|
||||
try {
|
||||
doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
// ignore
|
||||
}
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message));
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message));
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
latch.countDown();
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
latch.countDown();
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
latch.countDown();
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
latch.countDown();
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
StringMessageResponse message = res.txGet();
|
||||
|
@ -599,28 +615,29 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
final int counter = i;
|
||||
// now, try and send another request, this times, with a short timeout
|
||||
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
|
||||
new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello " + counter + "ms", equalTo(response.message));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello " + counter + "ms", equalTo(response.message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage());
|
||||
}
|
||||
});
|
||||
|
||||
StringMessageResponse message = res.txGet();
|
||||
assertThat(message.message, equalTo("hello " + counter + "ms"));
|
||||
|
@ -718,8 +735,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
|
||||
serviceA.setDynamicSettings(service);
|
||||
service.applySettings(Settings.builder()
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), includeSettings, TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings)
|
||||
.build());
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), includeSettings)
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings)
|
||||
.build());
|
||||
|
||||
tracer.reset(4);
|
||||
serviceA.sendRequest(nodeB, "test", new StringMessageRequest(""), noopResponseHandler);
|
||||
|
@ -937,179 +955,188 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testVersionFrom0to1() throws Exception {
|
||||
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler<Version1Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
assertThat(request.value2, equalTo(0)); // not set, coming from service A
|
||||
Version1Response response = new Version1Response();
|
||||
response.value1 = 1;
|
||||
response.value2 = 2;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME,
|
||||
new TransportRequestHandler<Version1Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
assertThat(request.value2, equalTo(0)); // not set, coming from service A
|
||||
Version1Response response = new Version1Response();
|
||||
response.value1 = 1;
|
||||
response.value2 = 2;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
|
||||
Version0Request version0Request = new Version0Request();
|
||||
version0Request.value1 = 1;
|
||||
Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request, new BaseTransportResponseHandler<Version0Response>() {
|
||||
@Override
|
||||
public Version0Response newInstance() {
|
||||
return new Version0Response();
|
||||
}
|
||||
Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request,
|
||||
new BaseTransportResponseHandler<Version0Response>() {
|
||||
@Override
|
||||
public Version0Response newInstance() {
|
||||
return new Version0Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Version0Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(Version0Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
|
||||
assertThat(version0Response.value1, equalTo(1));
|
||||
}
|
||||
|
||||
public void testVersionFrom1to0() throws Exception {
|
||||
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler<Version0Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
Version0Response response = new Version0Response();
|
||||
response.value1 = 1;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME,
|
||||
new TransportRequestHandler<Version0Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
Version0Response response = new Version0Response();
|
||||
response.value1 = 1;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
|
||||
Version1Request version1Request = new Version1Request();
|
||||
version1Request.value1 = 1;
|
||||
version1Request.value2 = 2;
|
||||
Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request, new BaseTransportResponseHandler<Version1Response>() {
|
||||
@Override
|
||||
public Version1Response newInstance() {
|
||||
return new Version1Response();
|
||||
}
|
||||
Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request,
|
||||
new BaseTransportResponseHandler<Version1Response>() {
|
||||
@Override
|
||||
public Version1Response newInstance() {
|
||||
return new Version1Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Version1Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(Version1Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
|
||||
assertThat(version1Response.value1, equalTo(1));
|
||||
assertThat(version1Response.value2, equalTo(0));
|
||||
}
|
||||
|
||||
public void testVersionFrom1to1() throws Exception {
|
||||
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler<Version1Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
assertThat(request.value2, equalTo(2));
|
||||
Version1Response response = new Version1Response();
|
||||
response.value1 = 1;
|
||||
response.value2 = 2;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME,
|
||||
new TransportRequestHandler<Version1Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
assertThat(request.value2, equalTo(2));
|
||||
Version1Response response = new Version1Response();
|
||||
response.value1 = 1;
|
||||
response.value2 = 2;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
|
||||
Version1Request version1Request = new Version1Request();
|
||||
version1Request.value1 = 1;
|
||||
version1Request.value2 = 2;
|
||||
Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request, new BaseTransportResponseHandler<Version1Response>() {
|
||||
@Override
|
||||
public Version1Response newInstance() {
|
||||
return new Version1Response();
|
||||
}
|
||||
Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request,
|
||||
new BaseTransportResponseHandler<Version1Response>() {
|
||||
@Override
|
||||
public Version1Response newInstance() {
|
||||
return new Version1Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Version1Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
assertThat(response.value2, equalTo(2));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(Version1Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
assertThat(response.value2, equalTo(2));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
|
||||
assertThat(version1Response.value1, equalTo(1));
|
||||
assertThat(version1Response.value2, equalTo(2));
|
||||
}
|
||||
|
||||
public void testVersionFrom0to0() throws Exception {
|
||||
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler<Version0Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
Version0Response response = new Version0Response();
|
||||
response.value1 = 1;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME,
|
||||
new TransportRequestHandler<Version0Request>() {
|
||||
@Override
|
||||
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
|
||||
assertThat(request.value1, equalTo(1));
|
||||
Version0Response response = new Version0Response();
|
||||
response.value1 = 1;
|
||||
channel.sendResponse(response);
|
||||
}
|
||||
});
|
||||
|
||||
Version0Request version0Request = new Version0Request();
|
||||
version0Request.value1 = 1;
|
||||
Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request, new BaseTransportResponseHandler<Version0Response>() {
|
||||
@Override
|
||||
public Version0Response newInstance() {
|
||||
return new Version0Response();
|
||||
}
|
||||
Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request,
|
||||
new BaseTransportResponseHandler<Version0Response>() {
|
||||
@Override
|
||||
public Version0Response newInstance() {
|
||||
return new Version0Response();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(Version0Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(Version0Response response) {
|
||||
assertThat(response.value1, equalTo(1));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.SAME;
|
||||
}
|
||||
}).txGet();
|
||||
|
||||
assertThat(version0Response.value1, equalTo(1));
|
||||
}
|
||||
|
||||
public void testMockFailToSendNoConnectRule() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
|
||||
serviceB.addFailToSendNoConnectRule(serviceA);
|
||||
|
||||
|
@ -1161,38 +1188,40 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testMockUnresponsiveRule() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
|
||||
serviceB.addUnresponsiveRule(serviceA);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new BaseTransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
res.txGet();
|
||||
|
@ -1264,7 +1293,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
|
||||
public void testBlockingIncomingRequests() throws Exception {
|
||||
TransportService service = build(
|
||||
Settings.builder().put("name", "TS_TEST", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(),
|
||||
Settings.builder()
|
||||
.put("name", "TS_TEST")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0, new NamedWriteableRegistry()
|
||||
);
|
||||
AtomicBoolean requestProcessed = new AtomicBoolean();
|
||||
|
@ -1274,7 +1307,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
|||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
});
|
||||
|
||||
DiscoveryNode node = new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
DiscoveryNode node =
|
||||
new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
serviceA.connectToNode(node);
|
||||
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
|
|
|
@ -44,7 +44,8 @@ public class ActionNamesIT extends ESIntegTestCase {
|
|||
public void testActionNamesCategories() throws NoSuchFieldException, IllegalAccessException {
|
||||
TransportService transportService = internalCluster().getInstance(TransportService.class);
|
||||
for (String action : transportService.requestHandlers.keySet()) {
|
||||
assertThat("action doesn't belong to known category", action, either(startsWith("indices:admin")).or(startsWith("indices:monitor"))
|
||||
assertThat("action doesn't belong to known category", action,
|
||||
either(startsWith("indices:admin")).or(startsWith("indices:monitor"))
|
||||
.or(startsWith("indices:data/read")).or(startsWith("indices:data/write"))
|
||||
.or(startsWith("cluster:admin")).or(startsWith("cluster:monitor"))
|
||||
.or(startsWith("internal:")));
|
||||
|
|
|
@ -133,7 +133,8 @@ public class ContextAndHeaderTransportIT extends ESIntegTestCase {
|
|||
.setSource(jsonBuilder().startObject().field("username", "foo").endObject()).get();
|
||||
transportClient().admin().indices().prepareRefresh(queryIndex, lookupIndex).get();
|
||||
|
||||
TermsQueryBuilder termsLookupFilterBuilder = QueryBuilders.termsLookupQuery("username", new TermsLookup(lookupIndex, "type", "1", "followers"));
|
||||
TermsLookup termsLookup = new TermsLookup(lookupIndex, "type", "1", "followers");
|
||||
TermsQueryBuilder termsLookupFilterBuilder = QueryBuilders.termsLookupQuery("username", termsLookup);
|
||||
BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery()).must(termsLookupFilterBuilder);
|
||||
|
||||
SearchResponse searchResponse = transportClient()
|
||||
|
@ -219,7 +220,9 @@ public class ContextAndHeaderTransportIT extends ESIntegTestCase {
|
|||
public void testThatPercolatingExistingDocumentGetRequestContainsContextAndHeaders() throws Exception {
|
||||
Client client = transportClient();
|
||||
client.prepareIndex(lookupIndex, ".percolator", "1")
|
||||
.setSource(jsonBuilder().startObject().startObject("query").startObject("match").field("name", "star wars").endObject().endObject().endObject())
|
||||
.setSource(
|
||||
jsonBuilder()
|
||||
.startObject().startObject("query").startObject("match").field("name", "star wars").endObject().endObject().endObject())
|
||||
.get();
|
||||
client.prepareIndex(lookupIndex, "type", "1")
|
||||
.setSource(jsonBuilder().startObject().field("name", "Star Wars - The new republic").endObject())
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.common.network.NetworkService;
|
|||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.MockBigArrays;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
|
@ -66,12 +67,14 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase {
|
|||
threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
|
||||
NetworkService networkService = new NetworkService(settings);
|
||||
BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService());
|
||||
nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(),
|
||||
new NoneCircuitBreakerService());
|
||||
nettyTransport.start();
|
||||
TransportService transportService = new TransportService(nettyTransport, threadPool);
|
||||
nettyTransport.transportServiceAdapter(transportService.createAdapter());
|
||||
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(nettyTransport.boundAddress().boundAddresses());
|
||||
TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses();
|
||||
InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses);
|
||||
port = transportAddress.address().getPort();
|
||||
host = transportAddress.address().getAddress();
|
||||
|
||||
|
|
|
@ -35,7 +35,8 @@ public class TransportModuleTests extends ModuleTestCase {
|
|||
|
||||
static class FakeTransport extends AssertingLocalTransport {
|
||||
@Inject
|
||||
public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version,
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, circuitBreakerService, threadPool, version, namedWriteableRegistry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,24 +53,31 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
public void testScheduledPing() throws Exception {
|
||||
ThreadPool threadPool = new ThreadPool(getClass().getName());
|
||||
|
||||
Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE.getKey(), "5ms").put(TransportSettings.PORT.getKey(), 0).build();
|
||||
Settings settings = Settings.builder()
|
||||
.put(NettyTransport.PING_SCHEDULE.getKey(), "5ms")
|
||||
.put(TransportSettings.PORT.getKey(), 0)
|
||||
.build();
|
||||
|
||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
|
||||
NamedWriteableRegistry registryA = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService);
|
||||
final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService);
|
||||
MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool);
|
||||
serviceA.start();
|
||||
serviceA.acceptIncomingRequests();
|
||||
|
||||
NamedWriteableRegistry registryB = new NamedWriteableRegistry();
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService);
|
||||
final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings),
|
||||
BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService);
|
||||
MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool);
|
||||
serviceB.start();
|
||||
serviceB.acceptIncomingRequests();
|
||||
|
||||
DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
DiscoveryNode nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
DiscoveryNode nodeA =
|
||||
new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
DiscoveryNode nodeB =
|
||||
new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
|
||||
serviceA.connectToNode(nodeB);
|
||||
serviceB.connectToNode(nodeA);
|
||||
|
@ -85,7 +92,8 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L));
|
||||
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
|
||||
try {
|
||||
|
@ -101,7 +109,8 @@ public class NettyScheduledPingTests extends ESTestCase {
|
|||
int rounds = scaledRandomIntBetween(100, 5000);
|
||||
for (int i = 0; i < rounds; i++) {
|
||||
serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
|
||||
new BaseTransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
return TransportResponse.Empty.INSTANCE;
|
||||
|
|
|
@ -101,7 +101,8 @@ public class NettyTransportIT extends ESIntegTestCase {
|
|||
|
||||
@Inject
|
||||
public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays,
|
||||
Version version, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
Version version, NamedWriteableRegistry namedWriteableRegistry,
|
||||
CircuitBreakerService circuitBreakerService) {
|
||||
super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
|
@ -114,15 +115,16 @@ public class NettyTransportIT extends ESIntegTestCase {
|
|||
|
||||
private final ESLogger logger;
|
||||
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport, String name, Settings groupSettings) {
|
||||
super(exceptionThrowingNettyTransport, name, groupSettings);
|
||||
this.logger = exceptionThrowingNettyTransport.logger;
|
||||
public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) {
|
||||
super(nettyTransport, name, groupSettings);
|
||||
this.logger = nettyTransport.logger;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ChannelPipeline getPipeline() throws Exception {
|
||||
ChannelPipeline pipeline = super.getPipeline();
|
||||
pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
|
||||
pipeline.replace("dispatcher", "dispatcher",
|
||||
new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) {
|
||||
|
||||
@Override
|
||||
protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId,
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
|||
import org.elasticsearch.common.network.NetworkAddress;
|
||||
import org.elasticsearch.common.network.NetworkModule;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.BoundTransportAddress;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
@ -89,19 +90,21 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase {
|
|||
for (NodeInfo nodeInfo : response.getNodes()) {
|
||||
assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1));
|
||||
assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1"));
|
||||
for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) {
|
||||
BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getProfileAddresses().get("client1");
|
||||
for (TransportAddress transportAddress : boundTransportAddress.boundAddresses()) {
|
||||
assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class));
|
||||
}
|
||||
|
||||
// bound addresses
|
||||
for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) {
|
||||
for (TransportAddress transportAddress : boundTransportAddress.boundAddresses()) {
|
||||
assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class));
|
||||
assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10))));
|
||||
assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(),
|
||||
is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10))));
|
||||
}
|
||||
|
||||
// publish address
|
||||
assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class));
|
||||
InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress();
|
||||
assertThat(boundTransportAddress.publishAddress(), instanceOf(InetSocketTransportAddress.class));
|
||||
InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) boundTransportAddress.publishAddress();
|
||||
assertThat(NetworkAddress.format(publishAddress.address().getAddress()), is("127.0.0.7"));
|
||||
assertThat(publishAddress.address().getPort(), is(4321));
|
||||
}
|
||||
|
|
|
@ -80,7 +80,8 @@ public class AssertingLocalTransport extends LocalTransport {
|
|||
private final Version maxVersion;
|
||||
|
||||
@Inject
|
||||
public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool,
|
||||
Version version, NamedWriteableRegistry namedWriteableRegistry) {
|
||||
super(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService);
|
||||
final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings);
|
||||
random = new Random(seed);
|
||||
|
@ -96,7 +97,8 @@ public class AssertingLocalTransport extends LocalTransport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request,
|
||||
namedWriteableRegistry);
|
||||
super.sendRequest(node, requestId, action, request, options);
|
||||
|
|
|
@ -169,7 +169,8 @@ public class CapturingTransport implements Transport {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options)
|
||||
throws IOException, TransportException {
|
||||
requests.put(requestId, Tuple.tuple(node, action));
|
||||
capturedRequests.add(new CapturedRequest(node, requestId, action, request));
|
||||
}
|
||||
|
|
|
@ -193,7 +193,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
throw new ConnectTransportException(node, "DISCONNECT: simulated");
|
||||
}
|
||||
});
|
||||
|
@ -239,7 +240,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
if (blockedActions.contains(action)) {
|
||||
logger.info("--> preventing {} request", action);
|
||||
throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request");
|
||||
|
@ -276,7 +278,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
// don't send anything, the receiving node is unresponsive
|
||||
}
|
||||
});
|
||||
|
@ -356,7 +359,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, final TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request,
|
||||
final TransportRequestOptions options) throws IOException, TransportException {
|
||||
// delayed sending - even if larger then the request timeout to simulated a potential late response from target node
|
||||
|
||||
TimeValue delay = getDelay();
|
||||
|
@ -390,7 +394,7 @@ public class MockTransportService extends TransportService {
|
|||
/**
|
||||
* Adds a new delegate transport that is used for communication with the given transport service.
|
||||
*
|
||||
* @return <tt>true</tt> iff no other delegate was registered for any of the addresses bound by transport service, otherwise <tt>false</tt>
|
||||
* @return <tt>true</tt> iff no other delegate was registered for any of the addresses bound by transport service.
|
||||
*/
|
||||
public boolean addDelegate(TransportService transportService, DelegateTransport transport) {
|
||||
boolean noRegistered = true;
|
||||
|
@ -403,7 +407,7 @@ public class MockTransportService extends TransportService {
|
|||
/**
|
||||
* Adds a new delegate transport that is used for communication with the given transport address.
|
||||
*
|
||||
* @return <tt>true</tt> iff no other delegate was registered for this address before, otherwise <tt>false</tt>
|
||||
* @return <tt>true</tt> iff no other delegate was registered for this address before.
|
||||
*/
|
||||
public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) {
|
||||
return transport().transports.put(transportAddress, transport) == null;
|
||||
|
@ -454,7 +458,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
getTransport(node).sendRequest(node, requestId, action, request, options);
|
||||
}
|
||||
}
|
||||
|
@ -513,7 +518,8 @@ public class MockTransportService extends TransportService {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
|
||||
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
|
||||
TransportRequestOptions options) throws IOException, TransportException {
|
||||
transport.sendRequest(node, requestId, action, request, options);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue