diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java index 2c1b6630273..38b686d5190 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/index/engine/SimpleEngineBenchmark.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.analysis.AnalysisService; +import org.elasticsearch.index.cache.bloom.none.NonBloomCache; import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.engine.Engine; @@ -307,7 +308,7 @@ public class SimpleEngineBenchmark { ThreadPool threadPool = new ScalingThreadPool(); SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings)); Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store), - new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index())); + new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index())); engine.start(); SimpleEngineBenchmark benchmark = new SimpleEngineBenchmark(store, engine) diff --git a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/percolator/EmbeddedPercolatorBenchmarkTest.java 
b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/percolator/EmbeddedPercolatorBenchmarkTest.java index ea748dad880..4f6ea804eb6 100644 --- a/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/percolator/EmbeddedPercolatorBenchmarkTest.java +++ b/modules/benchmark/micro/src/main/java/org/elasticsearch/benchmark/percolator/EmbeddedPercolatorBenchmarkTest.java @@ -39,6 +39,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ThreadPoolModule; import java.util.concurrent.CountDownLatch; @@ -60,6 +61,7 @@ public class EmbeddedPercolatorBenchmarkTest { Index index = new Index("test"); Injector injector = new ModulesBuilder().add( new SettingsModule(settings), + new ThreadPoolModule(settings), new ScriptModule(), new MapperServiceModule(), new IndexSettingsModule(settings), diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomCalculations.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomCalculations.java new file mode 100644 index 00000000000..02f21ff3fcc --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomCalculations.java @@ -0,0 +1,172 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * The following calculations are taken from:
 * http://www.cs.wisc.edu/~cao/papers/summary-cache/node8.html
 * "Bloom Filters - the math"
 *
 * This class's static methods are meant to facilitate the use of the Bloom
 * Filter class by helping to choose correct values of 'bits per element' and
 * 'number of hash functions, k'.
 */
class BloomCalculations {

    // Smallest usable row of the probs table (rows 0 and 1 are dummies).
    private static final int minBuckets = 2;
    // Smallest usable column of the probs table (column 0 is a dummy).
    private static final int minK = 1;

    // Extra bits allocated beyond numElements * bucketsPerElement.
    private static final int EXCESS = 20;

    /**
     * In the following table, the row 'i' shows false positive rates if i buckets
     * per element are used. Column 'j' shows false positive rates if j hash
     * functions are used. The first row is 'i=0', the first column is 'j=0'.
     * Each cell (i,j) the false positive rate determined by using i buckets per
     * element and j hash functions.
     */
    static final double[][] probs = new double[][]{
            {1.0}, // dummy row representing 0 buckets per element
            {1.0, 1.0}, // dummy row representing 1 buckets per element
            {1.0, 0.393, 0.400},
            {1.0, 0.283, 0.237, 0.253},
            {1.0, 0.221, 0.155, 0.147, 0.160},
            {1.0, 0.181, 0.109, 0.092, 0.092, 0.101}, // 5
            {1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638},
            {1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364},
            {1.0, 0.118, 0.0489, 0.0306, 0.024, 0.0217, 0.0216, 0.0229},
            {1.0, 0.105, 0.0397, 0.0228, 0.0166, 0.0141, 0.0133, 0.0135, 0.0145},
            {1.0, 0.0952, 0.0329, 0.0174, 0.0118, 0.00943, 0.00844, 0.00819, 0.00846}, // 10
            {1.0, 0.0869, 0.0276, 0.0136, 0.00864, 0.0065, 0.00552, 0.00513, 0.00509},
            {1.0, 0.08, 0.0236, 0.0108, 0.00646, 0.00459, 0.00371, 0.00329, 0.00314},
            {1.0, 0.074, 0.0203, 0.00875, 0.00492, 0.00332, 0.00255, 0.00217, 0.00199, 0.00194},
            {1.0, 0.0689, 0.0177, 0.00718, 0.00381, 0.00244, 0.00179, 0.00146, 0.00129, 0.00121, 0.0012},
            {1.0, 0.0645, 0.0156, 0.00596, 0.003, 0.00183, 0.00128, 0.001, 0.000852, 0.000775, 0.000744}, // 15
            {1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459},
            {1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302, 0.000287, 0.000284},
            {1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183, 0.000176},
            {1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132, 0.000118, 0.000111, 0.000109},
            {1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05, 7.77e-05, 7.12e-05, 6.79e-05, 6.71e-05} // 20
    }; // the first column is a dummy column representing K=0.

    /**
     * The optimal number of hashes for a given number of bits per element.
     * These values are automatically calculated from the data above.
     */
    private static final int[] optKPerBuckets = new int[probs.length];

    static {
        // For each bucket count, remember the hash count with the lowest false
        // positive rate in that row of the table.
        for (int i = 0; i < probs.length; i++) {
            double min = Double.MAX_VALUE;
            double[] prob = probs[i];
            for (int j = 0; j < prob.length; j++) {
                if (prob[j] < min) {
                    min = prob[j];
                    optKPerBuckets[i] = Math.max(minK, j);
                }
            }
        }
    }

    /**
     * Given the number of buckets that can be used per element, return a
     * specification that minimizes the false positive rate.
     *
     * @param bucketsPerElement The number of buckets per element for the filter.
     * @return A spec that minimizes the false positive rate.
     */
    public static BloomSpecification computeBloomSpec(int bucketsPerElement) {
        assert bucketsPerElement >= 1;
        assert bucketsPerElement <= probs.length - 1;
        return new BloomSpecification(optKPerBuckets[bucketsPerElement], bucketsPerElement);
    }

    /**
     * A wrapper class that holds two key parameters for a Bloom Filter: the
     * number of hash functions used, and the number of buckets per element used.
     */
    public static class BloomSpecification {
        final int K; // number of hash functions.
        final int bucketsPerElement;

        public BloomSpecification(int k, int bucketsPerElement) {
            K = k;
            this.bucketsPerElement = bucketsPerElement;
        }
    }

    /**
     * Given a maximum tolerable false positive probability, compute a Bloom
     * specification which will give less than the specified false positive rate,
     * but minimize the number of buckets per element and the number of hash
     * functions used. Because bandwidth (and therefore total bitvector size)
     * is considered more expensive than computing power, preference is given
     * to minimizing buckets per element rather than number of hash functions.
     *
     * @param maxBucketsPerElement The maximum number of buckets available for the filter.
     * @param maxFalsePosProb      The maximum tolerable false positive rate.
     * @return A Bloom Specification which would result in a false positive rate
     *         less than specified by the function call
     * @throws UnsupportedOperationException if a filter satisfying the parameters cannot be met
     */
    public static BloomSpecification computeBloomSpec(int maxBucketsPerElement, double maxFalsePosProb) {
        assert maxBucketsPerElement >= 1;
        assert maxBucketsPerElement <= probs.length - 1;
        int maxK = probs[maxBucketsPerElement].length - 1;

        // Handle the trivial cases.
        if (maxFalsePosProb >= probs[minBuckets][minK]) {
            // Fixed: constructor takes (K, bucketsPerElement); the arguments
            // were previously swapped, yielding K=2 with 1 bucket per element.
            return new BloomSpecification(optKPerBuckets[minBuckets], minBuckets);
        }
        if (maxFalsePosProb < probs[maxBucketsPerElement][maxK]) {
            throw new UnsupportedOperationException(String.format("Unable to satisfy %s with %s buckets per element",
                    maxFalsePosProb, maxBucketsPerElement));
        }

        // First find the minimal required number of buckets:
        int bucketsPerElement = 2;
        int K = optKPerBuckets[2];
        while (probs[bucketsPerElement][K] > maxFalsePosProb) {
            bucketsPerElement++;
            K = optKPerBuckets[bucketsPerElement];
        }
        // Now that the number of buckets is sufficient, see if we can relax K
        // without losing too much precision.
        while (probs[bucketsPerElement][K - 1] <= maxFalsePosProb) {
            K--;
        }

        return new BloomSpecification(K, bucketsPerElement);
    }

    /**
     * Calculates the maximum number of buckets per element that this implementation
     * can support. Crucially, it will lower the bucket count if necessary to meet
     * BitSet's size restrictions.
     */
    public static int maxBucketsPerElement(long numElements) {
        numElements = Math.max(1, numElements);
        double v = (Long.MAX_VALUE - EXCESS) / (double) numElements;
        if (v < 1.0) {
            throw new UnsupportedOperationException("Cannot compute probabilities for " + numElements + " elements.");
        }
        return Math.min(BloomCalculations.probs.length - 1, (int) v);
    }
}
+ */ + +package org.elasticsearch.common.bloom; + +import java.nio.ByteBuffer; + +/** + * + */ +public interface BloomFilter { + + public static final BloomFilter NONE = new BloomFilter() { + @Override public void add(byte[] key, int offset, int length) { + } + + @Override public void add(ByteBuffer key) { + } + + @Override public boolean isPresent(byte[] key, int offset, int length) { + return true; + } + + @Override public boolean isPresent(ByteBuffer key) { + return true; + } + + @Override public long sizeInBytes() { + return 0; + } + }; + + public static final BloomFilter EMPTY = new BloomFilter() { + @Override public void add(byte[] key, int offset, int length) { + } + + @Override public void add(ByteBuffer key) { + } + + @Override public boolean isPresent(byte[] key, int offset, int length) { + return false; + } + + @Override public boolean isPresent(ByteBuffer key) { + return false; + } + + @Override public long sizeInBytes() { + return 0; + } + }; + + void add(byte[] key, int offset, int length); + + void add(ByteBuffer key); + + boolean isPresent(byte[] key, int offset, int length); + + boolean isPresent(ByteBuffer key); + + long sizeInBytes(); +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomFilterFactory.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomFilterFactory.java new file mode 100644 index 00000000000..36948c3a42f --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomFilterFactory.java @@ -0,0 +1,100 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bloom; + +import org.apache.lucene.util.OpenBitSet; +import org.elasticsearch.common.UUID; +import org.elasticsearch.common.logging.ESLogger; +import org.elasticsearch.common.logging.ESLoggerFactory; +import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.common.unit.TimeValue; + +import java.io.UnsupportedEncodingException; + +/** + * + */ +public class BloomFilterFactory { + + private static ESLogger logger = ESLoggerFactory.getLogger(BloomFilterFactory.class.getName()); + + private static final int EXCESS = 20; + + /** + * @return A BloomFilter with the lowest practical false positive probability + * for the given number of elements. 
+ */ + public static BloomFilter getFilter(long numElements, int targetBucketsPerElem) { + int maxBucketsPerElement = Math.max(1, BloomCalculations.maxBucketsPerElement(numElements)); + int bucketsPerElement = Math.min(targetBucketsPerElem, maxBucketsPerElement); + if (bucketsPerElement < targetBucketsPerElem) { + logger.warn(String.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).", + numElements, bucketsPerElement, targetBucketsPerElem)); + } + BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement); + return new ObsBloomFilter(spec.K, bucketsFor(numElements, spec.bucketsPerElement)); + } + + /** + * @return The smallest BloomFilter that can provide the given false positive + * probability rate for the given number of elements. + * + * Asserts that the given probability can be satisfied using this filter. + */ + public static BloomFilter getFilter(long numElements, double maxFalsePosProbability) { + assert maxFalsePosProbability <= 1.0 : "Invalid probability"; + int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements); + BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability); + return new ObsBloomFilter(spec.K, bucketsFor(numElements, spec.bucketsPerElement)); + } + + private static OpenBitSet bucketsFor(long numElements, int bucketsPer) { + long numBits = numElements * bucketsPer + EXCESS; //TODO overflow? 
+ return new OpenBitSet((long) Math.min(Long.MAX_VALUE, numBits)); + } + + public static void main(String[] args) throws UnsupportedEncodingException { + long elements = SizeValue.parseSizeValue("100m").singles(); + BloomFilter filter = BloomFilterFactory.getFilter(elements, 15); + System.out.println("Filter size: " + new ByteSizeValue(filter.sizeInBytes())); + for (long i = 0; i < elements; i++) { + byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8"); + filter.add(utf8s, 0, utf8s.length); + } + long falsePositives = 0; + for (long i = 0; i < elements; i++) { + byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8"); + if (filter.isPresent(utf8s, 0, utf8s.length)) { + falsePositives++; + } + } + System.out.println("false positives: " + falsePositives); + + byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8"); + long time = System.currentTimeMillis(); + for (long i = 0; i < elements; i++) { + if (filter.isPresent(utf8s, 0, utf8s.length)) { + } + } + long timeSize = System.currentTimeMillis() - time; + System.out.println("Indexed in " + new TimeValue(timeSize) + ", TPS: " + (elements / timeSize) + " per millisecond"); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/MurmurHash.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/MurmurHash.java new file mode 100644 index 00000000000..9530e5f2a8e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/MurmurHash.java @@ -0,0 +1,188 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * This is a very fast, non-cryptographic hash suitable for general hash-based
 * lookup. See http://murmurhash.googlepages.com/ for more details.
 *
 * <p>The C version of MurmurHash 2.0 found at that site was ported to Java by
 * Andrzej Bialecki (ab at getopt org).</p>
 */
public class MurmurHash {

    /** 32-bit MurmurHash 2.0 over {@code length} bytes of {@code data} starting at {@code offset}. */
    public static int hash32(ByteBuffer data, int offset, int length, int seed) {
        final int mul = 0x5bd1e995;
        final int rot = 24;

        int hash = seed ^ length;
        int fullWords = length >> 2;

        // Mix four little-endian bytes at a time into the hash.
        for (int w = 0; w < fullWords; w++) {
            int base = offset + (w << 2);
            int k = (data.get(base + 3) << 24)
                    | ((data.get(base + 2) & 0xff) << 16)
                    | ((data.get(base + 1) & 0xff) << 8)
                    | (data.get(base) & 0xff);
            k *= mul;
            k ^= k >>> rot;
            k *= mul;
            hash *= mul;
            hash ^= k;
        }

        // Fold in the 0-3 trailing bytes (sign-extended, as in the original port).
        int tail = length - (fullWords << 2);
        switch (tail) {
            case 3:
                hash ^= (int) data.get(offset + length - 3) << 16;
                // fall through
            case 2:
                hash ^= (int) data.get(offset + length - 2) << 8;
                // fall through
            case 1:
                hash ^= (int) data.get(offset + length - 1);
                hash *= mul;
                break;
            default:
                break;
        }

        hash ^= hash >>> 13;
        hash *= mul;
        hash ^= hash >>> 15;

        return hash;
    }

    /** 64-bit MurmurHash 2.0 over {@code length} bytes of {@code key} starting at {@code offset}. */
    public static long hash64(ByteBuffer key, int offset, int length, long seed) {
        final long mul = 0xc6a4a7935bd1e995L;
        final int rot = 47;

        long hash = (seed & 0xffffffffL) ^ (mul * length);
        int fullLongs = length >> 3;

        for (int w = 0; w < fullLongs; w++) {
            int base = offset + (w << 3);
            // Assemble eight bytes, little-endian.
            long block = 0;
            for (int b = 7; b >= 0; b--) {
                block = (block << 8) | ((long) key.get(base + b) & 0xff);
            }
            block *= mul;
            block ^= block >>> rot;
            block *= mul;
            hash ^= block;
            hash *= mul;
        }

        // Deliberate switch fall-through; trailing bytes are sign-extended as
        // in the original port.
        int tail = length & 0x7;
        int tailStart = offset + length - tail;
        switch (tail) {
            case 7:
                hash ^= (long) key.get(tailStart + 6) << 48;
            case 6:
                hash ^= (long) key.get(tailStart + 5) << 40;
            case 5:
                hash ^= (long) key.get(tailStart + 4) << 32;
            case 4:
                hash ^= (long) key.get(tailStart + 3) << 24;
            case 3:
                hash ^= (long) key.get(tailStart + 2) << 16;
            case 2:
                hash ^= (long) key.get(tailStart + 1) << 8;
            case 1:
                hash ^= (long) key.get(tailStart);
                hash *= mul;
            default:
                break;
        }

        hash ^= hash >>> rot;
        hash *= mul;
        hash ^= hash >>> rot;

        return hash;
    }

    /** 64-bit MurmurHash 2.0 over {@code length} bytes of the array {@code key} starting at {@code offset}. */
    public static long hash64(byte[] key, int offset, int length, long seed) {
        final long mul = 0xc6a4a7935bd1e995L;
        final int rot = 47;

        long hash = (seed & 0xffffffffL) ^ (mul * length);
        int fullLongs = length >> 3;

        for (int w = 0; w < fullLongs; w++) {
            int base = offset + (w << 3);
            // Assemble eight bytes, little-endian.
            long block = 0;
            for (int b = 7; b >= 0; b--) {
                block = (block << 8) | ((long) key[base + b] & 0xff);
            }
            block *= mul;
            block ^= block >>> rot;
            block *= mul;
            hash ^= block;
            hash *= mul;
        }

        // Deliberate switch fall-through; trailing bytes are sign-extended as
        // in the original port.
        int tail = length & 0x7;
        int tailStart = offset + length - tail;
        switch (tail) {
            case 7:
                hash ^= (long) key[tailStart + 6] << 48;
            case 6:
                hash ^= (long) key[tailStart + 5] << 40;
            case 5:
                hash ^= (long) key[tailStart + 4] << 32;
            case 4:
                hash ^= (long) key[tailStart + 3] << 24;
            case 3:
                hash ^= (long) key[tailStart + 2] << 16;
            case 2:
                hash ^= (long) key[tailStart + 1] << 8;
            case 1:
                hash ^= (long) key[tailStart];
                hash *= mul;
            default:
                break;
        }

        hash ^= hash >>> rot;
        hash *= mul;
        hash ^= hash >>> rot;

        return hash;
    }
}
b/modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/ObsBloomFilter.java @@ -0,0 +1,127 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bloom; + +import org.apache.lucene.util.OpenBitSet; +import org.elasticsearch.common.RamUsage; + +import java.nio.ByteBuffer; + +public class ObsBloomFilter implements BloomFilter { + + private final int hashCount; + + private final OpenBitSet bitset; + + ObsBloomFilter(int hashCount, OpenBitSet bs) { + this.hashCount = hashCount; + this.bitset = bs; + } + + long emptyBuckets() { + long n = 0; + for (long i = 0; i < buckets(); i++) { + if (!bitset.get(i)) { + n++; + } + } + return n; + } + + private long buckets() { + return bitset.size(); + } + + private long[] getHashBuckets(ByteBuffer key) { + return getHashBuckets(key, hashCount, buckets()); + } + + private long[] getHashBuckets(byte[] key, int offset, int length) { + return getHashBuckets(key, offset, length, hashCount, buckets()); + } + + // Murmur is faster than an SHA-based approach and provides as-good collision + // resistance. 
The combinatorial generation approach described in + // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf + // does prove to work in actual tests, and is obviously faster + // than performing further iterations of murmur. + static long[] getHashBuckets(ByteBuffer b, int hashCount, long max) { + long[] result = new long[hashCount]; + long hash1 = MurmurHash.hash64(b, b.position(), b.remaining(), 0L); + long hash2 = MurmurHash.hash64(b, b.position(), b.remaining(), hash1); + for (int i = 0; i < hashCount; ++i) { + result[i] = Math.abs((hash1 + (long) i * hash2) % max); + } + return result; + } + + // Murmur is faster than an SHA-based approach and provides as-good collision + // resistance. The combinatorial generation approach described in + // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf + // does prove to work in actual tests, and is obviously faster + // than performing further iterations of murmur. + static long[] getHashBuckets(byte[] b, int offset, int length, int hashCount, long max) { + long[] result = new long[hashCount]; + long hash1 = MurmurHash.hash64(b, offset, length, 0L); + long hash2 = MurmurHash.hash64(b, offset, length, hash1); + for (int i = 0; i < hashCount; ++i) { + result[i] = Math.abs((hash1 + (long) i * hash2) % max); + } + return result; + } + + @Override public void add(byte[] key, int offset, int length) { + for (long bucketIndex : getHashBuckets(key, offset, length)) { + bitset.fastSet(bucketIndex); + } + } + + public void add(ByteBuffer key) { + for (long bucketIndex : getHashBuckets(key)) { + bitset.fastSet(bucketIndex); + } + } + + @Override public boolean isPresent(byte[] key, int offset, int length) { + for (long bucketIndex : getHashBuckets(key, offset, length)) { + if (!bitset.fastGet(bucketIndex)) { + return false; + } + } + return true; + } + + public boolean isPresent(ByteBuffer key) { + for (long bucketIndex : getHashBuckets(key)) { + if (!bitset.fastGet(bucketIndex)) { + return false; + } + } + return true; + 
} + + public void clear() { + bitset.clear(0, bitset.size()); + } + + @Override public long sizeInBytes() { + return bitset.getBits().length * RamUsage.NUM_BYTES_LONG + RamUsage.NUM_BYTES_ARRAY_HEADER + RamUsage.NUM_BYTES_INT /* wlen */; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java index fb8e8675855..4ab7f4cefd9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lucene/ReaderSearcherHolder.java @@ -20,9 +20,9 @@ package org.elasticsearch.common.lucene; import org.apache.lucene.index.IndexReader; -import org.apache.lucene.search.IndexSearcher; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher; /** * A very simple holder for a tuple of reader and searcher. 
@@ -31,13 +31,13 @@ import org.elasticsearch.common.lease.Releasable; */ public class ReaderSearcherHolder implements Releasable { - private final IndexSearcher indexSearcher; + private final ExtendedIndexSearcher indexSearcher; public ReaderSearcherHolder(IndexReader indexReader) { - this(new IndexSearcher(indexReader)); + this(new ExtendedIndexSearcher(indexReader)); } - public ReaderSearcherHolder(IndexSearcher indexSearcher) { + public ReaderSearcherHolder(ExtendedIndexSearcher indexSearcher) { this.indexSearcher = indexSearcher; } @@ -45,7 +45,7 @@ public class ReaderSearcherHolder implements Releasable { return indexSearcher.getIndexReader(); } - public IndexSearcher searcher() { + public ExtendedIndexSearcher searcher() { return indexSearcher; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java index a88587bad51..12f1e95ea87 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.Index; +import org.elasticsearch.index.cache.bloom.BloomCache; import org.elasticsearch.index.cache.field.data.FieldDataCache; import org.elasticsearch.index.cache.filter.FilterCache; import org.elasticsearch.index.cache.id.IdCache; @@ -49,15 +50,18 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo private final IdCache idCache; + private final BloomCache bloomCache; + private ClusterService clusterService; @Inject public IndexCache(Index index, @IndexSettings Settings indexSettings, FilterCache filterCache, FieldDataCache fieldDataCache, - QueryParserCache queryParserCache, IdCache idCache) { + 
QueryParserCache queryParserCache, IdCache idCache, BloomCache bloomCache) { super(index, indexSettings); this.filterCache = filterCache; this.fieldDataCache = fieldDataCache; this.queryParserCache = queryParserCache; this.idCache = idCache; + this.bloomCache = bloomCache; } @Inject(optional = true) @@ -89,6 +93,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo fieldDataCache.close(); idCache.close(); queryParserCache.close(); + bloomCache.close(); if (clusterService != null) { clusterService.remove(this); } @@ -98,6 +103,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo filterCache.clear(reader); fieldDataCache.clear(reader); idCache.clear(reader); + bloomCache.clear(reader); } public void clear() { @@ -105,12 +111,14 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo fieldDataCache.clear(); idCache.clear(); queryParserCache.clear(); + bloomCache.clear(); } public void clearUnreferenced() { filterCache.clearUnreferenced(); fieldDataCache.clearUnreferenced(); idCache.clearUnreferenced(); + bloomCache.clearUnreferenced(); } @Override public void clusterChanged(ClusterChangedEvent event) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCacheModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCacheModule.java index 2f379329ea6..8031258985b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCacheModule.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCacheModule.java @@ -21,6 +21,7 @@ package org.elasticsearch.index.cache; import org.elasticsearch.common.inject.AbstractModule; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.cache.bloom.BloomCacheModule; import org.elasticsearch.index.cache.field.data.FieldDataCacheModule; import org.elasticsearch.index.cache.filter.FilterCacheModule; import 
org.elasticsearch.index.cache.id.IdCacheModule; @@ -42,6 +43,7 @@ public class IndexCacheModule extends AbstractModule { new FieldDataCacheModule(settings).configure(binder()); new IdCacheModule(settings).configure(binder()); new QueryParserCacheModule(settings).configure(binder()); + new BloomCacheModule(settings).configure(binder()); bind(IndexCache.class).asEagerSingleton(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/BloomCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/BloomCache.java new file mode 100644 index 00000000000..19af4b0d34e --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/BloomCache.java @@ -0,0 +1,46 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.cache.bloom;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.bloom.BloomFilter;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.index.IndexComponent;

/**
 * A per-index cache of per-field {@link BloomFilter}s, keyed by the
 * {@link IndexReader} they were built from.
 *
 * @author kimchy (shay.banon)
 */
public interface BloomCache extends IndexComponent, CloseableComponent {

    /**
     * Returns the bloom filter cached for the given reader and field, triggering
     * a load when it is missing. When {@code asyncLoad} is {@code true} the load
     * happens in the background and a pass-through filter (one that reports every
     * key as present) may be returned until loading completes.
     */
    BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad);

    /**
     * Evicts all cached filters.
     */
    void clear();

    /**
     * Evicts the filters cached for the given reader.
     */
    void clear(IndexReader reader);

    /**
     * Evicts entries that are no longer referenced (what qualifies is
     * implementation dependent; some implementations are a no-op here).
     */
    void clearUnreferenced();

    /**
     * Total memory, in bytes, taken by all cached filters.
     */
    long sizeInBytes();

    /**
     * Memory, in bytes, taken by the cached filters of the given field.
     */
    long sizeInBytes(String fieldName);
}
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.cache.bloom;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.common.inject.Scopes;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.bloom.simple.SimpleBloomCache;

/**
 * Guice module that binds the {@link BloomCache} implementation selected by the
 * {@code index.cache.bloom.type} setting, defaulting to {@link SimpleBloomCache}.
 *
 * @author kimchy (shay.banon)
 */
public class BloomCacheModule extends AbstractModule {

    public static final class BloomCacheSettings {
        // Setting key that names the BloomCache implementation class to bind.
        public static final String TYPE = "index.cache.bloom.type";
    }

    private final Settings settings;

    public BloomCacheModule(Settings settings) {
        this.settings = settings;
    }

    @Override protected void configure() {
        // getAsClass resolves either a fully-qualified class name or a short name
        // expanded with the given package prefix / "BloomCache" suffix
        // (e.g. "simple" -> org.elasticsearch.index.cache.bloom.simple.SimpleBloomCache).
        bind(BloomCache.class)
                .to(settings.getAsClass(BloomCacheSettings.TYPE, SimpleBloomCache.class, "org.elasticsearch.index.cache.bloom.", "BloomCache"))
                .in(Scopes.SINGLETON);
    }
}
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.index.cache.bloom.none;

import org.apache.lucene.index.IndexReader;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.bloom.BloomFilter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.bloom.BloomCache;
import org.elasticsearch.index.settings.IndexSettings;

/**
 * A no-op {@link BloomCache}: it never builds or stores filters and always hands
 * back {@link BloomFilter#NONE} (a pass-through filter), effectively disabling
 * bloom filtering for the index.
 *
 * @author kimchy (shay.banon)
 */
public class NonBloomCache extends AbstractIndexComponent implements BloomCache {

    // Convenience constructor for tests/benchmarks that have no index settings.
    public NonBloomCache(Index index) {
        super(index, ImmutableSettings.Builder.EMPTY_SETTINGS);
    }

    @Inject public NonBloomCache(Index index, @IndexSettings Settings indexSettings) {
        super(index, indexSettings);
    }

    // Always the pass-through filter; asyncLoad is irrelevant since nothing loads.
    @Override public BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad) {
        return BloomFilter.NONE;
    }

    @Override public void clear() {
        // nothing cached, nothing to clear
    }

    @Override public void clear(IndexReader reader) {
        // nothing cached, nothing to clear
    }

    @Override public void clearUnreferenced() {
        // nothing cached, nothing to clear
    }

    @Override public long sizeInBytes() {
        return 0;
    }

    @Override public long sizeInBytes(String fieldName) {
        return 0;
    }

    @Override public void close() throws ElasticSearchException {
        // no resources held
    }
}
Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.cache.bloom.simple; + +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.Term; +import org.apache.lucene.index.TermDocs; +import org.apache.lucene.index.TermEnum; +import org.apache.lucene.util.StringHelper; +import org.apache.lucene.util.UnicodeUtil; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.bloom.BloomFilter; +import org.elasticsearch.common.bloom.BloomFilterFactory; +import org.elasticsearch.common.collect.MapMaker; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.index.AbstractIndexComponent; +import org.elasticsearch.index.Index; +import org.elasticsearch.index.cache.bloom.BloomCache; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * @author kimchy (shay.banon) + */ +public class SimpleBloomCache extends 
AbstractIndexComponent implements BloomCache { + + private final ThreadPool threadPool; + + private final ConcurrentMap> cache; + + private final Object creationMutex = new Object(); + + @Inject public SimpleBloomCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool) { + super(index, indexSettings); + this.threadPool = threadPool; + // weak keys is fine, it will only be cleared once IndexReader references will be removed + // (assuming clear(...) will not be called) + this.cache = new MapMaker().weakKeys().makeMap(); + } + + @Override public void close() throws ElasticSearchException { + clear(); + } + + @Override public void clear() { + cache.clear(); + } + + @Override public void clear(IndexReader reader) { + ConcurrentMap map = cache.remove(reader.getFieldCacheKey()); + // help soft/weak handling GC + if (map != null) { + map.clear(); + } + } + + @Override public void clearUnreferenced() { + // nothing to do here... + } + + @Override public long sizeInBytes() { + // the overhead of the map is not really relevant... 
+ long sizeInBytes = 0; + for (ConcurrentMap map : cache.values()) { + for (BloomFilterEntry filter : map.values()) { + sizeInBytes += filter.filter.sizeInBytes(); + } + } + return sizeInBytes; + } + + @Override public long sizeInBytes(String fieldName) { + long sizeInBytes = 0; + for (ConcurrentMap map : cache.values()) { + BloomFilterEntry filter = map.get(fieldName); + if (filter != null) { + sizeInBytes += filter.filter.sizeInBytes(); + } + } + return sizeInBytes; + } + + @Override public BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad) { + int currentNumDocs = reader.numDocs(); + if (currentNumDocs == 0) { + return BloomFilter.EMPTY; + } + ConcurrentMap fieldCache = cache.get(reader.getFieldCacheKey()); + if (fieldCache == null) { + synchronized (creationMutex) { + fieldCache = cache.get(reader.getFieldCacheKey()); + if (fieldCache == null) { + fieldCache = ConcurrentCollections.newConcurrentMap(); + cache.put(reader.getFieldCacheKey(), fieldCache); + } + } + } + BloomFilterEntry filter = fieldCache.get(fieldName); + if (filter == null) { + synchronized (fieldCache) { + filter = fieldCache.get(fieldName); + if (filter == null) { + filter = new BloomFilterEntry(reader.numDocs(), BloomFilter.NONE); + filter.loading.set(true); + fieldCache.put(fieldName, filter); + // now, do the async load of it... 
+ BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName); + if (asyncLoad) { + threadPool.cached().execute(loader); + } else { + loader.run(); + filter = fieldCache.get(fieldName); + } + } + } + } + // if we too many deletes, we need to reload the bloom filter so it will be more effective + if (filter.numDocs > 1000 && (currentNumDocs / filter.numDocs) < 0.6) { + if (filter.loading.compareAndSet(false, true)) { + // do the async loading + BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName); + if (asyncLoad) { + threadPool.cached().execute(loader); + } else { + loader.run(); + filter = fieldCache.get(fieldName); + } + } + } + return filter.filter; + } + + class BloomFilterLoader implements Runnable { + private final IndexReader reader; + private final String field; + + BloomFilterLoader(IndexReader reader, String field) { + this.reader = reader; + this.field = StringHelper.intern(field); + } + + @SuppressWarnings({"StringEquality"}) + @Override public void run() { + TermDocs termDocs = null; + TermEnum termEnum = null; + try { + BloomFilter filter = BloomFilterFactory.getFilter(reader.numDocs(), 15); + termDocs = reader.termDocs(); + termEnum = reader.terms(new Term(field)); + do { + Term term = termEnum.term(); + if (term == null || term.field() != field) break; + + // LUCENE MONITOR: 4.0, move to use bytes! 
+ UnicodeUtil.UTF8Result utf8Result = Unicode.fromStringAsUtf8(term.text()); + termDocs.seek(termEnum); + while (termDocs.next()) { + // when traversing, make sure to ignore deleted docs, so the key->docId will be correct + if (!reader.isDeleted(termDocs.doc())) { + filter.add(utf8Result.result, 0, utf8Result.length); + } + } + } while (termEnum.next()); + ConcurrentMap fieldCache = cache.get(reader.getFieldCacheKey()); + if (fieldCache != null) { + if (fieldCache.containsKey(field)) { + BloomFilterEntry filterEntry = new BloomFilterEntry(reader.numDocs(), filter); + filterEntry.loading.set(false); + fieldCache.put(field, filterEntry); + } + } + } catch (Exception e) { + logger.warn("failed to load bloom filter for [{}]", e, field); + } finally { + try { + if (termDocs != null) { + termDocs.close(); + } + } catch (IOException e) { + // ignore + } + try { + if (termEnum != null) { + termEnum.close(); + } + } catch (IOException e) { + // ignore + } + } + } + } + + static class BloomFilterEntry { + final int numDocs; + final BloomFilter filter; + final AtomicBoolean loading = new AtomicBoolean(); + + public BloomFilterEntry(int numDocs, BloomFilter filter) { + this.numDocs = numDocs; + this.filter = filter; + } + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java index 2c2b4c1b37f..aa0190e3794 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -23,12 +23,12 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.Query; import org.elasticsearch.ElasticSearchException; import 
org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.CloseableComponent; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher; import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.util.concurrent.ThreadSafe; @@ -121,7 +121,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent { IndexReader reader(); - IndexSearcher searcher(); + ExtendedIndexSearcher searcher(); } static class Refresh { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java index 9ac4cc6f91f..64a4526cc2b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/engine/robin/RobinEngine.java @@ -22,13 +22,16 @@ package org.elasticsearch.index.engine.robin; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; -import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.store.AlreadyClosedException; +import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.Preconditions; +import org.elasticsearch.common.Unicode; +import org.elasticsearch.common.bloom.BloomFilter; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.IndexWriters; import org.elasticsearch.common.lucene.ReaderSearcherHolder; +import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher; import org.elasticsearch.common.lucene.uid.UidField; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; @@ -36,9 +39,11 @@ import org.elasticsearch.common.unit.ByteSizeValue; 
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.resource.AcquirableResource; import org.elasticsearch.index.analysis.AnalysisService; +import org.elasticsearch.index.cache.bloom.BloomCache; import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.*; +import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.merge.policy.EnableMergePolicy; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; @@ -95,6 +100,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private final SimilarityService similarityService; + private final BloomCache bloomCache; + + private final boolean asyncLoadBloomFilter; + // no need for volatile, its always used under a lock private IndexWriter indexWriter; @@ -107,6 +116,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private volatile int disableFlushCounter = 0; + private volatile Searcher postFlushSearcher; + private final AtomicBoolean flushing = new AtomicBoolean(); private final ConcurrentMap versionMap; @@ -115,7 +126,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog, MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler, - AnalysisService analysisService, SimilarityService similarityService) throws EngineException { + AnalysisService analysisService, SimilarityService similarityService, + BloomCache bloomCache) throws EngineException { super(shardId, indexSettings); Preconditions.checkNotNull(store, "Store must be provided to the engine"); Preconditions.checkNotNull(deletionPolicy, 
"Snapshot deletion policy must be provided to the engine"); @@ -126,6 +138,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, this.termIndexDivisor = indexSettings.getAsInt("index.term_index_divisor", 1); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR this.compoundFormat = indexSettings.getAsBoolean("index.compound_format", indexSettings.getAsBoolean("index.merge.policy.use_compound_file", store == null ? false : store.suggestUseCompoundFile())); this.refreshInterval = componentSettings.getAsTime("refresh_interval", indexSettings.getAsTime("index.refresh_interval", timeValueSeconds(1))); + this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true); // Here for testing, should always be true this.store = store; this.deletionPolicy = deletionPolicy; @@ -134,6 +147,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, this.mergeScheduler = mergeScheduler; this.analysisService = analysisService; this.similarityService = similarityService; + this.bloomCache = bloomCache; this.versionMap = new ConcurrentHashMap(1000); this.dirtyLocks = new Object[componentSettings.getAsInt("concurrency", 10000)]; @@ -178,6 +192,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, try { translog.newTranslog(newTransactionLogId()); this.nrtResource = buildNrtResource(indexWriter); + if (postFlushSearcher != null) { + postFlushSearcher.release(); + } + postFlushSearcher = searcher(); } catch (IOException e) { try { indexWriter.rollback(); @@ -590,7 +608,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, AcquirableResource current = nrtResource; IndexReader newReader = current.resource().reader().reopen(true); if (newReader != current.resource().reader()) { - IndexSearcher indexSearcher = new IndexSearcher(newReader); + ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader); 
indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity()); nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); current.markForClose(); @@ -687,6 +705,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, dirty = true; // force a refresh // we need to do a refresh here so we sync versioning support refresh(new Refresh(true)); + if (postFlushSearcher != null) { + postFlushSearcher.release(); + } + // only need to load for this flush version searcher, since we keep a map for all + // the changes since the previous flush in memory + postFlushSearcher = searcher(); } finally { rwl.writeLock().unlock(); flushing.set(false); @@ -838,6 +862,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, rwl.writeLock().lock(); this.versionMap.clear(); try { + if (postFlushSearcher != null) { + postFlushSearcher.release(); + postFlushSearcher = null; + } if (nrtResource != null) { this.nrtResource.forceClose(); } @@ -862,13 +890,22 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, } private long loadCurrentVersionFromIndex(Term uid) { + UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text()); // no version, get the version from the index - Searcher searcher = searcher(); - try { - return UidField.loadVersion(searcher.reader(), uid); - } finally { - searcher.release(); + Searcher searcher = postFlushSearcher; + for (IndexReader reader : searcher.searcher().subReaders()) { + BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter); + // we know that its not there... 
+ if (!filter.isPresent(utf8.result, 0, utf8.length)) { + continue; + } + long version = UidField.loadVersion(reader, uid); + // either -2 (its there, but no version associated), or an actual version + if (version != -1) { + return version; + } } + return -1; } private IndexWriter createWriter() throws IOException { @@ -898,7 +935,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, private AcquirableResource buildNrtResource(IndexWriter indexWriter) throws IOException { IndexReader indexReader = indexWriter.getReader(); - IndexSearcher indexSearcher = new IndexSearcher(indexReader); + ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(indexReader); indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity()); return newAcquirableResource(new ReaderSearcherHolder(indexSearcher)); } @@ -923,7 +960,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine, return nrtHolder.resource().reader(); } - @Override public IndexSearcher searcher() { + @Override public ExtendedIndexSearcher searcher() { return nrtHolder.resource().searcher(); } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/common/bloom/BoomFilterTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/common/bloom/BoomFilterTests.java new file mode 100644 index 00000000000..18a50466a3e --- /dev/null +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/common/bloom/BoomFilterTests.java @@ -0,0 +1,49 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bloom; + +import org.elasticsearch.common.base.Charsets; +import org.testng.annotations.Test; + +import java.nio.ByteBuffer; + +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; + +/** + * @author kimchy (shay.banon) + */ +@Test +public class BoomFilterTests { + + @Test public void testSimpleOps() { + BloomFilter filter = BloomFilterFactory.getFilter(10, 15); + filter.add(wrap("1")); + assertThat(filter.isPresent(wrap("1")), equalTo(true)); + assertThat(filter.isPresent(wrap("2")), equalTo(false)); + filter.add(wrap("2")); + assertThat(filter.isPresent(wrap("1")), equalTo(true)); + assertThat(filter.isPresent(wrap("2")), equalTo(true)); + } + + private ByteBuffer wrap(String key) { + return ByteBuffer.wrap(key.getBytes(Charsets.UTF_8)); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java index 3682598882e..3d318857cb4 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/engine/robin/SimpleRobinEngineTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.engine.robin; import org.elasticsearch.index.analysis.AnalysisService; +import org.elasticsearch.index.cache.bloom.none.NonBloomCache; import 
org.elasticsearch.index.engine.AbstractSimpleEngineTests; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.similarity.SimilarityService; @@ -35,6 +36,6 @@ public class SimpleRobinEngineTests extends AbstractSimpleEngineTests { protected Engine createEngine(Store store, Translog translog) { return new RobinEngine(shardId, EMPTY_SETTINGS, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(), - new AnalysisService(shardId.index()), new SimilarityService(shardId.index())); + new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index())); } } diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/percolator/PercolatorExecutorTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/percolator/PercolatorExecutorTests.java index 24b2b7ccdc5..8b60d88a58e 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/percolator/PercolatorExecutorTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/percolator/PercolatorExecutorTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; @@ -59,6 +60,7 @@ public class PercolatorExecutorTests { Index index = new Index("test"); Injector injector = new ModulesBuilder().add( new SettingsModule(settings), + new ThreadPoolModule(settings), new ScriptModule(), new MapperServiceModule(), new IndexSettingsModule(settings), diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/SimpleIndexQueryParserTests.java 
b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/SimpleIndexQueryParserTests.java index e086057caca..71cb5a00db2 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/SimpleIndexQueryParserTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/SimpleIndexQueryParserTests.java @@ -49,6 +49,7 @@ import org.elasticsearch.index.search.geo.GeoPolygonFilter; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -77,6 +78,7 @@ public class SimpleIndexQueryParserTests { Index index = new Index("test"); Injector injector = new ModulesBuilder().add( new SettingsModule(settings), + new ThreadPoolModule(settings), new ScriptModule(), new MapperServiceModule(), new IndexSettingsModule(settings), diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/guice/IndexQueryParserModuleTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/guice/IndexQueryParserModuleTests.java index c349aad7b83..ce1ea05d158 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/guice/IndexQueryParserModuleTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/guice/IndexQueryParserModuleTests.java @@ -34,6 +34,7 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.testng.annotations.Test; import static org.elasticsearch.common.settings.ImmutableSettings.*; @@ -57,6 +58,7 @@ 
public class IndexQueryParserModuleTests { Index index = new Index("test"); Injector injector = new ModulesBuilder().add( new SettingsModule(settings), + new ThreadPoolModule(settings), new ScriptModule(), new IndexSettingsModule(settings), new IndexCacheModule(settings), diff --git a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/plugin/IndexQueryParserPluginTests.java b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/plugin/IndexQueryParserPluginTests.java index 03a8a907333..ee306e1955a 100644 --- a/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/plugin/IndexQueryParserPluginTests.java +++ b/modules/elasticsearch/src/test/java/org/elasticsearch/index/query/xcontent/plugin/IndexQueryParserPluginTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser; import org.elasticsearch.index.settings.IndexSettingsModule; import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.threadpool.ThreadPoolModule; import org.testng.annotations.Test; import static org.hamcrest.MatcherAssert.*; @@ -62,6 +63,7 @@ public class IndexQueryParserPluginTests { Index index = new Index("test"); Injector injector = new ModulesBuilder().add( new SettingsModule(settings), + new ThreadPoolModule(settings), new ScriptModule(), new IndexSettingsModule(settings), new IndexCacheModule(settings), diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java index dca6e165455..8fd9ee5eb5b 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/versioning/SimpleVersioningTests.java @@ 
-23,6 +23,8 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.indices.IndexMissingException; import org.elasticsearch.test.integration.AbstractNodesTests; @@ -43,8 +45,10 @@ public class SimpleVersioningTests extends AbstractNodesTests { private Client client2; @BeforeClass public void createNodes() throws Exception { - startNode("server1"); - startNode("server2"); + // make sure we use bloom filters here! + Settings settings = ImmutableSettings.settingsBuilder().put("index.engine.robin.async_load_bloom", false).build(); + startNode("server1", settings); + startNode("server2", settings); client = client("server1"); client2 = client("server2"); } @@ -115,4 +119,70 @@ public class SimpleVersioningTests extends AbstractNodesTests { assertThat(searchResponse.hits().getAt(0).version(), equalTo(2l)); } } + + @Test public void testSimpleVersioningWithFlush() throws Exception { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (IndexMissingException e) { + // its ok + } + client.admin().indices().prepareCreate("test").execute().actionGet(); + client.admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet(); + + IndexResponse indexResponse = client.prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet(); + assertThat(indexResponse.version(), equalTo(1l)); + + client.admin().indices().prepareFlush().execute().actionGet(); + + indexResponse = client.prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet(); + assertThat(indexResponse.version(), equalTo(2l)); + + 
client.admin().indices().prepareFlush().execute().actionGet(); + + try { + client.prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + try { + client2.prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + try { + client.prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + try { + client2.prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + try { + client.prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + try { + client2.prepareDelete("test", "type", "1").setVersion(1).execute().actionGet(); + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class)); + } + + client.admin().indices().prepareRefresh().execute().actionGet(); + for (int i = 0; i < 10; i++) { + assertThat(client.prepareGet("test", "type", "1").execute().actionGet().version(), equalTo(2l)); + } + + for (int i = 0; i < 10; i++) { + SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).execute().actionGet(); + assertThat(searchResponse.hits().getAt(0).version(), equalTo(2l)); + } + } }