Revert "[ML] Fix bug initialising ML Metadata (elastic/x-pack-elasticsearch#1377)"
This reverts commit elastic/x-pack-elasticsearch@4e4923634a. Original commit: elastic/x-pack-elasticsearch@3de80a1577
This commit is contained in:
parent
c43ae014b4
commit
1994b42cd5
|
@ -10,13 +10,11 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateListener;
|
import org.elasticsearch.cluster.ClusterStateListener;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.NotMasterException;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.component.LifecycleListener;
|
import org.elasticsearch.common.component.LifecycleListener;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.gateway.GatewayService;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
@ -47,11 +45,6 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void clusterChanged(ClusterChangedEvent event) {
|
public void clusterChanged(ClusterChangedEvent event) {
|
||||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
|
||||||
// Wait until the gateway has recovered from disk.
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (event.localNodeMaster()) {
|
if (event.localNodeMaster()) {
|
||||||
MetaData metaData = event.state().metaData();
|
MetaData metaData = event.state().metaData();
|
||||||
installMlMetadata(metaData);
|
installMlMetadata(metaData);
|
||||||
|
@ -68,10 +61,6 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||||
// If the metadata has been added already don't try to update
|
|
||||||
if (currentState.metaData().custom(MlMetadata.TYPE) != null) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
||||||
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
||||||
metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA);
|
metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA);
|
||||||
|
@ -81,13 +70,7 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(String source, Exception e) {
|
public void onFailure(String source, Exception e) {
|
||||||
installMlMetadataCheck.set(false);
|
|
||||||
logger.error("unable to install ml metadata upon startup", e);
|
logger.error("unable to install ml metadata upon startup", e);
|
||||||
// Don't retry if no longer master
|
|
||||||
if (e instanceof NotMasterException == false) {
|
|
||||||
logger.debug("Retry installing ML metadata");
|
|
||||||
installMlMetadata(metaData);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -10,8 +10,6 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterName;
|
import org.elasticsearch.cluster.ClusterName;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
|
||||||
import org.elasticsearch.cluster.NotMasterException;
|
|
||||||
import org.elasticsearch.cluster.metadata.MetaData;
|
import org.elasticsearch.cluster.metadata.MetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
|
@ -21,21 +19,16 @@ import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.mockito.Mockito;
|
|
||||||
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledFuture;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
|
|
||||||
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
||||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
@ -96,6 +89,7 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize_alreadyInitialized() throws Exception {
|
public void testInitialize_alreadyInitialized() throws Exception {
|
||||||
|
ClusterService clusterService = mock(ClusterService.class);
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
@ -115,6 +109,7 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize_onlyOnce() throws Exception {
|
public void testInitialize_onlyOnce() throws Exception {
|
||||||
|
ClusterService clusterService = mock(ClusterService.class);
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||||
|
@ -130,90 +125,6 @@ public class MlInitializationServiceTests extends ESTestCase {
|
||||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testInitialize_retiresMetaDataUpdateAfterFailure() throws Exception {
|
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
|
||||||
.nodes(DiscoveryNodes.builder()
|
|
||||||
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
|
|
||||||
.localNodeId("_node_id")
|
|
||||||
.masterNodeId("_node_id"))
|
|
||||||
.metaData(MetaData.builder())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
|
|
||||||
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
|
|
||||||
Mockito.doAnswer(invocation -> {
|
|
||||||
// We expect this method to be called twice.
|
|
||||||
// The first time the onFailure method is invoked and the
|
|
||||||
// second is successful
|
|
||||||
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
|
|
||||||
if (onFailureCalled.get()) {
|
|
||||||
clusterStateHolder.set(task.execute(cs));
|
|
||||||
} else {
|
|
||||||
onFailureCalled.set(true);
|
|
||||||
task.onFailure("mock a failure", new IllegalStateException());
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
|
|
||||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
|
||||||
|
|
||||||
assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get());
|
|
||||||
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
|
|
||||||
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
|
|
||||||
// 3rd update won't be called as ML Metadata has been installed
|
|
||||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get()));
|
|
||||||
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testInitialize_retiresStopsTryingIfNoLongerMaster() throws Exception {
|
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
|
||||||
|
|
||||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
|
||||||
.nodes(DiscoveryNodes.builder()
|
|
||||||
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
|
|
||||||
.localNodeId("_node_id")
|
|
||||||
.masterNodeId("_node_id"))
|
|
||||||
.metaData(MetaData.builder())
|
|
||||||
.build();
|
|
||||||
|
|
||||||
AtomicInteger tryCount = new AtomicInteger();
|
|
||||||
Mockito.doAnswer(invocation -> {
|
|
||||||
// Fail with a general exception first time then a NotMasterException
|
|
||||||
// The NotMasterException should stop the retrying
|
|
||||||
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
|
|
||||||
if (tryCount.incrementAndGet() >= 2) {
|
|
||||||
task.onFailure("mock a not master failure", new NotMasterException("MLInitializationServiceTest"));
|
|
||||||
} else {
|
|
||||||
task.onFailure("mock a failure", new IllegalStateException());
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
|
|
||||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
|
||||||
|
|
||||||
assertTrue("Something went wrong mocking the cluster update task", tryCount.get() == 2);
|
|
||||||
// Should have been called exactly twice
|
|
||||||
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
|
|
||||||
// Imagine the node has become master again, this time the update should work
|
|
||||||
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
|
|
||||||
Mockito.doAnswer(invocation -> {
|
|
||||||
// We expect this method to be called twice.
|
|
||||||
// The first time the onFailure method is invoked and the
|
|
||||||
// second is successful
|
|
||||||
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
|
|
||||||
clusterStateHolder.set(task.execute(cs));
|
|
||||||
return null;
|
|
||||||
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
|
|
||||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
|
||||||
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
|
|
||||||
verify(clusterService, times(3)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
|
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
|
||||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||||
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
|
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
|
||||||
|
|
Loading…
Reference in New Issue