[7.x] [ML][Data Frame] Add optional defer_validation param to PUT (#44455) (#44697)

* [ML][Data Frame] Add optional defer_validation param to PUT (#44455)

* [ML][Data Frame] Add optional defer_validation param to PUT

* addressing PR comments

* reverting bad replace

* addressing pr comments

* Update put-transform.asciidoc

* Update put-transform.asciidoc

* Update put-transform.asciidoc

* adjusting for backport

* fixing imports

* [DOCS] Fixes formatting in  create data frame transform API
This commit is contained in:
Benjamin Trent 2019-07-22 15:12:55 -05:00 committed by GitHub
parent 06e21f7902
commit 4456850a8e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 318 additions and 120 deletions

View File

@ -39,6 +39,7 @@ import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TY
import static org.elasticsearch.client.RequestConverters.createEntity;
import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE;
import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH;
import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION;
final class DataFrameRequestConverters {
@ -51,6 +52,9 @@ final class DataFrameRequestConverters {
.build();
Request request = new Request(HttpPut.METHOD_NAME, endpoint);
request.setEntity(createEntity(putRequest, REQUEST_BODY_CONTENT_TYPE));
if (putRequest.getDeferValidation() != null) {
request.addParameter(DEFER_VALIDATION, Boolean.toString(putRequest.getDeferValidation()));
}
return request;
}

View File

@ -31,7 +31,9 @@ import java.util.Optional;
public class PutDataFrameTransformRequest implements ToXContentObject, Validatable {
public static final String DEFER_VALIDATION = "defer_validation";
private final DataFrameTransformConfig config;
private Boolean deferValidation;
public PutDataFrameTransformRequest(DataFrameTransformConfig config) {
this.config = config;
@ -41,6 +43,19 @@ public class PutDataFrameTransformRequest implements ToXContentObject, Validatab
return config;
}
/**
 * @return whether deferrable validations are skipped until the transform starts,
 *         or {@code null} if the option was never set (no query parameter is sent)
 */
public Boolean getDeferValidation() {
return deferValidation;
}
/**
 * Indicates if deferrable validations should be skipped until the transform starts.
 *
 * @param deferValidation {@code true} will cause deferrable validations to be deferred
 *                        until the transform is started; {@code false} runs them at PUT time
 */
public void setDeferValidation(boolean deferValidation) {
this.deferValidation = deferValidation;
}
@Override
public Optional<ValidationException> validate() {
ValidationException validationException = new ValidationException();

View File

@ -67,7 +67,7 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
PutDataFrameTransformRequest putRequest = new PutDataFrameTransformRequest(
DataFrameTransformConfigTests.randomDataFrameTransformConfig());
Request request = DataFrameRequestConverters.putDataFrameTransform(putRequest);
assertThat(request.getParameters(), not(hasKey("defer_validation")));
assertEquals(HttpPut.METHOD_NAME, request.getMethod());
assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + putRequest.getConfig().getId()));
@ -75,6 +75,9 @@ public class DataFrameRequestConvertersTests extends ESTestCase {
DataFrameTransformConfig parsedConfig = DataFrameTransformConfig.PARSER.apply(parser, null);
assertThat(parsedConfig, equalTo(putRequest.getConfig()));
}
putRequest.setDeferValidation(true);
request = DataFrameRequestConverters.putDataFrameTransform(putRequest);
assertThat(request.getParameters(), hasEntry("defer_validation", Boolean.toString(putRequest.getDeferValidation())));
}
public void testDeleteDataFrameTransform() {

View File

@ -181,6 +181,22 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase {
assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found"));
}
// Verifies that a transform referencing a source index that does not exist can still be
// created when defer_validation is set, and that it can subsequently be deleted.
public void testCreateDeleteWithDefer() throws IOException {
// intentionally references an index that was never created
String sourceIndex = "missing-source-index";
String id = "test-with-defer";
DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest");
DataFrameClient client = highLevelClient().dataFrame();
PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(transform);
// deferring validation is what allows the PUT to succeed despite the missing source
request.setDeferValidation(true);
AcknowledgedResponse ack = execute(request, client::putDataFrameTransform, client::putDataFrameTransformAsync);
assertTrue(ack.isAcknowledged());
// clean up: the deferred-validation transform must still be deletable
ack = execute(new DeleteDataFrameTransformRequest(transform.getId()), client::deleteDataFrameTransform,
client::deleteDataFrameTransformAsync);
assertTrue(ack.isAcknowledged());
}
public void testGetTransform() throws IOException {
String sourceIndex = "transform-source";
createIndex(sourceIndex);

View File

@ -166,6 +166,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
// tag::put-data-frame-transform-request
PutDataFrameTransformRequest request =
new PutDataFrameTransformRequest(transformConfig); // <1>
request.setDeferValidation(false); // <2>
// end::put-data-frame-transform-request
// tag::put-data-frame-transform-execute

View File

@ -20,6 +20,10 @@ A +{request}+ requires the following argument:
include-tagged::{doc-tests-file}[{api}-request]
--------------------------------------------------
<1> The configuration of the {dataframe-transform} to create
<2> Whether to defer deferrable validations until `_start` is called.
Use this option with care: the created {dataframe-transform} will run
with the privileges of the user creating it, so if that user lacks the required
privileges, the resulting error will not surface until `_start` is called.
[id="{upid}-{api}-config"]
==== Data Frame Transform Configuration

View File

@ -45,6 +45,18 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and
underscores. It must start and end with alphanumeric characters.
[[put-data-frame-transform-query-parms]]
==== {api-query-parms-title}
`defer_validation`::
(Optional, boolean) When `true`, deferrable validations are not run. This
behavior may be desired if the source index does not exist until after the
{dataframe-transform} is created. Deferred validations are always run when the
{dataframe-transform} is started, with the exception of privilege checks. If the
user who created the transform does not have the required privileges on the
source and destination indices, the transform starts but then fails when it
attempts the unauthorized operation. The default value is `false`.
[[put-data-frame-transform-request-body]]
==== {api-request-body-title}

View File

@ -33,6 +33,7 @@ public final class DataFrameField {
public static final ParseField SYNC = new ParseField("sync");
public static final ParseField TIME_BASED_SYNC = new ParseField("time");
public static final ParseField DELAY = new ParseField("delay");
public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation");
public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match");
/**

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
@ -13,8 +14,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
@ -41,21 +40,28 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
super(NAME, AcknowledgedResponse::new);
}
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
public static class Request extends AcknowledgedRequest<Request> {
private final DataFrameTransformConfig config;
private final boolean deferValidation;
public Request(DataFrameTransformConfig config) {
public Request(DataFrameTransformConfig config, boolean deferValidation) {
this.config = config;
this.deferValidation = deferValidation;
}
public Request(StreamInput in) throws IOException {
super(in);
this.config = new DataFrameTransformConfig(in);
if (in.getVersion().onOrAfter(Version.V_7_4_0)) {
this.deferValidation = in.readBoolean();
} else {
this.deferValidation = false;
}
}
public static Request fromXContent(final XContentParser parser, final String id) throws IOException {
return new Request(DataFrameTransformConfig.fromXContent(parser, id, false));
public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) {
return new Request(DataFrameTransformConfig.fromXContent(parser, id, false), deferValidation);
}
/**
@ -111,24 +117,26 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
return validationException;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return this.config.toXContent(builder, params);
}
public DataFrameTransformConfig getConfig() {
return config;
}
public boolean isDeferValidation() {
return deferValidation;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
this.config.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_7_4_0)) {
out.writeBoolean(this.deferValidation);
}
}
@Override
public int hashCode() {
return Objects.hash(config);
return Objects.hash(config, deferValidation);
}
@Override
@ -140,7 +148,7 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
return false;
}
Request other = (Request) obj;
return Objects.equals(config, other.config);
return Objects.equals(config, other.config) && this.deferValidation == other.deferValidation;
}
}

View File

@ -6,16 +6,24 @@
package org.elasticsearch.xpack.core.dataframe.action;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xpack.core.dataframe.DataFrameField;
import org.elasticsearch.xpack.core.dataframe.action.PutDataFrameTransformAction.Request;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfigTests;
import org.elasticsearch.xpack.core.dataframe.transforms.SyncConfig;
import org.elasticsearch.xpack.core.dataframe.transforms.TimeSyncConfig;
import org.junit.Before;
import java.io.IOException;
import java.util.List;
public class PutDataFrameTransformActionRequestTests extends AbstractSerializingDataFrameTestCase<Request> {
import static java.util.Collections.emptyList;
public class PutDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase<Request> {
private String transformId;
@Before
@ -23,24 +31,24 @@ public class PutDataFrameTransformActionRequestTests extends AbstractSerializing
transformId = randomAlphaOfLengthBetween(1, 10);
}
@Override
protected Request doParseInstance(XContentParser parser) throws IOException {
return Request.fromXContent(parser, transformId);
}
@Override
protected Writeable.Reader<Request> instanceReader() {
return Request::new;
}
@Override
protected boolean supportsUnknownFields() {
return false;
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config, randomBoolean());
}
@Override
protected Request createTestInstance() {
DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId);
return new Request(config);
protected NamedWriteableRegistry getNamedWriteableRegistry() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
List<NamedWriteableRegistry.Entry> namedWriteables = searchModule.getNamedWriteables();
namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, DataFrameField.TIME_BASED_SYNC.getPreferredName(),
TimeSyncConfig::new));
return new NamedWriteableRegistry(namedWriteables);
}
}

View File

@ -56,8 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class TransportPutDataFrameTransformAction
extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final XPackLicenseState licenseState;
private final Client client;
@ -120,14 +119,14 @@ public class TransportPutDataFrameTransformAction
return;
}
try {
SourceDestValidator.check(config, clusterState, indexNameExpressionResolver);
SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation());
} catch (ElasticsearchStatusException ex) {
listener.onFailure(ex);
return;
}
// Early check to verify that the user can create the destination index and can read from the source
if (licenseState.isAuthAllowed()) {
if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) {
final String destIndex = config.getDestination().getIndex();
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
@ -162,12 +161,12 @@ public class TransportPutDataFrameTransformAction
privRequest.clusterPrivileges(Strings.EMPTY_ARRAY);
privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges);
ActionListener<HasPrivilegesResponse> privResponseListener = ActionListener.wrap(
r -> handlePrivsResponse(username, config, r, listener),
r -> handlePrivsResponse(username, request, r, listener),
listener::onFailure);
client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener);
} else { // No security enabled, just create the transform
putDataFrame(config, listener);
putDataFrame(request, listener);
}
}
@ -177,11 +176,11 @@ public class TransportPutDataFrameTransformAction
}
private void handlePrivsResponse(String username,
DataFrameTransformConfig config,
Request request,
HasPrivilegesResponse privilegesResponse,
ActionListener<AcknowledgedResponse> listener) throws IOException {
ActionListener<AcknowledgedResponse> listener) {
if (privilegesResponse.isCompleteMatch()) {
putDataFrame(config, listener);
putDataFrame(request, listener);
} else {
List<String> indices = privilegesResponse.getIndexPrivileges()
.stream()
@ -190,14 +189,15 @@ public class TransportPutDataFrameTransformAction
listener.onFailure(Exceptions.authorizationError(
"Cannot create data frame transform [{}] because user {} lacks all the required permissions for indices: {}",
config.getId(),
request.getConfig().getId(),
username,
indices));
}
}
private void putDataFrame(DataFrameTransformConfig config, ActionListener<AcknowledgedResponse> listener) {
private void putDataFrame(Request request, ActionListener<AcknowledgedResponse> listener) {
final DataFrameTransformConfig config = request.getConfig();
final Pivot pivot = new Pivot(config.getPivotConfig());
// <3> Return to the listener
@ -213,11 +213,23 @@ public class TransportPutDataFrameTransformAction
ActionListener<Boolean> pivotValidationListener = ActionListener.wrap(
validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener),
validationException -> listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
validationException))
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
validationException))
);
// <1> Validate our pivot
pivot.validate(client, config.getSource(), pivotValidationListener);
try {
pivot.validateConfig();
} catch (Exception e) {
listener.onFailure(
new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION,
e));
return;
}
if (request.isDeferValidation()) {
pivotValidationListener.onResponse(true);
} else {
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
}
}
}

View File

@ -181,7 +181,7 @@ public class TransportStartDataFrameTransformAction extends
return;
}
// Validate source and destination indices
SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver);
SourceDestValidator.validate(config, clusterService.state(), indexNameExpressionResolver, false);
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency()));
final String destinationIndex = config.getDestination().getIndex();

View File

@ -141,7 +141,7 @@ public class TransportStopDataFrameTransformAction extends
}
if (ids.contains(transformTask.getTransformId())) {
// This should not occur as we validate that none of the tasks are in a failed state earlier
// This should not occur as we check that none of the tasks are in a failed state earlier
// Keep this check in here for insurance.
if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) {
listener.onFailure(

View File

@ -35,7 +35,8 @@ public class RestPutDataFrameTransformAction extends BaseRestHandler {
String id = restRequest.param(DataFrameField.ID.getPreferredName());
XContentParser parser = restRequest.contentParser();
PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id);
boolean deferValidation = restRequest.paramAsBoolean(DataFrameField.DEFER_VALIDATION.getPreferredName(), false);
PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id, deferValidation);
return channel -> client.execute(PutDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel));
}

View File

@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
@ -26,7 +27,14 @@ import java.util.Set;
*/
public final class SourceDestValidator {
private SourceDestValidator() {}
/**
 * A single check applied to a transform's source and destination index configuration.
 */
interface SourceDestValidation {
// True if this check may be skipped when defer_validation is requested;
// checks that must always run return false.
boolean isDeferrable();
// Runs the check; implementations throw ElasticsearchStatusException (BAD_REQUEST) on failure.
void validate(DataFrameTransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver);
}
private static final List<SourceDestValidation> VALIDATIONS = Arrays.asList(new SourceMissingValidation(),
new DestinationInSourceValidation(),
new DestinationSingleIndexValidation());
/**
* Validates the DataFrameTransformConfiguration source and destination indices.
@ -41,52 +49,111 @@ public final class SourceDestValidator {
* @param indexNameExpressionResolver A valid IndexNameExpressionResolver object
* @throws ElasticsearchStatusException when a validation fails
*/
public static void check(DataFrameTransformConfig config,
public static void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver,
boolean shouldDefer) {
for (SourceDestValidation validation : VALIDATIONS) {
if (shouldDefer && validation.isDeferrable()) {
continue;
}
validation.validate(config, clusterState, indexNameExpressionResolver);
}
}
static class SourceMissingValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return true;
}
@Override
public void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
final String destIndex = config.getDestination().getIndex();
Set<String> concreteSourceIndexNames = new HashSet<>();
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src);
if (concreteNames.length == 0) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST);
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
src);
if (concreteNames.length == 0) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src),
RestStatus.BAD_REQUEST);
}
}
if (Regex.simpleMatch(src, destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST);
}
}
static class DestinationInSourceValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return true;
}
@Override
public void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
final String destIndex = config.getDestination().getIndex();
Set<String> concreteSourceIndexNames = new HashSet<>();
for(String src : config.getSource().getIndex()) {
String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
src);
if (Regex.simpleMatch(src, destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src),
RestStatus.BAD_REQUEST);
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
}
concreteSourceIndexNames.addAll(Arrays.asList(concreteNames));
if (concreteSourceIndexNames.contains(destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
);
}
final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState,
IndicesOptions.lenientExpandOpen(),
destIndex);
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
);
}
}
}
static class DestinationSingleIndexValidation implements SourceDestValidation {
@Override
public boolean isDeferrable() {
return false;
}
if (concreteSourceIndexNames.contains(destIndex)) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
destIndex,
Strings.arrayToCommaDelimitedString(config.getSource().getIndex())),
RestStatus.BAD_REQUEST
);
}
@Override
public void validate(DataFrameTransformConfig config,
ClusterState clusterState,
IndexNameExpressionResolver indexNameExpressionResolver) {
final String destIndex = config.getDestination().getIndex();
final String[] concreteDest =
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
final String[] concreteDest =
indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex);
if (concreteDest.length > 1) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
RestStatus.BAD_REQUEST
);
}
if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE,
concreteDest[0],
Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))),
RestStatus.BAD_REQUEST
);
if (concreteDest.length > 1) {
throw new ElasticsearchStatusException(
DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex),
RestStatus.BAD_REQUEST
);
}
}
}
}

View File

@ -69,17 +69,28 @@ public class Pivot {
this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate;
}
public void validate(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
// step 1: check if used aggregations are supported
public void validateConfig() {
for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) {
if (Aggregations.isSupportedByDataframe(agg.getType()) == false) {
listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"));
return;
throw new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]");
}
}
}
// step 2: run a query to validate that config is valid
runTestQuery(client, sourceConfig, listener);
public void validateQuery(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
if (response == null) {
listener.onFailure(new RuntimeException("Unexpected null response from test query"));
return;
}
if (response.status() != RestStatus.OK) {
listener.onFailure(new RuntimeException("Unexpected status from response of test query: "+ response.status()));
return;
}
listener.onResponse(true);
}, e -> listener.onFailure(new RuntimeException("Failed to test query", e))));
}
public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener<Map<String, String>> listener) {
@ -164,24 +175,6 @@ public class Pivot {
dataFrameIndexerTransformStats);
}
private void runTestQuery(Client client, SourceConfig sourceConfig, final ActionListener<Boolean> listener) {
SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE);
client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> {
if (response == null) {
listener.onFailure(new RuntimeException("Unexpected null response from test query"));
return;
}
if (response.status() != RestStatus.OK) {
listener.onFailure(new RuntimeException("Unexpected status from response of test query: " + response.status()));
return;
}
listener.onResponse(true);
}, e->{
listener.onFailure(new RuntimeException("Failed to test query", e));
}));
}
public QueryBuilder filterBuckets(Map<String, Set<String>> changedBuckets) {
if (changedBuckets == null || changedBuckets.isEmpty()) {
@ -247,4 +240,5 @@ public class Pivot {
}
return compositeAggregation;
}
}

View File

@ -65,43 +65,47 @@ public class SourceDestValidatorTests extends ESTestCase {
public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null));
SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver());
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false);
}
public void testCheck_GivenMissingConcreteSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing] does not exist"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
public void testCheck_GivenMissingWildcardSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
public void testCheck_GivenDestIndexSameAsSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
public void testCheck_GivenDestIndexMatchesSourceIndex() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() {
@ -109,16 +113,23 @@ public class SourceDestValidatorTests extends ESTestCase {
new DestConfig(SOURCE_2, null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should refer to a single index"));
e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [dest-alias] should refer to a single index"));
@ -128,10 +139,12 @@ public class SourceDestValidatorTests extends ESTestCase {
DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null));
ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class,
() -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()));
() -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false));
assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST));
assertThat(e.getMessage(),
equalTo("Destination index [source-1] is included in source expression [source-1]"));
SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true);
}
private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) {

View File

@ -47,7 +47,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.util.Collections.emptyList;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
public class PivotTests extends ESTestCase {
@ -140,10 +142,10 @@ public class PivotTests extends ESTestCase {
public void testValidateAllUnsupportedAggregations() throws Exception {
for (String agg : unsupportedAggregations) {
AggregationConfig aggregationConfig = getAggregationConfig(agg);
SourceConfig source = new SourceConfig(new String[]{"existing_source"}, QueryConfig.matchAll());
Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig));
assertInvalidTransform(client, source, pivot);
RuntimeException ex = expectThrows(RuntimeException.class, pivot::validateConfig);
assertThat("expected aggregations to be unsupported, but they were", ex, is(notNullValue()));
}
}
@ -248,7 +250,7 @@ public class PivotTests extends ESTestCase {
private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
pivot.validate(client, source, ActionListener.wrap(validity -> {
pivot.validateQuery(client, source, ActionListener.wrap(validity -> {
assertEquals(expectValid, validity);
latch.countDown();
}, e -> {

View File

@ -11,6 +11,13 @@
"required": true,
"description": "The id of the new transform."
}
},
"params": {
"defer_validation": {
"type": "boolean",
"required": false,
"description": "If validations should be deferred until data frame transform starts, defaults to false."
}
}
},
"body": {

View File

@ -91,6 +91,20 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.put_data_frame_transform:
transform_id: "missing-source-transform"
defer_validation: true
body: >
{
"source": { "index": "missing-index" },
"dest": { "index": "missing-source-dest" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- match: { acknowledged: true }
---
"Test basic transform crud":
- do:
@ -316,6 +330,22 @@ setup:
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
- do:
data_frame.put_data_frame_transform:
transform_id: "airline-transform"
defer_validation: true
body: >
{
"source": {
"index": ["airline-data*"]
},
"dest": { "index": "airline-data-by-airline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
}
}
---
"Test alias scenarios":
- do: