mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 10:25:15 +00:00
Add a unit test that sends random requests among 3 nodes (#19329)
This adds a test that uses transport implementation and sends random requests to 3 different nodes, the request handlers maybe forwarding the requests to yet another node etc. until returning the response. This test basically tests that nodes are not deadlocking in a distributed fashion.
This commit is contained in:
parent
d29d2f8793
commit
f6ac147b1d
@ -19,8 +19,8 @@
|
||||
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
@ -37,6 +37,8 @@ import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Semaphore;
|
||||
@ -51,9 +53,6 @@ import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
|
||||
protected ThreadPool threadPool;
|
||||
@ -62,7 +61,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
protected DiscoveryNode nodeA;
|
||||
protected MockTransportService serviceA;
|
||||
|
||||
protected static final Version version1 = Version.fromId(Version.CURRENT.id+1);
|
||||
protected static final Version version1 = Version.fromId(Version.CURRENT.id + 1);
|
||||
protected DiscoveryNode nodeB;
|
||||
protected MockTransportService serviceB;
|
||||
|
||||
@ -74,24 +73,25 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
super.setUp();
|
||||
threadPool = new TestThreadPool(getClass().getName());
|
||||
serviceA = build(
|
||||
Settings.builder()
|
||||
.put("name", "TS_A")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0);
|
||||
Settings.builder()
|
||||
.put("name", "TS_A")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0);
|
||||
serviceA.acceptIncomingRequests();
|
||||
nodeA = new DiscoveryNode("TS_A", serviceA.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
// serviceA.setLocalNode(nodeA);
|
||||
serviceB = build(
|
||||
Settings.builder()
|
||||
.put("name", "TS_B")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version1);
|
||||
Settings.builder()
|
||||
.put("name", "TS_B")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version1);
|
||||
serviceB.acceptIncomingRequests();
|
||||
nodeB = new DiscoveryNode("TS_B", serviceB.boundAddress().publishAddress(), emptyMap(), emptySet(), version1);
|
||||
|
||||
//serviceB.setLocalNode(nodeB);
|
||||
// wait till all nodes are properly connected and the event has been sent, so tests in this class
|
||||
// will not get this callback called on the connections done in this setup
|
||||
final boolean useLocalNode = randomBoolean();
|
||||
@ -140,41 +140,41 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
public void testHelloWorld() {
|
||||
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
|
||||
new TransportRequestHandler<StringMessageRequest>() {
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message));
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
@Override
|
||||
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
try {
|
||||
channel.sendResponse(new StringMessageResponse("hello " + request.message));
|
||||
} catch (IOException e) {
|
||||
logger.error("Unexpected failure", e);
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
StringMessageResponse message = res.get();
|
||||
@ -185,27 +185,27 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
|
||||
res = serviceB.submitRequest(nodeA, "sayHello", new StringMessageRequest("moshe"),
|
||||
TransportRequestOptions.builder().withCompress(true).build(), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
assertThat("hello moshe", equalTo(response.message));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.error("Unexpected failure", exp);
|
||||
fail("got exception instead of a response: " + exp.getMessage());
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
StringMessageResponse message = res.get();
|
||||
@ -335,7 +335,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
});
|
||||
|
||||
TransportFuture<TransportResponse.Empty> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
|
||||
TransportRequest.Empty.INSTANCE, TransportRequestOptions.builder().withCompress(true).build(),
|
||||
new TransportResponseHandler<TransportResponse.Empty>() {
|
||||
@Override
|
||||
public TransportResponse.Empty newInstance() {
|
||||
@ -382,10 +382,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withCompress(true).build(),
|
||||
new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
@ -427,30 +427,30 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
assertThat("moshe", equalTo(request.message));
|
||||
throw new RuntimeException("bad message !!!");
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloException",
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat("runtime_exception: bad message !!!", equalTo(exp.getCause().getMessage()));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat("runtime_exception: bad message !!!", equalTo(exp.getCause().getMessage()));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
res.txGet();
|
||||
@ -497,7 +497,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
}
|
||||
});
|
||||
TransportFuture<TransportResponse.Empty> foobar = serviceB.submitRequest(nodeA, "foobar",
|
||||
new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
new StringMessageRequest(""), TransportRequestOptions.EMPTY, EmptyTransportResponseHandler.INSTANCE_SAME);
|
||||
latch2.countDown();
|
||||
try {
|
||||
foobar.txGet();
|
||||
@ -519,7 +519,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
});
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutNoResponse",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
@ -571,10 +571,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
fail(e.getMessage());
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new StringMessageRequest("2m"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
@ -611,7 +611,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
final int counter = i;
|
||||
// now, try and send another request, this times, with a short timeout
|
||||
res = serviceB.submitRequest(nodeA, "sayHelloTimeoutDelayedResponse",
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
|
||||
new StringMessageRequest(counter + "ms"), TransportRequestOptions.builder().withTimeout(3000).build(),
|
||||
new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
@ -698,7 +698,8 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
|
||||
tracer.reset(4);
|
||||
boolean timeout = randomBoolean();
|
||||
TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build(): TransportRequestOptions.EMPTY;
|
||||
TransportRequestOptions options = timeout ? TransportRequestOptions.builder().withTimeout(1).build() :
|
||||
TransportRequestOptions.EMPTY;
|
||||
serviceA.sendRequest(nodeB, "test", new StringMessageRequest("", 10), options, noopResponseHandler);
|
||||
requestCompleted.acquire();
|
||||
tracer.expectedEvents.get().await();
|
||||
@ -1137,27 +1138,27 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
serviceB.addFailToSendNoConnectRule(serviceA);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
new StringMessageRequest("moshe"), new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
return new StringMessageResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
@Override
|
||||
public String executor() {
|
||||
return ThreadPool.Names.GENERIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
@Override
|
||||
public void handleResponse(StringMessageResponse response) {
|
||||
fail("got response instead of exception");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
|
||||
}
|
||||
});
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
assertThat(exp.getCause().getMessage(), endsWith("DISCONNECT: simulated"));
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
res.txGet();
|
||||
@ -1196,7 +1197,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
serviceB.addUnresponsiveRule(serviceA);
|
||||
|
||||
TransportFuture<StringMessageResponse> res = serviceB.submitRequest(nodeA, "sayHello",
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new StringMessageRequest("moshe"), TransportRequestOptions.builder().withTimeout(100).build(),
|
||||
new TransportResponseHandler<StringMessageResponse>() {
|
||||
@Override
|
||||
public StringMessageResponse newInstance() {
|
||||
@ -1289,18 +1290,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
|
||||
public void testBlockingIncomingRequests() throws Exception {
|
||||
TransportService service = build(
|
||||
Settings.builder()
|
||||
.put("name", "TS_TEST")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0);
|
||||
Settings.builder()
|
||||
.put("name", "TS_TEST")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0);
|
||||
AtomicBoolean requestProcessed = new AtomicBoolean();
|
||||
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
|
||||
(request, channel) -> {
|
||||
requestProcessed.set(true);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
});
|
||||
(request, channel) -> {
|
||||
requestProcessed.set(true);
|
||||
channel.sendResponse(TransportResponse.Empty.INSTANCE);
|
||||
});
|
||||
|
||||
DiscoveryNode node =
|
||||
new DiscoveryNode("TS_TEST", "TS_TEST", service.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
@ -1340,8 +1341,239 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
|
||||
}
|
||||
|
||||
public static class TestRequest extends TransportRequest {
|
||||
|
||||
String info;
|
||||
int resendCount;
|
||||
|
||||
public TestRequest() {
|
||||
}
|
||||
|
||||
public TestRequest(String info) {
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
info = in.readOptionalString();
|
||||
resendCount = in.readInt();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeOptionalString(info);
|
||||
out.writeInt(resendCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TestRequest{" +
|
||||
"info='" + info + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestResponse extends TransportResponse {
|
||||
|
||||
String info;
|
||||
|
||||
public TestResponse() {
|
||||
}
|
||||
|
||||
public TestResponse(String info) {
|
||||
this.info = info;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
info = in.readOptionalString();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
out.writeOptionalString(info);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TestResponse{" +
|
||||
"info='" + info + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
||||
public void testSendRandomRequests() throws InterruptedException {
|
||||
TransportService serviceC = build(
|
||||
Settings.builder()
|
||||
.put("name", "TS_TEST")
|
||||
.put(TransportService.TRACE_LOG_INCLUDE_SETTING.getKey(), "")
|
||||
.put(TransportService.TRACE_LOG_EXCLUDE_SETTING.getKey(), "NOTHING")
|
||||
.build(),
|
||||
version0);
|
||||
DiscoveryNode nodeC =
|
||||
new DiscoveryNode("TS_C", "TS_C", serviceC.boundAddress().publishAddress(), emptyMap(), emptySet(), version0);
|
||||
serviceC.acceptIncomingRequests();
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(5);
|
||||
TransportConnectionListener waitForConnection = new TransportConnectionListener() {
|
||||
@Override
|
||||
public void onNodeConnected(DiscoveryNode node) {
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNodeDisconnected(DiscoveryNode node) {
|
||||
fail("disconnect should not be called " + node);
|
||||
}
|
||||
};
|
||||
serviceA.addConnectionListener(waitForConnection);
|
||||
serviceB.addConnectionListener(waitForConnection);
|
||||
serviceC.addConnectionListener(waitForConnection);
|
||||
|
||||
serviceC.connectToNode(nodeA);
|
||||
serviceC.connectToNode(nodeB);
|
||||
serviceA.connectToNode(nodeC);
|
||||
serviceB.connectToNode(nodeC);
|
||||
serviceC.connectToNode(nodeC);
|
||||
|
||||
latch.await();
|
||||
serviceA.removeConnectionListener(waitForConnection);
|
||||
serviceB.removeConnectionListener(waitForConnection);
|
||||
serviceB.removeConnectionListener(waitForConnection);
|
||||
|
||||
|
||||
Map<TransportService, DiscoveryNode> toNodeMap = new HashMap<>();
|
||||
toNodeMap.put(serviceA, nodeA);
|
||||
toNodeMap.put(serviceB, nodeB);
|
||||
toNodeMap.put(serviceC, nodeC);
|
||||
AtomicBoolean fail = new AtomicBoolean(false);
|
||||
class TestRequestHandler implements TransportRequestHandler<TestRequest> {
|
||||
|
||||
private final TransportService service;
|
||||
|
||||
TestRequestHandler(TransportService service) {
|
||||
this.service = service;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(TestRequest request, TransportChannel channel) throws Exception {
|
||||
if (randomBoolean()) {
|
||||
Thread.sleep(randomIntBetween(10, 50));
|
||||
}
|
||||
if (fail.get()) {
|
||||
throw new IOException("forced failure");
|
||||
}
|
||||
|
||||
if (randomBoolean() && request.resendCount++ < 20) {
|
||||
DiscoveryNode node = randomFrom(nodeA, nodeB, nodeC);
|
||||
logger.debug("send secondary request from {} to {} - {}", toNodeMap.get(service), node, request.info);
|
||||
service.sendRequest(node, "action1", new TestRequest("secondary " + request.info),
|
||||
TransportRequestOptions.builder().withCompress(randomBoolean()).build(),
|
||||
new TransportResponseHandler<TestResponse>() {
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(TestResponse response) {
|
||||
try {
|
||||
if (randomBoolean()) {
|
||||
Thread.sleep(randomIntBetween(10, 50));
|
||||
}
|
||||
logger.debug("send secondary response {}", response.info);
|
||||
|
||||
channel.sendResponse(response);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
try {
|
||||
logger.debug("send secondary exception response for request {}", request.info);
|
||||
channel.sendResponse(exp);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
|
||||
}
|
||||
});
|
||||
} else {
|
||||
logger.debug("send response for {}", request.info);
|
||||
channel.sendResponse(new TestResponse("Response for: " + request.info));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
new TestRequestHandler(serviceB));
|
||||
serviceC.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
new TestRequestHandler(serviceC));
|
||||
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
|
||||
new TestRequestHandler(serviceA));
|
||||
int iters = randomIntBetween(30, 60);
|
||||
CountDownLatch allRequestsDone = new CountDownLatch(iters);
|
||||
class TestResponseHandler implements TransportResponseHandler<TestResponse> {
|
||||
|
||||
private final int id;
|
||||
|
||||
public TestResponseHandler(int id) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TestResponse newInstance() {
|
||||
return new TestResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleResponse(TestResponse response) {
|
||||
logger.debug("---> received response: {}", response.info);
|
||||
allRequestsDone.countDown();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
logger.debug("---> received exception for id {}", exp, id);
|
||||
allRequestsDone.countDown();
|
||||
Throwable unwrap = ExceptionsHelper.unwrap(exp, IOException.class);
|
||||
assertNotNull(unwrap);
|
||||
assertEquals(IOException.class, unwrap.getClass());
|
||||
assertEquals("forced failure", unwrap.getMessage());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String executor() {
|
||||
return randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC;
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < iters; i++) {
|
||||
TransportService service = randomFrom(serviceC, serviceB, serviceA);
|
||||
DiscoveryNode node = randomFrom(nodeC, nodeB, nodeA);
|
||||
logger.debug("send from {} to {}", toNodeMap.get(service), node);
|
||||
service.sendRequest(node, "action1", new TestRequest("REQ[" + i + "]"),
|
||||
TransportRequestOptions.builder().withCompress(randomBoolean()).build(), new TestResponseHandler(i));
|
||||
}
|
||||
logger.debug("waiting for response");
|
||||
fail.set(randomBoolean());
|
||||
boolean await = allRequestsDone.await(5, TimeUnit.SECONDS);
|
||||
if (await == false) {
|
||||
logger.debug("now failing forcefully");
|
||||
fail.set(true);
|
||||
assertTrue(allRequestsDone.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
logger.debug("DONE");
|
||||
serviceC.close();
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user