Limit a enrich policy execution to only one at a time (#42535)
Add a keyed lock mechanism to the policy executor to ensure that an enrich policy can only have one execution happening at a time.
This commit is contained in:
parent
415f1a484f
commit
9d56a0365f
|
@ -6,14 +6,16 @@
|
|||
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
|
||||
|
@ -25,6 +27,7 @@ public class EnrichPolicyExecutor {
|
|||
private final IndexNameExpressionResolver indexNameExpressionResolver;
|
||||
private final LongSupplier nowSupplier;
|
||||
private final int fetchSize;
|
||||
private final ConcurrentHashMap<String, Semaphore> policyLocks = new ConcurrentHashMap<>();
|
||||
|
||||
EnrichPolicyExecutor(Settings settings,
|
||||
ClusterService clusterService,
|
||||
|
@ -40,19 +43,64 @@ public class EnrichPolicyExecutor {
|
|||
this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
|
||||
}
|
||||
|
||||
private void tryLockingPolicy(String policyName) {
|
||||
Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1));
|
||||
if (runLock.tryAcquire() == false) {
|
||||
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName +
|
||||
"] is already in progress.");
|
||||
}
|
||||
}
|
||||
|
||||
private void releasePolicy(String policyName) {
|
||||
policyLocks.remove(policyName);
|
||||
}
|
||||
|
||||
private class PolicyUnlockingListener implements ActionListener<PolicyExecutionResult> {
|
||||
private final String policyName;
|
||||
private final ActionListener<PolicyExecutionResult> listener;
|
||||
|
||||
PolicyUnlockingListener(String policyName, ActionListener<PolicyExecutionResult> listener) {
|
||||
this.policyName = policyName;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onResponse(PolicyExecutionResult policyExecutionResult) {
|
||||
releasePolicy(policyName);
|
||||
listener.onResponse(policyExecutionResult);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
releasePolicy(policyName);
|
||||
listener.onFailure(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
|
||||
return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier,
|
||||
fetchSize);
|
||||
}
|
||||
|
||||
public void runPolicy(String policyId, ActionListener<PolicyExecutionResult> listener) {
|
||||
// Look up policy in policy store and execute it
|
||||
EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state());
|
||||
if (policy == null) {
|
||||
throw new ElasticsearchException("Policy execution failed. Could not locate policy with id [{}]", policyId);
|
||||
throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyId + "]");
|
||||
} else {
|
||||
runPolicy(policyId, policy, listener);
|
||||
}
|
||||
}
|
||||
|
||||
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
|
||||
EnrichPolicyRunner runnable = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client,
|
||||
indexNameExpressionResolver, nowSupplier, fetchSize);
|
||||
tryLockingPolicy(policyName);
|
||||
try {
|
||||
Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener));
|
||||
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
|
||||
} catch (Exception e) {
|
||||
// Be sure to unlock if submission failed.
|
||||
releasePolicy(policyName);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,140 @@
|
|||
/*
|
||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
||||
* or more contributor license agreements. Licensed under the Elastic License;
|
||||
* you may not use this file except in compliance with the Elastic License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.xpack.enrich;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongSupplier;
|
||||
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.LatchedActionListener;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.TestThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
|
||||
public class EnrichPolicyExecutorTests extends ESTestCase {
|
||||
|
||||
private static ThreadPool testThreadPool;
|
||||
private static final ActionListener<PolicyExecutionResult> noOpListener = new ActionListener<PolicyExecutionResult>() {
|
||||
@Override
|
||||
public void onResponse(PolicyExecutionResult policyExecutionResult) { }
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) { }
|
||||
};
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeCLass() {
|
||||
testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* A policy runner drop-in replacement that just waits on a given countdown latch, and reports success after the latch is counted down.
|
||||
*/
|
||||
private static class BlockingTestPolicyRunner implements Runnable {
|
||||
private final CountDownLatch latch;
|
||||
private final ActionListener<PolicyExecutionResult> listener;
|
||||
|
||||
BlockingTestPolicyRunner(CountDownLatch latch, ActionListener<PolicyExecutionResult> listener) {
|
||||
this.latch = latch;
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
listener.onResponse(new PolicyExecutionResult(true));
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException("Interrupted waiting for test framework to continue the test", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A mocked policy executor that accepts policy execution requests which block until the returned latch is decremented. Allows for
|
||||
* controlling the timing for "in flight" policy executions to test for correct locking logic.
|
||||
*/
|
||||
private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor {
|
||||
|
||||
EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, ThreadPool threadPool,
|
||||
IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) {
|
||||
super(settings, clusterService, client, threadPool, indexNameExpressionResolver, nowSupplier);
|
||||
}
|
||||
|
||||
private CountDownLatch currentLatch;
|
||||
CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
|
||||
currentLatch = new CountDownLatch(1);
|
||||
runPolicy(policyName, policy, listener);
|
||||
return currentLatch;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
|
||||
if (currentLatch == null) {
|
||||
throw new IllegalStateException("Use the testRunPolicy method on this test instance");
|
||||
}
|
||||
return new BlockingTestPolicyRunner(currentLatch, listener);
|
||||
}
|
||||
}
|
||||
|
||||
public void testNonConcurrentPolicyExecution() throws InterruptedException {
|
||||
String testPolicyName = "test_policy";
|
||||
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield",
|
||||
Collections.singletonList("valuefield"));
|
||||
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool,
|
||||
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
|
||||
|
||||
// Launch a fake policy run that will block until firstTaskBlock is counted down.
|
||||
final CountDownLatch firstTaskComplete = new CountDownLatch(1);
|
||||
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy,
|
||||
new LatchedActionListener<>(noOpListener, firstTaskComplete));
|
||||
|
||||
// Launch a second fake run that should fail immediately because the lock is obtained.
|
||||
EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class,
|
||||
"Expected exception but nothing was thrown", () -> {
|
||||
CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener);
|
||||
// Should throw exception on the previous statement, but if it doesn't, be a
|
||||
// good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
|
||||
countDownLatch.countDown();
|
||||
firstTaskBlock.countDown();
|
||||
firstTaskComplete.await();
|
||||
});
|
||||
|
||||
// Conclude the first mock run
|
||||
firstTaskBlock.countDown();
|
||||
firstTaskComplete.await();
|
||||
|
||||
// Validate exception from second run
|
||||
assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [" + testPolicyName +
|
||||
"] is already in progress."));
|
||||
|
||||
// Ensure that the lock from the previous run has been cleared
|
||||
CountDownLatch secondTaskComplete = new CountDownLatch(1);
|
||||
CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy,
|
||||
new LatchedActionListener<>(noOpListener, secondTaskComplete));
|
||||
secondTaskBlock.countDown();
|
||||
secondTaskComplete.await();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue