diff --git a/plugins/discovery-gce/build.gradle b/plugins/discovery-gce/build.gradle index b888b817679..bbd2221d8e0 100644 --- a/plugins/discovery-gce/build.gradle +++ b/plugins/discovery-gce/build.gradle @@ -1,3 +1,4 @@ +import org.elasticsearch.gradle.LoggedExec esplugin { description 'The Google Compute Engine (GCE) Discovery plugin allows to use GCE API for the unicast discovery mechanism.' @@ -21,6 +22,36 @@ dependencies { compile "commons-codec:commons-codec:${versions.commonscodec}" } + +// needed to be consistent with ssl host checking +String host = InetAddress.getLoopbackAddress().getHostAddress(); + +// location of keystore and files to generate it +File keystore = new File(project.buildDir, 'keystore/test-node.jks') + +// generate the keystore +task createKey(type: LoggedExec) { + doFirst { + project.delete(keystore.parentFile) + keystore.parentFile.mkdirs() + } + executable = 'keytool' + standardInput = new ByteArrayInputStream('FirstName LastName\nUnit\nOrganization\nCity\nState\nNL\nyes\n\n'.getBytes('UTF-8')) + args '-genkey', + '-alias', 'test-node', + '-keystore', keystore, + '-keyalg', 'RSA', + '-keysize', '2048', + '-validity', '712', + '-dname', 'CN=' + host, + '-keypass', 'keypass', + '-storepass', 'keypass' +} + +// add keystore to test classpath: it expects it there +sourceSets.test.resources.srcDir(keystore.parentFile) +processTestResources.dependsOn(createKey) + dependencyLicenses { mapping from: /google-.*/, to: 'google' } diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java index a352bc02418..ce5154b3436 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeService.java @@ -22,10 +22,8 @@ package org.elasticsearch.cloud.gce; import com.google.api.services.compute.model.Instance; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.settings.Setting; -import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import java.util.Arrays; import java.io.IOException; import java.util.Collection; import java.util.Collections; diff --git a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java index cfddaf2548f..d9033b602d2 100644 --- a/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java +++ b/plugins/discovery-gce/src/main/java/org/elasticsearch/cloud/gce/GceComputeServiceImpl.java @@ -25,6 +25,7 @@ import com.google.api.client.http.GenericUrl; import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpTransport; +import com.google.api.client.http.javanet.NetHttpTransport; import com.google.api.client.json.JsonFactory; import com.google.api.client.json.jackson2.JacksonFactory; import com.google.api.services.compute.Compute; @@ -36,12 +37,14 @@ import org.elasticsearch.cloud.gce.network.GceNameResolver; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.network.NetworkService; +import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper; import java.io.IOException; import java.net.URL; +import java.nio.file.Files; import java.security.AccessController; import java.security.GeneralSecurityException; import java.security.PrivilegedAction; @@ -51,18 +54,29 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.function.Function; public class GceComputeServiceImpl extends AbstractLifecycleComponent implements GceComputeService { + // all settings just used for testing - not registered by default + public static final Setting GCE_VALIDATE_CERTIFICATES = + Setting.boolSetting("cloud.gce.validate_certificates", true, false, Setting.Scope.CLUSTER); + public static final Setting GCE_HOST = + new Setting<>("cloud.gce.host", "http://metadata.google.internal", Function.identity(), false, Setting.Scope.CLUSTER); + public static final Setting GCE_ROOT_URL = + new Setting<>("cloud.gce.root_url", "https://www.googleapis.com", Function.identity(), false, Setting.Scope.CLUSTER); + private final String project; private final List zones; - // Forcing Google Token API URL as set in GCE SDK to // http://metadata/computeMetadata/v1/instance/service-accounts/default/token // See https://developers.google.com/compute/docs/metadata#metadataserver - public static final String GCE_METADATA_URL = "http://metadata.google.internal/computeMetadata/v1/instance"; - public static final String TOKEN_SERVER_ENCODED_URL = GCE_METADATA_URL + "/service-accounts/default/token"; + private final String gceHost; + private final String metaDataUrl; + private final String tokenServerEncodedUrl; + private String gceRootUrl; + @Override public Collection instances() { @@ -85,7 +99,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponentemptyList() : instanceList.getItems(); } catch (PrivilegedActionException e) { - logger.warn("Problem fetching instance list for zone {}", zoneId); + logger.warn("Problem fetching instance list for zone {}", e, zoneId); logger.debug("Full exception:", e); // assist type inference return Collections.emptyList(); @@ -104,7 +118,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent read mock file from [{}]", mockFileName); URL resource = GceComputeServiceMock.class.getResource(mockFileName); if (resource == null) { throw new IOException("can't read [" + url + "] in src/test/resources/org/elasticsearch/discovery/gce"); @@ -106,7 +107,6 @@ public class GceComputeServiceMock extends GceComputeServiceImpl { } }); String response = sb.toString(); - logger.trace("{}", response); return response; } } diff --git a/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java new file mode 100644 index 00000000000..dbedbe1a6a9 --- /dev/null +++ b/plugins/discovery-gce/src/test/java/org/elasticsearch/discovery/gce/GceDiscoverTests.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.discovery.gce; + +import com.sun.net.httpserver.Headers; +import com.sun.net.httpserver.HttpServer; +import com.sun.net.httpserver.HttpsConfigurator; +import com.sun.net.httpserver.HttpsServer; +import org.elasticsearch.cloud.gce.GceComputeServiceImpl; +import org.elasticsearch.common.SuppressForbidden; +import org.elasticsearch.common.io.FileSystemUtils; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.plugin.discovery.gce.GceDiscoveryPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.TrustManagerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.security.KeyStore; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout; + + +@ESIntegTestCase.SuppressLocalMode +@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0) +@SuppressForbidden(reason = "use http server") +// TODO this should be a IT but currently all ITs in this project run against a real cluster +public class GceDiscoverTests extends ESIntegTestCase { + + public static class TestPlugin extends Plugin { + + @Override + public String name() { + return "GceDiscoverTests"; + } + + @Override + public String description() { + return "GceDiscoverTests"; + } + + public void onModule(SettingsModule module) { + module.registerSetting(GceComputeServiceImpl.GCE_HOST); + module.registerSetting(GceComputeServiceImpl.GCE_ROOT_URL); + module.registerSetting(GceComputeServiceImpl.GCE_VALIDATE_CERTIFICATES); + } + } + + private static HttpsServer httpsServer; + private static HttpServer httpServer; + private static Path logDir; + + @Override + protected Collection> nodePlugins() { + return pluginList(GceDiscoveryPlugin.class, TestPlugin.class); + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + Path resolve = logDir.resolve(Integer.toString(nodeOrdinal)); + try { + Files.createDirectory(resolve); + } catch (IOException e) { + throw new RuntimeException(e); + } + return Settings.builder().put(super.nodeSettings(nodeOrdinal)) + .put("discovery.type", "gce") + .put("path.logs", resolve) + .put("transport.tcp.port", 0) + .put("node.portsfile", "true") + .put("cloud.gce.project_id", "testproject") + .put("cloud.gce.zone", "primaryzone") + .put("discovery.initial_state_timeout", "1s") + .put("cloud.gce.host", "http://" + httpServer.getAddress().getHostName() + ":" + httpServer.getAddress().getPort()) + .put("cloud.gce.root_url", "https://" + httpsServer.getAddress().getHostName() + + ":" + httpsServer.getAddress().getPort()) + // this is annoying but by default the client pulls a static list of trusted CAs + .put("cloud.gce.validate_certificates", false) + .build(); + } + + @BeforeClass + public static void startHttpd() throws Exception { + logDir = createTempDir(); + SSLContext sslContext = getSSLContext(); + httpsServer = HttpsServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0); + httpServer = HttpServer.create(new InetSocketAddress(InetAddress.getLoopbackAddress().getHostAddress(), 0), 0); + httpsServer.setHttpsConfigurator(new HttpsConfigurator(sslContext)); + httpServer.createContext("/computeMetadata/v1/instance/service-accounts/default/token", (s) -> { + String response = GceComputeServiceMock.readGoogleInternalJsonResponse( + "http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token"); + byte[] responseAsBytes = response.getBytes(StandardCharsets.UTF_8); + s.sendResponseHeaders(200, responseAsBytes.length); + OutputStream responseBody = s.getResponseBody(); + responseBody.write(responseAsBytes); + responseBody.close(); + }); + + httpsServer.createContext("/compute/v1/projects/testproject/zones/primaryzone/instances", (s) -> { + Headers headers = s.getResponseHeaders(); + headers.add("Content-Type", "application/json; charset=UTF-8"); + ESLogger logger = Loggers.getLogger(GceDiscoverTests.class); + try { + Path[] files = FileSystemUtils.files(logDir); + StringBuilder builder = new StringBuilder("{\"id\": \"dummy\",\"items\":["); + int foundFiles = 0; + for (int i = 0; i < files.length; i++) { + Path resolve = files[i].resolve("transport.ports"); + if (Files.exists(resolve)) { + if (foundFiles++ > 0) { + builder.append(","); + } + List addressses = Files.readAllLines(resolve); + Collections.shuffle(addressses, random()); + logger.debug("addresses for node: [{}] published addresses [{}]", files[i].getFileName(), addressses); + builder.append("{\"description\": \"ES Node ").append(files[i].getFileName()) + .append("\",\"networkInterfaces\": [ {"); + builder.append("\"networkIP\": \"").append(addressses.get(0)).append("\"}],"); + builder.append("\"status\" : \"RUNNING\"}"); + } + } + builder.append("]}"); + String responseString = builder.toString(); + final byte[] responseAsBytes = responseString.getBytes(StandardCharsets.UTF_8); + s.sendResponseHeaders(200, responseAsBytes.length); + OutputStream responseBody = s.getResponseBody(); + responseBody.write(responseAsBytes); + responseBody.close(); + } catch (Exception e) { + // + byte[] responseAsBytes = ("{ \"error\" : {\"message\" : \"" + e.toString() + "\" } }").getBytes(StandardCharsets.UTF_8); + s.sendResponseHeaders(500, responseAsBytes.length); + OutputStream responseBody = s.getResponseBody(); + responseBody.write(responseAsBytes); + responseBody.close(); + } + + + }); + httpsServer.start(); + httpServer.start(); + } + + private static SSLContext getSSLContext() throws Exception{ + char[] passphrase = "keypass".toCharArray(); + KeyStore ks = KeyStore.getInstance("JKS"); + try (InputStream stream = GceDiscoverTests.class.getResourceAsStream("/test-node.jks")) { + assertNotNull("can't find keystore file", stream); + ks.load(stream, passphrase); + } + KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509"); + kmf.init(ks, passphrase); + TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509"); + tmf.init(ks); + SSLContext ssl = SSLContext.getInstance("TLS"); + ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + return ssl; + } + + @AfterClass + public static void stopHttpd() throws IOException { + for (int i = 0; i < internalCluster().size(); i++) { + // shut them all down otherwise we get spammed with connection refused exceptions + internalCluster().stopRandomDataNode(); + } + httpsServer.stop(0); + httpServer.stop(0); + httpsServer = null; + httpServer = null; + logDir = null; + } + + public void testJoin() throws ExecutionException, InterruptedException { + // only wait for the cluster to form + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get()); + // add one more node and wait for it to join + internalCluster().startDataOnlyNodeAsync().get(); + assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get()); + } +}