From 51e1199860b416de760e71b5ae09f109bea57ace Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Wed, 4 Jan 2017 15:50:14 +0100 Subject: [PATCH] Install prelert metadata and create required indices only once. Original commit: elastic/x-pack-elasticsearch@12c8ba0ce0c5271f34d04c3f35a9701420d600c7 --- .../PrelertInitializationService.java | 86 +++++++++++-------- .../PrelertInitializationServiceTests.java | 29 +++++++ 2 files changed, 80 insertions(+), 35 deletions(-) diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java index 86aa73842b2..5e436493bd9 100644 --- a/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java +++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationService.java @@ -18,12 +18,18 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex; import org.elasticsearch.xpack.prelert.job.persistence.JobProvider; +import java.util.concurrent.atomic.AtomicBoolean; + public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener { private final ThreadPool threadPool; private final ClusterService clusterService; private final JobProvider jobProvider; + private final AtomicBoolean installPrelertMetadataCheck = new AtomicBoolean(false); + private final AtomicBoolean createPrelertUsageIndexCheck = new AtomicBoolean(false); + private final AtomicBoolean createStateIndexCheck = new AtomicBoolean(false); + public PrelertInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService, JobProvider jobProvider) { super(settings); @@ -38,54 +44,64 @@ public class PrelertInitializationService extends AbstractComponent implements C if (event.localNodeMaster()) { MetaData metaData = event.state().metaData(); if (metaData.custom(PrelertMetadata.TYPE) == null) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) throws Exception { - ClusterState.Builder builder = new ClusterState.Builder(currentState); - MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); - metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.EMPTY_METADATA); - builder.metaData(metadataBuilder.build()); - return builder.build(); - } + if (installPrelertMetadataCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + ClusterState.Builder builder = new ClusterState.Builder(currentState); + MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); + metadataBuilder.putCustom(PrelertMetadata.TYPE, PrelertMetadata.EMPTY_METADATA); + builder.metaData(metadataBuilder.build()); + return builder.build(); + } - @Override - public void onFailure(String source, Exception e) { - logger.error("unable to install prelert metadata upon startup", e); - } + @Override + public void onFailure(String source, Exception e) { + logger.error("unable to install prelert metadata upon startup", e); + } + }); }); - }); + } + } else { + installPrelertMetadataCheck.set(false); } if (metaData.hasIndex(JobProvider.PRELERT_USAGE_INDEX) == false) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.createUsageMeteringIndex((result, error) -> { - if (result) { - logger.info("successfully created prelert-usage index"); - } else { - if (error instanceof ResourceAlreadyExistsException) { - logger.debug("not able to create prelert-usage index as it already exists"); + if (createPrelertUsageIndexCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + jobProvider.createUsageMeteringIndex((result, error) -> { + if (result) { + logger.info("successfully created prelert-usage index"); + createPrelertUsageIndexCheck.set(false); } else { - logger.error("not able to create prelert-usage index", error); + if (error instanceof ResourceAlreadyExistsException) { + logger.debug("not able to create prelert-usage index as it already exists"); + } else { + logger.error("not able to create prelert-usage index", error); + } } - } + }); }); - }); + } } String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName(); if (metaData.hasIndex(stateIndexName) == false) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { - jobProvider.createJobStateIndex((result, error) -> { - if (result) { - logger.info("successfully created {} index", stateIndexName); - } else { - if (error instanceof ResourceAlreadyExistsException) { - logger.debug("not able to create {} index as it already exists", stateIndexName); + if (createStateIndexCheck.compareAndSet(false, true)) { + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> { + jobProvider.createJobStateIndex((result, error) -> { + if (result) { + logger.info("successfully created {} index", stateIndexName); + createStateIndexCheck.set(false); } else { - logger.error("not able to create " + stateIndexName + " index", error); + if (error instanceof ResourceAlreadyExistsException) { + logger.debug("not able to create {} index as it already exists", stateIndexName); + } else { + logger.error("not able to create " + stateIndexName + " index", error); + } } - } + }); }); - }); + } } } } diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java index df2dbf4cd67..6ab19401eb2 100644 --- a/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java +++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/prelert/job/metadata/PrelertInitializationServiceTests.java @@ -124,4 +124,33 @@ public class PrelertInitializationServiceTests extends ESTestCase { verify(jobProvider, times(0)).createUsageMeteringIndex(any()); verify(jobProvider, times(0)).createJobStateIndex(any()); } + + public void testInitialize_onlyOnce() { + ThreadPool threadPool = mock(ThreadPool.class); + ExecutorService executorService = mock(ExecutorService.class); + doAnswer(invocation -> { + ((Runnable) invocation.getArguments()[0]).run(); + return null; + }).when(executorService).execute(any(Runnable.class)); + when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService); + + ClusterService clusterService = mock(ClusterService.class); + JobProvider jobProvider = mock(JobProvider.class); + PrelertInitializationService initializationService = + new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any()); + verify(jobProvider, times(1)).createUsageMeteringIndex(any()); + verify(jobProvider, times(1)).createJobStateIndex(any()); + } }