use bloom filter to reduce seeks when needing to load version

parent 3f8933fb7b
commit 332c4cfa53
@@ -32,6 +32,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.cache.bloom.none.NonBloomCache;
 import org.elasticsearch.index.deletionpolicy.KeepOnlyLastDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.engine.Engine;
@@ -307,7 +308,7 @@ public class SimpleEngineBenchmark {
         ThreadPool threadPool = new ScalingThreadPool();
         SnapshotDeletionPolicy deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastDeletionPolicy(shardId, settings));
         Engine engine = new RobinEngine(shardId, settings, store, deletionPolicy, new FsTranslog(shardId, EMPTY_SETTINGS, new File("work/fs-translog"), false), new LogByteSizeMergePolicyProvider(store),
-                new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()));
+                new ConcurrentMergeSchedulerProvider(shardId, settings), new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index()));
         engine.start();
 
         SimpleEngineBenchmark benchmark = new SimpleEngineBenchmark(store, engine)
@@ -39,6 +39,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.threadpool.ThreadPoolModule;
 
 import java.util.concurrent.CountDownLatch;
 
@@ -60,6 +61,7 @@ public class EmbeddedPercolatorBenchmarkTest {
         Index index = new Index("test");
         Injector injector = new ModulesBuilder().add(
                 new SettingsModule(settings),
+                new ThreadPoolModule(settings),
                 new ScriptModule(),
                 new MapperServiceModule(),
                 new IndexSettingsModule(settings),
modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomCalculations.java (new file, 172 lines)
@@ -0,0 +1,172 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bloom;
+
+/**
+ * The following calculations are taken from:
+ * http://www.cs.wisc.edu/~cao/papers/summary-cache/node8.html
+ * "Bloom Filters - the math"
+ *
+ * This class's static methods are meant to facilitate the use of the Bloom
+ * Filter class by helping to choose correct values of 'bits per element' and
+ * 'number of hash functions, k'.
+ */
+class BloomCalculations {
+
+    private static final int minBuckets = 2;
+    private static final int minK = 1;
+
+    private static final int EXCESS = 20;
+
+    /**
+     * In the following table, the row 'i' shows false positive rates if i buckets
+     * per element are used. Column 'j' shows false positive rates if j hash
+     * functions are used. The first row is 'i=0', the first column is 'j=0'.
+     * Each cell (i,j) shows the false positive rate determined by using i buckets
+     * per element and j hash functions.
+     */
+    static final double[][] probs = new double[][]{
+            {1.0}, // dummy row representing 0 buckets per element
+            {1.0, 1.0}, // dummy row representing 1 bucket per element
+            {1.0, 0.393, 0.400},
+            {1.0, 0.283, 0.237, 0.253},
+            {1.0, 0.221, 0.155, 0.147, 0.160},
+            {1.0, 0.181, 0.109, 0.092, 0.092, 0.101}, // 5
+            {1.0, 0.154, 0.0804, 0.0609, 0.0561, 0.0578, 0.0638},
+            {1.0, 0.133, 0.0618, 0.0423, 0.0359, 0.0347, 0.0364},
+            {1.0, 0.118, 0.0489, 0.0306, 0.024, 0.0217, 0.0216, 0.0229},
+            {1.0, 0.105, 0.0397, 0.0228, 0.0166, 0.0141, 0.0133, 0.0135, 0.0145},
+            {1.0, 0.0952, 0.0329, 0.0174, 0.0118, 0.00943, 0.00844, 0.00819, 0.00846}, // 10
+            {1.0, 0.0869, 0.0276, 0.0136, 0.00864, 0.0065, 0.00552, 0.00513, 0.00509},
+            {1.0, 0.08, 0.0236, 0.0108, 0.00646, 0.00459, 0.00371, 0.00329, 0.00314},
+            {1.0, 0.074, 0.0203, 0.00875, 0.00492, 0.00332, 0.00255, 0.00217, 0.00199, 0.00194},
+            {1.0, 0.0689, 0.0177, 0.00718, 0.00381, 0.00244, 0.00179, 0.00146, 0.00129, 0.00121, 0.0012},
+            {1.0, 0.0645, 0.0156, 0.00596, 0.003, 0.00183, 0.00128, 0.001, 0.000852, 0.000775, 0.000744}, // 15
+            {1.0, 0.0606, 0.0138, 0.005, 0.00239, 0.00139, 0.000935, 0.000702, 0.000574, 0.000505, 0.00047, 0.000459},
+            {1.0, 0.0571, 0.0123, 0.00423, 0.00193, 0.00107, 0.000692, 0.000499, 0.000394, 0.000335, 0.000302, 0.000287, 0.000284},
+            {1.0, 0.054, 0.0111, 0.00362, 0.00158, 0.000839, 0.000519, 0.00036, 0.000275, 0.000226, 0.000198, 0.000183, 0.000176},
+            {1.0, 0.0513, 0.00998, 0.00312, 0.0013, 0.000663, 0.000394, 0.000264, 0.000194, 0.000155, 0.000132, 0.000118, 0.000111, 0.000109},
+            {1.0, 0.0488, 0.00906, 0.0027, 0.00108, 0.00053, 0.000303, 0.000196, 0.00014, 0.000108, 8.89e-05, 7.77e-05, 7.12e-05, 6.79e-05, 6.71e-05} // 20
+    }; // the first column is a dummy column representing K=0.
+
+    /**
+     * The optimal number of hashes for a given number of bits per element.
+     * These values are automatically calculated from the data above.
+     */
+    private static final int[] optKPerBuckets = new int[probs.length];
+
+    static {
+        for (int i = 0; i < probs.length; i++) {
+            double min = Double.MAX_VALUE;
+            double[] prob = probs[i];
+            for (int j = 0; j < prob.length; j++) {
+                if (prob[j] < min) {
+                    min = prob[j];
+                    optKPerBuckets[i] = Math.max(minK, j);
+                }
+            }
+        }
+    }
+
+    /**
+     * Given the number of buckets that can be used per element, return a
+     * specification that minimizes the false positive rate.
+     *
+     * @param bucketsPerElement The number of buckets per element for the filter.
+     * @return A spec that minimizes the false positive rate.
+     */
+    public static BloomSpecification computeBloomSpec(int bucketsPerElement) {
+        assert bucketsPerElement >= 1;
+        assert bucketsPerElement <= probs.length - 1;
+        return new BloomSpecification(optKPerBuckets[bucketsPerElement], bucketsPerElement);
+    }
+
+    /**
+     * A wrapper class that holds two key parameters for a Bloom Filter: the
+     * number of hash functions used, and the number of buckets per element used.
+     */
+    public static class BloomSpecification {
+        final int K; // number of hash functions.
+        final int bucketsPerElement;
+
+        public BloomSpecification(int k, int bucketsPerElement) {
+            K = k;
+            this.bucketsPerElement = bucketsPerElement;
+        }
+    }
+
+    /**
+     * Given a maximum tolerable false positive probability, compute a Bloom
+     * specification which will give less than the specified false positive rate,
+     * but minimize the number of buckets per element and the number of hash
+     * functions used. Because bandwidth (and therefore total bitvector size)
+     * is considered more expensive than computing power, preference is given
+     * to minimizing buckets per element rather than number of hash functions.
+     *
+     * @param maxBucketsPerElement The maximum number of buckets available for the filter.
+     * @param maxFalsePosProb      The maximum tolerable false positive rate.
+     * @return A Bloom Specification which would result in a false positive rate
+     *         less than specified by the function call
+     * @throws UnsupportedOperationException if a filter satisfying the parameters cannot be met
+     */
+    public static BloomSpecification computeBloomSpec(int maxBucketsPerElement, double maxFalsePosProb) {
+        assert maxBucketsPerElement >= 1;
+        assert maxBucketsPerElement <= probs.length - 1;
+        int maxK = probs[maxBucketsPerElement].length - 1;
+
+        // Handle the trivial cases
+        if (maxFalsePosProb >= probs[minBuckets][minK]) {
+            return new BloomSpecification(2, optKPerBuckets[2]);
+        }
+        if (maxFalsePosProb < probs[maxBucketsPerElement][maxK]) {
+            throw new UnsupportedOperationException(String.format("Unable to satisfy %s with %s buckets per element",
+                    maxFalsePosProb, maxBucketsPerElement));
+        }
+
+        // First find the minimal required number of buckets:
+        int bucketsPerElement = 2;
+        int K = optKPerBuckets[2];
+        while (probs[bucketsPerElement][K] > maxFalsePosProb) {
+            bucketsPerElement++;
+            K = optKPerBuckets[bucketsPerElement];
+        }
+        // Now that the number of buckets is sufficient, see if we can relax K
+        // without losing too much precision.
+        while (probs[bucketsPerElement][K - 1] <= maxFalsePosProb) {
+            K--;
+        }
+
+        return new BloomSpecification(K, bucketsPerElement);
+    }
+
+    /**
+     * Calculates the maximum number of buckets per element that this implementation
+     * can support. Crucially, it will lower the bucket count if necessary to meet
+     * BitSet's size restrictions.
+     */
+    public static int maxBucketsPerElement(long numElements) {
+        numElements = Math.max(1, numElements);
+        double v = (Long.MAX_VALUE - EXCESS) / (double) numElements;
+        if (v < 1.0) {
+            throw new UnsupportedOperationException("Cannot compute probabilities for " + numElements + " elements.");
+        }
+        return Math.min(BloomCalculations.probs.length - 1, (int) v);
+    }
+}
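The probs table is a precomputation of the standard Bloom filter approximation p ≈ (1 - e^(-k/c))^k, where c is buckets (bits) per element and k the number of hash functions; the optimal k is about ln(2)·c. A minimal standalone sketch (not part of the commit; class name hypothetical) that reproduces a cell of the table:

    // Reproduces BloomCalculations.probs cells from the closed-form
    // approximation p = (1 - e^(-k/c))^k.
    public class BloomMathSketch {
        static double falsePositiveRate(int bucketsPerElement, int hashFunctions) {
            return Math.pow(1 - Math.exp(-hashFunctions / (double) bucketsPerElement), hashFunctions);
        }

        public static void main(String[] args) {
            // c = 10, k = 7 gives ~0.00819, matching probs[10][7] above.
            System.out.println(falsePositiveRate(10, 7));
            // The optimum k for c buckets per element is roughly ln(2) * c.
            System.out.println(Math.round(Math.log(2) * 10)); // 7
        }
    }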
modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomFilter.java (new file, 78 lines)
@@ -0,0 +1,78 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.common.bloom;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public interface BloomFilter {
+
+    public static final BloomFilter NONE = new BloomFilter() {
+        @Override public void add(byte[] key, int offset, int length) {
+        }
+
+        @Override public void add(ByteBuffer key) {
+        }
+
+        @Override public boolean isPresent(byte[] key, int offset, int length) {
+            return true;
+        }
+
+        @Override public boolean isPresent(ByteBuffer key) {
+            return true;
+        }
+
+        @Override public long sizeInBytes() {
+            return 0;
+        }
+    };
+
+    public static final BloomFilter EMPTY = new BloomFilter() {
+        @Override public void add(byte[] key, int offset, int length) {
+        }
+
+        @Override public void add(ByteBuffer key) {
+        }
+
+        @Override public boolean isPresent(byte[] key, int offset, int length) {
+            return false;
+        }
+
+        @Override public boolean isPresent(ByteBuffer key) {
+            return false;
+        }
+
+        @Override public long sizeInBytes() {
+            return 0;
+        }
+    };
+
+    void add(byte[] key, int offset, int length);
+
+    void add(ByteBuffer key);
+
+    boolean isPresent(byte[] key, int offset, int length);
+
+    boolean isPresent(ByteBuffer key);
+
+    long sizeInBytes();
+}
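Note the deliberate asymmetry between the two sentinels: NONE answers true for every key, so a caller always falls through to the real lookup (a safe no-op filter), while EMPTY answers false for every key and is only correct when the reader genuinely contains nothing. A small sketch (hypothetical class, assumed to sit in the same package):

    public class SentinelSketch {
        public static void main(String[] args) throws Exception {
            byte[] key = "doc#1".getBytes("UTF-8");
            // NONE never rules a key out: every lookup must still do the seek.
            System.out.println(BloomFilter.NONE.isPresent(key, 0, key.length));  // true
            // EMPTY rules every key out: safe only for a reader with no docs.
            System.out.println(BloomFilter.EMPTY.isPresent(key, 0, key.length)); // false
        }
    }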
modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/BloomFilterFactory.java (new file, 100 lines)
@@ -0,0 +1,100 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.common.bloom;
+
+import org.apache.lucene.util.OpenBitSet;
+import org.elasticsearch.common.UUID;
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.common.logging.ESLoggerFactory;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.unit.SizeValue;
+import org.elasticsearch.common.unit.TimeValue;
+
+import java.io.UnsupportedEncodingException;
+
+/**
+ *
+ */
+public class BloomFilterFactory {
+
+    private static ESLogger logger = ESLoggerFactory.getLogger(BloomFilterFactory.class.getName());
+
+    private static final int EXCESS = 20;
+
+    /**
+     * @return A BloomFilter with the lowest practical false positive probability
+     *         for the given number of elements.
+     */
+    public static BloomFilter getFilter(long numElements, int targetBucketsPerElem) {
+        int maxBucketsPerElement = Math.max(1, BloomCalculations.maxBucketsPerElement(numElements));
+        int bucketsPerElement = Math.min(targetBucketsPerElem, maxBucketsPerElement);
+        if (bucketsPerElement < targetBucketsPerElem) {
+            logger.warn(String.format("Cannot provide an optimal BloomFilter for %d elements (%d/%d buckets per element).",
+                    numElements, bucketsPerElement, targetBucketsPerElem));
+        }
+        BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement);
+        return new ObsBloomFilter(spec.K, bucketsFor(numElements, spec.bucketsPerElement));
+    }
+
+    /**
+     * @return The smallest BloomFilter that can provide the given false positive
+     *         probability rate for the given number of elements.
+     *
+     *         Asserts that the given probability can be satisfied using this filter.
+     */
+    public static BloomFilter getFilter(long numElements, double maxFalsePosProbability) {
+        assert maxFalsePosProbability <= 1.0 : "Invalid probability";
+        int bucketsPerElement = BloomCalculations.maxBucketsPerElement(numElements);
+        BloomCalculations.BloomSpecification spec = BloomCalculations.computeBloomSpec(bucketsPerElement, maxFalsePosProbability);
+        return new ObsBloomFilter(spec.K, bucketsFor(numElements, spec.bucketsPerElement));
+    }
+
+    private static OpenBitSet bucketsFor(long numElements, int bucketsPer) {
+        long numBits = numElements * bucketsPer + EXCESS; //TODO overflow?
+        return new OpenBitSet((long) Math.min(Long.MAX_VALUE, numBits));
+    }
+
+    public static void main(String[] args) throws UnsupportedEncodingException {
+        long elements = SizeValue.parseSizeValue("100m").singles();
+        BloomFilter filter = BloomFilterFactory.getFilter(elements, 15);
+        System.out.println("Filter size: " + new ByteSizeValue(filter.sizeInBytes()));
+        for (long i = 0; i < elements; i++) {
+            byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8");
+            filter.add(utf8s, 0, utf8s.length);
+        }
+        long falsePositives = 0;
+        for (long i = 0; i < elements; i++) {
+            byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8");
+            if (filter.isPresent(utf8s, 0, utf8s.length)) {
+                falsePositives++;
+            }
+        }
+        System.out.println("false positives: " + falsePositives);
+
+        byte[] utf8s = UUID.randomBase64UUID().getBytes("UTF8");
+        long time = System.currentTimeMillis();
+        for (long i = 0; i < elements; i++) {
+            if (filter.isPresent(utf8s, 0, utf8s.length)) {
+            }
+        }
+        long timeSize = System.currentTimeMillis() - time;
+        System.out.println("Indexed in " + new TimeValue(timeSize) + ", TPS: " + (elements / timeSize) + " per millisecond");
+    }
+}
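Besides the buckets-per-element overload that main() exercises, the factory can size a filter from a target false-positive rate; the second overload picks the smallest spec in BloomCalculations that stays under the bound. A hedged usage sketch (class name and element count illustrative):

    public class FactorySketch {
        public static void main(String[] args) {
            long expectedKeys = 1000000;
            // 15 buckets/element: the table above bottoms out at ~0.000744 fp (k = 10).
            BloomFilter byBuckets = BloomFilterFactory.getFilter(expectedKeys, 15);
            // Alternatively, ask for the smallest filter under a 1% false-positive rate.
            BloomFilter byFpRate = BloomFilterFactory.getFilter(expectedKeys, 0.01);
            System.out.println(byBuckets.sizeInBytes() + " vs " + byFpRate.sizeInBytes());
        }
    }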
modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/MurmurHash.java (new file, 188 lines)
@@ -0,0 +1,188 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.common.bloom;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is a very fast, non-cryptographic hash suitable for general hash-based
+ * lookup. See http://murmurhash.googlepages.com/ for more details.
+ *
+ * <p>
+ * The C version of MurmurHash 2.0 found at that site was ported to Java by
+ * Andrzej Bialecki (ab at getopt org).
+ * </p>
+ */
+public class MurmurHash {
+    public static int hash32(ByteBuffer data, int offset, int length, int seed) {
+        int m = 0x5bd1e995;
+        int r = 24;
+
+        int h = seed ^ length;
+
+        int len_4 = length >> 2;
+
+        for (int i = 0; i < len_4; i++) {
+            int i_4 = i << 2;
+            int k = data.get(offset + i_4 + 3);
+            k = k << 8;
+            k = k | (data.get(offset + i_4 + 2) & 0xff);
+            k = k << 8;
+            k = k | (data.get(offset + i_4 + 1) & 0xff);
+            k = k << 8;
+            k = k | (data.get(offset + i_4 + 0) & 0xff);
+            k *= m;
+            k ^= k >>> r;
+            k *= m;
+            h *= m;
+            h ^= k;
+        }
+
+        // avoid calculating modulo
+        int len_m = len_4 << 2;
+        int left = length - len_m;
+
+        if (left != 0) {
+            if (left >= 3) {
+                h ^= (int) data.get(offset + length - 3) << 16;
+            }
+            if (left >= 2) {
+                h ^= (int) data.get(offset + length - 2) << 8;
+            }
+            if (left >= 1) {
+                h ^= (int) data.get(offset + length - 1);
+            }
+
+            h *= m;
+        }
+
+        h ^= h >>> 13;
+        h *= m;
+        h ^= h >>> 15;
+
+        return h;
+    }
+
+    public static long hash64(ByteBuffer key, int offset, int length, long seed) {
+        long m64 = 0xc6a4a7935bd1e995L;
+        int r64 = 47;
+
+        long h64 = (seed & 0xffffffffL) ^ (m64 * length);
+
+        int lenLongs = length >> 3;
+
+        for (int i = 0; i < lenLongs; ++i) {
+            int i_8 = i << 3;
+
+            long k64 = ((long) key.get(offset + i_8 + 0) & 0xff) + (((long) key.get(offset + i_8 + 1) & 0xff) << 8) +
+                    (((long) key.get(offset + i_8 + 2) & 0xff) << 16) + (((long) key.get(offset + i_8 + 3) & 0xff) << 24) +
+                    (((long) key.get(offset + i_8 + 4) & 0xff) << 32) + (((long) key.get(offset + i_8 + 5) & 0xff) << 40) +
+                    (((long) key.get(offset + i_8 + 6) & 0xff) << 48) + (((long) key.get(offset + i_8 + 7) & 0xff) << 56);
+
+            k64 *= m64;
+            k64 ^= k64 >>> r64;
+            k64 *= m64;
+
+            h64 ^= k64;
+            h64 *= m64;
+        }
+
+        int rem = length & 0x7;
+
+        switch (rem) {
+            case 0:
+                break;
+            case 7:
+                h64 ^= (long) key.get(offset + length - rem + 6) << 48;
+            case 6:
+                h64 ^= (long) key.get(offset + length - rem + 5) << 40;
+            case 5:
+                h64 ^= (long) key.get(offset + length - rem + 4) << 32;
+            case 4:
+                h64 ^= (long) key.get(offset + length - rem + 3) << 24;
+            case 3:
+                h64 ^= (long) key.get(offset + length - rem + 2) << 16;
+            case 2:
+                h64 ^= (long) key.get(offset + length - rem + 1) << 8;
+            case 1:
+                h64 ^= (long) key.get(offset + length - rem);
+                h64 *= m64;
+        }
+
+        h64 ^= h64 >>> r64;
+        h64 *= m64;
+        h64 ^= h64 >>> r64;
+
+        return h64;
+    }
+
+    public static long hash64(byte[] key, int offset, int length, long seed) {
+        long m64 = 0xc6a4a7935bd1e995L;
+        int r64 = 47;
+
+        long h64 = (seed & 0xffffffffL) ^ (m64 * length);
+
+        int lenLongs = length >> 3;
+
+        for (int i = 0; i < lenLongs; ++i) {
+            int i_8 = i << 3;
+
+            long k64 = ((long) key[offset + i_8 + 0] & 0xff) + (((long) key[offset + i_8 + 1] & 0xff) << 8) +
+                    (((long) key[offset + i_8 + 2] & 0xff) << 16) + (((long) key[offset + i_8 + 3] & 0xff) << 24) +
+                    (((long) key[offset + i_8 + 4] & 0xff) << 32) + (((long) key[offset + i_8 + 5] & 0xff) << 40) +
+                    (((long) key[offset + i_8 + 6] & 0xff) << 48) + (((long) key[offset + i_8 + 7] & 0xff) << 56);
+
+            k64 *= m64;
+            k64 ^= k64 >>> r64;
+            k64 *= m64;
+
+            h64 ^= k64;
+            h64 *= m64;
+        }
+
+        int rem = length & 0x7;
+
+        switch (rem) {
+            case 0:
+                break;
+            case 7:
+                h64 ^= (long) key[offset + length - rem + 6] << 48;
+            case 6:
+                h64 ^= (long) key[offset + length - rem + 5] << 40;
+            case 5:
+                h64 ^= (long) key[offset + length - rem + 4] << 32;
+            case 4:
+                h64 ^= (long) key[offset + length - rem + 3] << 24;
+            case 3:
+                h64 ^= (long) key[offset + length - rem + 2] << 16;
+            case 2:
+                h64 ^= (long) key[offset + length - rem + 1] << 8;
+            case 1:
+                h64 ^= (long) key[offset + length - rem];
+                h64 *= m64;
+        }
+
+        h64 ^= h64 >>> r64;
+        h64 *= m64;
+        h64 ^= h64 >>> r64;
+
+        return h64;
+    }
+}
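Both hash64 overloads implement the same MurmurHash 2.0 (64-bit) mixing, one over absolute ByteBuffer gets and one over array indexing; ObsBloomFilter depends on them agreeing so that byte[] and ByteBuffer keys map to the same buckets. A quick consistency sketch (hypothetical test class):

    import java.nio.ByteBuffer;

    public class MurmurAgreementSketch {
        public static void main(String[] args) throws Exception {
            byte[] key = "some-uid".getBytes("UTF-8");
            long fromArray = MurmurHash.hash64(key, 0, key.length, 0L);
            long fromBuffer = MurmurHash.hash64(ByteBuffer.wrap(key), 0, key.length, 0L);
            System.out.println(fromArray == fromBuffer); // expected: true
        }
    }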
modules/elasticsearch/src/main/java/org/elasticsearch/common/bloom/ObsBloomFilter.java (new file, 127 lines)
@@ -0,0 +1,127 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.common.bloom;
+
+import org.apache.lucene.util.OpenBitSet;
+import org.elasticsearch.common.RamUsage;
+
+import java.nio.ByteBuffer;
+
+public class ObsBloomFilter implements BloomFilter {
+
+    private final int hashCount;
+
+    private final OpenBitSet bitset;
+
+    ObsBloomFilter(int hashCount, OpenBitSet bs) {
+        this.hashCount = hashCount;
+        this.bitset = bs;
+    }
+
+    long emptyBuckets() {
+        long n = 0;
+        for (long i = 0; i < buckets(); i++) {
+            if (!bitset.get(i)) {
+                n++;
+            }
+        }
+        return n;
+    }
+
+    private long buckets() {
+        return bitset.size();
+    }
+
+    private long[] getHashBuckets(ByteBuffer key) {
+        return getHashBuckets(key, hashCount, buckets());
+    }
+
+    private long[] getHashBuckets(byte[] key, int offset, int length) {
+        return getHashBuckets(key, offset, length, hashCount, buckets());
+    }
+
+    // Murmur is faster than an SHA-based approach and provides as-good collision
+    // resistance. The combinatorial generation approach described in
+    // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf
+    // does prove to work in actual tests, and is obviously faster
+    // than performing further iterations of murmur.
+    static long[] getHashBuckets(ByteBuffer b, int hashCount, long max) {
+        long[] result = new long[hashCount];
+        long hash1 = MurmurHash.hash64(b, b.position(), b.remaining(), 0L);
+        long hash2 = MurmurHash.hash64(b, b.position(), b.remaining(), hash1);
+        for (int i = 0; i < hashCount; ++i) {
+            result[i] = Math.abs((hash1 + (long) i * hash2) % max);
+        }
+        return result;
+    }
+
+    // Murmur is faster than an SHA-based approach and provides as-good collision
+    // resistance. The combinatorial generation approach described in
+    // http://www.eecs.harvard.edu/~kirsch/pubs/bbbf/esa06.pdf
+    // does prove to work in actual tests, and is obviously faster
+    // than performing further iterations of murmur.
+    static long[] getHashBuckets(byte[] b, int offset, int length, int hashCount, long max) {
+        long[] result = new long[hashCount];
+        long hash1 = MurmurHash.hash64(b, offset, length, 0L);
+        long hash2 = MurmurHash.hash64(b, offset, length, hash1);
+        for (int i = 0; i < hashCount; ++i) {
+            result[i] = Math.abs((hash1 + (long) i * hash2) % max);
+        }
+        return result;
+    }
+
+    @Override public void add(byte[] key, int offset, int length) {
+        for (long bucketIndex : getHashBuckets(key, offset, length)) {
+            bitset.fastSet(bucketIndex);
+        }
+    }
+
+    public void add(ByteBuffer key) {
+        for (long bucketIndex : getHashBuckets(key)) {
+            bitset.fastSet(bucketIndex);
+        }
+    }
+
+    @Override public boolean isPresent(byte[] key, int offset, int length) {
+        for (long bucketIndex : getHashBuckets(key, offset, length)) {
+            if (!bitset.fastGet(bucketIndex)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public boolean isPresent(ByteBuffer key) {
+        for (long bucketIndex : getHashBuckets(key)) {
+            if (!bitset.fastGet(bucketIndex)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    public void clear() {
+        bitset.clear(0, bitset.size());
+    }
+
+    @Override public long sizeInBytes() {
+        return bitset.getBits().length * RamUsage.NUM_BYTES_LONG + RamUsage.NUM_BYTES_ARRAY_HEADER + RamUsage.NUM_BYTES_INT /* wlen */;
+    }
+}
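The two static getHashBuckets methods are the Kirsch-Mitzenmacher construction from the paper cited in the comments: two Murmur passes (the second seeded with the first) stand in for all k hash functions via g_i(x) = h1(x) + i * h2(x) mod m. End to end, the filter behaves as sketched below (hypothetical class; numbers illustrative):

    public class ObsBloomFilterSketch {
        public static void main(String[] args) throws Exception {
            // The factory picks k and the OpenBitSet size for a ~0.1% target fp rate.
            BloomFilter filter = BloomFilterFactory.getFilter(10000, 0.001);
            byte[] present = "uid-42".getBytes("UTF-8");
            filter.add(present, 0, present.length);
            byte[] absent = "uid-43".getBytes("UTF-8");
            // No false negatives: an added key is always reported present.
            System.out.println(filter.isPresent(present, 0, present.length)); // true
            // A missing key is reported absent except with ~0.1% probability.
            System.out.println(filter.isPresent(absent, 0, absent.length));   // almost surely false
        }
    }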
@@ -20,9 +20,9 @@
 package org.elasticsearch.common.lucene;
 
 import org.apache.lucene.index.IndexReader;
-import org.apache.lucene.search.IndexSearcher;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher;
 
 /**
  * A very simple holder for a tuple of reader and searcher.
@@ -31,13 +31,13 @@ import org.elasticsearch.common.lease.Releasable;
  */
 public class ReaderSearcherHolder implements Releasable {
 
-    private final IndexSearcher indexSearcher;
+    private final ExtendedIndexSearcher indexSearcher;
 
     public ReaderSearcherHolder(IndexReader indexReader) {
-        this(new IndexSearcher(indexReader));
+        this(new ExtendedIndexSearcher(indexReader));
     }
 
-    public ReaderSearcherHolder(IndexSearcher indexSearcher) {
+    public ReaderSearcherHolder(ExtendedIndexSearcher indexSearcher) {
         this.indexSearcher = indexSearcher;
     }
 
@@ -45,7 +45,7 @@ public class ReaderSearcherHolder implements Releasable {
         return indexSearcher.getIndexReader();
     }
 
-    public IndexSearcher searcher() {
+    public ExtendedIndexSearcher searcher() {
         return indexSearcher;
     }
 
@@ -30,6 +30,7 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.AbstractIndexComponent;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.index.cache.bloom.BloomCache;
 import org.elasticsearch.index.cache.field.data.FieldDataCache;
 import org.elasticsearch.index.cache.filter.FilterCache;
 import org.elasticsearch.index.cache.id.IdCache;
@@ -49,15 +50,18 @@ public class IndexCache extends AbstractIndexComponent implements CloseableComponent {
 
     private final IdCache idCache;
 
+    private final BloomCache bloomCache;
+
     private ClusterService clusterService;
 
     @Inject public IndexCache(Index index, @IndexSettings Settings indexSettings, FilterCache filterCache, FieldDataCache fieldDataCache,
-                              QueryParserCache queryParserCache, IdCache idCache) {
+                              QueryParserCache queryParserCache, IdCache idCache, BloomCache bloomCache) {
         super(index, indexSettings);
         this.filterCache = filterCache;
         this.fieldDataCache = fieldDataCache;
         this.queryParserCache = queryParserCache;
         this.idCache = idCache;
+        this.bloomCache = bloomCache;
     }
 
     @Inject(optional = true)
@@ -89,6 +93,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableComponent {
         fieldDataCache.close();
         idCache.close();
         queryParserCache.close();
+        bloomCache.close();
         if (clusterService != null) {
             clusterService.remove(this);
         }
@@ -98,6 +103,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableComponent {
         filterCache.clear(reader);
         fieldDataCache.clear(reader);
         idCache.clear(reader);
+        bloomCache.clear(reader);
     }
 
     public void clear() {
@@ -105,12 +111,14 @@ public class IndexCache extends AbstractIndexComponent implements CloseableComponent {
         fieldDataCache.clear();
         idCache.clear();
         queryParserCache.clear();
+        bloomCache.clear();
     }
 
     public void clearUnreferenced() {
         filterCache.clearUnreferenced();
         fieldDataCache.clearUnreferenced();
         idCache.clearUnreferenced();
+        bloomCache.clearUnreferenced();
     }
 
     @Override public void clusterChanged(ClusterChangedEvent event) {
@@ -21,6 +21,7 @@ package org.elasticsearch.index.cache;
 
 import org.elasticsearch.common.inject.AbstractModule;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.cache.bloom.BloomCacheModule;
 import org.elasticsearch.index.cache.field.data.FieldDataCacheModule;
 import org.elasticsearch.index.cache.filter.FilterCacheModule;
 import org.elasticsearch.index.cache.id.IdCacheModule;
@@ -42,6 +43,7 @@ public class IndexCacheModule extends AbstractModule {
         new FieldDataCacheModule(settings).configure(binder());
         new IdCacheModule(settings).configure(binder());
         new QueryParserCacheModule(settings).configure(binder());
+        new BloomCacheModule(settings).configure(binder());
 
         bind(IndexCache.class).asEagerSingleton();
     }
modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/BloomCache.java (new file, 46 lines)
@@ -0,0 +1,46 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.index.cache.bloom;
+
+import org.apache.lucene.index.IndexReader;
+import org.elasticsearch.common.bloom.BloomFilter;
+import org.elasticsearch.common.component.CloseableComponent;
+import org.elasticsearch.index.IndexComponent;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public interface BloomCache extends IndexComponent, CloseableComponent {
+
+    /**
+     * *Async* loads a bloom filter for the field name.
+     */
+    BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad);
+
+    void clear();
+
+    void clear(IndexReader reader);
+
+    void clearUnreferenced();
+
+    long sizeInBytes();
+
+    long sizeInBytes(String fieldName);
+}
modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/BloomCacheModule.java (new file, 47 lines)
@@ -0,0 +1,47 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.index.cache.bloom;
+
+import org.elasticsearch.common.inject.AbstractModule;
+import org.elasticsearch.common.inject.Scopes;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.cache.bloom.simple.SimpleBloomCache;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class BloomCacheModule extends AbstractModule {
+
+    public static final class BloomCacheSettings {
+        public static final String TYPE = "index.cache.bloom.type";
+    }
+
+    private final Settings settings;
+
+    public BloomCacheModule(Settings settings) {
+        this.settings = settings;
+    }
+
+    @Override protected void configure() {
+        bind(BloomCache.class)
+                .to(settings.getAsClass(BloomCacheSettings.TYPE, SimpleBloomCache.class, "org.elasticsearch.index.cache.bloom.", "BloomCache"))
+                .in(Scopes.SINGLETON);
+    }
+}
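The implementation is chosen by the index.cache.bloom.type setting, defaulting to SimpleBloomCache. A hedged configuration sketch (it assumes getAsClass accepts a fully qualified class name, which side-steps the prefix/suffix resolution convention shown above):

    // Selecting the no-op cache, effectively disabling the bloom-filter
    // seek optimization for an index. Settings classes are from
    // org.elasticsearch.common.settings.
    Settings settings = ImmutableSettings.settingsBuilder()
            .put(BloomCacheModule.BloomCacheSettings.TYPE,
                    "org.elasticsearch.index.cache.bloom.none.NonBloomCache")
            .build();
    // Installing BloomCacheModule with these settings binds NonBloomCache.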
modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/none/NonBloomCache.java (new file, 69 lines)
@@ -0,0 +1,69 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.index.cache.bloom.none;
+
+import org.apache.lucene.index.IndexReader;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.common.bloom.BloomFilter;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.AbstractIndexComponent;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.cache.bloom.BloomCache;
+import org.elasticsearch.index.settings.IndexSettings;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class NonBloomCache extends AbstractIndexComponent implements BloomCache {
+
+    public NonBloomCache(Index index) {
+        super(index, ImmutableSettings.Builder.EMPTY_SETTINGS);
+    }
+
+    @Inject public NonBloomCache(Index index, @IndexSettings Settings indexSettings) {
+        super(index, indexSettings);
+    }
+
+    @Override public BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad) {
+        return BloomFilter.NONE;
+    }
+
+    @Override public void clear() {
+    }
+
+    @Override public void clear(IndexReader reader) {
+    }
+
+    @Override public void clearUnreferenced() {
+    }
+
+    @Override public long sizeInBytes() {
+        return 0;
+    }
+
+    @Override public long sizeInBytes(String fieldName) {
+        return 0;
+    }
+
+    @Override public void close() throws ElasticSearchException {
+    }
+}
modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/bloom/simple/SimpleBloomCache.java (new file, 227 lines)
@@ -0,0 +1,227 @@
+/* Apache License 2.0 header as above */
+
+package org.elasticsearch.index.cache.bloom.simple;
+
+import org.apache.lucene.index.IndexReader;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.index.TermDocs;
+import org.apache.lucene.index.TermEnum;
+import org.apache.lucene.util.StringHelper;
+import org.apache.lucene.util.UnicodeUtil;
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.common.Unicode;
+import org.elasticsearch.common.bloom.BloomFilter;
+import org.elasticsearch.common.bloom.BloomFilterFactory;
+import org.elasticsearch.common.collect.MapMaker;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
+import org.elasticsearch.index.AbstractIndexComponent;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.cache.bloom.BloomCache;
+import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class SimpleBloomCache extends AbstractIndexComponent implements BloomCache {
+
+    private final ThreadPool threadPool;
+
+    private final ConcurrentMap<Object, ConcurrentMap<String, BloomFilterEntry>> cache;
+
+    private final Object creationMutex = new Object();
+
+    @Inject public SimpleBloomCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
+        super(index, indexSettings);
+        this.threadPool = threadPool;
+        // weak keys is fine, it will only be cleared once IndexReader references will be removed
+        // (assuming clear(...) will not be called)
+        this.cache = new MapMaker().weakKeys().makeMap();
+    }
+
+    @Override public void close() throws ElasticSearchException {
+        clear();
+    }
+
+    @Override public void clear() {
+        cache.clear();
+    }
+
+    @Override public void clear(IndexReader reader) {
+        ConcurrentMap<String, BloomFilterEntry> map = cache.remove(reader.getFieldCacheKey());
+        // help soft/weak handling GC
+        if (map != null) {
+            map.clear();
+        }
+    }
+
+    @Override public void clearUnreferenced() {
+        // nothing to do here...
+    }
+
+    @Override public long sizeInBytes() {
+        // the overhead of the map is not really relevant...
+        long sizeInBytes = 0;
+        for (ConcurrentMap<String, BloomFilterEntry> map : cache.values()) {
+            for (BloomFilterEntry filter : map.values()) {
+                sizeInBytes += filter.filter.sizeInBytes();
+            }
+        }
+        return sizeInBytes;
+    }
+
+    @Override public long sizeInBytes(String fieldName) {
+        long sizeInBytes = 0;
+        for (ConcurrentMap<String, BloomFilterEntry> map : cache.values()) {
+            BloomFilterEntry filter = map.get(fieldName);
+            if (filter != null) {
+                sizeInBytes += filter.filter.sizeInBytes();
+            }
+        }
+        return sizeInBytes;
+    }
+
+    @Override public BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad) {
+        int currentNumDocs = reader.numDocs();
+        if (currentNumDocs == 0) {
+            return BloomFilter.EMPTY;
+        }
+        ConcurrentMap<String, BloomFilterEntry> fieldCache = cache.get(reader.getFieldCacheKey());
+        if (fieldCache == null) {
+            synchronized (creationMutex) {
+                fieldCache = cache.get(reader.getFieldCacheKey());
+                if (fieldCache == null) {
+                    fieldCache = ConcurrentCollections.newConcurrentMap();
+                    cache.put(reader.getFieldCacheKey(), fieldCache);
+                }
+            }
+        }
+        BloomFilterEntry filter = fieldCache.get(fieldName);
+        if (filter == null) {
+            synchronized (fieldCache) {
+                filter = fieldCache.get(fieldName);
+                if (filter == null) {
+                    filter = new BloomFilterEntry(reader.numDocs(), BloomFilter.NONE);
+                    filter.loading.set(true);
+                    fieldCache.put(fieldName, filter);
+                    // now, do the async load of it...
+                    BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName);
+                    if (asyncLoad) {
+                        threadPool.cached().execute(loader);
+                    } else {
+                        loader.run();
+                        filter = fieldCache.get(fieldName);
+                    }
+                }
+            }
+        }
+        // if we have too many deletes, we need to reload the bloom filter so it will be more effective
+        if (filter.numDocs > 1000 && (currentNumDocs / filter.numDocs) < 0.6) {
+            if (filter.loading.compareAndSet(false, true)) {
+                // do the async loading
+                BloomFilterLoader loader = new BloomFilterLoader(reader, fieldName);
+                if (asyncLoad) {
+                    threadPool.cached().execute(loader);
+                } else {
+                    loader.run();
+                    filter = fieldCache.get(fieldName);
+                }
+            }
+        }
+        return filter.filter;
+    }
+
+    class BloomFilterLoader implements Runnable {
+        private final IndexReader reader;
+        private final String field;
+
+        BloomFilterLoader(IndexReader reader, String field) {
+            this.reader = reader;
+            this.field = StringHelper.intern(field);
+        }
+
+        @SuppressWarnings({"StringEquality"})
+        @Override public void run() {
+            TermDocs termDocs = null;
+            TermEnum termEnum = null;
+            try {
+                BloomFilter filter = BloomFilterFactory.getFilter(reader.numDocs(), 15);
+                termDocs = reader.termDocs();
+                termEnum = reader.terms(new Term(field));
+                do {
+                    Term term = termEnum.term();
+                    if (term == null || term.field() != field) break;
+
+                    // LUCENE MONITOR: 4.0, move to use bytes!
+                    UnicodeUtil.UTF8Result utf8Result = Unicode.fromStringAsUtf8(term.text());
+                    termDocs.seek(termEnum);
+                    while (termDocs.next()) {
+                        // when traversing, make sure to ignore deleted docs, so the key->docId will be correct
+                        if (!reader.isDeleted(termDocs.doc())) {
+                            filter.add(utf8Result.result, 0, utf8Result.length);
+                        }
+                    }
+                } while (termEnum.next());
+                ConcurrentMap<String, BloomFilterEntry> fieldCache = cache.get(reader.getFieldCacheKey());
+                if (fieldCache != null) {
+                    if (fieldCache.containsKey(field)) {
+                        BloomFilterEntry filterEntry = new BloomFilterEntry(reader.numDocs(), filter);
+                        filterEntry.loading.set(false);
+                        fieldCache.put(field, filterEntry);
+                    }
+                }
+            } catch (Exception e) {
+                logger.warn("failed to load bloom filter for [{}]", e, field);
+            } finally {
+                try {
+                    if (termDocs != null) {
+                        termDocs.close();
+                    }
+                } catch (IOException e) {
+                    // ignore
+                }
+                try {
+                    if (termEnum != null) {
+                        termEnum.close();
+                    }
+                } catch (IOException e) {
+                    // ignore
+                }
+            }
+        }
+    }
+
+    static class BloomFilterEntry {
+        final int numDocs;
+        final BloomFilter filter;
+        final AtomicBoolean loading = new AtomicBoolean();
+
+        public BloomFilterEntry(int numDocs, BloomFilter filter) {
+            this.numDocs = numDocs;
+            this.filter = filter;
+        }
+    }
+}
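How the cache pays off (hedged sketch; the actual call site lives in RobinEngine/UidField, outside this excerpt): before seeking the uid term to load a document's version, the engine can ask the per-reader filter whether the uid can possibly exist, skipping the TermDocs seek in the common miss case. Field name and return convention below are assumptions for illustration only:

    // Hypothetical caller-side sketch, not code from this commit.
    BloomFilter filter = bloomCache.filter(reader, "_uid", true /* asyncLoad */);
    UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid);
    if (!filter.isPresent(utf8.result, 0, utf8.length)) {
        return -1; // definitely absent in this reader: no term seek needed
    }
    // ...fall through to the usual term seek to read the stored version...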
@@ -23,12 +23,12 @@ import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.search.Query;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.component.CloseableComponent;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher;
 import org.elasticsearch.common.lucene.uid.UidField;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.ThreadSafe;
@@ -121,7 +121,7 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
 
         IndexReader reader();
 
-        IndexSearcher searcher();
+        ExtendedIndexSearcher searcher();
     }
 
     static class Refresh {
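searcher() now returns ExtendedIndexSearcher so callers can walk the segment readers individually. The class itself is not part of this hunk; a plausible minimal shape, assuming Lucene 3.x-era APIs (the subReaders() accessor is inferred from its use in RobinEngine below):

    // hedged sketch, not the committed source
    public class ExtendedIndexSearcher extends IndexSearcher {

        public ExtendedIndexSearcher(IndexReader reader) {
            super(reader);
        }

        public IndexReader[] subReaders() {
            // getSequentialSubReaders() may be null for an atomic reader;
            // the NRT reader handed in here is always a composite one
            return getIndexReader().getSequentialSubReaders();
        }
    }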
@@ -22,13 +22,16 @@ package org.elasticsearch.index.engine.robin;
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.AlreadyClosedException;
+import org.apache.lucene.util.UnicodeUtil;
 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.common.Preconditions;
+import org.elasticsearch.common.Unicode;
+import org.elasticsearch.common.bloom.BloomFilter;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.lucene.IndexWriters;
 import org.elasticsearch.common.lucene.ReaderSearcherHolder;
+import org.elasticsearch.common.lucene.search.ExtendedIndexSearcher;
 import org.elasticsearch.common.lucene.uid.UidField;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -36,9 +39,11 @@ import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.resource.AcquirableResource;
 import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.cache.bloom.BloomCache;
 import org.elasticsearch.index.deletionpolicy.SnapshotDeletionPolicy;
 import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
 import org.elasticsearch.index.engine.*;
+import org.elasticsearch.index.mapper.UidFieldMapper;
 import org.elasticsearch.index.merge.policy.EnableMergePolicy;
 import org.elasticsearch.index.merge.policy.MergePolicyProvider;
 import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
@@ -95,6 +100,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
 
     private final SimilarityService similarityService;
 
+    private final BloomCache bloomCache;
+
+    private final boolean asyncLoadBloomFilter;
+
     // no need for volatile, its always used under a lock
     private IndexWriter indexWriter;
 
@@ -107,6 +116,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
 
     private volatile int disableFlushCounter = 0;
 
+    private volatile Searcher postFlushSearcher;
+
     private final AtomicBoolean flushing = new AtomicBoolean();
 
     private final ConcurrentMap<String, VersionValue> versionMap;
@@ -115,7 +126,8 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
 
     @Inject public RobinEngine(ShardId shardId, @IndexSettings Settings indexSettings, Store store, SnapshotDeletionPolicy deletionPolicy, Translog translog,
                                MergePolicyProvider mergePolicyProvider, MergeSchedulerProvider mergeScheduler,
-                               AnalysisService analysisService, SimilarityService similarityService) throws EngineException {
+                               AnalysisService analysisService, SimilarityService similarityService,
+                               BloomCache bloomCache) throws EngineException {
         super(shardId, indexSettings);
         Preconditions.checkNotNull(store, "Store must be provided to the engine");
         Preconditions.checkNotNull(deletionPolicy, "Snapshot deletion policy must be provided to the engine");
@@ -126,6 +138,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         this.termIndexDivisor = indexSettings.getAsInt("index.term_index_divisor", 1); // IndexReader#DEFAULT_TERMS_INDEX_DIVISOR
         this.compoundFormat = indexSettings.getAsBoolean("index.compound_format", indexSettings.getAsBoolean("index.merge.policy.use_compound_file", store == null ? false : store.suggestUseCompoundFile()));
         this.refreshInterval = componentSettings.getAsTime("refresh_interval", indexSettings.getAsTime("index.refresh_interval", timeValueSeconds(1)));
+        this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true); // here for testing, should always be true
 
         this.store = store;
         this.deletionPolicy = deletionPolicy;
@@ -134,6 +147,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         this.mergeScheduler = mergeScheduler;
         this.analysisService = analysisService;
         this.similarityService = similarityService;
+        this.bloomCache = bloomCache;
 
         this.versionMap = new ConcurrentHashMap<String, VersionValue>(1000);
         this.dirtyLocks = new Object[componentSettings.getAsInt("concurrency", 10000)];
@@ -178,6 +192,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             try {
                 translog.newTranslog(newTransactionLogId());
                 this.nrtResource = buildNrtResource(indexWriter);
+                if (postFlushSearcher != null) {
+                    postFlushSearcher.release();
+                }
+                postFlushSearcher = searcher();
             } catch (IOException e) {
                 try {
                     indexWriter.rollback();
@@ -590,7 +608,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
                     AcquirableResource<ReaderSearcherHolder> current = nrtResource;
                     IndexReader newReader = current.resource().reader().reopen(true);
                     if (newReader != current.resource().reader()) {
-                        IndexSearcher indexSearcher = new IndexSearcher(newReader);
+                        ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(newReader);
                         indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
                         nrtResource = newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
                         current.markForClose();
@@ -687,6 +705,12 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             dirty = true; // force a refresh
             // we need to do a refresh here so we sync versioning support
             refresh(new Refresh(true));
+            if (postFlushSearcher != null) {
+                postFlushSearcher.release();
+            }
+            // only need to load for this flush version searcher, since we keep a map for all
+            // the changes since the previous flush in memory
+            postFlushSearcher = searcher();
         } finally {
             rwl.writeLock().unlock();
             flushing.set(false);
@@ -838,6 +862,10 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
         rwl.writeLock().lock();
         this.versionMap.clear();
         try {
+            if (postFlushSearcher != null) {
+                postFlushSearcher.release();
+                postFlushSearcher = null;
+            }
             if (nrtResource != null) {
                 this.nrtResource.forceClose();
             }
@@ -862,13 +890,22 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
     }
 
     private long loadCurrentVersionFromIndex(Term uid) {
+        UnicodeUtil.UTF8Result utf8 = Unicode.fromStringAsUtf8(uid.text());
         // no version, get the version from the index
-        Searcher searcher = searcher();
-        try {
-            return UidField.loadVersion(searcher.reader(), uid);
-        } finally {
-            searcher.release();
+        Searcher searcher = postFlushSearcher;
+        for (IndexReader reader : searcher.searcher().subReaders()) {
+            BloomFilter filter = bloomCache.filter(reader, UidFieldMapper.NAME, asyncLoadBloomFilter);
+            // we know that its not there...
+            if (!filter.isPresent(utf8.result, 0, utf8.length)) {
+                continue;
+            }
+            long version = UidField.loadVersion(reader, uid);
+            // either -2 (its there, but no version associated), or an actual version
+            if (version != -1) {
+                return version;
+            }
         }
+        return -1;
     }
 
     private IndexWriter createWriter() throws IOException {
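The comments above pin down the sentinel convention: -1 means the uid is not in the index at all, -2 means the document exists but carries no associated version. A hedged sketch of how a caller on the write path might branch on the result (the branch bodies are illustrative, not from this commit):

    long currentVersion = loadCurrentVersionFromIndex(uid);
    if (currentVersion == -1) {
        // no doc with this uid in the index; a create, or it lives only in the in-memory version map
    } else if (currentVersion == -2) {
        // doc exists but predates version tracking; treat it as unversioned
    } else {
        // a real version, to compare against the client's expected version for conflict detection
    }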
@@ -898,7 +935,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
 
     private AcquirableResource<ReaderSearcherHolder> buildNrtResource(IndexWriter indexWriter) throws IOException {
         IndexReader indexReader = indexWriter.getReader();
-        IndexSearcher indexSearcher = new IndexSearcher(indexReader);
+        ExtendedIndexSearcher indexSearcher = new ExtendedIndexSearcher(indexReader);
         indexSearcher.setSimilarity(similarityService.defaultSearchSimilarity());
         return newAcquirableResource(new ReaderSearcherHolder(indexSearcher));
     }
@@ -923,7 +960,7 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine,
             return nrtHolder.resource().reader();
         }
 
-        @Override public IndexSearcher searcher() {
+        @Override public ExtendedIndexSearcher searcher() {
             return nrtHolder.resource().searcher();
         }
 
@@ -0,0 +1,49 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bloom;
+
+import org.elasticsearch.common.base.Charsets;
+import org.testng.annotations.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+@Test
+public class BoomFilterTests {
+
+    @Test public void testSimpleOps() {
+        BloomFilter filter = BloomFilterFactory.getFilter(10, 15);
+        filter.add(wrap("1"));
+        assertThat(filter.isPresent(wrap("1")), equalTo(true));
+        assertThat(filter.isPresent(wrap("2")), equalTo(false));
+        filter.add(wrap("2"));
+        assertThat(filter.isPresent(wrap("1")), equalTo(true));
+        assertThat(filter.isPresent(wrap("2")), equalTo(true));
+    }
+
+    private ByteBuffer wrap(String key) {
+        return ByteBuffer.wrap(key.getBytes(Charsets.UTF_8));
+    }
+}
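One property worth keeping in mind when reading these assertions: a bloom filter has no false negatives but does admit false positives, so isPresent == false is definitive while isPresent == true is only probable. A sketch of that contract with the byte-range API used by the engine code above (key and sizing are illustrative):

    BloomFilter filter = BloomFilterFactory.getFilter(1000, 15);
    byte[] key = "some-uid".getBytes(Charsets.UTF_8);
    filter.add(key, 0, key.length);
    assertThat(filter.isPresent(key, 0, key.length), equalTo(true)); // added keys always report present
    // a key that was never added usually reports false, but a rare false positive is allowed;
    // the assertEquals(false) above only holds for these particular keys and filter sizing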
@@ -20,6 +20,7 @@
 package org.elasticsearch.index.engine.robin;
 
 import org.elasticsearch.index.analysis.AnalysisService;
+import org.elasticsearch.index.cache.bloom.none.NonBloomCache;
 import org.elasticsearch.index.engine.AbstractSimpleEngineTests;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.similarity.SimilarityService;
@@ -35,6 +36,6 @@ public class SimpleRobinEngineTests extends AbstractSimpleEngineTests {
 
     protected Engine createEngine(Store store, Translog translog) {
         return new RobinEngine(shardId, EMPTY_SETTINGS, store, createSnapshotDeletionPolicy(), translog, createMergePolicy(), createMergeScheduler(),
-                new AnalysisService(shardId.index()), new SimilarityService(shardId.index()));
+                new AnalysisService(shardId.index()), new SimilarityService(shardId.index()), new NonBloomCache(shardId.index()));
     }
 }
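The test engine is wired with a NonBloomCache, presumably so the generic engine tests run without building filters. Its body is not shown in this section; a hedged sketch inferred purely from how it is constructed and called here (the BloomFilter.NONE constant is an assumption, standing for a filter whose isPresent always returns true):

    // inferred sketch, not the committed source: a no-op cache whose filter
    // never rules anything out, so version lookups fall through to the normal seek
    public class NonBloomCache implements BloomCache {

        private final Index index;

        public NonBloomCache(Index index) {
            this.index = index;
        }

        @Override public BloomFilter filter(IndexReader reader, String fieldName, boolean asyncLoad) {
            return BloomFilter.NONE; // assumed "always present" filter
        }
    }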
@@ -37,6 +37,7 @@ import org.elasticsearch.index.query.IndexQueryParserModule;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.testng.annotations.BeforeTest;
 import org.testng.annotations.Test;
 
@@ -59,6 +60,7 @@ public class PercolatorExecutorTests {
         Index index = new Index("test");
         Injector injector = new ModulesBuilder().add(
                 new SettingsModule(settings),
+                new ThreadPoolModule(settings),
                 new ScriptModule(),
                 new MapperServiceModule(),
                 new IndexSettingsModule(settings),
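The ThreadPoolModule additions across these test injectors are a knock-on effect of the bloom cache: the caching implementation runs its BloomFilterLoader via threadPool.cached().execute(loader), so the index module graph now needs a ThreadPool binding. A hedged sketch of the constructor dependency (class name and annotation layout assumed; the class body is the filter/loader code at the top of this section):

    public class SimpleBloomCache extends AbstractIndexComponent {

        private final ThreadPool threadPool;

        @Inject public SimpleBloomCache(Index index, @IndexSettings Settings indexSettings, ThreadPool threadPool) {
            super(index, indexSettings);
            this.threadPool = threadPool; // used by the async load path: threadPool.cached().execute(loader)
        }
    }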
@@ -49,6 +49,7 @@ import org.elasticsearch.index.search.geo.GeoPolygonFilter;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -77,6 +78,7 @@ public class SimpleIndexQueryParserTests {
         Index index = new Index("test");
         Injector injector = new ModulesBuilder().add(
                 new SettingsModule(settings),
+                new ThreadPoolModule(settings),
                 new ScriptModule(),
                 new MapperServiceModule(),
                 new IndexSettingsModule(settings),
@@ -34,6 +34,7 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.testng.annotations.Test;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.*;
@@ -57,6 +58,7 @@ public class IndexQueryParserModuleTests {
         Index index = new Index("test");
         Injector injector = new ModulesBuilder().add(
                 new SettingsModule(settings),
+                new ThreadPoolModule(settings),
                 new ScriptModule(),
                 new IndexSettingsModule(settings),
                 new IndexCacheModule(settings),
@@ -35,6 +35,7 @@ import org.elasticsearch.index.query.xcontent.XContentIndexQueryParser;
 import org.elasticsearch.index.settings.IndexSettingsModule;
 import org.elasticsearch.index.similarity.SimilarityModule;
 import org.elasticsearch.script.ScriptModule;
+import org.elasticsearch.threadpool.ThreadPoolModule;
 import org.testng.annotations.Test;
 
 import static org.hamcrest.MatcherAssert.*;
@@ -62,6 +63,7 @@ public class IndexQueryParserPluginTests {
         Index index = new Index("test");
         Injector injector = new ModulesBuilder().add(
                 new SettingsModule(settings),
+                new ThreadPoolModule(settings),
                 new ScriptModule(),
                 new IndexSettingsModule(settings),
                 new IndexCacheModule(settings),
@@ -23,6 +23,8 @@ import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.indices.IndexMissingException;
 import org.elasticsearch.test.integration.AbstractNodesTests;
@@ -43,8 +45,10 @@ public class SimpleVersioningTests extends AbstractNodesTests {
     private Client client2;
 
     @BeforeClass public void createNodes() throws Exception {
-        startNode("server1");
-        startNode("server2");
+        // make sure we use bloom filters here!
+        Settings settings = ImmutableSettings.settingsBuilder().put("index.engine.robin.async_load_bloom", false).build();
+        startNode("server1", settings);
+        startNode("server2", settings);
         client = client("server1");
         client2 = client("server2");
     }
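The index.engine.robin.async_load_bloom key reaches RobinEngine through its componentSettings, which strip the component prefix; forcing synchronous loading means the filter is fully built before the first versioned operation, so the test deterministically exercises the bloom path rather than the fallback seek. Sketched resolution (the engine-side line is quoted from the constructor hunk above):

    // test side: full key, scoped to the robin engine component
    Settings settings = ImmutableSettings.settingsBuilder()
            .put("index.engine.robin.async_load_bloom", false)
            .build();
    // engine side: the same flag, read without the component prefix
    // this.asyncLoadBloomFilter = componentSettings.getAsBoolean("async_load_bloom", true);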
@@ -115,4 +119,70 @@ public class SimpleVersioningTests extends AbstractNodesTests {
             assertThat(searchResponse.hits().getAt(0).version(), equalTo(2l));
         }
     }
+
+    @Test public void testSimpleVersioningWithFlush() throws Exception {
+        try {
+            client.admin().indices().prepareDelete("test").execute().actionGet();
+        } catch (IndexMissingException e) {
+            // its ok
+        }
+        client.admin().indices().prepareCreate("test").execute().actionGet();
+        client.admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet();
+
+        IndexResponse indexResponse = client.prepareIndex("test", "type", "1").setSource("field1", "value1_1").execute().actionGet();
+        assertThat(indexResponse.version(), equalTo(1l));
+
+        client.admin().indices().prepareFlush().execute().actionGet();
+
+        indexResponse = client.prepareIndex("test", "type", "1").setSource("field1", "value1_2").setVersion(1).execute().actionGet();
+        assertThat(indexResponse.version(), equalTo(2l));
+
+        client.admin().indices().prepareFlush().execute().actionGet();
+
+        try {
+            client.prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        try {
+            client2.prepareIndex("test", "type", "1").setSource("field1", "value1_1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        try {
+            client.prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        try {
+            client2.prepareIndex("test", "type", "1").setCreate(true).setSource("field1", "value1_1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        try {
+            client.prepareDelete("test", "type", "1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        try {
+            client2.prepareDelete("test", "type", "1").setVersion(1).execute().actionGet();
+        } catch (ElasticSearchException e) {
+            assertThat(e.unwrapCause(), instanceOf(VersionConflictEngineException.class));
+        }
+
+        client.admin().indices().prepareRefresh().execute().actionGet();
+        for (int i = 0; i < 10; i++) {
+            assertThat(client.prepareGet("test", "type", "1").execute().actionGet().version(), equalTo(2l));
+        }
+
+        for (int i = 0; i < 10; i++) {
+            SearchResponse searchResponse = client.prepareSearch().setQuery(matchAllQuery()).execute().actionGet();
+            assertThat(searchResponse.hits().getAt(0).version(), equalTo(2l));
+        }
+    }
 }