From fdf86967cec9f7aec795f1f45aa4466042606995 Mon Sep 17 00:00:00 2001 From: David Kyle Date: Wed, 10 May 2017 16:00:03 +0100 Subject: [PATCH] [ML] Fix bug initialising ML MetaData (elastic/x-pack-elasticsearch#1386) Slightly different to the commit I just reverted (elastic/x-pack-elasticsearch#1352) Original commit: elastic/x-pack-elasticsearch@46339418ae628e795c1cbb0bb18652465a9f012e --- .../xpack/ml/MlInitializationService.java | 13 +++++- .../ml/MlInitializationServiceTests.java | 44 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 5226f77874f..4006ffe806f 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +46,11 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL @Override public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // Wait until the gateway has recovered from disk. + return; + } + if (event.localNodeMaster()) { MetaData metaData = event.state().metaData(); installMlMetadata(metaData); @@ -61,6 +67,10 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + // If the metadata has been added already don't try to update + if (currentState.metaData().custom(MlMetadata.TYPE) != null) { + return currentState; + } ClusterState.Builder builder = new ClusterState.Builder(currentState); MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA); @@ -70,7 +80,8 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL @Override public void onFailure(String source, Exception e) { - logger.error("unable to install ml metadata upon startup", e); + installMlMetadataCheck.set(false); + logger.error("unable to install ml metadata", e); } }); }); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 1ee9cb66329..81dd9a1f342 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -19,10 +20,13 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; +import org.mockito.Mockito; import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.times; @@ -125,6 +129,46 @@ public class MlInitializationServiceTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); } + public void testInitialize_reintialiseAfterFailure() throws Exception { + MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + + // Fail the first cluster state update + AtomicBoolean onFailureCalled = new AtomicBoolean(false); + Mockito.doAnswer(invocation -> { + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; + task.onFailure("mock a failure", new IllegalStateException()); + onFailureCalled.set(true); + return null; + }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get()); + verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + // 2nd update succeeds + AtomicReference clusterStateHolder = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; + clusterStateHolder.set(task.execute(cs)); + return null; + }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null); + verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + // 3rd update won't be called as ML Metadata has been installed + initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get())); + verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + } + public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);