Prevent delete policy for active executing policy (#45472)

This commit adds a lock to the delete policy, in the same way that the
locking is done for policy execution. It also creates a test to exercise
the delete transport action, and modifies an existing test to provide a
common set of functions for saving and deleting policies.
This commit is contained in:
Michael Basnight 2019-08-15 09:41:04 -05:00
parent 03f45dad57
commit db57d2206a
8 changed files with 204 additions and 57 deletions

View File

@ -19,20 +19,24 @@ import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
* state of any policy executions in flight. This execution state can be captured and then later be used to verify that no policy
* executions have started in the time between the first state capturing.
*/
class EnrichPolicyLocks {
public class EnrichPolicyLocks {
/**
* A snapshot in time detailing if any policy executions are in flight and total number of local executions that
* have been kicked off since the node has started
*/
static class EnrichPolicyExecutionState {
final boolean arePoliciesInFlight;
public static class EnrichPolicyExecutionState {
final boolean anyPolicyInFlight;
final long executions;
EnrichPolicyExecutionState(boolean arePoliciesInFlight, long executions) {
this.arePoliciesInFlight = arePoliciesInFlight;
EnrichPolicyExecutionState(boolean anyPolicyInFlight, long executions) {
this.anyPolicyInFlight = anyPolicyInFlight;
this.executions = executions;
}
public boolean isAnyPolicyInFlight() {
return anyPolicyInFlight;
}
}
/**
@ -54,19 +58,19 @@ class EnrichPolicyLocks {
private final AtomicLong policyRunCounter = new AtomicLong(0L);
/**
* Locks a policy for execution. If the policy is currently executing, this method will immediately throw without waiting.
* This method only blocks if another thread is currently capturing the current policy execution state.
* Locks a policy to prevent concurrent execution. If the policy is currently executing, this method will immediately
* throw without waiting. This method only blocks if another thread is currently capturing the current policy execution state.
* @param policyName The policy name to lock for execution
* @throws EsRejectedExecutionException if the policy is locked already or if the maximum number of concurrent policy executions
* has been reached
*/
void lockPolicy(String policyName) {
public void lockPolicy(String policyName) {
currentStateLock.readLock().lock();
try {
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
boolean acquired = runLock.tryAcquire();
if (acquired == false) {
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName +
throw new EsRejectedExecutionException("Could not obtain lock because policy execution for [" + policyName +
"] is already in progress.");
}
policyRunCounter.incrementAndGet();
@ -80,7 +84,7 @@ class EnrichPolicyLocks {
* currently starting its execution and returns an appropriate state.
* @return The current state of in-flight policy executions
*/
EnrichPolicyExecutionState captureExecutionState() {
public EnrichPolicyExecutionState captureExecutionState() {
if (currentStateLock.writeLock().tryLock()) {
try {
long revision = policyRunCounter.get();
@ -101,7 +105,7 @@ class EnrichPolicyLocks {
*/
boolean isSameState(EnrichPolicyExecutionState previousState) {
EnrichPolicyExecutionState currentState = captureExecutionState();
return currentState.arePoliciesInFlight == previousState.arePoliciesInFlight &&
return currentState.anyPolicyInFlight == previousState.anyPolicyInFlight &&
currentState.executions == previousState.executions;
}
@ -109,7 +113,7 @@ class EnrichPolicyLocks {
* Releases the lock for a given policy name, allowing it to be executed.
* @param policyName The policy to release.
*/
void releasePolicy(String policyName) {
public void releasePolicy(String policyName) {
currentStateLock.readLock().lock();
try {
policyLocks.remove(policyName);

View File

@ -137,7 +137,7 @@ public class EnrichPolicyMaintenanceService implements LocalNodeMasterListener {
.indicesOptions(IndicesOptions.lenientExpand());
// Check that no enrich policies are being executed
final EnrichPolicyLocks.EnrichPolicyExecutionState executionState = enrichPolicyLocks.captureExecutionState();
if (executionState.arePoliciesInFlight == false) {
if (executionState.isAnyPolicyInFlight() == false) {
client.admin().indices().getIndex(indices, new ActionListener<GetIndexResponse>() {
@Override
public void onResponse(GetIndexResponse getIndexResponse) {

View File

@ -25,6 +25,7 @@ import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.AbstractEnrichProcessor;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import org.elasticsearch.xpack.enrich.EnrichStore;
import java.io.IOException;
@ -33,6 +34,7 @@ import java.util.List;
public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction<DeleteEnrichPolicyAction.Request, AcknowledgedResponse> {
private final EnrichPolicyLocks enrichPolicyLocks;
private final IngestService ingestService;
@Inject
@ -41,9 +43,11 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
ThreadPool threadPool,
ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
EnrichPolicyLocks enrichPolicyLocks,
IngestService ingestService) {
super(DeleteEnrichPolicyAction.NAME, transportService, clusterService, threadPool, actionFilters,
DeleteEnrichPolicyAction.Request::new, indexNameExpressionResolver);
this.enrichPolicyLocks = enrichPolicyLocks;
this.ingestService = ingestService;
}
@ -64,6 +68,7 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
@Override
protected void masterOperation(DeleteEnrichPolicyAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) throws Exception {
enrichPolicyLocks.lockPolicy(request.getName());
List<PipelineConfiguration> pipelines = IngestService.getPipelines(state);
EnrichPolicy policy = EnrichStore.getPolicy(request.getName(), state);
List<String> pipelinesWithProcessors = new ArrayList<>();
@ -79,6 +84,7 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
}
if (pipelinesWithProcessors.isEmpty() == false) {
enrichPolicyLocks.releasePolicy(request.getName());
listener.onFailure(
new ElasticsearchStatusException("Could not delete policy [{}] because a pipeline is referencing it {}",
RestStatus.CONFLICT, request.getName(), pipelinesWithProcessors));
@ -86,6 +92,7 @@ public class TransportDeleteEnrichPolicyAction extends TransportMasterNodeAction
}
EnrichStore.deletePolicy(request.getName(), clusterService, e -> {
enrichPolicyLocks.releasePolicy(request.getName());
if (e == null) {
listener.onResponse(new AcknowledgedResponse(true));
} else {

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractEnrichTestCase extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(LocalStateEnrich.class);
}
protected AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.putPolicy(name, policy, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
return error;
}
void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
if (error.get() != null){
throw error.get();
}
}
}

View File

@ -127,7 +127,7 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
firstTaskComplete.await();
// Validate exception from second run
assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [" + testPolicyName +
assertThat(expected.getMessage(), containsString("Could not obtain lock because policy execution for [" + testPolicyName +
"] is already in progress."));
// Ensure that the lock from the previous run has been cleared

View File

@ -24,14 +24,14 @@ public class EnrichPolicyLocksTests extends ESTestCase {
// Ensure that locked policies are rejected
EsRejectedExecutionException exception1 = expectThrows(EsRejectedExecutionException.class,
() -> policyLocks.lockPolicy(policy1));
assertThat(exception1.getMessage(), is(equalTo("Policy execution failed. Policy execution for [policy1]" +
assertThat(exception1.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy1]" +
" is already in progress.")));
policyLocks.lockPolicy(policy2);
EsRejectedExecutionException exception2 = expectThrows(EsRejectedExecutionException.class,
() -> policyLocks.lockPolicy(policy2));
assertThat(exception2.getMessage(), is(equalTo("Policy execution failed. Policy execution for [policy2]" +
assertThat(exception2.getMessage(), is(equalTo("Could not obtain lock because policy execution for [policy2]" +
" is already in progress.")));
}
@ -42,13 +42,13 @@ public class EnrichPolicyLocksTests extends ESTestCase {
// Get exec state - should note as safe and revision 1 since nothing has happened yet
executionState = policyLocks.captureExecutionState();
assertThat(executionState.arePoliciesInFlight, is(false));
assertThat(executionState.anyPolicyInFlight, is(false));
assertThat(executionState.executions, is(0L));
assertThat(policyLocks.isSameState(executionState), is(true));
// Get another exec state - should still note as safe and revision 1 since nothing has happened yet
executionState = policyLocks.captureExecutionState();
assertThat(executionState.arePoliciesInFlight, is(false));
assertThat(executionState.anyPolicyInFlight, is(false));
assertThat(executionState.executions, is(0L));
assertThat(policyLocks.isSameState(executionState), is(true));
@ -57,7 +57,7 @@ public class EnrichPolicyLocksTests extends ESTestCase {
// Get a third exec state - should have a new revision and report unsafe since execution is in progress
executionState = policyLocks.captureExecutionState();
assertThat(executionState.arePoliciesInFlight, is(true));
assertThat(executionState.anyPolicyInFlight, is(true));
assertThat(executionState.executions, is(1L));
// Unlock the policy
@ -66,13 +66,13 @@ public class EnrichPolicyLocksTests extends ESTestCase {
// Get a fourth exec state - should have the same revision as third, and report no policies in flight since the previous execution
// is complete
executionState = policyLocks.captureExecutionState();
assertThat(executionState.arePoliciesInFlight, is(false));
assertThat(executionState.anyPolicyInFlight, is(false));
assertThat(executionState.executions, is(1L));
// Create a fifth exec state, lock and release a policy, and check if the captured exec state is the same as the current state in
// the lock object
executionState = policyLocks.captureExecutionState();
assertThat(executionState.arePoliciesInFlight, is(false));
assertThat(executionState.anyPolicyInFlight, is(false));
assertThat(executionState.executions, is(1L));
policyLocks.lockPolicy(policy);
policyLocks.releasePolicy(policy);

View File

@ -3,31 +3,22 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
public class EnrichStoreTests extends ESSingleNodeTestCase {
@Override
protected Collection<Class<? extends Plugin>> getPlugins() {
return Collections.singletonList(LocalStateEnrich.class);
}
public class EnrichStoreCrudTests extends AbstractEnrichTestCase {
public void testCrud() throws Exception {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
@ -137,29 +128,4 @@ public class EnrichStoreTests extends ESSingleNodeTestCase {
Map<String, EnrichPolicy> policies = EnrichStore.getPolicies(clusterService.state());
assertTrue(policies.isEmpty());
}
private AtomicReference<Exception> saveEnrichPolicy(String name, EnrichPolicy policy,
ClusterService clusterService) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.putPolicy(name, policy, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
return error;
}
private void deleteEnrichPolicy(String name, ClusterService clusterService) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> error = new AtomicReference<>();
EnrichStore.deletePolicy(name, clusterService, e -> {
error.set(e);
latch.countDown();
});
latch.await();
if (error.get() != null){
throw error.get();
}
}
}

View File

@ -0,0 +1,121 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.enrich.action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
import org.elasticsearch.xpack.core.enrich.action.DeleteEnrichPolicyAction;
import org.elasticsearch.xpack.enrich.AbstractEnrichTestCase;
import org.elasticsearch.xpack.enrich.EnrichPolicyLocks;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.xpack.enrich.EnrichPolicyTests.randomEnrichPolicy;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
public class TransportDeleteEnricyPolicyActionTests extends AbstractEnrichTestCase {
public void testDeleteIsNotLocked() throws InterruptedException {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
}
public void testDeleteLocked() throws InterruptedException {
EnrichPolicy policy = randomEnrichPolicy(XContentType.JSON);
ClusterService clusterService = getInstanceFromNode(ClusterService.class);
String name = "my-policy";
final TransportDeleteEnrichPolicyAction transportAction = node().injector().getInstance(TransportDeleteEnrichPolicyAction.class);
AtomicReference<Exception> error = saveEnrichPolicy(name, policy, clusterService);
assertThat(error.get(), nullValue());
EnrichPolicyLocks enrichPolicyLocks = getInstanceFromNode(EnrichPolicyLocks.class);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
enrichPolicyLocks.lockPolicy(name);
assertTrue(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
{
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> reference = new AtomicReference<>();
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
fail();
}
public void onFailure(final Exception e) {
reference.set(e);
latch.countDown();
}
});
latch.await();
assertNotNull(reference.get());
assertThat(reference.get(), instanceOf(EsRejectedExecutionException.class));
assertThat(reference.get().getMessage(),
equalTo("Could not obtain lock because policy execution for [my-policy] is already in progress."));
}
{
enrichPolicyLocks.releasePolicy(name);
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<AcknowledgedResponse> reference = new AtomicReference<>();
transportAction.execute(null,
new DeleteEnrichPolicyAction.Request(name),
new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
reference.set(acknowledgedResponse);
latch.countDown();
}
public void onFailure(final Exception e) {
fail();
}
});
latch.await();
assertNotNull(reference.get());
assertTrue(reference.get().isAcknowledged());
assertFalse(enrichPolicyLocks.captureExecutionState().isAnyPolicyInFlight());
}
}
}