diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesService.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesService.java index 4c90fd5c373..7e8fb7cf396 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesService.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesService.java @@ -31,7 +31,7 @@ import java.util.Collections; import java.util.List; import java.util.function.Function; -public interface GceInstancesService { +public interface GceInstancesService extends Closeable { /** * GCE API Version: Elasticsearch/GceCloud/1.0 diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesServiceImpl.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesServiceImpl.java index ea5b44d994e..ed0bf07d75c 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesServiceImpl.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceInstancesServiceImpl.java @@ -19,7 +19,6 @@ package org.elasticsearch.cloud.gce; -import java.io.Closeable; import java.io.IOException; import java.security.GeneralSecurityException; import java.util.ArrayList; @@ -39,7 +38,6 @@ import com.google.api.services.compute.model.Instance; import com.google.api.services.compute.model.InstanceList; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; -import org.elasticsearch.SpecialPermission; import org.elasticsearch.cloud.gce.util.Access; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; @@ -48,7 +46,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper; -public class GceInstancesServiceImpl extends AbstractComponent implements GceInstancesService, Closeable { +public class GceInstancesServiceImpl extends AbstractComponent implements GceInstancesService { // all settings just used for testing - not registered by default public static final Setting GCE_VALIDATE_CERTIFICATES = diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/plugin/discovery/gce/GceDiscoveryPlugin.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/plugin/discovery/gce/GceDiscoveryPlugin.java index 139416c1e64..04685e38b22 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/plugin/discovery/gce/GceDiscoveryPlugin.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/plugin/discovery/gce/GceDiscoveryPlugin.java @@ -53,7 +53,7 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close private final Settings settings; private static final Logger logger = Loggers.getLogger(GceDiscoveryPlugin.class); // stashed when created in order to properly close - private final SetOnce gceInstancesService = new SetOnce<>(); + private final SetOnce gceInstancesService = new SetOnce<>(); static { /* @@ -72,13 +72,16 @@ public class GceDiscoveryPlugin extends Plugin implements DiscoveryPlugin, Close logger.trace("starting gce discovery plugin..."); } - + // overrideable for tests + protected GceInstancesService createGceInstancesService() { + return new GceInstancesServiceImpl(settings); + } @Override public Map> getZenHostsProviders(TransportService transportService, NetworkService networkService) { return Collections.singletonMap(GCE, () -> { - gceInstancesService.set(new GceInstancesServiceImpl(settings)); + gceInstancesService.set(createGceInstancesService()); return new GceUnicastHostsProvider(settings, gceInstancesService.get(), transportService, networkService); }); } diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java index 4b339658f88..3ceacdedcf7 100644 --- a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java @@ -19,88 +19,57 @@ package org.elasticsearch.discovery.gce; -import com.sun.net.httpserver.Headers; -import com.sun.net.httpserver.HttpServer; -import com.sun.net.httpserver.HttpsConfigurator; -import com.sun.net.httpserver.HttpsServer; -import org.apache.logging.log4j.Logger; -import org.elasticsearch.cloud.gce.GceInstancesServiceImpl; -import org.elasticsearch.cloud.gce.GceMetadataService; -import org.elasticsearch.common.SuppressForbidden; -import org.elasticsearch.common.io.FileSystemUtils; -import org.elasticsearch.common.logging.Loggers; -import org.elasticsearch.common.settings.Setting; +import com.google.api.services.compute.model.Instance; +import com.google.api.services.compute.model.NetworkInterface; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; +import org.elasticsearch.cloud.gce.GceInstancesService; +import org.elasticsearch.cloud.gce.util.Access; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.mocksocket.MockHttpServer; +import org.elasticsearch.discovery.zen.ZenDiscovery; import org.elasticsearch.plugin.discovery.gce.GceDiscoveryPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.elasticsearch.transport.TransportService; +import org.junit.After; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.TrustManagerFactory; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.security.KeyStore; -import java.util.Arrays; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; -import java.util.concurrent.ExecutionException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import static java.util.Collections.singletonList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; -@ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 2, numClientNodes = 0) -@SuppressForbidden(reason = "use http server") -// TODO this should be a IT but currently all ITs in this project run against a real cluster +@ESIntegTestCase.ClusterScope(supportsDedicatedMasters = false, numDataNodes = 0, numClientNodes = 0) public class GceDiscoverTests extends ESIntegTestCase { - public static class TestPlugin extends Plugin { - @Override - public List> getSettings() { - return Arrays.asList(GceMetadataService.GCE_HOST, GceInstancesServiceImpl.GCE_ROOT_URL, - GceInstancesServiceImpl.GCE_VALIDATE_CERTIFICATES); - } - } + /** Holds a list of the current discovery nodes started in tests **/ + private static final Map nodes = new ConcurrentHashMap<>(); - private static HttpsServer httpsServer; - private static HttpServer httpServer; - private static Path logDir; + @After + public void clearGceNodes() { + nodes.clear(); + } @Override protected Collection> nodePlugins() { - return Arrays.asList(GceDiscoveryPlugin.class, TestPlugin.class); + return singletonList(TestPlugin.class); } @Override protected Settings nodeSettings(int nodeOrdinal) { - Path resolve = logDir.resolve(Integer.toString(nodeOrdinal)); - try { - Files.createDirectory(resolve); - } catch (IOException e) { - throw new RuntimeException(e); - } - return Settings.builder().put(super.nodeSettings(nodeOrdinal)) - .put("discovery.zen.hosts_provider", "gce") - .put("path.logs", resolve) - .put("transport.tcp.port", 0) - .put("node.portsfile", "true") - .put("cloud.gce.project_id", "testproject") - .put("cloud.gce.zone", "primaryzone") - .put("cloud.gce.host", "http://" + httpServer.getAddress().getHostName() + ":" + httpServer.getAddress().getPort()) - .put("cloud.gce.root_url", "https://" + httpsServer.getAddress().getHostName() + - ":" + httpsServer.getAddress().getPort()) - // this is annoying but by default the client pulls a static list of trusted CAs - .put("cloud.gce.validate_certificates", false) + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("discovery.zen.hosts_provider", "gce") + .put("cloud.gce.project_id", "test") + .put("cloud.gce.zone", "test") + // Make the test run faster + .put(ZenDiscovery.JOIN_TIMEOUT_SETTING.getKey(), "1s") + .put(ZenDiscovery.PING_TIMEOUT_SETTING.getKey(), "500ms") .build(); } @@ -109,102 +78,102 @@ public class GceDiscoverTests extends ESIntegTestCase { return false; } - @BeforeClass - public static void startHttpd() throws Exception { - logDir = createTempDir(); - SSLContext sslContext = getSSLContext(); - httpsServer = MockHttpServer.createHttps(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0); - httpServer = MockHttpServer.createHttp(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0); - httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); - httpServer.createContext("/computeMetadata/v1/instance/service-accounts/default/token", (s) -> { - String response = GceMockUtils.readGoogleInternalJsonResponse( - "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"); - byte[] responseAsBytes = response.getBytes(StandardCharsets.UTF_8); - s.sendResponseHeaders(200, responseAsBytes.length); - OutputStream responseBody = s.getResponseBody(); - responseBody.write(responseAsBytes); - responseBody.close(); - }); + public void testJoin() { + // start master node + final String masterNode = internalCluster().startMasterOnlyNode(); + registerGceNode(masterNode); - httpsServer.createContext("/compute/v1/projects/testproject/zones/primaryzone/instances", (s) -> { - Headers headers = s.getResponseHeaders(); - headers.add("Content-Type", "application/json; charset=UTF-8"); - Logger logger = Loggers.getLogger(GceDiscoverTests.class); - try { - Path[] files = FileSystemUtils.files(logDir); - StringBuilder builder = new StringBuilder("{\"id\": \"dummy\",\"items\":["); - int foundFiles = 0; - for (int i = 0; i < files.length; i++) { - Path resolve = files[i].resolve("transport.ports"); - if (Files.exists(resolve)) { - if (foundFiles++ > 0) { - builder.append(","); - } - List addressses = Files.readAllLines(resolve); - Collections.shuffle(addressses, random()); - logger.debug("addresses for node: [{}] published addresses [{}]", files[i].getFileName(), addressses); - builder.append("{\"description\": \"ES Node ").append(files[i].getFileName()) - .append("\",\"networkInterfaces\": [ {"); - builder.append("\"networkIP\": \"").append(addressses.get(0)).append("\"}],"); - builder.append("\"status\" : \"RUNNING\"}"); - } - } - builder.append("]}"); - String responseString = builder.toString(); - final byte[] responseAsBytes = responseString.getBytes(StandardCharsets.UTF_8); - s.sendResponseHeaders(200, responseAsBytes.length); - OutputStream responseBody = s.getResponseBody(); - responseBody.write(responseAsBytes); - responseBody.close(); - } catch (Exception e) { - // - byte[] responseAsBytes = ("{ \"error\" : {\"message\" : \"" + e.toString() + "\" } }").getBytes(StandardCharsets.UTF_8); - s.sendResponseHeaders(500, responseAsBytes.length); - OutputStream responseBody = s.getResponseBody(); - responseBody.write(responseAsBytes); - responseBody.close(); - } + ClusterStateResponse clusterStateResponse = client(masterNode).admin().cluster().prepareState() + .setMasterNodeTimeout("1s") + .clear() + .setNodes(true) + .get(); + assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId()); + // start another node + final String secondNode = internalCluster().startNode(); + registerGceNode(secondNode); + clusterStateResponse = client(secondNode).admin().cluster().prepareState() + .setMasterNodeTimeout("1s") + .clear() + .setNodes(true) + .setLocal(true) + .get(); + assertNotNull(clusterStateResponse.getState().nodes().getMasterNodeId()); - }); - httpsServer.start(); - httpServer.start(); - } - - private static SSLContext getSSLContext() throws Exception{ - char[] passphrase = "keypass".toCharArray(); - KeyStore ks = KeyStore.getInstance("JKS"); - try (InputStream stream = GceDiscoverTests.class.getResourceAsStream("/test-node.jks")) { - assertNotNull("can't find keystore file", stream); - ks.load(stream, passphrase); - } - KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); - kmf.init(ks, passphrase); - TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); - tmf.init(ks); - SSLContext ssl = SSLContext.getInstance("TLS"); - ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); - return ssl; - } - - @AfterClass - public static void stopHttpd() throws IOException { - for (int i = 0; i < internalCluster().size(); i++) { - // shut them all down otherwise we get spammed with connection refused exceptions - internalCluster().stopRandomDataNode(); - } - httpsServer.stop(0); - httpServer.stop(0); - httpsServer = null; - httpServer = null; - logDir = null; - } - - public void testJoin() throws ExecutionException, InterruptedException { - // only wait for the cluster to form + // wait for the cluster to form assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get()); + assertNumberOfNodes(2); + // add one more node and wait for it to join - internalCluster().startDataOnlyNode(); + final String thirdNode = internalCluster().startDataOnlyNode(); + registerGceNode(thirdNode); assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get()); + assertNumberOfNodes(3); + } + + /** + * Register an existing node as a GCE node + * + * @param nodeName the name of the node + */ + private static void registerGceNode(final String nodeName) { + final TransportService transportService = internalCluster().getInstance(TransportService.class, nodeName); + assertNotNull(transportService); + final DiscoveryNode discoveryNode = transportService.getLocalNode(); + assertNotNull(discoveryNode); + if (nodes.put(discoveryNode.getName(), discoveryNode) != null) { + throw new IllegalArgumentException("Node [" + discoveryNode.getName() + "] cannot be registered twice"); + } + } + + /** + * Asserts that the cluster nodes info contains an expected number of node + * + * @param expected the expected number of nodes + */ + private static void assertNumberOfNodes(final int expected) { + assertEquals(expected, client().admin().cluster().prepareNodesInfo().clear().get().getNodes().size()); + } + + /** + * Test plugin that exposes internal test cluster nodes as if they were real GCE nodes. + * Use {@link #registerGceNode(String)} method to expose nodes in the tests. + */ + public static class TestPlugin extends GceDiscoveryPlugin { + + public TestPlugin(Settings settings) { + super(settings); + } + + @Override + protected GceInstancesService createGceInstancesService() { + return new GceInstancesService() { + @Override + public Collection instances() { + return Access.doPrivileged(() -> { + final List instances = new ArrayList<>(); + + for (DiscoveryNode discoveryNode : nodes.values()) { + Instance instance = new Instance(); + instance.setName(discoveryNode.getName()); + instance.setStatus("STARTED"); + + NetworkInterface networkInterface = new NetworkInterface(); + networkInterface.setNetworkIP(discoveryNode.getAddress().toString()); + instance.setNetworkInterfaces(singletonList(networkInterface)); + + instances.add(instance); + } + + return instances; + }); + } + + @Override + public void close() throws IOException { + } + }; + } } }