[Transform] fail to start/put on missing pipeline (#50701) (#50795)

If a pipeline referenced by a transform does not exist, we should not allow the transform to be created. 

We do allow the pipeline existence check to be skipped with defer_validations, but if the pipeline still does not exist on `_start`, the pipeline will fail to start.

relates:  #50135
This commit is contained in:
Benjamin Trent 2020-01-09 10:33:22 -05:00 committed by GitHub
parent 4f150e4961
commit 3e014d39c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 173 additions and 12 deletions

View File

@ -134,6 +134,7 @@ public class TransformDocumentationIT extends ESRestHighLevelClientTestCase {
.setIndex("pivot-destination")
.setPipeline("my-pipeline").build();
// end::put-transform-dest-config
destConfig = DestConfig.builder().setIndex("pivot-destination").build();
// tag::put-transform-group-config
GroupConfig groupConfig = GroupConfig.builder()
.groupBy("reviewer", // <1>

View File

@ -1226,7 +1226,23 @@ buildRestTests.setups['kibana_sample_data_ecommerce'] = '''
number_of_shards: 1
number_of_replicas: 0
'''
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + '''
buildRestTests.setups['add_timestamp_pipeline'] = '''
- do:
ingest.put_pipeline:
id: "add_timestamp_pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "@timestamp",
"value" : "{{_ingest.timestamp}}"
}
}
]
}
'''
buildRestTests.setups['simple_kibana_continuous_pivot'] = buildRestTests.setups['kibana_sample_data_ecommerce'] + buildRestTests.setups['add_timestamp_pipeline'] + '''
- do:
raw:
method: PUT

View File

@ -195,7 +195,7 @@ PUT _transform/ecommerce_transform
}
}
--------------------------------------------------
// TEST[setup:kibana_sample_data_ecommerce]
// TEST[setup:kibana_sample_data_ecommerce,add_timestamp_pipeline]
When the {transform} is created, you receive the following results:

View File

@ -27,6 +27,7 @@ public class TransformMessages {
public static final String REST_FAILED_TO_SERIALIZE_TRANSFORM = "Failed to serialise transform [{0}]";
public static final String TRANSFORM_FAILED_TO_PERSIST_STATS = "Failed to persist transform statistics for transform [{0}]";
public static final String UNKNOWN_TRANSFORM_STATS = "Statistics for transform [{0}] could not be found";
public static final String PIPELINE_MISSING = "Pipeline with id [{0}] could not be found";
public static final String REST_DEPRECATED_ENDPOINT = "[_data_frame/transforms/] is deprecated, use [_transform/] in the future.";

View File

@ -122,6 +122,22 @@ setup:
}
- match: { acknowledged: true }
- do:
ingest.put_pipeline:
id: "airline-pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "some_field",
"value" : 42
}
}
]
}
- match: { acknowledged: true }
- do:
transform.put_transform:
transform_id: "airline-transform-dos"
@ -631,3 +647,36 @@ setup:
transform_id: "airline-transform-start-delete"
force: true
- match: { acknowledged: true }
---
"Test put transform with missing pipeline":
- do:
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
transform.put_transform:
transform_id: "airline-transform-with-missing-pipeline-crud"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-with-pipeline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
---
"Test put transform with missing pipeline and defer validations":
- do:
transform.put_transform:
defer_validation: true
transform_id: "airline-transform-with-missing-pipeline-crud-defer"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}

View File

@ -376,3 +376,59 @@ teardown:
index: airline-data-time-alias
- match: { airline-data-time-alias.mappings.properties.time.type: date }
- match: { airline-data-time-alias.mappings.properties.avg_response.type: double }
---
"Test start transform with missing pipeline":
- do:
transform.put_transform:
defer_validation: true
transform_id: "airline-transform-with-missing-pipeline"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "missing-transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}
- do:
catch: /Pipeline with id \[missing-transform-pipeline\] could not be found/
transform.start_transform:
transform_id: "airline-transform-with-missing-pipeline"
---
"Test start transform with pipeline":
- do:
ingest.put_pipeline:
id: "transform-pipeline"
body: >
{
"processors": [
{
"set" : {
"field" : "some_field",
"value" : 42
}
}
]
}
- match: { acknowledged: true }
- do:
transform.put_transform:
transform_id: "airline-transform-with-pipeline"
body: >
{
"source": { "index": "airline-data" },
"dest": { "index": "airline-data-by-airline-pipeline", "pipeline": "transform-pipeline" },
"pivot": {
"group_by": { "airline": {"terms": {"field": "airline"}}},
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
},
"description": "yaml test transform on airline-data"
}
- match: {acknowledged: true}
- do:
transform.start_transform:
transform_id: "airline-transform-with-pipeline"

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@ -73,6 +74,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
private final SecurityContext securityContext;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final IngestService ingestService;
@Inject
public TransportPutTransformAction(
@ -84,7 +86,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
this(
PutTransformAction.NAME,
@ -96,7 +99,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}
@ -110,7 +114,8 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
name,
@ -137,6 +142,7 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.ingestService = ingestService;
}
static HasPrivilegesRequest buildPrivilegeCheck(
@ -335,6 +341,16 @@ public class TransportPutTransformAction extends TransportMasterNodeAction<Reque
if (request.isDeferValidation()) {
pivotValidationListener.onResponse(true);
} else {
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}
pivot.validateQuery(client, config.getSource(), pivotValidationListener);
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.License;
import org.elasticsearch.license.LicenseUtils;
import org.elasticsearch.license.RemoteClusterLicenseChecker;
@ -70,6 +71,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
private final Client client;
private final TransformAuditor auditor;
private final SourceDestValidator sourceDestValidator;
private final IngestService ingestService;
@Inject
public TransportStartTransformAction(
@ -82,7 +84,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
this(
StartTransformAction.NAME,
@ -95,7 +98,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
transformServices,
persistentTasksService,
client,
settings
settings,
ingestService
);
}
@ -110,7 +114,8 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
super(
name,
@ -135,6 +140,7 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
clusterService.getNodeName(),
License.OperationMode.BASIC.description()
);
this.ingestService = ingestService;
}
@Override
@ -256,6 +262,16 @@ public class TransportStartTransformAction extends TransportMasterNodeAction<Sta
}
transformTaskHolder.set(createTransform(config.getId(), config.getVersion(), config.getFrequency()));
transformConfigHolder.set(config);
if (config.getDestination().getPipeline() != null) {
if (ingestService.getPipeline(config.getDestination().getPipeline()) == null) {
listener.onFailure(new ElasticsearchStatusException(
TransformMessages.getMessage(TransformMessages.PIPELINE_MISSING, config.getDestination().getPipeline()),
RestStatus.BAD_REQUEST
)
);
return;
}
}
sourceDestValidator.validate(
clusterService.state(),

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@ -31,7 +32,8 @@ public class TransportPutTransformActionDeprecated extends TransportPutTransform
ClusterService clusterService,
XPackLicenseState licenseState,
TransformServices transformServices,
Client client
Client client,
IngestService ingestService
) {
super(
PutTransformActionDeprecated.NAME,
@ -43,7 +45,8 @@ public class TransportPutTransformActionDeprecated extends TransportPutTransform
clusterService,
licenseState,
transformServices,
client
client,
ingestService
);
}

View File

@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.threadpool.ThreadPool;
@ -33,7 +34,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
TransformServices transformServices,
PersistentTasksService persistentTasksService,
Client client,
Settings settings
Settings settings,
IngestService ingestService
) {
super(
StartTransformActionDeprecated.NAME,
@ -46,7 +48,8 @@ public class TransportStartTransformActionDeprecated extends TransportStartTrans
transformServices,
persistentTasksService,
client,
settings
settings,
ingestService
);
}
}