Remove enrich indices on delete policy (#45870)

When a policy is deleted, the enrich indices that are backing the policy
alias should also be deleted. This commit does that work and cleans up
the transport action a bit so that the lock release is easier to see, as
well as to ensure that any action carried out, regardless of exception,
unlocks the policy.
This commit is contained in:
Michael Basnight 2019-08-23 14:34:13 -05:00
parent a38e6850a5
commit a82d24b3ce
5 changed files with 196 additions and 50 deletions

View File

@ -6,10 +6,14 @@
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@ -36,6 +40,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
private final EnrichPolicyLocks enrichPolicyLocks;
private final IngestService ingestService;
private final Client client;
// the most lenient we can get in order to not bomb out if no indices are found, which is a valid case
// where a user creates and deletes a policy before running execute
private static final IndicesOptions LENIENT_OPTIONS = IndicesOptions.fromOptions(true, true, true, true);
@Inject
public TransportDeleteEnrichPolicyAction(TransportService transportService,
@ -43,10 +52,12 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
Client client,
EnrichPolicyLocks enrichPolicyLocks,
IngestService ingestService) {
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
this.client = client;
this.enrichPolicyLocks = enrichPolicyLocks;
this.ingestService = ingestService;
}
@ -68,36 +79,74 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
@Override
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
enrichPolicyLocks.lockPolicy(request.getName());
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
List<String> pipelinesWithProcessors = new ArrayList<>();
for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors =
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
for (AbstractEnrichProcessor processor: enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
}
}
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state); // ensure the policy exists first
if (policy == null) {
throw new ResourceNotFoundException("policy [{}] not found", request.getName());
}
if (pipelinesWithProcessors.isEmpty() == false) {
enrichPolicyLocks.lockPolicy(request.getName());
try {
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
List<String> pipelinesWithProcessors = new ArrayList<>();
for (PipelineConfiguration pipelineConfiguration : pipelines) {
List<AbstractEnrichProcessor> enrichProcessors =
ingestService.getProcessorsInPipeline(pipelineConfiguration.getId(), AbstractEnrichProcessor.class);
for (AbstractEnrichProcessor processor : enrichProcessors) {
if (processor.getPolicyName().equals(request.getName())) {
pipelinesWithProcessors.add(pipelineConfiguration.getId());
}
}
}
if (pipelinesWithProcessors.isEmpty() == false) {
throw new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors);
}
} catch (Exception e) {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
listener.onFailure(e);
return;
}
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
enrichPolicyLocks.releasePolicy(request.getName());
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(e);
}
deleteIndicesAndPolicy(request.getName(), ActionListener.wrap(
(response) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onResponse(response);
},
(exc) -> {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(exc);
}
));
}
private void deleteIndicesAndPolicy(String name, ActionListener<AcknowledgedResponse> listener) {
// delete all enrich indices for this policy
DeleteIndexRequest deleteRequest = new DeleteIndexRequest()
.indices(EnrichPolicy.getBaseName(name) + "-*")
.indicesOptions(LENIENT_OPTIONS);
client.admin().indices().delete(deleteRequest, ActionListener.wrap(
(response) -> {
if (response.isAcknowledged() == false) {
listener.onFailure(new ElasticsearchStatusException("Could not fetch indices to delete during policy delete of [{}]",
RestStatus.INTERNAL_SERVER_ERROR, name));
} else {
deletePolicy(name, listener);
}
},
(error) -> listener.onFailure(error)
));
}
private void deletePolicy(String name, ActionListener<AcknowledgedResponse> listener) {
EnrichStore.deletePolicy(name, clusterService, e -> {
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {
listener.onFailure(e);
}
});
}

View File

@ -63,7 +63,7 @@ public class TransportGetEnrichPolicyAction extends TransportMasterNodeReadActio
} else {
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
if (policy == null) {
throw new ResourceNotFoundException("Policy [{}] was not found", request.getName());
throw new ResourceNotFoundException("Policy [{}] not found", request.getName());
}
policies = Collections.singletonMap(request.getName(), policy);

View File

@ -34,7 +34,7 @@ public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
return error;
}
void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
protected void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> {

View File

@ -6,15 +6,19 @@
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.elasticsearch.xpack.enrich.EnrichStore;
import org.junit.After;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
@ -24,9 +28,56 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
public class TransportDeleteEnrichPolicyActionTests extends AbstractEnrichTestCase {
public void testDeleteIsNotLocked() throws InterruptedException {
@After
private void cleanupPolicy() {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
try {
deleteEnrichPolicy(name, clusterService);
} catch (Exception e) {
// if the enrich policy does not exist, then just keep going
}
// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}
public void testDeletePolicyDoesNotExistUnlocksPolicy() throws InterruptedException {
String fakeId = "fake-id";
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo1");
createIndex(EnrichPolicy.getBaseName(fakeId) + "-foo2");
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(fakeId),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
assertThat(reference.get().getMessage(), equalTo("policy [fake-id] not found"));
// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}
public void testDeleteWithoutIndex() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
@ -53,6 +104,56 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
public void testDeleteIsNotLocked() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareGetIndex().setIndices(
EnrichPolicy.getBaseName(name) + "-foo1",
EnrichPolicy.getBaseName(name) + "-foo2").get());
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
public void testDeleteLocked() throws InterruptedException {
@ -63,6 +164,9 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
createIndex(EnrichPolicy.getBaseName(name) + "-foo1");
createIndex(EnrichPolicy.getBaseName(name) + "-foo2");
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
@ -116,6 +220,8 @@ public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCa
assertTrue(reference.get().isAcknowledged());
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
assertNull(EnrichStore.getPolicy(name, clusterService.state()));
}
}
}

View File

@ -8,13 +8,12 @@ package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.core.enrich.action.GetEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.junit.After;
import java.util.concurrent.CountDownLatch;
@ -30,6 +29,8 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
@After
private void cleanupPolicies() throws InterruptedException {
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<GetEnrichPolicyAction.Response> reference = new AtomicReference<>();
final TransportGetEnrichPolicyAction transportAction = node().injector().getInstance(TransportGetEnrichPolicyAction.class);
@ -52,26 +53,16 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
GetEnrichPolicyAction.Response response = reference.get();
for (EnrichPolicy.NamedPolicy policy: response.getPolicies()) {
final CountDownLatch loopLatch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> loopReference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction deleteAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
deleteAction.execute(null,
new DeleteEnrichPolicyAction.Request(policy.getName()),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
loopReference.set(acknowledgedResponse);
loopLatch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
loopLatch.await();
assertNotNull(loopReference.get());
assertTrue(loopReference.get().isAcknowledged());
try {
deleteEnrichPolicy(policy.getName(), clusterService);
} catch (Exception e) {
// if the enrich policy does not exist, then just keep going
}
}
// fail if the state of this is left locked
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}
public void testListPolicies() throws InterruptedException {
@ -204,6 +195,6 @@ public class TransportGetEnrichPolicyActionTests extends AbstractEnrichTestCase
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(ResourceNotFoundException.class));
assertThat(reference.get().getMessage(),
equalTo("Policy [non-exists] was not found"));
equalTo("Policy [non-exists] not found"));
}
}