From 576a543a2811e276d3a39bac1cfd5a7aafc086d0 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Mon, 6 Jun 2016 22:09:58 -0400 Subject: [PATCH] Register watcher thread pool This commit register the watcher thread pool in the thread pool module in core, and also makes the necessary changes to reflect a refactoring that took place in core. Relates elastic/elasticsearch#2397 Original commit: elastic/x-pack-elasticsearch@be298a7578f6eeb9731bd27aaa5afceb94ffcdfc --- .../messy/tests/GroovyScriptConditionIT.java | 3 +- .../messy/tests/ScriptConditionSearchIT.java | 3 +- .../messy/tests/ScriptConditionTests.java | 3 +- .../TransportMonitoringBulkActionTests.java | 3 +- .../marvel/cleaner/CleanerServiceTests.java | 3 +- .../shield/audit/AuditTrailModuleTests.java | 3 +- .../index/IndexAuditTrailMutedTests.java | 3 +- .../audit/index/IndexAuditTrailTests.java | 3 +- .../IndexAuditTrailUpdateMappingTests.java | 3 +- .../ActiveDirectoryRealmTests.java | 3 +- .../authc/file/FileUserPasswdStoreTests.java | 3 +- .../authc/file/FileUserRolesStoreTests.java | 5 +- .../shield/authc/ldap/LdapRealmTests.java | 3 +- .../authc/support/DnRoleMapperTests.java | 3 +- .../authz/store/FileRolesStoreTests.java | 3 +- .../crypto/InternalCryptoServiceTests.java | 3 +- .../shield/ssl/SSLConfigurationTests.java | 17 ++--- .../SelfReschedulingRunnableTests.java | 5 +- .../org/elasticsearch/xpack/XPackPlugin.java | 8 +++ .../elasticsearch/xpack/watcher/Watcher.java | 28 ++++++--- .../execution/InternalWatchExecutor.java | 21 +------ .../xpack/watcher/history/HistoryModule.java | 3 - .../support/ThreadPoolSettingsBuilder.java | 62 ------------------- .../script/ScriptConditionSearchTests.java | 3 +- .../script/ScriptConditionTests.java | 3 +- .../script/ScriptTransformTests.java | 3 +- 26 files changed, 80 insertions(+), 123 deletions(-) delete mode 100644 elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/ThreadPoolSettingsBuilder.java diff --git a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/GroovyScriptConditionIT.java b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/GroovyScriptConditionIT.java index f286832382e..bd751eed42f 100644 --- a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/GroovyScriptConditionIT.java +++ b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/GroovyScriptConditionIT.java @@ -11,6 +11,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.groovy.GroovyPlugin; import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition; @@ -49,7 +50,7 @@ public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase @BeforeClass public static void startThreadPool() { - THREAD_POOL = new ThreadPool(GroovyScriptConditionIT.class.getSimpleName()); + THREAD_POOL = new TestThreadPool(GroovyScriptConditionIT.class.getSimpleName()); } @Before diff --git a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionSearchIT.java b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionSearchIT.java index 042de64dfb3..d467b68a68c 100644 --- a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionSearchIT.java +++ b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionSearchIT.java @@ -18,6 +18,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition; @@ -50,7 +51,7 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase @Before public void init() throws Exception { - tp = new ThreadPool(ThreadPool.Names.SAME); + tp = new TestThreadPool(ThreadPool.Names.SAME); scriptService = MessyTestUtils.getScriptServiceProxy(tp); } diff --git a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionTests.java b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionTests.java index 500d35e70da..9c3679edc47 100644 --- a/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionTests.java +++ b/elasticsearch/qa/messy-test-watcher-with-groovy/src/test/java/org/elasticsearch/messy/tests/ScriptConditionTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.script.GeneralScriptException; import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.condition.Condition; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; @@ -49,7 +50,7 @@ public class ScriptConditionTests extends ESTestCase { @Before public void init() { - tp = new ThreadPool(ThreadPool.Names.SAME); + tp = new TestThreadPool(ThreadPool.Names.SAME); } @After diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java index f9876038b73..d37e19023c3 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/action/TransportMonitoringBulkActionTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.marvel.agent.exporter.Exporters; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.transport.CapturingTransport; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import org.junit.After; @@ -74,7 +75,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase { @BeforeClass public static void beforeClass() { - threadPool = new ThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName()); + threadPool = new TestThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName()); } @AfterClass diff --git a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/cleaner/CleanerServiceTests.java b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/cleaner/CleanerServiceTests.java index 97b3e7da384..769ee62f4a3 100644 --- a/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/cleaner/CleanerServiceTests.java +++ b/elasticsearch/x-pack/marvel/src/test/java/org/elasticsearch/marvel/cleaner/CleanerServiceTests.java @@ -11,6 +11,7 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.marvel.MonitoringSettings; import org.elasticsearch.marvel.MonitoringLicensee; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -40,7 +41,7 @@ public class CleanerServiceTests extends ESTestCase { @Before public void start() { clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.HISTORY_DURATION)); - threadPool = new ThreadPool("CleanerServiceTests"); + threadPool = new TestThreadPool("CleanerServiceTests"); } @After diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/AuditTrailModuleTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/AuditTrailModuleTests.java index 13581354c22..46bb410fca3 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/AuditTrailModuleTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/AuditTrailModuleTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.transport.Transport; @@ -55,7 +56,7 @@ public class AuditTrailModuleTests extends ESTestCase { .put(AuditTrailModule.ENABLED_SETTING.getKey(), true) .put("client.type", "node") .build(); - ThreadPool pool = new ThreadPool("testLogFile"); + ThreadPool pool = new TestThreadPool("testLogFile"); try { SettingsModule settingsModule = new SettingsModule(settings); settingsModule.registerSetting(AuditTrailModule.ENABLED_SETTING); diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java index a91bc3729b5..5e76ebd3921 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailMutedTests.java @@ -25,6 +25,7 @@ import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule; import org.elasticsearch.shield.user.SystemUser; import org.elasticsearch.shield.user.User; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportMessage; @@ -58,7 +59,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase { when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE }, DummyTransportAddress.INSTANCE)); - threadPool = new ThreadPool("index audit trail tests"); + threadPool = new TestThreadPool("index audit trail tests"); transportClient = TransportClient.builder().settings(Settings.EMPTY).build(); clientCalled = new AtomicBoolean(false); client = new InternalClient(transportClient) { diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java index be3a74d833f..066ee7b773e 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.ShieldIntegTestCase; import org.elasticsearch.test.ShieldSettingsSource; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportInfo; @@ -260,7 +261,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase { BoundTransportAddress boundTransportAddress = new BoundTransportAddress(new TransportAddress[]{DummyTransportAddress.INSTANCE}, DummyTransportAddress.INSTANCE); when(transport.boundAddress()).thenReturn(boundTransportAddress); - threadPool = new ThreadPool("index audit trail tests"); + threadPool = new TestThreadPool("index audit trail tests"); enqueuedMessage = new SetOnce<>(); auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)) { @Override diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailUpdateMappingTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailUpdateMappingTests.java index e87b7026fba..11b567456f0 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailUpdateMappingTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/audit/index/IndexAuditTrailUpdateMappingTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.test.ShieldIntegTestCase; import org.elasticsearch.test.rest.FakeRestRequest; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import org.junit.After; @@ -39,7 +40,7 @@ public class IndexAuditTrailUpdateMappingTests extends ShieldIntegTestCase { @Before public void setup() { - threadPool = new ThreadPool("index audit trail update mapping tests"); + threadPool = new TestThreadPool("index audit trail update mapping tests"); } public void testMappingIsUpdated() throws Exception { diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/activedirectory/ActiveDirectoryRealmTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/activedirectory/ActiveDirectoryRealmTests.java index 547807fc2cd..68b4186dd46 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/activedirectory/ActiveDirectoryRealmTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/activedirectory/ActiveDirectoryRealmTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.shield.authc.support.UsernamePasswordToken; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.After; @@ -92,7 +93,7 @@ public class ActiveDirectoryRealmTests extends ESTestCase { directoryServer.startListening(); directoryServers[i] = directoryServer; } - threadPool = new ThreadPool("active directory realm tests"); + threadPool = new TestThreadPool("active directory realm tests"); resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); globalSettings = Settings.builder().put("path.home", createTempDir()).build(); } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserPasswdStoreTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserPasswdStoreTests.java index 72c261e0c08..cb4735eec51 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserPasswdStoreTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserPasswdStoreTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.shield.authc.support.Hasher; import org.elasticsearch.shield.authc.support.RefreshListener; import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.After; @@ -62,7 +63,7 @@ public class FileUserPasswdStoreTests extends ESTestCase { .put("path.home", createTempDir()) .build(); env = new Environment(settings); - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); } @After diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserRolesStoreTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserRolesStoreTests.java index e74569c7eb0..09dfd3133e7 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserRolesStoreTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/file/FileUserRolesStoreTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.shield.audit.logfile.CapturingLogger; import org.elasticsearch.shield.authc.RealmConfig; import org.elasticsearch.shield.authc.support.RefreshListener; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.XPackPlugin; @@ -63,7 +64,7 @@ public class FileUserRolesStoreTests extends ESTestCase { .put("path.home", createTempDir()) .build(); env = new Environment(settings); - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); } @After @@ -224,7 +225,7 @@ public class FileUserRolesStoreTests extends ESTestCase { public void testParseFileEmptyRolesDoesNotCauseNPE() throws Exception { ThreadPool threadPool = null; try { - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); Path usersRoles = writeUsersRoles("role1:admin"); Settings settings = Settings.builder() diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/ldap/LdapRealmTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/ldap/LdapRealmTests.java index 7ab06210a42..4be76a5c787 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/ldap/LdapRealmTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/ldap/LdapRealmTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.shield.authc.support.SecuredString; import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.shield.authc.support.UsernamePasswordToken; import org.elasticsearch.shield.user.User; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.After; @@ -49,7 +50,7 @@ public class LdapRealmTests extends LdapTestCase { @Before public void init() throws Exception { - threadPool = new ThreadPool("ldap realm tests"); + threadPool = new TestThreadPool("ldap realm tests"); resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); globalSettings = Settings.builder().put("path.home", createTempDir()).build(); } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/support/DnRoleMapperTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/support/DnRoleMapperTests.java index 3dd07e2e15c..2ae0276059b 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/support/DnRoleMapperTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authc/support/DnRoleMapperTests.java @@ -13,6 +13,7 @@ import org.elasticsearch.shield.authc.RealmConfig; import org.elasticsearch.shield.authc.activedirectory.ActiveDirectoryRealm; import org.elasticsearch.shield.authc.ldap.LdapRealm; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.After; @@ -69,7 +70,7 @@ public class DnRoleMapperTests extends ESTestCase { .put("path.home", createTempDir()) .build(); env = new Environment(settings); - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); } @After diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authz/store/FileRolesStoreTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authz/store/FileRolesStoreTests.java index 6dc6bbabd26..0a0ccfd206c 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authz/store/FileRolesStoreTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/authz/store/FileRolesStoreTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.shield.authz.permission.RunAsPermission; import org.elasticsearch.shield.authz.privilege.ClusterPrivilege; import org.elasticsearch.shield.authz.privilege.IndexPrivilege; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.xpack.XPackPlugin; @@ -257,7 +258,7 @@ public class FileRolesStoreTests extends ESTestCase { .build(); Environment env = new Environment(settings); - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); watcherService = new ResourceWatcherService(settings, threadPool); final CountDownLatch latch = new CountDownLatch(1); FileRolesStore store = new FileRolesStore(settings, env, watcherService, new RefreshListener() { diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/crypto/InternalCryptoServiceTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/crypto/InternalCryptoServiceTests.java index d97d5fcf095..ef6d5cc9404 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/crypto/InternalCryptoServiceTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/crypto/InternalCryptoServiceTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; import org.junit.After; @@ -53,7 +54,7 @@ public class InternalCryptoServiceTests extends ESTestCase { .put("path.home", createTempDir()) .build(); env = new Environment(settings); - threadPool = new ThreadPool("test"); + threadPool = new TestThreadPool("test"); watcherService = new ResourceWatcherService(settings, threadPool); watcherService.start(); } diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/ssl/SSLConfigurationTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/ssl/SSLConfigurationTests.java index f356e916728..9d0f4a76807 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/ssl/SSLConfigurationTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/ssl/SSLConfigurationTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.shield.ssl.SSLConfiguration.Custom; import org.elasticsearch.shield.ssl.SSLConfiguration.Global; import org.elasticsearch.shield.ssl.TrustConfig.Reloadable.Listener; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.watcher.ResourceWatcherService; @@ -321,7 +322,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -384,7 +385,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload pem"); + ThreadPool threadPool = new TestThreadPool("reload pem"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -460,7 +461,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -506,7 +507,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -554,7 +555,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -603,7 +604,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload pem"); + ThreadPool threadPool = new TestThreadPool("reload pem"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -654,7 +655,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); @@ -693,7 +694,7 @@ public class SSLConfigurationTests extends ESTestCase { AtomicReference exceptionRef = new AtomicReference<>(); Listener listener = createRefreshListener(latch, exceptionRef); - ThreadPool threadPool = new ThreadPool("reload"); + ThreadPool threadPool = new TestThreadPool("reload"); try { ResourceWatcherService resourceWatcherService = new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/support/SelfReschedulingRunnableTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/support/SelfReschedulingRunnableTests.java index 04bb5e1f73e..d3d7cd21553 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/support/SelfReschedulingRunnableTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/shield/support/SelfReschedulingRunnableTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool.Names; @@ -189,7 +190,7 @@ public class SelfReschedulingRunnableTests extends ESTestCase { } public void testStopPreventsRunning() throws Exception { - final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule"); + final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule"); final AtomicInteger failureCounter = new AtomicInteger(0); final AtomicInteger runCounter = new AtomicInteger(0); final AbstractRunnable runnable = new AbstractRunnable() { @@ -232,7 +233,7 @@ public class SelfReschedulingRunnableTests extends ESTestCase { } public void testStopPreventsRescheduling() throws Exception { - final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule"); + final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule"); final CountDownLatch threadRunningLatch = new CountDownLatch(randomIntBetween(1, 16)); final CountDownLatch stopCalledLatch = new CountDownLatch(1); final AbstractRunnable runnable = new AbstractRunnable() { diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index b519da5d489..a47a847104b 100644 --- a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -26,6 +26,8 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptModule; import org.elasticsearch.shield.Security; import org.elasticsearch.shield.authc.AuthenticationModule; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.xpack.action.TransportXPackInfoAction; import org.elasticsearch.xpack.action.TransportXPackUsageAction; import org.elasticsearch.xpack.action.XPackInfoAction; @@ -51,6 +53,7 @@ import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; public class XPackPlugin extends Plugin { @@ -201,6 +204,11 @@ public class XPackPlugin extends Plugin { licensing.onModule(module); } + @Override + public List> getExecutorBuilders(final Settings settings) { + return watcher.getExecutorBuilders(settings); + } + public void onModule(NetworkModule module) { if (!transportClientMode) { module.registerRestHandler(RestXPackInfoAction.class); diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java index 3852826defd..2f8b1e1ce2f 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/Watcher.java @@ -20,13 +20,18 @@ import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ExecutorBuilder; +import org.elasticsearch.threadpool.FixedExecutorBuilder; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.common.init.LazyInitializationModule; import org.elasticsearch.xpack.watcher.actions.WatcherActionModule; import org.elasticsearch.xpack.watcher.client.WatcherClientModule; import org.elasticsearch.xpack.watcher.condition.ConditionModule; import org.elasticsearch.xpack.watcher.execution.ExecutionModule; +import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor; import org.elasticsearch.xpack.watcher.history.HistoryModule; import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.input.InputModule; @@ -132,14 +137,7 @@ public class Watcher { } public Settings additionalSettings() { - if (enabled == false || transportClient) { - return Settings.EMPTY; - } - Settings additionalSettings = Settings.builder() - .put(HistoryModule.additionalSettings(settings)) - .build(); - - return additionalSettings; + return Settings.EMPTY; } public void onModule(ScriptModule module) { @@ -171,6 +169,20 @@ public class Watcher { module.registerSetting(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope)); } + public List> getExecutorBuilders(final Settings settings) { + if (XPackPlugin.featureEnabled(settings, Watcher.NAME, true)) { + final FixedExecutorBuilder builder = + new FixedExecutorBuilder( + settings, + InternalWatchExecutor.THREAD_POOL_NAME, + 5 * EsExecutors.boundedNumberOfProcessors(settings), + 1000, + "xpack.watcher.thread_pool"); + return Collections.singletonList(builder); + } + return Collections.emptyList(); + } + public void onModule(NetworkModule module) { if (enabled && transportClient == false) { module.registerRestHandler(RestPutWatchAction.class); diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/InternalWatchExecutor.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/InternalWatchExecutor.java index 4d270496c08..668bdc53d6b 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/InternalWatchExecutor.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/execution/InternalWatchExecutor.java @@ -6,37 +6,17 @@ package org.elasticsearch.xpack.watcher.execution; import org.elasticsearch.common.inject.Inject; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.Watcher; -import org.elasticsearch.xpack.watcher.support.ThreadPoolSettingsBuilder; import java.util.concurrent.BlockingQueue; import java.util.stream.Stream; -/** - * - */ public class InternalWatchExecutor implements WatchExecutor { public static final String THREAD_POOL_NAME = Watcher.NAME; - public static Settings additionalSettings(Settings nodeSettings) { - Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME); - if (!settings.names().isEmpty()) { - // the TP is already configured in the node settings - // no need for additional settings - return Settings.EMPTY; - } - int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings); - return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME) - .size(5 * availableProcessors) - .queueSize(1000) - .build(); - } - private final ThreadPool threadPool; @Inject @@ -67,4 +47,5 @@ public class InternalWatchExecutor implements WatchExecutor { private EsThreadPoolExecutor executor() { return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); } + } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryModule.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryModule.java index e9e65252e7d..3c325c0f0e0 100644 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryModule.java +++ b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/history/HistoryModule.java @@ -21,7 +21,4 @@ public class HistoryModule extends AbstractModule { bind(HistoryStore.class).asEagerSingleton(); } - public static Settings additionalSettings(Settings nodeSettings) { - return InternalWatchExecutor.additionalSettings(nodeSettings); - } } diff --git a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/ThreadPoolSettingsBuilder.java b/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/ThreadPoolSettingsBuilder.java deleted file mode 100644 index b477be19c25..00000000000 --- a/elasticsearch/x-pack/watcher/src/main/java/org/elasticsearch/xpack/watcher/support/ThreadPoolSettingsBuilder.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.watcher.support; - -import org.elasticsearch.common.settings.Settings; - -/** - * - */ -public abstract class ThreadPoolSettingsBuilder { - - public static Same same(String name) { - return new Same(name); - } - - protected final String name; - private final Settings.Builder builder = Settings.builder(); - - protected ThreadPoolSettingsBuilder(String name, String type) { - this.name = name; - put("type", type); - } - - public Settings build() { - return builder.build(); - } - - protected B put(String setting, Object value) { - builder.put("threadpool." + name + "." + setting, value); - return (B) this; - } - - protected B put(String setting, int value) { - builder.put("threadpool." + name + "." + setting, value); - return (B) this; - } - - public static class Same extends ThreadPoolSettingsBuilder { - public Same(String name) { - super(name, "same"); - } - } - - public static class Fixed extends ThreadPoolSettingsBuilder { - - public Fixed(String name) { - super(name, "fixed"); - } - - public Fixed size(int size) { - return put("size", size); - } - - public Fixed queueSize(int queueSize) { - return put("queue_size", queueSize); - } - } - -} diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionSearchTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionSearchTests.java index a8fc5743f6c..f814c67f1f9 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionSearchTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionSearchTests.java @@ -17,6 +17,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.support.Script; @@ -40,7 +41,7 @@ public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTestCa @Before public void init() throws Exception { - tp = new ThreadPool(ThreadPool.Names.SAME); + tp = new TestThreadPool(ThreadPool.Names.SAME); scriptService = WatcherTestUtils.getScriptServiceProxy(tp); } diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionTests.java index c24b79efd5e..b495fe3f3cc 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/condition/script/ScriptConditionTests.java @@ -18,6 +18,7 @@ import org.elasticsearch.script.GeneralScriptException; import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.condition.Condition; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; @@ -48,7 +49,7 @@ public class ScriptConditionTests extends ESTestCase { @Before public void init() { - tp = new ThreadPool(ThreadPool.Names.SAME); + tp = new TestThreadPool(ThreadPool.Names.SAME); } @After diff --git a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/script/ScriptTransformTests.java b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/script/ScriptTransformTests.java index 910ee7a8341..6e971535e16 100644 --- a/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/script/ScriptTransformTests.java +++ b/elasticsearch/x-pack/watcher/src/test/java/org/elasticsearch/xpack/watcher/transform/script/ScriptTransformTests.java @@ -15,6 +15,7 @@ import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.GeneralScriptException; import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.support.Script; @@ -53,7 +54,7 @@ public class ScriptTransformTests extends ESTestCase { @Before public void init() { - tp = new ThreadPool(ThreadPool.Names.SAME); + tp = new TestThreadPool(ThreadPool.Names.SAME); } @After