[ML] Refactor NativeStorageProvider to enable reuse (#41414) (#41746)

* [ML] Refactor NativeStorageProvider to enable reuse

Moves `NativeStorageProvider` as a machine learning component
so that it can be reused for other job types. Also, we now
pass the persistent task description as unique identifier which
avoids conflicts between jobs of different type but with same ids.

* Adding nativeStorageProvider as component

Since `TransportForecastJobAction` is expected to get injected a `NativeStorageProvider` class, we need to make sure that it is a constructed component, as it does not have a zero parametered, public ctor.
This commit is contained in:
Benjamin Trent 2019-05-02 09:46:22 -05:00 committed by GitHub
parent be7ec5a47a
commit a92c06ae09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 71 additions and 84 deletions

View File

@ -31,6 +31,8 @@ import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -188,6 +190,7 @@ import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.MlMemoryTracker;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.NativeControllerHolder;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.rest.RestDeleteExpiredDataAction;
import org.elasticsearch.xpack.ml.rest.RestFindFileStructureAction;
import org.elasticsearch.xpack.ml.rest.RestMlInfoAction;
@ -293,6 +296,10 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_OPEN_JOBS_PER_NODE =
Setting.intSetting("xpack.ml.max_open_jobs", 20, 1, MAX_MAX_OPEN_JOBS_PER_NODE, Property.Dynamic, Property.NodeScope);
// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Setting.Property.NodeScope);
private static final Logger logger = LogManager.getLogger(XPackPlugin.class);
private final Settings settings;
@ -333,7 +340,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
AutodetectBuilder.DONT_PERSIST_MODEL_STATE_SETTING,
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
MIN_DISK_SPACE_OFF_HEAP,
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
}
@ -424,6 +431,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
JobDataCountsPersister jobDataCountsPersister = new JobDataCountsPersister(client);
JobResultsPersister jobResultsPersister = new JobResultsPersister(client);
NativeStorageProvider nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
AutodetectProcessFactory autodetectProcessFactory;
NormalizerProcessFactory normalizerProcessFactory;
if (MachineLearningField.AUTODETECT_PROCESS.get(settings) && MachineLearningFeatureSet.isRunningOnMlPlatform(true)) {
@ -454,8 +463,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME));
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(env, settings, client, threadPool,
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor, clusterService);
xContentRegistry, auditor, clusterService, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
autodetectProcessFactory, normalizerFactory, nativeStorageProvider);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
@ -472,8 +481,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
new InvalidLicenseEnforcer(getLicenseState(), threadPool, datafeedManager, autodetectProcessManager);
enforcer.listenForLicenseStateChanges();
// run node startup tasks
autodetectProcessManager.onNodeStartup();
// Perform node startup operations
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
return Arrays.asList(
mlLifeCycleService,
@ -488,7 +497,8 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
datafeedManager,
auditor,
new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService),
memoryTracker
memoryTracker,
nativeStorageProvider
);
}

View File

@ -24,6 +24,7 @@ import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import java.nio.file.Path;
import java.util.List;
@ -38,16 +39,19 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
private final NativeStorageProvider nativeStorageProvider;
@Inject
public TransportForecastJobAction(TransportService transportService,
ClusterService clusterService, ActionFilters actionFilters,
JobResultsProvider jobResultsProvider, AutodetectProcessManager processManager,
JobManager jobManager) {
JobManager jobManager, NativeStorageProvider nativeStorageProvider) {
super(ForecastJobAction.NAME, clusterService, transportService, actionFilters,
ForecastJobAction.Request::new, ForecastJobAction.Response::new,
ThreadPool.Names.SAME, processManager);
this.jobResultsProvider = jobResultsProvider;
this.jobManager = jobManager;
this.nativeStorageProvider = nativeStorageProvider;
// ThreadPool.Names.SAME, because operations is executed by autodetect worker thread
}
@ -70,7 +74,7 @@ public class TransportForecastJobAction extends TransportJobTaskAction<ForecastJ
// tmp storage might be null, we do not log here, because it might not be
// required
Path tmpStorage = processManager.tryGetTmpStorage(task, FORECAST_LOCAL_STORAGE_LIMIT);
Path tmpStorage = nativeStorageProvider.tryGetLocalTmpStorage(task.getDescription(), FORECAST_LOCAL_STORAGE_LIMIT);
if (tmpStorage != null) {
paramsBuilder.tmpStorage(tmpStorage.toString());
}

View File

@ -17,9 +17,7 @@ import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
@ -33,9 +31,9 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
@ -72,12 +70,12 @@ import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Date;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -85,16 +83,11 @@ import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;
public class AutodetectProcessManager implements ClusterStateListener {
// Undocumented setting for integration test purposes
public static final Setting<ByteSizeValue> MIN_DISK_SPACE_OFF_HEAP =
Setting.byteSizeSetting("xpack.ml.min_disk_space_off_heap", new ByteSizeValue(5, ByteSizeUnit.GB), Property.NodeScope);
private static final Logger logger = LogManager.getLogger(AutodetectProcessManager.class);
private final Client client;
@ -111,9 +104,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
private NativeStorageProvider nativeStorageProvider;
private final ConcurrentMap<Long, ProcessContext> processByAllocation = new ConcurrentHashMap<>();
// a map that manages the allocation of temporary space to jobs
private final ConcurrentMap<String, Path> nativeTmpStorage = new ConcurrentHashMap<>();
private volatile int maxAllowedRunningJobs;
private final NamedXContentRegistry xContentRegistry;
@ -123,10 +113,10 @@ public class AutodetectProcessManager implements ClusterStateListener {
private volatile boolean upgradeInProgress;
public AutodetectProcessManager(Environment environment, Settings settings, Client client, ThreadPool threadPool,
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService,
JobManager jobManager, JobResultsProvider jobResultsProvider, JobResultsPersister jobResultsPersister,
JobDataCountsPersister jobDataCountsPersister,
AutodetectProcessFactory autodetectProcessFactory, NormalizerFactory normalizerFactory,
NamedXContentRegistry xContentRegistry, Auditor auditor, ClusterService clusterService) {
JobDataCountsPersister jobDataCountsPersister, AutodetectProcessFactory autodetectProcessFactory,
NormalizerFactory normalizerFactory, NativeStorageProvider nativeStorageProvider) {
this.environment = environment;
this.client = client;
this.threadPool = threadPool;
@ -139,7 +129,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
this.jobResultsPersister = jobResultsPersister;
this.jobDataCountsPersister = jobDataCountsPersister;
this.auditor = auditor;
this.nativeStorageProvider = new NativeStorageProvider(environment, MIN_DISK_SPACE_OFF_HEAP.get(settings));
this.nativeStorageProvider = Objects.requireNonNull(nativeStorageProvider);
clusterService.addListener(this);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.MAX_OPEN_JOBS_PER_NODE, this::setMaxAllowedRunningJobs);
@ -149,14 +139,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
this.maxAllowedRunningJobs = maxAllowedRunningJobs;
}
public void onNodeStartup() {
try {
nativeStorageProvider.cleanupLocalTmpStorageInCaseOfUncleanShutdown();
} catch (Exception e) {
logger.warn("Failed to cleanup native storage from previous invocation", e);
}
}
public synchronized void closeAllJobsOnThisNode(String reason) {
int numJobs = processByAllocation.size();
if (numJobs != 0) {
@ -283,28 +265,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
});
}
/**
* Request temporary storage to be used for the job
*
* @param jobTask The job task
* @param requestedSize requested size
* @return a Path to local storage or null if storage is not available
*/
public Path tryGetTmpStorage(JobTask jobTask, ByteSizeValue requestedSize) {
String jobId = jobTask.getJobId();
Path path = nativeTmpStorage.get(jobId);
if (path == null) {
path = nativeStorageProvider.tryGetLocalTmpStorage(jobId, requestedSize);
if (path != null) {
nativeTmpStorage.put(jobId, path);
}
} else if (!nativeStorageProvider.localTmpStorageHasEnoughSpace(path, requestedSize)) {
// the previous tmp location ran out of disk space, do not allow further usage
return null;
}
return path;
}
/**
* Do a forecast for the running job.
*
@ -602,7 +562,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
setJobState(jobTask, JobState.FAILED, reason);
try {
removeTmpStorage(jobTask.getJobId());
nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}] Failed to delete temporary files", jobTask.getJobId()), e);
}
@ -666,7 +626,7 @@ public class AutodetectProcessManager implements ClusterStateListener {
}
// delete any tmp storage
try {
removeTmpStorage(jobId);
nativeStorageProvider.cleanupLocalTmpStorage(jobTask.getDescription());
} catch (IOException e) {
logger.error(new ParameterizedMessage("[{}]Failed to delete temporary files", jobId), e);
}
@ -760,13 +720,6 @@ public class AutodetectProcessManager implements ClusterStateListener {
return Optional.of(new Tuple<>(communicator.getDataCounts(), communicator.getModelSizeStats()));
}
private void removeTmpStorage(String jobId) throws IOException {
Path path = nativeTmpStorage.get(jobId);
if (path != null) {
nativeStorageProvider.cleanupLocalTmpStorage(path);
}
}
ExecutorService createAutodetectExecutorService(ExecutorService executorService) {
AutodetectWorkerExecutorService autoDetectWorkerExecutor = new AutodetectWorkerExecutorService(threadPool.getThreadContext());
executorService.submit(autoDetectWorkerExecutor::start);

View File

@ -15,6 +15,8 @@ import org.elasticsearch.env.Environment;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Provide storage for native components.
@ -23,7 +25,6 @@ public class NativeStorageProvider {
private static final Logger LOGGER = LogManager.getLogger(NativeStorageProvider.class);
private static final String LOCAL_STORAGE_SUBFOLDER = "ml-local-data";
private static final String LOCAL_STORAGE_TMP_FOLDER = "tmp";
@ -32,6 +33,9 @@ public class NativeStorageProvider {
// do not allow any usage below this threshold
private final ByteSizeValue minLocalStorageAvailable;
// A map to keep track of allocated native storage by resource id
private final ConcurrentMap<String, Path> allocatedStorage = new ConcurrentHashMap<>();
public NativeStorageProvider(Environment environment, ByteSizeValue minDiskSpaceOffHeap) {
this.environment = environment;
this.minLocalStorageAvailable = minDiskSpaceOffHeap;
@ -44,13 +48,15 @@ public class NativeStorageProvider {
* unclean node shutdown or broken clients.
*
* Do not call while there are running jobs.
*
* @throws IOException if cleanup fails
*/
public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() throws IOException {
public void cleanupLocalTmpStorageInCaseOfUncleanShutdown() {
try {
for (Path p : environment.dataFiles()) {
IOUtils.rm(p.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER));
}
} catch (Exception e) {
LOGGER.warn("Failed to cleanup native storage from previous invocation", e);
}
}
/**
@ -61,17 +67,28 @@ public class NativeStorageProvider {
* @return Path for temporary storage if available, null otherwise
*/
public Path tryGetLocalTmpStorage(String uniqueIdentifier, ByteSizeValue requestedSize) {
Path path = allocatedStorage.get(uniqueIdentifier);
if (path != null && localTmpStorageHasEnoughSpace(path, requestedSize) == false) {
LOGGER.debug("Previous tmp storage for [{}] run out, returning null", uniqueIdentifier);
return null;
} else {
path = tryAllocateStorage(uniqueIdentifier, requestedSize);
}
return path;
}
private Path tryAllocateStorage(String uniqueIdentifier, ByteSizeValue requestedSize) {
for (Path path : environment.dataFiles()) {
try {
if (getUsableSpace(path) >= requestedSize.getBytes() + minLocalStorageAvailable.getBytes()) {
Path tmpDirectory = path.resolve(LOCAL_STORAGE_SUBFOLDER).resolve(LOCAL_STORAGE_TMP_FOLDER).resolve(uniqueIdentifier);
Files.createDirectories(tmpDirectory);
allocatedStorage.put(uniqueIdentifier, tmpDirectory);
return tmpDirectory;
}
} catch (IOException e) {
LOGGER.debug("Failed to obtain information about path [{}]: {}", path, e);
}
}
LOGGER.debug("Failed to find native storage for [{}], returning null", uniqueIdentifier);
return null;
@ -96,12 +113,12 @@ public class NativeStorageProvider {
/**
* Delete temporary storage, previously allocated
*
* @param path
* Path to temporary storage
* @throws IOException
* if path can not be cleaned up
* @param uniqueIdentifier the identifier to which storage was allocated
* @throws IOException if path can not be cleaned up
*/
public void cleanupLocalTmpStorage(Path path) throws IOException {
public void cleanupLocalTmpStorage(String uniqueIdentifier) throws IOException {
Path path = allocatedStorage.remove(uniqueIdentifier);
if (path != null) {
// do not allow to breakout from the tmp storage provided
Path realPath = path.toAbsolutePath();
for (Path p : environment.dataFiles()) {
@ -110,6 +127,7 @@ public class NativeStorageProvider {
}
}
}
}
public ByteSizeValue getMinLocalStorageAvailable() {
return minLocalStorageAvailable;

View File

@ -54,6 +54,7 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.TimeRange;
import org.elasticsearch.xpack.ml.job.process.normalizer.NormalizerFactory;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.process.NativeStorageProvider;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
@ -121,6 +122,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private Auditor auditor;
private ClusterState clusterState;
private ClusterService clusterService;
private NativeStorageProvider nativeStorageProvider;
private DataCounts dataCounts = new DataCounts("foo");
private ModelSizeStats modelSizeStats = new ModelSizeStats.Builder("foo").build();
@ -159,6 +161,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
clusterState = mock(ClusterState.class);
when(clusterState.getMetaData()).thenReturn(metaData);
when(clusterState.metaData()).thenReturn(metaData);
nativeStorageProvider = mock(NativeStorageProvider.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
@ -685,9 +688,8 @@ public class AutodetectProcessManagerTests extends ESTestCase {
private AutodetectProcessManager createManager(Settings settings) {
return new AutodetectProcessManager(environment, settings,
client, threadPool, jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister,
autodetectFactory, normalizerFactory,
new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService);
client, threadPool, new NamedXContentRegistry(Collections.emptyList()), auditor, clusterService, jobManager, jobResultsProvider,
jobResultsPersister, jobDataCountsPersister, autodetectFactory, normalizerFactory, nativeStorageProvider);
}
private AutodetectProcessManager createSpyManagerAndCallProcessData(String jobId) {
AutodetectProcessManager manager = createSpyManager();

View File

@ -89,7 +89,7 @@ public class NativeStorageProviderTests extends ESTestCase {
Assert.assertTrue(Files.isRegularFile(testFile));
// the native component should cleanup itself, but assume it has crashed
storageProvider.cleanupLocalTmpStorage(path);
storageProvider.cleanupLocalTmpStorage(id);
Assert.assertFalse(Files.exists(testFile));
Assert.assertFalse(Files.exists(path));
}