From c357f81aa79a3df079e87fea022f747737ec0eb3 Mon Sep 17 00:00:00 2001 From: James Baiera Date: Tue, 23 Jul 2019 15:10:21 -0400 Subject: [PATCH] Add soft limit for max concurrent policy executions (#43117) Adds a global soft limit on the number of concurrently executing enrich policies. Since an enrich policy is run on the generic thread pool, this is meant to limit policy runs separately from the generic thread pool capacity. --- .../xpack/enrich/EnrichPlugin.java | 6 ++- .../xpack/enrich/EnrichPolicyExecutor.java | 32 +++++++++++-- .../enrich/EnrichPolicyExecutorTests.java | 48 +++++++++++++++++++ 3 files changed, 81 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java index 4ee36ceb339..aee799d9130 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPlugin.java @@ -65,7 +65,10 @@ import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING; public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { static final Setting ENRICH_FETCH_SIZE_SETTING = - Setting.intSetting("index.xpack.enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope); + Setting.intSetting("enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope); + + static final Setting ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS = + Setting.intSetting("enrich.max_concurrent_policy_executions", 50, 1, Setting.Property.NodeScope); static final Setting ENRICH_CLEANUP_PERIOD = Setting.timeSetting("enrich.cleanup_period", new TimeValue(15, TimeUnit.MINUTES), Setting.Property.NodeScope); @@ -173,6 +176,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin { public List> getSettings() { return Arrays.asList( ENRICH_FETCH_SIZE_SETTING, + ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS, ENRICH_CLEANUP_PERIOD, COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS, COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST, diff --git a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java index 31d01bc2329..d1b3c8606c4 100644 --- a/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java +++ b/x-pack/plugin/enrich/src/main/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutor.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.enrich; +import java.util.concurrent.Semaphore; import java.util.function.LongSupplier; import org.elasticsearch.action.ActionListener; @@ -13,6 +14,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.enrich.EnrichPolicy; @@ -25,6 +27,8 @@ public class EnrichPolicyExecutor { private final LongSupplier nowSupplier; private final int fetchSize; private final EnrichPolicyLocks policyLocks; + private final int maximumConcurrentPolicyExecutions; + private final Semaphore policyExecutionPermits; EnrichPolicyExecutor(Settings settings, ClusterService clusterService, @@ -40,6 +44,26 @@ public class EnrichPolicyExecutor { this.nowSupplier = nowSupplier; this.policyLocks = policyLocks; this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings); + this.maximumConcurrentPolicyExecutions = EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.get(settings); + this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions); + } + + private void tryLockingPolicy(String policyName) { + policyLocks.lockPolicy(policyName); + if (policyExecutionPermits.tryAcquire() == false) { + // Release policy lock, and throw a different exception + policyLocks.releasePolicy(policyName); + throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + "] would exceed " + + "maximum concurrent policy executions [" + maximumConcurrentPolicyExecutions + "]"); + } + } + + private void releasePolicy(String policyName) { + try { + policyExecutionPermits.release(); + } finally { + policyLocks.releasePolicy(policyName); + } } private class PolicyUnlockingListener implements ActionListener { @@ -53,13 +77,13 @@ public class EnrichPolicyExecutor { @Override public void onResponse(PolicyExecutionResult policyExecutionResult) { - policyLocks.releasePolicy(policyName); + releasePolicy(policyName); listener.onResponse(policyExecutionResult); } @Override public void onFailure(Exception e) { - policyLocks.releasePolicy(policyName); + releasePolicy(policyName); listener.onFailure(e); } } @@ -80,13 +104,13 @@ public class EnrichPolicyExecutor { } public void runPolicy(String policyName, EnrichPolicy policy, ActionListener listener) { - policyLocks.lockPolicy(policyName); + tryLockingPolicy(policyName); try { Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener)); threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable); } catch (Exception e) { // Be sure to unlock if submission failed. - policyLocks.releasePolicy(policyName); + releasePolicy(policyName); throw e; } } diff --git a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java index 22509b7368d..e9546b94bd8 100644 --- a/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java +++ b/x-pack/plugin/enrich/src/test/java/org/elasticsearch/xpack/enrich/EnrichPolicyExecutorTests.java @@ -137,4 +137,52 @@ public class EnrichPolicyExecutorTests extends ESTestCase { secondTaskBlock.countDown(); secondTaskComplete.await(); } + + public void testMaximumPolicyExecutionLimit() throws InterruptedException { + String testPolicyBaseName = "test_policy_"; + Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build(); + EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield", + Collections.singletonList("valuefield")); + final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool, + new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong); + + // Launch a two fake policy runs that will block until counted down to use up the maximum concurrent + final CountDownLatch firstTaskComplete = new CountDownLatch(1); + final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy, + new LatchedActionListener<>(noOpListener, firstTaskComplete)); + + final CountDownLatch secondTaskComplete = new CountDownLatch(1); + final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "2", testPolicy, + new LatchedActionListener<>(noOpListener, secondTaskComplete)); + + // Launch a third fake run that should fail immediately because the lock is obtained. + EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class, + "Expected exception but nothing was thrown", () -> { + CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener); + // Should throw exception on the previous statement, but if it doesn't, be a + // good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions + countDownLatch.countDown(); + firstTaskBlock.countDown(); + secondTaskBlock.countDown(); + firstTaskComplete.await(); + secondTaskComplete.await(); + }); + + // Conclude the first mock run + firstTaskBlock.countDown(); + secondTaskBlock.countDown(); + firstTaskComplete.await(); + secondTaskComplete.await(); + + // Validate exception from second run + assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [test_policy_3] would exceed " + + "maximum concurrent policy executions [2]")); + + // Ensure that the lock from the previous run has been cleared + CountDownLatch finalTaskComplete = new CountDownLatch(1); + CountDownLatch finalTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy, + new LatchedActionListener<>(noOpListener, finalTaskComplete)); + finalTaskBlock.countDown(); + finalTaskComplete.await(); + } }