diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 4a979007ac5..40218030f92 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -6,14 +6,16 @@ package org.elasticsearch.xpack.enrich; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; import java.util.function.LongSupplier; -import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -25,6 +27,7 @@ public class EnrichPolicyExecutor { private final IndexNameExpressionResolver indexNameExpressionResolver; private final LongSupplier nowSupplier; private final int fetchSize; + private final ConcurrentHashMap policyLocks = new ConcurrentHashMap<>(); EnrichPolicyExecutor(Settings settings, ClusterService clusterService, @@ -40,19 +43,64 @@ public class EnrichPolicyExecutor { this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings); } + private void tryLockingPolicy(String policyName) { + Semaphore runLock = policyLocks.computeIfAbsent(policyName, (name) -> new Semaphore(1)); + if (runLock.tryAcquire() == false) { + throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + + "] is already in progress."); + } + } + + private void releasePolicy(String policyName) { + policyLocks.remove(policyName); + } + + private class PolicyUnlockingListener implements ActionListener { + private final String policyName; + private final ActionListener listener; + + PolicyUnlockingListener(String policyName, ActionListener listener) { + this.policyName = policyName; + this.listener = listener; + } + + @Override + public void onResponse(PolicyExecutionResult policyExecutionResult) { + releasePolicy(policyName); + listener.onResponse(policyExecutionResult); + } + + @Override + public void onFailure(Exception e) { + releasePolicy(policyName); + listener.onFailure(e); + } + } + + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { + return new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, indexNameExpressionResolver, nowSupplier, + fetchSize); + } + public void runPolicy(String policyId, ActionListener listener) { // Look up policy in policy store and execute it EnrichPolicy policy = EnrichStore.getPolicy(policyId, clusterService.state()); if (policy == null) { - throw new ElasticsearchException("Policy execution failed. Could not locate policy with id [{}]", policyId); + throw new IllegalArgumentException("Policy execution failed. Could not locate policy with id [" + policyId + "]"); } else { runPolicy(policyId, policy, listener); } } public void runPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - EnrichPolicyRunner runnable = new EnrichPolicyRunner(policyName, policy, listener, clusterService, client, - indexNameExpressionResolver, nowSupplier, fetchSize); - threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + tryLockingPolicy(policyName); + try { + Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener)); + threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); + } catch (Exception e) { + // Be sure to unlock if submission failed. + releasePolicy(policyName); + throw e; + } } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java new file mode 100644 index 00000000000..52be22473a1 --- /dev/null +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -0,0 +1,140 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.enrich; + +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.xpack.core.enrich.EnrichPolicy; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import static org.hamcrest.CoreMatchers.containsString; + +public class EnrichPolicyExecutorTests extends ESTestCase { + + private static ThreadPool testThreadPool; + private static final ActionListener noOpListener = new ActionListener() { + @Override + public void onResponse(PolicyExecutionResult policyExecutionResult) { } + + @Override + public void onFailure(Exception e) { } + }; + + @BeforeClass + public static void beforeCLass() { + testThreadPool = new TestThreadPool("EnrichPolicyExecutorTests"); + } + + @AfterClass + public static void afterClass() { + ThreadPool.terminate(testThreadPool, 30, TimeUnit.SECONDS); + } + + /** + * A policy runner drop-in replacement that just waits on a given countdown latch, and reports success after the latch is counted down. + */ + private static class BlockingTestPolicyRunner implements Runnable { + private final CountDownLatch latch; + private final ActionListener listener; + + BlockingTestPolicyRunner(CountDownLatch latch, ActionListener listener) { + this.latch = latch; + this.listener = listener; + } + + @Override + public void run() { + try { + latch.await(); + listener.onResponse(new PolicyExecutionResult(true)); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted waiting for test framework to continue the test", e); + } + } + } + + /** + * A mocked policy executor that accepts policy execution requests which block until the returned latch is decremented. Allows for + * controlling the timing for "in flight" policy executions to test for correct locking logic. + */ + private static class EnrichPolicyTestExecutor extends EnrichPolicyExecutor { + + EnrichPolicyTestExecutor(Settings settings, ClusterService clusterService, Client client, ThreadPool threadPool, + IndexNameExpressionResolver indexNameExpressionResolver, LongSupplier nowSupplier) { + super(settings, clusterService, client, threadPool, indexNameExpressionResolver, nowSupplier); + } + + private CountDownLatch currentLatch; + CountDownLatch testRunPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { + currentLatch = new CountDownLatch(1); + runPolicy(policyName, policy, listener); + return currentLatch; + } + + @Override + protected Runnable createPolicyRunner(String policyName, EnrichPolicy policy, ActionListener listener) { + if (currentLatch == null) { + throw new IllegalStateException("Use the testRunPolicy method on this test instance"); + } + return new BlockingTestPolicyRunner(currentLatch, listener); + } + } + + public void testNonConcurrentPolicyExecution() throws InterruptedException { + String testPolicyName = "test_policy"; + EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Arrays.asList("some_index"), "keyfield", + Collections.singletonList("valuefield")); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(Settings.EMPTY, null, null, testThreadPool, + new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + + // Launch a fake policy run that will block until firstTaskBlock is counted down. + final CountDownLatch firstTaskComplete = new CountDownLatch(1); + final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy, + new LatchedActionListener<>(noOpListener, firstTaskComplete)); + + // Launch a second fake run that should fail immediately because the lock is obtained. + EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class, + "Expected exception but nothing was thrown", () -> { + CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyName, testPolicy, noOpListener); + // Should throw exception on the previous statement, but if it doesn't, be a + // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions + countDownLatch.countDown(); + firstTaskBlock.countDown(); + firstTaskComplete.await(); + }); + + // Conclude the first mock run + firstTaskBlock.countDown(); + firstTaskComplete.await(); + + // Validate exception from second run + assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [" + testPolicyName + + "] is already in progress.")); + + // Ensure that the lock from the previous run has been cleared + CountDownLatch secondTaskComplete = new CountDownLatch(1); + CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyName, testPolicy, + new LatchedActionListener<>(noOpListener, secondTaskComplete)); + secondTaskBlock.countDown(); + secondTaskComplete.await(); + } +}