monitoring: local exporter waits for in flight requests before retrying

The local exporter previously fired off asynchronous requests every time a cluster state was
observed that did not contain all of the required items for monitoring. This change adds a
flag so that monitoring can wait for the pending requests to complete before retrying. This
will reduce the number of duplicated log messages as well.

Additionally, the log message for adding modern aliases now contains the name of the indices.

Closes elastic/elasticsearch#3756

Original commit: elastic/x-pack-elasticsearch@727a0adfbe
This commit is contained in:
Jay Modi 2016-12-05 09:01:24 -05:00 committed by GitHub
parent 2b7c03848f
commit 9afb6dd4f2
1 changed files with 91 additions and 40 deletions

View File

@ -19,7 +19,9 @@ import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.action.ingest.PutPipelineRequest;
import org.elasticsearch.action.ingest.WritePipelineResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
@ -46,10 +48,16 @@ import org.elasticsearch.xpack.security.InternalClient;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -68,6 +76,7 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
private final CleanerService cleanerService;
private final AtomicReference<State> state = new AtomicReference<>(State.INITIALIZED);
private final AtomicBoolean installingSomething = new AtomicBoolean(false);
public LocalExporter(Exporter.Config config, InternalClient client,
ClusterService clusterService, CleanerService cleanerService) {
@ -158,35 +167,41 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
logger.trace("monitoring index templates and pipelines are installed, service can start");
} else {
// TODO we really shouldn't continually attempt to put the resources on every cluster state change. We should be patient.
// we are on the elected master
//
// Check that there is nothing that could block metadata updates
if (clusterState.blocks().hasGlobalBlock(ClusterBlockLevel.METADATA_WRITE)) {
logger.debug("waiting until metadata writes are unblocked");
return null;
}
// whenever we install anything, we return null to force it to retry to give the cluster a chance to catch up
boolean installedSomething = false;
if (installingSomething.get() == true) {
logger.trace("already installing something, waiting for install to complete");
return null;
}
// build a list of runnables for everything that is missing, but do not start execution
final List<Runnable> asyncActions = new ArrayList<>();
final AtomicInteger pendingResponses = new AtomicInteger(0);
// Check that each required template exist, installing it if needed
for (Map.Entry<String, String> template : templates.entrySet()) {
if (hasTemplate(template.getKey(), clusterState) == false) {
logger.debug("template [{}] not found", template.getKey());
putTemplate(template.getKey(), template.getValue());
installedSomething = true;
} else {
logger.trace("template [{}] found", template.getKey());
final List<Entry<String, String>> missingTemplates = templates.entrySet()
.stream()
.filter((e) -> hasTemplate(e.getKey(), clusterState) == false)
.collect(Collectors.toList());
if (missingTemplates.isEmpty() == false) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("template {} not found",
missingTemplates.stream().map(Map.Entry::getKey).collect(Collectors.toList())));
for (Entry<String, String> template : missingTemplates) {
asyncActions.add(() -> putTemplate(template.getKey(), template.getValue(),
new ResponseActionListener<>("template", template.getKey(), pendingResponses)));
}
}
// if we don't have the ingest pipeline, then install it
if (hasIngestPipelines(clusterState) == false) {
logger.debug("pipeline [{}] not found", EXPORT_PIPELINE_NAME);
putIngestPipeline();
installedSomething = true;
asyncActions.add(() -> putIngestPipeline(new ResponseActionListener<>("pipeline", EXPORT_PIPELINE_NAME, pendingResponses)));
} else {
logger.trace("pipeline [{}] found", EXPORT_PIPELINE_NAME);
}
@ -195,31 +210,54 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
if (addAliasesTo2xIndices == null) {
logger.trace("there are no 2.x monitoring indices or they have all the aliases they need");
} else {
logger.debug("there are 2.x monitoring indices and they are missing some aliases to make them compatible with 5.x");
client.execute(IndicesAliasesAction.INSTANCE, addAliasesTo2xIndices, new ActionListener<IndicesAliasesResponse>() {
final List<String> monitoringIndices2x = addAliasesTo2xIndices.getAliasActions().stream()
.flatMap((a) -> Arrays.stream(a.indices()))
.collect(Collectors.toList());
logger.debug("there are 2.x monitoring indices {} and they are missing some aliases to make them compatible with 5.x",
monitoringIndices2x);
asyncActions.add(() -> client.execute(IndicesAliasesAction.INSTANCE, addAliasesTo2xIndices,
new ActionListener<IndicesAliasesResponse>() {
@Override
public void onResponse(IndicesAliasesResponse response) {
responseReceived();
if (response.isAcknowledged()) {
logger.info("Added modern aliases to 2.x monitoring indices");
logger.info("Added modern aliases to 2.x monitoring indices {}", monitoringIndices2x);
} else {
logger.info("Unable to add modern aliases to 2.x monitoring indices, response not acknowledged.");
logger.info("Unable to add modern aliases to 2.x monitoring indices {}, response not acknowledged.",
monitoringIndices2x);
}
}
@Override
public void onFailure(Exception e) {
logger.error("Unable to add modern aliases to 2.x monitoring indices", e);
}
});
installedSomething = true;
responseReceived();
logger.error((Supplier<?>)
() -> new ParameterizedMessage("Unable to add modern aliases to 2.x monitoring indices {}",
monitoringIndices2x), e);
}
if (installedSomething) {
// let the cluster catch up (and because we do the PUTs asynchronously)
private void responseReceived() {
if (pendingResponses.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
}));
}
if (asyncActions.size() > 0) {
if (installingSomething.compareAndSet(false, true)) {
pendingResponses.set(asyncActions.size());
asyncActions.forEach(Runnable::run);
} else {
// let the cluster catch up since requested installations may be ongoing
return null;
}
logger.trace("monitoring index templates and pipelines are installed on master node, service can start");
} else {
logger.debug("monitoring index templates and pipelines are installed on master node, service can start");
}
}
if (state.compareAndSet(State.INITIALIZED, State.RUNNING)) {
@ -260,13 +298,13 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
* </code></pre>
* That should be used (until something better exists) to ensure that we do not override <em>newer</em> pipelines with our own.
*/
private void putIngestPipeline() {
private void putIngestPipeline(ActionListener<WritePipelineResponse> listener) {
logger.debug("installing ingest pipeline [{}]", EXPORT_PIPELINE_NAME);
final BytesReference emptyPipeline = emptyPipeline(XContentType.JSON).bytes();
final PutPipelineRequest request = new PutPipelineRequest(EXPORT_PIPELINE_NAME, emptyPipeline);
client.admin().cluster().putPipeline(request, new ResponseActionListener<>("pipeline", EXPORT_PIPELINE_NAME));
client.admin().cluster().putPipeline(request, listener);
}
/**
@ -297,14 +335,14 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
return templates.size() > 0;
}
private void putTemplate(String template, String source) {
private void putTemplate(String template, String source, ActionListener<PutIndexTemplateResponse> listener) {
logger.debug("installing template [{}]",template);
PutIndexTemplateRequest request = new PutIndexTemplateRequest(template).source(source);
assert !Thread.currentThread().isInterrupted() : "current thread has been interrupted before putting index template!!!";
// async call, so we won't block cluster event thread
client.admin().indices().putTemplate(request, new ResponseActionListener<>("template", template));
client.admin().indices().putTemplate(request, listener);
}
@Override
@ -422,14 +460,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
private final String type;
private final String name;
private final AtomicInteger countDown;
public ResponseActionListener(String type, String name) {
private ResponseActionListener(String type, String name, AtomicInteger countDown) {
this.type = Objects.requireNonNull(type);
this.name = Objects.requireNonNull(name);
this.countDown = Objects.requireNonNull(countDown);
}
@Override
public void onResponse(Response response) {
responseReceived();
if (response.isAcknowledged()) {
logger.trace("successfully set monitoring {} [{}]", type, name);
} else {
@ -439,7 +480,17 @@ public class LocalExporter extends Exporter implements ClusterStateListener, Cle
@Override
public void onFailure(Exception e) {
responseReceived();
logger.error((Supplier<?>) () -> new ParameterizedMessage("failed to set monitoring index {} [{}]", type, name), e);
}
private void responseReceived() {
if (countDown.decrementAndGet() <= 0) {
logger.trace("all installation requests returned a response");
if (installingSomething.compareAndSet(true, false) == false) {
throw new IllegalStateException("could not reset installing flag to false");
}
}
}
}
}