add tests for concurrent user lookup
This commit adds tests to ensure that user lookup in caching realms works as expected. An unclear contract in the Cache#computeIfAbsent method allowed for null values to be returned from this method even if there should have been exception reported to the loader. This has been fixed in the cache implementation and we add tests to verify that the caching of user lookups is done properly under concurrent operations. Closes elastic/elasticsearch#4054 Original commit: elastic/x-pack-elasticsearch@41567c6ed9
This commit is contained in:
parent
6170f3d22c
commit
7873bb73c4
|
@ -161,6 +161,8 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm
|
|||
|
||||
try {
|
||||
UserWithHash userWithHash = cache.computeIfAbsent(username, callback);
|
||||
assert userWithHash != null : "the cache contract requires that a value returned from computeIfAbsent be non-null or an " +
|
||||
"ExecutionException should be thrown";
|
||||
return userWithHash.user;
|
||||
} catch (ExecutionException ee) {
|
||||
if (ee.getCause() instanceof ElasticsearchSecurityException) {
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
|
||||
public class RunAsIntegTests extends SecurityIntegTestCase {
|
||||
|
@ -75,9 +76,7 @@ public class RunAsIntegTests extends SecurityIntegTestCase {
|
|||
try (TransportClient client = getTransportClient(Settings.builder()
|
||||
.put(Security.USER_SETTING.getKey(), TRANSPORT_CLIENT_USER + ":" + SecuritySettingsSource.DEFAULT_PASSWORD).build())) {
|
||||
//ensure the client can connect
|
||||
awaitBusy(() -> {
|
||||
return client.connectedNodes().size() > 0;
|
||||
});
|
||||
assertBusy(() -> assertThat(client.connectedNodes().size(), greaterThan(0)));
|
||||
|
||||
// make sure the client can't get health
|
||||
try {
|
||||
|
|
|
@ -10,9 +10,11 @@ import org.elasticsearch.ElasticsearchSecurityException;
|
|||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.collect.MapBuilder;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.xpack.security.SecurityTemplateService;
|
||||
|
@ -48,6 +50,7 @@ import java.util.Arrays;
|
|||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
|
||||
|
@ -705,4 +708,47 @@ public class NativeRealmIntegTests extends NativeRealmIntegTestCase {
|
|||
.admin().cluster().prepareHealth().get();
|
||||
assertNoTimeout(response);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that multiple concurrent run as requests can be authenticated successfully. There was a bug in the Cache implementation used
|
||||
* for our internal realms that caused some run as requests to fail even when the authentication was valid and the run as user existed.
|
||||
*
|
||||
* The issue was that when iterating the realms there would be failed lookups and under heavy concurrency, requests will wait for an
|
||||
* existing load attempt in the cache. The original caller was thrown an ExecutionException with a nested NullPointerException since
|
||||
* the loader returned a null value, while the other caller(s) would get a null value unexpectedly
|
||||
*/
|
||||
public void testConcurrentRunAs() throws Exception {
|
||||
securityClient().preparePutUser("joe", "s3krit".toCharArray(), SecuritySettingsSource.DEFAULT_ROLE).get();
|
||||
securityClient().preparePutUser("executor", "s3krit".toCharArray(), "superuser").get();
|
||||
final String token = basicAuthHeaderValue("executor", new SecuredString("s3krit".toCharArray()));
|
||||
final Client client = client().filterWithHeader(MapBuilder.<String, String>newMapBuilder()
|
||||
.put("Authorization", token)
|
||||
.put("es-security-runas-user", "joe")
|
||||
.immutableMap());
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final int numberOfProcessors = Runtime.getRuntime().availableProcessors();
|
||||
final int numberOfThreads = scaledRandomIntBetween(numberOfProcessors, numberOfProcessors * 3);
|
||||
final int numberOfIterations = scaledRandomIntBetween(20, 100);
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
threads.add(new Thread(() -> {
|
||||
try {
|
||||
latch.await();
|
||||
for (int j = 0; j < numberOfIterations; j++) {
|
||||
ClusterHealthResponse response = client.admin().cluster().prepareHealth().get();
|
||||
assertNoTimeout(response);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
latch.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -239,6 +239,59 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testUserLookupConcurrency() throws Exception {
|
||||
final String username = "username";
|
||||
|
||||
RealmConfig config = new RealmConfig("test_realm", Settings.EMPTY, globalSettings);
|
||||
final CachingUsernamePasswordRealm realm = new CachingUsernamePasswordRealm("test", config) {
|
||||
@Override
|
||||
protected User doAuthenticate(UsernamePasswordToken token) {
|
||||
throw new UnsupportedOperationException("authenticate should not be called!");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected User doLookupUser(String username) {
|
||||
return new User(username, new String[]{"r1", "r2", "r3"});
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean userLookupSupported() {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final int numberOfProcessors = Runtime.getRuntime().availableProcessors();
|
||||
final int numberOfThreads = scaledRandomIntBetween(numberOfProcessors, numberOfProcessors * 3);
|
||||
final int numberOfIterations = scaledRandomIntBetween(10000, 100000);
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfThreads; i++) {
|
||||
threads.add(new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
for (int i = 0; i < numberOfIterations; i++) {
|
||||
User user = realm.lookupUser(username);
|
||||
if (user == null) {
|
||||
throw new RuntimeException("failed to lookup user");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (InterruptedException e) {}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
latch.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
}
|
||||
|
||||
static class FailingAuthenticationRealm extends CachingUsernamePasswordRealm {
|
||||
|
||||
FailingAuthenticationRealm(Settings settings, Settings global) {
|
||||
|
|
|
@ -13,6 +13,10 @@ import org.elasticsearch.test.ESTestCase;
|
|||
import org.elasticsearch.xpack.security.authz.RoleDescriptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.atomic.AtomicReferenceArray;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
|
@ -238,4 +242,41 @@ public class FieldPermissionTests extends ESTestCase {
|
|||
// order should be preserved in any case
|
||||
assertEquals(readFieldPermissions, fieldPermissions);
|
||||
}
|
||||
|
||||
public void testFieldPermissionsHashCodeThreadSafe() throws Exception {
|
||||
final int numThreads = scaledRandomIntBetween(4, 16);
|
||||
final FieldPermissions fieldPermissions = randomBoolean() ?
|
||||
new FieldPermissions(new String[] { "*" }, new String[] { "foo" }) :
|
||||
FieldPermissions.merge(new FieldPermissions(new String[] { "f*" }, new String[] { "foo" }),
|
||||
new FieldPermissions(new String[] { "b*" }, new String[] { "bar" }));
|
||||
final CountDownLatch latch = new CountDownLatch(numThreads + 1);
|
||||
final AtomicReferenceArray<Integer> hashCodes = new AtomicReferenceArray<>(numThreads);
|
||||
List<Thread> threads = new ArrayList<>(numThreads);
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
final int threadNum = i;
|
||||
threads.add(new Thread(() -> {
|
||||
latch.countDown();
|
||||
try {
|
||||
latch.await();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
final int hashCode = fieldPermissions.hashCode();
|
||||
hashCodes.set(threadNum, hashCode);
|
||||
}));
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
latch.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
final int hashCode = fieldPermissions.hashCode();
|
||||
for (int i = 0; i < numThreads; i++) {
|
||||
assertEquals((Integer) hashCode, hashCodes.get(i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue