[Tests] Simplify GceDiscoverTests (#28726)
GceDiscoverTests can be simplified in a similar manner than #27945. It now uses a mocked GceInstancesService that exposes internal test cluster nodes as if they were real GCE nodes. It should also make the test more robust by not using a HTTP server anymore. closes #24313
This commit is contained in:
parent
779bc6fd5c
commit
207ca1cc38
|
@ -31,7 +31,7 @@ import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
|
||||||
public interface GceInstancesService {
|
public interface GceInstancesService extends Closeable {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* GCE API Version: Elasticsearch/GceCloud/1.0
|
* GCE API Version: Elasticsearch/GceCloud/1.0
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.cloud.gce;
|
package org.elasticsearch.cloud.gce;
|
||||||
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.security.GeneralSecurityException;
|
import java.security.GeneralSecurityException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -39,7 +38,6 @@ import com.google.api.services.compute.model.Instance;
|
||||||
import com.google.api.services.compute.model.InstanceList;
|
import com.google.api.services.compute.model.InstanceList;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.SpecialPermission;
|
|
||||||
import org.elasticsearch.cloud.gce.util.Access;
|
import org.elasticsearch.cloud.gce.util.Access;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
|
@ -48,7 +46,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
|
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
|
||||||
|
|
||||||
public class GceInstancesServiceImpl extends AbstractComponent implements GceInstancesService, Closeable {
|
public class GceInstancesServiceImpl extends AbstractComponent implements GceInstancesService {
|
||||||
|
|
||||||
// all settings just used for testing - not registered by default
|
// all settings just used for testing - not registered by default
|
||||||
public static final Setting<Boolean> GCE_VALIDATE_CERTIFICATES =
|
public static final Setting<Boolean> GCE_VALIDATE_CERTIFICATES =
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||||
private final Settings settings;
|
private final Settings settings;
|
||||||
private static final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class);
|
private static final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class);
|
||||||
// stashed when created in order to properly close
|
// stashed when created in order to properly close
|
||||||
private final SetOnce<GceInstancesServiceImpl> gceInstancesService = new SetOnce<>();
|
private final SetOnce<GceInstancesService> gceInstancesService = new SetOnce<>();
|
||||||
|
|
||||||
static {
|
static {
|
||||||
/*
|
/*
|
||||||
|
@ -72,13 +72,16 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close
|
||||||
logger.trace("starting gce discovery plugin...");
|
logger.trace("starting gce discovery plugin...");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// overrideable for tests
|
||||||
|
protected GceInstancesService createGceInstancesService() {
|
||||||
|
return new GceInstancesServiceImpl(settings);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
|
public Map<String, Supplier<UnicastHostsProvider>> getZenHostsProviders(TransportService transportService,
|
||||||
NetworkService networkService) {
|
NetworkService networkService) {
|
||||||
return Collections.singletonMap(GCE, () -> {
|
return Collections.singletonMap(GCE, () -> {
|
||||||
gceInstancesService.set(new GceInstancesServiceImpl(settings));
|
gceInstancesService.set(createGceInstancesService());
|
||||||
return new GceUnicastHostsProvider(settings, gceInstancesService.get(), transportService, networkService);
|
return new GceUnicastHostsProvider(settings, gceInstancesService.get(), transportService, networkService);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,88 +19,57 @@
|
||||||
|
|
||||||
package org.elasticsearch.discovery.gce;
|
package org.elasticsearch.discovery.gce;
|
||||||
|
|
||||||
import com.sun.net.httpserver.Headers;
|
import com.google.api.services.compute.model.Instance;
|
||||||
import com.sun.net.httpserver.HttpServer;
|
import com.google.api.services.compute.model.NetworkInterface;
|
||||||
import com.sun.net.httpserver.HttpsConfigurator;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import com.sun.net.httpserver.HttpsServer;
|
import org.elasticsearch.cloud.gce.GceInstancesService;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.elasticsearch.cloud.gce.util.Access;
|
||||||
import org.elasticsearch.cloud.gce.GceInstancesServiceImpl;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cloud.gce.GceMetadataService;
|
|
||||||
import org.elasticsearch.common.SuppressForbidden;
|
|
||||||
import org.elasticsearch.common.io.FileSystemUtils;
|
|
||||||
import org.elasticsearch.common.logging.Loggers;
|
|
||||||
import org.elasticsearch.common.settings.Setting;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.mocksocket.MockHttpServer;
|
import org.elasticsearch.discovery.zen.ZenDiscovery;
|
||||||
import org.elasticsearch.plugin.discovery.gce.GceDiscoveryPlugin;
|
import org.elasticsearch.plugin.discovery.gce.GceDiscoveryPlugin;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.junit.AfterClass;
|
import org.elasticsearch.transport.TransportService;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.After;
|
||||||
|
|
||||||
import javax.net.ssl.KeyManagerFactory;
|
|
||||||
import javax.net.ssl.SSLContext;
|
|
||||||
import javax.net.ssl.TrustManagerFactory;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.util.ArrayList;
|
||||||
import java.io.OutputStream;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.nio.charset.StandardCharsets;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Path;
|
|
||||||
import java.security.KeyStore;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import static java.util.Collections.singletonList;
|
||||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||||
|
|
||||||
|
|
||||||
@ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0)
|
@ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 0, numClientNodes = 0)
|
||||||
@SuppressForbidden(reason = "use http server")
|
|
||||||
// TODO this should be a IT but currently all ITs in this project run against a real cluster
|
|
||||||
public class GceDiscoverTests extends ESIntegTestCase {
|
public class GceDiscoverTests extends ESIntegTestCase {
|
||||||
|
|
||||||
public static class TestPlugin extends Plugin {
|
/** Holds a list of the current discovery nodes started in tests **/
|
||||||
@Override
|
private static final Map<String, DiscoveryNode> nodes = new ConcurrentHashMap<>();
|
||||||
public List<Setting<?>> getSettings() {
|
|
||||||
return Arrays.asList(GceMetadataService.GCE_HOST, GceInstancesServiceImpl.GCE_ROOT_URL,
|
|
||||||
GceInstancesServiceImpl.GCE_VALIDATE_CERTIFICATES);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static HttpsServer httpsServer;
|
@After
|
||||||
private static HttpServer httpServer;
|
public void clearGceNodes() {
|
||||||
private static Path logDir;
|
nodes.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||||
return Arrays.asList(GceDiscoveryPlugin.class, TestPlugin.class);
|
return singletonList(TestPlugin.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Settings nodeSettings(int nodeOrdinal) {
|
protected Settings nodeSettings(int nodeOrdinal) {
|
||||||
Path resolve = logDir.resolve(Integer.toString(nodeOrdinal));
|
return Settings.builder()
|
||||||
try {
|
.put(super.nodeSettings(nodeOrdinal))
|
||||||
Files.createDirectory(resolve);
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
|
||||||
.put("discovery.zen.hosts_provider", "gce")
|
.put("discovery.zen.hosts_provider", "gce")
|
||||||
.put("path.logs", resolve)
|
.put("cloud.gce.project_id", "test")
|
||||||
.put("transport.tcp.port", 0)
|
.put("cloud.gce.zone", "test")
|
||||||
.put("node.portsfile", "true")
|
// Make the test run faster
|
||||||
.put("cloud.gce.project_id", "testproject")
|
.put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "1s")
|
||||||
.put("cloud.gce.zone", "primaryzone")
|
.put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "500ms")
|
||||||
.put("cloud.gce.host", "http://" + httpServer.getAddress().getHostName() + ":" + httpServer.getAddress().getPort())
|
|
||||||
.put("cloud.gce.root_url", "https://" + httpsServer.getAddress().getHostName() +
|
|
||||||
":" + httpsServer.getAddress().getPort())
|
|
||||||
// this is annoying but by default the client pulls a static list of trusted CAs
|
|
||||||
.put("cloud.gce.validate_certificates", false)
|
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -109,102 +78,102 @@ public class GceDiscoverTests extends ESIntegTestCase {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
public void testJoin() {
|
||||||
public static void startHttpd() throws Exception {
|
// start master node
|
||||||
logDir = createTempDir();
|
final String masterNode = internalCluster().startMasterOnlyNode();
|
||||||
SSLContext sslContext = getSSLContext();
|
registerGceNode(masterNode);
|
||||||
httpsServer = MockHttpServer.createHttps(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0);
|
|
||||||
httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0);
|
|
||||||
httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext));
|
|
||||||
httpServer.createContext("/computeMetadata/v1/instance/service-accounts/default/token", (s) -> {
|
|
||||||
String response = GceMockUtils.readGoogleInternalJsonResponse(
|
|
||||||
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token");
|
|
||||||
byte[] responseAsBytes = response.getBytes(StandardCharsets.UTF_8);
|
|
||||||
s.sendResponseHeaders(200, responseAsBytes.length);
|
|
||||||
OutputStream responseBody = s.getResponseBody();
|
|
||||||
responseBody.write(responseAsBytes);
|
|
||||||
responseBody.close();
|
|
||||||
});
|
|
||||||
|
|
||||||
httpsServer.createContext("/compute/v1/projects/testproject/zones/primaryzone/instances", (s) -> {
|
ClusterStateResponse clusterStateResponse = client(masterNode).admin().cluster().prepareState()
|
||||||
Headers headers = s.getResponseHeaders();
|
.setMasterNodeTimeout("1s")
|
||||||
headers.add("Content-Type", "application/json; charset=UTF-8");
|
.clear()
|
||||||
Logger logger = Loggers.getLogger(GceDiscoverTests.class);
|
.setNodes(true)
|
||||||
try {
|
.get();
|
||||||
Path[] files = FileSystemUtils.files(logDir);
|
assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId());
|
||||||
StringBuilder builder = new StringBuilder("{\"id\": \"dummy\",\"items\":[");
|
|
||||||
int foundFiles = 0;
|
|
||||||
for (int i = 0; i < files.length; i++) {
|
|
||||||
Path resolve = files[i].resolve("transport.ports");
|
|
||||||
if (Files.exists(resolve)) {
|
|
||||||
if (foundFiles++ > 0) {
|
|
||||||
builder.append(",");
|
|
||||||
}
|
|
||||||
List<String> addressses = Files.readAllLines(resolve);
|
|
||||||
Collections.shuffle(addressses, random());
|
|
||||||
logger.debug("addresses for node: [{}] published addresses [{}]", files[i].getFileName(), addressses);
|
|
||||||
builder.append("{\"description\": \"ES Node ").append(files[i].getFileName())
|
|
||||||
.append("\",\"networkInterfaces\": [ {");
|
|
||||||
builder.append("\"networkIP\": \"").append(addressses.get(0)).append("\"}],");
|
|
||||||
builder.append("\"status\" : \"RUNNING\"}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
builder.append("]}");
|
|
||||||
String responseString = builder.toString();
|
|
||||||
final byte[] responseAsBytes = responseString.getBytes(StandardCharsets.UTF_8);
|
|
||||||
s.sendResponseHeaders(200, responseAsBytes.length);
|
|
||||||
OutputStream responseBody = s.getResponseBody();
|
|
||||||
responseBody.write(responseAsBytes);
|
|
||||||
responseBody.close();
|
|
||||||
} catch (Exception e) {
|
|
||||||
//
|
|
||||||
byte[] responseAsBytes = ("{ \"error\" : {\"message\" : \"" + e.toString() + "\" } }").getBytes(StandardCharsets.UTF_8);
|
|
||||||
s.sendResponseHeaders(500, responseAsBytes.length);
|
|
||||||
OutputStream responseBody = s.getResponseBody();
|
|
||||||
responseBody.write(responseAsBytes);
|
|
||||||
responseBody.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
|
// start another node
|
||||||
|
final String secondNode = internalCluster().startNode();
|
||||||
|
registerGceNode(secondNode);
|
||||||
|
clusterStateResponse = client(secondNode).admin().cluster().prepareState()
|
||||||
|
.setMasterNodeTimeout("1s")
|
||||||
|
.clear()
|
||||||
|
.setNodes(true)
|
||||||
|
.setLocal(true)
|
||||||
|
.get();
|
||||||
|
assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId());
|
||||||
|
|
||||||
});
|
// wait for the cluster to form
|
||||||
httpsServer.start();
|
|
||||||
httpServer.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static SSLContext getSSLContext() throws Exception{
|
|
||||||
char[] passphrase = "keypass".toCharArray();
|
|
||||||
KeyStore ks = KeyStore.getInstance("JKS");
|
|
||||||
try (InputStream stream = GceDiscoverTests.class.getResourceAsStream("/test-node.jks")) {
|
|
||||||
assertNotNull("can't find keystore file", stream);
|
|
||||||
ks.load(stream, passphrase);
|
|
||||||
}
|
|
||||||
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
|
|
||||||
kmf.init(ks, passphrase);
|
|
||||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
|
|
||||||
tmf.init(ks);
|
|
||||||
SSLContext ssl = SSLContext.getInstance("TLS");
|
|
||||||
ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
|
|
||||||
return ssl;
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void stopHttpd() throws IOException {
|
|
||||||
for (int i = 0; i < internalCluster().size(); i++) {
|
|
||||||
// shut them all down otherwise we get spammed with connection refused exceptions
|
|
||||||
internalCluster().stopRandomDataNode();
|
|
||||||
}
|
|
||||||
httpsServer.stop(0);
|
|
||||||
httpServer.stop(0);
|
|
||||||
httpsServer = null;
|
|
||||||
httpServer = null;
|
|
||||||
logDir = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testJoin() throws ExecutionException, InterruptedException {
|
|
||||||
// only wait for the cluster to form
|
|
||||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
|
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
|
||||||
|
assertNumberOfNodes(2);
|
||||||
|
|
||||||
// add one more node and wait for it to join
|
// add one more node and wait for it to join
|
||||||
internalCluster().startDataOnlyNode();
|
final String thirdNode = internalCluster().startDataOnlyNode();
|
||||||
|
registerGceNode(thirdNode);
|
||||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
|
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
|
||||||
|
assertNumberOfNodes(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register an existing node as a GCE node
|
||||||
|
*
|
||||||
|
* @param nodeName the name of the node
|
||||||
|
*/
|
||||||
|
private static void registerGceNode(final String nodeName) {
|
||||||
|
final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName);
|
||||||
|
assertNotNull(transportService);
|
||||||
|
final DiscoveryNode discoveryNode = transportService.getLocalNode();
|
||||||
|
assertNotNull(discoveryNode);
|
||||||
|
if (nodes.put(discoveryNode.getName(), discoveryNode) != null) {
|
||||||
|
throw new IllegalArgumentException("Node [" + discoveryNode.getName() + "] cannot be registered twice");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Asserts that the cluster nodes info contains an expected number of node
|
||||||
|
*
|
||||||
|
* @param expected the expected number of nodes
|
||||||
|
*/
|
||||||
|
private static void assertNumberOfNodes(final int expected) {
|
||||||
|
assertEquals(expected, client().admin().cluster().prepareNodesInfo().clear().get().getNodes().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test plugin that exposes internal test cluster nodes as if they were real GCE nodes.
|
||||||
|
* Use {@link #registerGceNode(String)} method to expose nodes in the tests.
|
||||||
|
*/
|
||||||
|
public static class TestPlugin extends GceDiscoveryPlugin {
|
||||||
|
|
||||||
|
public TestPlugin(Settings settings) {
|
||||||
|
super(settings);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected GceInstancesService createGceInstancesService() {
|
||||||
|
return new GceInstancesService() {
|
||||||
|
@Override
|
||||||
|
public Collection<Instance> instances() {
|
||||||
|
return Access.doPrivileged(() -> {
|
||||||
|
final List<Instance> instances = new ArrayList<>();
|
||||||
|
|
||||||
|
for (DiscoveryNode discoveryNode : nodes.values()) {
|
||||||
|
Instance instance = new Instance();
|
||||||
|
instance.setName(discoveryNode.getName());
|
||||||
|
instance.setStatus("STARTED");
|
||||||
|
|
||||||
|
NetworkInterface networkInterface = new NetworkInterface();
|
||||||
|
networkInterface.setNetworkIP(discoveryNode.getAddress().toString());
|
||||||
|
instance.setNetworkInterfaces(singletonList(networkInterface));
|
||||||
|
|
||||||
|
instances.add(instance);
|
||||||
|
}
|
||||||
|
|
||||||
|
return instances;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue