[7.10][ML] Validate dest pipeline exists on transform update (#63494) (#63549)

Adds validation that the dest pipeline exists when a transform
is updated. Refactors the pipeline check into the `SourceDestValidator`.

Fixes #59587

Backport of #63494
This commit is contained in:
Dimitris Athanasiou 2020-10-12 15:41:35 +03:00 committed by GitHub
parent 8b07750a8b
commit e1c418aac7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 200 additions and 74 deletions

View File

@ -20,6 +20,7 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.protocol.xpack.license.LicenseStatus;
import org.elasticsearch.transport.NoSuchRemoteClusterException;
@ -62,6 +63,7 @@ public final class SourceDestValidator {
public static final String REMOTE_CLUSTER_LICENSE_INACTIVE = "License check failed for remote cluster "
+ "alias [{0}], license is not active";
public static final String REMOTE_SOURCE_INDICES_NOT_SUPPORTED = "remote source indices are not supported";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
// workaround for 7.x: remoteClusterAliases does not throw
private static final ClusterNameExpressionResolver clusterNameExpressionResolver = new ClusterNameExpressionResolver();
@ -69,6 +71,7 @@ public final class SourceDestValidator {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final String nodeName;
private final String license;
@ -80,8 +83,10 @@ public final class SourceDestValidator {
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final RemoteClusterService remoteClusterService;
private final RemoteClusterLicenseChecker remoteClusterLicenseChecker;
private final IngestService ingestService;
private final String[] source;
private final String dest;
private final String destIndex;
private final String destPipeline;
private final String nodeName;
private final String license;
@ -95,8 +100,10 @@ public final class SourceDestValidator {
final IndexNameExpressionResolver indexNameExpressionResolver,
final RemoteClusterService remoteClusterService,
final RemoteClusterLicenseChecker remoteClusterLicenseChecker,
final IngestService ingestService,
final String[] source,
final String dest,
final String destIndex,
final String destPipeline,
final String nodeName,
final String license
) {
@ -104,8 +111,10 @@ public final class SourceDestValidator {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.source = source;
this.dest = dest;
this.destIndex = destIndex;
this.destPipeline = destPipeline;
this.nodeName = nodeName;
this.license = license;
}
@ -126,6 +135,10 @@ public final class SourceDestValidator {
return indexNameExpressionResolver;
}
public IngestService getIngestService() {
return ingestService;
}
public boolean isRemoteSearchEnabled() {
return remoteClusterLicenseChecker != null;
}
@ -134,8 +147,8 @@ public final class SourceDestValidator {
return source;
}
public String getDest() {
return dest;
public String getDestIndex() {
return destIndex;
}
public String getNodeName() {
@ -168,11 +181,11 @@ public final class SourceDestValidator {
Index singleWriteIndex = indexNameExpressionResolver.concreteWriteIndex(
state,
IndicesOptions.lenientExpandOpen(),
dest,
destIndex,
true,
false);
resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : dest;
resolvedDest = singleWriteIndex != null ? singleWriteIndex.getName() : destIndex;
} catch (IllegalArgumentException e) {
// stop here as we can not return a single dest index
addValidationError(e.getMessage());
@ -236,6 +249,7 @@ public final class SourceDestValidator {
public static final SourceDestValidation DESTINATION_IN_SOURCE_VALIDATION = new DestinationInSourceValidation();
public static final SourceDestValidation DESTINATION_SINGLE_INDEX_VALIDATION = new DestinationSingleIndexValidation();
public static final SourceDestValidation REMOTE_SOURCE_NOT_SUPPORTED_VALIDATION = new RemoteSourceNotSupportedValidation();
public static final SourceDestValidation DESTINATION_PIPELINE_MISSING_VALIDATION = new DestinationPipelineMissingValidation();
/**
* Create a new Source Dest Validator
@ -250,29 +264,33 @@ public final class SourceDestValidator {
IndexNameExpressionResolver indexNameExpressionResolver,
RemoteClusterService remoteClusterService,
RemoteClusterLicenseChecker remoteClusterLicenseChecker,
IngestService ingestService,
String nodeName,
String license
) {
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.remoteClusterService = remoteClusterService;
this.remoteClusterLicenseChecker = remoteClusterLicenseChecker;
this.ingestService = ingestService;
this.nodeName = nodeName;
this.license = license;
}
/**
* Run validation against source and dest.
* Run validation against source and destIndex.
*
* @param clusterState The current ClusterState
* @param source an array of source indexes
* @param dest destination index
* @param destIndex destination index
* @param destPipeline destination pipeline
* @param validations list of of validations to run
* @param listener result listener
*/
public void validate(
final ClusterState clusterState,
final String[] source,
final String dest,
final String destIndex,
@Nullable final String destPipeline,
final List<SourceDestValidation> validations,
final ActionListener<Boolean> listener
) {
@ -281,8 +299,10 @@ public final class SourceDestValidator {
indexNameExpressionResolver,
remoteClusterService,
remoteClusterLicenseChecker,
ingestService,
source,
dest,
destIndex,
destPipeline,
nodeName,
license
);
@ -306,7 +326,7 @@ public final class SourceDestValidator {
}
/**
* Validate dest request.
* Validate request.
*
* This runs a couple of simple validations at request time, to be executed from a {@link ActionRequest}}
* implementation.
@ -314,17 +334,17 @@ public final class SourceDestValidator {
* Note: Source can not be validated at request time as it might contain expressions.
*
* @param validationException an ActionRequestValidationException for collection validation problem, can be null
* @param dest destination index, null if validation shall be skipped
* @param destIndex destination index, null if validation shall be skipped
*/
public static ActionRequestValidationException validateRequest(
@Nullable ActionRequestValidationException validationException,
@Nullable String dest
@Nullable String destIndex
) {
try {
if (dest != null) {
validateIndexOrAliasName(dest, InvalidIndexNameException::new);
if (dest.toLowerCase(Locale.ROOT).equals(dest) == false) {
validationException = addValidationError(getMessage(DEST_LOWERCASE, dest), validationException);
if (destIndex != null) {
validateIndexOrAliasName(destIndex, InvalidIndexNameException::new);
if (destIndex.toLowerCase(Locale.ROOT).equals(destIndex) == false) {
validationException = addValidationError(getMessage(DEST_LOWERCASE, destIndex), validationException);
}
}
} catch (InvalidIndexNameException ex) {
@ -408,7 +428,7 @@ public final class SourceDestValidator {
@Override
public void validate(Context context, ActionListener<Context> listener) {
final String destIndex = context.getDest();
final String destIndex = context.getDestIndex();
boolean foundSourceInDest = false;
for (String src : context.getSource()) {
@ -462,6 +482,19 @@ public final class SourceDestValidator {
}
}
static class DestinationPipelineMissingValidation implements SourceDestValidation {
@Override
public void validate(Context context, ActionListener<Context> listener) {
if (context.destPipeline != null) {
if (context.ingestService.getPipeline(context.destPipeline) == null) {
context.addValidationError(PIPELINE_MISSING, context.destPipeline);
}
}
listener.onResponse(context);
}
}
private static String getMessage(String message, Object... args) {
return new MessageFormat(message, Locale.ROOT).format(args);
}

View File

@ -27,7 +27,6 @@ public class TransformMessages {
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";

View File

@ -23,6 +23,11 @@ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.ingest.ConfigurationUtils;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.ingest.Pipeline;
import org.elasticsearch.ingest.Processor;
import org.elasticsearch.ingest.TestProcessor;
import org.elasticsearch.license.License;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
import org.elasticsearch.license.XPackLicenseState;
@ -44,7 +49,9 @@ import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -56,10 +63,12 @@ import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_VERSION_CREATED;
import static org.elasticsearch.mock.orig.Mockito.when;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
public class SourceDestValidatorTests extends ESTestCase {
@ -79,6 +88,7 @@ public class SourceDestValidatorTests extends ESTestCase {
SOURCE_MISSING_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION,
DESTINATION_PIPELINE_MISSING_VALIDATION,
REMOTE_SOURCE_VALIDATION
);
@ -91,10 +101,13 @@ public class SourceDestValidatorTests extends ESTestCase {
private final ThreadPool threadPool = new TestThreadPool(getClass().getName());
private final TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool);
private final RemoteClusterService remoteClusterService = transportService.getRemoteClusterService();
private final IngestService ingestService = mock(IngestService.class);
private final SourceDestValidator simpleNonRemoteValidator = new SourceDestValidator(
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
remoteClusterService,
null,
ingestService,
"node_id",
"license"
);
@ -205,6 +218,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -219,6 +233,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] {},
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -236,6 +251,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "missing" },
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -251,6 +267,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "missing" },
"dest",
null,
Collections.emptyList(),
listener
),
@ -265,6 +282,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -280,6 +298,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
"dest",
null,
Collections.emptyList(),
listener
),
@ -294,6 +313,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "wildcard*", "missing" },
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -309,6 +329,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1, "wildcard*", "missing" },
"dest",
null,
Collections.emptyList(),
listener
),
@ -323,6 +344,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "wildcard*" },
"dest",
null,
TEST_VALIDATIONS,
listener
),
@ -337,6 +359,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1,
null,
TEST_VALIDATIONS,
listener
),
@ -355,6 +378,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1,
null,
Collections.emptyList(),
listener
),
@ -369,6 +393,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-*" },
SOURCE_2,
null,
TEST_VALIDATIONS,
listener
),
@ -387,6 +412,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-*" },
SOURCE_2,
null,
Collections.emptyList(),
listener
),
@ -401,6 +427,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*" },
SOURCE_2,
null,
TEST_VALIDATIONS,
listener
),
@ -419,6 +446,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*" },
SOURCE_2,
null,
Collections.emptyList(),
listener
),
@ -433,6 +461,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { "source-1", "source-*", "sou*" },
SOURCE_2,
null,
TEST_VALIDATIONS,
listener
),
@ -457,6 +486,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
DEST_ALIAS,
null,
TEST_VALIDATIONS,
listener
),
@ -479,6 +509,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
DEST_ALIAS,
null,
Collections.emptyList(),
listener
),
@ -493,6 +524,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
ALIAS_READ_WRITE_DEST,
null,
TEST_VALIDATIONS,
listener
),
@ -519,6 +551,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1_ALIAS,
null,
TEST_VALIDATIONS,
listener
),
@ -537,6 +570,7 @@ public class SourceDestValidatorTests extends ESTestCase {
CLUSTER_STATE,
new String[] { SOURCE_1 },
SOURCE_1_ALIAS,
null,
Collections.emptyList(),
listener
),
@ -545,12 +579,60 @@ public class SourceDestValidatorTests extends ESTestCase {
);
}
public void testCheck_GivenMissingDestPipeline() throws Exception {
assertValidation(
listener -> simpleNonRemoteValidator.validate(
CLUSTER_STATE,
new String[] { SOURCE_1 },
"some-dest",
"missing-pipeline",
TEST_VALIDATIONS,
listener
),
(Boolean) null,
e -> {
assertEquals(1, e.validationErrors().size());
assertThat(
e.validationErrors().get(0),
equalTo("Pipeline with id [missing-pipeline] could not be found")
);
}
);
// Let's now pretend that pipeline exists
Map<String, Object> processorConfig0 = new HashMap<>();
Map<String, Object> processorConfig1 = new HashMap<>();
processorConfig0.put(ConfigurationUtils.TAG_KEY, "first-processor");
Map<String, Object> pipelineConfig = new HashMap<>();
pipelineConfig.put(Pipeline.DESCRIPTION_KEY, "_description");
pipelineConfig.put(Pipeline.VERSION_KEY, "1");
pipelineConfig.put(Pipeline.PROCESSORS_KEY,
Arrays.asList(Collections.singletonMap("test", processorConfig0), Collections.singletonMap("test", processorConfig1)));
Map<String, Processor.Factory> processorRegistry = Collections.singletonMap("test", new TestProcessor.Factory());
Pipeline pipeline = Pipeline.create("missing-pipeline", pipelineConfig, processorRegistry, null);
when(ingestService.getPipeline("missing-pipeline")).thenReturn(pipeline);
assertValidation(
listener -> simpleNonRemoteValidator.validate(
CLUSTER_STATE,
new String[] { SOURCE_1 },
"some-dest",
"missing-pipeline",
TEST_VALIDATIONS,
listener
),
true,
null
);
}
public void testCheck_MultipleValidationErrors() throws InterruptedException {
assertValidation(
listener -> simpleNonRemoteValidator.validate(
CLUSTER_STATE,
new String[] { SOURCE_1, "missing" },
SOURCE_1_ALIAS,
null,
TEST_VALIDATIONS,
listener
),
@ -575,8 +657,10 @@ public class SourceDestValidatorTests extends ESTestCase {
new IndexNameExpressionResolver(new ThreadContext(Settings.EMPTY)),
remoteClusterService,
remoteClusterLicenseCheckerBasic,
ingestService,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
"node_id",
"license"
)
@ -600,8 +684,10 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithBasicLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
"node_id",
"platinum"
)
@ -630,8 +716,10 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithPlatinumLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
null,
"node_id",
"license"
)
@ -651,9 +739,11 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithPlatinumLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
"node_id",
null,
"platinum"
)
);
@ -673,9 +763,11 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithTrialLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { REMOTE_PLATINUM + ":" + "SOURCE_1" },
"dest",
"node_id",
null,
"trial"
)
);
@ -697,8 +789,10 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { REMOTE_BASIC + ":" + "SOURCE_1" },
"dest",
null,
"node_id",
"license"
)
@ -724,8 +818,10 @@ public class SourceDestValidatorTests extends ESTestCase {
remoteClusterService,
new RemoteClusterLicenseChecker(clientWithExpiredBasicLicense,
operationMode -> XPackLicenseState.isAllowedByOperationMode(operationMode, License.OperationMode.PLATINUM)),
ingestService,
new String[] { "non_existing_remote:" + "SOURCE_1" },
"dest",
null,
"node_id",
"license"
)

View File

@ -93,6 +93,7 @@ public class TransportPutDataFrameAnalyticsAction
indexNameExpressionResolver,
transportService.getRemoteClusterService(),
null,
null,
clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
@ -128,7 +129,7 @@ public class TransportPutDataFrameAnalyticsAction
listener::onFailure
);
sourceDestValidator.validate(clusterService.state(), config.getSource().getIndex(), config.getDest().getIndex(),
sourceDestValidator.validate(clusterService.state(), config.getSource().getIndex(), config.getDest().getIndex(), null,
SourceDestValidations.ALL_VALIDATIONS, sourceDestValidationListener);
}

View File

@ -132,6 +132,7 @@ public class TransportStartDataFrameAnalyticsAction
indexNameExpressionResolver,
transportService.getRemoteClusterService(),
null,
null,
clusterService.getNodeName(),
License.OperationMode.PLATINUM.description()
);
@ -313,7 +314,7 @@ public class TransportStartDataFrameAnalyticsAction
// Validate source/dest are valid
sourceDestValidator.validate(clusterService.state(), startContext.config.getSource().getIndex(),
startContext.config.getDest().getIndex(), SourceDestValidations.ALL_VALIDATIONS, ActionListener.wrap(
startContext.config.getDest().getIndex(), null, SourceDestValidations.ALL_VALIDATIONS, ActionListener.wrap(
aBoolean -> toValidateExtractionPossibleListener.onResponse(startContext), finalListener::onFailure));
},
finalListener::onFailure

View File

@ -260,7 +260,6 @@ setup:
body: >
{
"source": { "index": "airline-data" },
"dest": { "pipeline": "missing-pipeline" },
"pivot": {
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},
@ -275,7 +274,6 @@ setup:
body: >
{
"source": { "index": "airline-data" },
"dest": { "pipeline": "missing-pipeline" },
"pivot": {
"group_by": {
"time": {"date_histogram": {"fixed_interval": "1h", "field": "time"}}},

View File

@ -45,6 +45,16 @@ setup:
"description": "new description"
}
---
"Test update transform with missing pipeline":
- do:
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
transform.update_transform:
transform_id: "updating-airline-transform"
body: >
{
"dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" }
}
---
"Test update transform with frequency too low":
- do:
catch: /minimum permitted \[frequency\] is \[1s\]/

View File

@ -6,8 +6,6 @@
package org.elasticsearch.xpack.transform.action;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ingest.SimulateDocumentResult;
@ -30,6 +28,7 @@ import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@ -63,7 +62,6 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
PreviewTransformAction.Request,
PreviewTransformAction.Response> {
private static final Logger logger = LogManager.getLogger(TransportPreviewTransformAction.class);
private static final int NUMBER_OF_PREVIEW_BUCKETS = 100;
private final XPackLicenseState licenseState;
private final Client client;
@ -80,7 +78,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
XPackLicenseState licenseState,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
Settings settings
Settings settings,
IngestService ingestService
) {
this(
PreviewTransformAction.NAME,
@ -91,7 +90,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
licenseState,
indexNameExpressionResolver,
clusterService,
settings
settings,
ingestService
);
}
@ -104,7 +104,8 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
XPackLicenseState licenseState,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
Settings settings
Settings settings,
IngestService ingestService
) {
super(name, transportService, actionFilters, PreviewTransformAction.Request::new);
this.licenseState = licenseState;
@ -117,6 +118,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
DiscoveryNode.isRemoteClusterClient(settings)
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
: null,
ingestService,
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
@ -137,6 +139,7 @@ public class TransportPreviewTransformAction extends HandledTransportAction<
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
SourceDestValidations.PREVIEW_VALIDATIONS,
ActionListener.wrap(r -> {
// create the function for validation

View File

@ -76,7 +76,6 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
private final SecurityContext securityContext;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final IngestService ingestService;
@Inject
public TransportPutTransformAction(
@ -141,10 +140,10 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
DiscoveryNode.isRemoteClusterClient(settings)
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
: null,
ingestService,
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.ingestService = ingestService;
}
static HasPrivilegesRequest buildPrivilegeCheck(
@ -229,6 +228,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
clusterState,
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
ActionListener.wrap(
validationResponse -> {
@ -319,26 +319,11 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
);
function.validateConfig(ActionListener.wrap(r2 -> {
if (request.isDeferValidation()) {
validationListener.onResponse(true);
} else {
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(
new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}
if (request.isDeferValidation()) {
validationListener.onResponse(true);
} else {
function.validateQuery(client, config.getSource(), validationListener);
}
}
}, listener::onFailure));
}
}

View File

@ -140,6 +140,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
DiscoveryNode.isRemoteClusterClient(settings)
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
: null,
ingestService,
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
@ -269,22 +270,12 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
createTransform(config.getId(), config.getVersion(), config.getFrequency(), config.getSource().requiresRemoteCluster())
);
transformConfigHolder.set(config);
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(
new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}
sourceDestValidator.validate(
clusterService.state(),
config.getSource().getIndex(),
config.getDestination().getIndex(),
config.getDestination().getPipeline(),
SourceDestValidations.ALL_VALIDATIONS,
validationListener
);

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@ -96,7 +97,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
this(
UpdateTransformAction.NAME,
@ -108,7 +110,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}
@ -122,7 +125,8 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
name,
@ -148,6 +152,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
DiscoveryNode.isRemoteClusterClient(settings)
? new RemoteClusterLicenseChecker(client, XPackLicenseState::isTransformAllowedForOperationMode)
: null,
ingestService,
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
@ -237,6 +242,7 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
clusterState,
updatedConfig.getSource().getIndex(),
updatedConfig.getDestination().getIndex(),
updatedConfig.getDestination().getPipeline(),
request.isDeferValidation() ? SourceDestValidations.NON_DEFERABLE_VALIDATIONS : SourceDestValidations.ALL_VALIDATIONS,
ActionListener.wrap(
validationResponse -> {
@ -396,16 +402,11 @@ public class TransportUpdateTransformAction extends TransportTasksAction<Transfo
});
function.validateConfig(ActionListener.wrap(r2 -> {
if (request.isDeferValidation()) {
functionValidationListener.onResponse(true);
} else {
// TODO: it seems we are not validating ingest pipelines, consider to share code with PUT
if (request.isDeferValidation()) {
functionValidationListener.onResponse(true);
} else {
function.validateQuery(client, config.getSource(), functionValidationListener);
}
}
}, listener::onFailure));
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -29,7 +30,8 @@ public class TransportPreviewTransformActionDeprecated extends TransportPreviewT
XPackLicenseState licenseState,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService,
Settings settings
Settings settings,
IngestService ingestService
) {
super(
PreviewTransformActionDeprecated.NAME,
@ -40,7 +42,8 @@ public class TransportPreviewTransformActionDeprecated extends TransportPreviewT
licenseState,
indexNameExpressionResolver,
clusterService,
settings
settings,
ingestService
);
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -31,7 +32,8 @@ public class TransportUpdateTransformActionDeprecated extends TransportUpdateTra
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
UpdateTransformActionDeprecated.NAME,
@ -43,7 +45,8 @@ public class TransportUpdateTransformActionDeprecated extends TransportUpdateTra
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}

View File

@ -13,6 +13,7 @@ import java.util.Collections;
import java.util.List;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_IN_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_PIPELINE_MISSING_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.DESTINATION_SINGLE_INDEX_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.REMOTE_SOURCE_VALIDATION;
import static org.elasticsearch.xpack.core.common.validation.SourceDestValidator.SOURCE_MISSING_VALIDATION;
@ -25,13 +26,14 @@ public final class SourceDestValidations {
private SourceDestValidations() {}
public static final List<SourceDestValidator.SourceDestValidation> PREVIEW_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION);
SOURCE_MISSING_VALIDATION, REMOTE_SOURCE_VALIDATION, DESTINATION_PIPELINE_MISSING_VALIDATION);
public static final List<SourceDestValidator.SourceDestValidation> ALL_VALIDATIONS = Arrays.asList(
SOURCE_MISSING_VALIDATION,
REMOTE_SOURCE_VALIDATION,
DESTINATION_IN_SOURCE_VALIDATION,
DESTINATION_SINGLE_INDEX_VALIDATION
DESTINATION_SINGLE_INDEX_VALIDATION,
DESTINATION_PIPELINE_MISSING_VALIDATION
);
public static final List<SourceDestValidator.SourceDestValidation> NON_DEFERABLE_VALIDATIONS = Collections.singletonList(