diff --git a/core/src/main/java/org/elasticsearch/common/cache/Cache.java b/core/src/main/java/org/elasticsearch/common/cache/Cache.java index aac8a7d4792..962ab2f536b 100644 --- a/core/src/main/java/org/elasticsearch/common/cache/Cache.java +++ b/core/src/main/java/org/elasticsearch/common/cache/Cache.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.util.concurrent.ReleasableLock; import java.util.*; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; @@ -275,12 +276,13 @@ public class Cache { * If the specified key is not already associated with a value (or is mapped to null), attempts to compute its * value using the given mapping function and enters it into this map unless null. * - * @param key the key whose associated value is to be returned or computed for if non-existant - * @param mappingFunction the function to compute a value given a key + * @param key the key whose associated value is to be returned or computed for if non-existant + * @param loader the function to compute a value given a key * @return the current (existing or computed) value associated with the specified key, or null if the computed * value is null + * @throws ExecutionException thrown if loader throws an exception */ - public V computeIfAbsent(K key, Function mappingFunction) { + public V computeIfAbsent(K key, CacheLoader loader) throws ExecutionException { long now = now(); V value = get(key, now); if (value == null) { @@ -288,7 +290,11 @@ public class Cache { try (ReleasableLock ignored = segment.writeLock.acquire()) { value = get(key, now); if (value == null) { - value = mappingFunction.apply(key); + try { + value = loader.load(key); + } catch (Exception e) { + throw new ExecutionException(e); + } } if (value != null) { put(key, value, now); diff --git a/core/src/main/java/org/elasticsearch/common/cache/CacheLoader.java b/core/src/main/java/org/elasticsearch/common/cache/CacheLoader.java new file mode 100644 index 00000000000..85636e1e186 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/common/cache/CacheLoader.java @@ -0,0 +1,25 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.cache; + +@FunctionalInterface +public interface CacheLoader { + V load(K key) throws Exception; +} diff --git a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java index 5630faf5546..f2b7ba8e131 100644 --- a/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java +++ b/core/src/main/java/org/elasticsearch/index/cache/bitset/BitsetFilterCache.java @@ -151,25 +151,21 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea }); return filterToFbs.computeIfAbsent(query, key -> { - try { - final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); - final IndexSearcher searcher = new IndexSearcher(topLevelContext); - searcher.setQueryCache(null); - final Weight weight = searcher.createNormalizedWeight(query, false); - final DocIdSetIterator it = weight.scorer(context); - final BitSet bitSet; - if (it == null) { - bitSet = null; - } else { - bitSet = BitSet.of(it, context.reader().maxDoc()); - } - - Value value = new Value(bitSet, shardId); - listener.onCache(shardId, value.bitset); - return value; - } catch (IOException e) { - throw new RuntimeException(e); + final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context); + final IndexSearcher searcher = new IndexSearcher(topLevelContext); + searcher.setQueryCache(null); + final Weight weight = searcher.createNormalizedWeight(query, false); + final DocIdSetIterator it = weight.scorer(context); + final BitSet bitSet; + if (it == null) { + bitSet = null; + } else { + bitSet = BitSet.of(it, context.reader().maxDoc()); } + + Value value = new Value(bitSet, shardId); + listener.onCache(shardId, value.bitset); + return value; }).bitset; } diff --git a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java index ceaec6b3ad9..5fb70b61160 100644 --- a/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java +++ b/core/src/main/java/org/elasticsearch/indices/cache/request/IndicesRequestCache.java @@ -29,10 +29,7 @@ import org.elasticsearch.action.search.SearchType; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.common.cache.Cache; -import org.elasticsearch.common.cache.CacheBuilder; -import org.elasticsearch.common.cache.RemovalListener; -import org.elasticsearch.common.cache.RemovalNotification; +import org.elasticsearch.common.cache.*; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.BytesStreamOutput; @@ -263,7 +260,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis } } - private static class Loader implements Function { + private static class Loader implements CacheLoader { private final QueryPhase queryPhase; private final SearchContext context; @@ -279,7 +276,7 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis } @Override - public Value apply(Key key) { + public Value load(Key key) throws Exception { queryPhase.execute(context); /* BytesStreamOutput allows to pass the expected size but by default uses @@ -299,8 +296,6 @@ public class IndicesRequestCache extends AbstractComponent implements RemovalLis Value value = new Value(reference, out.ramBytesUsed()); key.shard.requestCache().onCached(key, value); return value; - } catch (IOException e) { - throw new RuntimeException(e); } } } diff --git a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index 84454544033..6612b9f4e8e 100644 --- a/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/core/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -152,12 +152,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL for (Listener listener : this.listeners) { k.listeners.add(listener); } - final AtomicFieldData fieldData; - try { - fieldData = indexFieldData.loadDirect(context); - } catch (Exception e) { - throw new RuntimeException(e); - } + final AtomicFieldData fieldData = indexFieldData.loadDirect(context); for (Listener listener : k.listeners) { try { listener.onCache(shardId, fieldNames, fieldDataType, fieldData); @@ -181,12 +176,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL for (Listener listener : this.listeners) { k.listeners.add(listener); } - final Accountable ifd; - try { - ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader); - } catch (Exception e) { - throw new RuntimeException(e); - } + final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader); for (Listener listener : k.listeners) { try { listener.onCache(shardId, fieldNames, fieldDataType, ifd); diff --git a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java index 2783b6e7fd1..35be07f8964 100644 --- a/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java +++ b/core/src/test/java/org/elasticsearch/common/cache/CacheTests.java @@ -24,6 +24,7 @@ import org.junit.Before; import java.util.*; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -448,10 +449,14 @@ public class CacheTests extends ESTestCase { Thread thread = new Thread(() -> { latch.countDown(); for (int j = 0; j < numberOfEntries; j++) { - cache.computeIfAbsent(j, key -> { - assertTrue(flags.compareAndSet(key, false, true)); - return Integer.toString(key); - }); + try { + cache.computeIfAbsent(j, key -> { + assertTrue(flags.compareAndSet(key, false, true)); + return Integer.toString(key); + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } }); threads.add(thread);