diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java index 8b4b54af02a..f45f2a6c8ec 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/DataFrameRequestConverters.java @@ -39,6 +39,7 @@ import static org.elasticsearch.client.RequestConverters.REQUEST_BODY_CONTENT_TY import static org.elasticsearch.client.RequestConverters.createEntity; import static org.elasticsearch.client.dataframe.DeleteDataFrameTransformRequest.FORCE; import static org.elasticsearch.client.dataframe.GetDataFrameTransformRequest.ALLOW_NO_MATCH; +import static org.elasticsearch.client.dataframe.PutDataFrameTransformRequest.DEFER_VALIDATION; final class DataFrameRequestConverters { @@ -51,6 +52,9 @@ final class DataFrameRequestConverters { .build(); Request request = new Request(HttpPut.METHOD_NAME, endpoint); request.setEntity(createEntity(putRequest, REQUEST_BODY_CONTENT_TYPE)); + if (putRequest.getDeferValidation() != null) { + request.addParameter(DEFER_VALIDATION, Boolean.toString(putRequest.getDeferValidation())); + } return request; } diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequest.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequest.java index 8ac80587fed..814414f04ee 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequest.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/PutDataFrameTransformRequest.java @@ -31,7 +31,9 @@ import java.util.Optional; public class PutDataFrameTransformRequest implements ToXContentObject, Validatable { + public static final String DEFER_VALIDATION = "defer_validation"; private final DataFrameTransformConfig config; + private Boolean deferValidation; public PutDataFrameTransformRequest(DataFrameTransformConfig config) { this.config = config; @@ -41,6 +43,19 @@ public class PutDataFrameTransformRequest implements ToXContentObject, Validatab return config; } + public Boolean getDeferValidation() { + return deferValidation; + } + + /** + * Indicates if deferrable validations should be skipped until the transform starts + * + * @param deferValidation {@code true} will cause validations to be deferred + */ + public void setDeferValidation(boolean deferValidation) { + this.deferValidation = deferValidation; + } + @Override public Optional validate() { ValidationException validationException = new ValidationException(); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java index 8fe5cc837ba..299636bc5cc 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameRequestConvertersTests.java @@ -67,7 +67,7 @@ public class DataFrameRequestConvertersTests extends ESTestCase { PutDataFrameTransformRequest putRequest = new PutDataFrameTransformRequest( DataFrameTransformConfigTests.randomDataFrameTransformConfig()); Request request = DataFrameRequestConverters.putDataFrameTransform(putRequest); - + assertThat(request.getParameters(), not(hasKey("defer_validation"))); assertEquals(HttpPut.METHOD_NAME, request.getMethod()); assertThat(request.getEndpoint(), equalTo("/_data_frame/transforms/" + putRequest.getConfig().getId())); @@ -75,6 +75,9 @@ public class DataFrameRequestConvertersTests extends ESTestCase { DataFrameTransformConfig parsedConfig = DataFrameTransformConfig.PARSER.apply(parser, null); assertThat(parsedConfig, equalTo(putRequest.getConfig())); } + putRequest.setDeferValidation(true); + request = DataFrameRequestConverters.putDataFrameTransform(putRequest); + assertThat(request.getParameters(), hasEntry("defer_validation", Boolean.toString(putRequest.getDeferValidation()))); } public void testDeleteDataFrameTransform() { diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java index 71b6cfe3337..d41a7c07dcb 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/DataFrameTransformIT.java @@ -181,6 +181,22 @@ public class DataFrameTransformIT extends ESRestHighLevelClientTestCase { assertThat(deleteError.getMessage(), containsString("Transform with id [test-crud] could not be found")); } + public void testCreateDeleteWithDefer() throws IOException { + String sourceIndex = "missing-source-index"; + + String id = "test-with-defer"; + DataFrameTransformConfig transform = validDataFrameTransformConfig(id, sourceIndex, "pivot-dest"); + DataFrameClient client = highLevelClient().dataFrame(); + PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(transform); + request.setDeferValidation(true); + AcknowledgedResponse ack = execute(request, client::putDataFrameTransform, client::putDataFrameTransformAsync); + assertTrue(ack.isAcknowledged()); + + ack = execute(new DeleteDataFrameTransformRequest(transform.getId()), client::deleteDataFrameTransform, + client::deleteDataFrameTransformAsync); + assertTrue(ack.isAcknowledged()); + } + public void testGetTransform() throws IOException { String sourceIndex = "transform-source"; createIndex(sourceIndex); diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java index cf2fe485245..88332a0c13c 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/DataFrameTransformDocumentationIT.java @@ -166,6 +166,7 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest // tag::put-data-frame-transform-request PutDataFrameTransformRequest request = new PutDataFrameTransformRequest(transformConfig); // <1> + request.setDeferValidation(false); // <2> // end::put-data-frame-transform-request // tag::put-data-frame-transform-execute diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 4b50ad367c0..50362d2fc4a 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -20,6 +20,10 @@ A +{request}+ requires the following argument: include-tagged::{doc-tests-file}[{api}-request] -------------------------------------------------- <1> The configuration of the {dataframe-transform} to create +<2> Whether or not to wait to run deferrable validations until `_start` is called. +This option should be used with care as the created {dataframe-transform} will run +with the privileges of the user creating it. Meaning, if they do not have privileges, +such an error will not be visible until `_start` is called. [id="{upid}-{api}-config"] ==== Data Frame Transform Configuration diff --git a/docs/reference/data-frames/apis/put-transform.asciidoc b/docs/reference/data-frames/apis/put-transform.asciidoc index 252cf0450c5..9fe95470280 100644 --- a/docs/reference/data-frames/apis/put-transform.asciidoc +++ b/docs/reference/data-frames/apis/put-transform.asciidoc @@ -45,6 +45,18 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}. can contain lowercase alphanumeric characters (a-z and 0-9), hyphens, and underscores. It must start and end with alphanumeric characters. +[[put-data-frame-transform-query-parms]] +==== {api-query-parms-title} + +`defer_validation`:: + (Optional, boolean) When `true`, deferrable validations are not run. This + behavior may be desired if the source index does not exist until after the +{dataframe-transform} is created. Deferred validations are always run when the +{dataframe-transform} is started, with the exception of privilege checks. If the +user who created the transform does not have the required privileges on the +source and destination indices, the transform starts but then fails when it +attempts the unauthorized operation. The default value is `false`. + [[put-data-frame-transform-request-body]] ==== {api-request-body-title} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 9d5db1e5022..9f526dd92d2 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -33,6 +33,7 @@ public final class DataFrameField { public static final ParseField SYNC = new ParseField("sync"); public static final ParseField TIME_BASED_SYNC = new ParseField("time"); public static final ParseField DELAY = new ParseField("delay"); + public static final ParseField DEFER_VALIDATION = new ParseField("defer_validation"); public static final ParseField ALLOW_NO_MATCH = new ParseField("allow_no_match"); /** diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index dd6458cd246..29888046644 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.core.dataframe.action; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.master.AcknowledgedRequest; @@ -13,8 +14,6 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.xpack.core.dataframe.DataFrameField; @@ -41,21 +40,28 @@ public class PutDataFrameTransformAction extends ActionType implements ToXContentObject { + public static class Request extends AcknowledgedRequest { private final DataFrameTransformConfig config; + private final boolean deferValidation; - public Request(DataFrameTransformConfig config) { + public Request(DataFrameTransformConfig config, boolean deferValidation) { this.config = config; + this.deferValidation = deferValidation; } public Request(StreamInput in) throws IOException { super(in); this.config = new DataFrameTransformConfig(in); + if (in.getVersion().onOrAfter(Version.V_7_4_0)) { + this.deferValidation = in.readBoolean(); + } else { + this.deferValidation = false; + } } - public static Request fromXContent(final XContentParser parser, final String id) throws IOException { - return new Request(DataFrameTransformConfig.fromXContent(parser, id, false)); + public static Request fromXContent(final XContentParser parser, final String id, final boolean deferValidation) { + return new Request(DataFrameTransformConfig.fromXContent(parser, id, false), deferValidation); } /** @@ -111,24 +117,26 @@ public class PutDataFrameTransformAction extends ActionType { +import static java.util.Collections.emptyList; + +public class PutDataFrameTransformActionRequestTests extends AbstractWireSerializingTestCase { private String transformId; @Before @@ -23,24 +31,24 @@ public class PutDataFrameTransformActionRequestTests extends AbstractSerializing transformId = randomAlphaOfLengthBetween(1, 10); } - @Override - protected Request doParseInstance(XContentParser parser) throws IOException { - return Request.fromXContent(parser, transformId); - } - @Override protected Writeable.Reader instanceReader() { return Request::new; } @Override - protected boolean supportsUnknownFields() { - return false; + protected Request createTestInstance() { + DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId); + return new Request(config, randomBoolean()); } @Override - protected Request createTestInstance() { - DataFrameTransformConfig config = DataFrameTransformConfigTests.randomDataFrameTransformConfigWithoutHeaders(transformId); - return new Request(config); + protected NamedWriteableRegistry getNamedWriteableRegistry() { + SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList()); + + List namedWriteables = searchModule.getNamedWriteables(); + namedWriteables.add(new NamedWriteableRegistry.Entry(SyncConfig.class, DataFrameField.TIME_BASED_SYNC.getPreferredName(), + TimeSyncConfig::new)); + return new NamedWriteableRegistry(namedWriteables); } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java index 9110f473a42..29a5e5d08ef 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportPutDataFrameTransformAction.java @@ -56,8 +56,7 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -public class TransportPutDataFrameTransformAction - extends TransportMasterNodeAction { +public class TransportPutDataFrameTransformAction extends TransportMasterNodeAction { private final XPackLicenseState licenseState; private final Client client; @@ -120,14 +119,14 @@ public class TransportPutDataFrameTransformAction return; } try { - SourceDestValidator.check(config, clusterState, indexNameExpressionResolver); + SourceDestValidator.validate(config, clusterState, indexNameExpressionResolver, request.isDeferValidation()); } catch (ElasticsearchStatusException ex) { listener.onFailure(ex); return; } // Early check to verify that the user can create the destination index and can read from the source - if (licenseState.isAuthAllowed()) { + if (licenseState.isAuthAllowed() && request.isDeferValidation() == false) { final String destIndex = config.getDestination().getIndex(); final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), @@ -162,12 +161,12 @@ public class TransportPutDataFrameTransformAction privRequest.clusterPrivileges(Strings.EMPTY_ARRAY); privRequest.indexPrivileges(sourceIndexPrivileges, destIndexPrivileges); ActionListener privResponseListener = ActionListener.wrap( - r -> handlePrivsResponse(username, config, r, listener), + r -> handlePrivsResponse(username, request, r, listener), listener::onFailure); client.execute(HasPrivilegesAction.INSTANCE, privRequest, privResponseListener); } else { // No security enabled, just create the transform - putDataFrame(config, listener); + putDataFrame(request, listener); } } @@ -177,11 +176,11 @@ public class TransportPutDataFrameTransformAction } private void handlePrivsResponse(String username, - DataFrameTransformConfig config, + Request request, HasPrivilegesResponse privilegesResponse, - ActionListener listener) throws IOException { + ActionListener listener) { if (privilegesResponse.isCompleteMatch()) { - putDataFrame(config, listener); + putDataFrame(request, listener); } else { List indices = privilegesResponse.getIndexPrivileges() .stream() @@ -190,14 +189,15 @@ public class TransportPutDataFrameTransformAction listener.onFailure(Exceptions.authorizationError( "Cannot create data frame transform [{}] because user {} lacks all the required permissions for indices: {}", - config.getId(), + request.getConfig().getId(), username, indices)); } } - private void putDataFrame(DataFrameTransformConfig config, ActionListener listener) { + private void putDataFrame(Request request, ActionListener listener) { + final DataFrameTransformConfig config = request.getConfig(); final Pivot pivot = new Pivot(config.getPivotConfig()); // <3> Return to the listener @@ -213,11 +213,23 @@ public class TransportPutDataFrameTransformAction ActionListener pivotValidationListener = ActionListener.wrap( validationResult -> dataFrameTransformsConfigManager.putTransformConfiguration(config, putTransformConfigurationListener), validationException -> listener.onFailure( - new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, - validationException)) + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + validationException)) ); - // <1> Validate our pivot - pivot.validate(client, config.getSource(), pivotValidationListener); + try { + pivot.validateConfig(); + } catch (Exception e) { + listener.onFailure( + new RuntimeException(DataFrameMessages.REST_PUT_DATA_FRAME_FAILED_TO_VALIDATE_DATA_FRAME_CONFIGURATION, + e)); + return; + } + + if (request.isDeferValidation()) { + pivotValidationListener.onResponse(true); + } else { + pivot.validateQuery(client, config.getSource(), pivotValidationListener); + } } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index a3835b067d7..ed88ef95502 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -181,7 +181,7 @@ public class TransportStartDataFrameTransformAction extends return; } // Validate source and destination indices - SourceDestValidator.check(config, clusterService.state(), indexNameExpressionResolver); + SourceDestValidator.validate(config, clusterService.state(), indexNameExpressionResolver, false); transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); final String destinationIndex = config.getDestination().getIndex(); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java index dc3c34f1bd0..9e881c81130 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStopDataFrameTransformAction.java @@ -141,7 +141,7 @@ public class TransportStopDataFrameTransformAction extends } if (ids.contains(transformTask.getTransformId())) { - // This should not occur as we validate that none of the tasks are in a failed state earlier + // This should not occur as we check that none of the tasks are in a failed state earlier // Keep this check in here for insurance. if (transformTask.getState().getTaskState() == DataFrameTransformTaskState.FAILED && request.isForce() == false) { listener.onFailure( diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestPutDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestPutDataFrameTransformAction.java index 2874894d879..10320024456 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestPutDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/rest/action/RestPutDataFrameTransformAction.java @@ -35,7 +35,8 @@ public class RestPutDataFrameTransformAction extends BaseRestHandler { String id = restRequest.param(DataFrameField.ID.getPreferredName()); XContentParser parser = restRequest.contentParser(); - PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id); + boolean deferValidation = restRequest.paramAsBoolean(DataFrameField.DEFER_VALIDATION.getPreferredName(), false); + PutDataFrameTransformAction.Request request = PutDataFrameTransformAction.Request.fromXContent(parser, id, deferValidation); return channel -> client.execute(PutDataFrameTransformAction.INSTANCE, request, new RestToXContentListener<>(channel)); } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java index 411e1787f1c..3f5ae039a9a 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidator.java @@ -18,6 +18,7 @@ import org.elasticsearch.xpack.core.dataframe.transforms.DataFrameTransformConfi import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; /** @@ -26,7 +27,14 @@ import java.util.Set; */ public final class SourceDestValidator { - private SourceDestValidator() {} + interface SourceDestValidation { + boolean isDeferrable(); + void validate(DataFrameTransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver); + } + + private static final List VALIDATIONS = Arrays.asList(new SourceMissingValidation(), + new DestinationInSourceValidation(), + new DestinationSingleIndexValidation()); /** * Validates the DataFrameTransformConfiguration source and destination indices. @@ -41,52 +49,111 @@ public final class SourceDestValidator { * @param indexNameExpressionResolver A valid IndexNameExpressionResolver object * @throws ElasticsearchStatusException when a validation fails */ - public static void check(DataFrameTransformConfig config, + public static void validate(DataFrameTransformConfig config, + ClusterState clusterState, + IndexNameExpressionResolver indexNameExpressionResolver, + boolean shouldDefer) { + for (SourceDestValidation validation : VALIDATIONS) { + if (shouldDefer && validation.isDeferrable()) { + continue; + } + validation.validate(config, clusterState, indexNameExpressionResolver); + } + } + + static class SourceMissingValidation implements SourceDestValidation { + + @Override + public boolean isDeferrable() { + return true; + } + + @Override + public void validate(DataFrameTransformConfig config, ClusterState clusterState, IndexNameExpressionResolver indexNameExpressionResolver) { - - final String destIndex = config.getDestination().getIndex(); - Set concreteSourceIndexNames = new HashSet<>(); - for(String src : config.getSource().getIndex()) { - String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), src); - if (concreteNames.length == 0) { - throw new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), - RestStatus.BAD_REQUEST); + for(String src : config.getSource().getIndex()) { + String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + src); + if (concreteNames.length == 0) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_SOURCE_INDEX_MISSING, src), + RestStatus.BAD_REQUEST); + } } - if (Regex.simpleMatch(src, destIndex)) { - throw new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src), - RestStatus.BAD_REQUEST); + } + } + + static class DestinationInSourceValidation implements SourceDestValidation { + + @Override + public boolean isDeferrable() { + return true; + } + + @Override + public void validate(DataFrameTransformConfig config, + ClusterState clusterState, + IndexNameExpressionResolver indexNameExpressionResolver) { + final String destIndex = config.getDestination().getIndex(); + Set concreteSourceIndexNames = new HashSet<>(); + for(String src : config.getSource().getIndex()) { + String[] concreteNames = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + src); + if (Regex.simpleMatch(src, destIndex)) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, destIndex, src), + RestStatus.BAD_REQUEST); + } + concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); } - concreteSourceIndexNames.addAll(Arrays.asList(concreteNames)); + + if (concreteSourceIndexNames.contains(destIndex)) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + destIndex, + Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), + RestStatus.BAD_REQUEST + ); + } + + final String[] concreteDest = indexNameExpressionResolver.concreteIndexNames(clusterState, + IndicesOptions.lenientExpandOpen(), + destIndex); + if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, + concreteDest[0], + Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), + RestStatus.BAD_REQUEST + ); + } + } + } + + static class DestinationSingleIndexValidation implements SourceDestValidation { + + @Override + public boolean isDeferrable() { + return false; } - if (concreteSourceIndexNames.contains(destIndex)) { - throw new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, - destIndex, - Strings.arrayToCommaDelimitedString(config.getSource().getIndex())), - RestStatus.BAD_REQUEST - ); - } + @Override + public void validate(DataFrameTransformConfig config, + ClusterState clusterState, + IndexNameExpressionResolver indexNameExpressionResolver) { + final String destIndex = config.getDestination().getIndex(); + final String[] concreteDest = + indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); - final String[] concreteDest = - indexNameExpressionResolver.concreteIndexNames(clusterState, IndicesOptions.lenientExpandOpen(), destIndex); - - if (concreteDest.length > 1) { - throw new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), - RestStatus.BAD_REQUEST - ); - } - if (concreteDest.length > 0 && concreteSourceIndexNames.contains(concreteDest[0])) { - throw new ElasticsearchStatusException( - DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_IN_SOURCE, - concreteDest[0], - Strings.arrayToCommaDelimitedString(concreteSourceIndexNames.toArray(new String[0]))), - RestStatus.BAD_REQUEST - ); + if (concreteDest.length > 1) { + throw new ElasticsearchStatusException( + DataFrameMessages.getMessage(DataFrameMessages.REST_PUT_DATA_FRAME_DEST_SINGLE_INDEX, destIndex), + RestStatus.BAD_REQUEST + ); + } } } } diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java index 3406e3b2356..46b27938648 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/pivot/Pivot.java @@ -69,17 +69,28 @@ public class Pivot { this.supportsIncrementalBucketUpdate = supportsIncrementalBucketUpdate; } - public void validate(Client client, SourceConfig sourceConfig, final ActionListener listener) { - // step 1: check if used aggregations are supported + public void validateConfig() { for (AggregationBuilder agg : config.getAggregationConfig().getAggregatorFactories()) { if (Aggregations.isSupportedByDataframe(agg.getType()) == false) { - listener.onFailure(new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]")); - return; + throw new RuntimeException("Unsupported aggregation type [" + agg.getType() + "]"); } } + } - // step 2: run a query to validate that config is valid - runTestQuery(client, sourceConfig, listener); + public void validateQuery(Client client, SourceConfig sourceConfig, final ActionListener listener) { + SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE); + + client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> { + if (response == null) { + listener.onFailure(new RuntimeException("Unexpected null response from test query")); + return; + } + if (response.status() != RestStatus.OK) { + listener.onFailure(new RuntimeException("Unexpected status from response of test query: "+ response.status())); + return; + } + listener.onResponse(true); + }, e -> listener.onFailure(new RuntimeException("Failed to test query", e)))); } public void deduceMappings(Client client, SourceConfig sourceConfig, final ActionListener> listener) { @@ -164,24 +175,6 @@ public class Pivot { dataFrameIndexerTransformStats); } - private void runTestQuery(Client client, SourceConfig sourceConfig, final ActionListener listener) { - SearchRequest searchRequest = buildSearchRequest(sourceConfig, null, TEST_QUERY_PAGE_SIZE); - - client.execute(SearchAction.INSTANCE, searchRequest, ActionListener.wrap(response -> { - if (response == null) { - listener.onFailure(new RuntimeException("Unexpected null response from test query")); - return; - } - if (response.status() != RestStatus.OK) { - listener.onFailure(new RuntimeException("Unexpected status from response of test query: " + response.status())); - return; - } - listener.onResponse(true); - }, e->{ - listener.onFailure(new RuntimeException("Failed to test query", e)); - })); - } - public QueryBuilder filterBuckets(Map> changedBuckets) { if (changedBuckets == null || changedBuckets.isEmpty()) { @@ -247,4 +240,5 @@ public class Pivot { } return compositeAggregation; } + } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java index 48f994de6ca..c9f4a0bc06b 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/SourceDestValidatorTests.java @@ -65,43 +65,47 @@ public class SourceDestValidatorTests extends ESTestCase { public void testCheck_GivenSimpleSourceIndexAndValidDestIndex() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest", null)); - SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver()); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false); } public void testCheck_GivenMissingConcreteSourceIndex() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing"), new DestConfig("dest", null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Source index [missing] does not exist")); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } public void testCheck_GivenMissingWildcardSourceIndex() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("missing*"), new DestConfig("dest", null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Source index [missing*] does not exist")); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } public void testCheck_GivenDestIndexSameAsSourceIndex() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1", null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]")); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } public void testCheck_GivenDestIndexMatchesSourceIndex() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig("source-*"), new DestConfig(SOURCE_2, null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } public void testCheck_GivenDestIndexMatchesOneOfSourceIndices() { @@ -109,16 +113,23 @@ public class SourceDestValidatorTests extends ESTestCase { new DestConfig(SOURCE_2, null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Destination index [source-2] is included in source expression [source-*]")); + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } public void testCheck_GivenDestIndexIsAliasThatMatchesMultipleIndices() { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("dest-alias", null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); + assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); + assertThat(e.getMessage(), + equalTo("Destination index [dest-alias] should refer to a single index")); + + e = expectThrows(ElasticsearchStatusException.class, + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Destination index [dest-alias] should refer to a single index")); @@ -128,10 +139,12 @@ public class SourceDestValidatorTests extends ESTestCase { DataFrameTransformConfig config = createDataFrameTransform(new SourceConfig(SOURCE_1), new DestConfig("source-1-alias", null)); ElasticsearchStatusException e = expectThrows(ElasticsearchStatusException.class, - () -> SourceDestValidator.check(config, CLUSTER_STATE, new IndexNameExpressionResolver())); + () -> SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), false)); assertThat(e.status(), equalTo(RestStatus.BAD_REQUEST)); assertThat(e.getMessage(), equalTo("Destination index [source-1] is included in source expression [source-1]")); + + SourceDestValidator.validate(config, CLUSTER_STATE, new IndexNameExpressionResolver(), true); } private static DataFrameTransformConfig createDataFrameTransform(SourceConfig sourceConfig, DestConfig destConfig) { diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java index 7e6015073e9..a4e58db5860 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/pivot/PivotTests.java @@ -47,7 +47,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.Collections.emptyList; +import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; public class PivotTests extends ESTestCase { @@ -140,10 +142,10 @@ public class PivotTests extends ESTestCase { public void testValidateAllUnsupportedAggregations() throws Exception { for (String agg : unsupportedAggregations) { AggregationConfig aggregationConfig = getAggregationConfig(agg); - SourceConfig source = new SourceConfig(new String[]{"existing_source"}, QueryConfig.matchAll()); Pivot pivot = new Pivot(getValidPivotConfig(aggregationConfig)); - assertInvalidTransform(client, source, pivot); + RuntimeException ex = expectThrows(RuntimeException.class, pivot::validateConfig); + assertThat("expected aggregations to be unsupported, but they were", ex, is(notNullValue())); } } @@ -248,7 +250,7 @@ public class PivotTests extends ESTestCase { private static void validate(Client client, SourceConfig source, Pivot pivot, boolean expectValid) throws Exception { CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionHolder = new AtomicReference<>(); - pivot.validate(client, source, ActionListener.wrap(validity -> { + pivot.validateQuery(client, source, ActionListener.wrap(validity -> { assertEquals(expectValid, validity); latch.countDown(); }, e -> { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.put_data_frame_transform.json b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.put_data_frame_transform.json index 919682676da..69f740c059b 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.put_data_frame_transform.json +++ b/x-pack/plugin/src/test/resources/rest-api-spec/api/data_frame.put_data_frame_transform.json @@ -11,6 +11,13 @@ "required": true, "description": "The id of the new transform." } + }, + "params": { + "defer_validation": { + "type": "boolean", + "required": false, + "description": "If validations should be deferred until data frame transform starts, defaults to false." + } } }, "body": { diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index e3c060f480e..eac24905436 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -91,6 +91,20 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } + - do: + data_frame.put_data_frame_transform: + transform_id: "missing-source-transform" + defer_validation: true + body: > + { + "source": { "index": "missing-index" }, + "dest": { "index": "missing-source-dest" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + - match: { acknowledged: true } --- "Test basic transform crud": - do: @@ -316,6 +330,22 @@ setup: "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} } } + + - do: + data_frame.put_data_frame_transform: + transform_id: "airline-transform" + defer_validation: true + body: > + { + "source": { + "index": ["airline-data*"] + }, + "dest": { "index": "airline-data-by-airline" }, + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } --- "Test alias scenarios": - do: