Cancel LDAP runnables if they are pending execution too long (elastic/x-pack-elasticsearch#727)

The LdapRealm submits runnables to the generic thread pool when a bind is necessary as a bind must
be a synchronous operation and we do not want to block network threads on this. However, the
generic threadpool could be full and this runnable could get queued. When this happens requests
appear to hang; to prevent this we submit a delayed executable that will stop the runnable from
attempting to connect to Ldap and provide an exceptional response to the listener.

relates elastic/x-pack-elasticsearch#716

Original commit: elastic/x-pack-elasticsearch@3e43b17f1d
This commit is contained in:
Jay Modi 2017-03-22 05:27:46 -07:00 committed by GitHub
parent 8ba6e8b3eb
commit 85aedb6776
2 changed files with 185 additions and 5 deletions

View File

@ -14,10 +14,15 @@ import com.unboundid.ldap.sdk.LDAPException;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.threadpool.ThreadPool.Names;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.security.authc.RealmConfig; import org.elasticsearch.xpack.security.authc.RealmConfig;
import org.elasticsearch.xpack.security.authc.ldap.support.LdapLoadBalancing; import org.elasticsearch.xpack.security.authc.ldap.support.LdapLoadBalancing;
@ -38,10 +43,13 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
public static final String LDAP_TYPE = "ldap"; public static final String LDAP_TYPE = "ldap";
public static final String AD_TYPE = "active_directory"; public static final String AD_TYPE = "active_directory";
static final Setting<TimeValue> EXECUTION_TIMEOUT =
Setting.timeSetting("timeout.execution", TimeValue.timeValueSeconds(30L), Property.NodeScope);
private final SessionFactory sessionFactory; private final SessionFactory sessionFactory;
private final DnRoleMapper roleMapper; private final DnRoleMapper roleMapper;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final TimeValue executionTimeout;
public LdapRealm(String type, RealmConfig config, ResourceWatcherService watcherService, SSLService sslService, public LdapRealm(String type, RealmConfig config, ResourceWatcherService watcherService, SSLService sslService,
ThreadPool threadPool) throws LDAPException { ThreadPool threadPool) throws LDAPException {
@ -54,6 +62,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
this.sessionFactory = sessionFactory; this.sessionFactory = sessionFactory;
this.roleMapper = roleMapper; this.roleMapper = roleMapper;
this.threadPool = threadPool; this.threadPool = threadPool;
this.executionTimeout = EXECUTION_TIMEOUT.get(config.settings());
roleMapper.addListener(this::expireAll); roleMapper.addListener(this::expireAll);
} }
@ -99,6 +108,7 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
Set<Setting<?>> settings = new HashSet<>(); Set<Setting<?>> settings = new HashSet<>();
settings.addAll(CachingUsernamePasswordRealm.getCachingSettings()); settings.addAll(CachingUsernamePasswordRealm.getCachingSettings());
DnRoleMapper.getSettings(settings); DnRoleMapper.getSettings(settings);
settings.add(EXECUTION_TIMEOUT);
if (AD_TYPE.equals(type)) { if (AD_TYPE.equals(type)) {
settings.addAll(ActiveDirectorySessionFactory.getSettings()); settings.addAll(ActiveDirectorySessionFactory.getSettings());
} else { } else {
@ -117,8 +127,11 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
protected void doAuthenticate(UsernamePasswordToken token, ActionListener<User> listener) { protected void doAuthenticate(UsernamePasswordToken token, ActionListener<User> listener) {
// we submit to the threadpool because authentication using LDAP will execute blocking I/O for a bind request and we don't want // we submit to the threadpool because authentication using LDAP will execute blocking I/O for a bind request and we don't want
// network threads stuck waiting for a socket to connect. After the bind, then all interaction with LDAP should be async // network threads stuck waiting for a socket to connect. After the bind, then all interaction with LDAP should be async
threadPool.generic().execute(() -> sessionFactory.session(token.principal(), token.credentials(), final CancellableLdapRunnable cancellableLdapRunnable = new CancellableLdapRunnable(listener,
new LdapSessionActionListener("authenticate", token.principal(), listener, roleMapper, logger))); () -> sessionFactory.session(token.principal(), token.credentials(),
new LdapSessionActionListener("authenticate", token.principal(), listener, roleMapper, logger)), logger);
threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout);
} }
@Override @Override
@ -126,9 +139,11 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
if (sessionFactory.supportsUnauthenticatedSession()) { if (sessionFactory.supportsUnauthenticatedSession()) {
// we submit to the threadpool because authentication using LDAP will execute blocking I/O for a bind request and we don't want // we submit to the threadpool because authentication using LDAP will execute blocking I/O for a bind request and we don't want
// network threads stuck waiting for a socket to connect. After the bind, then all interaction with LDAP should be async // network threads stuck waiting for a socket to connect. After the bind, then all interaction with LDAP should be async
threadPool.generic().execute(() -> final CancellableLdapRunnable cancellableLdapRunnable = new CancellableLdapRunnable(listener,
sessionFactory.unauthenticatedSession(username, () -> sessionFactory.unauthenticatedSession(username,
new LdapSessionActionListener("lookup", username, listener, roleMapper, logger))); new LdapSessionActionListener("lookup", username, listener, roleMapper, logger)), logger);
threadPool.generic().execute(cancellableLdapRunnable);
threadPool.schedule(executionTimeout, Names.SAME, cancellableLdapRunnable::maybeTimeout);
} else { } else {
listener.onResponse(null); listener.onResponse(null);
} }
@ -214,4 +229,64 @@ public final class LdapRealm extends CachingUsernamePasswordRealm {
} }
} }
/**
* A runnable that allows us to terminate and call the listener. We use this as a runnable can
* be queued and not executed for a long time or ever and this causes user requests to appear
* to hang. In these cases at least we can provide a response.
*/
static class CancellableLdapRunnable extends AbstractRunnable {
private final Runnable in;
private final ActionListener<User> listener;
private final Logger logger;
private final AtomicReference<LdapRunnableState> state = new AtomicReference<>(LdapRunnableState.AWAITING_EXECUTION);
CancellableLdapRunnable(ActionListener<User> listener, Runnable in, Logger logger) {
this.listener = listener;
this.in = in;
this.logger = logger;
}
@Override
public void onFailure(Exception e) {
logger.error("execution of ldap runnable failed", e);
// this is really a exceptional state but just call the listener and maybe another realm can authenticate, otherwise
// something as simple as a down ldap server/network error takes down auth
listener.onResponse(null);
}
@Override
protected void doRun() throws Exception {
if (state.compareAndSet(LdapRunnableState.AWAITING_EXECUTION, LdapRunnableState.EXECUTING)) {
in.run();
} else {
logger.trace("skipping execution of ldap runnable as the current state is [{}]", state.get());
}
}
@Override
public void onRejection(Exception e) {
listener.onFailure(e);
}
/**
* If the execution of this runnable has not already started, the runnable is cancelled and we pass an exception to the user
* listener
*/
void maybeTimeout() {
if (state.compareAndSet(LdapRunnableState.AWAITING_EXECUTION, LdapRunnableState.TIMED_OUT)) {
logger.warn("skipping execution of ldap runnable as it has been waiting for " +
"execution too long");
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " +
"execution of ldap runnable"));
}
}
enum LdapRunnableState {
AWAITING_EXECUTION,
EXECUTING,
TIMED_OUT
}
}
} }

View File

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.security.authc.ldap;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.security.authc.ldap.LdapRealm.CancellableLdapRunnable;
import org.elasticsearch.xpack.security.user.User;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.sameInstance;
public class CancellableLdapRunnableTests extends ESTestCase {
public void testTimingOutARunnable() {
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
final CancellableLdapRunnable runnable =
new CancellableLdapRunnable(ActionListener.wrap(user -> {
throw new AssertionError("onResponse should not be called");
}, exceptionAtomicReference::set), () -> {
throw new AssertionError("runnable should not be executed");
}, logger);
runnable.maybeTimeout();
runnable.run();
assertNotNull(exceptionAtomicReference.get());
assertThat(exceptionAtomicReference.get(), instanceOf(ElasticsearchTimeoutException.class));
assertThat(exceptionAtomicReference.get().getMessage(),
containsString("timed out waiting for execution"));
}
public void testCallTimeOutAfterRunning() {
final AtomicBoolean ran = new AtomicBoolean(false);
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
final CancellableLdapRunnable runnable =
new CancellableLdapRunnable(ActionListener.wrap(user -> {
listenerCalled.set(true);
throw new AssertionError("onResponse should not be called");
}, e -> {
listenerCalled.set(true);
throw new AssertionError("onFailure should not be called");
}), () -> ran.set(ran.get() == false), logger);
runnable.run();
assertTrue(ran.get());
runnable.maybeTimeout();
assertTrue(ran.get());
// the listener shouldn't have ever been called. If it was, then either something called
// onResponse or onFailure was called as part of the timeout
assertFalse(listenerCalled.get());
}
public void testRejectingExecution() {
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
final CancellableLdapRunnable runnable =
new CancellableLdapRunnable(ActionListener.wrap(user -> {
throw new AssertionError("onResponse should not be called");
}, exceptionAtomicReference::set), () -> {
throw new AssertionError("runnable should not be executed");
}, logger);
final Exception e = new RuntimeException("foo");
runnable.onRejection(e);
assertNotNull(exceptionAtomicReference.get());
assertThat(exceptionAtomicReference.get(), sameInstance(e));
}
public void testTimeoutDuringExecution() throws InterruptedException {
final CountDownLatch listenerCalledLatch = new CountDownLatch(1);
final CountDownLatch timeoutCalledLatch = new CountDownLatch(1);
final CountDownLatch runningLatch = new CountDownLatch(1);
final ActionListener<User> listener = ActionListener.wrap(user -> {
listenerCalledLatch.countDown();
}, e -> {
throw new AssertionError("onFailure should not be executed");
});
final CancellableLdapRunnable runnable = new CancellableLdapRunnable(listener, () -> {
runningLatch.countDown();
try {
timeoutCalledLatch.await();
listener.onResponse(null);
} catch (InterruptedException e) {
throw new AssertionError("don't interrupt me", e);
}
}, logger);
Thread t = new Thread(runnable);
t.start();
runningLatch.await();
runnable.maybeTimeout();
timeoutCalledLatch.countDown();
listenerCalledLatch.await();
t.join();
}
}