From e972f0216d450b74177c5e49bb287ab46c9e00a1 Mon Sep 17 00:00:00 2001 From: Martijn van Groningen Date: Fri, 3 Mar 2017 12:00:48 +0100 Subject: [PATCH] [ML] Auto close jobs and auto stop datafeeds if license is invalid. Original commit: elastic/x-pack-elasticsearch@6e2634b2b6488e50003e68277ed219ea21ee1377 --- .../org/elasticsearch/xpack/XPackPlugin.java | 5 +- .../xpack/ml/InvalidLicenseEnforcer.java | 49 ++++++ .../xpack/ml/MachineLearning.java | 27 +-- .../xpack/ml/action/OpenJobAction.java | 13 +- .../xpack/ml/datafeed/DatafeedJobRunner.java | 17 ++ .../xpack/ml/job/persistence/JobProvider.java | 27 --- .../job/persistence/JobResultsPersister.java | 6 +- .../autodetect/AutodetectCommunicator.java | 14 +- .../autodetect/AutodetectProcessManager.java | 26 ++- .../xpack/ml/utils/JobStateObserver.java | 11 +- .../MachineLearningLicensingTests.java | 161 +++++++++++++++++- .../AutodetectProcessManagerTests.java | 5 +- .../xpack/ml/support/BaseMlIntegTestCase.java | 28 ++- 13 files changed, 310 insertions(+), 79 deletions(-) create mode 100644 plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java diff --git a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 158830bb6fe..6a670278c93 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -98,6 +98,7 @@ import org.elasticsearch.xpack.ssl.SSLService; import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.WatcherFeatureSet; +import javax.security.auth.DestroyFailedException; import java.io.IOException; import java.nio.file.Path; import java.security.AccessController; @@ -119,8 +120,6 @@ import java.util.function.Supplier; import java.util.function.UnaryOperator; import java.util.stream.Collectors; -import javax.security.auth.DestroyFailedException; - public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin { public static final String NAME = "x-pack"; @@ -203,7 +202,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I this.monitoring = new Monitoring(settings, licenseState); this.watcher = new Watcher(settings); this.graph = new Graph(settings); - this.machineLearning = new MachineLearning(settings, env); + this.machineLearning = new MachineLearning(settings, env, licenseState); // Check if the node is a transport client. if (transportClientMode == false) { this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java new file mode 100644 index 00000000000..8d35219192a --- /dev/null +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/InvalidLicenseEnforcer.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml; + +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.license.XPackLicenseState; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner; +import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; + +public class InvalidLicenseEnforcer extends AbstractComponent { + + private final ThreadPool threadPool; + private final XPackLicenseState licenseState; + private final DatafeedJobRunner datafeedJobRunner; + private final AutodetectProcessManager autodetectProcessManager; + + InvalidLicenseEnforcer(Settings settings, XPackLicenseState licenseState, ThreadPool threadPool, + DatafeedJobRunner datafeedJobRunner, AutodetectProcessManager autodetectProcessManager) { + super(settings); + this.threadPool = threadPool; + this.licenseState = licenseState; + this.datafeedJobRunner = datafeedJobRunner; + this.autodetectProcessManager = autodetectProcessManager; + licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired); + } + + private void closeJobsAndDatafeedsIfLicenseExpired() { + if (licenseState.isMachineLearningAllowed() == false) { + threadPool.generic().execute(new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("cannot close all jobs", e); + } + + @Override + protected void doRun() throws Exception { + datafeedJobRunner.closeAllDatafeeds("invalid license"); + autodetectProcessManager.closeAllJobs("invalid license"); + } + }); + } + } +} diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 6ccdbcb2948..fdcd747505c 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; +import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.plugins.ActionPlugin; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.rest.RestController; @@ -160,19 +161,17 @@ public class MachineLearning extends Plugin implements ActionPlugin { private final Settings settings; private final Environment env; - private boolean enabled; - private boolean transportClientMode; + private final XPackLicenseState licenseState; + private final boolean enabled; + private final boolean transportClientMode; private final boolean tribeNode; private final boolean tribeNodeClient; - public MachineLearning(Settings settings) { - this(settings, new Environment(settings)); - } - - public MachineLearning(Settings settings, Environment env) { - this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings); + public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) { this.settings = settings; this.env = env; + this.licenseState = licenseState; + this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings); this.transportClientMode = XPackPlugin.transportClientMode(settings); this.tribeNode = XPackPlugin.isTribeNode(settings); this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings); @@ -298,18 +297,21 @@ public class MachineLearning extends Plugin implements ActionPlugin { } NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory, threadPool.executor(MachineLearning.THREAD_POOL_NAME)); - AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider, - jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory); + AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, client, threadPool, + jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, + normalizerFactory); DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider, System::currentTimeMillis, auditor); PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, threadPool, clusterService, client); PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY); + InvalidLicenseEnforcer invalidLicenseEnforcer = + new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager); return Arrays.asList( mlLifeCycleService, jobProvider, jobManager, - dataProcessor, + autodetectProcessManager, new MachineLearningTemplateRegistry(settings, clusterService, client, threadPool), new MlInitializationService(settings, threadPool, clusterService, client), jobDataCountsPersister, @@ -318,7 +320,8 @@ public class MachineLearning extends Plugin implements ActionPlugin { persistentActionRegistry, new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService), auditor, - new CloseJobService(client, threadPool, clusterService) + new CloseJobService(client, threadPool, clusterService), + invalidLicenseEnforcer ); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java index 6650b3f141d..4f41bd26183 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/action/OpenJobAction.java @@ -50,11 +50,11 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage import org.elasticsearch.xpack.ml.notifications.Auditor; import org.elasticsearch.xpack.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.ml.utils.JobStateObserver; +import org.elasticsearch.xpack.persistent.NodePersistentTask; import org.elasticsearch.xpack.persistent.PersistentActionRegistry; import org.elasticsearch.xpack.persistent.PersistentActionRequest; import org.elasticsearch.xpack.persistent.PersistentActionResponse; import org.elasticsearch.xpack.persistent.PersistentActionService; -import org.elasticsearch.xpack.persistent.NodePersistentTask; import org.elasticsearch.xpack.persistent.PersistentTasks; import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment; import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask; @@ -66,7 +66,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; +import java.util.function.BiConsumer; import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE; @@ -221,7 +221,7 @@ public class OpenJobAction extends Action cancelHandler; + private volatile BiConsumer cancelHandler; JobTask(String jobId, long id, String type, String action, TaskId parentTask) { super(id, type, action, "job-" + jobId, parentTask); @@ -234,8 +234,9 @@ public class OpenJobAction extends Action autodetectProcessManager.closeJob(request.getJobId(), reason); + jobTask.cancelHandler = (restart, reason) -> autodetectProcessManager.closeJob(request.getJobId(), restart, reason); autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> { if (e2 == null) { listener.onResponse(new TransportResponse.Empty()); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java index 6231138f69c..f0f5bdf4f57 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java @@ -39,7 +39,10 @@ import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -57,6 +60,7 @@ public class DatafeedJobRunner extends AbstractComponent { private final ThreadPool threadPool; private final Supplier currentTimeSupplier; private final Auditor auditor; + private final ConcurrentMap runningDatafeeds = new ConcurrentHashMap<>(); public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider, Supplier currentTimeSupplier, Auditor auditor) { @@ -95,6 +99,7 @@ public class DatafeedJobRunner extends AbstractComponent { latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime(); } Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task); + runningDatafeeds.put(datafeedId, holder); updateDatafeedState(task.getPersistentTaskId(), DatafeedState.STARTED, e -> { if (e != null) { handler.accept(e); @@ -105,6 +110,17 @@ public class DatafeedJobRunner extends AbstractComponent { }, handler); } + public synchronized void closeAllDatafeeds(String reason) { + int numDatafeeds = runningDatafeeds.size(); + if (numDatafeeds != 0) { + logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason); + } + + for (Map.Entry entry : runningDatafeeds.entrySet()) { + entry.getValue().stop(reason, TimeValue.timeValueSeconds(20), null); + } + } + // Important: Holder must be created and assigned to DatafeedTask before setting state to started, // otherwise if a stop datafeed call is made immediately after the start datafeed call we could cancel // the DatafeedTask without stopping datafeed, which causes the datafeed to keep on running. @@ -279,6 +295,7 @@ public class DatafeedJobRunner extends AbstractComponent { } finally { logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(), datafeed.getJobId(), acquired); + runningDatafeeds.remove(datafeed.getId()); FutureUtils.cancel(future); auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED)); handler.accept(e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index def8dc5a19a..c1df2b53768 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -84,8 +84,6 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Supplier; @@ -530,31 +528,6 @@ public class JobProvider { false, partitionFieldValue, h, errorHandler); } - // keep blocking variant around for ScoresUpdater as that can remain a blocking as this is ran from dedicated ml threadpool. - // also refactoring that to be non blocking is a lot of work. - public int expandBucket(String jobId, boolean includeInterim, Bucket bucket) { - CountDownLatch latch = new CountDownLatch(1); - AtomicReference holder = new AtomicReference<>(); - AtomicReference errorHolder = new AtomicReference<>(); - expandBucket(jobId, includeInterim, bucket, null, 0, records -> { - holder.set(records); - latch.countDown(); - }, e -> { - errorHolder.set(e); - latch.countDown(); - }); - try { - latch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (errorHolder.get() != null) { - throw new RuntimeException(errorHolder.get()); - } else { - return holder.get(); - } - } - void bucketRecords(String jobId, Bucket bucket, int from, int size, boolean includeInterim, String sortField, boolean descending, String partitionFieldValue, Consumer> handler, Consumer errorHandler) { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java index f63dc32dc66..b5c534dcea8 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobResultsPersister.java @@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.settings.Settings; @@ -298,7 +299,9 @@ public class JobResultsPersister extends AbstractComponent { String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); - client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet(); + RefreshRequest refreshRequest = new RefreshRequest(indexName); + refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); + client.admin().indices().refresh(refreshRequest).actionGet(); return true; } @@ -314,6 +317,7 @@ public class JobResultsPersister extends AbstractComponent { // Refresh should wait for Lucene to make the data searchable logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName); RefreshRequest refreshRequest = new RefreshRequest(indexName); + refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); client.admin().indices().refresh(refreshRequest).actionGet(); return true; } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java index 23015dae064..4416527dcd1 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectCommunicator.java @@ -89,15 +89,23 @@ public class AutodetectCommunicator implements Closeable { @Override public void close() throws IOException { - close(null); + close(false, null); } - public void close(String errorReason) throws IOException { + /** + * Closes job this communicator is encapsulating. + * + * @param restart Whether the job should be restarted by persistent tasks + * @param reason The reason for closing the job + */ + public void close(boolean restart, String reason) throws IOException { checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> { + LOGGER.info("[{}] job closing, reason [{}]", job.getId(), reason); dataCountsReporter.close(); autodetectProcess.close(); autoDetectResultProcessor.awaitCompletion(); - handler.accept(errorReason != null ? new ElasticsearchException(errorReason) : null); + handler.accept(restart ? new ElasticsearchException(reason) : null); + LOGGER.info("[{}] job closed", job.getId()); return null; }, true); } diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java index 0c122bb876b..844fe0b8685 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java @@ -49,6 +49,7 @@ import java.time.Duration; import java.time.ZonedDateTime; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -97,6 +98,17 @@ public class AutodetectProcessManager extends AbstractComponent { this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>(); } + public synchronized void closeAllJobs(String reason) throws IOException { + int numJobs = autoDetectCommunicatorByJob.size(); + if (numJobs != 0) { + logger.info("Closing [{}] jobs, because [{}]", numJobs, reason); + } + + for (Map.Entry entry : autoDetectCommunicatorByJob.entrySet()) { + closeJob(entry.getKey(), false, reason); + } + } + /** * Passes data to the native process. * This is a blocking call that won't return until all the data has been @@ -267,25 +279,25 @@ public class AutodetectProcessManager extends AbstractComponent { /** * Stop the running job and mark it as finished.
* @param jobId The job to stop - * @param errorReason If caused by failure, the reason for closing the job + * @param restart Whether the job should be restarted by persistent tasks + * @param reason The reason for closing the job */ - public void closeJob(String jobId, String errorReason) { - logger.debug("Attempting to close job [{}], because [{}]", jobId, errorReason); + public void closeJob(String jobId, boolean restart, String reason) { + logger.debug("Attempting to close job [{}], because [{}]", jobId, reason); AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId); if (communicator == null) { logger.debug("Cannot close: no active autodetect process for job {}", jobId); return; } - if (errorReason == null) { + if (reason == null) { logger.info("Closing job [{}]", jobId); } else { - logger.info("Closing job [{}], because [{}]", jobId, errorReason); + logger.info("Closing job [{}], because [{}]", jobId, reason); } try { - communicator.close(errorReason); - logger.info("[{}] job closed", jobId); + communicator.close(restart, reason); } catch (Exception e) { logger.warn("Exception closing stopped process input stream", e); throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e); diff --git a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java index ce62ecc89ec..b030b7d5e29 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/ml/utils/JobStateObserver.java @@ -14,8 +14,8 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.threadpool.ThreadPool; -import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.MlMetadata; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.persistent.PersistentTasks; import java.util.function.Consumer; @@ -57,7 +57,8 @@ public class JobStateObserver { @Override public void onTimeout(TimeValue timeout) { - if (jobStatePredicate.test(clusterService.state())) { + ClusterState state = clusterService.state(); + if (jobStatePredicate.test(state)) { if (jobStatePredicate.failed) { handler.accept(new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.OPENED + "] but got [" + JobState.FAILED +"]", RestStatus.CONFLICT)); @@ -65,8 +66,10 @@ public class JobStateObserver { handler.accept(null); } } else { - Exception e = new IllegalArgumentException("Timeout expired while waiting for job state to change to [" - + expectedState + "]"); + PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE); + JobState actual = MlMetadata.getJobState(jobId, tasks); + Exception e = new IllegalArgumentException("Timeout expired while waiting for job state [" + actual + + "] to change to [" + expectedState + "]"); handler.accept(e); } } diff --git a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java index 74e0cb9fffb..6bcde5621a9 100644 --- a/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java +++ b/plugin/src/test/java/org/elasticsearch/license/MachineLearningLicensingTests.java @@ -6,8 +6,10 @@ package org.elasticsearch.license; import org.elasticsearch.ElasticsearchSecurityException; +import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.license.License.OperationMode; import org.elasticsearch.rest.RestStatus; @@ -23,8 +25,11 @@ import org.elasticsearch.xpack.ml.action.PutJobAction; import org.elasticsearch.xpack.ml.action.StartDatafeedAction; import org.elasticsearch.xpack.ml.action.StopDatafeedAction; import org.elasticsearch.xpack.ml.client.MachineLearningClient; +import org.elasticsearch.xpack.ml.datafeed.DatafeedState; +import org.elasticsearch.xpack.ml.job.config.JobState; import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase; import org.elasticsearch.xpack.persistent.PersistentActionResponse; +import org.elasticsearch.xpack.persistent.PersistentTasks; import org.junit.Before; import java.util.Collections; @@ -42,7 +47,7 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { ensureStableCluster(1); ensureYellow(); } - + public void testMachineLearningPutJobActionRestricted() throws Exception { // Pick a license that does not allow machine learning License.OperationMode mode = randomInvalidLicenseType(); @@ -108,6 +113,13 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { mode = randomValidLicenseType(); enableLicensing(mode); assertMLAllowed(true); + + // now that the license is invalid, the job should get closed: + assertBusy(() -> { + JobState jobState = getJobStats("foo").getState(); + assertEquals(JobState.CLOSED, jobState); + }); + // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); @@ -163,6 +175,102 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { } } + public void testAutoCloseJobWithDatafeed() throws Exception { + assertMLAllowed(true); + try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { + client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); + // put job + PlainListenableActionFuture putJobListener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener); + PutJobAction.Response putJobResponse = putJobListener.actionGet(); + assertNotNull(putJobResponse); + // put datafeed + PlainListenableActionFuture putDatafeedListener = new PlainListenableActionFuture<>( + client.threadPool()); + new MachineLearningClient(client).putDatafeed( + new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener); + PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet(); + assertNotNull(putDatafeedResponse); + // open job + PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); + PersistentActionResponse openJobResponse = openJobListener.actionGet(); + assertNotNull(openJobResponse); + // start datafeed + PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); + listener.actionGet(); + } + + if (randomBoolean()) { + enableLicensing(randomInvalidLicenseType()); + } else { + disableLicensing(); + } + assertMLAllowed(false); + + // now that the license is invalid, the job should be closed and datafeed stopped: + assertBusy(() -> { + JobState jobState = getJobStats("foo").getState(); + assertEquals(JobState.CLOSED, jobState); + + DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState(); + assertEquals(DatafeedState.STOPPED, datafeedState); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); + assertEquals(0, tasks.taskMap().size()); + }); + + enableLicensing(randomValidLicenseType()); + assertMLAllowed(true); + + try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { + client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); + // open job + PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); + PersistentActionResponse openJobResponse = openJobListener.actionGet(); + assertNotNull(openJobResponse); + // start datafeed + PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); + listener.actionGet(); + } + + assertBusy(() -> { + JobState jobState = getJobStats("foo").getState(); + assertEquals(JobState.OPENED, jobState); + + DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState(); + assertEquals(DatafeedState.STARTED, datafeedState); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); + assertEquals(2, tasks.taskMap().size()); + }); + + if (randomBoolean()) { + enableLicensing(randomInvalidLicenseType()); + } else { + disableLicensing(); + } + assertMLAllowed(false); + + // now that the license is invalid, the job should be closed and datafeed stopped: + assertBusy(() -> { + JobState jobState = getJobStats("foo").getState(); + assertEquals(JobState.CLOSED, jobState); + + DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState(); + assertEquals(DatafeedState.STOPPED, datafeedState); + + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); + assertEquals(0, tasks.taskMap().size()); + }); + } + public void testMachineLearningStartDatafeedActionRestricted() throws Exception { assertMLAllowed(true); @@ -189,6 +297,16 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { License.OperationMode mode = randomInvalidLicenseType(); enableLicensing(mode); assertMLAllowed(false); + + // now that the license is invalid, the job should get closed: + assertBusy(() -> { + JobState jobState = getJobStats("foo").getState(); + assertEquals(JobState.CLOSED, jobState); + ClusterState state = client().admin().cluster().prepareState().get().getState(); + PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE); + assertEquals(0, tasks.taskMap().size()); + }); + // test that license restricted apis do not work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); @@ -209,6 +327,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { // test that license restricted apis do now work try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); + // re-open job now that the license is valid again + PlainListenableActionFuture openJobListener = new PlainListenableActionFuture<>(client.threadPool()); + new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener); + PersistentActionResponse openJobResponse = openJobListener.actionGet(); + assertNotNull(openJobResponse); + PlainListenableActionFuture listener = new PlainListenableActionFuture<>(client.threadPool()); new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener); PersistentActionResponse response = listener.actionGet(); @@ -243,16 +367,26 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { assertNotNull(startDatafeedResponse); } - // Pick a random license - License.OperationMode mode = randomLicenseType(); - enableLicensing(mode); + boolean invalidLicense = randomBoolean(); + if (invalidLicense) { + enableLicensing(randomInvalidLicenseType()); + } else { + enableLicensing(randomValidLicenseType()); + } try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); PlainListenableActionFuture listener = new PlainListenableActionFuture<>( client.threadPool()); new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener); - listener.actionGet(); + if (invalidLicense) { + // expected to because datafeeds is automatically stopped in case of invalid license, + // a license error should not be returned + Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet); + assertEquals("datafeed already stopped, expected datafeed state [started], but got [stopped]", e.getMessage()); + } else { + listener.actionGet(); + } } } @@ -272,9 +406,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { assertNotNull(openJobResponse); } - // Pick a random license - License.OperationMode mode = randomLicenseType(); - enableLicensing(mode); + boolean invalidLicense = randomBoolean(); + if (invalidLicense) { + enableLicensing(randomInvalidLicenseType()); + } else { + enableLicensing(randomValidLicenseType()); + } try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) { client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress()); @@ -282,7 +419,13 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase { CloseJobAction.Request request = new CloseJobAction.Request("foo"); request.setTimeout(TimeValue.timeValueSeconds(30)); new MachineLearningClient(client).closeJob(request, listener); - listener.actionGet(); + if (invalidLicense) { + // so the license expired then job closes automatically, so an error is expected: + Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet); + assertEquals("cannot close job, expected job state [opened], but got [closed]", e.getMessage()); + } else { + listener.actionGet(); + } } } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java index a42572d20f9..57442747e9b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java @@ -14,7 +14,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.ml.MachineLearning; -import org.elasticsearch.xpack.ml.action.util.QueryPage; import org.elasticsearch.xpack.ml.job.JobManager; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.DataDescription; @@ -190,7 +189,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { Exception e = holder[0]; assertEquals("max running job capacity [3] reached", e.getMessage()); - manager.closeJob("baz", null); + manager.closeJob("baz", false, null); assertEquals(2, manager.numberOfOpenJobs()); manager.openJob("foobar", 4L, false, e1 -> {}); assertEquals(3, manager.numberOfOpenJobs()); @@ -230,7 +229,7 @@ public class AutodetectProcessManagerTests extends ESTestCase { // job is created assertEquals(1, manager.numberOfOpenJobs()); - manager.closeJob("foo", null); + manager.closeJob("foo", false, null); assertEquals(0, manager.numberOfOpenJobs()); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java index 9ffcf1e0e36..616a206560c 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/ml/support/BaseMlIntegTestCase.java @@ -115,7 +115,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { MachineLearningTemplateRegistry.allTemplatesInstalled(metaData)); }); } - + protected Job.Builder createJob(String id) { DataDescription.Builder dataDescription = new DataDescription.Builder(); dataDescription.setFormat(DataDescription.DataFormat.JSON); @@ -185,6 +185,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { @After public void cleanupWorkaround() throws Exception { + logger.info("[{}#{}]: Cleaning up datafeeds and jobs after test", getTestClass().getSimpleName(), getTestName()); deleteAllDatafeeds(client()); deleteAllJobs(client()); assertBusy(() -> { @@ -214,13 +215,32 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase { logger.info("Indexed [{}] documents", numDocs); } - public static DataCounts getDataCounts(String jobId) { + public static GetJobsStatsAction.Response.JobStats getJobStats(String jobId) { GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId); GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet(); if (response.getResponse().results().isEmpty()) { - return new DataCounts(jobId); + return null; } else { - return response.getResponse().results().get(0).getDataCounts(); + return response.getResponse().results().get(0); + } + } + + public static DataCounts getDataCounts(String jobId) { + GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId); + if (jobStats != null) { + return jobStats.getDataCounts(); + } else { + return new DataCounts(jobId); + } + } + + public static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId) { + GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId); + GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet(); + if (response.getResponse().results().isEmpty()) { + return null; + } else { + return response.getResponse().results().get(0); } }