Ensure only 1 in-flight request to create each watcher index template (elastic/x-pack-elasticsearch#1660)

The aim of this change is to prevent many identical requests to create
watcher index templates being submitted when a cluster first starts up
and many cluster state updates are happening.  Prior to this change, if
watcher's original index template creation requests queued up behind other
cluster state change requests then for each other request watcher would
re-request creation of all its index templates.  After this change it
uses a strategy similar to that used by ML to only have one creation
request per index template in the cluster state change queue at any time.

Relates elastic/x-pack-elasticsearch#1368
Relates elastic/x-pack-elasticsearch#1631
Relates elastic/x-pack-elasticsearch#1650

Original commit: elastic/x-pack-elasticsearch@ad87bf3f78
This commit is contained in:
David Roberts 2017-06-08 10:05:39 +01:00 committed by GitHub
parent 29c11c30f3
commit f097ff906d
2 changed files with 53 additions and 12 deletions

View File

@ -28,7 +28,10 @@ import org.elasticsearch.xpack.template.TemplateUtils;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import static java.util.Collections.unmodifiableMap; import static java.util.Collections.unmodifiableMap;
@ -59,6 +62,8 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
private final ClusterService clusterService; private final ClusterService clusterService;
private final TemplateConfig[] indexTemplates; private final TemplateConfig[] indexTemplates;
private final ConcurrentMap<String, AtomicBoolean> templateCreationsInProgress = new ConcurrentHashMap<>();
private volatile Map<String, Settings> customIndexSettings; private volatile Map<String, Settings> customIndexSettings;
public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, public WatcherIndexTemplateRegistry(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService,
@ -99,11 +104,17 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
void addTemplatesIfMissing(ClusterState state) { void addTemplatesIfMissing(ClusterState state) {
for (TemplateConfig template : indexTemplates) { for (TemplateConfig template : indexTemplates) {
if (!state.metaData().getTemplates().containsKey(template.getTemplateName())) { final String templateName = template.getTemplateName();
logger.debug("adding index template [{}], because it doesn't exist", template.getTemplateName()); final AtomicBoolean creationCheck = templateCreationsInProgress.computeIfAbsent(templateName, key -> new AtomicBoolean(false));
putTemplate(template); if (creationCheck.compareAndSet(false, true)) {
if (!state.metaData().getTemplates().containsKey(templateName)) {
logger.debug("adding index template [{}], because it doesn't exist", templateName);
putTemplate(template, creationCheck);
} else { } else {
logger.trace("not adding index template [{}], because it already exists", template.getTemplateName()); creationCheck.set(false);
logger.trace("not adding index template [{}], because it already exists", templateName);
}
} }
} }
} }
@ -145,17 +156,18 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
Map<String, Settings> customIndexSettings = new HashMap<String, Settings>(this.customIndexSettings); Map<String, Settings> customIndexSettings = new HashMap<String, Settings>(this.customIndexSettings);
customIndexSettings.put(config.getSetting().getKey(), builder.build()); customIndexSettings.put(config.getSetting().getKey(), builder.build());
this.customIndexSettings = customIndexSettings; this.customIndexSettings = customIndexSettings;
putTemplate(config); putTemplate(config, templateCreationsInProgress.computeIfAbsent(config.getTemplateName(), key -> new AtomicBoolean(true)));
} }
} }
private void putTemplate(final TemplateConfig config) { private void putTemplate(final TemplateConfig config, final AtomicBoolean creationCheck) {
final Executor executor = threadPool.generic(); final Executor executor = threadPool.generic();
executor.execute(() -> { executor.execute(() -> {
final byte[] template = TemplateUtils.loadTemplate("/" + config.getFileName()+ ".json", INDEX_TEMPLATE_VERSION, final String templateName = config.getTemplateName();
final byte[] template = TemplateUtils.loadTemplate("/" + config.getFileName() + ".json", INDEX_TEMPLATE_VERSION,
Pattern.quote("${xpack.watcher.template.version}")).getBytes(StandardCharsets.UTF_8); Pattern.quote("${xpack.watcher.template.version}")).getBytes(StandardCharsets.UTF_8);
PutIndexTemplateRequest request = new PutIndexTemplateRequest(config.getTemplateName()).source(template, XContentType.JSON); PutIndexTemplateRequest request = new PutIndexTemplateRequest(templateName).source(template, XContentType.JSON);
request.masterNodeTimeout(TimeValue.timeValueMinutes(1)); request.masterNodeTimeout(TimeValue.timeValueMinutes(1));
Settings customSettings = customIndexSettings.get(config.getSetting().getKey()); Settings customSettings = customIndexSettings.get(config.getSetting().getKey());
if (customSettings != null && customSettings.names().size() > 0) { if (customSettings != null && customSettings.names().size() > 0) {
@ -168,15 +180,16 @@ public class WatcherIndexTemplateRegistry extends AbstractComponent implements C
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() { client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
@Override @Override
public void onResponse(PutIndexTemplateResponse response) { public void onResponse(PutIndexTemplateResponse response) {
creationCheck.set(false);
if (response.isAcknowledged() == false) { if (response.isAcknowledged() == false) {
logger.error("Error adding watcher template [{}], request was not acknowledged", config.getTemplateName()); logger.error("Error adding watcher template [{}], request was not acknowledged", templateName);
} }
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
logger.error((Supplier<?>) () -> new ParameterizedMessage("Error adding watcher template [{}]", creationCheck.set(false);
config.getTemplateName()), e); logger.error(new ParameterizedMessage("Error adding watcher template [{}]", templateName), e);
} }
}); });
}); });

View File

@ -5,7 +5,10 @@
*/ */
package org.elasticsearch.xpack.watcher.support; package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.client.IndicesAdminClient; import org.elasticsearch.client.IndicesAdminClient;
@ -27,6 +30,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import org.junit.Before; import org.junit.Before;
import org.mockito.ArgumentCaptor; import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -36,8 +41,11 @@ import java.util.Set;
import static org.elasticsearch.mock.orig.Mockito.verify; import static org.elasticsearch.mock.orig.Mockito.verify;
import static org.elasticsearch.mock.orig.Mockito.when; import static org.elasticsearch.mock.orig.Mockito.when;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
@ -64,6 +72,16 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient); when(adminClient.indices()).thenReturn(indicesAdminClient);
when(client.admin()).thenReturn(adminClient); when(client.admin()).thenReturn(adminClient);
doAnswer(new Answer<Void>() {
@SuppressWarnings("unchecked")
@Override
public Void answer(InvocationOnMock invocationOnMock) {
ActionListener<PutIndexTemplateResponse> listener =
(ActionListener<PutIndexTemplateResponse>) invocationOnMock.getArguments()[2];
listener.onResponse(new TestPutIndexTemplateResponse(true));
return null;
}
}).when(client).execute(same(PutIndexTemplateAction.INSTANCE), any(), any());
ClusterService clusterService = mock(ClusterService.class); ClusterService clusterService = mock(ClusterService.class);
registry = new WatcherIndexTemplateRegistry(Settings.EMPTY, clusterSettings, clusterService, threadPool, internalClient); registry = new WatcherIndexTemplateRegistry(Settings.EMPTY, clusterSettings, clusterService, threadPool, internalClient);
@ -102,4 +120,14 @@ public class WatcherIndexTemplateRegistryTests extends ESTestCase {
registry.clusterChanged(newEvent); registry.clusterChanged(newEvent);
verify(client, times(4)).execute(anyObject(), argumentCaptor.capture(), anyObject()); verify(client, times(4)).execute(anyObject(), argumentCaptor.capture(), anyObject());
} }
private static class TestPutIndexTemplateResponse extends PutIndexTemplateResponse {
TestPutIndexTemplateResponse(boolean acknowledged) {
super(acknowledged);
}
TestPutIndexTemplateResponse() {
super();
}
}
} }