Implement custom circuit breaker estimator for parent/child field data

This commit is contained in:
Lee Hinman 2014-03-04 09:37:38 -07:00
parent 2bcede168c
commit 23471cd72c

View File

@ -21,20 +21,16 @@ package org.elasticsearch.index.fielddata.plain;
import com.carrotsearch.hppc.ObjectObjectOpenHashMap;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DocsEnum;
import org.apache.lucene.index.*;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.PagedBytes;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.AbstractIndexFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.fieldcomparator.BytesRefFieldComparatorSource;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
import org.elasticsearch.index.fielddata.ordinals.Ordinals;
@ -53,6 +49,8 @@ import java.util.NavigableSet;
import java.util.TreeSet;
/**
* ParentChildIndexFieldData is responsible for loading the id cache mapping
* needed for has_child and has_parent queries into memory.
*/
public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChildAtomicFieldData> implements DocumentTypeListener {
@ -63,7 +61,9 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
// while loading.
private final Object lock = new Object();
public ParentChildIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndexFieldDataCache cache, MapperService mapperService, CircuitBreakerService breakerService) {
public ParentChildIndexFieldData(Index index, @IndexSettings Settings indexSettings, FieldMapper.Names fieldNames,
FieldDataType fieldDataType, IndexFieldDataCache cache, MapperService mapperService,
CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
parentTypes = new TreeSet<BytesRef>(BytesRef.getUTF8SortedAsUnicodeComparator());
this.breakerService = breakerService;
@ -86,8 +86,6 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
@Override
public ParentChildAtomicFieldData loadDirect(AtomicReaderContext context) throws Exception {
AtomicReader reader = context.reader();
// TODO: Implement a custom estimator for p/c field data
NonEstimatingEstimator estimator = new NonEstimatingEstimator(breakerService.getBreaker());
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat(
"acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO
);
@ -99,10 +97,17 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
new ParentChildIntersectTermsEnum(reader, UidFieldMapper.NAME, ParentFieldMapper.NAME),
parentTypes
);
ParentChildEstimator estimator = new ParentChildEstimator(breakerService.getBreaker(), termsEnum);
TermsEnum estimatedTermsEnum = estimator.beforeLoad(null);
ObjectObjectOpenHashMap<String, TypeBuilder> typeBuilders = ObjectObjectOpenHashMap.newInstance();
try {
DocsEnum docsEnum = null;
for (BytesRef term = termsEnum.next(); term != null; term = termsEnum.next()) {
for (BytesRef term = estimatedTermsEnum.next(); term != null; term = estimatedTermsEnum.next()) {
// Usually this would be estimatedTermsEnum, but the
// abstract TermsEnum class does not support the .type()
// and .id() methods, so we skip using the wrapped
// TermsEnum and delegate directly to the
// ParentChildFilteredTermsEnum that was originally wrapped
String type = termsEnum.type();
TypeBuilder typeBuilder = typeBuilders.get(type);
if (typeBuilder == null) {
@ -113,7 +118,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
final long termOrd = typeBuilder.builder.nextOrdinal();
assert termOrd == typeBuilder.termOrdToBytesOffset.size();
typeBuilder.termOrdToBytesOffset.add(typeBuilder.bytes.copyUsingLengthPrefix(id));
docsEnum = termsEnum.docs(null, docsEnum, DocsEnum.FLAG_NONE);
docsEnum = estimatedTermsEnum.docs(null, docsEnum, DocsEnum.FLAG_NONE);
for (int docId = docsEnum.nextDoc(); docId != DocsEnum.NO_MORE_DOCS; docId = docsEnum.nextDoc()) {
typeBuilder.builder.addDoc(docId);
}
@ -138,7 +143,7 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
cursor.value.builder.close();
}
if (success) {
estimator.afterLoad(null, data.getMemorySizeInBytes());
estimator.afterLoad(estimatedTermsEnum, data.getMemorySizeInBytes());
}
}
}
@ -194,4 +199,53 @@ public class ParentChildIndexFieldData extends AbstractIndexFieldData<ParentChil
}
}
/**
* Estimator that wraps parent/child id field data by wrapping the data
* in a RamAccountingTermsEnum.
*/
public class ParentChildEstimator implements PerValueEstimator {
private final MemoryCircuitBreaker breaker;
private final TermsEnum filteredEnum;
// The TermsEnum is passed in here instead of being generated in the
// beforeLoad() function since it's filtered inside the previous
// TermsEnum wrappers
public ParentChildEstimator(MemoryCircuitBreaker breaker, TermsEnum filteredEnum) {
this.breaker = breaker;
this.filteredEnum = filteredEnum;
}
/**
* General overhead for ids is 2 times the length of the ID
*/
@Override
public long bytesPerValue(BytesRef term) {
if (term == null) {
return 0;
}
return 2 * term.length;
}
/**
* Wraps the already filtered {@link TermsEnum} in a
* {@link RamAccountingTermsEnum} and returns it
*/
@Override
public TermsEnum beforeLoad(Terms terms) throws IOException {
return new RamAccountingTermsEnum(filteredEnum, breaker, this);
}
/**
* Adjusts the breaker based on the difference between the actual usage
* and the aggregated estimations.
*/
@Override
public void afterLoad(TermsEnum termsEnum, long actualUsed) {
assert termsEnum instanceof RamAccountingTermsEnum;
long estimatedBytes = ((RamAccountingTermsEnum) termsEnum).getTotalBytes();
breaker.addWithoutBreaking(-(estimatedBytes - actualUsed));
}
}
}