[ML] Auto close jobs and auto stop datafeeds if license is invalid.

Original commit: elastic/x-pack-elasticsearch@6e2634b2b6
Martijn van Groningen 2017-03-03 12:00:48 +01:00
parent 6365dec42e
commit e972f0216d
13 changed files with 310 additions and 79 deletions
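
In outline: the new InvalidLicenseEnforcer (added below) registers a listener on XPackLicenseState and, whenever the license changes to one under which ML is not allowed, hops onto the generic threadpool to stop all running datafeeds and close all open jobs. A minimal, self-contained sketch of that listener pattern in plain Java follows; LicenseState and Enforcer are hypothetical stand-ins, not the classes from this commit:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for XPackLicenseState: holds the current state and notifies listeners.
class LicenseState {
    private final List<Runnable> listeners = new CopyOnWriteArrayList<>();
    private volatile boolean mlAllowed = true;

    void addListener(Runnable listener) { listeners.add(listener); }

    boolean isMachineLearningAllowed() { return mlAllowed; }

    void update(boolean allowed) {
        mlAllowed = allowed;
        listeners.forEach(Runnable::run); // notify on every license change
    }
}

// Hypothetical stand-in for InvalidLicenseEnforcer.
class Enforcer {
    private final LicenseState licenseState;
    private final ExecutorService generic = Executors.newSingleThreadExecutor();

    Enforcer(LicenseState licenseState) {
        this.licenseState = licenseState;
        licenseState.addListener(this::onLicenseChange);
    }

    private void onLicenseChange() {
        if (licenseState.isMachineLearningAllowed() == false) {
            // stopping datafeeds and closing jobs can block, so run it off the notifying thread
            generic.execute(() -> System.out.println("stopping datafeeds, closing jobs: invalid license"));
        }
    }
}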

View File: XPackPlugin.java

@@ -98,6 +98,7 @@ import org.elasticsearch.xpack.ssl.SSLService;
import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.WatcherFeatureSet;
import javax.security.auth.DestroyFailedException;
import java.io.IOException;
import java.nio.file.Path;
import java.security.AccessController;
@@ -119,8 +120,6 @@ import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.security.auth.DestroyFailedException;
public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, IngestPlugin, NetworkPlugin {
public static final String NAME = "x-pack";
@@ -203,7 +202,7 @@ public class XPackPlugin extends Plugin implements ScriptPlugin, ActionPlugin, I
this.monitoring = new Monitoring(settings, licenseState);
this.watcher = new Watcher(settings);
this.graph = new Graph(settings);
this.machineLearning = new MachineLearning(settings, env);
this.machineLearning = new MachineLearning(settings, env, licenseState);
// Check if the node is a transport client.
if (transportClientMode == false) {
this.extensionsService = new XPackExtensionsService(settings, resolveXPackExtensionsFile(env), getExtensions());

View File: InvalidLicenseEnforcer.java (new file)

@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobRunner;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
public class InvalidLicenseEnforcer extends AbstractComponent {
private final ThreadPool threadPool;
private final XPackLicenseState licenseState;
private final DatafeedJobRunner datafeedJobRunner;
private final AutodetectProcessManager autodetectProcessManager;
InvalidLicenseEnforcer(Settings settings, XPackLicenseState licenseState, ThreadPool threadPool,
DatafeedJobRunner datafeedJobRunner, AutodetectProcessManager autodetectProcessManager) {
super(settings);
this.threadPool = threadPool;
this.licenseState = licenseState;
this.datafeedJobRunner = datafeedJobRunner;
this.autodetectProcessManager = autodetectProcessManager;
licenseState.addListener(this::closeJobsAndDatafeedsIfLicenseExpired);
}
private void closeJobsAndDatafeedsIfLicenseExpired() {
if (licenseState.isMachineLearningAllowed() == false) {
threadPool.generic().execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("cannot close all jobs", e);
}
@Override
protected void doRun() throws Exception {
datafeedJobRunner.closeAllDatafeeds("invalid license");
autodetectProcessManager.closeAllJobs("invalid license");
}
});
}
}
}

View File: MachineLearning.java

@@ -25,6 +25,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestController;
@@ -160,19 +161,17 @@ public class MachineLearning extends Plugin implements ActionPlugin {
private final Settings settings;
private final Environment env;
private boolean enabled;
private boolean transportClientMode;
private final XPackLicenseState licenseState;
private final boolean enabled;
private final boolean transportClientMode;
private final boolean tribeNode;
private final boolean tribeNodeClient;
public MachineLearning(Settings settings) {
this(settings, new Environment(settings));
}
public MachineLearning(Settings settings, Environment env) {
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
public MachineLearning(Settings settings, Environment env, XPackLicenseState licenseState) {
this.settings = settings;
this.env = env;
this.licenseState = licenseState;
this.enabled = XPackSettings.MACHINE_LEARNING_ENABLED.get(settings);
this.transportClientMode = XPackPlugin.transportClientMode(settings);
this.tribeNode = XPackPlugin.isTribeNode(settings);
this.tribeNodeClient = XPackPlugin.isTribeClientNode(settings);
@@ -298,18 +297,21 @@ public class MachineLearning extends Plugin implements ActionPlugin {
}
NormalizerFactory normalizerFactory = new NormalizerFactory(normalizerProcessFactory,
threadPool.executor(MachineLearning.THREAD_POOL_NAME));
AutodetectProcessManager dataProcessor = new AutodetectProcessManager(settings, client, threadPool, jobManager, jobProvider,
jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory, normalizerFactory);
AutodetectProcessManager autodetectProcessManager = new AutodetectProcessManager(settings, client, threadPool,
jobManager, jobProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory);
DatafeedJobRunner datafeedJobRunner = new DatafeedJobRunner(threadPool, client, clusterService, jobProvider,
System::currentTimeMillis, auditor);
PersistentActionService persistentActionService = new PersistentActionService(Settings.EMPTY, threadPool, clusterService, client);
PersistentActionRegistry persistentActionRegistry = new PersistentActionRegistry(Settings.EMPTY);
InvalidLicenseEnforcer invalidLicenseEnforcer =
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedJobRunner, autodetectProcessManager);
return Arrays.asList(
mlLifeCycleService,
jobProvider,
jobManager,
dataProcessor,
autodetectProcessManager,
new MachineLearningTemplateRegistry(settings, clusterService, client, threadPool),
new MlInitializationService(settings, threadPool, clusterService, client),
jobDataCountsPersister,
@@ -318,7 +320,8 @@ public class MachineLearning extends Plugin implements ActionPlugin {
persistentActionRegistry,
new PersistentTaskClusterService(Settings.EMPTY, persistentActionRegistry, clusterService),
auditor,
new CloseJobService(client, threadPool, clusterService)
new CloseJobService(client, threadPool, clusterService),
invalidLicenseEnforcer
);
}

View File: OpenJobAction.java

@@ -50,11 +50,11 @@ import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManage
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.utils.JobStateObserver;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentActionRegistry;
import org.elasticsearch.xpack.persistent.PersistentActionRequest;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentActionService;
import org.elasticsearch.xpack.persistent.NodePersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.elasticsearch.xpack.persistent.PersistentTasks.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasks.PersistentTask;
@@ -66,7 +66,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
import static org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE;
@@ -221,7 +221,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
public static class JobTask extends NodePersistentTask {
private final String jobId;
private volatile Consumer<String> cancelHandler;
private volatile BiConsumer<Boolean, String> cancelHandler;
JobTask(String jobId, long id, String type, String action, TaskId parentTask) {
super(id, type, action, "job-" + jobId, parentTask);
@@ -234,8 +234,9 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
@Override
protected void onCancelled() {
String reason = CancelTasksRequest.DEFAULT_REASON.equals(getReasonCancelled()) ? null : getReasonCancelled();
cancelHandler.accept(reason);
String reason = getReasonCancelled();
boolean restart = CancelTasksRequest.DEFAULT_REASON.equals(reason) == false;
cancelHandler.accept(restart, reason);
}
static boolean match(Task task, String expectedJobId) {
@@ -333,7 +334,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, PersistentActio
}
JobTask jobTask = (JobTask) task;
jobTask.cancelHandler = (reason) -> autodetectProcessManager.closeJob(request.getJobId(), reason);
jobTask.cancelHandler = (restart, reason) -> autodetectProcessManager.closeJob(request.getJobId(), restart, reason);
autodetectProcessManager.openJob(request.getJobId(), task.getPersistentTaskId(), request.isIgnoreDowntime(), e2 -> {
if (e2 == null) {
listener.onResponse(new TransportResponse.Empty());
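
The change above replaces the Consumer<String> cancel handler with a BiConsumer<Boolean, String>: onCancelled now derives a restart flag from the cancellation reason, so a task cancelled with an explicit reason is closed in a way that lets the persistent-task framework restart the job, while a plain close (the default reason) is final. A small sketch of that decision; DEFAULT_REASON here is an assumed placeholder for CancelTasksRequest.DEFAULT_REASON:

import java.util.function.BiConsumer;

class CancelReasonDemo {
    // assumed placeholder value for CancelTasksRequest.DEFAULT_REASON
    static final String DEFAULT_REASON = "by user request";

    // Mirrors JobTask.onCancelled() above: only a non-default reason requests a restart.
    static void onCancelled(String reason, BiConsumer<Boolean, String> cancelHandler) {
        boolean restart = DEFAULT_REASON.equals(reason) == false;
        cancelHandler.accept(restart, reason);
    }

    public static void main(String[] args) {
        BiConsumer<Boolean, String> closeJob = (restart, reason) ->
                System.out.println("closeJob(restart=" + restart + ", reason=" + reason + ")");
        onCancelled(DEFAULT_REASON, closeJob);          // plain close -> restart=false
        onCancelled("node is shutting down", closeJob); // explicit reason -> restart=true
    }
}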

View File: DatafeedJobRunner.java

@@ -39,7 +39,10 @@ import org.elasticsearch.xpack.persistent.UpdatePersistentTaskStatusAction;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@@ -57,6 +60,7 @@ public class DatafeedJobRunner extends AbstractComponent {
private final ThreadPool threadPool;
private final Supplier<Long> currentTimeSupplier;
private final Auditor auditor;
private final ConcurrentMap<String, Holder> runningDatafeeds = new ConcurrentHashMap<>();
public DatafeedJobRunner(ThreadPool threadPool, Client client, ClusterService clusterService, JobProvider jobProvider,
Supplier<Long> currentTimeSupplier, Auditor auditor) {
@@ -95,6 +99,7 @@
latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
Holder holder = createJobDatafeed(datafeed, job, latestFinalBucketEndMs, latestRecordTimeMs, handler, task);
runningDatafeeds.put(datafeedId, holder);
updateDatafeedState(task.getPersistentTaskId(), DatafeedState.STARTED, e -> {
if (e != null) {
handler.accept(e);
@@ -105,6 +110,17 @@
}, handler);
}
public synchronized void closeAllDatafeeds(String reason) {
int numDatafeeds = runningDatafeeds.size();
if (numDatafeeds != 0) {
logger.info("Closing [{}] datafeeds, because [{}]", numDatafeeds, reason);
}
for (Map.Entry<String, Holder> entry : runningDatafeeds.entrySet()) {
entry.getValue().stop(reason, TimeValue.timeValueSeconds(20), null);
}
}
// Important: the Holder must be created and assigned to the DatafeedTask before the state is set to started,
// otherwise a stop datafeed call made immediately after the start datafeed call could cancel
// the DatafeedTask without stopping the datafeed, which would leave the datafeed running.
@@ -279,6 +295,7 @@
} finally {
logger.info("[{}] stopping datafeed [{}] for job [{}], acquired [{}]...", source, datafeed.getId(),
datafeed.getJobId(), acquired);
runningDatafeeds.remove(datafeed.getId());
FutureUtils.cancel(future);
auditor.info(datafeed.getJobId(), Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_STOPPED));
handler.accept(e);

View File: JobProvider.java

@@ -84,8 +84,6 @@ import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
@@ -530,31 +528,6 @@ public class JobProvider {
false, partitionFieldValue, h, errorHandler);
}
// keep the blocking variant around for ScoresUpdater, as that can remain blocking because it is run from the dedicated ml threadpool.
// also, refactoring it to be non-blocking is a lot of work.
public int expandBucket(String jobId, boolean includeInterim, Bucket bucket) {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Integer> holder = new AtomicReference<>();
AtomicReference<Exception> errorHolder = new AtomicReference<>();
expandBucket(jobId, includeInterim, bucket, null, 0, records -> {
holder.set(records);
latch.countDown();
}, e -> {
errorHolder.set(e);
latch.countDown();
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (errorHolder.get() != null) {
throw new RuntimeException(errorHolder.get());
} else {
return holder.get();
}
}
void bucketRecords(String jobId, Bucket bucket, int from, int size, boolean includeInterim, String sortField,
boolean descending, String partitionFieldValue, Consumer<QueryPage<AnomalyRecord>> handler,
Consumer<Exception> errorHandler) {

View File: JobResultsPersister.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
@@ -298,7 +299,9 @@ public class JobResultsPersister extends AbstractComponent {
String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId);
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
client.admin().indices().refresh(new RefreshRequest(indexName)).actionGet();
RefreshRequest refreshRequest = new RefreshRequest(indexName);
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
client.admin().indices().refresh(refreshRequest).actionGet();
return true;
}
@@ -314,6 +317,7 @@
// Refresh should wait for Lucene to make the data searchable
logger.trace("[{}] ES API CALL: refresh index {}", jobId, indexName);
RefreshRequest refreshRequest = new RefreshRequest(indexName);
refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
client.admin().indices().refresh(refreshRequest).actionGet();
return true;
}
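
Both refresh call sites now pass IndicesOptions.lenientExpandOpen(), so a refresh against a results index that is missing or closed no longer throws. The changed call isolated as a sketch, using only the API already visible in this hunk:

import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.Client;

class LenientRefresh {
    // Lenient options: a missing or closed index is skipped instead of failing the request.
    static void refresh(Client client, String indexName) {
        RefreshRequest refreshRequest = new RefreshRequest(indexName);
        refreshRequest.indicesOptions(IndicesOptions.lenientExpandOpen());
        client.admin().indices().refresh(refreshRequest).actionGet();
    }
}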

View File: AutodetectCommunicator.java

@@ -89,15 +89,23 @@ public class AutodetectCommunicator implements Closeable {
@Override
public void close() throws IOException {
close(null);
close(false, null);
}
public void close(String errorReason) throws IOException {
/**
* Closes the job this communicator is encapsulating.
*
* @param restart Whether the job should be restarted by persistent tasks
* @param reason The reason for closing the job
*/
public void close(boolean restart, String reason) throws IOException {
checkAndRun(() -> Messages.getMessage(Messages.JOB_DATA_CONCURRENT_USE_CLOSE, job.getId()), () -> {
LOGGER.info("[{}] job closing, reason [{}]", job.getId(), reason);
dataCountsReporter.close();
autodetectProcess.close();
autoDetectResultProcessor.awaitCompletion();
handler.accept(errorReason != null ? new ElasticsearchException(errorReason) : null);
handler.accept(restart ? new ElasticsearchException(reason) : null);
LOGGER.info("[{}] job closed", job.getId());
return null;
}, true);
}
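
Per the new javadoc, the restart flag is encoded in what close hands to the completion handler: a non-null exception when the persistent task should be treated as failed so the job can be restarted, null for a clean, final close. A sketch of that contract in plain Java, with RuntimeException standing in for ElasticsearchException:

import java.util.function.Consumer;

class CloseContract {
    // Mirrors handler.accept(restart ? new ElasticsearchException(reason) : null) above:
    // a non-null exception marks the task as failed so it may be restarted,
    // while null signals a clean, final completion.
    static void finish(boolean restart, String reason, Consumer<Exception> handler) {
        handler.accept(restart ? new RuntimeException(reason) : null);
    }

    public static void main(String[] args) {
        Consumer<Exception> handler = e ->
                System.out.println(e == null ? "closed for good" : "closed, restartable: " + e.getMessage());
        finish(false, "invalid license", handler);      // license enforcement: do not restart
        finish(true, "node is shutting down", handler); // restartable close
    }
}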

View File: AutodetectProcessManager.java

@@ -49,6 +49,7 @@ import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -97,6 +98,17 @@ public class AutodetectProcessManager extends AbstractComponent {
this.autoDetectCommunicatorByJob = new ConcurrentHashMap<>();
}
public synchronized void closeAllJobs(String reason) throws IOException {
int numJobs = autoDetectCommunicatorByJob.size();
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
}
for (Map.Entry<String, AutodetectCommunicator> entry : autoDetectCommunicatorByJob.entrySet()) {
closeJob(entry.getKey(), false, reason);
}
}
/**
* Passes data to the native process.
* This is a blocking call that won't return until all the data has been
@@ -267,25 +279,25 @@
/**
* Stop the running job and mark it as finished.<br>
* @param jobId The job to stop
* @param errorReason If caused by failure, the reason for closing the job
* @param restart Whether the job should be restarted by persistent tasks
* @param reason The reason for closing the job
*/
public void closeJob(String jobId, String errorReason) {
logger.debug("Attempting to close job [{}], because [{}]", jobId, errorReason);
public void closeJob(String jobId, boolean restart, String reason) {
logger.debug("Attempting to close job [{}], because [{}]", jobId, reason);
AutodetectCommunicator communicator = autoDetectCommunicatorByJob.remove(jobId);
if (communicator == null) {
logger.debug("Cannot close: no active autodetect process for job {}", jobId);
return;
}
if (errorReason == null) {
if (reason == null) {
logger.info("Closing job [{}]", jobId);
} else {
logger.info("Closing job [{}], because [{}]", jobId, errorReason);
logger.info("Closing job [{}], because [{}]", jobId, reason);
}
try {
communicator.close(errorReason);
logger.info("[{}] job closed", jobId);
communicator.close(restart, reason);
} catch (Exception e) {
logger.warn("Exception closing stopped process input stream", e);
throw ExceptionsHelper.serverError("Exception closing stopped process input stream", e);

View File: JobStateObserver.java

@@ -14,8 +14,8 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.MlMetadata;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import java.util.function.Consumer;
@@ -57,7 +57,8 @@ public class JobStateObserver {
@Override
public void onTimeout(TimeValue timeout) {
if (jobStatePredicate.test(clusterService.state())) {
ClusterState state = clusterService.state();
if (jobStatePredicate.test(state)) {
if (jobStatePredicate.failed) {
handler.accept(new ElasticsearchStatusException("[" + jobId + "] expected state [" + JobState.OPENED +
"] but got [" + JobState.FAILED +"]", RestStatus.CONFLICT));
@@ -65,8 +66,10 @@
handler.accept(null);
}
} else {
Exception e = new IllegalArgumentException("Timeout expired while waiting for job state to change to ["
+ expectedState + "]");
PersistentTasks tasks = state.getMetaData().custom(PersistentTasks.TYPE);
JobState actual = MlMetadata.getJobState(jobId, tasks);
Exception e = new IllegalArgumentException("Timeout expired while waiting for job state [" + actual +
"] to change to [" + expectedState + "]");
handler.accept(e);
}
}

View File: MachineLearningLicensingTests.java

@@ -6,8 +6,10 @@
package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.license.License.OperationMode;
import org.elasticsearch.rest.RestStatus;
@@ -23,8 +25,11 @@ import org.elasticsearch.xpack.ml.action.PutJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.action.StopDatafeedAction;
import org.elasticsearch.xpack.ml.client.MachineLearningClient;
import org.elasticsearch.xpack.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.support.BaseMlIntegTestCase;
import org.elasticsearch.xpack.persistent.PersistentActionResponse;
import org.elasticsearch.xpack.persistent.PersistentTasks;
import org.junit.Before;
import java.util.Collections;
@@ -108,6 +113,13 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
mode = randomValidLicenseType();
enableLicensing(mode);
assertMLAllowed(true);
// the job should have been closed while the license was invalid:
assertBusy(() -> {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.CLOSED, jobState);
});
// test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
@@ -163,6 +175,102 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
}
}
public void testAutoCloseJobWithDatafeed() throws Exception {
assertMLAllowed(true);
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// put job
PlainListenableActionFuture<PutJobAction.Response> putJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).putJob(new PutJobAction.Request(createJob("foo").build()), putJobListener);
PutJobAction.Response putJobResponse = putJobListener.actionGet();
assertNotNull(putJobResponse);
// put datafeed
PlainListenableActionFuture<PutDatafeedAction.Response> putDatafeedListener = new PlainListenableActionFuture<>(
client.threadPool());
new MachineLearningClient(client).putDatafeed(
new PutDatafeedAction.Request(createDatafeed("foobar", "foo", Collections.singletonList("foo"))), putDatafeedListener);
PutDatafeedAction.Response putDatafeedResponse = putDatafeedListener.actionGet();
assertNotNull(putDatafeedResponse);
// open job
PlainListenableActionFuture<PersistentActionResponse> openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
PersistentActionResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
// start datafeed
PlainListenableActionFuture<PersistentActionResponse> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
listener.actionGet();
}
if (randomBoolean()) {
enableLicensing(randomInvalidLicenseType());
} else {
disableLicensing();
}
assertMLAllowed(false);
// now that the license is invalid, the job should be closed and datafeed stopped:
assertBusy(() -> {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.CLOSED, jobState);
DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState();
assertEquals(DatafeedState.STOPPED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(0, tasks.taskMap().size());
});
enableLicensing(randomValidLicenseType());
assertMLAllowed(true);
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// open job
PlainListenableActionFuture<PersistentActionResponse> openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
PersistentActionResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
// start datafeed
PlainListenableActionFuture<PersistentActionResponse> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
listener.actionGet();
}
assertBusy(() -> {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.OPENED, jobState);
DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState();
assertEquals(DatafeedState.STARTED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(2, tasks.taskMap().size());
});
if (randomBoolean()) {
enableLicensing(randomInvalidLicenseType());
} else {
disableLicensing();
}
assertMLAllowed(false);
// now that the license is invalid, the job should be closed and datafeed stopped:
assertBusy(() -> {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.CLOSED, jobState);
DatafeedState datafeedState = getDatafeedStats("foobar").getDatafeedState();
assertEquals(DatafeedState.STOPPED, datafeedState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(0, tasks.taskMap().size());
});
}
public void testMachineLearningStartDatafeedActionRestricted() throws Exception {
assertMLAllowed(true);
@@ -189,6 +297,16 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
License.OperationMode mode = randomInvalidLicenseType();
enableLicensing(mode);
assertMLAllowed(false);
// now that the license is invalid, the job should get closed:
assertBusy(() -> {
JobState jobState = getJobStats("foo").getState();
assertEquals(JobState.CLOSED, jobState);
ClusterState state = client().admin().cluster().prepareState().get().getState();
PersistentTasks tasks = state.metaData().custom(PersistentTasks.TYPE);
assertEquals(0, tasks.taskMap().size());
});
// test that license restricted apis do not work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
@@ -209,6 +327,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
// test that license restricted apis do now work
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
// re-open job now that the license is valid again
PlainListenableActionFuture<PersistentActionResponse> openJobListener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).openJob(new OpenJobAction.Request("foo"), openJobListener);
PersistentActionResponse openJobResponse = openJobListener.actionGet();
assertNotNull(openJobResponse);
PlainListenableActionFuture<PersistentActionResponse> listener = new PlainListenableActionFuture<>(client.threadPool());
new MachineLearningClient(client).startDatafeed(new StartDatafeedAction.Request("foobar", 0L), listener);
PersistentActionResponse response = listener.actionGet();
@@ -243,18 +367,28 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
assertNotNull(startDatafeedResponse);
}
// Pick a random license
License.OperationMode mode = randomLicenseType();
enableLicensing(mode);
boolean invalidLicense = randomBoolean();
if (invalidLicense) {
enableLicensing(randomInvalidLicenseType());
} else {
enableLicensing(randomValidLicenseType());
}
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
PlainListenableActionFuture<StopDatafeedAction.Response> listener = new PlainListenableActionFuture<>(
client.threadPool());
new MachineLearningClient(client).stopDatafeed(new StopDatafeedAction.Request("foobar"), listener);
if (invalidLicense) {
// the datafeed is stopped automatically when the license becomes invalid,
// so a "datafeed already stopped" error is expected here instead of a license error
Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet);
assertEquals("datafeed already stopped, expected datafeed state [started], but got [stopped]", e.getMessage());
} else {
listener.actionGet();
}
}
}
public void testMachineLearningCloseJobActionNotRestricted() throws Exception {
@@ -272,9 +406,12 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
assertNotNull(openJobResponse);
}
// Pick a random license
License.OperationMode mode = randomLicenseType();
enableLicensing(mode);
boolean invalidLicense = randomBoolean();
if (invalidLicense) {
enableLicensing(randomInvalidLicenseType());
} else {
enableLicensing(randomValidLicenseType());
}
try (TransportClient client = new TestXPackTransportClient(internalCluster().transportClient().settings())) {
client.addTransportAddress(internalCluster().getDataNodeInstance(Transport.class).boundAddress().publishAddress());
@@ -282,9 +419,15 @@ public class MachineLearningLicensingTests extends BaseMlIntegTestCase {
CloseJobAction.Request request = new CloseJobAction.Request("foo");
request.setTimeout(TimeValue.timeValueSeconds(30));
new MachineLearningClient(client).closeJob(request, listener);
if (invalidLicense) {
// the job closes automatically when the license becomes invalid, so an error is expected:
Exception e = expectThrows(ElasticsearchStatusException.class, listener::actionGet);
assertEquals("cannot close job, expected job state [opened], but got [closed]", e.getMessage());
} else {
listener.actionGet();
}
}
}
public void testMachineLearningDeleteJobActionNotRestricted() throws Exception {

View File: AutodetectProcessManagerTests.java

@@ -14,7 +14,6 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
@@ -190,7 +189,7 @@ public class AutodetectProcessManagerTests extends ESTestCase {
Exception e = holder[0];
assertEquals("max running job capacity [3] reached", e.getMessage());
manager.closeJob("baz", null);
manager.closeJob("baz", false, null);
assertEquals(2, manager.numberOfOpenJobs());
manager.openJob("foobar", 4L, false, e1 -> {});
assertEquals(3, manager.numberOfOpenJobs());
@@ -230,7 +229,7 @@
// job is created
assertEquals(1, manager.numberOfOpenJobs());
manager.closeJob("foo", null);
manager.closeJob("foo", false, null);
assertEquals(0, manager.numberOfOpenJobs());
}

View File: BaseMlIntegTestCase.java

@@ -185,6 +185,7 @@ public abstract class BaseMlIntegTestCase extends ESIntegTestCase {
@After
public void cleanupWorkaround() throws Exception {
logger.info("[{}#{}]: Cleaning up datafeeds and jobs after test", getTestClass().getSimpleName(), getTestName());
deleteAllDatafeeds(client());
deleteAllJobs(client());
assertBusy(() -> {
@@ -214,13 +215,32 @@
logger.info("Indexed [{}] documents", numDocs);
}
public static DataCounts getDataCounts(String jobId) {
public static GetJobsStatsAction.Response.JobStats getJobStats(String jobId) {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
if (response.getResponse().results().isEmpty()) {
return new DataCounts(jobId);
return null;
} else {
return response.getResponse().results().get(0).getDataCounts();
return response.getResponse().results().get(0);
}
}
public static DataCounts getDataCounts(String jobId) {
GetJobsStatsAction.Response.JobStats jobStats = getJobStats(jobId);
if (jobStats != null) {
return jobStats.getDataCounts();
} else {
return new DataCounts(jobId);
}
}
public static GetDatafeedsStatsAction.Response.DatafeedStats getDatafeedStats(String datafeedId) {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
if (response.getResponse().results().isEmpty()) {
return null;
} else {
return response.getResponse().results().get(0);
}
}