Marvel: HttpExporter should not clean indices

This commit removes the current implementation in HttpExporter so that it does not automatically clean indices anymore.

Original commit: elastic/x-pack-elasticsearch@7d30338355
This commit is contained in:
Tanguy Leroux 2016-01-06 10:35:40 +01:00
parent 57b7c7eb85
commit 54215200ba
5 changed files with 13 additions and 276 deletions

View File

@ -8,20 +8,16 @@ package org.elasticsearch.marvel.agent.exporter.http;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
@ -32,11 +28,8 @@ import org.elasticsearch.marvel.agent.exporter.MarvelTemplateUtils;
import org.elasticsearch.marvel.agent.renderer.Renderer;
import org.elasticsearch.marvel.agent.renderer.RendererRegistry;
import org.elasticsearch.marvel.agent.settings.MarvelSettings;
import org.elasticsearch.marvel.cleaner.CleanerService;
import org.elasticsearch.marvel.shield.MarvelSettingsFilter;
import org.elasticsearch.marvel.support.VersionUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
@ -59,14 +52,12 @@ import java.nio.file.Path;
import java.security.KeyStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
*
*/
public class HttpExporter extends Exporter implements CleanerService.Listener {
public class HttpExporter extends Exporter {
public static final String TYPE = "http";
@ -101,7 +92,6 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
final Environment env;
final RendererRegistry rendererRegistry;
final CleanerService cleanerService;
final @Nullable TimeValue templateCheckTimeout;
@ -115,12 +105,11 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
final ConnectionKeepAliveWorker keepAliveWorker;
Thread keepAliveThread;
public HttpExporter(Exporter.Config config, Environment env, RendererRegistry rendererRegistry, CleanerService cleanerService) {
public HttpExporter(Exporter.Config config, Environment env, RendererRegistry rendererRegistry) {
super(TYPE, config);
this.env = env;
this.rendererRegistry = rendererRegistry;
this.cleanerService = cleanerService;
hosts = config.settings().getAsArray(HOST_SETTING, Strings.EMPTY_ARRAY);
if (hosts.length == 0) {
@ -150,7 +139,6 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
if (templateVersion == null) {
throw new IllegalStateException("unable to find built-in template version");
}
cleanerService.add(this);
logger.debug("initialized with hosts [{}], index prefix [{}], index resolver [{}], template version [{}]",
Strings.arrayToCommaDelimitedString(hosts),
@ -165,7 +153,6 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
@Override
public void close() {
cleanerService.remove(this);
if (keepAliveThread != null && keepAliveThread.isAlive()) {
keepAliveWorker.closed = true;
keepAliveThread.interrupt();
@ -490,127 +477,6 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
}
}
@Override
public void onCleanUpIndices(TimeValue retention) {
// Retention duration can be overridden at exporter level
TimeValue exporterRetention = config.settings().getAsTime(CleanerService.HISTORY_DURATION, null);
if (exporterRetention != null) {
try {
cleanerService.validateRetention(exporterRetention);
retention = exporterRetention;
} catch (IllegalArgumentException e) {
logger.warn("http exporter [{}] - unable to use custom history duration [{}]: {}", name(), exporterRetention, e.getMessage());
}
}
// Reference date time will be compared to index.creation_date settings,
// that's why it must be in UTC
DateTime expiration = new DateTime(DateTimeZone.UTC).minus(retention.millis());
logger.debug("http exporter [{}] - cleaning indices [expiration={}, retention={}]", name(), expiration, retention);
Set<String> indices = new HashSet<>();
String host = hosts[0];
HttpURLConnection connection = null;
try {
String url = String.format("/%s*/_settings/%s", MarvelSettings.MARVEL_INDICES_PREFIX, IndexMetaData.SETTING_CREATION_DATE);
connection = openConnection(host, "GET", url, null);
if (connection == null) {
throw new ElasticsearchException("unable to clean indices: no available connection for host [" + host + "]");
}
long expirationTime = expiration.getMillis();
try (InputStream is = connection.getInputStream()) {
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
Streams.copy(is, out);
try (XContentParser parser = XContentHelper.createParser(new BytesArray(out.toByteArray()))) {
XContentParser.Token token;
String indexName = null;
while ((token = parser.nextToken()) != null) {
if (token == XContentParser.Token.FIELD_NAME) {
if ("settings".equals(parser.currentName()) || ("index".equals(parser.currentName())) || ("creation_date".equals(parser.currentName()))) {
continue;
}
indexName = parser.currentName();
} else if (token.isValue()) {
if ("creation_date".equals(parser.currentName())) {
if (Regex.simpleMatch(MarvelSettings.MARVEL_INDICES_PREFIX + "*", indexName)) {
// Never delete the data indices
if (indexName.startsWith(MarvelSettings.MARVEL_DATA_INDEX_PREFIX)) {
continue;
}
// Never delete the current timestamped index
if (indexName.equals(indexNameResolver().resolve(System.currentTimeMillis()))) {
continue;
}
long creationDate = parser.longValue();
if (creationDate <= expirationTime) {
if (logger.isDebugEnabled()) {
logger.debug("http exporter [{}] - detected expired index [name={}, created={}, expired={}]", name(),
indexName, new DateTime(creationDate, DateTimeZone.UTC), expiration);
}
indices.add(indexName);
}
}
}
}
}
}
}
}
} catch (IOException e) {
throw new ElasticsearchException("failed to verify the remote cluster version on host [" + host + "]:\n" + e.getMessage());
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
if (!indices.isEmpty()) {
logger.info("http exporter [{}] - cleaning up [{}] old indices", name(), indices.size());
deleteIndices(host, indices);
} else {
logger.debug("http exporter [{}] - no old indices found for clean up", name());
}
}
void deleteIndices(String host, Set<String> indices) {
logger.trace("http exporter [{}] - deleting {} indices: {}", name(), indices.size(), Strings.collectionToCommaDelimitedString(indices));
HttpURLConnection connection = null;
try {
connection = openConnection(host, "DELETE", "/" + Strings.collectionToCommaDelimitedString(indices), XContentType.JSON.restContentType());
if (connection == null) {
logger.debug("http exporter [{}] - no available connection to delete indices", name());
return;
}
if (connection.getResponseCode() != 200) {
logConnectionError("http exporter [" + name() +"] - unable to delete indices on host [" + host + "]", connection);
return;
}
logger.debug("http exporter [{}] - indices deleted", name());
} catch (Exception e) {
logger.error("local exporter [{}] - failed to delete indices on host [{}]", e, name(), host);
} finally {
if (connection != null) {
try {
connection.getInputStream().close();
} catch (IOException e) {
// Ignore
}
}
}
}
private void logConnectionError(String msg, HttpURLConnection conn) {
InputStream inputStream = conn.getErrorStream();
String err = "";
@ -848,19 +714,17 @@ public class HttpExporter extends Exporter implements CleanerService.Listener {
private final Environment env;
private final RendererRegistry rendererRegistry;
private final CleanerService cleanerService;
@Inject
public Factory(Environment env, RendererRegistry rendererRegistry, CleanerService cleanerService) {
public Factory(Environment env, RendererRegistry rendererRegistry) {
super(TYPE, false);
this.env = env;
this.rendererRegistry = rendererRegistry;
this.cleanerService = cleanerService;
}
@Override
public HttpExporter create(Config config) {
return new HttpExporter(config, env, rendererRegistry, cleanerService);
return new HttpExporter(config, env, rendererRegistry);
}
@Override

View File

@ -16,6 +16,7 @@ import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.marvel.license.MarvelLicensee;
import org.elasticsearch.threadpool.ThreadPool;
import org.joda.time.DateTime;
import org.joda.time.chrono.ISOChronology;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@ -54,7 +55,7 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
protected void doStart() {
logger.debug("starting cleaning service");
this.runnable = new IndicesCleaner();
threadPool.schedule(executionScheduler.nextExecutionDelay(DateTime.now()), executorName(), runnable);
threadPool.schedule(executionScheduler.nextExecutionDelay(new DateTime(ISOChronology.getInstance())), executorName(), runnable);
logger.debug("cleaning service started");
}
@ -143,7 +144,7 @@ public class CleanerService extends AbstractLifecycleComponent<CleanerService> {
}
}
DateTime start = DateTime.now();
DateTime start = new DateTime(ISOChronology.getInstance());
if (globalRetention.millis() > 0) {
logger.trace("cleaning up indices with retention [{}]", globalRetention);

View File

@ -49,7 +49,7 @@ public class MarvelLicensee extends AbstractLicenseeComponent<MarvelLicensee> im
"separate and dedicated Marvel instance for each [{}] cluster you wish to monitor.",
newLicense.type(), newLicense.type(), newLicense.type()),
LoggerMessageFormat.format(
"Automatic index deletion is disabled for clusters with [{}] license.", newLicense.type())
"Automatic index cleanup is disabled for clusters with [{}] license.", newLicense.type())
};
}

View File

@ -20,6 +20,8 @@ import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.Locale;
import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@ -127,7 +129,7 @@ public abstract class AbstractIndicesCleanerTestCase extends MarvelIntegTestCase
public void testRetentionAsGlobalSetting() throws Exception {
final int max = 10;
final int retention = randomIntBetween(1, max);
internalCluster().startNode(Settings.builder().put(CleanerService.HISTORY_SETTING.getKey(), String.format("%dd", retention)));
internalCluster().startNode(Settings.builder().put(CleanerService.HISTORY_SETTING.getKey(), String.format(Locale.ROOT, "%dd", retention)));
final DateTime now = now();
for (int i = 0; i < max; i++) {
@ -146,7 +148,7 @@ public abstract class AbstractIndicesCleanerTestCase extends MarvelIntegTestCase
// Default retention is between 3 and max days
final int defaultRetention = randomIntBetween(3, max);
internalCluster().startNode(Settings.builder().put(CleanerService.HISTORY_SETTING.getKey(), String.format("%dd", defaultRetention)));
internalCluster().startNode(Settings.builder().put(CleanerService.HISTORY_SETTING.getKey(), String.format(Locale.ROOT, "%dd", defaultRetention)));
final DateTime now = now();
for (int i = 0; i < max; i++) {
@ -161,7 +163,7 @@ public abstract class AbstractIndicesCleanerTestCase extends MarvelIntegTestCase
// Updates the retention setting for the exporter
Exporters exporters = internalCluster().getInstance(Exporters.class);
for (Exporter exporter : exporters) {
Settings transientSettings = Settings.builder().put("marvel.agent.exporters." + exporter.name() + "." + CleanerService.HISTORY_DURATION, String.format("%dd", exporterRetention)).build();
Settings transientSettings = Settings.builder().put("marvel.agent.exporters." + exporter.name() + "." + CleanerService.HISTORY_DURATION, String.format(Locale.ROOT, "%dd", exporterRetention)).build();
assertAcked(client().admin().cluster().prepareUpdateSettings().setTransientSettings(transientSettings));
}

View File

@ -1,130 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.marvel.cleaner.http;
import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.MockWebServer;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.marvel.agent.exporter.Exporter;
import org.elasticsearch.marvel.agent.exporter.http.HttpExporter;
import org.elasticsearch.marvel.cleaner.AbstractIndicesCleanerTestCase;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
import java.io.IOException;
import java.net.BindException;
import java.util.Map;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.equalTo;
public class HttpIndicesCleanerTests extends AbstractIndicesCleanerTestCase {
private MockWebServer webServer;
private MockServerDispatcher dispatcher;
@Before
public void startWebServer() throws Exception {
for (int webPort = 9250; webPort < 9300; webPort++) {
try {
webServer = new MockWebServer();
dispatcher = new MockServerDispatcher();
webServer.setDispatcher(dispatcher);
webServer.start(webPort);
return;
} catch (BindException be) {
logger.warn("port [{}] was already in use trying next port", webPort);
}
}
throw new ElasticsearchException("unable to find open port between 9200 and 9300");
}
@After
public void stopWebServer() throws Exception {
webServer.shutdown();
}
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put("marvel.agent.exporters._http.type", HttpExporter.TYPE)
.put("marvel.agent.exporters._http.host", webServer.getHostName() + ":" + webServer.getPort())
.put("marvel.agent.exporters._http.connection.keep_alive", false)
.build();
}
@Override
protected void createIndex(String name, DateTime creationDate) {
dispatcher.addIndex(name, creationDate.getMillis());
}
@Override
protected void assertIndicesCount(int count) throws Exception {
assertThat(dispatcher.indices.size(), equalTo(count));
}
class MockServerDispatcher extends Dispatcher {
private Map<String, Long> indices = ConcurrentCollections.newConcurrentMap();
@Override
public MockResponse dispatch(RecordedRequest request) throws InterruptedException {
final String[] paths = Strings.splitStringToArray(request.getPath(), '/');
switch (request.getMethod()) {
case "GET":
if ((paths != null) && (paths.length == 3) && "_settings".equals(paths[1])) {
try {
// Builds a Get Settings response
XContentBuilder builder = jsonBuilder().startObject();
for (Map.Entry<String, Long> index : indices.entrySet()) {
builder.startObject(index.getKey());
builder.startObject("settings");
Settings settings = Settings.builder().put(IndexMetaData.SETTING_CREATION_DATE, index.getValue()).build();
settings.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
builder.endObject();
}
builder.endObject();
return new MockResponse().setResponseCode(200).setBody(builder.string());
} catch (IOException e) {
return new MockResponse().setResponseCode(500).setBody(e.getMessage());
}
}
break;
case "DELETE":
if ((paths != null) && (paths.length == 1)) {
String[] deletions = Strings.splitStringByCommaToArray(paths[0]);
if (deletions != null && deletions.length > 0) {
for (String deletion : deletions) {
if (indices.containsKey(deletion)) {
indices.remove(deletion);
} else {
return new MockResponse().setResponseCode(404);
}
}
return new MockResponse().setResponseCode(200);
}
}
break;
}
return new MockResponse().setResponseCode(404).setBody("unsupported request: " + request.getRequestLine());
}
public void addIndex(String name, long creation) {
indices.put(name, creation);
}
}
}