Register watcher thread pool

This commit register the watcher thread pool in the thread pool module
in core, and also makes the necessary changes to reflect a refactoring
that took place in core.

Relates elastic/elasticsearch#2397

Original commit: elastic/x-pack-elasticsearch@be298a7578
This commit is contained in:
Jason Tedor 2016-06-06 22:09:58 -04:00
parent dacc22f57a
commit 576a543a28
26 changed files with 80 additions and 123 deletions

View File

@ -11,6 +11,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.groovy.GroovyPlugin; import org.elasticsearch.script.groovy.GroovyPlugin;
import org.elasticsearch.search.aggregations.AggregationBuilders; import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
@ -49,7 +50,7 @@ public class GroovyScriptConditionIT extends AbstractWatcherIntegrationTestCase
@BeforeClass @BeforeClass
public static void startThreadPool() { public static void startThreadPool() {
THREAD_POOL = new ThreadPool(GroovyScriptConditionIT.class.getSimpleName()); THREAD_POOL = new TestThreadPool(GroovyScriptConditionIT.class.getSimpleName());
} }
@Before @Before

View File

@ -18,6 +18,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ScriptCondition;
@ -50,7 +51,7 @@ public class ScriptConditionSearchIT extends AbstractWatcherIntegrationTestCase
@Before @Before
public void init() throws Exception { public void init() throws Exception {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new TestThreadPool(ThreadPool.Names.SAME);
scriptService = MessyTestUtils.getScriptServiceProxy(tp); scriptService = MessyTestUtils.getScriptServiceProxy(tp);
} }

View File

@ -17,6 +17,7 @@ import org.elasticsearch.script.GeneralScriptException;
import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.Condition; import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition; import org.elasticsearch.xpack.watcher.condition.script.ExecutableScriptCondition;
@ -49,7 +50,7 @@ public class ScriptConditionTests extends ESTestCase {
@Before @Before
public void init() { public void init() {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new TestThreadPool(ThreadPool.Names.SAME);
} }
@After @After

View File

@ -30,6 +30,7 @@ import org.elasticsearch.marvel.agent.exporter.Exporters;
import org.elasticsearch.marvel.agent.exporter.MonitoringDoc; import org.elasticsearch.marvel.agent.exporter.MonitoringDoc;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport; import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.junit.After; import org.junit.After;
@ -74,7 +75,7 @@ public class TransportMonitoringBulkActionTests extends ESTestCase {
@BeforeClass @BeforeClass
public static void beforeClass() { public static void beforeClass() {
threadPool = new ThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName()); threadPool = new TestThreadPool(TransportMonitoringBulkActionTests.class.getSimpleName());
} }
@AfterClass @AfterClass

View File

@ -11,6 +11,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.marvel.MonitoringSettings; import org.elasticsearch.marvel.MonitoringSettings;
import org.elasticsearch.marvel.MonitoringLicensee; import org.elasticsearch.marvel.MonitoringLicensee;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.DateTimeZone; import org.joda.time.DateTimeZone;
@ -40,7 +41,7 @@ public class CleanerServiceTests extends ESTestCase {
@Before @Before
public void start() { public void start() {
clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.HISTORY_DURATION)); clusterSettings = new ClusterSettings(Settings.EMPTY, Collections.singleton(MonitoringSettings.HISTORY_DURATION));
threadPool = new ThreadPool("CleanerServiceTests"); threadPool = new TestThreadPool("CleanerServiceTests");
} }
@After @After

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.indices.breaker.CircuitBreakerModule; import org.elasticsearch.indices.breaker.CircuitBreakerModule;
import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail; import org.elasticsearch.shield.audit.logfile.LoggingAuditTrail;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPoolModule; import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
@ -55,7 +56,7 @@ public class AuditTrailModuleTests extends ESTestCase {
.put(AuditTrailModule.ENABLED_SETTING.getKey(), true) .put(AuditTrailModule.ENABLED_SETTING.getKey(), true)
.put("client.type", "node") .put("client.type", "node")
.build(); .build();
ThreadPool pool = new ThreadPool("testLogFile"); ThreadPool pool = new TestThreadPool("testLogFile");
try { try {
SettingsModule settingsModule = new SettingsModule(settings); SettingsModule settingsModule = new SettingsModule(settings);
settingsModule.registerSetting(AuditTrailModule.ENABLED_SETTING); settingsModule.registerSetting(AuditTrailModule.ENABLED_SETTING);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.shield.transport.filter.ShieldIpFilterRule;
import org.elasticsearch.shield.user.SystemUser; import org.elasticsearch.shield.user.SystemUser;
import org.elasticsearch.shield.user.User; import org.elasticsearch.shield.user.User;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportMessage; import org.elasticsearch.transport.TransportMessage;
@ -58,7 +59,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE }, when(transport.boundAddress()).thenReturn(new BoundTransportAddress(new TransportAddress[] { DummyTransportAddress.INSTANCE },
DummyTransportAddress.INSTANCE)); DummyTransportAddress.INSTANCE));
threadPool = new ThreadPool("index audit trail tests"); threadPool = new TestThreadPool("index audit trail tests");
transportClient = TransportClient.builder().settings(Settings.EMPTY).build(); transportClient = TransportClient.builder().settings(Settings.EMPTY).build();
clientCalled = new AtomicBoolean(false); clientCalled = new AtomicBoolean(false);
client = new InternalClient(transportClient) { client = new InternalClient(transportClient) {

View File

@ -43,6 +43,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.ShieldIntegTestCase; import org.elasticsearch.test.ShieldIntegTestCase;
import org.elasticsearch.test.ShieldSettingsSource; import org.elasticsearch.test.ShieldSettingsSource;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportInfo; import org.elasticsearch.transport.TransportInfo;
@ -260,7 +261,7 @@ public class IndexAuditTrailTests extends ShieldIntegTestCase {
BoundTransportAddress boundTransportAddress = new BoundTransportAddress(new TransportAddress[]{DummyTransportAddress.INSTANCE}, BoundTransportAddress boundTransportAddress = new BoundTransportAddress(new TransportAddress[]{DummyTransportAddress.INSTANCE},
DummyTransportAddress.INSTANCE); DummyTransportAddress.INSTANCE);
when(transport.boundAddress()).thenReturn(boundTransportAddress); when(transport.boundAddress()).thenReturn(boundTransportAddress);
threadPool = new ThreadPool("index audit trail tests"); threadPool = new TestThreadPool("index audit trail tests");
enqueuedMessage = new SetOnce<>(); enqueuedMessage = new SetOnce<>();
auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)) { auditor = new IndexAuditTrail(settings, transport, Providers.of(internalClient()), threadPool, mock(ClusterService.class)) {
@Override @Override

View File

@ -14,6 +14,7 @@ import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.test.ShieldIntegTestCase; import org.elasticsearch.test.ShieldIntegTestCase;
import org.elasticsearch.test.rest.FakeRestRequest; import org.elasticsearch.test.rest.FakeRestRequest;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.Transport;
import org.junit.After; import org.junit.After;
@ -39,7 +40,7 @@ public class IndexAuditTrailUpdateMappingTests extends ShieldIntegTestCase {
@Before @Before
public void setup() { public void setup() {
threadPool = new ThreadPool("index audit trail update mapping tests"); threadPool = new TestThreadPool("index audit trail update mapping tests");
} }
public void testMappingIsUpdated() throws Exception { public void testMappingIsUpdated() throws Exception {

View File

@ -22,6 +22,7 @@ import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.shield.authc.support.SecuredStringTests;
import org.elasticsearch.shield.authc.support.UsernamePasswordToken; import org.elasticsearch.shield.authc.support.UsernamePasswordToken;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After; import org.junit.After;
@ -92,7 +93,7 @@ public class ActiveDirectoryRealmTests extends ESTestCase {
directoryServer.startListening(); directoryServer.startListening();
directoryServers[i] = directoryServer; directoryServers[i] = directoryServer;
} }
threadPool = new ThreadPool("active directory realm tests"); threadPool = new TestThreadPool("active directory realm tests");
resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
globalSettings = Settings.builder().put("path.home", createTempDir()).build(); globalSettings = Settings.builder().put("path.home", createTempDir()).build();
} }

View File

@ -16,6 +16,7 @@ import org.elasticsearch.shield.authc.support.Hasher;
import org.elasticsearch.shield.authc.support.RefreshListener; import org.elasticsearch.shield.authc.support.RefreshListener;
import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.shield.authc.support.SecuredStringTests;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After; import org.junit.After;
@ -62,7 +63,7 @@ public class FileUserPasswdStoreTests extends ESTestCase {
.put("path.home", createTempDir()) .put("path.home", createTempDir())
.build(); .build();
env = new Environment(settings); env = new Environment(settings);
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
} }
@After @After

View File

@ -15,6 +15,7 @@ import org.elasticsearch.shield.audit.logfile.CapturingLogger;
import org.elasticsearch.shield.authc.RealmConfig; import org.elasticsearch.shield.authc.RealmConfig;
import org.elasticsearch.shield.authc.support.RefreshListener; import org.elasticsearch.shield.authc.support.RefreshListener;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
@ -63,7 +64,7 @@ public class FileUserRolesStoreTests extends ESTestCase {
.put("path.home", createTempDir()) .put("path.home", createTempDir())
.build(); .build();
env = new Environment(settings); env = new Environment(settings);
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
} }
@After @After
@ -224,7 +225,7 @@ public class FileUserRolesStoreTests extends ESTestCase {
public void testParseFileEmptyRolesDoesNotCauseNPE() throws Exception { public void testParseFileEmptyRolesDoesNotCauseNPE() throws Exception {
ThreadPool threadPool = null; ThreadPool threadPool = null;
try { try {
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
Path usersRoles = writeUsersRoles("role1:admin"); Path usersRoles = writeUsersRoles("role1:admin");
Settings settings = Settings.builder() Settings settings = Settings.builder()

View File

@ -15,6 +15,7 @@ import org.elasticsearch.shield.authc.support.SecuredString;
import org.elasticsearch.shield.authc.support.SecuredStringTests; import org.elasticsearch.shield.authc.support.SecuredStringTests;
import org.elasticsearch.shield.authc.support.UsernamePasswordToken; import org.elasticsearch.shield.authc.support.UsernamePasswordToken;
import org.elasticsearch.shield.user.User; import org.elasticsearch.shield.user.User;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After; import org.junit.After;
@ -49,7 +50,7 @@ public class LdapRealmTests extends LdapTestCase {
@Before @Before
public void init() throws Exception { public void init() throws Exception {
threadPool = new ThreadPool("ldap realm tests"); threadPool = new TestThreadPool("ldap realm tests");
resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool); resourceWatcherService = new ResourceWatcherService(Settings.EMPTY, threadPool);
globalSettings = Settings.builder().put("path.home", createTempDir()).build(); globalSettings = Settings.builder().put("path.home", createTempDir()).build();
} }

View File

@ -13,6 +13,7 @@ import org.elasticsearch.shield.authc.RealmConfig;
import org.elasticsearch.shield.authc.activedirectory.ActiveDirectoryRealm; import org.elasticsearch.shield.authc.activedirectory.ActiveDirectoryRealm;
import org.elasticsearch.shield.authc.ldap.LdapRealm; import org.elasticsearch.shield.authc.ldap.LdapRealm;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After; import org.junit.After;
@ -69,7 +70,7 @@ public class DnRoleMapperTests extends ESTestCase {
.put("path.home", createTempDir()) .put("path.home", createTempDir())
.build(); .build();
env = new Environment(settings); env = new Environment(settings);
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
} }
@After @After

View File

@ -17,6 +17,7 @@ import org.elasticsearch.shield.authz.permission.RunAsPermission;
import org.elasticsearch.shield.authz.privilege.ClusterPrivilege; import org.elasticsearch.shield.authz.privilege.ClusterPrivilege;
import org.elasticsearch.shield.authz.privilege.IndexPrivilege; import org.elasticsearch.shield.authz.privilege.IndexPrivilege;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
@ -257,7 +258,7 @@ public class FileRolesStoreTests extends ESTestCase {
.build(); .build();
Environment env = new Environment(settings); Environment env = new Environment(settings);
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
watcherService = new ResourceWatcherService(settings, threadPool); watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch latch = new CountDownLatch(1);
FileRolesStore store = new FileRolesStore(settings, env, watcherService, new RefreshListener() { FileRolesStore store = new FileRolesStore(settings, env, watcherService, new RefreshListener() {

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.junit.After; import org.junit.After;
@ -53,7 +54,7 @@ public class InternalCryptoServiceTests extends ESTestCase {
.put("path.home", createTempDir()) .put("path.home", createTempDir())
.build(); .build();
env = new Environment(settings); env = new Environment(settings);
threadPool = new ThreadPool("test"); threadPool = new TestThreadPool("test");
watcherService = new ResourceWatcherService(settings, threadPool); watcherService = new ResourceWatcherService(settings, threadPool);
watcherService.start(); watcherService.start();
} }

View File

@ -15,6 +15,7 @@ import org.elasticsearch.shield.ssl.SSLConfiguration.Custom;
import org.elasticsearch.shield.ssl.SSLConfiguration.Global; import org.elasticsearch.shield.ssl.SSLConfiguration.Global;
import org.elasticsearch.shield.ssl.TrustConfig.Reloadable.Listener; import org.elasticsearch.shield.ssl.TrustConfig.Reloadable.Listener;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
@ -321,7 +322,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -384,7 +385,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload pem"); ThreadPool threadPool = new TestThreadPool("reload pem");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -460,7 +461,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -506,7 +507,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -554,7 +555,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -603,7 +604,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload pem"); ThreadPool threadPool = new TestThreadPool("reload pem");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -654,7 +655,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();
@ -693,7 +694,7 @@ public class SSLConfigurationTests extends ESTestCase {
AtomicReference<Exception> exceptionRef = new AtomicReference<>(); AtomicReference<Exception> exceptionRef = new AtomicReference<>();
Listener listener = createRefreshListener(latch, exceptionRef); Listener listener = createRefreshListener(latch, exceptionRef);
ThreadPool threadPool = new ThreadPool("reload"); ThreadPool threadPool = new TestThreadPool("reload");
try { try {
ResourceWatcherService resourceWatcherService = ResourceWatcherService resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start(); new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool).start();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names; import org.elasticsearch.threadpool.ThreadPool.Names;
@ -189,7 +190,7 @@ public class SelfReschedulingRunnableTests extends ESTestCase {
} }
public void testStopPreventsRunning() throws Exception { public void testStopPreventsRunning() throws Exception {
final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule"); final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule");
final AtomicInteger failureCounter = new AtomicInteger(0); final AtomicInteger failureCounter = new AtomicInteger(0);
final AtomicInteger runCounter = new AtomicInteger(0); final AtomicInteger runCounter = new AtomicInteger(0);
final AbstractRunnable runnable = new AbstractRunnable() { final AbstractRunnable runnable = new AbstractRunnable() {
@ -232,7 +233,7 @@ public class SelfReschedulingRunnableTests extends ESTestCase {
} }
public void testStopPreventsRescheduling() throws Exception { public void testStopPreventsRescheduling() throws Exception {
final ThreadPool threadPool = new ThreadPool("test-stop-self-schedule"); final ThreadPool threadPool = new TestThreadPool("test-stop-self-schedule");
final CountDownLatch threadRunningLatch = new CountDownLatch(randomIntBetween(1, 16)); final CountDownLatch threadRunningLatch = new CountDownLatch(randomIntBetween(1, 16));
final CountDownLatch stopCalledLatch = new CountDownLatch(1); final CountDownLatch stopCalledLatch = new CountDownLatch(1);
final AbstractRunnable runnable = new AbstractRunnable() { final AbstractRunnable runnable = new AbstractRunnable() {

View File

@ -26,6 +26,8 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.shield.Security; import org.elasticsearch.shield.Security;
import org.elasticsearch.shield.authc.AuthenticationModule; import org.elasticsearch.shield.authc.AuthenticationModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.xpack.action.TransportXPackInfoAction; import org.elasticsearch.xpack.action.TransportXPackInfoAction;
import org.elasticsearch.xpack.action.TransportXPackUsageAction; import org.elasticsearch.xpack.action.TransportXPackUsageAction;
import org.elasticsearch.xpack.action.XPackInfoAction; import org.elasticsearch.xpack.action.XPackInfoAction;
@ -51,6 +53,7 @@ import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.List;
public class XPackPlugin extends Plugin { public class XPackPlugin extends Plugin {
@ -201,6 +204,11 @@ public class XPackPlugin extends Plugin {
licensing.onModule(module); licensing.onModule(module);
} }
@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
return watcher.getExecutorBuilders(settings);
}
public void onModule(NetworkModule module) { public void onModule(NetworkModule module) {
if (!transportClientMode) { if (!transportClientMode) {
module.registerRestHandler(RestXPackInfoAction.class); module.registerRestHandler(RestXPackInfoAction.class);

View File

@ -20,13 +20,18 @@ import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.settings.SettingsModule;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.script.ScriptModule; import org.elasticsearch.script.ScriptModule;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.FixedExecutorBuilder;
import org.elasticsearch.threadpool.ThreadPoolModule;
import org.elasticsearch.xpack.XPackPlugin; import org.elasticsearch.xpack.XPackPlugin;
import org.elasticsearch.xpack.common.init.LazyInitializationModule; import org.elasticsearch.xpack.common.init.LazyInitializationModule;
import org.elasticsearch.xpack.watcher.actions.WatcherActionModule; import org.elasticsearch.xpack.watcher.actions.WatcherActionModule;
import org.elasticsearch.xpack.watcher.client.WatcherClientModule; import org.elasticsearch.xpack.watcher.client.WatcherClientModule;
import org.elasticsearch.xpack.watcher.condition.ConditionModule; import org.elasticsearch.xpack.watcher.condition.ConditionModule;
import org.elasticsearch.xpack.watcher.execution.ExecutionModule; import org.elasticsearch.xpack.watcher.execution.ExecutionModule;
import org.elasticsearch.xpack.watcher.execution.InternalWatchExecutor;
import org.elasticsearch.xpack.watcher.history.HistoryModule; import org.elasticsearch.xpack.watcher.history.HistoryModule;
import org.elasticsearch.xpack.watcher.history.HistoryStore; import org.elasticsearch.xpack.watcher.history.HistoryStore;
import org.elasticsearch.xpack.watcher.input.InputModule; import org.elasticsearch.xpack.watcher.input.InputModule;
@ -132,15 +137,8 @@ public class Watcher {
} }
public Settings additionalSettings() { public Settings additionalSettings() {
if (enabled == false || transportClient) {
return Settings.EMPTY; return Settings.EMPTY;
} }
Settings additionalSettings = Settings.builder()
.put(HistoryModule.additionalSettings(settings))
.build();
return additionalSettings;
}
public void onModule(ScriptModule module) { public void onModule(ScriptModule module) {
module.registerScriptContext(ScriptServiceProxy.INSTANCE); module.registerScriptContext(ScriptServiceProxy.INSTANCE);
@ -171,6 +169,20 @@ public class Watcher {
module.registerSetting(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope)); module.registerSetting(Setting.simpleString("xpack.watcher.start_immediately", Setting.Property.NodeScope));
} }
public List<ExecutorBuilder<?>> getExecutorBuilders(final Settings settings) {
if (XPackPlugin.featureEnabled(settings, Watcher.NAME, true)) {
final FixedExecutorBuilder builder =
new FixedExecutorBuilder(
settings,
InternalWatchExecutor.THREAD_POOL_NAME,
5 * EsExecutors.boundedNumberOfProcessors(settings),
1000,
"xpack.watcher.thread_pool");
return Collections.singletonList(builder);
}
return Collections.emptyList();
}
public void onModule(NetworkModule module) { public void onModule(NetworkModule module) {
if (enabled && transportClient == false) { if (enabled && transportClient == false) {
module.registerRestHandler(RestPutWatchAction.class); module.registerRestHandler(RestPutWatchAction.class);

View File

@ -6,37 +6,17 @@
package org.elasticsearch.xpack.watcher.execution; package org.elasticsearch.xpack.watcher.execution;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor; import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.Watcher; import org.elasticsearch.xpack.watcher.Watcher;
import org.elasticsearch.xpack.watcher.support.ThreadPoolSettingsBuilder;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.stream.Stream; import java.util.stream.Stream;
/**
*
*/
public class InternalWatchExecutor implements WatchExecutor { public class InternalWatchExecutor implements WatchExecutor {
public static final String THREAD_POOL_NAME = Watcher.NAME; public static final String THREAD_POOL_NAME = Watcher.NAME;
public static Settings additionalSettings(Settings nodeSettings) {
Settings settings = nodeSettings.getAsSettings("threadpool." + THREAD_POOL_NAME);
if (!settings.names().isEmpty()) {
// the TP is already configured in the node settings
// no need for additional settings
return Settings.EMPTY;
}
int availableProcessors = EsExecutors.boundedNumberOfProcessors(nodeSettings);
return new ThreadPoolSettingsBuilder.Fixed(THREAD_POOL_NAME)
.size(5 * availableProcessors)
.queueSize(1000)
.build();
}
private final ThreadPool threadPool; private final ThreadPool threadPool;
@Inject @Inject
@ -67,4 +47,5 @@ public class InternalWatchExecutor implements WatchExecutor {
private EsThreadPoolExecutor executor() { private EsThreadPoolExecutor executor() {
return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME); return (EsThreadPoolExecutor) threadPool.executor(THREAD_POOL_NAME);
} }
} }

View File

@ -21,7 +21,4 @@ public class HistoryModule extends AbstractModule {
bind(HistoryStore.class).asEagerSingleton(); bind(HistoryStore.class).asEagerSingleton();
} }
public static Settings additionalSettings(Settings nodeSettings) {
return InternalWatchExecutor.additionalSettings(nodeSettings);
}
} }

View File

@ -1,62 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.watcher.support;
import org.elasticsearch.common.settings.Settings;
/**
*
*/
public abstract class ThreadPoolSettingsBuilder<B extends ThreadPoolSettingsBuilder> {
public static Same same(String name) {
return new Same(name);
}
protected final String name;
private final Settings.Builder builder = Settings.builder();
protected ThreadPoolSettingsBuilder(String name, String type) {
this.name = name;
put("type", type);
}
public Settings build() {
return builder.build();
}
protected B put(String setting, Object value) {
builder.put("threadpool." + name + "." + setting, value);
return (B) this;
}
protected B put(String setting, int value) {
builder.put("threadpool." + name + "." + setting, value);
return (B) this;
}
public static class Same extends ThreadPoolSettingsBuilder<Same> {
public Same(String name) {
super(name, "same");
}
}
public static class Fixed extends ThreadPoolSettingsBuilder<Fixed> {
public Fixed(String name) {
super(name, "fixed");
}
public Fixed size(int size) {
return put("size", size);
}
public Fixed queueSize(int queueSize) {
return put("queue_size", queueSize);
}
}
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.internal.InternalSearchHit; import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits; import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script; import org.elasticsearch.xpack.watcher.support.Script;
@ -40,7 +41,7 @@ public class ScriptConditionSearchTests extends AbstractWatcherIntegrationTestCa
@Before @Before
public void init() throws Exception { public void init() throws Exception {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new TestThreadPool(ThreadPool.Names.SAME);
scriptService = WatcherTestUtils.getScriptServiceProxy(tp); scriptService = WatcherTestUtils.getScriptServiceProxy(tp);
} }

View File

@ -18,6 +18,7 @@ import org.elasticsearch.script.GeneralScriptException;
import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.condition.Condition; import org.elasticsearch.xpack.watcher.condition.Condition;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
@ -48,7 +49,7 @@ public class ScriptConditionTests extends ESTestCase {
@Before @Before
public void init() { public void init() {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new TestThreadPool(ThreadPool.Names.SAME);
} }
@After @After

View File

@ -15,6 +15,7 @@ import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.GeneralScriptException; import org.elasticsearch.script.GeneralScriptException;
import org.elasticsearch.script.ScriptService.ScriptType; import org.elasticsearch.script.ScriptService.ScriptType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext; import org.elasticsearch.xpack.watcher.execution.WatchExecutionContext;
import org.elasticsearch.xpack.watcher.support.Script; import org.elasticsearch.xpack.watcher.support.Script;
@ -53,7 +54,7 @@ public class ScriptTransformTests extends ESTestCase {
@Before @Before
public void init() { public void init() {
tp = new ThreadPool(ThreadPool.Names.SAME); tp = new TestThreadPool(ThreadPool.Names.SAME);
} }
@After @After