diff --git a/x-pack/docs/en/ml/forecasting.asciidoc b/x-pack/docs/en/ml/forecasting.asciidoc index 95693a1677f..cd01aa0fb77 100644 --- a/x-pack/docs/en/ml/forecasting.asciidoc +++ b/x-pack/docs/en/ml/forecasting.asciidoc @@ -59,10 +59,7 @@ For more information about any of these functions, see <>. * Forecasts run concurrently with real-time {ml} analysis. That is to say, {ml} analysis does not stop while forecasts are generated. Forecasts can have an impact on {ml} jobs, however, especially in terms of memory usage. For this -reason, forecasts run only if the model memory status is acceptable and the -snapshot models for the forecast do not require more than 20 MB. If these memory -limits are reached, consider splitting the job into multiple smaller jobs and -creating forecasts for these. +reason, forecasts run only if the model memory status is acceptable. * The job must be open when you create a forecast. Otherwise, an error occurs. * If there is insufficient data to generate any meaningful predictions, an error occurs. In general, forecasts that are created early in the learning phase diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index c5402152fea..bdefabdb294 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -286,7 +286,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu DataCountsReporter.ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING, DataCountsReporter.ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING, AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE, - AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE)); + AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE, + AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP)); } public Settings additionalSettings() { @@ -403,6 +404,9 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu // This object's constructor attaches to the license state, so there's no need to retain another reference to it new InvalidLicenseEnforcer(settings, getLicenseState(), threadPool, datafeedManager, autodetectProcessManager); + // run node startup tasks + autodetectProcessManager.onNodeStartup(); + return Arrays.asList( mlLifeCycleService, jobProvider, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java index 3b09377b477..aaa59e7e8ca 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportForecastJobAction.java @@ -15,6 +15,8 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; @@ -28,6 +30,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams; import java.io.IOException; +import java.nio.file.Path; import java.util.List; import java.util.function.Consumer; @@ -36,6 +39,8 @@ import static org.elasticsearch.xpack.core.ml.action.ForecastJobAction.Request.D public class TransportForecastJobAction extends TransportJobTaskAction { + private static final ByteSizeValue FORECAST_LOCAL_STORAGE_LIMIT = new ByteSizeValue(500, ByteSizeUnit.MB); + private final JobProvider jobProvider; @Inject public TransportForecastJobAction(Settings settings, TransportService transportService, ThreadPool threadPool, @@ -73,6 +78,13 @@ public class TransportForecastJobAction extends TransportJobTaskAction { if (e == null) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java new file mode 100644 index 00000000000..8a0268a8d07 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProvider.java @@ -0,0 +1,123 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.process; + +import org.apache.logging.log4j.Logger; +import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +/** + * Provide storage for native components. + */ +public class NativeStorageProvider { + + private static final Logger LOGGER = Loggers.getLogger(NativeStorageProvider.class); + + + private static final String LOCAL_STORAGE_SUBFOLDER = "ml-local-data"; + private static final String LOCAL_STORAGE_TMP_FOLDER = "tmp"; + + private final Environment environment; + + // do not allow any usage below this threshold + private final ByteSizeValue minLocalStorageAvailable; + + public NativeStorageProvider(Environment environment, ByteSizeValue minDiskSpaceOffHeap) { + this.environment = environment; + this.minLocalStorageAvailable = minDiskSpaceOffHeap; + } + + /** + * Removes any temporary storage leftovers. + * + * Removes all temp files and folder which might be there as a result of an + * unclean node shutdown or broken clients. + * + * Do not call while there are running jobs. + * + * @throws IOException if cleanup fails + */ + public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() throws IOException { + for (Path p : environment.dataFiles()) { + IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER)); + } + } + + /** + * Tries to find local storage for storing temporary data. + * + * @param uniqueIdentifier An identifier to be used as sub folder + * @param requestedSize The maximum size required + * @return Path for temporary storage if available, null otherwise + */ + public Path tryGetLocalTmpStorage(String uniqueIdentifier, ByteSizeValue requestedSize) { + for (Path path : environment.dataFiles()) { + try { + if (getUsableSpace(path) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes()) { + Path tmpDirectory = path.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER).resolve(uniqueIdentifier); + Files.createDirectories(tmpDirectory); + return tmpDirectory; + } + } catch (IOException e) { + LOGGER.debug("Failed to obtain information about path [{}]: {}", path, e); + } + + } + LOGGER.debug("Failed to find native storage for [{}], returning null", uniqueIdentifier); + return null; + } + + public boolean localTmpStorageHasEnoughSpace(Path path, ByteSizeValue requestedSize) { + Path realPath = path.toAbsolutePath(); + for (Path p : environment.dataFiles()) { + try { + if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) { + return getUsableSpace(p) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes(); + } + } catch (IOException e) { + LOGGER.debug("Failed to optain information about path [{}]: {}", path, e); + } + } + + LOGGER.debug("Not enough space left for path [{}]", path); + return false; + } + + /** + * Delete temporary storage, previously allocated + * + * @param path + * Path to temporary storage + * @throws IOException + * if path can not be cleaned up + */ + public void cleanupLocalTmpStorage(Path path) throws IOException { + // do not allow to breakout from the tmp storage provided + Path realPath = path.toAbsolutePath(); + for (Path p : environment.dataFiles()) { + if (realPath.startsWith(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER))) { + IOUtils.rm(path); + } + } + } + + long getUsableSpace(Path path) throws IOException { + long freeSpaceInBytes = Environment.getFileStore(path).getUsableSpace(); + + /* See: https://bugs.openjdk.java.net/browse/JDK-8162520 */ + if (freeSpaceInBytes < 0) { + freeSpaceInBytes = Long.MAX_VALUE; + } + return freeSpaceInBytes; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index cca591682d8..d3a848ef382 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.ml.job.process.autodetect; import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.core.internal.io.IOUtils; +import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; @@ -15,11 +16,12 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.index.analysis.AnalysisRegistry; @@ -47,6 +49,7 @@ import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersiste import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister; import org.elasticsearch.xpack.ml.job.persistence.StateStreamer; import org.elasticsearch.xpack.ml.job.process.DataCountsReporter; +import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor; import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams; import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams; @@ -59,6 +62,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.IOException; import java.io.InputStream; +import java.nio.file.Path; import java.time.Duration; import java.time.ZonedDateTime; import java.util.Date; @@ -96,6 +100,10 @@ public class AutodetectProcessManager extends AbstractComponent { public static final Setting MAX_OPEN_JOBS_PER_NODE = Setting.intSetting("xpack.ml.max_open_jobs", MAX_RUNNING_JOBS_PER_NODE, 1, Property.NodeScope); + // Undocumented setting for integration test purposes + public static final Setting MIN_DISK_SPACE_OFF_HEAP = + Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope); + private final Client client; private final Environment environment; private final ThreadPool threadPool; @@ -107,8 +115,12 @@ public class AutodetectProcessManager extends AbstractComponent { private final JobResultsPersister jobResultsPersister; private final JobDataCountsPersister jobDataCountsPersister; + private NativeStorageProvider nativeStorageProvider; private final ConcurrentMap processByAllocation = new ConcurrentHashMap<>(); + // a map that manages the allocation of temporary space to jobs + private final ConcurrentMap nativeTmpStorage = new ConcurrentHashMap<>(); + private final int maxAllowedRunningJobs; private final NamedXContentRegistry xContentRegistry; @@ -133,6 +145,15 @@ public class AutodetectProcessManager extends AbstractComponent { this.jobResultsPersister = jobResultsPersister; this.jobDataCountsPersister = jobDataCountsPersister; this.auditor = auditor; + this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings)); + } + + public void onNodeStartup() { + try { + nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown(); + } catch (Exception e) { + logger.warn("Failed to cleanup native storage from previous invocation", e); + } } public synchronized void closeAllJobsOnThisNode(String reason) throws IOException { @@ -251,6 +272,28 @@ public class AutodetectProcessManager extends AbstractComponent { }); } + /** + * Request temporary storage to be used for the job + * + * @param jobTask The job task + * @param requestedSize requested size + * @return a Path to local storage or null if storage is not available + */ + public Path tryGetTmpStorage(JobTask jobTask, ByteSizeValue requestedSize) { + String jobId = jobTask.getJobId(); + Path path = nativeTmpStorage.get(jobId); + if (path == null) { + path = nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize); + if (path != null) { + nativeTmpStorage.put(jobId, path); + } + } else if (!nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) { + // the previous tmp location ran out of disk space, do not allow further usage + return null; + } + return path; + } + /** * Do a forecast for the running job. * @@ -258,10 +301,11 @@ public class AutodetectProcessManager extends AbstractComponent { * @param params Forecast parameters */ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer handler) { - logger.debug("Forecasting job {}", jobTask.getJobId()); + String jobId = jobTask.getJobId(); + logger.debug("Forecasting job {}", jobId); AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask); if (communicator == null) { - String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobTask.getJobId()); + String message = String.format(Locale.ROOT, "Cannot forecast because job [%s] is not open", jobId); logger.debug(message); handler.accept(ExceptionsHelper.conflictStatusException(message)); return; @@ -271,7 +315,7 @@ public class AutodetectProcessManager extends AbstractComponent { if (e == null) { handler.accept(null); } else { - String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobTask.getJobId()); + String msg = String.format(Locale.ROOT, "[%s] exception while forecasting job", jobId); logger.error(msg, e); handler.accept(ExceptionsHelper.serverError(msg, e)); } @@ -477,6 +521,11 @@ public class AutodetectProcessManager extends AbstractComponent { } } setJobState(jobTask, JobState.FAILED); + try { + removeTmpStorage(jobTask.getJobId()); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobTask.getJobId()), e); + } }; } @@ -535,6 +584,12 @@ public class AutodetectProcessManager extends AbstractComponent { // thread that gets into this method blocks until the first thread has finished closing the job processContext.unlock(); } + // delete any tmp storage + try { + removeTmpStorage(jobId); + } catch (IOException e) { + logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e); + } } int numberOfOpenJobs() { @@ -613,6 +668,13 @@ public class AutodetectProcessManager extends AbstractComponent { return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats())); } + private void removeTmpStorage(String jobId) throws IOException { + Path path = nativeTmpStorage.get(jobId); + if (path != null) { + nativeStorageProvider.cleanupLocalTmpStorage(path); + } + } + ExecutorService createAutodetectExecutorService(ExecutorService executorService) { AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext()); executorService.submit(autoDetectWorkerExecutor::start); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java index 0afd3b8a473..f243195c3a7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/params/ForecastParams.java @@ -16,12 +16,14 @@ public class ForecastParams { private final long createTime; private final long duration; private final long expiresIn; + private final String tmpStorage; - private ForecastParams(String forecastId, long createTime, long duration, long expiresIn) { + private ForecastParams(String forecastId, long createTime, long duration, long expiresIn, String tmpStorage) { this.forecastId = forecastId; this.createTime = createTime; this.duration = duration; this.expiresIn = expiresIn; + this.tmpStorage = tmpStorage; } public String getForecastId() { @@ -52,9 +54,18 @@ public class ForecastParams { return expiresIn; } + /** + * Temporary storage forecast is allowed to use for persisting models. + * + * @return path to tmp storage + */ + public String getTmpStorage() { + return tmpStorage; + } + @Override public int hashCode() { - return Objects.hash(forecastId, createTime, duration, expiresIn); + return Objects.hash(forecastId, createTime, duration, expiresIn, tmpStorage); } @Override @@ -69,7 +80,8 @@ public class ForecastParams { return Objects.equals(forecastId, other.forecastId) && Objects.equals(createTime, other.createTime) && Objects.equals(duration, other.duration) - && Objects.equals(expiresIn, other.expiresIn); + && Objects.equals(expiresIn, other.expiresIn) + && Objects.equals(tmpStorage, other.tmpStorage); } public static Builder builder() { @@ -81,6 +93,7 @@ public class ForecastParams { private final long createTimeEpochSecs; private long durationSecs; private long expiresInSecs; + private String tmpStorage; private Builder() { forecastId = UUIDs.base64UUID(); @@ -101,8 +114,13 @@ public class ForecastParams { return this; } + public Builder tmpStorage(String tmpStorage) { + this.tmpStorage = tmpStorage; + return this; + } + public ForecastParams build() { - return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs); + return new ForecastParams(forecastId, createTimeEpochSecs, durationSecs, expiresInSecs, tmpStorage); } } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java index 2a91797d28d..2c026ec1550 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/writer/ControlMsgToProcessWriter.java @@ -164,6 +164,9 @@ public class ControlMsgToProcessWriter { if (params.getExpiresIn() != -1) { builder.field("expires_in", params.getExpiresIn()); } + if (params.getTmpStorage() != null) { + builder.field("tmp_storage", params.getTmpStorage()); + } builder.endObject(); writeMessage(FORECAST_MESSAGE_CODE + Strings.toString(builder)); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java new file mode 100644 index 00000000000..3103e76c82b --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/NativeStorageProviderTests.java @@ -0,0 +1,139 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.job.process; + +import org.elasticsearch.common.io.PathUtils; +import org.elasticsearch.common.unit.ByteSizeUnit; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.env.Environment; +import org.elasticsearch.test.ESTestCase; +import org.junit.Assert; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; + +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.doAnswer; + +public class NativeStorageProviderTests extends ESTestCase { + + public void testTmpStorage() throws IOException { + Map storage = new HashMap<>(); + Path tmpDir = createTempDir(); + + storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes()); + NativeStorageProvider storageProvider = createNativeStorageProvider(storage); + + Assert.assertNotNull( + storageProvider.tryGetLocalTmpStorage(randomAlphaOfLengthBetween(4, 10), new ByteSizeValue(100, ByteSizeUnit.BYTES))); + Assert.assertNull(storageProvider.tryGetLocalTmpStorage(randomAlphaOfLengthBetween(4, 10), + new ByteSizeValue(1024 * 1024 * 1024 + 1, ByteSizeUnit.BYTES))); + + String id = randomAlphaOfLengthBetween(4, 10); + Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.GB)); + Assert.assertNotNull(path); + + Assert.assertEquals(tmpDir.resolve("ml-local-data").resolve("tmp").resolve(id).toString(), path.toString()); + } + + public void testTmpStorageChooseDisk() throws IOException { + Map storage = new HashMap<>(); + Path tmpDir = createTempDir(); + + // low disk space + Path disk1 = tmpDir.resolve(randomAlphaOfLengthBetween(4, 10)); + storage.put(disk1, new ByteSizeValue(1, ByteSizeUnit.GB).getBytes()); + + // sufficient disk space + Path disk2 = tmpDir.resolve(randomAlphaOfLengthBetween(4, 10)); + storage.put(disk2, new ByteSizeValue(20, ByteSizeUnit.GB).getBytes()); + + NativeStorageProvider storageProvider = createNativeStorageProvider(storage); + + String id = randomAlphaOfLengthBetween(4, 10); + Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.GB)); + Assert.assertNotNull(path); + + // should resolve to disk2 as disk1 is low on space + Assert.assertEquals(disk2.resolve("ml-local-data").resolve("tmp").resolve(id).toString(), path.toString()); + } + + public void testTmpStorageCleanup() throws IOException { + Map storage = new HashMap<>(); + Path tmpDir = createTempDir(); + storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes()); + NativeStorageProvider storageProvider = createNativeStorageProvider(storage); + String id = randomAlphaOfLengthBetween(4, 10); + + Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.KB)); + + Assert.assertTrue(Files.exists(path)); + Path testFile = PathUtils.get(path.toString(), "testFile"); + BufferedWriter writer = Files.newBufferedWriter(testFile, StandardCharsets.UTF_8); + writer.write("created by NativeStorageProviderTests::testTmpStorageDelete"); + + writer.close(); + Assert.assertTrue(Files.exists(testFile)); + Assert.assertTrue(Files.isRegularFile(testFile)); + + // the native component should cleanup itself, but assume it has crashed + storageProvider.cleanupLocalTmpStorage(path); + Assert.assertFalse(Files.exists(testFile)); + Assert.assertFalse(Files.exists(path)); + } + + public void testTmpStorageCleanupOnStart() throws IOException { + Map storage = new HashMap<>(); + Path tmpDir = createTempDir(); + storage.put(tmpDir, new ByteSizeValue(6, ByteSizeUnit.GB).getBytes()); + NativeStorageProvider storageProvider = createNativeStorageProvider(storage); + String id = randomAlphaOfLengthBetween(4, 10); + + Path path = storageProvider.tryGetLocalTmpStorage(id, new ByteSizeValue(1, ByteSizeUnit.KB)); + + Assert.assertTrue(Files.exists(path)); + Path testFile = PathUtils.get(path.toString(), "testFile"); + + BufferedWriter writer = Files.newBufferedWriter(testFile, StandardCharsets.UTF_8); + writer.write("created by NativeStorageProviderTests::testTmpStorageWipe"); + + writer.close(); + Assert.assertTrue(Files.exists(testFile)); + Assert.assertTrue(Files.isRegularFile(testFile)); + + // create a new storage provider to test the case of a crashed node + storageProvider = createNativeStorageProvider(storage); + storageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown(); + Assert.assertFalse(Files.exists(testFile)); + Assert.assertFalse(Files.exists(path)); + } + + private NativeStorageProvider createNativeStorageProvider(Map paths) throws IOException { + Environment environment = mock(Environment.class); + + when(environment.dataFiles()).thenReturn(paths.keySet().toArray(new Path[paths.size()])); + NativeStorageProvider storageProvider = spy(new NativeStorageProvider(environment, new ByteSizeValue(5, ByteSizeUnit.GB))); + + doAnswer(invocation -> { + return paths.getOrDefault(invocation.getArguments()[0], Long.valueOf(0)).longValue(); + } + + ).when(storageProvider).getUsableSpace(any(Path.class)); + + return storageProvider; + } + +} diff --git a/x-pack/qa/ml-native-tests/build.gradle b/x-pack/qa/ml-native-tests/build.gradle index 94b7be3a44d..657aa7cfef6 100644 --- a/x-pack/qa/ml-native-tests/build.gradle +++ b/x-pack/qa/ml-native-tests/build.gradle @@ -61,6 +61,7 @@ integTestCluster { setting 'xpack.security.transport.ssl.verification_mode', 'certificate' setting 'xpack.security.audit.enabled', 'true' setting 'xpack.license.self_generated.type', 'trial' + setting 'xpack.ml.min_disk_space_off_heap', '200mb' keystoreSetting 'bootstrap.password', 'x-pack-test-password' keystoreSetting 'xpack.security.transport.ssl.keystore.secure_password', 'keypass' diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java index 14bdd533c6b..84557798390 100644 --- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java +++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.ml.integration; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisLimits; @@ -206,8 +207,7 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase { assertThat(e.getMessage(), equalTo("Cannot run forecast: Forecast cannot be executed as model memory status is not OK")); } - @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/30399") - public void testMemoryLimit() throws Exception { + public void testOverflowToDisk() throws Exception { Detector.Builder detector = new Detector.Builder("mean", "value"); detector.setByFieldName("clientIP"); @@ -216,7 +216,9 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase { analysisConfig.setBucketSpan(bucketSpan); DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setTimeFormat("epoch"); - Job.Builder job = new Job.Builder("forecast-it-test-memory-limit"); + Job.Builder job = new Job.Builder("forecast-it-test-overflow-to-disk"); + AnalysisLimits limits = new AnalysisLimits(2048L, null); + job.setAnalysisLimits(limits); job.setAnalysisConfig(analysisConfig); job.setDataDescription(dataDescription); @@ -224,28 +226,47 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase { putJob(job); openJob(job.getId()); createDataWithLotsOfClientIps(bucketSpan, job); - ElasticsearchException e = expectThrows(ElasticsearchException.class, - () -> forecast(job.getId(), TimeValue.timeValueMinutes(120), null)); - assertThat(e.getMessage(), - equalTo("Cannot run forecast: Forecast cannot be executed as forecast memory usage is predicted to exceed 20MB")); + + try { + String forecastId = forecast(job.getId(), TimeValue.timeValueHours(1), null); + + waitForecastToFinish(job.getId(), forecastId); + } catch (ElasticsearchStatusException e) { + if (e.getMessage().contains("disk space")) { + throw new ElasticsearchStatusException( + "Test likely fails due to insufficient disk space on test machine, please free up space.", e.status(), e); + } + throw e; + } + + closeJob(job.getId()); + + List forecastStats = getForecastStats(); + assertThat(forecastStats.size(), equalTo(1)); + ForecastRequestStats forecastRequestStats = forecastStats.get(0); + List forecasts = getForecasts(job.getId(), forecastRequestStats); + + assertThat(forecastRequestStats.getRecordCount(), equalTo(8000L)); + assertThat(forecasts.size(), equalTo(8000)); } private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException { long now = Instant.now().getEpochSecond(); - long timestamp = now - 50 * bucketSpan.seconds(); - while (timestamp < now) { - for (int i = 1; i < 256; i++) { + long timestamp = now - 15 * bucketSpan.seconds(); + + for (int h = 0; h < 15; h++) { + for (int i = 1; i < 101; i++) { List data = new ArrayList<>(); - for (int j = 1; j < 100; j++) { + for (int j = 1; j < 81; j++) { Map record = new HashMap<>(); record.put("time", timestamp); - record.put("value", 10.0); + record.put("value", 10.0 + h); record.put("clientIP", String.format(Locale.ROOT, "192.168.%d.%d", i, j)); data.add(createJsonRecord(record)); } postData(job.getId(), data.stream().collect(Collectors.joining())); - timestamp += bucketSpan.seconds(); } + timestamp += bucketSpan.seconds(); } flushJob(job.getId(), false); }