diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 281a374b97b..67acc1d0d67 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -31,6 +31,8 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -188,6 +190,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.process.MlMemoryTracker; import org.elasticsearch.xpack.ml.process.NativeController; import org.elasticsearch.xpack.ml.process.NativeControllerHolder; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction; import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction; import org.elasticsearch.xpack.ml.rest.RestMlInfoAction; @@ -293,6 +296,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu public static final Setting MAX_OPEN_JOBS_PER_NODE = Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope); + // Undocumented setting for integration test purposes + public static final Setting MIN_DISK_SPACE_OFF_HEAP = + Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope); + private static final Logger logger = LogManager.getLogger(XPackPlugin.class); private final Settings settings; @@ -333,7 +340,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING, AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC, MAX_OPEN_JOBS_PER_NODE, - AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP, + MIN_DISK_SPACE_OFF_HEAP, MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION)); } @@ -424,6 +431,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client); JobResultsPersister jobResultsPersister = new JobResultsPersister(client); + NativeStorageProvider nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); + AutodetectProcessFactory autodetectProcessFactory; NormalizerProcessFactory normalizerProcessFactory; if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) { @@ -454,8 +463,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME)); AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool, - jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, - normalizerFactory, xContentRegistry, auditor, clusterService); + xContentRegistry, auditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, + autodetectProcessFactory, normalizerFactory, nativeStorageProvider); this.autodetectProcessManager.set(autodetectProcessManager); DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry, auditor, System::currentTimeMillis); @@ -472,8 +481,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); enforcer.listenForLicenseStateChanges(); - // run node startup tasks - autodetectProcessManager.onNodeStartup(); + // Perform node startup operations + nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown(); return Arrays.asList( mlLifeCycleService, @@ -488,7 +497,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu datafeedManager, auditor, new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService), - memoryTracker + memoryTracker, + nativeStorageProvider ); } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java index ac3d41f26e0..14e434099f8 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import java.nio.file.Path; import java.util.List; @@ -38,16 +39,19 @@ public class TransportForecastJobAction extends TransportJobTaskAction MIN_DISK_SPACE_OFF_HEAP = - Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope); - private static final Logger logger = LogManager.getLogger(AutodetectProcessManager.class); private final Client client; @@ -111,9 +104,6 @@ public class AutodetectProcessManager implements ClusterStateListener { private NativeStorageProvider nativeStorageProvider; private final ConcurrentMap processByAllocation = new ConcurrentHashMap<>(); - // a map that manages the allocation of temporary space to jobs - private final ConcurrentMap nativeTmpStorage = new ConcurrentHashMap<>(); - private volatile int maxAllowedRunningJobs; private final NamedXContentRegistry xContentRegistry; @@ -123,10 +113,10 @@ public class AutodetectProcessManager implements ClusterStateListener { private volatile boolean upgradeInProgress; public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool, + NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService, JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister, - JobDataCountsPersister jobDataCountsPersister, - AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory, - NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) { + JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory, + NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider) { this.environment = environment; this.client = client; this.threadPool = threadPool; @@ -139,7 +129,7 @@ public class AutodetectProcessManager implements ClusterStateListener { this.jobResultsPersister = jobResultsPersister; this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; - this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); + this.nativeStorageProvider = Objects.requireNonNull(nativeStorageProvider); clusterService.addListener(this); clusterService.getClusterSettings() .addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxAllowedRunningJobs); @@ -149,14 +139,6 @@ public class AutodetectProcessManager implements ClusterStateListener { this.maxAllowedRunningJobs = maxAllowedRunningJobs; } - public void onNodeStartup() { - try { - nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown(); - } catch (Exception e) { - logger.warn("Failed to cleanup native storage from previous invocation", e); - } - } - public synchronized void closeAllJobsOnThisNode(String reason) { int numJobs = processByAllocation.size(); if (numJobs != 0) { @@ -283,28 +265,6 @@ public class AutodetectProcessManager implements ClusterStateListener { }); } - /** - * Request temporary storage to be used for the job - * - * @param jobTask The job task - * @param requestedSize requested size - * @return a Path to local storage or null if storage is not available - */ - public Path tryGetTmpStorage(JobTask jobTask, ByteSizeValue requestedSize) { - String jobId = jobTask.getJobId(); - Path path = nativeTmpStorage.get(jobId); - if (path == null) { - path = nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize); - if (path != null) { - nativeTmpStorage.put(jobId, path); - } - } else if (!nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) { - // the previous tmp location ran out of disk space, do not allow further usage - return null; - } - return path; - } - /** * Do a forecast for the running job. * @@ -602,7 +562,7 @@ public class AutodetectProcessManager implements ClusterStateListener { } setJobState(jobTask, JobState.FAILED, reason); try { - removeTmpStorage(jobTask.getJobId()); + nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription()); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobTask.getJobId()), e); } @@ -666,7 +626,7 @@ public class AutodetectProcessManager implements ClusterStateListener { } // delete any tmp storage try { - removeTmpStorage(jobId); + nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription()); } catch (IOException e) { logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e); } @@ -760,13 +720,6 @@ public class AutodetectProcessManager implements ClusterStateListener { return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); } - private void removeTmpStorage(String jobId) throws IOException { - Path path = nativeTmpStorage.get(jobId); - if (path != null) { - nativeStorageProvider.cleanupLocalTmpStorage(path); - } - } - ExecutorService createAutodetectExecutorService(ExecutorService executorService) { AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); executorService.submit(autoDetectWorkerExecutor::start); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java index 9f366ab1131..93432af962f 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/process/NativeStorageProvider.java @@ -15,6 +15,8 @@ import org.elasticsearch.env.Environment; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Provide storage for native components. @@ -23,7 +25,6 @@ public class NativeStorageProvider { private static final Logger LOGGER = LogManager.getLogger(NativeStorageProvider.class); - private static final String LOCAL_STORAGE_SUBFOLDER = "ml-local-data"; private static final String LOCAL_STORAGE_TMP_FOLDER = "tmp"; @@ -32,6 +33,9 @@ public class NativeStorageProvider { // do not allow any usage below this threshold private final ByteSizeValue minLocalStorageAvailable; + // A map to keep track of allocated native storage by resource id + private final ConcurrentMap allocatedStorage = new ConcurrentHashMap<>(); + public NativeStorageProvider(Environment environment, ByteSizeValue minDiskSpaceOffHeap) { this.environment = environment; this.minLocalStorageAvailable = minDiskSpaceOffHeap; @@ -44,12 +48,14 @@ public class NativeStorageProvider { * unclean node shutdown or broken clients. * * Do not call while there are running jobs. - * - * @throws IOException if cleanup fails */ - public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() throws IOException { - for (Path p : environment.dataFiles()) { - IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER)); + public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() { + try { + for (Path p : environment.dataFiles()) { + IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER)); + } + } catch (Exception e) { + LOGGER.warn("Failed to cleanup native storage from previous invocation", e); } } @@ -61,17 +67,28 @@ public class NativeStorageProvider { * @return Path for temporary storage if available, null otherwise */ public Path tryGetLocalTmpStorage(String uniqueIdentifier, ByteSizeValue requestedSize) { + Path path = allocatedStorage.get(uniqueIdentifier); + if (path != null && localTmpStorageHasEnoughSpace(path, requestedSize) == false) { + LOGGER.debug("Previous tmp storage for [{}] run out, returning null", uniqueIdentifier); + return null; + } else { + path = tryAllocateStorage(uniqueIdentifier, requestedSize); + } + return path; + } + + private Path tryAllocateStorage(String uniqueIdentifier, ByteSizeValue requestedSize) { for (Path path : environment.dataFiles()) { try { if (getUsableSpace(path) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes()) { Path tmpDirectory = path.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER).resolve(uniqueIdentifier); Files.createDirectories(tmpDirectory); + allocatedStorage.put(uniqueIdentifier, tmpDirectory); return tmpDirectory; } } catch (IOException e) { LOGGER.debug("Failed to obtain information about path [{}]: {}", path, e); } - } LOGGER.debug("Failed to find native storage for [{}], returning null", uniqueIdentifier); return null; @@ -96,17 +113,18 @@ public class NativeStorageProvider { /** * Delete temporary storage, previously allocated * - * @param path - * Path to temporary storage - * @throws IOException - * if path can not be cleaned up + * @param uniqueIdentifier the identifier to which storage was allocated + * @throws IOException if path can not be cleaned up */ - public void cleanupLocalTmpStorage(Path path) throws IOException { - // do not allow to breakout from the tmp storage provided - Path realPath = path.toAbsolutePath(); - for (Path p : environment.dataFiles()) { - if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) { - IOUtils.rm(path); + public void cleanupLocalTmpStorage(String uniqueIdentifier) throws IOException { + Path path = allocatedStorage.remove(uniqueIdentifier); + if (path != null) { + // do not allow to breakout from the tmp storage provided + Path realPath = path.toAbsolutePath(); + for (Path p : environment.dataFiles()) { + if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) { + IOUtils.rm(path); + } } } } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index 9a147dfd1bc..5f1ce1ddf7d 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -54,6 +54,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange; import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; +import org.elasticsearch.xpack.ml.process.NativeStorageProvider; import org.junit.Before; import org.mockito.ArgumentCaptor; @@ -121,6 +122,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { private Auditor auditor; private ClusterState clusterState; private ClusterService clusterService; + private NativeStorageProvider nativeStorageProvider; private DataCounts dataCounts = new DataCounts("foo"); private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build(); @@ -159,6 +161,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { clusterState = mock(ClusterState.class); when(clusterState.getMetaData()).thenReturn(metaData); when(clusterState.metaData()).thenReturn(metaData); + nativeStorageProvider = mock(NativeStorageProvider.class); doAnswer(invocationOnMock -> { @SuppressWarnings("unchecked") @@ -685,9 +688,8 @@ public class AutodetectProcessManagerTests extends ESTestCase { private AutodetectProcessManager createManager(Settings settings) { return new AutodetectProcessManager(environment, settings, - client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, - autodetectFactory, normalizerFactory, - new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService); + client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider, + jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider); } private AutodetectProcessManager createSpyManagerAndCallProcessData(String jobId) { AutodetectProcessManager manager = createSpyManager(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java index fd87e29387e..4a2b7efc1b5 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/process/NativeStorageProviderTests.java @@ -89,7 +89,7 @@ public class NativeStorageProviderTests extends ESTestCase { Assert.assertTrue(Files.isRegularFile(testFile)); // the native component should cleanup itself, but assume it has crashed - storageProvider.cleanupLocalTmpStorage(path); + storageProvider.cleanupLocalTmpStorage(id); Assert.assertFalse(Files.exists(testFile)); Assert.assertFalse(Files.exists(path)); }