diff --git a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java index c33db33b1f8..b65b313ebc7 100644 --- a/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java +++ b/core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTestCase.java @@ -19,8 +19,8 @@ package org.elasticsearch.transport; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -37,6 +37,8 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; @@ -51,9 +53,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; -/** - * - */ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected ThreadPool threadPool; @@ -62,7 +61,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { protected DiscoveryNode nodeA; protected MockTransportService serviceA; - protected static final Version version1 = Version.fromId(Version.CURRENT.id+1); + protected static final Version version1 = Version.fromId(Version.CURRENT.id + 1); protected DiscoveryNode nodeB; protected MockTransportService serviceB; @@ -74,24 +73,25 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); serviceA = build( - Settings.builder() - .put("name", "TS_A") - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") - .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") - .build(), - version0); + Settings.builder() + .put("name", "TS_A") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), + version0); serviceA.acceptIncomingRequests(); nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + // serviceA.setLocalNode(nodeA); serviceB = build( - Settings.builder() - .put("name", "TS_B") - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") - .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") - .build(), - version1); + Settings.builder() + .put("name", "TS_B") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), + version1); serviceB.acceptIncomingRequests(); nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1); - + //serviceB.setLocalNode(nodeB); // wait till all nodes are properly connected and the event has been sent, so tests in this class // will not get this callback called on the connections done in this setup final boolean useLocalNode = randomBoolean(); @@ -140,41 +140,41 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testHelloWorld() { serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC, new TransportRequestHandler() { - @Override - public void messageReceived(StringMessageRequest request, TransportChannel channel) { - assertThat("moshe", equalTo(request.message)); - try { - channel.sendResponse(new StringMessageResponse("hello " + request.message)); - } catch (IOException e) { - logger.error("Unexpected failure", e); - fail(e.getMessage()); + @Override + public void messageReceived(StringMessageRequest request, TransportChannel channel) { + assertThat("moshe", equalTo(request.message)); + try { + channel.sendResponse(new StringMessageResponse("hello " + request.message)); + } catch (IOException e) { + logger.error("Unexpected failure", e); + fail(e.getMessage()); + } } - } - }); + }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), new TransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), new TransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - assertThat("hello moshe", equalTo(response.message)); - } + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("hello moshe", equalTo(response.message)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); try { StringMessageResponse message = res.get(); @@ -185,27 +185,27 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - assertThat("hello moshe", equalTo(response.message)); - } + @Override + public void handleResponse(StringMessageResponse response) { + assertThat("hello moshe", equalTo(response.message)); + } - @Override - public void handleException(TransportException exp) { - logger.error("Unexpected failure", exp); - fail("got exception instead of a response: " + exp.getMessage()); - } - }); + @Override + public void handleException(TransportException exp) { + logger.error("Unexpected failure", exp); + fail("got exception instead of a response: " + exp.getMessage()); + } + }); try { StringMessageResponse message = res.get(); @@ -335,7 +335,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), + TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override public TransportResponse.Empty newInstance() { @@ -382,10 +382,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { fail(e.getMessage()); } } - }); + }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -427,30 +427,30 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { assertThat("moshe", equalTo(request.message)); throw new RuntimeException("bad message !!!"); } - }); + }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloException", - new StringMessageRequest("moshe"), new TransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), new TransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - fail("got response instead of exception"); - } + @Override + public void handleResponse(StringMessageResponse response) { + fail("got response instead of exception"); + } - @Override - public void handleException(TransportException exp) { - assertThat("runtime_exception: bad message !!!", equalTo(exp.getCause().getMessage())); - } - }); + @Override + public void handleException(TransportException exp) { + assertThat("runtime_exception: bad message !!!", equalTo(exp.getCause().getMessage())); + } + }); try { res.txGet(); @@ -497,7 +497,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } }); TransportFuture foobar = serviceB.submitRequest(nodeA, "foobar", - new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME); + new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME); latch2.countDown(); try { foobar.txGet(); @@ -519,7 +519,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { }); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -571,10 +571,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { fail(e.getMessage()); } } - }); + }); final CountDownLatch latch = new CountDownLatch(1); TransportFuture res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), + new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -611,7 +611,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { final int counter = i; // now, try and send another request, this times, with a short timeout res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse", - new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), + new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(), new TransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -698,7 +698,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { tracer.reset(4); boolean timeout = randomBoolean(); - TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY; + TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build() : + TransportRequestOptions.EMPTY; serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler); requestCompleted.acquire(); tracer.expectedEvents.get().await(); @@ -1137,27 +1138,27 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceB.addFailToSendNoConnectRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), new TransportResponseHandler() { - @Override - public StringMessageResponse newInstance() { - return new StringMessageResponse(); - } + new StringMessageRequest("moshe"), new TransportResponseHandler() { + @Override + public StringMessageResponse newInstance() { + return new StringMessageResponse(); + } - @Override - public String executor() { - return ThreadPool.Names.GENERIC; - } + @Override + public String executor() { + return ThreadPool.Names.GENERIC; + } - @Override - public void handleResponse(StringMessageResponse response) { - fail("got response instead of exception"); - } + @Override + public void handleResponse(StringMessageResponse response) { + fail("got response instead of exception"); + } - @Override - public void handleException(TransportException exp) { - assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated")); - } - }); + @Override + public void handleException(TransportException exp) { + assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated")); + } + }); try { res.txGet(); @@ -1196,7 +1197,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { serviceB.addUnresponsiveRule(serviceA); TransportFuture res = serviceB.submitRequest(nodeA, "sayHello", - new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), + new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(), new TransportResponseHandler() { @Override public StringMessageResponse newInstance() { @@ -1289,18 +1290,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { public void testBlockingIncomingRequests() throws Exception { TransportService service = build( - Settings.builder() - .put("name", "TS_TEST") - .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") - .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") - .build(), - version0); + Settings.builder() + .put("name", "TS_TEST") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), + version0); AtomicBoolean requestProcessed = new AtomicBoolean(); service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME, - (request, channel) -> { - requestProcessed.set(true); - channel.sendResponse(TransportResponse.Empty.INSTANCE); - }); + (request, channel) -> { + requestProcessed.set(true); + channel.sendResponse(TransportResponse.Empty.INSTANCE); + }); DiscoveryNode node = new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); @@ -1340,8 +1341,239 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase { } public static class TestRequest extends TransportRequest { + + String info; + int resendCount; + + public TestRequest() { + } + + public TestRequest(String info) { + this.info = info; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + info = in.readOptionalString(); + resendCount = in.readInt(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(info); + out.writeInt(resendCount); + } + + @Override + public String toString() { + return "TestRequest{" + + "info='" + info + '\'' + + '}'; + } } private static class TestResponse extends TransportResponse { + + String info; + + public TestResponse() { + } + + public TestResponse(String info) { + this.info = info; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + info = in.readOptionalString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(info); + } + + @Override + public String toString() { + return "TestResponse{" + + "info='" + info + '\'' + + '}'; + } + } + + public void testSendRandomRequests() throws InterruptedException { + TransportService serviceC = build( + Settings.builder() + .put("name", "TS_TEST") + .put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "") + .put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING") + .build(), + version0); + DiscoveryNode nodeC = + new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0); + serviceC.acceptIncomingRequests(); + + final CountDownLatch latch = new CountDownLatch(5); + TransportConnectionListener waitForConnection = new TransportConnectionListener() { + @Override + public void onNodeConnected(DiscoveryNode node) { + latch.countDown(); + } + + @Override + public void onNodeDisconnected(DiscoveryNode node) { + fail("disconnect should not be called " + node); + } + }; + serviceA.addConnectionListener(waitForConnection); + serviceB.addConnectionListener(waitForConnection); + serviceC.addConnectionListener(waitForConnection); + + serviceC.connectToNode(nodeA); + serviceC.connectToNode(nodeB); + serviceA.connectToNode(nodeC); + serviceB.connectToNode(nodeC); + serviceC.connectToNode(nodeC); + + latch.await(); + serviceA.removeConnectionListener(waitForConnection); + serviceB.removeConnectionListener(waitForConnection); + serviceB.removeConnectionListener(waitForConnection); + + + Map toNodeMap = new HashMap<>(); + toNodeMap.put(serviceA, nodeA); + toNodeMap.put(serviceB, nodeB); + toNodeMap.put(serviceC, nodeC); + AtomicBoolean fail = new AtomicBoolean(false); + class TestRequestHandler implements TransportRequestHandler { + + private final TransportService service; + + TestRequestHandler(TransportService service) { + this.service = service; + } + + @Override + public void messageReceived(TestRequest request, TransportChannel channel) throws Exception { + if (randomBoolean()) { + Thread.sleep(randomIntBetween(10, 50)); + } + if (fail.get()) { + throw new IOException("forced failure"); + } + + if (randomBoolean() && request.resendCount++ < 20) { + DiscoveryNode node = randomFrom(nodeA, nodeB, nodeC); + logger.debug("send secondary request from {} to {} - {}", toNodeMap.get(service), node, request.info); + service.sendRequest(node, "action1", new TestRequest("secondary " + request.info), + TransportRequestOptions.builder().withCompress(randomBoolean()).build(), + new TransportResponseHandler() { + @Override + public TestResponse newInstance() { + return new TestResponse(); + } + + @Override + public void handleResponse(TestResponse response) { + try { + if (randomBoolean()) { + Thread.sleep(randomIntBetween(10, 50)); + } + logger.debug("send secondary response {}", response.info); + + channel.sendResponse(response); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void handleException(TransportException exp) { + try { + logger.debug("send secondary exception response for request {}", request.info); + channel.sendResponse(exp); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public String executor() { + return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; + } + }); + } else { + logger.debug("send response for {}", request.info); + channel.sendResponse(new TestResponse("Response for: " + request.info)); + } + + } + } + serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + new TestRequestHandler(serviceB)); + serviceC.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + new TestRequestHandler(serviceC)); + serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC), + new TestRequestHandler(serviceA)); + int iters = randomIntBetween(30, 60); + CountDownLatch allRequestsDone = new CountDownLatch(iters); + class TestResponseHandler implements TransportResponseHandler { + + private final int id; + + public TestResponseHandler(int id) { + this.id = id; + } + + @Override + public TestResponse newInstance() { + return new TestResponse(); + } + + @Override + public void handleResponse(TestResponse response) { + logger.debug("---> received response: {}", response.info); + allRequestsDone.countDown(); + } + + @Override + public void handleException(TransportException exp) { + logger.debug("---> received exception for id {}", exp, id); + allRequestsDone.countDown(); + Throwable unwrap = ExceptionsHelper.unwrap(exp, IOException.class); + assertNotNull(unwrap); + assertEquals(IOException.class, unwrap.getClass()); + assertEquals("forced failure", unwrap.getMessage()); + } + + @Override + public String executor() { + return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC; + } + } + + for (int i = 0; i < iters; i++) { + TransportService service = randomFrom(serviceC, serviceB, serviceA); + DiscoveryNode node = randomFrom(nodeC, nodeB, nodeA); + logger.debug("send from {} to {}", toNodeMap.get(service), node); + service.sendRequest(node, "action1", new TestRequest("REQ[" + i + "]"), + TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TestResponseHandler(i)); + } + logger.debug("waiting for response"); + fail.set(randomBoolean()); + boolean await = allRequestsDone.await(5, TimeUnit.SECONDS); + if (await == false) { + logger.debug("now failing forcefully"); + fail.set(true); + assertTrue(allRequestsDone.await(5, TimeUnit.SECONDS)); + } + logger.debug("DONE"); + serviceC.close(); + } }