Fail delete policy if pipeline exists (#44438)
If a pipeline that refrences the policy exists, we should not allow the policy to be deleted. The user will need to remove the processor from the pipeline before deleting the policy. This commit adds a check to ensure that the policy cannot be deleted if it is referenced by any pipeline in the system.
This commit is contained in:
parent
43b8ab607d
commit
52a094b177
|
@ -950,6 +950,7 @@ Which returns:
|
|||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
DELETE /_ingest/pipeline/user_lookup
|
||||
DELETE /_enrich/policy/users-policy
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
|
|
@ -92,6 +92,7 @@ PUT _ingest/pipeline/user_lookup
|
|||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
DELETE /_ingest/pipeline/user_lookup
|
||||
DELETE /_enrich/policy/users-policy
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
||||
|
|
|
@ -33,7 +33,7 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testBasicFlow() throws Exception {
|
||||
private void setupGenericLifecycleTest(boolean deletePipeilne) throws Exception {
|
||||
// Create the policy:
|
||||
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
|
||||
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"match_field\": \"host\", " +
|
||||
|
@ -75,6 +75,15 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
|
|||
assertThat(_source.get("host"), equalTo("elastic.co"));
|
||||
assertThat(_source.get("global_rank"), equalTo(25));
|
||||
assertThat(_source.get("tld_rank"), equalTo(7));
|
||||
|
||||
if (deletePipeilne) {
|
||||
// delete the pipeline so the policies can be deleted
|
||||
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
|
||||
}
|
||||
}
|
||||
|
||||
public void testBasicFlow() throws Exception {
|
||||
setupGenericLifecycleTest(true);
|
||||
}
|
||||
|
||||
public void testImmutablePolicy() throws IOException {
|
||||
|
@ -87,6 +96,40 @@ public abstract class CommonEnrichRestTestCase extends ESRestTestCase {
|
|||
assertTrue(exc.getMessage().contains("policy [my_policy] already exists"));
|
||||
}
|
||||
|
||||
public void testDeleteIsCaseSensitive() throws Exception {
|
||||
Request putPolicyRequest = new Request("PUT", "/_enrich/policy/my_policy");
|
||||
putPolicyRequest.setJsonEntity("{\"type\": \"exact_match\",\"indices\": [\"my-source-index\"], \"enrich_key\": \"host\", " +
|
||||
"\"enrich_values\": [\"globalRank\", \"tldRank\", \"tld\"]}");
|
||||
assertOK(client().performRequest(putPolicyRequest));
|
||||
|
||||
ResponseException exc = expectThrows(ResponseException.class,
|
||||
() -> client().performRequest(new Request("DELETE", "/_enrich/policy/MY_POLICY")));
|
||||
assertTrue(exc.getMessage().contains("policy [MY_POLICY] not found"));
|
||||
}
|
||||
|
||||
public void testDeleteExistingPipeline() throws Exception {
|
||||
// lets not delete the pipeline at first, to test the failure
|
||||
setupGenericLifecycleTest(false);
|
||||
|
||||
Request putPipelineRequest = new Request("PUT", "/_ingest/pipeline/another_pipeline");
|
||||
putPipelineRequest.setJsonEntity("{\"processors\":[" +
|
||||
"{\"enrich\":{\"policy_name\":\"my_policy\",\"enrich_key\":\"host\",\"set_from\":[" +
|
||||
"{\"source\":\"globalRank\",\"target\":\"global_rank\"}," +
|
||||
"{\"source\":\"tldRank\",\"target\":\"tld_rank\"}" +
|
||||
"]}}" +
|
||||
"]}");
|
||||
assertOK(client().performRequest(putPipelineRequest));
|
||||
|
||||
ResponseException exc = expectThrows(ResponseException.class,
|
||||
() -> client().performRequest(new Request("DELETE", "/_enrich/policy/my_policy")));
|
||||
assertTrue(exc.getMessage().contains("Could not delete policy [my_policy] because" +
|
||||
" a pipeline is referencing it [my_pipeline, another_pipeline]"));
|
||||
|
||||
// delete the pipelines so the policies can be deleted
|
||||
client().performRequest(new Request("DELETE", "/_ingest/pipeline/my_pipeline"));
|
||||
client().performRequest(new Request("DELETE", "/_ingest/pipeline/another_pipeline"));
|
||||
}
|
||||
|
||||
private static Map<String, Object> toMap(Response response) throws IOException {
|
||||
return toMap(EntityUtils.toString(response.getEntity()));
|
||||
}
|
||||
|
|
|
@ -0,0 +1,22 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import org.elasticsearch.ingest.AbstractProcessor;
|
||||
|
||||
public abstract class AbstractEnrichProcessor extends AbstractProcessor {
|
||||
|
||||
private final String policyName;
|
||||
|
||||
protected AbstractEnrichProcessor(String tag, String policyName) {
|
||||
super(tag);
|
||||
this.policyName = policyName;
|
||||
}
|
||||
|
||||
public String getPolicyName() {
|
||||
return policyName;
|
||||
}
|
||||
}
|
|
@ -12,7 +12,6 @@ import org.elasticsearch.client.Client;
|
|||
import org.elasticsearch.cluster.routing.Preference;
|
||||
import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
|
||||
import org.elasticsearch.index.query.TermQueryBuilder;
|
||||
import org.elasticsearch.ingest.AbstractProcessor;
|
||||
import org.elasticsearch.ingest.IngestDocument;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.builder.SearchSourceBuilder;
|
||||
|
@ -24,12 +23,11 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
final class ExactMatchProcessor extends AbstractProcessor {
|
||||
public final class ExactMatchProcessor extends AbstractEnrichProcessor {
|
||||
|
||||
static final String ENRICH_KEY_FIELD_NAME = "enrich_key_field";
|
||||
|
||||
private final BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> searchRunner;
|
||||
private final String policyName;
|
||||
private final String enrichKey;
|
||||
private final boolean ignoreMissing;
|
||||
private final boolean overrideEnabled;
|
||||
|
@ -60,9 +58,8 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
boolean ignoreMissing,
|
||||
boolean overrideEnabled,
|
||||
List<EnrichSpecification> specifications) {
|
||||
super(tag);
|
||||
super(tag, policyName);
|
||||
this.searchRunner = searchRunner;
|
||||
this.policyName = policyName;
|
||||
this.enrichKey = enrichKey;
|
||||
this.ignoreMissing = ignoreMissing;
|
||||
this.overrideEnabled = overrideEnabled;
|
||||
|
@ -89,7 +86,7 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
searchBuilder.query(constantScore);
|
||||
|
||||
SearchRequest req = new SearchRequest();
|
||||
req.indices(EnrichPolicy.getBaseName(policyName));
|
||||
req.indices(EnrichPolicy.getBaseName(getPolicyName()));
|
||||
req.preference(Preference.LOCAL.type());
|
||||
req.source(searchBuilder);
|
||||
|
||||
|
@ -137,10 +134,6 @@ final class ExactMatchProcessor extends AbstractProcessor {
|
|||
return EnrichProcessorFactory.TYPE;
|
||||
}
|
||||
|
||||
String getPolicyName() {
|
||||
return policyName;
|
||||
}
|
||||
|
||||
String getEnrichKey() {
|
||||
return enrichKey;
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.enrich.action;
|
||||
|
||||
import org.elasticsearch.ElasticsearchStatusException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
|
@ -16,23 +17,34 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.ingest.IngestService;
|
||||
import org.elasticsearch.ingest.PipelineConfiguration;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
|
||||
import org.elasticsearch.xpack.enrich.AbstractEnrichProcessor;
|
||||
import org.elasticsearch.xpack.enrich.EnrichStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
|
||||
|
||||
private final IngestService ingestService;
|
||||
|
||||
@Inject
|
||||
public TransportDeleteEnrichPolicyAction(TransportService transportService,
|
||||
ClusterService clusterService,
|
||||
ThreadPool threadPool,
|
||||
ActionFilters actionFilters,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver) {
|
||||
IndexNameExpressionResolver indexNameExpressionResolver,
|
||||
IngestService ingestService) {
|
||||
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
|
||||
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
|
||||
this.ingestService = ingestService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -52,6 +64,26 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
|
|||
@Override
|
||||
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
|
||||
ActionListener<AcknowledgedResponse> listener) throws Exception {
|
||||
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
|
||||
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
|
||||
List<String> pipelinesWithProcessors = new ArrayList<>();
|
||||
|
||||
for (PipelineConfiguration pipelineConfiguration : pipelines) {
|
||||
List<AbstractEnrichProcessor> enrichProcessors =
|
||||
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
|
||||
for (AbstractEnrichProcessor processor: enrichProcessors) {
|
||||
if (processor.getPolicyName().equals(request.getName())) {
|
||||
pipelinesWithProcessors.add(pipelineConfiguration.getId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pipelinesWithProcessors.isEmpty() == false) {
|
||||
listener.onFailure(
|
||||
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
|
||||
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
|
||||
}
|
||||
|
||||
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
|
||||
if (e == null) {
|
||||
listener.onResponse(new AcknowledgedResponse(true));
|
||||
|
|
Loading…
Reference in New Issue