Merge pull request #16101 from s1monw/client_wrapper_for_test_cluster

Add infrastructure to wrap/filter all clients exposed by the test clusters
This commit is contained in:
Simon Willnauer 2016-01-20 09:34:14 +01:00
commit f5b68cd0c0
4 changed files with 30 additions and 18 deletions

View File

@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Map; import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -93,7 +94,7 @@ public class TribeIT extends ESIntegTestCase {
}; };
cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2, cluster2 = new InternalTestCluster(InternalTestCluster.configuredNodeMode(), randomLong(), createTempDir(), 2, 2,
Strings.randomBase64UUID(getRandom()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList()); Strings.randomBase64UUID(getRandom()), nodeConfigurationSource, 0, false, SECOND_CLUSTER_NODE_PREFIX, Collections.emptyList(), Function.identity());
cluster2.beforeTest(getRandom(), 0.1); cluster2.beforeTest(getRandom(), 0.1);
cluster2.ensureAtLeastNumDataNodes(2); cluster2.ensureAtLeastNumDataNodes(2);

View File

@ -1803,7 +1803,16 @@ public abstract class ESIntegTestCase extends ESTestCase {
return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes, return new InternalTestCluster(nodeMode, seed, createTempDir(), minNumDataNodes, maxNumDataNodes,
InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(), InternalTestCluster.clusterName(scope.name(), seed) + "-cluster", nodeConfigurationSource, getNumClientNodes(),
InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins); InternalTestCluster.DEFAULT_ENABLE_HTTP_PIPELINING, nodePrefix, mockPlugins, getClientWrapper());
}
/**
* Returns a function that allows to wrap / filter all clients that are exposed by the test cluster. This is useful
* for debugging or request / response pre and post processing. It also allows to intercept all calls done by the test
* framework. By default this method returns an identity function {@link Function#identity()}.
*/
protected Function<Client,Client> getClientWrapper() {
return Function.identity();
} }
/** Return the mock plugins the cluster should use */ /** Return the mock plugins the cluster should use */

View File

@ -114,6 +114,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate; import java.util.function.Predicate;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -144,8 +145,6 @@ public final class InternalTestCluster extends TestCluster {
private final ESLogger logger = Loggers.getLogger(getClass()); private final ESLogger logger = Loggers.getLogger(getClass());
static NodeConfigurationSource DEFAULT_SETTINGS_SOURCE = NodeConfigurationSource.EMPTY;
/** /**
* A node level setting that holds a per node random seed that is consistent across node restarts * A node level setting that holds a per node random seed that is consistent across node restarts
*/ */
@ -221,14 +220,16 @@ public final class InternalTestCluster extends TestCluster {
private ServiceDisruptionScheme activeDisruptionScheme; private ServiceDisruptionScheme activeDisruptionScheme;
private String nodeMode; private String nodeMode;
private Function<Client, Client> clientWrapper;
public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir, public InternalTestCluster(String nodeMode, long clusterSeed, Path baseDir,
int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes, int minNumDataNodes, int maxNumDataNodes, String clusterName, NodeConfigurationSource nodeConfigurationSource, int numClientNodes,
boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins) { boolean enableHttpPipelining, String nodePrefix, Collection<Class<? extends Plugin>> mockPlugins, Function<Client, Client> clientWrapper) {
super(clusterSeed); super(clusterSeed);
if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) { if ("network".equals(nodeMode) == false && "local".equals(nodeMode) == false) {
throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode); throw new IllegalArgumentException("Unknown nodeMode: " + nodeMode);
} }
this.clientWrapper = clientWrapper;
this.nodeMode = nodeMode; this.nodeMode = nodeMode;
this.baseDir = baseDir; this.baseDir = baseDir;
this.clusterName = clusterName; this.clusterName = clusterName;
@ -798,20 +799,20 @@ public final class InternalTestCluster extends TestCluster {
} }
private Client getOrBuildNodeClient() { private Client getOrBuildNodeClient() {
if (nodeClient != null) { if (nodeClient == null) {
return nodeClient; nodeClient = node.client();
} }
return nodeClient = node.client(); return clientWrapper.apply(nodeClient);
} }
private Client getOrBuildTransportClient() { private Client getOrBuildTransportClient() {
if (transportClient != null) { if (transportClient == null) {
return transportClient; /* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down.
* we first need support of transportClientRatio as annotations or so
*/
transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName);
} }
/* no sniff client for now - doesn't work will all tests since it might throw NoNodeAvailableException if nodes are shut down. return clientWrapper.apply(transportClient);
* we first need support of transportClientRatio as annotations or so
*/
return transportClient = new TransportClientFactory(false, nodeConfigurationSource.transportClientSettings(), baseDir, nodeMode, nodeConfigurationSource.transportClientPlugins()).client(node, clusterName);
} }
void resetClient() throws IOException { void resetClient() throws IOException {

View File

@ -34,6 +34,7 @@ import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry; import static org.hamcrest.Matchers.hasEntry;
@ -56,8 +57,8 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10); String nodePrefix = randomRealisticUnicodeOfCodepointLengthBetween(1, 10);
Path baseDir = createTempDir(); Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
// TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way // TODO: this is not ideal - we should have a way to make sure ports are initialized in the same way
assertClusters(cluster0, cluster1, false); assertClusters(cluster0, cluster1, false);
@ -114,8 +115,8 @@ public class InternalTestClusterTests extends ESTestCase {
String nodePrefix = "foobar"; String nodePrefix = "foobar";
Path baseDir = createTempDir(); Path baseDir = createTempDir();
InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); InternalTestCluster cluster0 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName1, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList()); InternalTestCluster cluster1 = new InternalTestCluster("local", clusterSeed, baseDir, minNumDataNodes, maxNumDataNodes, clusterName2, nodeConfigurationSource, numClientNodes, enableHttpPipelining, nodePrefix, Collections.emptyList(), Function.identity());
assertClusters(cluster0, cluster1, false); assertClusters(cluster0, cluster1, false);
long seed = randomLong(); long seed = randomLong();