diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java index bb9276194e1..c07d3e3bded 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCache.java @@ -24,22 +24,28 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.cache.Cache; import org.elasticsearch.common.cache.CacheBuilder; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.cache.RemovalNotification; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.ReleasableLock; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.threadpool.ThreadPool; import java.io.Closeable; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This is a cache for {@link BitSet} instances that are used with the {@link DocumentSubsetReader}. @@ -66,17 +72,48 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen private static final BitSet NULL_MARKER = new FixedBitSet(0); private final Logger logger; + + /** + * When a {@link BitSet} is evicted from {@link #bitsetCache}, we need to also remove it from {@link #keysByIndex}. + * We use a {@link ReentrantReadWriteLock} to control atomicity here - the "read" side represents potential insertions to the + * {@link #bitsetCache}, the "write" side represents removals from {@link #keysByIndex}. + * The risk (that {@link Cache} does not provide protection for) is that an entry is removed from the cache, and then immediately + * re-populated, before we process the removal event. To protect against that we need to check the state of the {@link #bitsetCache} + * but we need exclusive ("write") access while performing that check and updating the values in {@link #keysByIndex}. + */ + private final ReleasableLock cacheEvictionLock; + private final ReleasableLock cacheModificationLock; + private final ExecutorService cleanupExecutor; + private final Cache bitsetCache; private final Map> keysByIndex; - public DocumentSubsetBitsetCache(Settings settings) { + public DocumentSubsetBitsetCache(Settings settings, ThreadPool threadPool) { + this(settings, threadPool.executor(ThreadPool.Names.GENERIC)); + } + + /** + * @param settings The global settings object for this node + * @param cleanupExecutor An executor on which the cache cleanup tasks can be run. Due to the way the cache is structured internally, + * it is sometimes necessary to run an asynchronous task to synchronize the internal state. + */ + protected DocumentSubsetBitsetCache(Settings settings, ExecutorService cleanupExecutor) { this.logger = LogManager.getLogger(getClass()); + + final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + this.cacheEvictionLock = new ReleasableLock(readWriteLock.writeLock()); + this.cacheModificationLock = new ReleasableLock(readWriteLock.readLock()); + this.cleanupExecutor = cleanupExecutor; + final TimeValue ttl = CACHE_TTL_SETTING.get(settings); final ByteSizeValue size = CACHE_SIZE_SETTING.get(settings); this.bitsetCache = CacheBuilder.builder() .setExpireAfterAccess(ttl) .setMaximumWeight(size.getBytes()) - .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed()).build(); + .weigher((key, bitSet) -> bitSet == NULL_MARKER ? 0 : bitSet.ramBytesUsed()) + .removalListener(this::onCacheEviction) + .build(); + this.keysByIndex = new ConcurrentHashMap<>(); } @@ -90,6 +127,31 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen } } + /** + * Cleanup (synchronize) the internal state when an object is removed from the primary cache + */ + private void onCacheEviction(RemovalNotification notification) { + final BitsetCacheKey bitsetKey = notification.getKey(); + final IndexReader.CacheKey indexKey = bitsetKey.index; + if (keysByIndex.getOrDefault(indexKey, Collections.emptySet()).contains(bitsetKey) == false) { + // If the bitsetKey isn't in the lookup map, then there's nothing to synchronize + return; + } + // We push this to a background thread, so that it reduces the risk of blocking searches, but also so that the lock management is + // simpler - this callback is likely to take place on a thread that is actively adding something to the cache, and is therefore + // holding the read ("update") side of the lock. It is not possible to upgrade a read lock to a write ("eviction") lock, but we + // need to acquire that lock here. + cleanupExecutor.submit(() -> { + try (ReleasableLock ignored = cacheEvictionLock.acquire()) { + // it's possible for the key to be back in the cache if it was immediately repopulated after it was evicted, so check + if (bitsetCache.get(bitsetKey) == null) { + // key is no longer in the cache, make sure it is no longer in the lookup map either. + keysByIndex.getOrDefault(indexKey, Collections.emptySet()).remove(bitsetKey); + } + } + }); + } + @Override public void close() { clear("close"); @@ -98,7 +160,8 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen public void clear(String reason) { logger.debug("clearing all DLS bitsets because [{}]", reason); // Due to the order here, it is possible than a new entry could be added _after_ the keysByIndex map is cleared - // but _before_ the cache is cleared. This would mean it sits orphaned in keysByIndex, but this is not a issue. + // but _before_ the cache is cleared. This should get fixed up in the "onCacheEviction" callback, but if anything slips through + // and sits orphaned in keysByIndex, it will not be a significant issue. // When the index is closed, the key will be removed from the map, and there will not be a corresponding item // in the cache, which will make the cache-invalidate a no-op. // Since the entry is not in the cache, if #getBitSet is called, it will be loaded, and the new key will be added @@ -132,31 +195,33 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen final IndexReader.CacheKey indexKey = coreCacheHelper.getKey(); final BitsetCacheKey cacheKey = new BitsetCacheKey(indexKey, query); - final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> { - // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees. - keysByIndex.compute(indexKey, (ignore2, set) -> { - if (set == null) { - set = Sets.newConcurrentHashSet(); + try (ReleasableLock ignored = cacheModificationLock.acquire()) { + final BitSet bitSet = bitsetCache.computeIfAbsent(cacheKey, ignore1 -> { + // This ensures all insertions into the set are guarded by ConcurrentHashMap's atomicity guarantees. + keysByIndex.compute(indexKey, (ignore2, set) -> { + if (set == null) { + set = Sets.newConcurrentHashSet(); + } + set.add(cacheKey); + return set; + }); + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f); + Scorer s = weight.scorer(context); + if (s == null) { + // A cache loader is not allowed to return null, return a marker object instead. + return NULL_MARKER; + } else { + return BitSet.of(s.iterator(), context.reader().maxDoc()); } - set.add(cacheKey); - return set; }); - final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); - final IndexSearcher searcher = new IndexSearcher(topLevelContext); - searcher.setQueryCache(null); - final Weight weight = searcher.createWeight(searcher.rewrite(query), ScoreMode.COMPLETE_NO_SCORES, 1f); - Scorer s = weight.scorer(context); - if (s == null) { - // A cache loader is not allowed to return null, return a marker object instead. - return NULL_MARKER; + if (bitSet == NULL_MARKER) { + return null; } else { - return BitSet.of(s.iterator(), context.reader().maxDoc()); + return bitSet; } - }); - if (bitSet == NULL_MARKER) { - return null; - } else { - return bitSet; } } @@ -205,4 +270,27 @@ public final class DocumentSubsetBitsetCache implements IndexReader.ClosedListen return getClass().getSimpleName() + "(" + index + "," + query + ")"; } } + + /** + * This method verifies that the two internal data structures ({@link #bitsetCache} and {@link #keysByIndex}) are consistent with one + * another. This method is only called by tests. + */ + void verifyInternalConsistency() { + this.bitsetCache.keys().forEach(bck -> { + final Set set = this.keysByIndex.get(bck.index); + if (set == null) { + throw new IllegalStateException("Key [" + bck + "] is in the cache, but there is no entry for [" + bck.index + + "] in the lookup map"); + } + if (set.contains(bck) == false) { + throw new IllegalStateException("Key [" + bck + "] is in the cache, but the lookup entry for [" + bck.index + + "] does not contain that key"); + } + }); + this.keysByIndex.values().stream().flatMap(Set::stream).forEach(bck -> { + if (this.bitsetCache.get(bck) == null) { + throw new IllegalStateException("Key [" + bck + "] is in the lookup map, but is not in the cache"); + } + }); + } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCacheTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCacheTests.java index a50c39d4e6a..848c9438153 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCacheTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetBitsetCacheTests.java @@ -21,6 +21,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.BitSet; import org.elasticsearch.client.Client; import org.elasticsearch.common.CheckedBiConsumer; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.index.IndexSettings; @@ -32,22 +33,51 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.IndexSettingsModule; import org.hamcrest.Matchers; +import org.junit.After; +import org.junit.Before; +import org.mockito.Mockito; +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class DocumentSubsetBitsetCacheTests extends ESTestCase { + private static final int FIELD_COUNT = 10; + private ExecutorService singleThreadExecutor; + + @Before + public void setUpExecutor() throws Exception { + singleThreadExecutor = Executors.newSingleThreadExecutor(); + } + + @After + public void cleanUpExecutor() throws Exception { + singleThreadExecutor.shutdown(); + } + public void testSameBitSetIsReturnedForIdenticalQuery() throws Exception { - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY); runTestOnIndex((shardContext, leafContext) -> { final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(shardContext); final BitSet bitSet1 = cache.getBitSet(query1, leafContext); @@ -62,7 +92,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { } public void testNullBitSetIsReturnedForNonMatchingQuery() throws Exception { - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY); runTestOnIndex((shardContext, leafContext) -> { final Query query = QueryBuilders.termQuery("does-not-exist", "any-value").toQuery(shardContext); final BitSet bitSet = cache.getBitSet(query, leafContext); @@ -71,7 +101,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { } public void testNullEntriesAreNotCountedInMemoryUsage() throws Exception { - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY); assertThat(cache.ramBytesUsed(), equalTo(0L)); runTestOnIndex((shardContext, leafContext) -> { @@ -95,7 +125,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { final Settings settings = Settings.builder() .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b") .build(); - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings); + final DocumentSubsetBitsetCache cache = newCache(settings); assertThat(cache.entryCount(), equalTo(0)); assertThat(cache.ramBytesUsed(), equalTo(0L)); @@ -142,7 +172,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { final Settings settings = Settings.builder() .put(DocumentSubsetBitsetCache.CACHE_TTL_SETTING.getKey(), "10ms") .build(); - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings); + final DocumentSubsetBitsetCache cache = newCache(settings); assertThat(cache.entryCount(), equalTo(0)); assertThat(cache.ramBytesUsed(), equalTo(0L)); @@ -167,8 +197,131 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { }); } + public void testIndexLookupIsClearedWhenBitSetIsEvicted() throws Exception { + // This value is based on the internal implementation details of lucene's FixedBitSet + // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset + final long expectedBytesPerBitSet = 56; + + // Enough to hold slightly more than 1 bit-set in the cache + final long maxCacheBytes = expectedBytesPerBitSet + expectedBytesPerBitSet/2; + final Settings settings = Settings.builder() + .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b") + .build(); + + final ExecutorService executor = mock(ExecutorService.class); + final AtomicReference runnableRef = new AtomicReference<>(); + when(executor.submit(any(Runnable.class))).thenAnswer(inv -> { + final Runnable r = (Runnable) inv.getArguments()[0]; + runnableRef.set(r); + return null; + }); + + final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, executor); + assertThat(cache.entryCount(), equalTo(0)); + assertThat(cache.ramBytesUsed(), equalTo(0L)); + + runTestOnIndex((shardContext, leafContext) -> { + final Query query1 = QueryBuilders.termQuery("field-1", "value-1").toQuery(shardContext); + final BitSet bitSet1 = cache.getBitSet(query1, leafContext); + assertThat(bitSet1, notNullValue()); + + final Query query2 = QueryBuilders.termQuery("field-2", "value-2").toQuery(shardContext); + final BitSet bitSet2 = cache.getBitSet(query2, leafContext); + assertThat(bitSet2, notNullValue()); + + // BitSet1 has been evicted now, run the cleanup... + final Runnable runnable1 = runnableRef.get(); + assertThat(runnable1, notNullValue()); + runnable1.run(); + cache.verifyInternalConsistency(); + + // Check that the original bitset is no longer in the cache (a new instance is returned) + assertThat(cache.getBitSet(query1, leafContext), not(sameInstance(bitSet1))); + + // BitSet2 has been evicted now, run the cleanup... + final Runnable runnable2 = runnableRef.get(); + assertThat(runnable2, not(sameInstance(runnable1))); + runnable2.run(); + cache.verifyInternalConsistency(); + }); + } + public void testCacheUnderConcurrentAccess() throws Exception { + // This value is based on the internal implementation details of lucene's FixedBitSet + // If the implementation changes, this can be safely updated to match the new ram usage for a single bitset + final long expectedBytesPerBitSet = 56; + + final int concurrentThreads = randomIntBetween(5, 15); + final int numberOfIndices = randomIntBetween(3, 8); + + // Force cache evictions by setting the size to be less than the number of distinct queries we search on. + final int maxCacheCount = randomIntBetween(FIELD_COUNT / 2, FIELD_COUNT * 3 / 4); + final long maxCacheBytes = expectedBytesPerBitSet * maxCacheCount; + final Settings settings = Settings.builder() + .put(DocumentSubsetBitsetCache.CACHE_SIZE_SETTING.getKey(), maxCacheBytes + "b") + .build(); + + final ExecutorService threads = Executors.newFixedThreadPool(concurrentThreads + 1); + final ExecutorService cleanupExecutor = Mockito.mock(ExecutorService.class); + when(cleanupExecutor.submit(any(Runnable.class))).thenAnswer(inv -> { + final Runnable runnable = (Runnable) inv.getArguments()[0]; + return threads.submit(() -> { + // Sleep for a small (random) length of time. + // This increases the likelihood that cache could have been modified between the eviction & the cleanup + Thread.sleep(randomIntBetween(1, 10)); + runnable.run(); + return null; + }); + }); + try { + final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(settings, cleanupExecutor); + assertThat(cache.entryCount(), equalTo(0)); + assertThat(cache.ramBytesUsed(), equalTo(0L)); + + runTestOnIndices(numberOfIndices, contexts -> { + final CountDownLatch start = new CountDownLatch(concurrentThreads); + final CountDownLatch end = new CountDownLatch(concurrentThreads); + final Set uniqueBitSets = Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + for (int thread = 0; thread < concurrentThreads; thread++) { + threads.submit(() -> { + start.countDown(); + start.await(100, TimeUnit.MILLISECONDS); + for (int loop = 0; loop < 15; loop++) { + for (int field = 1; field <= FIELD_COUNT; field++) { + final TermQueryBuilder queryBuilder = QueryBuilders.termQuery("field-" + field, "value-" + field); + final TestIndexContext randomContext = randomFrom(contexts); + final Query query = queryBuilder.toQuery(randomContext.queryShardContext); + final BitSet bitSet = cache.getBitSet(query, randomContext.leafReaderContext); + assertThat(bitSet, notNullValue()); + assertThat(bitSet.ramBytesUsed(), equalTo(expectedBytesPerBitSet)); + uniqueBitSets.add(bitSet); + } + } + end.countDown(); + return null; + }); + } + + assertTrue("Query threads did not complete in expected time", end.await(1, TimeUnit.SECONDS)); + + threads.shutdown(); + assertTrue("Cleanup thread did not complete in expected time", threads.awaitTermination(3, TimeUnit.SECONDS)); + cache.verifyInternalConsistency(); + + // Due to cache evictions, we must get more bitsets than fields + assertThat(uniqueBitSets.size(), Matchers.greaterThan(FIELD_COUNT)); + // Due to cache evictions, we must have seen more bitsets than the cache currently holds + assertThat(uniqueBitSets.size(), Matchers.greaterThan(cache.entryCount())); + // Even under concurrent pressure, the cache should hit the expected size + assertThat(cache.entryCount(), is(maxCacheCount)); + assertThat(cache.ramBytesUsed(), is(maxCacheBytes)); + }); + } finally { + threads.shutdown(); + } + } + public void testCacheIsPerIndex() throws Exception { - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY); assertThat(cache.entryCount(), equalTo(0)); assertThat(cache.ramBytesUsed(), equalTo(0L)); @@ -196,7 +349,7 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { } public void testCacheClearEntriesWhenIndexIsClosed() throws Exception { - final DocumentSubsetBitsetCache cache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache cache = newCache(Settings.EMPTY); assertThat(cache.entryCount(), equalTo(0)); assertThat(cache.ramBytesUsed(), equalTo(0L)); @@ -216,35 +369,106 @@ public class DocumentSubsetBitsetCacheTests extends ESTestCase { } private void runTestOnIndex(CheckedBiConsumer body) throws Exception { + runTestOnIndices(1, ctx -> { + final TestIndexContext indexContext = ctx.get(0); + body.accept(indexContext.queryShardContext, indexContext.leafReaderContext); + }); + } + + private static final class TestIndexContext implements Closeable { + private final Directory directory; + private final IndexWriter indexWriter; + private final DirectoryReader directoryReader; + private final QueryShardContext queryShardContext; + private final LeafReaderContext leafReaderContext; + + private TestIndexContext(Directory directory, IndexWriter indexWriter, DirectoryReader directoryReader, + QueryShardContext queryShardContext, LeafReaderContext leafReaderContext) { + this.directory = directory; + this.indexWriter = indexWriter; + this.directoryReader = directoryReader; + this.queryShardContext = queryShardContext; + this.leafReaderContext = leafReaderContext; + } + + @Override + public void close() throws IOException { + directoryReader.close(); + indexWriter.close(); + directory.close(); + } + } + + private TestIndexContext testIndex(MapperService mapperService, Client client) throws IOException { + TestIndexContext context = null; + + final long nowInMillis = randomNonNegativeLong(); final ShardId shardId = new ShardId("idx_" + randomAlphaOfLengthBetween(2, 8), randomAlphaOfLength(12), 0); final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), Settings.EMPTY); - final MapperService mapperService = mock(MapperService.class); - final long nowInMillis = randomNonNegativeLong(); - - final Client client = mock(Client.class); - when(client.settings()).thenReturn(Settings.EMPTY); - final IndexWriterConfig writerConfig = new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE); - try (Directory directory = newDirectory(); - IndexWriter iw = new IndexWriter(directory, writerConfig)) { + + Directory directory = null; + IndexWriter iw = null; + DirectoryReader directoryReader = null; + try { + directory = newDirectory(); + + iw = new IndexWriter(directory, writerConfig); for (int i = 1; i <= 100; i++) { Document document = new Document(); - for (int j = 1; j <= 10; j++) { + for (int j = 1; j <= FIELD_COUNT; j++) { document.add(new StringField("field-" + j, "value-" + i, Field.Store.NO)); } iw.addDocument(document); } iw.commit(); - try (DirectoryReader directoryReader = DirectoryReader.open(directory)) { - final LeafReaderContext leaf = directoryReader.leaves().get(0); + directoryReader = DirectoryReader.open(directory); + final LeafReaderContext leaf = directoryReader.leaves().get(0); - final QueryShardContext context = new QueryShardContext(shardId.id(), indexSettings, BigArrays.NON_RECYCLING_INSTANCE, - null, null, mapperService, null, null, xContentRegistry(), writableRegistry(), - client, new IndexSearcher(directoryReader), () -> nowInMillis, null, null); - body.accept(context, leaf); + final QueryShardContext shardContext = new QueryShardContext(shardId.id(), indexSettings, BigArrays.NON_RECYCLING_INSTANCE, + null, null, mapperService, null, null, xContentRegistry(), writableRegistry(), + client, new IndexSearcher(directoryReader), () -> nowInMillis, null, null); + + context = new TestIndexContext(directory, iw, directoryReader, shardContext, leaf); + return context; + } finally { + if (context == null) { + if (directoryReader != null) { + directoryReader.close(); + } + if (iw != null) { + iw.close(); + } + if (directory != null) { + directory.close(); + } } } } + private void runTestOnIndices(int numberIndices, CheckedConsumer, Exception> body) throws Exception { + final MapperService mapperService = mock(MapperService.class); + + final Client client = mock(Client.class); + when(client.settings()).thenReturn(Settings.EMPTY); + + final List context = new ArrayList<>(numberIndices); + try { + for (int i = 0; i < numberIndices; i++) { + context.add(testIndex(mapperService, client)); + } + + body.accept(context); + } finally { + for (TestIndexContext indexContext : context) { + indexContext.close(); + } + } + } + + private DocumentSubsetBitsetCache newCache(Settings settings) { + return new DocumentSubsetBitsetCache(settings, singleThreadExecutor); + } + } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java index c84c0027302..3fae0c26ea7 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/DocumentSubsetReaderTests.java @@ -32,6 +32,7 @@ import org.junit.After; import org.junit.Before; import java.io.IOException; +import java.util.concurrent.Executors; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; @@ -50,7 +51,7 @@ public class DocumentSubsetReaderTests extends ESTestCase { assertTrue(DocumentSubsetReader.NUM_DOCS_CACHE.toString(), DocumentSubsetReader.NUM_DOCS_CACHE.isEmpty()); directory = newDirectory(); - bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor()); } @After diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/SecurityIndexReaderWrapperIntegrationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/SecurityIndexReaderWrapperIntegrationTests.java index ca2b38318a0..1f8ba704263 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/SecurityIndexReaderWrapperIntegrationTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/security/authz/accesscontrol/SecurityIndexReaderWrapperIntegrationTests.java @@ -48,6 +48,7 @@ import org.elasticsearch.xpack.core.security.user.User; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Executors; import static java.util.Collections.singleton; import static java.util.Collections.singletonMap; @@ -80,7 +81,7 @@ public class SecurityIndexReaderWrapperIntegrationTests extends AbstractBuilderT null, null, mapperService, null, null, xContentRegistry(), writableRegistry(), client, null, () -> nowInMillis, null, null); QueryShardContext queryShardContext = spy(realQueryShardContext); - DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor()); XPackLicenseState licenseState = mock(XPackLicenseState.class); when(licenseState.isDocumentAndFieldLevelSecurityAllowed()).thenReturn(true); @@ -202,7 +203,7 @@ public class SecurityIndexReaderWrapperIntegrationTests extends AbstractBuilderT null, null, mapperService, null, null, xContentRegistry(), writableRegistry(), client, null, () -> nowInMillis, null, null); QueryShardContext queryShardContext = spy(realQueryShardContext); - DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + DocumentSubsetBitsetCache bitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY, Executors.newSingleThreadExecutor()); XPackLicenseState licenseState = mock(XPackLicenseState.class); when(licenseState.isDocumentAndFieldLevelSecurityAllowed()).thenReturn(true); diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java index a76d4f3a3f2..b1e7f8dae91 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java @@ -457,7 +457,7 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw final NativePrivilegeStore privilegeStore = new NativePrivilegeStore(settings, client, securityIndex.get()); components.add(privilegeStore); - dlsBitsetCache.set(new DocumentSubsetBitsetCache(settings)); + dlsBitsetCache.set(new DocumentSubsetBitsetCache(settings, threadPool)); final FieldPermissionsCache fieldPermissionsCache = new FieldPermissionsCache(settings); final FileRolesStore fileRolesStore = new FileRolesStore(settings, env, resourceWatcherService, getLicenseState(), xContentRegistry); diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java index e0ca082e3d0..82af02dacdb 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/store/CompositeRolesStoreTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.license.License.OperationMode; import org.elasticsearch.license.TestUtils.UpdatableLicenseState; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequest.Empty; import org.elasticsearch.xpack.core.XPackSettings; @@ -153,7 +154,7 @@ public class CompositeRolesStoreTests extends ESTestCase { when(fileRolesStore.roleDescriptors(Collections.singleton("fls_dls"))).thenReturn(Collections.singleton(flsDlsRole)); when(fileRolesStore.roleDescriptors(Collections.singleton("no_fls_dls"))).thenReturn(Collections.singleton(noFlsDlsRole)); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); CompositeRolesStore compositeRolesStore = new CompositeRolesStore(Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(Settings.EMPTY), licenseState, cache, mock(ApiKeyService.class), documentSubsetBitsetCache, @@ -229,7 +230,7 @@ public class CompositeRolesStoreTests extends ESTestCase { when(fileRolesStore.roleDescriptors(Collections.singleton("fls_dls"))).thenReturn(Collections.singleton(flsDlsRole)); when(fileRolesStore.roleDescriptors(Collections.singleton("no_fls_dls"))).thenReturn(Collections.singleton(noFlsDlsRole)); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); CompositeRolesStore compositeRolesStore = new CompositeRolesStore(Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(Settings.EMPTY), licenseState, cache, mock(ApiKeyService.class), documentSubsetBitsetCache, @@ -281,7 +282,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativePrivilegeStore).getPrivileges(isA(Set.class), isA(Set.class), any(ActionListener.class)); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, nativePrivilegeStore, Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS), @@ -344,7 +345,7 @@ public class CompositeRolesStoreTests extends ESTestCase { .put("xpack.security.authz.store.roles.negative_lookup_cache.max_size", 0) .build(); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(settings), new XPackLicenseState(settings), cache, mock(ApiKeyService.class), documentSubsetBitsetCache, @@ -382,7 +383,7 @@ public class CompositeRolesStoreTests extends ESTestCase { final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS), @@ -422,6 +423,10 @@ public class CompositeRolesStoreTests extends ESTestCase { verifyNoMoreInteractions(fileRolesStore, reservedRolesStore, nativeRolesStore); } + private DocumentSubsetBitsetCache buildBitsetCache() { + return new DocumentSubsetBitsetCache(Settings.EMPTY, mock(ThreadPool.class)); + } + public void testCustomRolesProviders() { final FileRolesStore fileRolesStore = mock(FileRolesStore.class); doCallRealMethod().when(fileRolesStore).accept(any(Set.class), any(ActionListener.class)); @@ -468,7 +473,7 @@ public class CompositeRolesStoreTests extends ESTestCase { })); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Arrays.asList(inMemoryProvider1, inMemoryProvider2), @@ -697,7 +702,7 @@ public class CompositeRolesStoreTests extends ESTestCase { (roles, listener) -> listener.onFailure(new Exception("fake failure")); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Arrays.asList(inMemoryProvider1, failingProvider), @@ -745,7 +750,7 @@ public class CompositeRolesStoreTests extends ESTestCase { // these licenses don't allow custom role providers xPackLicenseState.update(randomFrom(OperationMode.BASIC, OperationMode.GOLD, OperationMode.STANDARD), true, null); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); CompositeRolesStore compositeRolesStore = new CompositeRolesStore( Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Arrays.asList(inMemoryProvider), new ThreadContext(Settings.EMPTY), xPackLicenseState, cache, @@ -809,7 +814,7 @@ public class CompositeRolesStoreTests extends ESTestCase { doCallRealMethod().when(reservedRolesStore).accept(any(Set.class), any(ActionListener.class)); NativeRolesStore nativeRolesStore = mock(NativeRolesStore.class); doCallRealMethod().when(nativeRolesStore).accept(any(Set.class), any(ActionListener.class)); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); CompositeRolesStore compositeRolesStore = new CompositeRolesStore( Settings.EMPTY, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(Settings.EMPTY), @@ -863,7 +868,7 @@ public class CompositeRolesStoreTests extends ESTestCase { doCallRealMethod().when(reservedRolesStore).accept(any(Set.class), any(ActionListener.class)); NativeRolesStore nativeRolesStore = mock(NativeRolesStore.class); doCallRealMethod().when(nativeRolesStore).accept(any(Set.class), any(ActionListener.class)); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS), @@ -895,7 +900,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class)); final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(SECURITY_ENABLED_SETTINGS), @@ -936,7 +941,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class)); final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(settings, fileRolesStore, nativeRolesStore, reservedRolesStore, mock(NativePrivilegeStore.class), Collections.emptyList(), new ThreadContext(settings), @@ -964,7 +969,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class)); final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, @@ -995,7 +1000,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativeRolesStore).getRoleDescriptors(isA(Set.class), any(ActionListener.class)); final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, @@ -1031,7 +1036,7 @@ public class CompositeRolesStoreTests extends ESTestCase { return Void.TYPE; }).when(nativePrivStore).getPrivileges(any(Collection.class), any(Collection.class), any(ActionListener.class)); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, @@ -1077,7 +1082,7 @@ public class CompositeRolesStoreTests extends ESTestCase { return Void.TYPE; }).when(nativePrivStore).getPrivileges(any(Collection.class), any(Collection.class), any(ActionListener.class)); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final AtomicReference> effectiveRoleDescriptors = new AtomicReference>(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore, @@ -1121,7 +1126,7 @@ public class CompositeRolesStoreTests extends ESTestCase { }).when(nativeRolesStore).usageStats(any(ActionListener.class)); final ReservedRolesStore reservedRolesStore = spy(new ReservedRolesStore()); - final DocumentSubsetBitsetCache documentSubsetBitsetCache = new DocumentSubsetBitsetCache(Settings.EMPTY); + final DocumentSubsetBitsetCache documentSubsetBitsetCache = buildBitsetCache(); final CompositeRolesStore compositeRolesStore = new CompositeRolesStore(SECURITY_ENABLED_SETTINGS, fileRolesStore, nativeRolesStore, reservedRolesStore,