Add setFactory permission to GceDiscoveryPlugin
This commit adds a missing permission and a simple test that ensures we discover other nodes via a mock http endpoint. Closes #16485
This commit is contained in:
parent
7140d184ab
commit
ecca717339
|
@ -22,10 +22,8 @@ package org.elasticsearch.cloud.gce;
|
|||
import com.google.api.services.compute.model.Instance;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
|
|
@ -25,6 +25,7 @@ import com.google.api.client.http.GenericUrl;
|
|||
import com.google.api.client.http.HttpHeaders;
|
||||
import com.google.api.client.http.HttpResponse;
|
||||
import com.google.api.client.http.HttpTransport;
|
||||
import com.google.api.client.http.javanet.NetHttpTransport;
|
||||
import com.google.api.client.json.JsonFactory;
|
||||
import com.google.api.client.json.jackson2.JacksonFactory;
|
||||
import com.google.api.services.compute.Compute;
|
||||
|
@ -36,12 +37,14 @@ import org.elasticsearch.cloud.gce.network.GceNameResolver;
|
|||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.network.NetworkService;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.discovery.gce.RetryHttpInitializerWrapper;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.file.Files;
|
||||
import java.security.AccessController;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.PrivilegedAction;
|
||||
|
@ -51,18 +54,29 @@ import java.util.ArrayList;
|
|||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceComputeService>
|
||||
implements GceComputeService {
|
||||
|
||||
// all settings just used for testing - not registered by default
|
||||
public static final Setting<Boolean> GCE_VALIDATE_CERTIFICATES =
|
||||
Setting.boolSetting("cloud.gce.validate_certificates", true, false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<String> GCE_HOST =
|
||||
new Setting<>("cloud.gce.host", "http://metadata.google.internal", Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<String> GCE_ROOT_URL =
|
||||
new Setting<>("cloud.gce.root_url", "https://www.googleapis.com", Function.identity(), false, Setting.Scope.CLUSTER);
|
||||
|
||||
private final String project;
|
||||
private final List<String> zones;
|
||||
|
||||
// Forcing Google Token API URL as set in GCE SDK to
|
||||
// http://metadata/computeMetadata/v1/instance/service-accounts/default/token
|
||||
// See https://developers.google.com/compute/docs/metadata#metadataserver
|
||||
public static final String GCE_METADATA_URL = "http://metadata.google.internal/computeMetadata/v1/instance";
|
||||
public static final String TOKEN_SERVER_ENCODED_URL = GCE_METADATA_URL + "/service-accounts/default/token";
|
||||
private final String gceHost;
|
||||
private final String metaDataUrl;
|
||||
private final String tokenServerEncodedUrl;
|
||||
private String gceRootUrl;
|
||||
|
||||
|
||||
@Override
|
||||
public Collection<Instance> instances() {
|
||||
|
@ -85,7 +99,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
|||
// assist type inference
|
||||
return instanceList.isEmpty() ? Collections.<Instance>emptyList() : instanceList.getItems();
|
||||
} catch (PrivilegedActionException e) {
|
||||
logger.warn("Problem fetching instance list for zone {}", zoneId);
|
||||
logger.warn("Problem fetching instance list for zone {}", e, zoneId);
|
||||
logger.debug("Full exception:", e);
|
||||
// assist type inference
|
||||
return Collections.<Instance>emptyList();
|
||||
|
@ -104,7 +118,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
|||
|
||||
@Override
|
||||
public String metadata(String metadataPath) throws IOException {
|
||||
String urlMetadataNetwork = GCE_METADATA_URL + "/" + metadataPath;
|
||||
String urlMetadataNetwork = this.metaDataUrl + "/" + metadataPath;
|
||||
logger.debug("get metadata from [{}]", urlMetadataNetwork);
|
||||
final URL url = new URL(urlMetadataNetwork);
|
||||
HttpHeaders headers;
|
||||
|
@ -153,17 +167,28 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
|||
/** Global instance of the JSON factory. */
|
||||
private JsonFactory gceJsonFactory;
|
||||
|
||||
private final boolean validateCerts;
|
||||
@Inject
|
||||
public GceComputeServiceImpl(Settings settings, NetworkService networkService) {
|
||||
super(settings);
|
||||
this.project = PROJECT_SETTING.get(settings);
|
||||
this.zones = ZONE_SETTING.get(settings);
|
||||
this.gceHost = GCE_HOST.get(settings);
|
||||
this.metaDataUrl = gceHost + "/computeMetadata/v1/instance";
|
||||
this.gceRootUrl = GCE_ROOT_URL.get(settings);
|
||||
tokenServerEncodedUrl = metaDataUrl + "/service-accounts/default/token";
|
||||
this.validateCerts = GCE_VALIDATE_CERTIFICATES.get(settings);
|
||||
networkService.addCustomNameResolver(new GceNameResolver(settings, this));
|
||||
}
|
||||
|
||||
protected synchronized HttpTransport getGceHttpTransport() throws GeneralSecurityException, IOException {
|
||||
if (gceHttpTransport == null) {
|
||||
gceHttpTransport = GoogleNetHttpTransport.newTrustedTransport();
|
||||
if (validateCerts) {
|
||||
gceHttpTransport = GoogleNetHttpTransport.newTrustedTransport();
|
||||
} else {
|
||||
// this is only used for testing - alternative we could use the defaul keystore but this requires special configs too..
|
||||
gceHttpTransport = new NetHttpTransport.Builder().doNotValidateCertificate().build();
|
||||
}
|
||||
}
|
||||
return gceHttpTransport;
|
||||
}
|
||||
|
@ -183,7 +208,7 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
|||
|
||||
logger.info("starting GCE discovery service");
|
||||
ComputeCredential credential = new ComputeCredential.Builder(getGceHttpTransport(), gceJsonFactory)
|
||||
.setTokenServerEncodedUrl(TOKEN_SERVER_ENCODED_URL)
|
||||
.setTokenServerEncodedUrl(this.tokenServerEncodedUrl)
|
||||
.build();
|
||||
|
||||
// hack around code messiness in GCE code
|
||||
|
@ -205,7 +230,9 @@ public class GceComputeServiceImpl extends AbstractLifecycleComponent<GceCompute
|
|||
refreshInterval = TimeValue.timeValueSeconds(credential.getExpiresInSeconds() - 1);
|
||||
}
|
||||
|
||||
Compute.Builder builder = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null).setApplicationName(VERSION);
|
||||
|
||||
Compute.Builder builder = new Compute.Builder(getGceHttpTransport(), gceJsonFactory, null).setApplicationName(VERSION)
|
||||
.setRootUrl(gceRootUrl);
|
||||
|
||||
if (RETRY_SETTING.exists(settings)) {
|
||||
TimeValue maxWait = MAX_WAIT_SETTING.get(settings);
|
||||
|
|
|
@ -20,5 +20,6 @@
|
|||
grant {
|
||||
// needed because of problems in gce
|
||||
permission java.lang.RuntimePermission "accessDeclaredMembers";
|
||||
permission java.lang.RuntimePermission "setFactory";
|
||||
permission java.lang.reflect.ReflectPermission "suppressAccessChecks";
|
||||
};
|
||||
|
|
|
@ -55,6 +55,8 @@ public class GceComputeServiceMock extends GceComputeServiceImpl {
|
|||
return this.mockHttpTransport;
|
||||
}
|
||||
|
||||
public static final String GCE_METADATA_URL = "http://metadata.google.internal/computeMetadata/v1/instance";
|
||||
|
||||
protected HttpTransport configureMock() {
|
||||
return new MockHttpTransport() {
|
||||
@Override
|
||||
|
@ -80,19 +82,18 @@ public class GceComputeServiceMock extends GceComputeServiceImpl {
|
|||
};
|
||||
}
|
||||
|
||||
private String readGoogleInternalJsonResponse(String url) throws IOException {
|
||||
public static String readGoogleInternalJsonResponse(String url) throws IOException {
|
||||
return readJsonResponse(url, "http://metadata.google.internal/");
|
||||
}
|
||||
|
||||
private String readGoogleApiJsonResponse(String url) throws IOException {
|
||||
public static String readGoogleApiJsonResponse(String url) throws IOException {
|
||||
return readJsonResponse(url, "https://www.googleapis.com/");
|
||||
}
|
||||
|
||||
private String readJsonResponse(String url, String urlRoot) throws IOException {
|
||||
private static String readJsonResponse(String url, String urlRoot) throws IOException {
|
||||
// We extract from the url the mock file path we want to use
|
||||
String mockFileName = Strings.replace(url, urlRoot, "");
|
||||
|
||||
logger.debug("--> read mock file from [{}]", mockFileName);
|
||||
URL resource = GceComputeServiceMock.class.getResource(mockFileName);
|
||||
if (resource == null) {
|
||||
throw new IOException("can't read [" + url + "] in src/test/resources/org/elasticsearch/discovery/gce");
|
||||
|
@ -106,7 +107,6 @@ public class GceComputeServiceMock extends GceComputeServiceImpl {
|
|||
}
|
||||
});
|
||||
String response = sb.toString();
|
||||
logger.trace("{}", response);
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,209 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.discovery.gce;
|
||||
|
||||
import com.sun.net.httpserver.Headers;
|
||||
import com.sun.net.httpserver.HttpServer;
|
||||
import com.sun.net.httpserver.HttpsConfigurator;
|
||||
import com.sun.net.httpserver.HttpsServer;
|
||||
import org.elasticsearch.cloud.gce.GceComputeServiceImpl;
|
||||
import org.elasticsearch.common.SuppressForbidden;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.logging.Loggers;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.plugin.discovery.gce.GceDiscoveryPlugin;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import javax.net.ssl.KeyManagerFactory;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.TrustManagerFactory;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.security.KeyStore;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
|
||||
|
||||
@ESIntegTestCase.SuppressLocalMode
|
||||
@ESIntegTestCase.ClusterScope(numDataNodes = 2, numClientNodes = 0)
|
||||
@SuppressForbidden(reason = "use http server")
|
||||
// TODO this should be a IT but currently all ITs in this project run against a real cluster
|
||||
public class GceDiscoverTests extends ESIntegTestCase {
|
||||
|
||||
public static class TestPlugin extends Plugin {
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return "GceDiscoverTests";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return "GceDiscoverTests";
|
||||
}
|
||||
|
||||
public void onModule(SettingsModule module) {
|
||||
module.registerSetting(GceComputeServiceImpl.GCE_HOST);
|
||||
module.registerSetting(GceComputeServiceImpl.GCE_ROOT_URL);
|
||||
module.registerSetting(GceComputeServiceImpl.GCE_VALIDATE_CERTIFICATES);
|
||||
}
|
||||
}
|
||||
|
||||
private static HttpsServer httpsServer;
|
||||
private static HttpServer httpServer;
|
||||
private static Path logDir;
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return pluginList(GceDiscoveryPlugin.class, TestPlugin.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Settings nodeSettings(int nodeOrdinal) {
|
||||
Path resolve = logDir.resolve(Integer.toString(nodeOrdinal));
|
||||
try {
|
||||
Files.createDirectory(resolve);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
|
||||
.put("discovery.type", "gce")
|
||||
.put("path.logs", resolve)
|
||||
.put("transport.tcp.port", 0)
|
||||
.put("node.portsfile", "true")
|
||||
.put("cloud.gce.project_id", "testproject")
|
||||
.put("cloud.gce.zone", "primaryzone")
|
||||
.put("discovery.initial_state_timeout", "1s")
|
||||
.put("cloud.gce.host", "http://" + httpServer.getAddress().getHostName() + ":" + httpServer.getAddress().getPort())
|
||||
.put("cloud.gce.root_url", "https://" + httpsServer.getAddress().getHostName() +
|
||||
":" + httpsServer.getAddress().getPort())
|
||||
// this is annoying but by default the client pulls a static list of trusted CAs
|
||||
.put("cloud.gce.validate_certificates", false)
|
||||
.build();
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
public static void startHttpd() throws Exception {
|
||||
logDir = createTempDir();
|
||||
httpsServer = HttpsServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
|
||||
httpServer = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
|
||||
httpsServer.setHttpsConfigurator(new HttpsConfigurator(getSSLContext()));
|
||||
httpServer.createContext("/computeMetadata/v1/instance/service-accounts/default/token", (s) -> {
|
||||
String response = GceComputeServiceMock.readGoogleInternalJsonResponse(
|
||||
"http://metadata.google.internal/computeMetadata/v1/instance/service-accounts/default/token");
|
||||
byte[] responseAsBytes = response.getBytes(StandardCharsets.UTF_8);
|
||||
s.sendResponseHeaders(200, responseAsBytes.length);
|
||||
OutputStream responseBody = s.getResponseBody();
|
||||
responseBody.write(responseAsBytes);
|
||||
responseBody.close();
|
||||
});
|
||||
|
||||
httpsServer.createContext("/compute/v1/projects/testproject/zones/primaryzone/instances", (s) -> {
|
||||
Headers headers = s.getResponseHeaders();
|
||||
headers.add("Content-Type", "application/json; charset=UTF-8");
|
||||
ESLogger logger = Loggers.getLogger(GceDiscoverTests.class);
|
||||
try {
|
||||
Path[] files = FileSystemUtils.files(logDir);
|
||||
StringBuilder builder = new StringBuilder("{\"id\": \"dummy\",\"items\":[");
|
||||
int foundFiles = 0;
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
Path resolve = files[i].resolve("transport.ports");
|
||||
if (Files.exists(resolve)) {
|
||||
if (foundFiles++ > 0) {
|
||||
builder.append(",");
|
||||
}
|
||||
List<String> addressses = Files.readAllLines(resolve);
|
||||
Collections.shuffle(addressses, random());
|
||||
logger.debug("addresses for node: [{}] published addresses [{}]", files[i].getFileName(), addressses);
|
||||
builder.append("{\"description\": \"ES Node ").append(files[i].getFileName())
|
||||
.append("\",\"networkInterfaces\": [ {");
|
||||
builder.append("\"networkIP\": \"").append(addressses.get(0)).append("\"}],");
|
||||
builder.append("\"status\" : \"RUNNING\"}");
|
||||
}
|
||||
}
|
||||
builder.append("]}");
|
||||
String responseString = builder.toString();
|
||||
final byte[] responseAsBytes = responseString.getBytes(StandardCharsets.UTF_8);
|
||||
s.sendResponseHeaders(200, responseAsBytes.length);
|
||||
OutputStream responseBody = s.getResponseBody();
|
||||
responseBody.write(responseAsBytes);
|
||||
responseBody.close();
|
||||
} catch (Exception e) {
|
||||
//
|
||||
byte[] responseAsBytes = ("{ \"error\" : {\"message\" : \"" + e.toString() + "\" } }").getBytes(StandardCharsets.UTF_8);
|
||||
s.sendResponseHeaders(500, responseAsBytes.length);
|
||||
OutputStream responseBody = s.getResponseBody();
|
||||
responseBody.write(responseAsBytes);
|
||||
responseBody.close();
|
||||
}
|
||||
|
||||
|
||||
});
|
||||
httpsServer.start();
|
||||
httpServer.start();
|
||||
}
|
||||
|
||||
private static SSLContext getSSLContext() throws Exception{
|
||||
char[] passphrase = "passphrase".toCharArray();
|
||||
KeyStore ks = KeyStore.getInstance("JKS");
|
||||
ks.load(GceDiscoverTests.class.getResourceAsStream("keystore.jks"), passphrase);
|
||||
KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
|
||||
kmf.init(ks, passphrase);
|
||||
TrustManagerFactory tmf = TrustManagerFactory.getInstance("SunX509");
|
||||
tmf.init(ks);
|
||||
SSLContext ssl = SSLContext.getInstance("TLS");
|
||||
ssl.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
|
||||
return ssl;
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void stopHttpd() throws IOException {
|
||||
for (int i = 0; i < internalCluster().size(); i++) {
|
||||
// shut them all down otherwise we get spammed with connection refused exceptions
|
||||
internalCluster().stopRandomDataNode();
|
||||
}
|
||||
httpsServer.stop(0);
|
||||
httpServer.stop(0);
|
||||
httpsServer = null;
|
||||
httpServer = null;
|
||||
logDir = null;
|
||||
}
|
||||
|
||||
public void testJoin() throws ExecutionException, InterruptedException {
|
||||
// only wait for the cluster to form
|
||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(2)).get());
|
||||
// add one more node and wait for it to join
|
||||
internalCluster().startDataOnlyNodeAsync().get();
|
||||
assertNoTimeout(client().admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(3)).get());
|
||||
}
|
||||
}
|
Binary file not shown.
Loading…
Reference in New Issue