[ML] Fix bug initialising ML Metadata (elastic/x-pack-elasticsearch#1377)

* Fix bug initialising ML MetaData


Original commit: elastic/x-pack-elasticsearch@4e4923634a
This commit is contained in:
David Kyle 2017-05-10 15:01:37 +01:00 committed by GitHub
parent a5a44a2e2e
commit d34192ff6f
2 changed files with 108 additions and 2 deletions

View File

@ -10,11 +10,13 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -45,6 +47,11 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
@Override @Override
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// Wait until the gateway has recovered from disk.
return;
}
if (event.localNodeMaster()) { if (event.localNodeMaster()) {
MetaData metaData = event.state().metaData(); MetaData metaData = event.state().metaData();
installMlMetadata(metaData); installMlMetadata(metaData);
@ -61,6 +68,10 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
@Override @Override
public ClusterState execute(ClusterState currentState) throws Exception { public ClusterState execute(ClusterState currentState) throws Exception {
// If the metadata has been added already don't try to update
if (currentState.metaData().custom(MlMetadata.TYPE) != null) {
return currentState;
}
ClusterState.Builder builder = new ClusterState.Builder(currentState); ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData()); MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA); metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA);
@ -70,7 +81,13 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
@Override @Override
public void onFailure(String source, Exception e) { public void onFailure(String source, Exception e) {
installMlMetadataCheck.set(false);
logger.error("unable to install ml metadata upon startup", e); logger.error("unable to install ml metadata upon startup", e);
// Don't retry if no longer master
if (e instanceof NotMasterException == false) {
logger.debug("Retry installing ML metadata");
installMlMetadata(metaData);
}
} }
}); });
}); });

View File

@ -10,6 +10,8 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
@ -19,16 +21,21 @@ import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito;
import java.net.InetAddress; import java.net.InetAddress;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.mock.orig.Mockito.doAnswer; import static org.elasticsearch.mock.orig.Mockito.doAnswer;
import static org.elasticsearch.mock.orig.Mockito.times; import static org.elasticsearch.mock.orig.Mockito.times;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
@ -89,7 +96,6 @@ public class MlInitializationServiceTests extends ESTestCase {
} }
public void testInitialize_alreadyInitialized() throws Exception { public void testInitialize_alreadyInitialized() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -109,7 +115,6 @@ public class MlInitializationServiceTests extends ESTestCase {
} }
public void testInitialize_onlyOnce() throws Exception { public void testInitialize_onlyOnce() throws Exception {
ClusterService clusterService = mock(ClusterService.class);
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name")) ClusterState cs = ClusterState.builder(new ClusterName("_name"))
@ -125,6 +130,90 @@ public class MlInitializationServiceTests extends ESTestCase {
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any()); verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
} }
public void testInitialize_retiresMetaDataUpdateAfterFailure() throws Exception {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
Mockito.doAnswer(invocation -> {
// We expect this method to be called twice.
// The first time the onFailure method is invoked and the
// second is successful
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
if (onFailureCalled.get()) {
clusterStateHolder.set(task.execute(cs));
} else {
onFailureCalled.set(true);
task.onFailure("mock a failure", new IllegalStateException());
}
return null;
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get());
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
// 3rd update won't be called as ML Metadata has been installed
initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get()));
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
}
public void testInitialize_retiresStopsTryingIfNoLongerMaster() throws Exception {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.metaData(MetaData.builder())
.build();
AtomicInteger tryCount = new AtomicInteger();
Mockito.doAnswer(invocation -> {
// Fail with a general exception first time then a NotMasterException
// The NotMasterException should stop the retrying
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
if (tryCount.incrementAndGet() >= 2) {
task.onFailure("mock a not master failure", new NotMasterException("MLInitializationServiceTest"));
} else {
task.onFailure("mock a failure", new IllegalStateException());
}
return null;
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
assertTrue("Something went wrong mocking the cluster update task", tryCount.get() == 2);
// Should have been called exactly twice
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
// Imagine the node has become master again, this time the update should work
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
Mockito.doAnswer(invocation -> {
// We expect this method to be called twice.
// The first time the onFailure method is invoked and the
// second is successful
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
clusterStateHolder.set(task.execute(cs));
return null;
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
verify(clusterService, times(3)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
}
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception { public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client); MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class); MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);