Start resource watcher service early (#55275)

The ResourceWatcherService enables watching of files for modifications
and deletions. During startup various consumers register the files that
should be watched by this service. There is behavior that might be
unexpected in that the service may not start polling until later in the
startup process due to the use of lifecycle states to control when the
service actually starts the jobs to monitor resources. This change
removes this unexpected behavior so that upon construction the service
has already registered its tasks to poll resources for changes. In
making this modification, the service no longer extends
AbstractLifecycleComponent and instead implements the Closeable
interface so that the polling jobs can be terminated when the service
is no longer required.

Relates #54867
Backport of #54993
This commit is contained in:
Jay Modi 2020-04-15 20:45:39 -06:00 committed by GitHub
parent f75e3b4551
commit 2d9e3c7794
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 242 additions and 252 deletions

View File

@ -341,6 +341,8 @@ public class Node implements Closeable {
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS));
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
resourcesToClose.add(resourceWatcherService);
// adds the context to the DeprecationLogger so that it does not need to be injected everywhere
DeprecationLogger.setThreadContext(threadPool.getThreadContext());
resourcesToClose.add(() -> DeprecationLogger.removeThreadContext(threadPool.getThreadContext()));
@ -351,7 +353,7 @@ public class Node implements Closeable {
additionalSettings.addAll(builder.getRegisteredSettings());
}
client = new NodeClient(settings, threadPool);
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
final ScriptModule scriptModule = new ScriptModule(settings, pluginsService.filterPlugins(ScriptPlugin.class));
final ScriptService scriptService = newScriptService(settings, scriptModule.engines, scriptModule.contexts);
AnalysisModule analysisModule = new AnalysisModule(this.environment, pluginsService.filterPlugins(AnalysisPlugin.class));
@ -367,7 +369,6 @@ public class Node implements Closeable {
final SettingsModule settingsModule =
new SettingsModule(settings, additionalSettings, additionalSettingsFilter, settingsUpgraders);
scriptModule.registerClusterSettingsListeners(scriptService, settingsModule.getClusterSettings());
resourcesToClose.add(resourceWatcherService);
final NetworkService networkService = new NetworkService(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class)));
@ -716,7 +717,6 @@ public class Node implements Closeable {
nodeConnectionsService.start();
clusterService.setNodeConnectionsService(nodeConnectionsService);
injector.getInstance(ResourceWatcherService.class).start();
injector.getInstance(GatewayService.class).start();
Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
@ -830,7 +830,7 @@ public class Node implements Closeable {
}
logger.info("stopping ...");
injector.getInstance(ResourceWatcherService.class).stop();
injector.getInstance(ResourceWatcherService.class).close();
injector.getInstance(HttpServerTransport.class).stop();
injector.getInstance(SnapshotsService.class).stop();

View File

@ -20,8 +20,6 @@ package org.elasticsearch.watcher;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@ -30,6 +28,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.Scheduler.Cancellable;
import org.elasticsearch.threadpool.ThreadPool.Names;
import java.io.Closeable;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@ -42,7 +41,7 @@ import java.util.concurrent.CopyOnWriteArraySet;
* registered watcher periodically. The frequency of checks can be specified using {@code resource.reload.interval} setting, which
* defaults to {@code 60s}. The service can be disabled by setting {@code resource.reload.enabled} setting to {@code false}.
*/
public class ResourceWatcherService extends AbstractLifecycleComponent {
public class ResourceWatcherService implements Closeable {
private static final Logger logger = LogManager.getLogger(ResourceWatcherService.class);
public enum Frequency {
@ -78,20 +77,17 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
Setting.timeSetting("resource.reload.interval.low", Frequency.LOW.interval, Property.NodeScope);
private final boolean enabled;
private final ThreadPool threadPool;
final ResourceMonitor lowMonitor;
final ResourceMonitor mediumMonitor;
final ResourceMonitor highMonitor;
private volatile Cancellable lowFuture;
private volatile Cancellable mediumFuture;
private volatile Cancellable highFuture;
private final Cancellable lowFuture;
private final Cancellable mediumFuture;
private final Cancellable highFuture;
@Inject
public ResourceWatcherService(Settings settings, ThreadPool threadPool) {
this.enabled = ENABLED.get(settings);
this.threadPool = threadPool;
TimeValue interval = RELOAD_INTERVAL_LOW.get(settings);
lowMonitor = new ResourceMonitor(interval, Frequency.LOW);
@ -99,30 +95,24 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
mediumMonitor = new ResourceMonitor(interval, Frequency.MEDIUM);
interval = RELOAD_INTERVAL_HIGH.get(settings);
highMonitor = new ResourceMonitor(interval, Frequency.HIGH);
}
@Override
protected void doStart() {
if (!enabled) {
return;
if (enabled) {
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval, Names.SAME);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval, Names.SAME);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval, Names.SAME);
} else {
lowFuture = null;
mediumFuture = null;
highFuture = null;
}
lowFuture = threadPool.scheduleWithFixedDelay(lowMonitor, lowMonitor.interval, Names.SAME);
mediumFuture = threadPool.scheduleWithFixedDelay(mediumMonitor, mediumMonitor.interval, Names.SAME);
highFuture = threadPool.scheduleWithFixedDelay(highMonitor, highMonitor.interval, Names.SAME);
}
@Override
protected void doStop() {
if (!enabled) {
return;
public void close() {
if (enabled) {
lowFuture.cancel();
mediumFuture.cancel();
highFuture.cancel();
}
lowFuture.cancel();
mediumFuture.cancel();
highFuture.cancel();
}
@Override
protected void doClose() {
}
/**
@ -149,10 +139,6 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
}
}
public void notifyNow() {
notifyNow(Frequency.MEDIUM);
}
public void notifyNow(Frequency frequency) {
switch (frequency) {
case LOW:
@ -169,7 +155,7 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
}
}
class ResourceMonitor implements Runnable {
static class ResourceMonitor implements Runnable {
final TimeValue interval;
final Frequency frequency;
@ -188,7 +174,7 @@ public class ResourceWatcherService extends AbstractLifecycleComponent {
@Override
public synchronized void run() {
for(ResourceWatcher watcher : watchers) {
for (ResourceWatcher watcher : watchers) {
try {
watcher.checkAndNotify();
} catch (IOException e) {

View File

@ -36,9 +36,7 @@ public class OperationModeFileWatcherTests extends ESTestCase {
Settings settings = Settings.builder()
.put("resource.reload.interval.high", "10ms")
.build();
watcherService = new ResourceWatcherService(settings,
threadPool);
watcherService.start();
watcherService = new ResourceWatcherService(settings, threadPool);
licenseModePath = createTempFile();
onChangeCounter = new AtomicReference<>(new CountDownLatch(1));
operationModeFileWatcher = new OperationModeFileWatcher(watcherService, licenseModePath, logger,
@ -47,8 +45,8 @@ public class OperationModeFileWatcherTests extends ESTestCase {
@After
public void shutdown() throws InterruptedException {
watcherService.close();
terminate(threadPool);
watcherService.stop();
}
public void testInit() throws Exception {

View File

@ -86,11 +86,13 @@ public class SSLConfigurationReloaderTests extends ESTestCase {
threadPool = new TestThreadPool("reload tests");
resourceWatcherService =
new ResourceWatcherService(Settings.builder().put("resource.reload.interval.high", "1s").build(), threadPool);
resourceWatcherService.start();
}
@After
public void cleanup() {
if (resourceWatcherService != null) {
resourceWatcherService.close();
}
if (threadPool != null) {
terminate(threadPool);
}

View File

@ -76,9 +76,10 @@ public class FileUserPasswdStoreTests extends ESTestCase {
Files.write(file, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService);
assertThat(store.usersCount(), is(0));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService);
assertThat(store.usersCount(), is(0));
}
}
public void testStore_AutoReload() throws Exception {
@ -89,47 +90,46 @@ public class FileUserPasswdStoreTests extends ESTestCase {
Files.copy(users, file, StandardCopyOption.REPLACE_EXISTING);
final Hasher hasher = Hasher.resolve(settings.get("xpack.security.authc.password_hashing.algorithm"));
RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
assertThat(store.userExists(username), is(true));
AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
assertThat(store.userExists(username), is(true));
AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
watcherService.start();
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users passwords are not changed.");
}
assertThat(store.userExists(username), is(true));
result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("foobar:").append(new String(hasher.hash(new SecureString("barfoo"))));
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
assertThat(store.userExists("foobar"), is(true));
result = store.verifyPassword("foobar", new SecureString("barfoo"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
}
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users passwords are not changed.");
}
assertThat(store.userExists(username), is(true));
result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("foobar:").append(new String(hasher.hash(new SecureString("barfoo"))));
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
assertThat(store.userExists("foobar"), is(true));
result = store.verifyPassword("foobar", new SecureString("barfoo"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
}
private RealmConfig getRealmConfig() {
@ -145,27 +145,26 @@ public class FileUserPasswdStoreTests extends ESTestCase {
Files.copy(users, testUsers, StandardCopyOption.REPLACE_EXISTING);
RealmConfig config = getRealmConfig();
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
final AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
FileUserPasswdStore store = new FileUserPasswdStore(config, watcherService, latch::countDown);
//Test users share the hashing algorithm name for convenience
String username = settings.get("xpack.security.authc.password_hashing.algorithm");
User user = new User(username);
final AuthenticationResult result = store.verifyPassword(username, new SecureString("test123"), () -> user);
assertThat(result.getStatus(), is(AuthenticationResult.Status.SUCCESS));
assertThat(result.getUser(), is(user));
watcherService.start();
// now replacing the content of the users file with something that cannot be read
Files.write(testUsers, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
// now replacing the content of the users file with something that cannot be read
Files.write(testUsers, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
assertThat(store.usersCount(), is(0));
}
assertThat(store.usersCount(), is(0));
}
public void testParseFile() throws Exception {

View File

@ -76,9 +76,10 @@ public class FileUserRolesStoreTests extends ESTestCase {
RealmConfig.RealmIdentifier realmId = new RealmConfig.RealmIdentifier("file", "file-test");
RealmConfig config = new RealmConfig(realmId, settings, env, new ThreadContext(Settings.EMPTY));
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService);
assertThat(store.entriesCount(), is(0));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
FileUserRolesStore store = new FileUserRolesStore(config, watcherService);
assertThat(store.entriesCount(), is(0));
}
}
public void testStoreAutoReload() throws Exception {
@ -86,46 +87,44 @@ public class FileUserRolesStoreTests extends ESTestCase {
Path tmp = getUsersRolesPath();
Files.copy(users, tmp, StandardCopyOption.REPLACE_EXISTING);
final RealmConfig.RealmIdentifier realmId = new RealmConfig.RealmIdentifier("file", "file-test");
RealmConfig config = new RealmConfig(realmId, settings, env, new ThreadContext(Settings.EMPTY));
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService, latch::countDown);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService, latch::countDown);
String[] roles = store.roles("user1");
assertThat(roles, notNullValue());
assertThat(roles.length, is(3));
assertThat(roles, arrayContaining("role1", "role2", "role3"));
assertThat(store.roles("user4"), equalTo(Strings.EMPTY_ARRAY));
String[] roles = store.roles("user1");
assertThat(roles, notNullValue());
assertThat(roles.length, is(3));
assertThat(roles, arrayContaining("role1", "role2", "role3"));
assertThat(store.roles("user4"), equalTo(Strings.EMPTY_ARRAY));
watcherService.start();
try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}
try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users roles are not changed.");
}
assertThat(store.roles("user1"), arrayContaining("role1", "role2", "role3"));
try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("role4:user4\nrole5:user4\n");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
roles = store.roles("user4");
assertThat(roles, notNullValue());
assertThat(roles.length, is(2));
assertThat(roles, arrayContaining("role4", "role5"));
}
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as users roles are not changed.");
}
assertThat(store.roles("user1"), arrayContaining("role1", "role2", "role3"));
try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("role4:user4\nrole5:user4\n");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
roles = store.roles("user4");
assertThat(roles, notNullValue());
assertThat(roles.length, is(2));
assertThat(roles, arrayContaining("role4", "role5"));
}
public void testStoreAutoReloadWithParseFailure() throws Exception {
@ -135,27 +134,26 @@ public class FileUserRolesStoreTests extends ESTestCase {
final RealmConfig.RealmIdentifier realmId = new RealmConfig.RealmIdentifier("file", "file-test");
RealmConfig config = new RealmConfig(realmId, settings, env, new ThreadContext(Settings.EMPTY));
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
final CountDownLatch latch = new CountDownLatch(1);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
final CountDownLatch latch = new CountDownLatch(1);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService, latch::countDown);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService, latch::countDown);
String[] roles = store.roles("user1");
assertThat(roles, notNullValue());
assertThat(roles.length, is(3));
assertThat(roles, arrayContaining("role1", "role2", "role3"));
assertThat(store.roles("user4"), equalTo(Strings.EMPTY_ARRAY));
String[] roles = store.roles("user1");
assertThat(roles, notNullValue());
assertThat(roles.length, is(3));
assertThat(roles, arrayContaining("role1", "role2", "role3"));
assertThat(store.roles("user4"), equalTo(Strings.EMPTY_ARRAY));
watcherService.start();
// now replacing the content of the users file with something that cannot be read
Files.write(tmp, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
// now replacing the content of the users file with something that cannot be read
Files.write(tmp, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
assertThat(store.entriesCount(), is(0));
}
assertThat(store.entriesCount(), is(0));
}
public void testParseFile() throws Exception {
@ -225,9 +223,10 @@ public class FileUserRolesStoreTests extends ESTestCase {
Environment env = TestEnvironment.newEnvironment(settings);
final RealmConfig.RealmIdentifier realmId = new RealmConfig.RealmIdentifier("file", "file-test");
RealmConfig config = new RealmConfig(realmId, settings, env, new ThreadContext(Settings.EMPTY));
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
FileUserRolesStore store = new FileUserRolesStore(config, watcherService);
assertThat(store.roles("user"), equalTo(Strings.EMPTY_ARRAY));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
FileUserRolesStore store = new FileUserRolesStore(config, watcherService);
assertThat(store.roles("user"), equalTo(Strings.EMPTY_ARRAY));
}
} finally {
terminate(threadPool);
}

View File

@ -88,7 +88,7 @@ public abstract class KerberosRealmTestCase extends ESTestCase {
@After
public void shutdown() throws InterruptedException {
resourceWatcherService.stop();
resourceWatcherService.close();
terminate(threadPool);
}

View File

@ -154,7 +154,7 @@ public class ActiveDirectoryRealmTests extends ESTestCase {
@After
public void stop() throws InterruptedException {
resourceWatcherService.stop();
resourceWatcherService.close();
terminate(threadPool);
for (int i = 0; i < numberOfLdapServers; i++) {
directoryServers[i].shutDown(true);

View File

@ -111,7 +111,7 @@ public class LdapRealmTests extends LdapTestCase {
@After
public void shutdown() throws InterruptedException {
resourceWatcherService.stop();
resourceWatcherService.close();
terminate(threadPool);
}

View File

@ -272,24 +272,25 @@ public class LdapSessionFactoryTests extends LdapTestCase {
String user = "Horatio Hornblower";
SecureString userPass = new SecureString("pass");
final ResourceWatcherService resourceWatcher = new ResourceWatcherService(settings, threadPool);
new SSLConfigurationReloader(environment, sslService, resourceWatcher);
try (ResourceWatcherService resourceWatcher = new ResourceWatcherService(settings, threadPool)) {
new SSLConfigurationReloader(environment, sslService, resourceWatcher);
Files.copy(fakeCa, ldapCaPath, StandardCopyOption.REPLACE_EXISTING);
resourceWatcher.notifyNow(ResourceWatcherService.Frequency.HIGH);
Files.copy(fakeCa, ldapCaPath, StandardCopyOption.REPLACE_EXISTING);
resourceWatcher.notifyNow(ResourceWatcherService.Frequency.HIGH);
UncategorizedExecutionException e =
expectThrows(UncategorizedExecutionException.class, () -> session(sessionFactory, user, userPass));
assertThat(e.getCause(), instanceOf(ExecutionException.class));
assertThat(e.getCause().getCause(), instanceOf(LDAPException.class));
assertThat(e.getCause().getCause().getMessage(), containsString("SSLPeerUnverifiedException"));
UncategorizedExecutionException e =
expectThrows(UncategorizedExecutionException.class, () -> session(sessionFactory, user, userPass));
assertThat(e.getCause(), instanceOf(ExecutionException.class));
assertThat(e.getCause().getCause(), instanceOf(LDAPException.class));
assertThat(e.getCause().getCause().getMessage(), containsString("SSLPeerUnverifiedException"));
Files.copy(realCa, ldapCaPath, StandardCopyOption.REPLACE_EXISTING);
resourceWatcher.notifyNow(ResourceWatcherService.Frequency.HIGH);
Files.copy(realCa, ldapCaPath, StandardCopyOption.REPLACE_EXISTING);
resourceWatcher.notifyNow(ResourceWatcherService.Frequency.HIGH);
final LdapSession session = session(sessionFactory, user, userPass);
assertThat(session.userDn(), is("cn=Horatio Hornblower,ou=people,o=sevenSeas"));
final LdapSession session = session(sessionFactory, user, userPass);
assertThat(session.userDn(), is("cn=Horatio Hornblower,ou=people,o=sevenSeas"));
session.close();
session.close();
}
}
}

View File

@ -89,9 +89,10 @@ public class DnRoleMapperTests extends ESTestCase {
// writing in utf_16 should cause a parsing error as we try to read the file in utf_8
Files.write(file, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
DnRoleMapper mapper = createMapper(file, watcherService);
assertThat(mapper.mappingsCount(), is(0));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = createMapper(file, watcherService);
assertThat(mapper.mappingsCount(), is(0));
}
}
public void testMapper_AutoReload() throws Exception {
@ -101,43 +102,42 @@ public class DnRoleMapperTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1);
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
DnRoleMapper mapper = createMapper(file, watcherService);
mapper.addListener(latch::countDown);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = createMapper(file, watcherService);
mapper.addListener(latch::countDown);
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
watcherService.start();
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as roles mapping is not changed.");
}
watcherService.notifyNow(ResourceWatcherService.Frequency.HIGH);
if (latch.getCount() != 1) {
fail("Listener should not be called as roles mapping is not changed.");
}
roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, contains("security"));
roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, contains("security"));
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("fantastic_four:\n")
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("fantastic_four:\n")
.append(" - \"cn=fantastic_four,ou=marvel,o=superheros\"");
}
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
roles = mapper.resolveRoles("", Collections.singletonList("cn=fantastic_four,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("fantastic_four"));
roles = mapper.resolveRoles("", Collections.singletonList("cn=fantastic_four,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("fantastic_four"));
}
}
public void testMapper_AutoReload_WithParseFailures() throws Exception {
@ -147,25 +147,24 @@ public class DnRoleMapperTests extends ESTestCase {
final CountDownLatch latch = new CountDownLatch(1);
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
DnRoleMapper mapper = createMapper(file, watcherService);
mapper.addListener(latch::countDown);
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = createMapper(file, watcherService);
mapper.addListener(latch::countDown);
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
watcherService.start();
// now replacing the content of the users file with something that cannot be read
Files.write(file, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
// now replacing the content of the users file with something that cannot be read
Files.write(file, Collections.singletonList("aldlfkjldjdflkjd"), StandardCharsets.UTF_16);
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
}
if (!latch.await(5, TimeUnit.SECONDS)) {
fail("Waited too long for the updated file to be picked up");
assertThat(mapper.mappingsCount(), is(0));
}
assertThat(mapper.mappingsCount(), is(0));
}
public void testMapperAutoReloadWithoutListener() throws Exception {
@ -173,37 +172,39 @@ public class DnRoleMapperTests extends ESTestCase {
Path file = env.configFile().resolve("test_role_mapping.yml");
Files.copy(roleMappingFile, file, StandardCopyOption.REPLACE_EXISTING);
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
DnRoleMapper mapper = createMapper(file, watcherService);
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = createMapper(file, watcherService);
Set<String> roles = mapper.resolveRoles("", Collections.singletonList("cn=shield,ou=marvel,o=superheros"));
assertThat(roles, notNullValue());
assertThat(roles.size(), is(1));
assertThat(roles, contains("security"));
watcherService.start();
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("fantastic_four:\n")
try (BufferedWriter writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.newLine();
writer.append("fantastic_four:\n")
.append(" - \"cn=fantastic_four,ou=marvel,o=superheros\"");
}
}
assertBusy(() -> {
Set<String> resolvedRoles = mapper.resolveRoles("",
Collections.singletonList("cn=fantastic_four,ou=marvel,o=superheros"));
assertThat(resolvedRoles, notNullValue());
assertThat(resolvedRoles.size(), is(1));
assertThat(resolvedRoles, contains("fantastic_four"));
}, 2L, TimeUnit.SECONDS);
assertBusy(() -> {
Set<String> resolvedRoles = mapper.resolveRoles(
"",
Collections.singletonList("cn=fantastic_four,ou=marvel,o=superheros")
);
assertThat(resolvedRoles, notNullValue());
assertThat(resolvedRoles.size(), is(1));
assertThat(resolvedRoles, contains("fantastic_four"));
}, 2L, TimeUnit.SECONDS);
}
}
public void testAddNullListener() throws Exception {
Path file = env.configFile().resolve("test_role_mapping.yml");
Files.write(file, Collections.singleton(""));
ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool);
DnRoleMapper mapper = createMapper(file, watcherService);
NullPointerException e = expectThrows(NullPointerException.class, () -> mapper.addListener(null));
assertEquals("listener cannot be null", e.getMessage());
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = createMapper(file, watcherService);
NullPointerException e = expectThrows(NullPointerException.class, () -> mapper.addListener(null));
assertEquals("listener cannot be null", e.getMessage());
}
}
public void testParseFile() throws Exception {
@ -300,12 +301,14 @@ public class DnRoleMapperTests extends ESTestCase {
RealmConfig config = new RealmConfig(realmIdentifier, ldapSettings,
TestEnvironment.newEnvironment(settings), new ThreadContext(Settings.EMPTY));
DnRoleMapper mapper = new DnRoleMapper(config, new ResourceWatcherService(settings, threadPool));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = new DnRoleMapper(config, watcherService);
Set<String> roles = mapper.resolveRoles("", Arrays.asList(STARK_GROUP_DNS));
Set<String> roles = mapper.resolveRoles("", Arrays.asList(STARK_GROUP_DNS));
//verify
assertThat(roles, hasItems("security", "avenger"));
//verify
assertThat(roles, hasItems("security", "avenger"));
}
}
public void testRelativeDN() {
@ -317,10 +320,12 @@ public class DnRoleMapperTests extends ESTestCase {
RealmConfig config = new RealmConfig(realmIdentifier, ldapSettings,
TestEnvironment.newEnvironment(settings), new ThreadContext(Settings.EMPTY));
DnRoleMapper mapper = new DnRoleMapper(config, new ResourceWatcherService(settings, threadPool));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = new DnRoleMapper(config, watcherService);
Set<String> roles = mapper.resolveRoles("", Arrays.asList(STARK_GROUP_DNS));
assertThat(roles, hasItems("genius", "billionaire", "playboy", "philanthropist", "shield", "avengers"));
Set<String> roles = mapper.resolveRoles("", Arrays.asList(STARK_GROUP_DNS));
assertThat(roles, hasItems("genius", "billionaire", "playboy", "philanthropist", "shield", "avengers"));
}
}
public void testUserDNMapping() throws Exception {
@ -334,10 +339,12 @@ public class DnRoleMapperTests extends ESTestCase {
RealmConfig config = new RealmConfig(realmIdentifier, ldapSettings,
TestEnvironment.newEnvironment(settings), new ThreadContext(Settings.EMPTY));
DnRoleMapper mapper = new DnRoleMapper(config, new ResourceWatcherService(settings, threadPool));
try (ResourceWatcherService watcherService = new ResourceWatcherService(settings, threadPool)) {
DnRoleMapper mapper = new DnRoleMapper(config, watcherService);
Set<String> roles = mapper.resolveRoles("cn=Horatio Hornblower,ou=people,o=sevenSeas", Collections.emptyList());
assertThat(roles, hasItem("avenger"));
Set<String> roles = mapper.resolveRoles("cn=Horatio Hornblower,ou=people,o=sevenSeas", Collections.emptyList());
assertThat(roles, hasItem("avenger"));
}
}
protected DnRoleMapper createMapper(Path file, ResourceWatcherService watcherService) {

View File

@ -352,8 +352,6 @@ public class FileRolesStoreTests extends ESTestCase {
assertThat(descriptors, notNullValue());
assertTrue(descriptors.isEmpty());
watcherService.start();
try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8, StandardOpenOption.APPEND)) {
writer.append("\n");
}
@ -438,7 +436,7 @@ public class FileRolesStoreTests extends ESTestCase {
assertEquals(1, descriptors.size());
} finally {
if (watcherService != null) {
watcherService.stop();
watcherService.close();
}
terminate(threadPool);
}