diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java index f873d8699b9..bfe0cdef415 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportOpenJobAction.java @@ -89,7 +89,6 @@ public class TransportOpenJobAction extends TransportMasterNodeAction unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState); - if (unavailableIndices.size() != 0) { - String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + - String.join(",", unavailableIndices) + "]"; - logger.debug(reason); - return new PersistentTasksCustomMetaData.Assignment(null, reason); - } // Try to allocate jobs according to memory usage, but if that's not possible (maybe due to a mixed version cluster or maybe // because of some weird OS problem) then fall back to the old mechanism of only considering numbers of assigned jobs - boolean allocateByMemory = true; - - if (memoryTracker.isRecentlyRefreshed() == false) { - - boolean scheduledRefresh = memoryTracker.asyncRefresh(); - if (scheduledRefresh) { - String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; - logger.debug(reason); - return new PersistentTasksCustomMetaData.Assignment(null, reason); - } else { - allocateByMemory = false; - logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled", - jobId); - } + boolean allocateByMemory = isMemoryTrackerRecentlyRefreshed; + if (isMemoryTrackerRecentlyRefreshed == false) { + logger.warn("Falling back to allocating job [{}] by job counts because a memory requirement refresh could not be scheduled", + jobId); } List reasons = new LinkedList<>(); @@ -592,12 +573,33 @@ public class TransportOpenJobAction extends TransportMasterNodeAction unavailableIndices = verifyIndicesPrimaryShardsAreActive(resultsWriteAlias, clusterState); + if (unavailableIndices.size() != 0) { + String reason = "Not opening job [" + jobId + "], because not all primary shards are active for the following indices [" + + String.join(",", unavailableIndices) + "]"; + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } + + boolean isMemoryTrackerRecentlyRefreshed = memoryTracker.isRecentlyRefreshed(); + if (isMemoryTrackerRecentlyRefreshed == false) { + boolean scheduledRefresh = memoryTracker.asyncRefresh(); + if (scheduledRefresh) { + String reason = "Not opening job [" + jobId + "] because job memory requirements are stale - refresh requested"; + logger.debug(reason); + return new PersistentTasksCustomMetaData.Assignment(null, reason); + } + } + + PersistentTasksCustomMetaData.Assignment assignment = selectLeastLoadedMlNode(jobId, params.getJob(), clusterState, maxConcurrentJobAllocations, maxMachineMemoryPercent, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); if (assignment.getExecutorNode() == null) { int numMlNodes = 0; diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java index 9b7673338f6..e489a6a9a7c 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportOpenJobActionTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -60,11 +61,9 @@ import org.junit.Before; import java.net.InetAddress; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.SortedMap; @@ -79,11 +78,13 @@ import static org.mockito.Mockito.when; public class TransportOpenJobActionTests extends ESTestCase { private MlMemoryTracker memoryTracker; + private boolean isMemoryTrackerRecentlyRefreshed; @Before public void setup() { memoryTracker = mock(MlMemoryTracker.class); - when(memoryTracker.isRecentlyRefreshed()).thenReturn(true); + isMemoryTrackerRecentlyRefreshed = true; + when(memoryTracker.isRecentlyRefreshed()).thenReturn(isMemoryTrackerRecentlyRefreshed); } public void testValidate_jobMissing() { @@ -141,7 +142,7 @@ public class TransportOpenJobActionTests extends ESTestCase { jobBuilder.setJobVersion(Version.CURRENT); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id4", jobBuilder.build(), - cs.build(), 2, 30, memoryTracker, logger); + cs.build(), 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertEquals("", result.getExplanation()); assertEquals("_node_id3", result.getExecutorNode()); } @@ -178,7 +179,7 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id0", new ByteSizeValue(150, ByteSizeUnit.MB)).build(new Date()); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id0", job, cs.build(), 2, - 30, memoryTracker, logger); + 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertNull(result.getExecutorNode()); assertTrue(result.getExplanation().contains("because this node is full. Number of opened jobs [" + maxRunningJobsPerNode + "], xpack.ml.max_open_jobs [" + maxRunningJobsPerNode + "]")); @@ -204,7 +205,8 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id2", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id2", job, cs.build(), 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertTrue(result.getExplanation().contains("because this node isn't a ml node")); assertNull(result.getExecutorNode()); } @@ -239,7 +241,8 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id6", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); ClusterState cs = csBuilder.build(); - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id6", job, cs, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertEquals("_node_id3", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -249,7 +252,8 @@ public class TransportOpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, + logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -260,7 +264,8 @@ public class TransportOpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, + logger); assertNull("no node selected, because stale task", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); @@ -271,7 +276,8 @@ public class TransportOpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, + logger); assertNull("no node selected, because null state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -310,7 +316,8 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = BaseMlIntegTestCase.createFareQuoteJob("job_id7", new ByteSizeValue(2, ByteSizeUnit.MB)).build(new Date()); // Allocation won't be possible if the stale failed job is treated as opening - Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, logger); + Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id7", job, cs, 2, 30, memoryTracker, + isMemoryTrackerRecentlyRefreshed, logger); assertEquals("_node_id1", result.getExecutorNode()); tasksBuilder = PersistentTasksCustomMetaData.builder(tasks); @@ -320,7 +327,8 @@ public class TransportOpenJobActionTests extends ESTestCase { csBuilder = ClusterState.builder(cs); csBuilder.metaData(MetaData.builder(cs.metaData()).putCustom(PersistentTasksCustomMetaData.TYPE, tasks)); cs = csBuilder.build(); - result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, logger); + result = TransportOpenJobAction.selectLeastLoadedMlNode("job_id8", job, cs, 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, + logger); assertNull("no node selected, because OPENING state", result.getExecutorNode()); assertTrue(result.getExplanation().contains("because node exceeds [2] the maximum number of jobs [2] in opening state")); } @@ -353,7 +361,7 @@ public class TransportOpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("incompatible_type_job", job, cs.build(), 2, 30, - memoryTracker, logger); + memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString("because this node does not support jobs of type [incompatible_type]")); assertNull(result.getExecutorNode()); } @@ -384,7 +392,7 @@ public class TransportOpenJobActionTests extends ESTestCase { metaData.putCustom(PersistentTasksCustomMetaData.TYPE, tasks); cs.metaData(metaData); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_incompatible_model_snapshot", job, cs.build(), - 2, 30, memoryTracker, logger); + 2, 30, memoryTracker, isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString( "because the job's model snapshot requires a node of version [6.3.0] or higher")); assertNull(result.getExecutorNode()); @@ -413,7 +421,7 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = jobWithRules("job_with_rules"); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker, - logger); + isMemoryTrackerRecentlyRefreshed, logger); assertThat(result.getExplanation(), containsString( "because jobs using custom_rules require a node of version [6.4.0] or higher")); assertNull(result.getExecutorNode()); @@ -442,7 +450,7 @@ public class TransportOpenJobActionTests extends ESTestCase { Job job = jobWithRules("job_with_rules"); Assignment result = TransportOpenJobAction.selectLeastLoadedMlNode("job_with_rules", job, cs.build(), 2, 30, memoryTracker, - logger); + isMemoryTrackerRecentlyRefreshed, logger); assertNotNull(result.getExecutorNode()); } @@ -529,10 +537,10 @@ public class TransportOpenJobActionTests extends ESTestCase { public void testGetAssignment_GivenJobThatRequiresMigration() { ClusterService clusterService = mock(ClusterService.class); - ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, new HashSet<>( - Arrays.asList(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, - MachineLearning.MAX_LAZY_ML_NODES) - )); + ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES) + ); when(clusterService.getClusterSettings()).thenReturn(clusterSettings); TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor( @@ -542,6 +550,34 @@ public class TransportOpenJobActionTests extends ESTestCase { assertEquals(TransportOpenJobAction.AWAITING_MIGRATION, executor.getAssignment(params, mock(ClusterState.class))); } + // An index being unavailable should take precedence over waiting for a lazy node + public void testGetAssignment_GivenUnavailableIndicesWithLazyNode() { + Settings settings = Settings.builder().put(MachineLearning.MAX_LAZY_ML_NODES.getKey(), 1).build(); + ClusterService clusterService = mock(ClusterService.class); + ClusterSettings clusterSettings = new ClusterSettings(settings, + Sets.newHashSet(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, MachineLearning.MAX_MACHINE_MEMORY_PERCENT, + MachineLearning.MAX_LAZY_ML_NODES) + ); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + + ClusterState.Builder csBuilder = ClusterState.builder(new ClusterName("_name")); + MetaData.Builder metaData = MetaData.builder(); + RoutingTable.Builder routingTable = RoutingTable.builder(); + addIndices(metaData, routingTable); + routingTable.remove(".ml-state"); + csBuilder.metaData(metaData); + csBuilder.routingTable(routingTable.build()); + + TransportOpenJobAction.OpenJobPersistentTasksExecutor executor = new TransportOpenJobAction.OpenJobPersistentTasksExecutor( + settings, clusterService, mock(AutodetectProcessManager.class), mock(MlMemoryTracker.class), mock(Client.class)); + + OpenJobAction.JobParams params = new OpenJobAction.JobParams("unavailable_index_with_lazy_node"); + params.setJob(mock(Job.class)); + assertEquals("Not opening job [unavailable_index_with_lazy_node], " + + "because not all primary shards are active for the following indices [.ml-state]", + executor.getAssignment(params, csBuilder.build()).getExplanation()); + } + public static void addJobTask(String jobId, String nodeId, JobState jobState, PersistentTasksCustomMetaData.Builder builder) { addJobTask(jobId, nodeId, jobState, builder, false); }