[ML] Fix bug initialising ML MetaData (elastic/x-pack-elasticsearch#1386)
Slightly different to the commit I just reverted (elastic/x-pack-elasticsearch#1352) Original commit: elastic/x-pack-elasticsearch@46339418ae
This commit is contained in:
parent
7a32304f2c
commit
fdf86967ce
|
@ -15,6 +15,7 @@ import org.elasticsearch.cluster.service.ClusterService;
|
|||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.gateway.GatewayService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -45,6 +46,11 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
|||
|
||||
@Override
|
||||
public void clusterChanged(ClusterChangedEvent event) {
|
||||
if (event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||
// Wait until the gateway has recovered from disk.
|
||||
return;
|
||||
}
|
||||
|
||||
if (event.localNodeMaster()) {
|
||||
MetaData metaData = event.state().metaData();
|
||||
installMlMetadata(metaData);
|
||||
|
@ -61,6 +67,10 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
|||
clusterService.submitStateUpdateTask("install-ml-metadata", new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
// If the metadata has been added already don't try to update
|
||||
if (currentState.metaData().custom(MlMetadata.TYPE) != null) {
|
||||
return currentState;
|
||||
}
|
||||
ClusterState.Builder builder = new ClusterState.Builder(currentState);
|
||||
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
|
||||
metadataBuilder.putCustom(MlMetadata.TYPE, MlMetadata.EMPTY_METADATA);
|
||||
|
@ -70,7 +80,8 @@ class MlInitializationService extends AbstractComponent implements ClusterStateL
|
|||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
logger.error("unable to install ml metadata upon startup", e);
|
||||
installMlMetadataCheck.set(false);
|
||||
logger.error("unable to install ml metadata", e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
|
@ -19,10 +20,13 @@ import org.elasticsearch.common.transport.TransportAddress;
|
|||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.junit.Before;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.net.InetAddress;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.elasticsearch.mock.orig.Mockito.doAnswer;
|
||||
import static org.elasticsearch.mock.orig.Mockito.times;
|
||||
|
@ -125,6 +129,46 @@ public class MlInitializationServiceTests extends ESTestCase {
|
|||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any());
|
||||
}
|
||||
|
||||
public void testInitialize_reintialiseAfterFailure() throws Exception {
|
||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||
|
||||
// Fail the first cluster state update
|
||||
AtomicBoolean onFailureCalled = new AtomicBoolean(false);
|
||||
Mockito.doAnswer(invocation -> {
|
||||
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
|
||||
task.onFailure("mock a failure", new IllegalStateException());
|
||||
onFailureCalled.set(true);
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
||||
|
||||
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
|
||||
.nodes(DiscoveryNodes.builder()
|
||||
.add(new DiscoveryNode("_node_id", new LocalTransportAddress("_id"), Version.CURRENT))
|
||||
.localNodeId("_node_id")
|
||||
.masterNodeId("_node_id"))
|
||||
.metaData(MetaData.builder())
|
||||
.build();
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
assertTrue("Something went wrong mocking the cluster update task", onFailureCalled.get());
|
||||
verify(clusterService, times(1)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
||||
|
||||
// 2nd update succeeds
|
||||
AtomicReference<ClusterState> clusterStateHolder = new AtomicReference<>();
|
||||
Mockito.doAnswer(invocation -> {
|
||||
ClusterStateUpdateTask task = (ClusterStateUpdateTask) invocation.getArguments()[1];
|
||||
clusterStateHolder.set(task.execute(cs));
|
||||
return null;
|
||||
}).when(clusterService).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
||||
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", cs, cs));
|
||||
assertTrue("Something went wrong mocking the sucessful cluster update task", clusterStateHolder.get() != null);
|
||||
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
||||
|
||||
// 3rd update won't be called as ML Metadata has been installed
|
||||
initializationService.clusterChanged(new ClusterChangedEvent("_source", clusterStateHolder.get(), clusterStateHolder.get()));
|
||||
verify(clusterService, times(2)).submitStateUpdateTask(eq("install-ml-metadata"), any(ClusterStateUpdateTask.class));
|
||||
}
|
||||
|
||||
public void testNodeGoesFromMasterToNonMasterAndBack() throws Exception {
|
||||
MlInitializationService initializationService = new MlInitializationService(Settings.EMPTY, threadPool, clusterService, client);
|
||||
MlDailyMaintenanceService initialDailyMaintenanceService = mock(MlDailyMaintenanceService.class);
|
||||
|
|
Loading…
Reference in New Issue