diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java index 5226f77874f..ba6acb4c0b6 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MlInitializationService.java @@ -10,11 +10,13 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.threadpool.ThreadPool; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,6 +47,11 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL @Override public void clusterChanged(ClusterChangedEvent event) { + if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) { + // Wait until the gateway has recovered from disk. + return; + } + if (event.localNodeMaster()) { MetaData metaData = event.state().metaData(); installMlMetadata(metaData); @@ -61,6 +68,10 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) throws Exception { + // If the metadata has been added already don't try to update + if (currentState.metaData().custom(MlMetadata.TYPE) != null) { + return currentState; + } ClusterState.Builder builder = new ClusterState.Builder(currentState); MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA); @@ -70,7 +81,13 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL @Override public void onFailure(String source, Exception e) { + installMlMetadataCheck.set(false); logger.error("unable to install ml metadata upon startup", e); + // Don't retry if no longer master + if (e instanceof NotMasterException == false) { + logger.debug("Retry installing ML metadata"); + installMlMetadata(metaData); + } } }); }); diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java index 1ee9cb66329..4a03f3ee2da 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/MlInitializationServiceTests.java @@ -10,6 +10,8 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -19,16 +21,21 @@ import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.Before; +import org.mockito.Mockito; import java.net.InetAddress; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.times; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -89,7 +96,6 @@ public class MlInitializationServiceTests extends ESTestCase { } public void testInitialize_alreadyInitialized() throws Exception { - ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -109,7 +115,6 @@ public class MlInitializationServiceTests extends ESTestCase { } public void testInitialize_onlyOnce() throws Exception { - ClusterService clusterService = mock(ClusterService.class); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -125,6 +130,90 @@ public class MlInitializationServiceTests extends ESTestCase { verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); } + public void testInitialize_retiresMetaDataUpdateAfterFailure() throws Exception { + MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + + AtomicBoolean onFailureCalled = new AtomicBoolean(false); + AtomicReference clusterStateHolder = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + // We expect this method to be called twice. + // The first time the onFailure method is invoked and the + // second is successful + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; + if (onFailureCalled.get()) { + clusterStateHolder.set(task.execute(cs)); + } else { + onFailureCalled.set(true); + task.onFailure("mock a failure", new IllegalStateException()); + } + return null; + }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get()); + assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null); + verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + // 3rd update won't be called as ML Metadata has been installed + initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get())); + verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + } + + public void testInitialize_retiresStopsTryingIfNoLongerMaster() throws Exception { + MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); + + ClusterState cs = ClusterState.builder(new ClusterName("_name")) + .nodes(DiscoveryNodes.builder() + .add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)) + .localNodeId("_node_id") + .masterNodeId("_node_id")) + .metaData(MetaData.builder()) + .build(); + + AtomicInteger tryCount = new AtomicInteger(); + Mockito.doAnswer(invocation -> { + // Fail with a general exception first time then a NotMasterException + // The NotMasterException should stop the retrying + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; + if (tryCount.incrementAndGet() >= 2) { + task.onFailure("mock a not master failure", new NotMasterException("MLInitializationServiceTest")); + } else { + task.onFailure("mock a failure", new IllegalStateException()); + } + return null; + }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + + assertTrue("Something went wrong mocking the cluster update task", tryCount.get() == 2); + // Should have been called exactly twice + verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + // Imagine the node has become master again, this time the update should work + AtomicReference clusterStateHolder = new AtomicReference<>(); + Mockito.doAnswer(invocation -> { + // We expect this method to be called twice. + // The first time the onFailure method is invoked and the + // second is successful + ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1]; + clusterStateHolder.set(task.execute(cs)); + return null; + }).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + + initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs)); + assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null); + verify(clusterService, times(3)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class)); + } + public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);