Install prelert metadata and create required indices only once.
Original commit: elastic/x-pack-elasticsearch@12c8ba0ce0
This commit is contained in:
parent
27c9f39bf5
commit
51e1199860
|
@ -18,12 +18,18 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
import org.elasticsearch.xpack.prelert.job.persistence.AnomalyDetectorsIndex;
|
||||
import org.elasticsearch.xpack.prelert.job.persistence.JobProvider;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public class PrelertInitializationService extends AbstractComponent implements ClusterStateListener {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
private final ClusterService clusterService;
|
||||
private final JobProvider jobProvider;
|
||||
|
||||
private final AtomicBoolean installPrelertMetadataCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean createPrelertUsageIndexCheck = new AtomicBoolean(false);
|
||||
private final AtomicBoolean createStateIndexCheck = new AtomicBoolean(false);
|
||||
|
||||
public PrelertInitializationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
|
||||
JobProvider jobProvider) {
|
||||
super(settings);
|
||||
|
@ -38,6 +44,7 @@ public class PrelertInitializationService extends AbstractComponent implements C
|
|||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
if (metaData.custom(PrelertMetadata.TYPE) == null) {
|
||||
if (installPrelertMetadataCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
clusterService.submitStateUpdateTask("install-prelert-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
|
@ -56,11 +63,16 @@ public class PrelertInitializationService extends AbstractComponent implements C
|
|||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
installPrelertMetadataCheck.set(false);
|
||||
}
|
||||
if (metaData.hasIndex(JobProvider.PRELERT_USAGE_INDEX) == false) {
|
||||
if (createPrelertUsageIndexCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createUsageMeteringIndex((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created prelert-usage index");
|
||||
createPrelertUsageIndexCheck.set(false);
|
||||
} else {
|
||||
if (error instanceof ResourceAlreadyExistsException) {
|
||||
logger.debug("not able to create prelert-usage index as it already exists");
|
||||
|
@ -71,12 +83,15 @@ public class PrelertInitializationService extends AbstractComponent implements C
|
|||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
String stateIndexName = AnomalyDetectorsIndex.jobStateIndexName();
|
||||
if (metaData.hasIndex(stateIndexName) == false) {
|
||||
if (createStateIndexCheck.compareAndSet(false, true)) {
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> {
|
||||
jobProvider.createJobStateIndex((result, error) -> {
|
||||
if (result) {
|
||||
logger.info("successfully created {} index", stateIndexName);
|
||||
createStateIndexCheck.set(false);
|
||||
} else {
|
||||
if (error instanceof ResourceAlreadyExistsException) {
|
||||
logger.debug("not able to create {} index as it already exists", stateIndexName);
|
||||
|
@ -90,3 +105,4 @@ public class PrelertInitializationService extends AbstractComponent implements C
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -124,4 +124,33 @@ public class PrelertInitializationServiceTests extends ESTestCase {
|
|||
verify(jobProvider, times(0)).createUsageMeteringIndex(any());
|
||||
verify(jobProvider, times(0)).createJobStateIndex(any());
|
||||
}
|
||||
|
||||
public void testInitialize_onlyOnce() {
|
||||
ThreadPool threadPool = mock(ThreadPool.class);
|
||||
ExecutorService executorService = mock(ExecutorService.class);
|
||||
doAnswer(invocation -> {
|
||||
((Runnable) invocation.getArguments()[0]).run();
|
||||
return null;
|
||||
}).when(executorService).execute(any(Runnable.class));
|
||||
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executorService);
|
||||
|
||||
ClusterService clusterService = mock(ClusterService.class);
|
||||
JobProvider jobProvider = mock(JobProvider.class);
|
||||
PrelertInitializationService initializationService =
|
||||
new PrelertInitializationService(Settings.EMPTY, threadPool, clusterService, jobProvider);
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
|
||||
.localNodeId("_node_id")
|
||||
.masterNodeId("_node_id"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
|
||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-prelert-metadata"), any());
|
||||
verify(jobProvider, times(1)).createUsageMeteringIndex(any());
|
||||
verify(jobProvider, times(1)).createJobStateIndex(any());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue