Add soft limit for max concurrent policy executions (#43117)

Adds a global soft limit on the number of concurrently executing enrich policies.
Since an enrich policy is run on the generic thread pool, this is meant to limit
policy runs separately from the generic thread pool capacity.
This commit is contained in:
James Baiera 2019-07-23 15:10:21 -04:00
parent fc20264b99
commit c357f81aa7
3 changed files with 81 additions and 5 deletions

View File

@ -65,7 +65,10 @@ import static org.elasticsearch.xpack.core.XPackSettings.ENRICH_ENABLED_SETTING;
public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
static final Setting<Integer> ENRICH_FETCH_SIZE_SETTING =
Setting.intSetting("index.xpack.enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
Setting.intSetting("enrich.fetch_size", 10000, 1, 1000000, Setting.Property.NodeScope);
static final Setting<Integer> ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS =
Setting.intSetting("enrich.max_concurrent_policy_executions", 50, 1, Setting.Property.NodeScope);
static final Setting<TimeValue> ENRICH_CLEANUP_PERIOD =
Setting.timeSetting("enrich.cleanup_period", new TimeValue(15, TimeUnit.MINUTES), Setting.Property.NodeScope);
@ -173,6 +176,7 @@ public class EnrichPlugin extends Plugin implements ActionPlugin, IngestPlugin {
public List<Setting<?>> getSettings() {
return Arrays.asList(
ENRICH_FETCH_SIZE_SETTING,
ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS,
ENRICH_CLEANUP_PERIOD,
COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS,
COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST,

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.enrich;
import java.util.concurrent.Semaphore;
import java.util.function.LongSupplier;
import org.elasticsearch.action.ActionListener;
@ -13,6 +14,7 @@ import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
@ -25,6 +27,8 @@ public class EnrichPolicyExecutor {
private final LongSupplier nowSupplier;
private final int fetchSize;
private final EnrichPolicyLocks policyLocks;
private final int maximumConcurrentPolicyExecutions;
private final Semaphore policyExecutionPermits;
EnrichPolicyExecutor(Settings settings,
ClusterService clusterService,
@ -40,6 +44,26 @@ public class EnrichPolicyExecutor {
this.nowSupplier = nowSupplier;
this.policyLocks = policyLocks;
this.fetchSize = EnrichPlugin.ENRICH_FETCH_SIZE_SETTING.get(settings);
this.maximumConcurrentPolicyExecutions = EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.get(settings);
this.policyExecutionPermits = new Semaphore(maximumConcurrentPolicyExecutions);
}
private void tryLockingPolicy(String policyName) {
policyLocks.lockPolicy(policyName);
if (policyExecutionPermits.tryAcquire() == false) {
// Release policy lock, and throw a different exception
policyLocks.releasePolicy(policyName);
throw new EsRejectedExecutionException("Policy execution failed. Policy execution for [" + policyName + "] would exceed " +
"maximum concurrent policy executions [" + maximumConcurrentPolicyExecutions + "]");
}
}
private void releasePolicy(String policyName) {
try {
policyExecutionPermits.release();
} finally {
policyLocks.releasePolicy(policyName);
}
}
private class PolicyUnlockingListener implements ActionListener<PolicyExecutionResult> {
@ -53,13 +77,13 @@ public class EnrichPolicyExecutor {
@Override
public void onResponse(PolicyExecutionResult policyExecutionResult) {
policyLocks.releasePolicy(policyName);
releasePolicy(policyName);
listener.onResponse(policyExecutionResult);
}
@Override
public void onFailure(Exception e) {
policyLocks.releasePolicy(policyName);
releasePolicy(policyName);
listener.onFailure(e);
}
}
@ -80,13 +104,13 @@ public class EnrichPolicyExecutor {
}
public void runPolicy(String policyName, EnrichPolicy policy, ActionListener<PolicyExecutionResult> listener) {
policyLocks.lockPolicy(policyName);
tryLockingPolicy(policyName);
try {
Runnable runnable = createPolicyRunner(policyName, policy, new PolicyUnlockingListener(policyName, listener));
threadPool.executor(ThreadPool.Names.GENERIC).execute(runnable);
} catch (Exception e) {
// Be sure to unlock if submission failed.
policyLocks.releasePolicy(policyName);
releasePolicy(policyName);
throw e;
}
}

View File

@ -137,4 +137,52 @@ public class EnrichPolicyExecutorTests extends ESTestCase {
secondTaskBlock.countDown();
secondTaskComplete.await();
}
public void testMaximumPolicyExecutionLimit() throws InterruptedException {
String testPolicyBaseName = "test_policy_";
Settings testSettings = Settings.builder().put(EnrichPlugin.ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS.getKey(), 2).build();
EnrichPolicy testPolicy = new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, Collections.singletonList("some_index"), "keyfield",
Collections.singletonList("valuefield"));
final EnrichPolicyTestExecutor testExecutor = new EnrichPolicyTestExecutor(testSettings, null, null, testThreadPool,
new IndexNameExpressionResolver(), ESTestCase::randomNonNegativeLong);
// Launch a two fake policy runs that will block until counted down to use up the maximum concurrent
final CountDownLatch firstTaskComplete = new CountDownLatch(1);
final CountDownLatch firstTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy,
new LatchedActionListener<>(noOpListener, firstTaskComplete));
final CountDownLatch secondTaskComplete = new CountDownLatch(1);
final CountDownLatch secondTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "2", testPolicy,
new LatchedActionListener<>(noOpListener, secondTaskComplete));
// Launch a third fake run that should fail immediately because the lock is obtained.
EsRejectedExecutionException expected = expectThrows(EsRejectedExecutionException.class,
"Expected exception but nothing was thrown", () -> {
CountDownLatch countDownLatch = testExecutor.testRunPolicy(testPolicyBaseName + "3", testPolicy, noOpListener);
// Should throw exception on the previous statement, but if it doesn't, be a
// good citizen and conclude the fake runs to keep the logs clean from interrupted exceptions
countDownLatch.countDown();
firstTaskBlock.countDown();
secondTaskBlock.countDown();
firstTaskComplete.await();
secondTaskComplete.await();
});
// Conclude the first mock run
firstTaskBlock.countDown();
secondTaskBlock.countDown();
firstTaskComplete.await();
secondTaskComplete.await();
// Validate exception from second run
assertThat(expected.getMessage(), containsString("Policy execution failed. Policy execution for [test_policy_3] would exceed " +
"maximum concurrent policy executions [2]"));
// Ensure that the lock from the previous run has been cleared
CountDownLatch finalTaskComplete = new CountDownLatch(1);
CountDownLatch finalTaskBlock = testExecutor.testRunPolicy(testPolicyBaseName + "1", testPolicy,
new LatchedActionListener<>(noOpListener, finalTaskComplete));
finalTaskBlock.countDown();
finalTaskComplete.await();
}
}