diff --git a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java index 1350dcbb8ed..da01ca935f4 100644 --- a/core/src/test/java/org/elasticsearch/tribe/TribeIT.java +++ b/core/src/test/java/org/elasticsearch/tribe/TribeIT.java @@ -51,6 +51,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Map; +import java.util.function.Function; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -93,7 +94,7 @@ public class TribeIT extends ESIntegTestCase { }; cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, - Strings.randomBase64UUID(getRandom()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList()); + Strings.randomBase64UUID(getRandom()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList(), Function.identity()); cluster2.beforeTest(getRandom(), 0.1); cluster2.ensureAtLeastNumDataNodes(2); diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index f1f757fa08f..ff09ba0e8f6 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1803,7 +1803,16 @@ public abstract class ESIntegTestCase extends ESTestCase { return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes, InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), - InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins); + InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper()); + } + + /** + * Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful + * for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test + * framework. By default this method returns an identity function {@link Function#identity()}. + */ + protected Function getClientWrapper() { + return Function.identity(); } /** Return the mock plugins the cluster should use */ diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 2f02e46f58b..01988f61558 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -114,6 +114,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -144,8 +145,6 @@ public final class InternalTestCluster extends TestCluster { private final ESLogger logger = Loggers.getLogger(getClass()); - static NodeConfigurationSource DEFAULT_SETTINGS_SOURCE = NodeConfigurationSource.EMPTY; - /** * A node level setting that holds a per node random seed that is consistent across node restarts */ @@ -221,14 +220,16 @@ public final class InternalTestCluster extends TestCluster { private ServiceDisruptionScheme activeDisruptionScheme; private String nodeMode; + private Function clientWrapper; public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, - boolean enableHttpPipelining, String nodePrefix, Collection> mockPlugins) { + boolean enableHttpPipelining, String nodePrefix, Collection> mockPlugins, Function clientWrapper) { super(clusterSeed); if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) { throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode); } + this.clientWrapper = clientWrapper; this.nodeMode = nodeMode; this.baseDir = baseDir; this.clusterName = clusterName; @@ -798,20 +799,20 @@ public final class InternalTestCluster extends TestCluster { } private Client getOrBuildNodeClient() { - if (nodeClient != null) { - return nodeClient; + if (nodeClient == null) { + nodeClient = node.client(); } - return nodeClient = node.client(); + return clientWrapper.apply(nodeClient); } private Client getOrBuildTransportClient() { - if (transportClient != null) { - return transportClient; + if (transportClient == null) { + /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down. + * we first need support of transportClientRatio as annotations or so + */ + transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName); } - /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down. - * we first need support of transportClientRatio as annotations or so - */ - return transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName); + return clientWrapper.apply(transportClient); } void resetClient() throws IOException { diff --git a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java index c1cfa56c8be..0764ad4ebea 100644 --- a/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java +++ b/test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterTests.java @@ -34,6 +34,7 @@ import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.Set; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -56,8 +57,8 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); Path baseDir = createTempDir(); - InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); - InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); + InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); + InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); // TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way assertClusters(cluster0, cluster1, false); @@ -114,8 +115,8 @@ public class InternalTestClusterTests extends ESTestCase { String nodePrefix = "foobar"; Path baseDir = createTempDir(); - InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); - InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); + InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); + InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity()); assertClusters(cluster0, cluster1, false); long seed = randomLong();