diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml index 6ea0c03de73..cafdacad5cd 100644 --- a/buildSrc/src/main/resources/checkstyle_suppressions.xml +++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml @@ -404,10 +404,6 @@ - - - - @@ -808,12 +804,6 @@ - - - - - - @@ -876,9 +866,6 @@ - - - @@ -977,7 +964,6 @@ - @@ -1023,8 +1009,6 @@ - - @@ -1305,15 +1289,6 @@ - - - - - - - - - @@ -1503,9 +1478,6 @@ - - - @@ -1516,7 +1488,6 @@ - diff --git a/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java b/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java index 48af1c83965..403c543041d 100644 --- a/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java +++ b/core/src/main/java/org/elasticsearch/http/HttpTransportSettings.java @@ -36,13 +36,13 @@ public final class HttpTransportSettings { public static final Setting SETTING_CORS_ENABLED = Setting.boolSetting("http.cors.enabled", false, Property.NodeScope); public static final Setting SETTING_CORS_ALLOW_ORIGIN = - new Setting("http.cors.allow-origin", "", (value) -> value, Property.NodeScope); + new Setting<>("http.cors.allow-origin", "", (value) -> value, Property.NodeScope); public static final Setting SETTING_CORS_MAX_AGE = Setting.intSetting("http.cors.max-age", 1728000, Property.NodeScope); public static final Setting SETTING_CORS_ALLOW_METHODS = - new Setting("http.cors.allow-methods", "OPTIONS, HEAD, GET, POST, PUT, DELETE", (value) -> value, Property.NodeScope); + new Setting<>("http.cors.allow-methods", "OPTIONS, HEAD, GET, POST, PUT, DELETE", (value) -> value, Property.NodeScope); public static final Setting SETTING_CORS_ALLOW_HEADERS = - new Setting("http.cors.allow-headers", "X-Requested-With, Content-Type, Content-Length", (value) -> value, Property.NodeScope); + new Setting<>("http.cors.allow-headers", "X-Requested-With, Content-Type, Content-Length", (value) -> value, Property.NodeScope); public static final Setting SETTING_CORS_ALLOW_CREDENTIALS = Setting.boolSetting("http.cors.allow-credentials", false, Property.NodeScope); public static final Setting SETTING_PIPELINING = @@ -61,7 +61,7 @@ public final class HttpTransportSettings { listSetting("http.bind_host", SETTING_HTTP_HOST, Function.identity(), Property.NodeScope); public static final Setting SETTING_HTTP_PORT = - new Setting("http.port", "9200-9300", PortsRange::new, Property.NodeScope); + new Setting<>("http.port", "9200-9300", PortsRange::new, Property.NodeScope); public static final Setting SETTING_HTTP_PUBLISH_PORT = Setting.intSetting("http.publish_port", -1, -1, Property.NodeScope); public static final Setting SETTING_HTTP_DETAILED_ERRORS_ENABLED = diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java index a634db247aa..22d6743b186 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpChannel.java @@ -118,7 +118,8 @@ public final class NettyHttpChannel extends AbstractRestChannel { ChannelFuture future; if (orderedUpstreamMessageEvent != null) { - OrderedDownstreamChannelEvent downstreamChannelEvent = new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp); + OrderedDownstreamChannelEvent downstreamChannelEvent = + new OrderedDownstreamChannelEvent(orderedUpstreamMessageEvent, 0, true, resp); future = downstreamChannelEvent.getFuture(); channel.getPipeline().sendDownstream(downstreamChannelEvent); } else { diff --git a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java index ccc9a29b629..e90a3710889 100644 --- a/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java +++ b/core/src/main/java/org/elasticsearch/http/netty/NettyHttpServerTransport.java @@ -249,7 +249,8 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent{}], pipelining[{}], pipelining_max_events[{}]", - maxChunkSize, maxHeaderSize, maxInitialLineLength, this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents); + logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " + + "receive_predictor[{}->{}], pipelining[{}], pipelining_max_events[{}]", maxChunkSize, maxHeaderSize, maxInitialLineLength, + this.maxContentLength, receivePredictorMin, receivePredictorMax, pipelining, pipeliningMaxEvents); } public Settings settings() { @@ -335,7 +337,7 @@ public class NettyHttpServerTransport extends AbstractLifecycleComponent extends BaseFuture implements TransportFuture, TransportResponseHandler { +public class PlainTransportFuture extends BaseFuture + implements TransportFuture, TransportResponseHandler { private final TransportResponseHandler handler; diff --git a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java index e58df27644e..4b816de52c3 100644 --- a/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java +++ b/core/src/main/java/org/elasticsearch/transport/RequestHandlerRegistry.java @@ -37,7 +37,8 @@ public class RequestHandlerRegistry { private final Supplier requestFactory; private final TaskManager taskManager; - public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, TransportRequestHandler handler, String executor, boolean forceExecution) { + public RequestHandlerRegistry(String action, Supplier requestFactory, TaskManager taskManager, + TransportRequestHandler handler, String executor, boolean forceExecution) { this.action = action; this.requestFactory = requestFactory; assert newRequest() != null; diff --git a/core/src/main/java/org/elasticsearch/transport/Transport.java b/core/src/main/java/org/elasticsearch/transport/Transport.java index 5acd7504691..76793d5598f 100644 --- a/core/src/main/java/org/elasticsearch/transport/Transport.java +++ b/core/src/main/java/org/elasticsearch/transport/Transport.java @@ -47,7 +47,7 @@ public interface Transport extends LifecycleComponent { /** * Further profile bound addresses - * @return Should return null if transport does not support profiles, otherwise a map with name of profile and its bound transport address + * @return null iff profiles are unsupported, otherwise a map with name of profile and its bound transport address */ Map profileBoundAddresses(); diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 69fc73e4af0..24be5ecc3ab 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -33,7 +33,9 @@ public abstract class TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { + public static TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, + TransportChannel channel, + String extraInfoOnError) { return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { @Override public TransportResponse.Empty newInstance() { @@ -45,7 +47,10 @@ public abstract class TransportChannelResponseHandler TransportChannelResponseHandler responseHandler(ESLogger logger, Supplier responseSupplier, TransportChannel channel, String extraInfoOnError) { + public static TransportChannelResponseHandler responseHandler(ESLogger logger, + Supplier responseSupplier, + TransportChannel channel, + String extraInfoOnError) { return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { @Override public T newInstance() { diff --git a/core/src/main/java/org/elasticsearch/transport/TransportService.java b/core/src/main/java/org/elasticsearch/transport/TransportService.java index 12c058dbc91..b9663da72c2 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportService.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportService.java @@ -84,12 +84,13 @@ public class TransportService extends AbstractLifecycleComponent timeoutInfoHandlers = Collections.synchronizedMap(new LinkedHashMap(100, .75F, true) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return size() > 100; - } - }); + final Map timeoutInfoHandlers = + Collections.synchronizedMap(new LinkedHashMap(100, .75F, true) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > 100; + } + }); private final TransportService.Adapter adapter; @@ -203,7 +204,8 @@ public class TransportService extends AbstractLifecycleComponent TransportFuture submitRequest(DiscoveryNode node, String action, TransportRequest request, - TransportRequestOptions options, TransportResponseHandler handler) throws TransportException { + TransportRequestOptions options, + TransportResponseHandler handler) throws TransportException { PlainTransportFuture futureHandler = new PlainTransportFuture<>(handler); sendRequest(node, action, request, options, futureHandler); return futureHandler; @@ -313,10 +317,12 @@ public class TransportService extends AbstractLifecycleComponent(new ContextRestoreResponseHandler(threadPool.getThreadContext().newStoredContext(), handler), node, action, timeoutHandler)); + TransportResponseHandler responseHandler = + new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler); + clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler)); if (lifecycle.stoppedOrClosed()) { - // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify the caller. - // it will only notify if the toStop code hasn't done the work yet. + // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify + // the caller. It will only notify if the toStop code hasn't done the work yet. throw new TransportException("TransportService is closed stopped can't send request"); } if (timeoutHandler != null) { @@ -432,7 +438,8 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Supplier requestFactory, String executor, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier requestFactory, String executor, + TransportRequestHandler handler) { RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, requestFactory, taskManager, handler, executor, false); registerRequestHandler(reg); } @@ -446,7 +453,9 @@ public class TransportService extends AbstractLifecycleComponent void registerRequestHandler(String action, Supplier request, String executor, boolean forceExecution, TransportRequestHandler handler) { + public void registerRequestHandler(String action, Supplier request, + String executor, boolean forceExecution, + TransportRequestHandler handler) { RequestHandlerRegistry reg = new RequestHandlerRegistry<>(action, request, taskManager, handler, executor, forceExecution); registerRequestHandler(reg); } @@ -487,7 +496,8 @@ public class TransportService extends AbstractLifecycleComponent imp @Override @SuppressWarnings("unchecked") - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { //we make sure that nodes get added to the connected ones when calling addTransportAddress, by returning proper nodes info if (connectMode) { diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java index a62f15f9277..a20a5247ed6 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientIT.java @@ -65,8 +65,10 @@ public class TransportClientIT extends ESIntegTestCase { try { TransportAddress transportAddress = node.injector().getInstance(TransportService.class).boundAddress().publishAddress(); client.addTransportAddress(transportAddress); - assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1)); // since we force transport clients there has to be one node started that we connect to. - for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) { // connected nodes have updated version + // since we force transport clients there has to be one node started that we connect to. + assertThat(nodeService.connectedNodes().size(), greaterThanOrEqualTo(1)); + // connected nodes have updated version + for (DiscoveryNode discoveryNode : nodeService.connectedNodes()) { assertThat(discoveryNode.getVersion(), equalTo(Version.CURRENT)); } diff --git a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java index e9b6833bf32..2fcadb51a10 100644 --- a/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java +++ b/core/src/test/java/org/elasticsearch/client/transport/TransportClientRetryIT.java @@ -58,9 +58,9 @@ public class TransportClientRetryIT extends ESIntegTestCase { .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING.getKey(), true) .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir()); - try (TransportClient transportClient = TransportClient.builder().settings(builder.build()).build()) { - transportClient.addTransportAddresses(addresses); - assertThat(transportClient.connectedNodes().size(), equalTo(internalCluster().size())); + try (TransportClient client = TransportClient.builder().settings(builder.build()).build()) { + client.addTransportAddresses(addresses); + assertThat(client.connectedNodes().size(), equalTo(internalCluster().size())); int size = cluster().size(); //kill all nodes one by one, leaving a single master/data node at the end of the loop @@ -71,14 +71,14 @@ public class TransportClientRetryIT extends ESIntegTestCase { ClusterState clusterState; //use both variants of execute method: with and without listener if (randomBoolean()) { - clusterState = transportClient.admin().cluster().state(clusterStateRequest).get().getState(); + clusterState = client.admin().cluster().state(clusterStateRequest).get().getState(); } else { - PlainListenableActionFuture future = new PlainListenableActionFuture<>(transportClient.threadPool()); - transportClient.admin().cluster().state(clusterStateRequest, future); + PlainListenableActionFuture future = new PlainListenableActionFuture<>(client.threadPool()); + client.admin().cluster().state(clusterStateRequest, future); clusterState = future.get().getState(); } assertThat(clusterState.nodes().getSize(), greaterThanOrEqualTo(size - j)); - assertThat(transportClient.connectedNodes().size(), greaterThanOrEqualTo(size - j)); + assertThat(client.connectedNodes().size(), greaterThanOrEqualTo(size - j)); } } } diff --git a/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java b/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java index 55bc2b8ddb9..45db5a33d21 100644 --- a/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java +++ b/core/src/test/java/org/elasticsearch/common/transport/BoundTransportAddressTests.java @@ -44,7 +44,8 @@ public class BoundTransportAddressTests extends ESTestCase { for (InetAddress address : inetAddresses) { transportAddressList.add(new InetSocketTransportAddress(address, randomIntBetween(9200, 9299))); } - final BoundTransportAddress transportAddress = new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0)); + final BoundTransportAddress transportAddress = + new BoundTransportAddress(transportAddressList.toArray(new InetSocketTransportAddress[0]), transportAddressList.get(0)); assertThat(transportAddress.boundAddresses().length, equalTo(transportAddressList.size())); // serialize diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java index 9420f1de928..f048b5526d5 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningDisabledIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.http.netty; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -28,7 +29,6 @@ import org.elasticsearch.test.ESIntegTestCase.Scope; import org.jboss.netty.handler.codec.http.HttpResponse; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Locale; @@ -44,19 +44,24 @@ import static org.hamcrest.Matchers.hasSize; public class NettyPipeliningDisabledIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NetworkModule.HTTP_ENABLED.getKey(), true).put("http.pipelining", false).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put("http.pipelining", false) + .build(); } public void testThatNettyHttpServerDoesNotSupportPipelining() throws Exception { ensureGreen(); - List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"); + String[] requests = new String[] {"/", "/_nodes/stats", "/", "/_cluster/state", "/", "/_nodes", "/"}; HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); - InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); + TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); - assertThat(responses, hasSize(requests.size())); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests); + assertThat(responses, hasSize(requests.length)); List opaqueIds = new ArrayList<>(returnOpaqueIds(responses)); diff --git a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java index 1eccb946797..fd1c493ac36 100644 --- a/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java +++ b/core/src/test/java/org/elasticsearch/http/netty/NettyPipeliningEnabledIT.java @@ -21,15 +21,14 @@ package org.elasticsearch.http.netty; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.jboss.netty.handler.codec.http.HttpResponse; -import java.util.Arrays; import java.util.Collection; -import java.util.List; import java.util.Locale; import static org.elasticsearch.http.netty.NettyHttpClient.returnOpaqueIds; @@ -41,17 +40,22 @@ import static org.hamcrest.Matchers.is; public class NettyPipeliningEnabledIT extends ESIntegTestCase { @Override protected Settings nodeSettings(int nodeOrdinal) { - return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(NetworkModule.HTTP_ENABLED.getKey(), true).put("http.pipelining", true).build(); + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put(NetworkModule.HTTP_ENABLED.getKey(), true) + .put("http.pipelining", true) + .build(); } public void testThatNettyHttpServerSupportsPipelining() throws Exception { - List requests = Arrays.asList("/", "/_nodes/stats", "/", "/_cluster/state", "/"); + String[] requests = new String[]{"/", "/_nodes/stats", "/", "/_cluster/state", "/"}; HttpServerTransport httpServerTransport = internalCluster().getInstance(HttpServerTransport.class); - InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(httpServerTransport.boundAddress().boundAddresses()); + TransportAddress[] boundAddresses = httpServerTransport.boundAddress().boundAddresses(); + InetSocketTransportAddress inetSocketTransportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses); try (NettyHttpClient nettyHttpClient = new NettyHttpClient()) { - Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests.toArray(new String[]{})); + Collection responses = nettyHttpClient.get(inetSocketTransportAddress.address(), requests); assertThat(responses, hasSize(5)); Collection opaqueIds = returnOpaqueIds(responses); @@ -62,7 +66,7 @@ public class NettyPipeliningEnabledIT extends ESIntegTestCase { private void assertOpaqueIdsInOrder(Collection opaqueIds) { // check if opaque ids are monotonically increasing int i = 0; - String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [" + opaqueIds + "]"); + String msg = String.format(Locale.ROOT, "Expected list of opaque ids to be monotonically increasing, got [%s]", opaqueIds); for (String opaqueId : opaqueIds) { assertThat(msg, opaqueId, is(String.valueOf(i++))); } diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index 38b1ef05335..edeecd61d8e 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -73,13 +73,21 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { super.setUp(); threadPool = new ThreadPool(getClass().getName()); serviceA = build( - Settings.builder().put("name", "TS_A", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(), + Settings.builder() + .put("name", "TS_A") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), version0, new NamedWriteableRegistry() ); serviceA.acceptIncomingRequests(); nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceB = build( - Settings.builder().put("name", "TS_B", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(), + Settings.builder() + .put("name", "TS_B") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), version1, new NamedWriteableRegistry() ); serviceB.acceptIncomingRequests(); @@ -131,7 +139,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testHelloWorld() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { @Override public void messageReceived(StringMessageRequest request, TransportChannel channel) { assertThat("moshe", equalTo(request.message)); @@ -175,8 +184,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertThat(e.getMessage(), false, equalTo(true)); } - res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { + res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), + TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -225,7 +234,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); final Object context = new Object(); final String executor = randomFrom(ThreadPool.THREAD_POOL_TYPES.keySet().toArray(new String[0])); - BaseTransportResponseHandler baseTransportResponseHandler = new BaseTransportResponseHandler() { + TransportResponseHandler responseHandler = new BaseTransportResponseHandler() { @Override public StringMessageResponse newInstance() { return new StringMessageResponse(); @@ -255,7 +264,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { threadPool.getThreadContext().putHeader("test.ping.user", "ping_user"); threadPool.getThreadContext().putTransient("my_private_context", context); - TransportFuture res = serviceB.submitRequest(nodeA, "ping_pong", ping, baseTransportResponseHandler); + TransportFuture res = serviceB.submitRequest(nodeA, "ping_pong", ping, responseHandler); StringMessageResponse message = res.get(); assertThat("pong", equalTo(message.message)); @@ -273,16 +282,17 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceA.disconnectFromNode(nodeA); } final AtomicReference exception = new AtomicReference<>(); - serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - try { - channel.sendResponse(new StringMessageResponse(request.message)); - } catch (IOException e) { - exception.set(e); + serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + try { + channel.sendResponse(new StringMessageResponse(request.message)); + } catch (IOException e) { + exception.set(e); + } } - } - }); + }); final AtomicReference responseString = new AtomicReference<>(); final CountDownLatch responseLatch = new CountDownLatch(1); serviceA.sendRequest(nodeA, "localNode", new StringMessageRequest("test"), new TransportResponseHandler() { @@ -314,40 +324,43 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVoidMessageCompressed() { - serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { - try { - channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.builder().withCompress(true).build()); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); + serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { + try { + TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build(); + channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } } - } - }); + }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { - @Override - public TransportResponse.Empty newInstance() { - return TransportResponse.Empty.INSTANCE; - } + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), + new BaseTransportResponseHandler() { + @Override + public TransportResponse.Empty newInstance() { + return TransportResponse.Empty.INSTANCE; + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(TransportResponse.Empty response) { - } + @Override + public void handleResponse(TransportResponse.Empty response) { + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); try { TransportResponse.Empty message = res.get(); @@ -360,42 +373,45 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testHelloWorldCompressed() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - assertThat("moshe", equalTo(request.message)); - try { - channel.sendResponse(new StringMessageResponse("hello " + request.message), TransportResponseOptions.builder().withCompress(true).build()); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + assertThat("moshe", equalTo(request.message)); + try { + TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build(); + channel.sendResponse(new StringMessageResponse("hello " + request.message), responseOptions); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } } - } }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new BaseTransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), + new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - assertThat("hello moshe", equalTo(response.message)); - } + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("hello moshe", equalTo(response.message)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); try { StringMessageResponse message = res.get(); @@ -408,12 +424,13 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testErrorMessage() { - serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } + serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloException", @@ -470,18 +487,19 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testNotifyOnShutdown() throws Exception { final CountDownLatch latch2 = new CountDownLatch(1); - serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - try { - latch2.await(); - logger.info("Stop ServiceB now"); - serviceB.stop(); - } catch (Exception e) { - fail(e.getMessage()); + serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + try { + latch2.await(); + logger.info("Stop ServiceB now"); + serviceB.stop(); + } catch (Exception e) { + fail(e.getMessage()); + } } - } - }); + }); TransportFuture foobar = serviceB.submitRequest(nodeA, "foobar", new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME); latch2.countDown(); @@ -495,42 +513,38 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testTimeoutSendExceptionWithNeverSendingBackResponse() throws Exception { - serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - assertThat("moshe", equalTo(request.message)); - // don't send back a response -// try { -// channel.sendResponse(new StringMessage("hello " + request.message)); -// } catch (IOException e) { -// e.printStackTrace(); -// assertThat(e.getMessage(), false, equalTo(true)); -// } - } - }); + serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + assertThat("moshe", equalTo(request.message)); + // don't send back a response + } + }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), + new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - fail("got response instead of exception"); - } + @Override + public void handleResponse(StringMessageResponse response) { + fail("got response instead of exception"); + } - @Override - public void handleException(TransportException exp) { - assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); - } - }); + @Override + public void handleException(TransportException exp) { + assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); + } + }); try { StringMessageResponse message = res.txGet(); @@ -544,48 +558,50 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testTimeoutSendExceptionWithDelayedResponse() throws Exception { CountDownLatch doneLatch = new CountDownLatch(1); - serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep"); - try { - doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // ignore + serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + TimeValue sleep = TimeValue.parseTimeValue(request.message, null, "sleep"); + try { + doneLatch.await(sleep.millis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + // ignore + } + try { + channel.sendResponse(new StringMessageResponse("hello " + request.message)); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } } - try { - channel.sendResponse(new StringMessageResponse("hello " + request.message)); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); - } - } }); final CountDownLatch latch = new CountDownLatch(1); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), + new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - latch.countDown(); - fail("got response instead of exception"); - } + @Override + public void handleResponse(StringMessageResponse response) { + latch.countDown(); + fail("got response instead of exception"); + } - @Override - public void handleException(TransportException exp) { - latch.countDown(); - assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); - } - }); + @Override + public void handleException(TransportException exp) { + latch.countDown(); + assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); + } + }); try { StringMessageResponse message = res.txGet(); @@ -599,28 +615,29 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new BaseTransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), + new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - assertThat("hello " + counter + "ms", equalTo(response.message)); - } + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("hello " + counter + "ms", equalTo(response.message)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage()); - } - }); + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response for " + counter + ": " + exp.getDetailedMessage()); + } + }); StringMessageResponse message = res.txGet(); assertThat(message.message, equalTo("hello " + counter + "ms")); @@ -718,8 +735,9 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { ClusterSettings service = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); serviceA.setDynamicSettings(service); service.applySettings(Settings.builder() - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), includeSettings, TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings) - .build()); + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), includeSettings) + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), excludeSettings) + .build()); tracer.reset(4); serviceA.sendRequest(nodeB, "test", new StringMessageRequest(""), noopResponseHandler); @@ -937,179 +955,188 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testVersionFrom0to1() throws Exception { - serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { - @Override - public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - assertThat(request.value2, equalTo(0)); // not set, coming from service A - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; - channel.sendResponse(response); - } - }); + serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, + new TransportRequestHandler() { + @Override + public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + assertThat(request.value2, equalTo(0)); // not set, coming from service A + Version1Response response = new Version1Response(); + response.value1 = 1; + response.value2 = 2; + channel.sendResponse(response); + } + }); Version0Request version0Request = new Version0Request(); version0Request.value1 = 1; - Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request, new BaseTransportResponseHandler() { - @Override - public Version0Response newInstance() { - return new Version0Response(); - } + Version0Response version0Response = serviceA.submitRequest(nodeB, "/version", version0Request, + new BaseTransportResponseHandler() { + @Override + public Version0Response newInstance() { + return new Version0Response(); + } - @Override - public void handleResponse(Version0Response response) { - assertThat(response.value1, equalTo(1)); - } + @Override + public void handleResponse(Version0Response response) { + assertThat(response.value1, equalTo(1)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }).txGet(); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); assertThat(version0Response.value1, equalTo(1)); } public void testVersionFrom1to0() throws Exception { - serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { - @Override - public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; - channel.sendResponse(response); - } - }); + serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, + new TransportRequestHandler() { + @Override + public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + Version0Response response = new Version0Response(); + response.value1 = 1; + channel.sendResponse(response); + } + }); Version1Request version1Request = new Version1Request(); version1Request.value1 = 1; version1Request.value2 = 2; - Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request, new BaseTransportResponseHandler() { - @Override - public Version1Response newInstance() { - return new Version1Response(); - } + Version1Response version1Response = serviceB.submitRequest(nodeA, "/version", version1Request, + new BaseTransportResponseHandler() { + @Override + public Version1Response newInstance() { + return new Version1Response(); + } - @Override - public void handleResponse(Version1Response response) { - assertThat(response.value1, equalTo(1)); - assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0 - } + @Override + public void handleResponse(Version1Response response) { + assertThat(response.value1, equalTo(1)); + assertThat(response.value2, equalTo(0)); // initial values, cause its serialized from version 0 + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }).txGet(); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); assertThat(version1Response.value1, equalTo(1)); assertThat(version1Response.value2, equalTo(0)); } public void testVersionFrom1to1() throws Exception { - serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { - @Override - public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - assertThat(request.value2, equalTo(2)); - Version1Response response = new Version1Response(); - response.value1 = 1; - response.value2 = 2; - channel.sendResponse(response); - } - }); + serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME, + new TransportRequestHandler() { + @Override + public void messageReceived(Version1Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + assertThat(request.value2, equalTo(2)); + Version1Response response = new Version1Response(); + response.value1 = 1; + response.value2 = 2; + channel.sendResponse(response); + } + }); Version1Request version1Request = new Version1Request(); version1Request.value1 = 1; version1Request.value2 = 2; - Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request, new BaseTransportResponseHandler() { - @Override - public Version1Response newInstance() { - return new Version1Response(); - } + Version1Response version1Response = serviceB.submitRequest(nodeB, "/version", version1Request, + new BaseTransportResponseHandler() { + @Override + public Version1Response newInstance() { + return new Version1Response(); + } - @Override - public void handleResponse(Version1Response response) { - assertThat(response.value1, equalTo(1)); - assertThat(response.value2, equalTo(2)); - } + @Override + public void handleResponse(Version1Response response) { + assertThat(response.value1, equalTo(1)); + assertThat(response.value2, equalTo(2)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }).txGet(); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); assertThat(version1Response.value1, equalTo(1)); assertThat(version1Response.value2, equalTo(2)); } public void testVersionFrom0to0() throws Exception { - serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, new TransportRequestHandler() { - @Override - public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { - assertThat(request.value1, equalTo(1)); - Version0Response response = new Version0Response(); - response.value1 = 1; - channel.sendResponse(response); - } - }); + serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME, + new TransportRequestHandler() { + @Override + public void messageReceived(Version0Request request, TransportChannel channel) throws Exception { + assertThat(request.value1, equalTo(1)); + Version0Response response = new Version0Response(); + response.value1 = 1; + channel.sendResponse(response); + } + }); Version0Request version0Request = new Version0Request(); version0Request.value1 = 1; - Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request, new BaseTransportResponseHandler() { - @Override - public Version0Response newInstance() { - return new Version0Response(); - } + Version0Response version0Response = serviceA.submitRequest(nodeA, "/version", version0Request, + new BaseTransportResponseHandler() { + @Override + public Version0Response newInstance() { + return new Version0Response(); + } - @Override - public void handleResponse(Version0Response response) { - assertThat(response.value1, equalTo(1)); - } + @Override + public void handleResponse(Version0Response response) { + assertThat(response.value1, equalTo(1)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } - @Override - public String executor() { - return ThreadPool.Names.SAME; - } - }).txGet(); + @Override + public String executor() { + return ThreadPool.Names.SAME; + } + }).txGet(); assertThat(version0Response.value1, equalTo(1)); } public void testMockFailToSendNoConnectRule() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } - }); + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); serviceB.addFailToSendNoConnectRule(serviceA); @@ -1161,38 +1188,40 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public void testMockUnresponsiveRule() { - serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { - assertThat("moshe", equalTo(request.message)); - throw new RuntimeException("bad message !!!"); - } - }); + serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception { + assertThat("moshe", equalTo(request.message)); + throw new RuntimeException("bad message !!!"); + } + }); serviceB.addUnresponsiveRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new BaseTransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), + new BaseTransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - fail("got response instead of exception"); - } + @Override + public void handleResponse(StringMessageResponse response) { + fail("got response instead of exception"); + } - @Override - public void handleException(TransportException exp) { - assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); - } - }); + @Override + public void handleException(TransportException exp) { + assertThat(exp, instanceOf(ReceiveTimeoutTransportException.class)); + } + }); try { res.txGet(); @@ -1264,7 +1293,11 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testBlockingIncomingRequests() throws Exception { TransportService service = build( - Settings.builder().put("name", "TS_TEST", TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "", TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING").build(), + Settings.builder() + .put("name", "TS_TEST") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), version0, new NamedWriteableRegistry() ); AtomicBoolean requestProcessed = new AtomicBoolean(); @@ -1274,7 +1307,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { channel.sendResponse(TransportResponse.Empty.INSTANCE); }); - DiscoveryNode node = new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + DiscoveryNode node = + new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); serviceA.connectToNode(node); CountDownLatch latch = new CountDownLatch(1); diff --git a/core/src/test/java/org/elasticsearch/transport/ActionNamesIT.java b/core/src/test/java/org/elasticsearch/transport/ActionNamesIT.java index d790137b38f..8ad8d42e5ef 100644 --- a/core/src/test/java/org/elasticsearch/transport/ActionNamesIT.java +++ b/core/src/test/java/org/elasticsearch/transport/ActionNamesIT.java @@ -44,7 +44,8 @@ public class ActionNamesIT extends ESIntegTestCase { public void testActionNamesCategories() throws NoSuchFieldException, IllegalAccessException { TransportService transportService = internalCluster().getInstance(TransportService.class); for (String action : transportService.requestHandlers.keySet()) { - assertThat("action doesn't belong to known category", action, either(startsWith("indices:admin")).or(startsWith("indices:monitor")) + assertThat("action doesn't belong to known category", action, + either(startsWith("indices:admin")).or(startsWith("indices:monitor")) .or(startsWith("indices:data/read")).or(startsWith("indices:data/write")) .or(startsWith("cluster:admin")).or(startsWith("cluster:monitor")) .or(startsWith("internal:"))); diff --git a/core/src/test/java/org/elasticsearch/transport/ContextAndHeaderTransportIT.java b/core/src/test/java/org/elasticsearch/transport/ContextAndHeaderTransportIT.java index bcae62d693c..c730d70bb0b 100644 --- a/core/src/test/java/org/elasticsearch/transport/ContextAndHeaderTransportIT.java +++ b/core/src/test/java/org/elasticsearch/transport/ContextAndHeaderTransportIT.java @@ -133,7 +133,8 @@ public class ContextAndHeaderTransportIT extends ESIntegTestCase { .setSource(jsonBuilder().startObject().field("username", "foo").endObject()).get(); transportClient().admin().indices().prepareRefresh(queryIndex, lookupIndex).get(); - TermsQueryBuilder termsLookupFilterBuilder = QueryBuilders.termsLookupQuery("username", new TermsLookup(lookupIndex, "type", "1", "followers")); + TermsLookup termsLookup = new TermsLookup(lookupIndex, "type", "1", "followers"); + TermsQueryBuilder termsLookupFilterBuilder = QueryBuilders.termsLookupQuery("username", termsLookup); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery()).must(termsLookupFilterBuilder); SearchResponse searchResponse = transportClient() @@ -219,7 +220,9 @@ public class ContextAndHeaderTransportIT extends ESIntegTestCase { public void testThatPercolatingExistingDocumentGetRequestContainsContextAndHeaders() throws Exception { Client client = transportClient(); client.prepareIndex(lookupIndex, ".percolator", "1") - .setSource(jsonBuilder().startObject().startObject("query").startObject("match").field("name", "star wars").endObject().endObject().endObject()) + .setSource( + jsonBuilder() + .startObject().startObject("query").startObject("match").field("name", "star wars").endObject().endObject().endObject()) .get(); client.prepareIndex(lookupIndex, "type", "1") .setSource(jsonBuilder().startObject().field("name", "Star Wars - The new republic").endObject()) diff --git a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java index 0e8c27c3143..46c3cdbe3aa 100644 --- a/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java +++ b/core/src/test/java/org/elasticsearch/transport/NettySizeHeaderFrameDecoderTests.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.network.NetworkService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; @@ -66,12 +67,14 @@ public class NettySizeHeaderFrameDecoderTests extends ESTestCase { threadPool.setClusterSettings(new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); NetworkService networkService = new NetworkService(settings); BigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(settings, threadPool), new NoneCircuitBreakerService()); - nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), new NoneCircuitBreakerService()); + nettyTransport = new NettyTransport(settings, threadPool, networkService, bigArrays, Version.CURRENT, new NamedWriteableRegistry(), + new NoneCircuitBreakerService()); nettyTransport.start(); TransportService transportService = new TransportService(nettyTransport, threadPool); nettyTransport.transportServiceAdapter(transportService.createAdapter()); - InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(nettyTransport.boundAddress().boundAddresses()); + TransportAddress[] boundAddresses = nettyTransport.boundAddress().boundAddresses(); + InetSocketTransportAddress transportAddress = (InetSocketTransportAddress) randomFrom(boundAddresses); port = transportAddress.address().getPort(); host = transportAddress.address().getAddress(); diff --git a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java index 1caeb1b13ef..888a73c9386 100644 --- a/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java +++ b/core/src/test/java/org/elasticsearch/transport/TransportModuleTests.java @@ -35,7 +35,8 @@ public class TransportModuleTests extends ModuleTestCase { static class FakeTransport extends AssertingLocalTransport { @Inject - public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + public FakeTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, + NamedWriteableRegistry namedWriteableRegistry) { super(settings, circuitBreakerService, threadPool, version, namedWriteableRegistry); } } diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java index 1baa087717c..49f86b909a6 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyScheduledPingTests.java @@ -53,24 +53,31 @@ public class NettyScheduledPingTests extends ESTestCase { public void testScheduledPing() throws Exception { ThreadPool threadPool = new ThreadPool(getClass().getName()); - Settings settings = Settings.builder().put(NettyTransport.PING_SCHEDULE.getKey(), "5ms").put(TransportSettings.PORT.getKey(), 0).build(); + Settings settings = Settings.builder() + .put(NettyTransport.PING_SCHEDULE.getKey(), "5ms") + .put(TransportSettings.PORT.getKey(), 0) + .build(); CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); NamedWriteableRegistry registryA = new NamedWriteableRegistry(); - final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService); + final NettyTransport nettyA = new NettyTransport(settings, threadPool, new NetworkService(settings), + BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryA, circuitBreakerService); MockTransportService serviceA = new MockTransportService(settings, nettyA, threadPool); serviceA.start(); serviceA.acceptIncomingRequests(); NamedWriteableRegistry registryB = new NamedWriteableRegistry(); - final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService); + final NettyTransport nettyB = new NettyTransport(settings, threadPool, new NetworkService(settings), + BigArrays.NON_RECYCLING_INSTANCE, Version.CURRENT, registryB, circuitBreakerService); MockTransportService serviceB = new MockTransportService(settings, nettyB, threadPool); serviceB.start(); serviceB.acceptIncomingRequests(); - DiscoveryNode nodeA = new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); - DiscoveryNode nodeB = new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode nodeA = + new DiscoveryNode("TS_A", "TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode nodeB = + new DiscoveryNode("TS_B", "TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), Version.CURRENT); serviceA.connectToNode(nodeB); serviceB.connectToNode(nodeA); @@ -85,7 +92,8 @@ public class NettyScheduledPingTests extends ESTestCase { assertThat(nettyA.scheduledPing.failedPings.count(), equalTo(0L)); assertThat(nettyB.scheduledPing.failedPings.count(), equalTo(0L)); - serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { + serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC, + new TransportRequestHandler() { @Override public void messageReceived(TransportRequest.Empty request, TransportChannel channel) { try { @@ -101,7 +109,8 @@ public class NettyScheduledPingTests extends ESTestCase { int rounds = scaledRandomIntBetween(100, 5000); for (int i = 0; i < rounds; i++) { serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new BaseTransportResponseHandler() { + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(randomBoolean()).build(), + new BaseTransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { return TransportResponse.Empty.INSTANCE; diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java index 9c5ce454730..10e20928376 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportIT.java @@ -101,7 +101,8 @@ public class NettyTransportIT extends ESIntegTestCase { @Inject public ExceptionThrowingNettyTransport(Settings settings, ThreadPool threadPool, NetworkService networkService, BigArrays bigArrays, - Version version, NamedWriteableRegistry namedWriteableRegistry, CircuitBreakerService circuitBreakerService) { + Version version, NamedWriteableRegistry namedWriteableRegistry, + CircuitBreakerService circuitBreakerService) { super(settings, threadPool, networkService, bigArrays, version, namedWriteableRegistry, circuitBreakerService); } @@ -114,15 +115,16 @@ public class NettyTransportIT extends ESIntegTestCase { private final ESLogger logger; - public ErrorPipelineFactory(ExceptionThrowingNettyTransport exceptionThrowingNettyTransport, String name, Settings groupSettings) { - super(exceptionThrowingNettyTransport, name, groupSettings); - this.logger = exceptionThrowingNettyTransport.logger; + public ErrorPipelineFactory(ExceptionThrowingNettyTransport nettyTransport, String name, Settings groupSettings) { + super(nettyTransport, name, groupSettings); + this.logger = nettyTransport.logger; } @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = super.getPipeline(); - pipeline.replace("dispatcher", "dispatcher", new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) { + pipeline.replace("dispatcher", "dispatcher", + new MessageChannelHandler(nettyTransport, logger, TransportSettings.DEFAULT_PROFILE) { @Override protected String handleRequest(Channel channel, Marker marker, StreamInput buffer, long requestId, diff --git a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java index 51fd3bf7f75..7f673e00fee 100644 --- a/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java +++ b/core/src/test/java/org/elasticsearch/transport/netty/NettyTransportMultiPortIntegrationIT.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.network.NetworkAddress; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.transport.BoundTransportAddress; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.env.Environment; @@ -89,19 +90,21 @@ public class NettyTransportMultiPortIntegrationIT extends ESIntegTestCase { for (NodeInfo nodeInfo : response.getNodes()) { assertThat(nodeInfo.getTransport().getProfileAddresses().keySet(), hasSize(1)); assertThat(nodeInfo.getTransport().getProfileAddresses(), hasKey("client1")); - for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) { + BoundTransportAddress boundTransportAddress = nodeInfo.getTransport().getProfileAddresses().get("client1"); + for (TransportAddress transportAddress : boundTransportAddress.boundAddresses()) { assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class)); } // bound addresses - for (TransportAddress transportAddress : nodeInfo.getTransport().getProfileAddresses().get("client1").boundAddresses()) { + for (TransportAddress transportAddress : boundTransportAddress.boundAddresses()) { assertThat(transportAddress, instanceOf(InetSocketTransportAddress.class)); - assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(), is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); + assertThat(((InetSocketTransportAddress) transportAddress).address().getPort(), + is(allOf(greaterThanOrEqualTo(randomPort), lessThanOrEqualTo(randomPort + 10)))); } // publish address - assertThat(nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(), instanceOf(InetSocketTransportAddress.class)); - InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) nodeInfo.getTransport().getProfileAddresses().get("client1").publishAddress(); + assertThat(boundTransportAddress.publishAddress(), instanceOf(InetSocketTransportAddress.class)); + InetSocketTransportAddress publishAddress = (InetSocketTransportAddress) boundTransportAddress.publishAddress(); assertThat(NetworkAddress.format(publishAddress.address().getAddress()), is("127.0.0.7")); assertThat(publishAddress.address().getPort(), is(4321)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java index 1e18a160f4a..a9687c27873 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/AssertingLocalTransport.java @@ -80,7 +80,8 @@ public class AssertingLocalTransport extends LocalTransport { private final Version maxVersion; @Inject - public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, Version version, NamedWriteableRegistry namedWriteableRegistry) { + public AssertingLocalTransport(Settings settings, CircuitBreakerService circuitBreakerService, ThreadPool threadPool, + Version version, NamedWriteableRegistry namedWriteableRegistry) { super(settings, threadPool, version, namedWriteableRegistry, circuitBreakerService); final long seed = ESIntegTestCase.INDEX_TEST_SEED_SETTING.get(settings); random = new Random(seed); @@ -96,7 +97,8 @@ public class AssertingLocalTransport extends LocalTransport { } @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, final TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { ElasticsearchAssertions.assertVersionSerializable(VersionUtils.randomVersionBetween(random, minVersion, maxVersion), request, namedWriteableRegistry); super.sendRequest(node, requestId, action, request, options); diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java index ebdd1da3987..654a1c971ca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/CapturingTransport.java @@ -169,7 +169,8 @@ public class CapturingTransport implements Transport { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) + throws IOException, TransportException { requests.put(requestId, Tuple.tuple(node, action)); capturedRequests.add(new CapturedRequest(node, requestId, action, request)); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java index 20d9a228675..8e85134763d 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/elasticsearch/test/transport/MockTransportService.java @@ -193,7 +193,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { throw new ConnectTransportException(node, "DISCONNECT: simulated"); } }); @@ -239,7 +240,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { if (blockedActions.contains(action)) { logger.info("--> preventing {} request", action); throw new ConnectTransportException(node, "DISCONNECT: prevented " + action + " request"); @@ -276,7 +278,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { // don't send anything, the receiving node is unresponsive } }); @@ -356,7 +359,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, final TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(final DiscoveryNode node, final long requestId, final String action, TransportRequest request, + final TransportRequestOptions options) throws IOException, TransportException { // delayed sending - even if larger then the request timeout to simulated a potential late response from target node TimeValue delay = getDelay(); @@ -390,7 +394,7 @@ public class MockTransportService extends TransportService { /** * Adds a new delegate transport that is used for communication with the given transport service. * - * @return true iff no other delegate was registered for any of the addresses bound by transport service, otherwise false + * @return true iff no other delegate was registered for any of the addresses bound by transport service. */ public boolean addDelegate(TransportService transportService, DelegateTransport transport) { boolean noRegistered = true; @@ -403,7 +407,7 @@ public class MockTransportService extends TransportService { /** * Adds a new delegate transport that is used for communication with the given transport address. * - * @return true iff no other delegate was registered for this address before, otherwise false + * @return true iff no other delegate was registered for this address before. */ public boolean addDelegate(TransportAddress transportAddress, DelegateTransport transport) { return transport().transports.put(transportAddress, transport) == null; @@ -454,7 +458,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { getTransport(node).sendRequest(node, requestId, action, request, options); } } @@ -513,7 +518,8 @@ public class MockTransportService extends TransportService { } @Override - public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { transport.sendRequest(node, requestId, action, request, options); }