[ML] Wait for templates to be installed before running ML Integ tests (elastic/x-pack-elasticsearch#681)

* [ML] Wait for templates to be installed before running ML Integ tests

* Revert put job template check and preserve template change

* Review comments

Original commit: elastic/x-pack-elasticsearch@21deb34f4a
This commit is contained in:
David Kyle 2017-03-03 10:04:34 +00:00 committed by GitHub
parent be2ee21d16
commit 01de84a19f
9 changed files with 104 additions and 73 deletions

View File

@ -53,6 +53,9 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
private final Client client;
private final ThreadPool threadPool;
public static String [] TEMPLATE_NAMES = new String [] {Auditor.NOTIFICATIONS_INDEX, AnomalyDetectorsIndex.ML_META_INDEX,
AnomalyDetectorsIndex.jobStateIndexName(), AnomalyDetectorsIndex.jobResultsIndexPrefix()};
final AtomicBoolean putMlNotificationsIndexTemplateCheck = new AtomicBoolean(false);
final AtomicBoolean putMlMetaIndexTemplateCheck = new AtomicBoolean(false);
final AtomicBoolean putStateIndexTemplateCheck = new AtomicBoolean(false);
@ -84,33 +87,24 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
// wait until the gateway has recovered from disk,
// otherwise we think may not have the index templates while they actually do exist
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK) == false) {
addTemplatesIfMissing(event.state(), () -> {});
addTemplatesIfMissing(event.state());
}
}
}
/**
* Blocking call adds the registered index templates if missing to the
* Puts the registered index templates if missing to the
* cluster waiting until the templates have been updated.
*/
public void addTemplatesIfMissing(ClusterState state) throws InterruptedException {
// to be sure that the templates exist after this method call, we should wait until the put index templates calls
// have returned if the templates were missing
CountDownLatch latch = new CountDownLatch(4);
addTemplatesIfMissing(state, latch::countDown);
latch.await();
}
private void addTemplatesIfMissing(ClusterState state, Runnable callback) {
public void addTemplatesIfMissing(ClusterState state) {
MetaData metaData = state.metaData();
addMlNotificationsIndexTemplate(metaData, callback);
addMlMetaIndexTemplate(metaData, callback);
addStateIndexTemplate(metaData, callback);
addResultsIndexTemplate(metaData, callback);
addMlNotificationsIndexTemplate(metaData);
addMlMetaIndexTemplate(metaData);
addStateIndexTemplate(metaData);
addResultsIndexTemplate(metaData);
}
boolean templateIsPresentAndUpToDate(String templateName, MetaData metaData) {
static boolean templateIsPresentAndUpToDate(String templateName, MetaData metaData) {
IndexTemplateMetaData templateMetaData = metaData.templates().get(templateName);
if (templateMetaData == null) {
return false;
@ -119,7 +113,7 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
return templateMetaData.version() != null && templateMetaData.version() >= Version.CURRENT.id;
}
private void addMlNotificationsIndexTemplate(MetaData metaData, Runnable callback) {
private void addMlNotificationsIndexTemplate(MetaData metaData) {
if (templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData) == false) {
if (putMlNotificationsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
@ -131,18 +125,13 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
logger.error(
new ParameterizedMessage("not able to create {} index template", Auditor.NOTIFICATIONS_INDEX), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addMlMetaIndexTemplate(MetaData metaData, Runnable callback) {
private void addMlMetaIndexTemplate(MetaData metaData) {
if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.ML_META_INDEX, metaData) == false) {
if (putMlMetaIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
@ -154,18 +143,13 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
logger.error(new ParameterizedMessage(
"not able to create {} index template", AnomalyDetectorsIndex.ML_META_INDEX), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addStateIndexTemplate(MetaData metaData, Runnable callback) {
private void addStateIndexTemplate(MetaData metaData) {
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
if (templateIsPresentAndUpToDate(stateIndexName, metaData) == false) {
if (putStateIndexTemplateCheck.compareAndSet(false, true)) {
@ -177,18 +161,13 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
} else {
logger.error("not able to create " + stateIndexName + " index template", error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
private void addResultsIndexTemplate(MetaData metaData, Runnable callback) {
private void addResultsIndexTemplate(MetaData metaData) {
if (templateIsPresentAndUpToDate(AnomalyDetectorsIndex.jobResultsIndexPrefix(), metaData) == false) {
if (putResultsIndexTemplateCheck.compareAndSet(false, true)) {
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
@ -201,14 +180,9 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
new ParameterizedMessage("not able to create {} index template",
AnomalyDetectorsIndex.jobResultsIndexPrefix()), error);
}
callback.run();
});
});
} else {
callback.run();
}
} else {
callback.run();
}
}
@ -344,4 +318,13 @@ public class MachineLearningTemplateRegistry extends AbstractComponent implemen
// much faster
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), ASYNC);
}
public static boolean allTemplatesInstalled(MetaData metaData) {
boolean allPresent = true;
for (String templateName : TEMPLATE_NAMES) {
allPresent = allPresent && templateIsPresentAndUpToDate(templateName, metaData);
}
return allPresent;
}
}

View File

@ -19,6 +19,7 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.action.DeleteJobAction;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.RevertModelSnapshotAction;

View File

@ -39,7 +39,9 @@ import org.junit.Before;
import org.mockito.ArgumentCaptor;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
@ -271,24 +273,45 @@ public class MachineLearningTemplateRegistryTests extends ESTestCase {
}
public void testTemplateIsPresentAndUpToDate() {
MachineLearningTemplateRegistry templateRegistry =
new MachineLearningTemplateRegistry(createSettings(), clusterService, client, threadPool);
// missing template
MetaData metaData = MetaData.builder().build();
assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
assertFalse(MachineLearningTemplateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
// old version of template
IndexTemplateMetaData templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX)
.version(Version.CURRENT.id - 1).build();
metaData = MetaData.builder().put(templateMetaData).build();
assertFalse(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
assertFalse(MachineLearningTemplateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
// latest template
templateMetaData = IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX)
.version(Version.CURRENT.id).build();
metaData = MetaData.builder().put(templateMetaData).build();
assertTrue(templateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
assertTrue(MachineLearningTemplateRegistry.templateIsPresentAndUpToDate(Auditor.NOTIFICATIONS_INDEX, metaData));
}
public void testAllTemplatesInstalled() {
MetaData metaData = MetaData.builder()
.put(IndexTemplateMetaData.builder(Auditor.NOTIFICATIONS_INDEX).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.ML_META_INDEX).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(AnomalyDetectorsIndex.jobStateIndexName()).version(Version.CURRENT.id).build())
.put(IndexTemplateMetaData.builder(
AnomalyDetectorsIndex.jobResultsIndexPrefix()).version(Version.CURRENT.id).build()).build();
assertTrue(MachineLearningTemplateRegistry.allTemplatesInstalled(metaData));
}
public void testAllTemplatesInstalled_OneMissing() {
MetaData.Builder metaDataBuilder = MetaData.builder();
String missing = randomFrom(MachineLearningTemplateRegistry.TEMPLATE_NAMES);
for (String templateName : MachineLearningTemplateRegistry.TEMPLATE_NAMES) {
if (templateName.equals(missing)) {
continue;
}
metaDataBuilder.put(IndexTemplateMetaData.builder(templateName).version(Version.CURRENT.id).build());
}
assertFalse(MachineLearningTemplateRegistry.allTemplatesInstalled(metaDataBuilder.build()));
}
private Settings createSettings() {

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.integration;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
@ -52,6 +53,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.mockito.Matchers.any;
@ -226,7 +228,7 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
assertResultsAreSame(allRecords, persistedRecords);
}
private void putIndexTemplates() throws InterruptedException {
private void putIndexTemplates() throws Exception {
ThreadPool threadPool = mock(ThreadPool.class);
ExecutorService executorService = mock(ExecutorService.class);
doAnswer(invocation -> {
@ -235,8 +237,15 @@ public class AutodetectResultProcessorIT extends ESSingleNodeTestCase {
}).when(executorService).execute(any(Runnable.class));
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
new MachineLearningTemplateRegistry(Settings.EMPTY, mock(ClusterService.class), client(), threadPool)
.addTemplatesIfMissing(client().admin().cluster().state(new ClusterStateRequest().all()).actionGet().getState());
new MachineLearningTemplateRegistry(Settings.EMPTY, mock(ClusterService.class), client(), threadPool)
.addTemplatesIfMissing(client().admin().cluster().state(new ClusterStateRequest().all()).actionGet().getState());
// block until the templates are installed
assertBusy(() -> {
MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
assertTrue("Timed out waiting for the ML templates to be installed",
MachineLearningTemplateRegistry.allTemplatesInstalled(metaData));
});
}
private Bucket createBucket(boolean isInterim) {

View File

@ -39,9 +39,14 @@ public class DatafeedJobIT extends ESRestTestCase {
return Settings.builder().put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE).build();
}
@Override
protected boolean preserveTemplatesUponCompletion() {
return true;
}
@Before
public void setUpData() throws Exception {
// Create empty index
String mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"

View File

@ -42,6 +42,11 @@ public class MlJobIT extends ESRestTestCase {
return Settings.builder().put(super.restClientSettings()).put(ThreadContext.PREFIX + ".Authorization", BASIC_AUTH_VALUE).build();
}
@Override
protected boolean preserveTemplatesUponCompletion() {
return true;
}
private static final String RESULT_MAPPING = "{ \"mappings\": {\"result\": { \"properties\": { " +
"\"result_type\": { \"type\" : \"keyword\" }," +
"\"timestamp\": { \"type\" : \"date\" }, " +

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.ml.job;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
@ -13,6 +14,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.MlMetadata;
@ -21,16 +23,9 @@ import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.junit.Before;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.job.config.JobTests.buildJobBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class JobManagerTests extends ESTestCase {
@ -56,15 +51,6 @@ public class JobManagerTests extends ESTestCase {
assertThat(doc.results().get(0).getId(), equalTo("foo"));
}
public void testFilter() {
Set<String> running = new HashSet<>(Arrays.asList("henry", "dim", "dave"));
Set<String> diff = new HashSet<>(Arrays.asList("dave", "tom")).stream().filter((s) -> !running.contains(s))
.collect(Collectors.toCollection(HashSet::new));
assertTrue(diff.size() == 1);
assertTrue(diff.contains("tom"));
}
public void testGetJobOrThrowIfUnknown_GivenUnknownJob() {
ClusterState cs = createClusterState();
ESTestCase.expectThrows(ResourceNotFoundException.class, () -> JobManager.getJobOrThrowIfUnknown(cs, "foo"));

View File

@ -23,6 +23,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.SecurityIntegTestCase;
import org.elasticsearch.xpack.XPackSettings;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.action.CloseJobAction;
import org.elasticsearch.xpack.ml.action.DeleteDatafeedAction;
@ -39,6 +40,7 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;
import org.junit.Before;
import java.util.Collection;
import java.util.Collections;
@ -48,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
@ -97,6 +100,15 @@ public abstract class BaseMlIntegTestCase extends SecurityIntegTestCase {
return mocks;
}
@Before
public void ensureTemplatesArePresent() throws Exception {
assertBusy(() -> {
MetaData metaData = client().admin().cluster().prepareState().get().getState().getMetaData();
assertTrue("Timed out waiting for the ML templates to be installed",
MachineLearningTemplateRegistry.allTemplatesInstalled(metaData));
});
}
protected Job.Builder createJob(String id) {
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setFormat(DataDescription.DataFormat.JSON);

View File

@ -6,8 +6,10 @@
package org.elasticsearch.xpack.test.rest;
import org.apache.http.HttpStatus;
import org.elasticsearch.common.Strings;
import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
import org.elasticsearch.test.rest.yaml.ClientYamlTestResponse;
import org.elasticsearch.xpack.ml.MachineLearningTemplateRegistry;
import org.elasticsearch.xpack.ml.integration.MlRestTestStateCleaner;
import org.elasticsearch.xpack.security.SecurityLifecycleService;
import org.junit.After;
@ -34,17 +36,22 @@ public class XPackRestIT extends XPackRestTestCase {
}
/**
* Waits for the Security template to be created by the {@link SecurityLifecycleService}.
* Waits for the Security template to be created by the {@link SecurityLifecycleService} and
* the Machine Learning templates to be created by {@link MachineLearningTemplateRegistry}
*/
@Before
public void waitForSecurityTemplate() throws Exception {
String templateApi = "indices.exists_template";
Map<String, String> params = singletonMap("name", SecurityLifecycleService.SECURITY_TEMPLATE_NAME);
public void waitForTemplates() throws Exception {
waitForTemplate(SecurityLifecycleService.SECURITY_TEMPLATE_NAME);
waitForTemplate(Strings.arrayToCommaDelimitedString(MachineLearningTemplateRegistry.TEMPLATE_NAMES));
}
private void waitForTemplate(String templateName) throws Exception {
Map<String, String> params = singletonMap("name", templateName);
AtomicReference<IOException> exceptionHolder = new AtomicReference<>();
awaitBusy(() -> {
try {
ClientYamlTestResponse response = getAdminExecutionContext().callApi(templateApi, params, emptyList(), emptyMap());
ClientYamlTestResponse response = getAdminExecutionContext().callApi("indices.exists_template",
params, emptyList(), emptyMap());
// We don't check the version of the template - it is the right one when testing documentation.
if (response.getStatusCode() == HttpStatus.SC_OK) {
exceptionHolder.set(null);
@ -58,7 +65,7 @@ public class XPackRestIT extends XPackRestTestCase {
IOException exception = exceptionHolder.get();
if (exception != null) {
throw new IllegalStateException("Exception when waiting for security template to be created", exception);
throw new IllegalStateException("Exception when waiting for [" + templateName + "] template to be created", exception);
}
}
}