[7.x][ML] Make PUT data frame analytics action a master node action (… (#47433)

While it seemed like the PUT data frame analytics action did not
have to be a master node action as the config is stored in an index
rather than the cluster state, there are other subtle nuances which
make it worthwhile to convert it. In particular, it helps maintain
order of execution for put actions which are anyhow user driven and
are expected to have low volume.

This commit converts `TransportPutDataFrameAnalyticsAction` from
a handled transport action to a master node action.

Note this means that the action might fail in a mixed cluster
but as the API is still experimental and not widely used there will
be few moments more suitable to make this change than now.
This commit is contained in:
Dimitris Athanasiou 2019-10-02 16:24:21 +03:00 committed by GitHub
parent f7980e9745
commit b9541eb3af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 17 deletions

View File

@ -119,7 +119,7 @@ public class PutDataFrameAnalyticsAction extends ActionType<PutDataFrameAnalytic
Response() {}
Response(StreamInput in) throws IOException {
public Response(StreamInput in) throws IOException {
super(in);
config = new DataFrameAnalyticsConfig(in);
}

View File

@ -11,13 +11,16 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -54,17 +57,14 @@ import java.util.Map;
import java.util.Objects;
public class TransportPutDataFrameAnalyticsAction
extends HandledTransportAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
extends TransportMasterNodeAction<PutDataFrameAnalyticsAction.Request, PutDataFrameAnalyticsAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPutDataFrameAnalyticsAction.class);
private final XPackLicenseState licenseState;
private final DataFrameAnalyticsConfigProvider configProvider;
private final ThreadPool threadPool;
private final SecurityContext securityContext;
private final Client client;
private final ClusterService clusterService;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final DataFrameAnalyticsAuditor auditor;
private volatile ByteSizeValue maxModelMemoryLimit;
@ -74,15 +74,13 @@ public class TransportPutDataFrameAnalyticsAction
XPackLicenseState licenseState, Client client, ThreadPool threadPool,
ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
DataFrameAnalyticsConfigProvider configProvider, DataFrameAnalyticsAuditor auditor) {
super(PutDataFrameAnalyticsAction.NAME, transportService, actionFilters, PutDataFrameAnalyticsAction.Request::new);
super(PutDataFrameAnalyticsAction.NAME, transportService, clusterService, threadPool, actionFilters,
PutDataFrameAnalyticsAction.Request::new, indexNameExpressionResolver);
this.licenseState = licenseState;
this.configProvider = configProvider;
this.threadPool = threadPool;
this.securityContext = XPackSettings.SECURITY_ENABLED.get(settings) ?
new SecurityContext(settings, threadPool.getThreadContext()) : null;
this.client = client;
this.clusterService = clusterService;
this.indexNameExpressionResolver = Objects.requireNonNull(indexNameExpressionResolver);
this.auditor = Objects.requireNonNull(auditor);
maxModelMemoryLimit = MachineLearningField.MAX_MODEL_MEMORY_LIMIT.get(settings);
@ -95,12 +93,23 @@ public class TransportPutDataFrameAnalyticsAction
}
@Override
protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
if (licenseState.isMachineLearningAllowed() == false) {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
return;
}
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected PutDataFrameAnalyticsAction.Response read(StreamInput in) throws IOException {
return new PutDataFrameAnalyticsAction.Response(in);
}
@Override
protected ClusterBlockException checkBlock(PutDataFrameAnalyticsAction.Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
@Override
protected void masterOperation(PutDataFrameAnalyticsAction.Request request, ClusterState state,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
validateConfig(request.getConfig());
DataFrameAnalyticsConfig memoryCappedConfig =
new DataFrameAnalyticsConfig.Builder(request.getConfig(), maxModelMemoryLimit)
@ -137,7 +146,7 @@ public class TransportPutDataFrameAnalyticsAction
ActionListener.wrap(
indexResponse -> listener.onResponse(new PutDataFrameAnalyticsAction.Response(memoryCappedConfig)),
listener::onFailure
));
));
}
}
@ -204,6 +213,15 @@ public class TransportPutDataFrameAnalyticsAction
}
config.getDest().validate();
new SourceDestValidator(clusterService.state(), indexNameExpressionResolver).check(config);
}
@Override
protected void doExecute(Task task, PutDataFrameAnalyticsAction.Request request,
ActionListener<PutDataFrameAnalyticsAction.Response> listener) {
if (licenseState.isMachineLearningAllowed()) {
super.doExecute(task, request, listener);
} else {
listener.onFailure(LicenseUtils.newComplianceException(XPackField.MACHINE_LEARNING));
}
}
}