diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java index e878289eea3..615c6295438 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyAction.java @@ -6,10 +6,14 @@ package org.elasticsearch.xpack.enrich.action; import org.elasticsearch.ElasticsearchStatusException; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.TransportMasterNodeAction; +import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; @@ -36,6 +40,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction private final EnrichPolicyLocks enrichPolicyLocks; private final IngestService ingestService; + private final Client client; + // the most lenient we can get in order to not bomb out if no indices are found, which is a valid case + // where a user creates and deletes a policy before running execute + private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true); + @Inject public TransportDeleteEnrichPolicyAction(TransportService transportService, @@ -43,10 +52,12 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + Client client, EnrichPolicyLocks enrichPolicyLocks, IngestService ingestService) { super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters, DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver); + this.client = client; this.enrichPolicyLocks = enrichPolicyLocks; this.ingestService = ingestService; } @@ -68,36 +79,74 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction @Override protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state, ActionListener listener) throws Exception { - enrichPolicyLocks.lockPolicy(request.getName()); - List pipelines = IngestService.getPipelines(state); - EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); - List pipelinesWithProcessors = new ArrayList<>(); - - for (PipelineConfiguration pipelineConfiguration : pipelines) { - List enrichProcessors = - ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class); - for (AbstractEnrichProcessor processor: enrichProcessors) { - if (processor.getPolicyName().equals(request.getName())) { - pipelinesWithProcessors.add(pipelineConfiguration.getId()); - } - } + EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first + if (policy == null) { + throw new ResourceNotFoundException("policy [{}] not found", request.getName()); } - if (pipelinesWithProcessors.isEmpty() == false) { + enrichPolicyLocks.lockPolicy(request.getName()); + try { + List pipelines = IngestService.getPipelines(state); + List pipelinesWithProcessors = new ArrayList<>(); + + for (PipelineConfiguration pipelineConfiguration : pipelines) { + List enrichProcessors = + ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class); + for (AbstractEnrichProcessor processor : enrichProcessors) { + if (processor.getPolicyName().equals(request.getName())) { + pipelinesWithProcessors.add(pipelineConfiguration.getId()); + } + } + } + + if (pipelinesWithProcessors.isEmpty() == false) { + throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}", + RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors); + } + } catch (Exception e) { enrichPolicyLocks.releasePolicy(request.getName()); - listener.onFailure( - new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}", - RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors)); + listener.onFailure(e); return; } - EnrichStore.deletePolicy(request.getName(), clusterService, e -> { - enrichPolicyLocks.releasePolicy(request.getName()); - if (e == null) { - listener.onResponse(new AcknowledgedResponse(true)); - } else { - listener.onFailure(e); - } + deleteIndicesAndPolicy(request.getName(), ActionListener.wrap( + (response) -> { + enrichPolicyLocks.releasePolicy(request.getName()); + listener.onResponse(response); + }, + (exc) -> { + enrichPolicyLocks.releasePolicy(request.getName()); + listener.onFailure(exc); + } + )); + } + + private void deleteIndicesAndPolicy(String name, ActionListener listener) { + // delete all enrich indices for this policy + DeleteIndexRequest deleteRequest = new DeleteIndexRequest() + .indices(EnrichPolicy.getBaseName(name) + "-*") + .indicesOptions(LENIENT_OPTIONS); + + client.admin().indices().delete(deleteRequest, ActionListener.wrap( + (response) -> { + if (response.isAcknowledged() == false) { + listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]", + RestStatus.INTERNAL_SERVER_ERROR, name)); + } else { + deletePolicy(name, listener); + } + }, + (error) -> listener.onFailure(error) + )); + } + + private void deletePolicy(String name, ActionListener listener) { + EnrichStore.deletePolicy(name, clusterService, e -> { + if (e == null) { + listener.onResponse(new AcknowledgedResponse(true)); + } else { + listener.onFailure(e); + } }); } diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java index ac3d2f1a01d..b8cf46d7386 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyAction.java @@ -63,7 +63,7 @@ public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadActio } else { EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); if (policy == null) { - throw new ResourceNotFoundException("Policy [{}] was not found", request.getName()); + throw new ResourceNotFoundException("Policy [{}] not found", request.getName()); } policies = Collections.singletonMap(request.getName(), policy); diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java index 056c950a819..6d52a1d1e12 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/AbstractEnrichTestCase.java @@ -34,7 +34,7 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase { return error; } - void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception { + protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception { CountDownLatch latch = new CountDownLatch(1); AtomicReference error = new AtomicReference<>(); EnrichStore.deletePolicy(name, clusterService, e -> { diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnricyPolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java similarity index 52% rename from x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnricyPolicyActionTests.java rename to x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java index 49ba1cb1e84..9d03fa1b8e4 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnricyPolicyActionTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportDeleteEnrichPolicyActionTests.java @@ -6,15 +6,19 @@ package org.elasticsearch.xpack.enrich.action; +import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase; import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; +import org.elasticsearch.xpack.enrich.EnrichStore; +import org.junit.After; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicReference; @@ -24,9 +28,56 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.core.IsInstanceOf.instanceOf; -public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase { +public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase { - public void testDeleteIsNotLocked() throws InterruptedException { + @After + private void cleanupPolicy() { + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + String name = "my-policy"; + + try { + deleteEnrichPolicy(name, clusterService); + } catch (Exception e) { + // if the enrich policy does not exist, then just keep going + } + + // fail if the state of this is left locked + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); + assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); + } + + public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException { + String fakeId = "fake-id"; + createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1"); + createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2"); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference reference = new AtomicReference<>(); + final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); + transportAction.execute(null, + new DeleteEnrichPolicyAction.Request(fakeId), + new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + fail(); + } + + public void onFailure(final Exception e) { + reference.set(e); + latch.countDown(); + } + }); + latch.await(); + assertNotNull(reference.get()); + assertThat(reference.get(), instanceOf(ResourceNotFoundException.class)); + assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found")); + + // fail if the state of this is left locked + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); + assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); + } + + public void testDeleteWithoutIndex() throws Exception { EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON); ClusterService clusterService = getInstanceFromNode(ClusterService.class); String name = "my-policy"; @@ -53,6 +104,56 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa latch.await(); assertNotNull(reference.get()); assertTrue(reference.get().isAcknowledged()); + + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); + assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); + + assertNull(EnrichStore.getPolicy(name, clusterService.state())); + } + + public void testDeleteIsNotLocked() throws Exception { + EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON); + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + String name = "my-policy"; + + AtomicReference error = saveEnrichPolicy(name, policy, clusterService); + assertThat(error.get(), nullValue()); + + createIndex(EnrichPolicy.getBaseName(name) + "-foo1"); + createIndex(EnrichPolicy.getBaseName(name) + "-foo2"); + + client().admin().indices().prepareGetIndex().setIndices( + EnrichPolicy.getBaseName(name) + "-foo1", + EnrichPolicy.getBaseName(name) + "-foo2").get(); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference reference = new AtomicReference<>(); + final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); + transportAction.execute(null, + new DeleteEnrichPolicyAction.Request(name), + new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + reference.set(acknowledgedResponse); + latch.countDown(); + } + + public void onFailure(final Exception e) { + fail(); + } + }); + latch.await(); + assertNotNull(reference.get()); + assertTrue(reference.get().isAcknowledged()); + + expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices( + EnrichPolicy.getBaseName(name) + "-foo1", + EnrichPolicy.getBaseName(name) + "-foo2").get()); + + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); + assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); + + assertNull(EnrichStore.getPolicy(name, clusterService.state())); } public void testDeleteLocked() throws InterruptedException { @@ -63,6 +164,9 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa AtomicReference error = saveEnrichPolicy(name, policy, clusterService); assertThat(error.get(), nullValue()); + createIndex(EnrichPolicy.getBaseName(name) + "-foo1"); + createIndex(EnrichPolicy.getBaseName(name) + "-foo2"); + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); @@ -116,6 +220,8 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa assertTrue(reference.get().isAcknowledged()); assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); + + assertNull(EnrichStore.getPolicy(name, clusterService.state())); } } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java index a3dab01bce1..db8710381e5 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/action/TransportGetEnrichPolicyActionTests.java @@ -8,13 +8,12 @@ package org.elasticsearch.xpack.enrich.action; import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; -import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction; import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction; import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase; +import org.elasticsearch.xpack.enrich.EnrichPolicyLocks; import org.junit.After; import java.util.concurrent.CountDownLatch; @@ -30,6 +29,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase @After private void cleanupPolicies() throws InterruptedException { + ClusterService clusterService = getInstanceFromNode(ClusterService.class); + final CountDownLatch latch = new CountDownLatch(1); final AtomicReference reference = new AtomicReference<>(); final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class); @@ -52,26 +53,16 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase GetEnrichPolicyAction.Response response = reference.get(); for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) { - final CountDownLatch loopLatch = new CountDownLatch(1); - final AtomicReference loopReference = new AtomicReference<>(); - final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class); - deleteAction.execute(null, - new DeleteEnrichPolicyAction.Request(policy.getName()), - new ActionListener() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - loopReference.set(acknowledgedResponse); - loopLatch.countDown(); - } - - public void onFailure(final Exception e) { - fail(); - } - }); - loopLatch.await(); - assertNotNull(loopReference.get()); - assertTrue(loopReference.get().isAcknowledged()); + try { + deleteEnrichPolicy(policy.getName(), clusterService); + } catch (Exception e) { + // if the enrich policy does not exist, then just keep going + } } + + // fail if the state of this is left locked + EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class); + assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight()); } public void testListPolicies() throws InterruptedException { @@ -204,6 +195,6 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase assertNotNull(reference.get()); assertThat(reference.get(), instanceOf(ResourceNotFoundException.class)); assertThat(reference.get().getMessage(), - equalTo("Policy [non-exists] was not found")); + equalTo("Policy [non-exists] not found")); } }