[ML] Index job and datafeed assignment notifications via a separate cluster state listener.

`PersistentTasksExecutor#getAssignment(...)` should be a cheap, side-effect-free method,
but before this change `OpenJobPersistentTasksExecutor` and `StartDatafeedPersistentTasksExecutor` would index a notification document each time `getAssignment(...)` was invoked.
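
The shape of the fix, as a minimal sketch (condensed from the MlAssignmentNotifier class added below; the JobAssignmentAuditListener name and the placeholder comment are illustrative only, not part of the commit): a ClusterStateListener, registered only while the local node is master, diffs the persistent-task assignments between the previous and current cluster state and writes the notifications from there, so getAssignment(...) itself stays free of side effects.

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.util.Objects;

// Hypothetical, condensed form of the listener-based notification pattern.
class JobAssignmentAuditListener implements ClusterStateListener {

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        if (current == null || Objects.equals(previous, current)) {
            return;
        }
        for (PersistentTask<?> task : current.tasks()) {
            PersistentTask<?> before = previous != null ? previous.getTask(task.getId()) : null;
            boolean changed = before == null || Objects.equals(before.getAssignment(), task.getAssignment()) == false;
            if (changed) {
                // Only here, off the assignment decision path, would the audit
                // notification be indexed (see MlAssignmentNotifier below).
            }
        }
    }
}

Because the listener only reacts when an assignment actually changes, a notification is written once per change rather than on every `getAssignment(...)` call.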

Original commit: elastic/x-pack-elasticsearch@5ca5890baf
Martijn van Groningen 2017-04-10 17:06:52 +02:00
parent ecc19d9948
commit 018a3d197d
5 changed files with 213 additions and 79 deletions

MachineLearning.java

@@ -54,7 +54,6 @@ import org.elasticsearch.xpack.ml.action.GetJobsAction;
import org.elasticsearch.xpack.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.ml.action.GetModelSnapshotsAction;
import org.elasticsearch.xpack.ml.action.GetRecordsAction;
import org.elasticsearch.xpack.common.action.XPackDeleteByQueryAction;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.PostDataAction;
import org.elasticsearch.xpack.ml.action.PreviewDatafeedAction;
@@ -304,10 +303,8 @@ public class MachineLearning implements ActionPlugin {
new InvalidLicenseEnforcer(settings, licenseState, threadPool, datafeedManager, autodetectProcessManager);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY, Arrays.asList(
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, threadPool, licenseState, clusterService,
autodetectProcessManager, auditor),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, threadPool, licenseState,
datafeedManager, auditor)
new OpenJobAction.OpenJobPersistentTasksExecutor(settings, licenseState, clusterService, autodetectProcessManager),
new StartDatafeedAction.StartDatafeedPersistentTasksExecutor(settings, licenseState, datafeedManager)
));
return Arrays.asList(
@@ -323,7 +320,8 @@ public class MachineLearning implements ActionPlugin {
persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService),
auditor,
invalidLicenseEnforcer
invalidLicenseEnforcer,
new MlAssignmentNotifier(settings, auditor, clusterService)
);
}

MlAssignmentNotifier.java

@@ -0,0 +1,105 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml;

import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class MlAssignmentNotifier extends AbstractComponent implements ClusterStateListener, LocalNodeMasterListener {

    private final Auditor auditor;
    private final ClusterService clusterService;

    private final AtomicBoolean enabled = new AtomicBoolean(false);

    MlAssignmentNotifier(Settings settings, Auditor auditor, ClusterService clusterService) {
        super(settings);
        this.auditor = auditor;
        this.clusterService = clusterService;
        clusterService.addLocalNodeMasterListener(this);
    }

    @Override
    public void onMaster() {
        if (enabled.compareAndSet(false, true)) {
            clusterService.addListener(this);
        }
    }

    @Override
    public void offMaster() {
        if (enabled.compareAndSet(true, false)) {
            clusterService.removeListener(this);
        }
    }

    @Override
    public String executorName() {
        return ThreadPool.Names.GENERIC;
    }

    @Override
    public void clusterChanged(ClusterChangedEvent event) {
        if (enabled.get() == false) {
            return;
        }
        if (event.metaDataChanged() == false) {
            return;
        }
        PersistentTasksCustomMetaData previous = event.previousState().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        PersistentTasksCustomMetaData current = event.state().getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
        if (Objects.equals(previous, current)) {
            return;
        }
        for (PersistentTask<?> currentTask : current.tasks()) {
            Assignment currentAssignment = currentTask.getAssignment();
            PersistentTask<?> previousTask = previous != null ? previous.getTask(currentTask.getId()) : null;
            Assignment previousAssignment = previousTask != null ? previousTask.getAssignment() : null;
            if (Objects.equals(currentAssignment, previousAssignment)) {
                continue;
            }
            if (OpenJobAction.NAME.equals(currentTask.getTaskName())) {
                String jobId = ((OpenJobAction.Request) currentTask.getRequest()).getJobId();
                if (currentAssignment.getExecutorNode() == null) {
                    auditor.warning(jobId, "No node found to open job. Reasons [" + currentAssignment.getExplanation() + "]");
                } else {
                    DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
                    auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
                }
            } else if (StartDatafeedAction.NAME.equals(currentTask.getTaskName())) {
                String datafeedId = ((StartDatafeedAction.Request) currentTask.getRequest()).getDatafeedId();
                MlMetadata mlMetadata = event.state().getMetaData().custom(MlMetadata.TYPE);
                DatafeedConfig datafeedConfig = mlMetadata.getDatafeed(datafeedId);
                if (currentAssignment.getExecutorNode() == null) {
                    String msg = "No node found to start datafeed [" + datafeedId + "]. Reasons [" +
                            currentAssignment.getExplanation() + "]";
                    logger.warn("[{}] {}", datafeedConfig.getJobId(), msg);
                    auditor.warning(datafeedConfig.getJobId(), msg);
                } else {
                    DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
                    auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
                }
            }
        }
    }
}

OpenJobAction.java

@@ -29,7 +29,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -48,7 +47,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.persistence.AnomalyDetectorsIndex;
import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
@@ -377,19 +375,14 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
private final AutodetectProcessManager autodetectProcessManager;
private final XPackLicenseState licenseState;
private final Auditor auditor;
private final ThreadPool threadPool;
private volatile int maxConcurrentJobAllocations;
public OpenJobPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager,
Auditor auditor) {
public OpenJobPersistentTasksExecutor(Settings settings, XPackLicenseState licenseState,
ClusterService clusterService, AutodetectProcessManager autodetectProcessManager) {
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.autodetectProcessManager = autodetectProcessManager;
this.auditor = auditor;
this.threadPool = threadPool;
this.maxConcurrentJobAllocations = MachineLearning.CONCURRENT_JOB_ALLOCATIONS.get(settings);
clusterService.getClusterSettings()
.addSettingsUpdateConsumer(MachineLearning.CONCURRENT_JOB_ALLOCATIONS, this::setMaxConcurrentJobAllocations);
@@ -397,9 +390,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
Assignment assignment = selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
writeAssignmentNotification(request.getJobId(), assignment, clusterState);
return assignment;
return selectLeastLoadedMlNode(request.getJobId(), clusterState, maxConcurrentJobAllocations, logger);
}
@Override
@@ -415,7 +406,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
String msg = "Could not open job because no suitable nodes were found, allocation explanation ["
+ assignment.getExplanation() + "]";
logger.warn("[{}] {}", request.getJobId(), msg);
auditor.warning(request.getJobId(), msg);
throw new ElasticsearchStatusException(msg, RestStatus.TOO_MANY_REQUESTS);
}
} else {
@@ -441,27 +431,6 @@ public class OpenJobAction extends Action<OpenJobAction.Request, OpenJobAction.R
this.maxConcurrentJobAllocations, maxConcurrentJobAllocations);
this.maxConcurrentJobAllocations = maxConcurrentJobAllocations;
}
private void writeAssignmentNotification(String jobId, Assignment assignment, ClusterState state) {
// Forking as this code is called from cluster state update thread:
// Should be ok as auditor uses index api which has its own tp
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("Failed to write assignment notification for job [" + jobId + "]", e);
}
@Override
protected void doRun() throws Exception {
if (assignment.getExecutorNode() == null) {
auditor.warning(jobId, "No node found to open job. Reasons [" + assignment.getExplanation() + "]");
} else {
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
}
});
}
}
/**

StartDatafeedAction.java

@@ -20,7 +20,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.common.Nullable;
@@ -31,7 +30,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.DateMathParser;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -54,7 +52,6 @@ import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.job.config.JobTaskStatus;
import org.elasticsearch.xpack.ml.job.messages.Messages;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.persistent.AllocatedPersistentTask;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
@@ -406,25 +403,18 @@ public class StartDatafeedAction
public static class StartDatafeedPersistentTasksExecutor extends PersistentTasksExecutor<Request> {
private final DatafeedManager datafeedManager;
private final XPackLicenseState licenseState;
private final Auditor auditor;
private final ThreadPool threadPool;
private final IndexNameExpressionResolver resolver;
public StartDatafeedPersistentTasksExecutor(Settings settings, ThreadPool threadPool, XPackLicenseState licenseState,
DatafeedManager datafeedManager, Auditor auditor) {
public StartDatafeedPersistentTasksExecutor(Settings settings, XPackLicenseState licenseState, DatafeedManager datafeedManager) {
super(settings, NAME, ThreadPool.Names.MANAGEMENT);
this.licenseState = licenseState;
this.datafeedManager = datafeedManager;
this.auditor = auditor;
this.threadPool = threadPool;
this.resolver = new IndexNameExpressionResolver(settings);
}
@Override
public Assignment getAssignment(Request request, ClusterState clusterState) {
Assignment assignment = selectNode(logger, request.getDatafeedId(), clusterState, resolver);
writeAssignmentNotification(request.getDatafeedId(), assignment, clusterState);
return assignment;
return selectNode(logger, request.getDatafeedId(), clusterState, resolver);
}
@Override
@@ -461,33 +451,6 @@ public class StartDatafeedAction
});
}
private void writeAssignmentNotification(String datafeedId, Assignment assignment, ClusterState state) {
// Forking as this code is called from cluster state update thread:
// Should be ok as auditor uses index api which has its own tp
threadPool.executor(ThreadPool.Names.GENERIC).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("Failed to write assignment notification for datafeed [" + datafeedId + "]", e);
}
@Override
protected void doRun() throws Exception {
MlMetadata mlMetadata = state.metaData().custom(MlMetadata.TYPE);
DatafeedConfig datafeed = mlMetadata.getDatafeed(datafeedId);
String jobId = datafeed.getJobId();
if (assignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
assignment.getExplanation() + "]";
logger.warn("[{}] {}", datafeed.getJobId(), msg);
auditor.warning(jobId, msg);
} else {
DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
auditor.info(jobId, "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
}
}
});
}
}
static void validate(String datafeedId, MlMetadata mlMetadata, PersistentTasksCustomMetaData tasks) {

MlAssignmentNotifierTests.java

@@ -0,0 +1,99 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License;
 * you may not use this file except in compliance with the Elastic License.
 */
package org.elasticsearch.xpack.ml;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.action.OpenJobAction;
import org.elasticsearch.xpack.ml.notifications.Auditor;
import org.elasticsearch.xpack.persistent.PersistentTaskRequest;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.xpack.persistent.PersistentTasksCustomMetaData.PersistentTask;

import java.net.InetAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class MlAssignmentNotifierTests extends ESTestCase {

    public void testClusterChanged_info() throws Exception {
        Auditor auditor = mock(Auditor.class);
        ClusterService clusterService = mock(ClusterService.class);
        MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService);
        notifier.onMaster();

        DiscoveryNode node =
                new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
        ClusterState previous = ClusterState.builder(new ClusterName("_name"))
                .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
                        new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
                .build();

        Map<Long, PersistentTask<?>> tasks = new HashMap<>();
        tasks.put(0L, new PersistentTask<PersistentTaskRequest>(0L, OpenJobAction.NAME,
                new OpenJobAction.Request("job_id"), new Assignment("node_id", "")));
        MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
                new PersistentTasksCustomMetaData(0L, tasks)).build();
        ClusterState state = ClusterState.builder(new ClusterName("_name"))
                .metaData(metaData)
                .nodes(DiscoveryNodes.builder().add(node))
                .build();
        notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
        verify(auditor, times(1)).info(eq("job_id"), any());

        notifier.offMaster();
        notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
        verifyNoMoreInteractions(auditor);
    }

    public void testClusterChanged_warning() throws Exception {
        Auditor auditor = mock(Auditor.class);
        ClusterService clusterService = mock(ClusterService.class);
        MlAssignmentNotifier notifier = new MlAssignmentNotifier(Settings.EMPTY, auditor, clusterService);
        notifier.onMaster();

        ClusterState previous = ClusterState.builder(new ClusterName("_name"))
                .metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
                        new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
                .build();

        Map<Long, PersistentTask<?>> tasks = new HashMap<>();
        tasks.put(0L, new PersistentTask<PersistentTaskRequest>(0L, OpenJobAction.NAME,
                new OpenJobAction.Request("job_id"), new Assignment(null, "no nodes")));
        MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
                new PersistentTasksCustomMetaData(0L, tasks)).build();
        ClusterState state = ClusterState.builder(new ClusterName("_name"))
                .metaData(metaData)
                .build();
        notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
        verify(auditor, times(1)).warning(eq("job_id"), any());

        notifier.offMaster();
        notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
        verifyNoMoreInteractions(auditor);
    }
}