add our own bloom filter implementation
uses more hash iterations, yet require less memory for the same fpp relates to #2411
This commit is contained in:
parent
512585da82
commit
547bd7abf2
|
@ -24,13 +24,12 @@ import org.apache.lucene.analysis.TokenStream;
|
|||
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;
|
||||
import org.apache.lucene.analysis.tokenattributes.PayloadAttribute;
|
||||
import org.apache.lucene.document.Field;
|
||||
import org.apache.lucene.index.AtomicReaderContext;
|
||||
import org.apache.lucene.index.DocsAndPositionsEnum;
|
||||
import org.apache.lucene.index.Term;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.Numbers;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat;
|
||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -56,11 +55,27 @@ public class UidField extends Field {
|
|||
|
||||
// this works fine for nested docs since they don't have the payload which has the version
|
||||
// so we iterate till we find the one with the payload
|
||||
// LUCENE 4 UPGRADE: We can get rid of the do while loop, since there is only one _uid value (live docs are taken into account)
|
||||
public static DocIdAndVersion loadDocIdAndVersion(AtomicReaderContext context, Term term) {
|
||||
int docId = Lucene.NO_DOC;
|
||||
try {
|
||||
DocsAndPositionsEnum uid = context.reader().termPositionsEnum(term);
|
||||
Terms terms = context.reader().terms(term.field());
|
||||
if (terms == null) {
|
||||
return null;
|
||||
}
|
||||
// hack to break early if we have a bloom filter...
|
||||
if (terms instanceof BloomFilterPostingsFormat.BloomFilteredFieldsProducer.BloomFilteredTerms) {
|
||||
if (!((BloomFilterPostingsFormat.BloomFilteredFieldsProducer.BloomFilteredTerms) terms).getFilter().mightContain(term.bytes())) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
TermsEnum termsEnum = terms.iterator(null);
|
||||
if (termsEnum == null) {
|
||||
return null;
|
||||
}
|
||||
if (!termsEnum.seekExact(term.bytes(), true)) {
|
||||
return null;
|
||||
}
|
||||
DocsAndPositionsEnum uid = termsEnum.docsAndPositions(context.reader().getLiveDocs(), null, DocsAndPositionsEnum.FLAG_PAYLOADS);
|
||||
if (uid == null || uid.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return null; // no doc
|
||||
}
|
||||
|
@ -89,10 +104,26 @@ public class UidField extends Field {
|
|||
* Load the version for the uid from the reader, returning -1 if no doc exists, or -2 if
|
||||
* no version is available (for backward comp.)
|
||||
*/
|
||||
// LUCENE 4 UPGRADE: We can get rid of the do while loop, since there is only one _uid value (live docs are taken into account)
|
||||
public static long loadVersion(AtomicReaderContext context, Term term) {
|
||||
try {
|
||||
DocsAndPositionsEnum uid = context.reader().termPositionsEnum(term);
|
||||
Terms terms = context.reader().terms(term.field());
|
||||
if (terms == null) {
|
||||
return -1;
|
||||
}
|
||||
// hack to break early if we have a bloom filter...
|
||||
if (terms instanceof BloomFilterPostingsFormat.BloomFilteredFieldsProducer.BloomFilteredTerms) {
|
||||
if (!((BloomFilterPostingsFormat.BloomFilteredFieldsProducer.BloomFilteredTerms) terms).getFilter().mightContain(term.bytes())) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
TermsEnum termsEnum = terms.iterator(null);
|
||||
if (termsEnum == null) {
|
||||
return -1;
|
||||
}
|
||||
if (!termsEnum.seekExact(term.bytes(), true)) {
|
||||
return -1;
|
||||
}
|
||||
DocsAndPositionsEnum uid = termsEnum.docsAndPositions(context.reader().getLiveDocs(), null, DocsAndPositionsEnum.FLAG_PAYLOADS);
|
||||
if (uid == null || uid.nextDoc() == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,503 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.codec.postingsformat;
|
||||
|
||||
import com.google.common.math.LongMath;
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.lucene.store.DataInput;
|
||||
import org.apache.lucene.store.DataOutput;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
|
||||
/**
|
||||
* A bloom filter. Inspired by Guava bloom filter implementation though with some optimizations.
|
||||
*/
|
||||
public class BloomFilter {
|
||||
|
||||
/**
|
||||
* A factory that can use different fpp based on size.
|
||||
*/
|
||||
public static class Factory {
|
||||
|
||||
public static final Factory DEFAULT = buildDefault();
|
||||
|
||||
private static Factory buildDefault() {
|
||||
// Some numbers:
|
||||
// 10k =0.001: 140.4kb , 10 Hashes
|
||||
// 10k =0.01 : 93.6kb , 6 Hashes
|
||||
// 100k=0.01 : 936.0kb , 6 Hashes
|
||||
// 100k=0.03 : 712.7kb , 5 Hashes
|
||||
// 500k=0.01 : 4.5mb , 6 Hashes
|
||||
// 500k=0.03 : 3.4mb , 5 Hashes
|
||||
// 500k=0.05 : 2.9mb , 4 Hashes
|
||||
// 1m=0.01 : 9.1mb , 6 Hashes
|
||||
// 1m=0.03 : 6.9mb , 5 Hashes
|
||||
// 1m=0.05 : 5.9mb , 4 Hashes
|
||||
// 5m=0.01 : 45.7mb , 6 Hashes
|
||||
// 5m=0.03 : 34.8mb , 5 Hashes
|
||||
// 5m=0.05 : 29.7mb , 4 Hashes
|
||||
// 50m=0.01 : 457.0mb , 6 Hashes
|
||||
// 50m=0.03 : 297.3mb , 4 Hashes
|
||||
// 50m=0.10 : 228.5mb , 3 Hashes
|
||||
return buildFromString("10k=0.01,1m=0.03");
|
||||
}
|
||||
|
||||
/**
|
||||
* Supports just passing fpp, as in "0.01", and also ranges, like "50k=0.01,1m=0.05". If
|
||||
* its null, returns {@link #buildDefault()}.
|
||||
*/
|
||||
public static Factory buildFromString(@Nullable String config) {
|
||||
if (config == null) {
|
||||
return buildDefault();
|
||||
}
|
||||
String[] sEntries = Strings.splitStringToArray(config, ',');
|
||||
if (sEntries.length == 0) {
|
||||
if (config.length() > 0) {
|
||||
return new Factory(new Entry[]{new Entry(0, Double.parseDouble(config))});
|
||||
}
|
||||
return buildDefault();
|
||||
}
|
||||
Entry[] entries = new Entry[sEntries.length];
|
||||
for (int i = 0; i < sEntries.length; i++) {
|
||||
int index = sEntries[i].indexOf('=');
|
||||
entries[i] = new Entry(
|
||||
(int) SizeValue.parseSizeValue(sEntries[i].substring(0, index).trim()).singles(),
|
||||
Double.parseDouble(sEntries[i].substring(index + 1).trim())
|
||||
);
|
||||
}
|
||||
return new Factory(entries);
|
||||
}
|
||||
|
||||
private final Entry[] entries;
|
||||
|
||||
public Factory(Entry[] entries) {
|
||||
this.entries = entries;
|
||||
// the order is from the upper most expected insertions to the lowest
|
||||
Arrays.sort(this.entries, new Comparator<Entry>() {
|
||||
@Override
|
||||
public int compare(Entry o1, Entry o2) {
|
||||
return o2.expectedInsertions - o1.expectedInsertions;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public BloomFilter createFilter(int expectedInsertions) {
|
||||
for (Entry entry : entries) {
|
||||
if (expectedInsertions > entry.expectedInsertions) {
|
||||
return BloomFilter.create(expectedInsertions, entry.fpp);
|
||||
}
|
||||
}
|
||||
return BloomFilter.create(expectedInsertions, 0.03);
|
||||
}
|
||||
|
||||
public static class Entry {
|
||||
public final int expectedInsertions;
|
||||
public final double fpp;
|
||||
|
||||
Entry(int expectedInsertions, double fpp) {
|
||||
this.expectedInsertions = expectedInsertions;
|
||||
this.fpp = fpp;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a bloom filter based on the with the expected number
|
||||
* of insertions and expected false positive probability.
|
||||
*
|
||||
* @param expectedInsertions the number of expected insertions to the constructed
|
||||
* @param fpp the desired false positive probability (must be positive and less than 1.0)
|
||||
*/
|
||||
public static BloomFilter create(int expectedInsertions, double fpp) {
|
||||
if (expectedInsertions == 0) {
|
||||
expectedInsertions = 1;
|
||||
}
|
||||
/*
|
||||
* TODO(user): Put a warning in the javadoc about tiny fpp values,
|
||||
* since the resulting size is proportional to -log(p), but there is not
|
||||
* much of a point after all, e.g. optimalM(1000, 0.0000000000000001) = 76680
|
||||
* which is less that 10kb. Who cares!
|
||||
*/
|
||||
long numBits = optimalNumOfBits(expectedInsertions, fpp);
|
||||
int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
|
||||
try {
|
||||
return new BloomFilter(new BitArray(numBits), numHashFunctions);
|
||||
} catch (IllegalArgumentException e) {
|
||||
throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static BloomFilter deserialize(DataInput in) throws IOException {
|
||||
int version = in.readInt(); // we do nothing with this now..., defaults to 0
|
||||
|
||||
int numLongs = in.readInt();
|
||||
long[] data = new long[numLongs];
|
||||
for (int i = 0; i < numLongs; i++) {
|
||||
data[i] = in.readLong();
|
||||
}
|
||||
|
||||
int numberOfHashFunctions = in.readInt();
|
||||
|
||||
int hashType = in.readInt(); // again, nothing to do now...
|
||||
|
||||
return new BloomFilter(new BitArray(data), numberOfHashFunctions);
|
||||
}
|
||||
|
||||
public static void serilaize(BloomFilter filter, DataOutput out) throws IOException {
|
||||
out.writeInt(0); // version
|
||||
|
||||
BitArray bits = filter.bits;
|
||||
out.writeInt(bits.data.length);
|
||||
for (long l : bits.data) {
|
||||
out.writeLong(l);
|
||||
}
|
||||
|
||||
out.writeInt(filter.numHashFunctions);
|
||||
|
||||
out.writeInt(0); // hashType
|
||||
}
|
||||
|
||||
/**
|
||||
* The bit set of the BloomFilter (not necessarily power of 2!)
|
||||
*/
|
||||
final BitArray bits;
|
||||
/**
|
||||
* Number of hashes per element
|
||||
*/
|
||||
final int numHashFunctions;
|
||||
|
||||
BloomFilter(BitArray bits, int numHashFunctions) {
|
||||
this.bits = bits;
|
||||
this.numHashFunctions = numHashFunctions;
|
||||
/*
|
||||
* This only exists to forbid BFs that cannot use the compact persistent representation.
|
||||
* If it ever throws, at a user who was not intending to use that representation, we should
|
||||
* reconsider
|
||||
*/
|
||||
if (numHashFunctions > 255) {
|
||||
throw new IllegalArgumentException("Currently we don't allow BloomFilters that would use more than 255 hash functions");
|
||||
}
|
||||
}
|
||||
|
||||
public boolean put(BytesRef value) {
|
||||
long hash64 = hash3_x64_128(value.bytes, value.offset, value.length, 0);
|
||||
int hash1 = (int) hash64;
|
||||
int hash2 = (int) (hash64 >>> 32);
|
||||
boolean bitsChanged = false;
|
||||
for (int i = 1; i <= numHashFunctions; i++) {
|
||||
int nextHash = hash1 + i * hash2;
|
||||
if (nextHash < 0) {
|
||||
nextHash = ~nextHash;
|
||||
}
|
||||
bitsChanged |= bits.set(nextHash % bits.size());
|
||||
}
|
||||
return bitsChanged;
|
||||
}
|
||||
|
||||
public boolean mightContain(BytesRef value) {
|
||||
long hash64 = hash3_x64_128(value.bytes, value.offset, value.length, 0);
|
||||
int hash1 = (int) hash64;
|
||||
int hash2 = (int) (hash64 >>> 32);
|
||||
for (int i = 1; i <= numHashFunctions; i++) {
|
||||
int nextHash = hash1 + i * hash2;
|
||||
if (nextHash < 0) {
|
||||
nextHash = ~nextHash;
|
||||
}
|
||||
if (!bits.get(nextHash % bits.size())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public int getNumHashFunctions() {
|
||||
return this.numHashFunctions;
|
||||
}
|
||||
|
||||
public long getSizeInBytes() {
|
||||
return bits.size() + 8;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the probability that {@linkplain #mightContain(BytesRef)} will erroneously return
|
||||
* {@code true} for an object that has not actually been put in the {@code BloomFilter}.
|
||||
* <p/>
|
||||
* <p>Ideally, this number should be close to the {@code fpp} parameter
|
||||
* passed in create, or smaller. If it is
|
||||
* significantly higher, it is usually the case that too many elements (more than
|
||||
* expected) have been put in the {@code BloomFilter}, degenerating it.
|
||||
*/
|
||||
public double getExpectedFpp() {
|
||||
// You down with FPP? (Yeah you know me!) Who's down with FPP? (Every last homie!)
|
||||
return Math.pow((double) bits.bitCount() / bits.size(), numHashFunctions);
|
||||
}
|
||||
|
||||
/*
|
||||
* Cheat sheet:
|
||||
*
|
||||
* m: total bits
|
||||
* n: expected insertions
|
||||
* b: m/n, bits per insertion
|
||||
|
||||
* p: expected false positive probability
|
||||
*
|
||||
* 1) Optimal k = b * ln2
|
||||
* 2) p = (1 - e ^ (-kn/m))^k
|
||||
* 3) For optimal k: p = 2 ^ (-k) ~= 0.6185^b
|
||||
* 4) For optimal k: m = -nlnp / ((ln2) ^ 2)
|
||||
*/
|
||||
|
||||
/**
|
||||
* Computes the optimal k (number of hashes per element inserted in Bloom filter), given the
|
||||
* expected insertions and total number of bits in the Bloom filter.
|
||||
* <p/>
|
||||
* See http://en.wikipedia.org/wiki/File:Bloom_filter_fp_probability.svg for the formula.
|
||||
*
|
||||
* @param n expected insertions (must be positive)
|
||||
* @param m total number of bits in Bloom filter (must be positive)
|
||||
*/
|
||||
static int optimalNumOfHashFunctions(long n, long m) {
|
||||
return Math.max(1, (int) Math.round(m / n * Math.log(2)));
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes m (total bits of Bloom filter) which is expected to achieve, for the specified
|
||||
* expected insertions, the required false positive probability.
|
||||
* <p/>
|
||||
* See http://en.wikipedia.org/wiki/Bloom_filter#Probability_of_false_positives for the formula.
|
||||
*
|
||||
* @param n expected insertions (must be positive)
|
||||
* @param p false positive rate (must be 0 < p < 1)
|
||||
*/
|
||||
static long optimalNumOfBits(long n, double p) {
|
||||
if (p == 0) {
|
||||
p = Double.MIN_VALUE;
|
||||
}
|
||||
return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
|
||||
}
|
||||
|
||||
// START : MURMUR 3_128
|
||||
|
||||
protected static long getblock(byte[] key, int offset, int index) {
|
||||
int i_8 = index << 3;
|
||||
int blockOffset = offset + i_8;
|
||||
return ((long) key[blockOffset + 0] & 0xff) + (((long) key[blockOffset + 1] & 0xff) << 8) +
|
||||
(((long) key[blockOffset + 2] & 0xff) << 16) + (((long) key[blockOffset + 3] & 0xff) << 24) +
|
||||
(((long) key[blockOffset + 4] & 0xff) << 32) + (((long) key[blockOffset + 5] & 0xff) << 40) +
|
||||
(((long) key[blockOffset + 6] & 0xff) << 48) + (((long) key[blockOffset + 7] & 0xff) << 56);
|
||||
}
|
||||
|
||||
protected static long rotl64(long v, int n) {
|
||||
return ((v << n) | (v >>> (64 - n)));
|
||||
}
|
||||
|
||||
protected static long fmix(long k) {
|
||||
k ^= k >>> 33;
|
||||
k *= 0xff51afd7ed558ccdL;
|
||||
k ^= k >>> 33;
|
||||
k *= 0xc4ceb9fe1a85ec53L;
|
||||
k ^= k >>> 33;
|
||||
|
||||
return k;
|
||||
}
|
||||
|
||||
public static long hash3_x64_128(byte[] key, int offset, int length, long seed) {
|
||||
final int nblocks = length >> 4; // Process as 128-bit blocks.
|
||||
|
||||
long h1 = seed;
|
||||
long h2 = seed;
|
||||
|
||||
long c1 = 0x87c37b91114253d5L;
|
||||
long c2 = 0x4cf5ad432745937fL;
|
||||
|
||||
//----------
|
||||
// body
|
||||
|
||||
for (int i = 0; i < nblocks; i++) {
|
||||
long k1 = getblock(key, offset, i * 2 + 0);
|
||||
long k2 = getblock(key, offset, i * 2 + 1);
|
||||
|
||||
k1 *= c1;
|
||||
k1 = rotl64(k1, 31);
|
||||
k1 *= c2;
|
||||
h1 ^= k1;
|
||||
|
||||
h1 = rotl64(h1, 27);
|
||||
h1 += h2;
|
||||
h1 = h1 * 5 + 0x52dce729;
|
||||
|
||||
k2 *= c2;
|
||||
k2 = rotl64(k2, 33);
|
||||
k2 *= c1;
|
||||
h2 ^= k2;
|
||||
|
||||
h2 = rotl64(h2, 31);
|
||||
h2 += h1;
|
||||
h2 = h2 * 5 + 0x38495ab5;
|
||||
}
|
||||
|
||||
//----------
|
||||
// tail
|
||||
|
||||
// Advance offset to the unprocessed tail of the data.
|
||||
offset += nblocks * 16;
|
||||
|
||||
long k1 = 0;
|
||||
long k2 = 0;
|
||||
|
||||
switch (length & 15) {
|
||||
case 15:
|
||||
k2 ^= ((long) key[offset + 14]) << 48;
|
||||
case 14:
|
||||
k2 ^= ((long) key[offset + 13]) << 40;
|
||||
case 13:
|
||||
k2 ^= ((long) key[offset + 12]) << 32;
|
||||
case 12:
|
||||
k2 ^= ((long) key[offset + 11]) << 24;
|
||||
case 11:
|
||||
k2 ^= ((long) key[offset + 10]) << 16;
|
||||
case 10:
|
||||
k2 ^= ((long) key[offset + 9]) << 8;
|
||||
case 9:
|
||||
k2 ^= ((long) key[offset + 8]) << 0;
|
||||
k2 *= c2;
|
||||
k2 = rotl64(k2, 33);
|
||||
k2 *= c1;
|
||||
h2 ^= k2;
|
||||
|
||||
case 8:
|
||||
k1 ^= ((long) key[offset + 7]) << 56;
|
||||
case 7:
|
||||
k1 ^= ((long) key[offset + 6]) << 48;
|
||||
case 6:
|
||||
k1 ^= ((long) key[offset + 5]) << 40;
|
||||
case 5:
|
||||
k1 ^= ((long) key[offset + 4]) << 32;
|
||||
case 4:
|
||||
k1 ^= ((long) key[offset + 3]) << 24;
|
||||
case 3:
|
||||
k1 ^= ((long) key[offset + 2]) << 16;
|
||||
case 2:
|
||||
k1 ^= ((long) key[offset + 1]) << 8;
|
||||
case 1:
|
||||
k1 ^= ((long) key[offset]);
|
||||
k1 *= c1;
|
||||
k1 = rotl64(k1, 31);
|
||||
k1 *= c2;
|
||||
h1 ^= k1;
|
||||
}
|
||||
|
||||
//----------
|
||||
// finalization
|
||||
|
||||
h1 ^= length;
|
||||
h2 ^= length;
|
||||
|
||||
h1 += h2;
|
||||
h2 += h1;
|
||||
|
||||
h1 = fmix(h1);
|
||||
h2 = fmix(h2);
|
||||
|
||||
h1 += h2;
|
||||
h2 += h1;
|
||||
|
||||
//return (new long[]{h1, h2});
|
||||
// SAME AS GUAVA, they take the first long out of the 128bit
|
||||
return h1;
|
||||
}
|
||||
|
||||
// END: MURMUR 3_128
|
||||
|
||||
// Note: We use this instead of java.util.BitSet because we need access to the long[] data field
|
||||
static class BitArray {
|
||||
final long[] data;
|
||||
int bitCount;
|
||||
|
||||
BitArray(long bits) {
|
||||
this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]);
|
||||
}
|
||||
|
||||
// Used by serialization
|
||||
BitArray(long[] data) {
|
||||
this.data = data;
|
||||
int bitCount = 0;
|
||||
for (long value : data) {
|
||||
bitCount += Long.bitCount(value);
|
||||
}
|
||||
this.bitCount = bitCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the bit changed value.
|
||||
*/
|
||||
boolean set(int index) {
|
||||
if (!get(index)) {
|
||||
data[index >> 6] |= (1L << index);
|
||||
bitCount++;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean get(int index) {
|
||||
return (data[index >> 6] & (1L << index)) != 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of bits
|
||||
*/
|
||||
int size() {
|
||||
return data.length * Long.SIZE;
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of set bits (1s)
|
||||
*/
|
||||
int bitCount() {
|
||||
return bitCount;
|
||||
}
|
||||
|
||||
BitArray copy() {
|
||||
return new BitArray(data.clone());
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (o instanceof BitArray) {
|
||||
BitArray bitArray = (BitArray) o;
|
||||
return Arrays.equals(data, bitArray.data);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Arrays.hashCode(data);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,106 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.codec.postingsformat;
|
||||
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.codecs.bloom.BloomFilterFactory;
|
||||
import org.apache.lucene.codecs.bloom.BloomFilteringPostingsFormat;
|
||||
import org.apache.lucene.codecs.bloom.FuzzySet;
|
||||
import org.apache.lucene.index.FieldInfo;
|
||||
import org.apache.lucene.index.SegmentWriteState;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.assistedinject.Assisted;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BloomFilterLucenePostingsFormatProvider extends AbstractPostingsFormatProvider {
|
||||
|
||||
public static final class Defaults {
|
||||
public static final float MAX_SATURATION = 0.1f;
|
||||
public static final float SATURATION_LIMIT = 0.9f;
|
||||
}
|
||||
|
||||
private final float desiredMaxSaturation;
|
||||
private final float saturationLimit;
|
||||
private final PostingsFormatProvider delegate;
|
||||
private final BloomFilteringPostingsFormat postingsFormat;
|
||||
|
||||
@Inject
|
||||
public BloomFilterLucenePostingsFormatProvider(@IndexSettings Settings indexSettings, @Nullable Map<String, Factory> postingFormatFactories, @Assisted String name, @Assisted Settings postingsFormatSettings) {
|
||||
super(name);
|
||||
this.desiredMaxSaturation = postingsFormatSettings.getAsFloat("desired_max_saturation", Defaults.MAX_SATURATION);
|
||||
this.saturationLimit = postingsFormatSettings.getAsFloat("saturation_limit", Defaults.SATURATION_LIMIT);
|
||||
this.delegate = Helper.lookup(indexSettings, postingsFormatSettings.get("delegate"), postingFormatFactories);
|
||||
this.postingsFormat = new BloomFilteringPostingsFormat(
|
||||
delegate.get(),
|
||||
new CustomBloomFilterFactory(desiredMaxSaturation, saturationLimit)
|
||||
);
|
||||
}
|
||||
|
||||
public float desiredMaxSaturation() {
|
||||
return desiredMaxSaturation;
|
||||
}
|
||||
|
||||
public float saturationLimit() {
|
||||
return saturationLimit;
|
||||
}
|
||||
|
||||
public PostingsFormatProvider delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PostingsFormat get() {
|
||||
return postingsFormat;
|
||||
}
|
||||
|
||||
public static class CustomBloomFilterFactory extends BloomFilterFactory {
|
||||
|
||||
private final float desiredMaxSaturation;
|
||||
private final float saturationLimit;
|
||||
|
||||
public CustomBloomFilterFactory() {
|
||||
this(Defaults.MAX_SATURATION, Defaults.SATURATION_LIMIT);
|
||||
}
|
||||
|
||||
CustomBloomFilterFactory(float desiredMaxSaturation, float saturationLimit) {
|
||||
this.desiredMaxSaturation = desiredMaxSaturation;
|
||||
this.saturationLimit = saturationLimit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FuzzySet getSetForField(SegmentWriteState state, FieldInfo info) {
|
||||
//Assume all of the docs have a unique term (e.g. a primary key) and we hope to maintain a set with desiredMaxSaturation% of bits set
|
||||
return FuzzySet.createSetBasedOnQuality(state.segmentInfo.getDocCount(), desiredMaxSaturation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSaturated(FuzzySet bloomFilter, FieldInfo fieldInfo) {
|
||||
// Don't bother saving bitsets if > saturationLimit % of bits are set - we don't want to
|
||||
// throw any more memory at this problem.
|
||||
return bloomFilter.getSaturation() > saturationLimit;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,452 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.codec.postingsformat;
|
||||
|
||||
import org.apache.lucene.codecs.*;
|
||||
import org.apache.lucene.index.*;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.apache.lucene.store.IndexOutput;
|
||||
import org.apache.lucene.util.Bits;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.apache.lucene.util.automaton.CompiledAutomaton;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.Map.Entry;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* A {@link PostingsFormat} useful for low doc-frequency fields such as primary
|
||||
* keys. Bloom filters are maintained in a ".blm" file which offers "fast-fail"
|
||||
* for reads in segments known to have no record of the key. A choice of
|
||||
* delegate PostingsFormat is used to record all other Postings data.
|
||||
* </p>
|
||||
* <p>
|
||||
* This is a special bloom filter version, based on {@link BloomFilter} and inspired
|
||||
* by Lucene {@link org.apache.lucene.codecs.bloom.BloomFilteringPostingsFormat}.
|
||||
* </p>
|
||||
*/
|
||||
public final class BloomFilterPostingsFormat extends PostingsFormat {
|
||||
|
||||
public static final String BLOOM_CODEC_NAME = "XBloomFilter"; // the Lucene one is named BloomFilter
|
||||
public static final int BLOOM_CODEC_VERSION = 1;
|
||||
|
||||
/**
|
||||
* Extension of Bloom Filters file
|
||||
*/
|
||||
static final String BLOOM_EXTENSION = "blm";
|
||||
|
||||
private BloomFilter.Factory bloomFilterFactory = BloomFilter.Factory.DEFAULT;
|
||||
private PostingsFormat delegatePostingsFormat;
|
||||
|
||||
/**
|
||||
* Creates Bloom filters for a selection of fields created in the index. This
|
||||
* is recorded as a set of Bitsets held as a segment summary in an additional
|
||||
* "blm" file. This PostingsFormat delegates to a choice of delegate
|
||||
* PostingsFormat for encoding all other postings data.
|
||||
*
|
||||
* @param delegatePostingsFormat The PostingsFormat that records all the non-bloom filter data i.e.
|
||||
* postings info.
|
||||
* @param bloomFilterFactory The {@link BloomFilter.Factory} responsible for sizing BloomFilters
|
||||
* appropriately
|
||||
*/
|
||||
public BloomFilterPostingsFormat(PostingsFormat delegatePostingsFormat,
|
||||
BloomFilter.Factory bloomFilterFactory) {
|
||||
super(BLOOM_CODEC_NAME);
|
||||
this.delegatePostingsFormat = delegatePostingsFormat;
|
||||
this.bloomFilterFactory = bloomFilterFactory;
|
||||
}
|
||||
|
||||
// Used only by core Lucene at read-time via Service Provider instantiation -
|
||||
// do not use at Write-time in application code.
|
||||
public BloomFilterPostingsFormat() {
|
||||
super(BLOOM_CODEC_NAME);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldsConsumer fieldsConsumer(SegmentWriteState state)
|
||||
throws IOException {
|
||||
if (delegatePostingsFormat == null) {
|
||||
throw new UnsupportedOperationException("Error - " + getClass().getName()
|
||||
+ " has been constructed without a choice of PostingsFormat");
|
||||
}
|
||||
return new BloomFilteredFieldsConsumer(
|
||||
delegatePostingsFormat.fieldsConsumer(state), state,
|
||||
delegatePostingsFormat);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FieldsProducer fieldsProducer(SegmentReadState state)
|
||||
throws IOException {
|
||||
return new BloomFilteredFieldsProducer(state);
|
||||
}
|
||||
|
||||
public class BloomFilteredFieldsProducer extends FieldsProducer {
|
||||
private FieldsProducer delegateFieldsProducer;
|
||||
HashMap<String, BloomFilter> bloomsByFieldName = new HashMap<String, BloomFilter>();
|
||||
|
||||
public BloomFilteredFieldsProducer(SegmentReadState state)
|
||||
throws IOException {
|
||||
|
||||
String bloomFileName = IndexFileNames.segmentFileName(
|
||||
state.segmentInfo.name, state.segmentSuffix, BLOOM_EXTENSION);
|
||||
IndexInput bloomIn = null;
|
||||
boolean success = false;
|
||||
try {
|
||||
bloomIn = state.dir.openInput(bloomFileName, state.context);
|
||||
CodecUtil.checkHeader(bloomIn, BLOOM_CODEC_NAME, BLOOM_CODEC_VERSION,
|
||||
BLOOM_CODEC_VERSION);
|
||||
// // Load the hash function used in the BloomFilter
|
||||
// hashFunction = HashFunction.forName(bloomIn.readString());
|
||||
// Load the delegate postings format
|
||||
PostingsFormat delegatePostingsFormat = PostingsFormat.forName(bloomIn
|
||||
.readString());
|
||||
|
||||
this.delegateFieldsProducer = delegatePostingsFormat
|
||||
.fieldsProducer(state);
|
||||
int numBlooms = bloomIn.readInt();
|
||||
for (int i = 0; i < numBlooms; i++) {
|
||||
int fieldNum = bloomIn.readInt();
|
||||
BloomFilter bloom = BloomFilter.deserialize(bloomIn);
|
||||
FieldInfo fieldInfo = state.fieldInfos.fieldInfo(fieldNum);
|
||||
bloomsByFieldName.put(fieldInfo.name, bloom);
|
||||
}
|
||||
IOUtils.close(bloomIn);
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
IOUtils.closeWhileHandlingException(bloomIn, delegateFieldsProducer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<String> iterator() {
|
||||
return delegateFieldsProducer.iterator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegateFieldsProducer.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Terms terms(String field) throws IOException {
|
||||
BloomFilter filter = bloomsByFieldName.get(field);
|
||||
if (filter == null) {
|
||||
return delegateFieldsProducer.terms(field);
|
||||
} else {
|
||||
Terms result = delegateFieldsProducer.terms(field);
|
||||
if (result == null) {
|
||||
return null;
|
||||
}
|
||||
return new BloomFilteredTerms(result, filter);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int size() {
|
||||
return delegateFieldsProducer.size();
|
||||
}
|
||||
|
||||
public long getUniqueTermCount() throws IOException {
|
||||
return delegateFieldsProducer.getUniqueTermCount();
|
||||
}
|
||||
|
||||
public class BloomFilteredTerms extends Terms {
|
||||
private Terms delegateTerms;
|
||||
private BloomFilter filter;
|
||||
|
||||
public BloomFilteredTerms(Terms terms, BloomFilter filter) {
|
||||
this.delegateTerms = terms;
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
public BloomFilter getFilter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TermsEnum intersect(CompiledAutomaton compiled,
|
||||
final BytesRef startTerm) throws IOException {
|
||||
return delegateTerms.intersect(compiled, startTerm);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TermsEnum iterator(TermsEnum reuse) throws IOException {
|
||||
TermsEnum result;
|
||||
if ((reuse != null) && (reuse instanceof BloomFilteredTermsEnum)) {
|
||||
// recycle the existing BloomFilteredTermsEnum by asking the delegate
|
||||
// to recycle its contained TermsEnum
|
||||
BloomFilteredTermsEnum bfte = (BloomFilteredTermsEnum) reuse;
|
||||
if (bfte.filter == filter) {
|
||||
bfte.delegateTermsEnum = delegateTerms.iterator(bfte.delegateTermsEnum);
|
||||
return bfte;
|
||||
}
|
||||
}
|
||||
// We have been handed something we cannot reuse (either null, wrong
|
||||
// class or wrong filter) so allocate a new object
|
||||
result = new BloomFilteredTermsEnum(delegateTerms.iterator(reuse), filter);
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<BytesRef> getComparator() throws IOException {
|
||||
return delegateTerms.getComparator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long size() throws IOException {
|
||||
return delegateTerms.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSumTotalTermFreq() throws IOException {
|
||||
return delegateTerms.getSumTotalTermFreq();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getSumDocFreq() throws IOException {
|
||||
return delegateTerms.getSumDocFreq();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getDocCount() throws IOException {
|
||||
return delegateTerms.getDocCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasOffsets() {
|
||||
return delegateTerms.hasOffsets();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPositions() {
|
||||
return delegateTerms.hasPositions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasPayloads() {
|
||||
return delegateTerms.hasPayloads();
|
||||
}
|
||||
}
|
||||
|
||||
class BloomFilteredTermsEnum extends TermsEnum {
|
||||
|
||||
TermsEnum delegateTermsEnum;
|
||||
private BloomFilter filter;
|
||||
|
||||
public BloomFilteredTermsEnum(TermsEnum iterator, BloomFilter filter) {
|
||||
this.delegateTermsEnum = iterator;
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final BytesRef next() throws IOException {
|
||||
return delegateTermsEnum.next();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final Comparator<BytesRef> getComparator() {
|
||||
return delegateTermsEnum.getComparator();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean seekExact(BytesRef text, boolean useCache)
|
||||
throws IOException {
|
||||
// The magical fail-fast speed up that is the entire point of all of
|
||||
// this code - save a disk seek if there is a match on an in-memory
|
||||
// structure
|
||||
// that may occasionally give a false positive but guaranteed no false
|
||||
// negatives
|
||||
if (!filter.mightContain(text)) {
|
||||
return false;
|
||||
}
|
||||
return delegateTermsEnum.seekExact(text, useCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final SeekStatus seekCeil(BytesRef text, boolean useCache)
|
||||
throws IOException {
|
||||
return delegateTermsEnum.seekCeil(text, useCache);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void seekExact(long ord) throws IOException {
|
||||
delegateTermsEnum.seekExact(ord);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final BytesRef term() throws IOException {
|
||||
return delegateTermsEnum.term();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long ord() throws IOException {
|
||||
return delegateTermsEnum.ord();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final int docFreq() throws IOException {
|
||||
return delegateTermsEnum.docFreq();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final long totalTermFreq() throws IOException {
|
||||
return delegateTermsEnum.totalTermFreq();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public DocsAndPositionsEnum docsAndPositions(Bits liveDocs,
|
||||
DocsAndPositionsEnum reuse, int flags) throws IOException {
|
||||
return delegateTermsEnum.docsAndPositions(liveDocs, reuse, flags);
|
||||
}
|
||||
|
||||
@Override
|
||||
public DocsEnum docs(Bits liveDocs, DocsEnum reuse, int flags)
|
||||
throws IOException {
|
||||
return delegateTermsEnum.docs(liveDocs, reuse, flags);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class BloomFilteredFieldsConsumer extends FieldsConsumer {
|
||||
private FieldsConsumer delegateFieldsConsumer;
|
||||
private Map<FieldInfo, BloomFilter> bloomFilters = new HashMap<FieldInfo, BloomFilter>();
|
||||
private SegmentWriteState state;
|
||||
|
||||
// private PostingsFormat delegatePostingsFormat;
|
||||
|
||||
public BloomFilteredFieldsConsumer(FieldsConsumer fieldsConsumer,
|
||||
SegmentWriteState state, PostingsFormat delegatePostingsFormat) {
|
||||
this.delegateFieldsConsumer = fieldsConsumer;
|
||||
// this.delegatePostingsFormat=delegatePostingsFormat;
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TermsConsumer addField(FieldInfo field) throws IOException {
|
||||
BloomFilter bloomFilter = bloomFilterFactory.createFilter(state.segmentInfo.getDocCount());
|
||||
if (bloomFilter != null) {
|
||||
assert bloomFilters.containsKey(field) == false;
|
||||
bloomFilters.put(field, bloomFilter);
|
||||
return new WrappedTermsConsumer(delegateFieldsConsumer.addField(field), bloomFilter);
|
||||
} else {
|
||||
// No, use the unfiltered fieldsConsumer - we are not interested in
|
||||
// recording any term Bitsets.
|
||||
return delegateFieldsConsumer.addField(field);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
delegateFieldsConsumer.close();
|
||||
// Now we are done accumulating values for these fields
|
||||
List<Entry<FieldInfo, BloomFilter>> nonSaturatedBlooms = new ArrayList<Map.Entry<FieldInfo, BloomFilter>>();
|
||||
|
||||
for (Entry<FieldInfo, BloomFilter> entry : bloomFilters.entrySet()) {
|
||||
BloomFilter bloomFilter = entry.getValue();
|
||||
//if (!bloomFilterFactory.isSaturated(bloomFilter, entry.getKey())) {
|
||||
nonSaturatedBlooms.add(entry);
|
||||
//}
|
||||
}
|
||||
String bloomFileName = IndexFileNames.segmentFileName(
|
||||
state.segmentInfo.name, state.segmentSuffix, BLOOM_EXTENSION);
|
||||
IndexOutput bloomOutput = null;
|
||||
try {
|
||||
bloomOutput = state.directory
|
||||
.createOutput(bloomFileName, state.context);
|
||||
CodecUtil.writeHeader(bloomOutput, BLOOM_CODEC_NAME,
|
||||
BLOOM_CODEC_VERSION);
|
||||
// remember the name of the postings format we will delegate to
|
||||
bloomOutput.writeString(delegatePostingsFormat.getName());
|
||||
|
||||
// First field in the output file is the number of fields+blooms saved
|
||||
bloomOutput.writeInt(nonSaturatedBlooms.size());
|
||||
for (Entry<FieldInfo, BloomFilter> entry : nonSaturatedBlooms) {
|
||||
FieldInfo fieldInfo = entry.getKey();
|
||||
BloomFilter bloomFilter = entry.getValue();
|
||||
bloomOutput.writeInt(fieldInfo.number);
|
||||
saveAppropriatelySizedBloomFilter(bloomOutput, bloomFilter, fieldInfo);
|
||||
}
|
||||
} finally {
|
||||
IOUtils.close(bloomOutput);
|
||||
}
|
||||
//We are done with large bitsets so no need to keep them hanging around
|
||||
bloomFilters.clear();
|
||||
}
|
||||
|
||||
private void saveAppropriatelySizedBloomFilter(IndexOutput bloomOutput,
|
||||
BloomFilter bloomFilter, FieldInfo fieldInfo) throws IOException {
|
||||
|
||||
// FuzzySet rightSizedSet = bloomFilterFactory.downsize(fieldInfo,
|
||||
// bloomFilter);
|
||||
// if (rightSizedSet == null) {
|
||||
// rightSizedSet = bloomFilter;
|
||||
// }
|
||||
// rightSizedSet.serialize(bloomOutput);
|
||||
BloomFilter.serilaize(bloomFilter, bloomOutput);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class WrappedTermsConsumer extends TermsConsumer {
|
||||
private TermsConsumer delegateTermsConsumer;
|
||||
private BloomFilter bloomFilter;
|
||||
|
||||
public WrappedTermsConsumer(TermsConsumer termsConsumer, BloomFilter bloomFilter) {
|
||||
this.delegateTermsConsumer = termsConsumer;
|
||||
this.bloomFilter = bloomFilter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PostingsConsumer startTerm(BytesRef text) throws IOException {
|
||||
return delegateTermsConsumer.startTerm(text);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finishTerm(BytesRef text, TermStats stats) throws IOException {
|
||||
|
||||
// Record this term in our BloomFilter
|
||||
if (stats.docFreq > 0) {
|
||||
bloomFilter.put(text);
|
||||
}
|
||||
delegateTermsConsumer.finishTerm(text, stats);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finish(long sumTotalTermFreq, long sumDocFreq, int docCount)
|
||||
throws IOException {
|
||||
delegateTermsConsumer.finish(sumTotalTermFreq, sumDocFreq, docCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Comparator<BytesRef> getComparator() throws IOException {
|
||||
return delegateTermsConsumer.getComparator();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -20,11 +20,6 @@
|
|||
package org.elasticsearch.index.codec.postingsformat;
|
||||
|
||||
import org.apache.lucene.codecs.PostingsFormat;
|
||||
import org.apache.lucene.codecs.bloom.BloomFilterFactory;
|
||||
import org.apache.lucene.codecs.bloom.BloomFilteringPostingsFormat;
|
||||
import org.apache.lucene.codecs.bloom.FuzzySet;
|
||||
import org.apache.lucene.index.FieldInfo;
|
||||
import org.apache.lucene.index.SegmentWriteState;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.assistedinject.Assisted;
|
||||
|
@ -37,36 +32,19 @@ import java.util.Map;
|
|||
*/
|
||||
public class BloomFilterPostingsFormatProvider extends AbstractPostingsFormatProvider {
|
||||
|
||||
public static final class Defaults {
|
||||
public static final float MAX_SATURATION = 0.1f;
|
||||
public static final float SATURATION_LIMIT = 0.9f;
|
||||
}
|
||||
|
||||
private final float desiredMaxSaturation;
|
||||
private final float saturationLimit;
|
||||
private final PostingsFormatProvider delegate;
|
||||
private final BloomFilteringPostingsFormat postingsFormat;
|
||||
private final BloomFilterPostingsFormat postingsFormat;
|
||||
|
||||
@Inject
|
||||
public BloomFilterPostingsFormatProvider(@IndexSettings Settings indexSettings, @Nullable Map<String, Factory> postingFormatFactories, @Assisted String name, @Assisted Settings postingsFormatSettings) {
|
||||
super(name);
|
||||
this.desiredMaxSaturation = postingsFormatSettings.getAsFloat("desired_max_saturation", Defaults.MAX_SATURATION);
|
||||
this.saturationLimit = postingsFormatSettings.getAsFloat("saturation_limit", Defaults.SATURATION_LIMIT);
|
||||
this.delegate = Helper.lookup(indexSettings, postingsFormatSettings.get("delegate"), postingFormatFactories);
|
||||
this.postingsFormat = new BloomFilteringPostingsFormat(
|
||||
this.postingsFormat = new BloomFilterPostingsFormat(
|
||||
delegate.get(),
|
||||
new CustomBloomFilterFactory(desiredMaxSaturation, saturationLimit)
|
||||
BloomFilter.Factory.buildFromString(indexSettings.get("fpp"))
|
||||
);
|
||||
}
|
||||
|
||||
public float desiredMaxSaturation() {
|
||||
return desiredMaxSaturation;
|
||||
}
|
||||
|
||||
public float saturationLimit() {
|
||||
return saturationLimit;
|
||||
}
|
||||
|
||||
public PostingsFormatProvider delegate() {
|
||||
return delegate;
|
||||
}
|
||||
|
@ -75,32 +53,4 @@ public class BloomFilterPostingsFormatProvider extends AbstractPostingsFormatPro
|
|||
public PostingsFormat get() {
|
||||
return postingsFormat;
|
||||
}
|
||||
|
||||
public static class CustomBloomFilterFactory extends BloomFilterFactory {
|
||||
|
||||
private final float desiredMaxSaturation;
|
||||
private final float saturationLimit;
|
||||
|
||||
public CustomBloomFilterFactory() {
|
||||
this(Defaults.MAX_SATURATION, Defaults.SATURATION_LIMIT);
|
||||
}
|
||||
|
||||
CustomBloomFilterFactory(float desiredMaxSaturation, float saturationLimit) {
|
||||
this.desiredMaxSaturation = desiredMaxSaturation;
|
||||
this.saturationLimit = saturationLimit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FuzzySet getSetForField(SegmentWriteState state, FieldInfo info) {
|
||||
//Assume all of the docs have a unique term (e.g. a primary key) and we hope to maintain a set with desiredMaxSaturation% of bits set
|
||||
return FuzzySet.createSetBasedOnQuality(state.segmentInfo.getDocCount(), desiredMaxSaturation);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSaturated(FuzzySet bloomFilter, FieldInfo fieldInfo) {
|
||||
// Don't bother saving bitsets if > saturationLimit % of bits are set - we don't want to
|
||||
// throw any more memory at this problem.
|
||||
return bloomFilter.getSaturation() > saturationLimit;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,23 +37,23 @@ import org.elasticsearch.common.collect.MapBuilder;
|
|||
* its terms and postings directly into memory. Note this postings format is
|
||||
* very memory intensive and has certain limitation that don't allow segments to
|
||||
* grow beyond 2.1GB see {@link DirectPostingsFormat} for details.</li>
|
||||
*
|
||||
* <p/>
|
||||
* <li><b>memory</b>: a postings format that stores its entire terms, postings,
|
||||
* positions and payloads in a finite state transducer. This format should only
|
||||
* be used for primary keys or with fields where each term is contained in a
|
||||
* very low number of documents.</li>
|
||||
*
|
||||
* <p/>
|
||||
* <li><b>pulsing</b>: a postings format in-lines the posting lists for very low
|
||||
* frequent terms in the term dictionary. This is useful to improve lookup
|
||||
* performance for low-frequent terms.</li>
|
||||
*
|
||||
* <p/>
|
||||
* <li><b>bloom_default</b>: a postings format that uses a bloom filter to
|
||||
* improve term lookup performance. This is useful for primarily keys or fields
|
||||
* that are used as a delete key</li>
|
||||
*
|
||||
* <p/>
|
||||
* <li><b>bloom_pulsing</b>: a postings format that combines the advantages of
|
||||
* <b>bloom</b> and <b>pulsing</b> to further improve lookup performance</li>
|
||||
*
|
||||
* <p/>
|
||||
* <li><b>default</b>: the default Elasticsearch postings format offering best
|
||||
* general purpose performance. This format is used if no postings format is
|
||||
* specified in the field mapping.</li>
|
||||
|
@ -73,13 +73,23 @@ public class PostingFormats {
|
|||
buildInPostingFormatsX.put("memory", new PreBuiltPostingsFormatProvider.Factory("memory", new MemoryPostingsFormat()));
|
||||
// LUCENE UPGRADE: Need to change this to the relevant ones on a lucene upgrade
|
||||
buildInPostingFormatsX.put("pulsing", new PreBuiltPostingsFormatProvider.Factory("pulsing", new Pulsing41PostingsFormat()));
|
||||
buildInPostingFormatsX.put("bloom_pulsing", new PreBuiltPostingsFormatProvider.Factory("bloom_pulsing", new BloomFilteringPostingsFormat(new Pulsing41PostingsFormat(), new BloomFilterPostingsFormatProvider.CustomBloomFilterFactory())));
|
||||
buildInPostingFormatsX.put("default", new PreBuiltPostingsFormatProvider.Factory("default", new Lucene41PostingsFormat()));
|
||||
buildInPostingFormatsX.put("bloom_default", new PreBuiltPostingsFormatProvider.Factory("bloom_default", new BloomFilteringPostingsFormat(new Lucene41PostingsFormat(), new BloomFilterPostingsFormatProvider.CustomBloomFilterFactory())));
|
||||
|
||||
buildInPostingFormatsX.put("bloom_pulsing", new PreBuiltPostingsFormatProvider.Factory("bloom_pulsing", wrapInBloom(new Pulsing41PostingsFormat())));
|
||||
buildInPostingFormatsX.put("bloom_default", new PreBuiltPostingsFormatProvider.Factory("bloom_default", wrapInBloom(new Lucene41PostingsFormat())));
|
||||
|
||||
builtInPostingFormats = buildInPostingFormatsX.immutableMap();
|
||||
}
|
||||
|
||||
public static final boolean luceneBloomFilter = false;
|
||||
|
||||
static PostingsFormat wrapInBloom(PostingsFormat delegate) {
|
||||
if (luceneBloomFilter) {
|
||||
return new BloomFilteringPostingsFormat(delegate, new BloomFilterLucenePostingsFormatProvider.CustomBloomFilterFactory());
|
||||
}
|
||||
return new BloomFilterPostingsFormat(delegate, BloomFilter.Factory.DEFAULT);
|
||||
}
|
||||
|
||||
public static PostingsFormatProvider.Factory getAsFactory(String name) {
|
||||
return builtInPostingFormats.get(name);
|
||||
}
|
||||
|
|
|
@ -69,11 +69,12 @@ public interface PostingsFormatProvider {
|
|||
* and the formats name as the key are passed to the factory.
|
||||
* </p>
|
||||
*
|
||||
* @param indexSettings the index settings to configure the postings format
|
||||
* @param name the name of the postings format to lookup
|
||||
* @param indexSettings the index settings to configure the postings format
|
||||
* @param name the name of the postings format to lookup
|
||||
* @param postingFormatFactories the factory mapping to lookup the {@link Factory} to create the {@link PostingsFormatProvider}
|
||||
* @return a fully configured {@link PostingsFormatProvider} for the given name.
|
||||
* @throws ElasticSearchIllegalArgumentException if the no {@link PostingsFormatProvider} for the given name parameter could be found.
|
||||
* @throws ElasticSearchIllegalArgumentException
|
||||
* if the no {@link PostingsFormatProvider} for the given name parameter could be found.
|
||||
*/
|
||||
public static PostingsFormatProvider lookup(@IndexSettings Settings indexSettings, String name, Map<String, Factory> postingFormatFactories) throws ElasticSearchIllegalArgumentException {
|
||||
Factory factory = postingFormatFactories.get(name);
|
||||
|
@ -100,7 +101,7 @@ public interface PostingsFormatProvider {
|
|||
|
||||
/**
|
||||
* A simple factory used to create {@link PostingsFormatProvider} used by
|
||||
* delegating providers like {@link BloomFilterPostingsFormatProvider} or
|
||||
* delegating providers like {@link BloomFilterLucenePostingsFormatProvider} or
|
||||
* {@link PulsingPostingsFormatProvider}. Those providers wrap other
|
||||
* postings formats to enrich their capabilities.
|
||||
*/
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
org.elasticsearch.index.codec.postingsformat.BloomFilterPostingsFormat
|
|
@ -0,0 +1,42 @@
|
|||
package org.elasticsearch.benchmark.bloom;
|
||||
|
||||
import org.apache.lucene.codecs.bloom.FuzzySet;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.SizeValue;
|
||||
import org.elasticsearch.index.codec.postingsformat.BloomFilter;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class BloomBench {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
final int ELEMENTS = (int) SizeValue.parseSizeValue("1m").singles();
|
||||
final double fpp = 0.01;
|
||||
BloomFilter gFilter = BloomFilter.create(ELEMENTS, fpp);
|
||||
System.out.println("G SIZE: " + new ByteSizeValue(gFilter.getSizeInBytes()));
|
||||
|
||||
FuzzySet lFilter = FuzzySet.createSetBasedOnMaxMemory((int) gFilter.getSizeInBytes());
|
||||
//FuzzySet lFilter = FuzzySet.createSetBasedOnQuality(ELEMENTS, 0.97f);
|
||||
|
||||
for (int i = 0; i < ELEMENTS; i++) {
|
||||
BytesRef bytesRef = new BytesRef(UUID.randomBase64UUID());
|
||||
gFilter.put(bytesRef);
|
||||
lFilter.addValue(bytesRef);
|
||||
}
|
||||
|
||||
int lFalse = 0;
|
||||
int gFalse = 0;
|
||||
for (int i = 0; i < ELEMENTS; i++) {
|
||||
BytesRef bytesRef = new BytesRef(UUID.randomBase64UUID());
|
||||
if (gFilter.mightContain(bytesRef)) {
|
||||
gFalse++;
|
||||
}
|
||||
if (lFilter.contains(bytesRef) == FuzzySet.ContainsResult.MAYBE) {
|
||||
lFalse++;
|
||||
}
|
||||
}
|
||||
System.out.println("Failed positives, g[" + gFalse + "], l[" + lFalse + "]");
|
||||
}
|
||||
}
|
|
@ -22,7 +22,6 @@ package org.elasticsearch.test.unit.index.codec;
|
|||
import org.apache.lucene.codecs.Codec;
|
||||
import org.apache.lucene.codecs.bloom.BloomFilteringPostingsFormat;
|
||||
import org.apache.lucene.codecs.lucene40.Lucene40Codec;
|
||||
import org.apache.lucene.codecs.lucene40.Lucene40PostingsFormat;
|
||||
import org.apache.lucene.codecs.lucene41.Lucene41Codec;
|
||||
import org.apache.lucene.codecs.lucene41.Lucene41PostingsFormat;
|
||||
import org.apache.lucene.codecs.memory.DirectPostingsFormat;
|
||||
|
@ -77,12 +76,22 @@ public class CodecTests {
|
|||
assertThat(postingsFormatService.get("Lucene41").get(), instanceOf(((PerFieldPostingsFormat) Codec.getDefault().postingsFormat()).getPostingsFormatForField(null).getClass()));
|
||||
|
||||
assertThat(postingsFormatService.get("bloom_default"), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(postingsFormatService.get("bloom_default").get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
if (PostingFormats.luceneBloomFilter) {
|
||||
assertThat(postingsFormatService.get("bloom_default").get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
} else {
|
||||
assertThat(postingsFormatService.get("bloom_default").get(), instanceOf(BloomFilterPostingsFormat.class));
|
||||
}
|
||||
assertThat(postingsFormatService.get("BloomFilter"), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(postingsFormatService.get("BloomFilter").get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
|
||||
assertThat(postingsFormatService.get("bloom_pulsing"), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(postingsFormatService.get("bloom_pulsing").get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
assertThat(postingsFormatService.get("XBloomFilter"), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(postingsFormatService.get("XBloomFilter").get(), instanceOf(BloomFilterPostingsFormat.class));
|
||||
|
||||
if (PostingFormats.luceneBloomFilter) {
|
||||
assertThat(postingsFormatService.get("bloom_pulsing").get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
} else {
|
||||
assertThat(postingsFormatService.get("bloom_pulsing").get(), instanceOf(BloomFilterPostingsFormat.class));
|
||||
}
|
||||
|
||||
assertThat(postingsFormatService.get("pulsing"), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(postingsFormatService.get("pulsing").get(), instanceOf(Pulsing41PostingsFormat.class));
|
||||
|
@ -203,7 +212,7 @@ public class CodecTests {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testResolvePostingFormatsFromMapping_bloom() throws Exception {
|
||||
public void testResolvePostingFormatsFromMappingLuceneBloom() throws Exception {
|
||||
String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
|
||||
.startObject("properties")
|
||||
.startObject("field1").field("type", "string").field("postings_format", "bloom_default").endObject()
|
||||
|
@ -213,7 +222,7 @@ public class CodecTests {
|
|||
.endObject().endObject().string();
|
||||
|
||||
Settings indexSettings = ImmutableSettings.settingsBuilder()
|
||||
.put("index.codec.postings_format.my_format1.type", "bloom_filter")
|
||||
.put("index.codec.postings_format.my_format1.type", "bloom_filter_lucene")
|
||||
.put("index.codec.postings_format.my_format1.desired_max_saturation", 0.2f)
|
||||
.put("index.codec.postings_format.my_format1.saturation_limit", 0.8f)
|
||||
.put("index.codec.postings_format.my_format1.delegate", "delegate1")
|
||||
|
@ -224,13 +233,21 @@ public class CodecTests {
|
|||
CodecService codecService = createCodecService(indexSettings);
|
||||
DocumentMapper documentMapper = codecService.mapperService().documentMapperParser().parse(mapping);
|
||||
assertThat(documentMapper.mappers().name("field1").mapper().postingsFormatProvider(), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(documentMapper.mappers().name("field1").mapper().postingsFormatProvider().get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
if (PostingFormats.luceneBloomFilter) {
|
||||
assertThat(documentMapper.mappers().name("field1").mapper().postingsFormatProvider().get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
} else {
|
||||
assertThat(documentMapper.mappers().name("field1").mapper().postingsFormatProvider().get(), instanceOf(BloomFilterPostingsFormat.class));
|
||||
}
|
||||
|
||||
assertThat(documentMapper.mappers().name("field2").mapper().postingsFormatProvider(), instanceOf(PreBuiltPostingsFormatProvider.class));
|
||||
assertThat(documentMapper.mappers().name("field2").mapper().postingsFormatProvider().get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
if (PostingFormats.luceneBloomFilter) {
|
||||
assertThat(documentMapper.mappers().name("field2").mapper().postingsFormatProvider().get(), instanceOf(BloomFilteringPostingsFormat.class));
|
||||
} else {
|
||||
assertThat(documentMapper.mappers().name("field2").mapper().postingsFormatProvider().get(), instanceOf(BloomFilterPostingsFormat.class));
|
||||
}
|
||||
|
||||
assertThat(documentMapper.mappers().name("field3").mapper().postingsFormatProvider(), instanceOf(BloomFilterPostingsFormatProvider.class));
|
||||
BloomFilterPostingsFormatProvider provider = (BloomFilterPostingsFormatProvider) documentMapper.mappers().name("field3").mapper().postingsFormatProvider();
|
||||
assertThat(documentMapper.mappers().name("field3").mapper().postingsFormatProvider(), instanceOf(BloomFilterLucenePostingsFormatProvider.class));
|
||||
BloomFilterLucenePostingsFormatProvider provider = (BloomFilterLucenePostingsFormatProvider) documentMapper.mappers().name("field3").mapper().postingsFormatProvider();
|
||||
assertThat(provider.desiredMaxSaturation(), equalTo(0.2f));
|
||||
assertThat(provider.saturationLimit(), equalTo(0.8f));
|
||||
assertThat(provider.delegate(), instanceOf(DirectPostingsFormatProvider.class));
|
||||
|
|
Loading…
Reference in New Issue