Added a AppendingDeltaPackedLongBuffer-based storage format to single value field data

The AppendingDeltaPackedLongBuffer uses delta compression in paged fashion. For data which is roughly monotonic this results in reduced memory signature.

By default we use the storage format expected to use the least memory. You can force a choice using a new field data setting `memory_storage_hint` which can be set to `ORDINALS`, `PACKED` or `PAGED`

Closes #5706
This commit is contained in:
Boaz Leskes 2014-03-27 20:43:36 +01:00
parent e8ea9d7585
commit 1d1ca3befc
12 changed files with 662 additions and 68 deletions

View File

@ -41,14 +41,31 @@ import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
public interface IndexFieldData<FD extends AtomicFieldData> extends IndexComponent {
public static class CommonSettings {
public static String SETTING_MEMORY_STORAGE_HINT = "memory_storage_hint";
public enum MemoryStorageFormat {
ORDINALS, PACKED, PAGED;
public static MemoryStorageFormat fromString(String string) {
for (MemoryStorageFormat e : MemoryStorageFormat.values()) {
if (e.name().equalsIgnoreCase(string)) {
return e;
}
}
return null;
}
}
/**
* Should single value cross documents case be optimized to remove ords. Note, this optimization
* might not be supported by all Field Data implementations, but the ones that do, should consult
* this method to check if it should be done or not.
* Gets a memory storage hint that should be honored if possible but is not mandatory
*/
public static boolean removeOrdsOnSingleValue(FieldDataType fieldDataType) {
return !"always".equals(fieldDataType.getSettings().get("ordinals"));
public static MemoryStorageFormat getMemoryStorageHint(FieldDataType fieldDataType) {
// backwards compatibility
String s = fieldDataType.getSettings().get("ordinals");
if (s != null) {
return "always".equals(s) ? MemoryStorageFormat.ORDINALS : null;
}
return MemoryStorageFormat.fromString(fieldDataType.getSettings().get(SETTING_MEMORY_STORAGE_HINT));
}
}

View File

@ -98,7 +98,9 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
values.add(NumericUtils.sortableLongToDouble(NumericUtils.prefixCodedToLong(term)));
}
Ordinals build = builder.build(fieldDataType.getSettings());
if (!build.isMultiValued() && CommonSettings.removeOrdsOnSingleValue(fieldDataType)) {
if (build.isMultiValued() || CommonSettings.getMemoryStorageHint(fieldDataType) == CommonSettings.MemoryStorageFormat.ORDINALS) {
data = new DoubleArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
} else {
Docs ordinals = build.ordinals();
final FixedBitSet set = builder.buildDocsWithValuesSet();
@ -123,8 +125,6 @@ public class DoubleArrayIndexFieldData extends AbstractIndexFieldData<DoubleArra
} else {
data = new DoubleArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
}
} else {
data = new DoubleArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
success = true;
return data;

View File

@ -97,7 +97,9 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
values.add(NumericUtils.sortableIntToFloat(NumericUtils.prefixCodedToInt(term)));
}
Ordinals build = builder.build(fieldDataType.getSettings());
if (!build.isMultiValued() && CommonSettings.removeOrdsOnSingleValue(fieldDataType)) {
if (build.isMultiValued() || CommonSettings.getMemoryStorageHint(fieldDataType) == CommonSettings.MemoryStorageFormat.ORDINALS) {
data = new FloatArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
} else {
Docs ordinals = build.ordinals();
final FixedBitSet set = builder.buildDocsWithValuesSet();
@ -122,8 +124,6 @@ public class FloatArrayIndexFieldData extends AbstractIndexFieldData<FloatArrayA
} else {
data = new FloatArrayAtomicFieldData.SingleFixedSet(sValues, maxDoc, set, ordinals.getNumOrds());
}
} else {
data = new FloatArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
}
success = true;
return data;

View File

@ -116,7 +116,13 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
}
Ordinals build = builder.build(fieldDataType.getSettings());
if (!build.isMultiValued() && CommonSettings.removeOrdsOnSingleValue(fieldDataType)) {
if (build.isMultiValued() || CommonSettings.getMemoryStorageHint(fieldDataType) == CommonSettings.MemoryStorageFormat.ORDINALS) {
if (lat.size() != build.getMaxOrd()) {
lat = lat.resize(build.getMaxOrd());
lon = lon.resize(build.getMaxOrd());
}
data = new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build);
} else {
Docs ordinals = build.ordinals();
int maxDoc = reader.maxDoc();
PagedMutable sLat = new PagedMutable(reader.maxDoc(), pageSize, encoding.numBitsPerCoordinate(), PackedInts.COMPACT);
@ -132,12 +138,6 @@ public class GeoPointCompressedIndexFieldData extends AbstractGeoPointIndexField
} else {
data = new GeoPointCompressedAtomicFieldData.SingleFixedSet(encoding, sLon, sLat, reader.maxDoc(), set, ordinals.getNumOrds());
}
} else {
if (lat.size() != build.getMaxOrd()) {
lat = lat.resize(build.getMaxOrd());
lon = lon.resize(build.getMaxOrd());
}
data = new GeoPointCompressedAtomicFieldData.WithOrdinals(encoding, lon, lat, reader.maxDoc(), build);
}
success = true;
return data;

View File

@ -85,7 +85,7 @@ public class GeoPointDoubleArrayIndexFieldData extends AbstractGeoPointIndexFiel
}
Ordinals build = builder.build(fieldDataType.getSettings());
if (!build.isMultiValued() && CommonSettings.removeOrdsOnSingleValue(fieldDataType)) {
if (!(build.isMultiValued() || CommonSettings.getMemoryStorageHint(fieldDataType) == CommonSettings.MemoryStorageFormat.ORDINALS)) {
Docs ordinals = build.ordinals();
int maxDoc = reader.maxDoc();
BigDoubleArrayList sLat = new BigDoubleArrayList(reader.maxDoc());

View File

@ -19,7 +19,9 @@
package org.elasticsearch.index.fielddata.plain;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.RamUsageEstimator;
import org.apache.lucene.util.packed.AppendingDeltaPackedLongBuffer;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.index.fielddata.*;
@ -236,7 +238,7 @@ public abstract class PackedArrayAtomicFieldData extends AbstractAtomicNumericFi
@Override
public long nextValue() {
return minValue + values.get(docId);
return minValue + values.get(docId);
}
}
@ -261,7 +263,7 @@ public abstract class PackedArrayAtomicFieldData extends AbstractAtomicNumericFi
@Override
public double nextValue() {
return minValue + values.get(docId);
return minValue + values.get(docId);
}
}
}
@ -330,7 +332,7 @@ public abstract class PackedArrayAtomicFieldData extends AbstractAtomicNumericFi
public long nextValue() {
return minValue + values.get(docId);
}
}
@ -358,4 +360,186 @@ public abstract class PackedArrayAtomicFieldData extends AbstractAtomicNumericFi
}
}
/**
* A single valued case, where all values are "set" and are stored in a paged wise manner for better compression.
*/
public static class PagedSingle extends PackedArrayAtomicFieldData {
private final AppendingDeltaPackedLongBuffer values;
private final long numOrds;
/**
* Note, here, we assume that there is no offset by 1 from docId, so position 0
* is the value for docId 0.
*/
public PagedSingle(AppendingDeltaPackedLongBuffer values, int numDocs, long numOrds) {
super(numDocs);
this.values = values;
this.numOrds = numOrds;
}
@Override
public boolean isMultiValued() {
return false;
}
@Override
public long getNumberUniqueValues() {
return numOrds;
}
@Override
public long getMemorySizeInBytes() {
if (size == -1) {
size = values.ramBytesUsed();
}
return size;
}
@Override
public LongValues getLongValues() {
return new LongValues(values);
}
@Override
public DoubleValues getDoubleValues() {
return new DoubleValues(values);
}
static class LongValues extends DenseLongValues {
private final AppendingDeltaPackedLongBuffer values;
LongValues(AppendingDeltaPackedLongBuffer values) {
super(false);
this.values = values;
}
@Override
public long nextValue() {
return values.get(docId);
}
}
static class DoubleValues extends org.elasticsearch.index.fielddata.DoubleValues {
private final AppendingDeltaPackedLongBuffer values;
DoubleValues(AppendingDeltaPackedLongBuffer values) {
super(false);
this.values = values;
}
@Override
public int setDocument(int docId) {
this.docId = docId;
return 1;
}
@Override
public double nextValue() {
return values.get(docId);
}
}
}
/**
* A single valued case, where not all values are "set", so we have a special
* value which encodes the fact that the document has no value. The data is stored in
* a paged wise manner for better compression.
*/
public static class PagedSingleSparse extends PackedArrayAtomicFieldData {
private final AppendingDeltaPackedLongBuffer values;
private final FixedBitSet docsWithValue;
private final long numOrds;
public PagedSingleSparse(AppendingDeltaPackedLongBuffer values, int numDocs, FixedBitSet docsWithValue, long numOrds) {
super(numDocs);
this.values = values;
this.docsWithValue = docsWithValue;
this.numOrds = numOrds;
}
@Override
public boolean isMultiValued() {
return false;
}
@Override
public long getNumberUniqueValues() {
return numOrds;
}
@Override
public long getMemorySizeInBytes() {
if (size == -1) {
size = values.ramBytesUsed() + 2 * RamUsageEstimator.NUM_BYTES_LONG;
}
return size;
}
@Override
public LongValues getLongValues() {
return new LongValues(values, docsWithValue);
}
@Override
public DoubleValues getDoubleValues() {
return new DoubleValues(values, docsWithValue);
}
static class LongValues extends org.elasticsearch.index.fielddata.LongValues {
private final AppendingDeltaPackedLongBuffer values;
private final FixedBitSet docsWithValue;
LongValues(AppendingDeltaPackedLongBuffer values, FixedBitSet docsWithValue) {
super(false);
this.values = values;
this.docsWithValue = docsWithValue;
}
@Override
public int setDocument(int docId) {
this.docId = docId;
return docsWithValue.get(docId) ? 1 : 0;
}
@Override
public long nextValue() {
return values.get(docId);
}
}
static class DoubleValues extends org.elasticsearch.index.fielddata.DoubleValues {
private final AppendingDeltaPackedLongBuffer values;
private final FixedBitSet docsWithValue;
DoubleValues(AppendingDeltaPackedLongBuffer values, FixedBitSet docsWithValue) {
super(false);
this.values = values;
this.docsWithValue = docsWithValue;
}
@Override
public int setDocument(int docId) {
this.docId = docId;
return docsWithValue.get(docId) ? 1 : 0;
}
@Override
public double nextValue() {
return values.get(docId);
}
}
}
}

View File

@ -24,17 +24,15 @@ import org.apache.lucene.index.AtomicReader;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.apache.lucene.util.FixedBitSet;
import org.apache.lucene.util.NumericUtils;
import org.apache.lucene.util.*;
import org.apache.lucene.util.packed.AppendingDeltaPackedLongBuffer;
import org.apache.lucene.util.packed.MonotonicAppendingLongBuffer;
import org.apache.lucene.util.packed.PackedInts;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.breaker.MemoryCircuitBreaker;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.RamAccountingTermsEnum;
import org.elasticsearch.index.fielddata.*;
import org.elasticsearch.index.fielddata.fieldcomparator.LongValuesComparatorSource;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
@ -128,10 +126,13 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
values.add(value);
}
Ordinals build = builder.build(fieldDataType.getSettings());
CommonSettings.MemoryStorageFormat formatHint = CommonSettings.getMemoryStorageHint(fieldDataType);
if (!build.isMultiValued() && CommonSettings.removeOrdsOnSingleValue(fieldDataType)) {
if (build.isMultiValued() || formatHint == CommonSettings.MemoryStorageFormat.ORDINALS) {
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
} else {
Docs ordinals = build.ordinals();
final FixedBitSet set = builder.buildDocsWithValuesSet();
final FixedBitSet docsWithValues = builder.buildDocsWithValuesSet();
long minValue, maxValue;
minValue = maxValue = 0;
@ -142,7 +143,7 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
// Encode document without a value with a special value
long missingValue = 0;
if (set != null) {
if (docsWithValues != null) {
if ((maxValue - minValue + 1) == values.size()) {
// values are dense
if (minValue > Long.MIN_VALUE) {
@ -159,40 +160,66 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
}
}
}
missingValue -= minValue; // delta
}
final long delta = maxValue - minValue;
final int bitsRequired = delta < 0 ? 64 : PackedInts.bitsRequired(delta);
final long valuesDelta = maxValue - minValue;
final float acceptableOverheadRatio = fieldDataType.getSettings().getAsFloat("acceptable_overhead_ratio", PackedInts.DEFAULT);
final PackedInts.FormatAndBits formatAndBits = PackedInts.fastestFormatAndBits(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
final int pageSize = fieldDataType.getSettings().getAsInt("single_value_page_size", 1024);
// there's sweet spot where due to low unique value count, using ordinals will consume less memory
final long singleValuesSize = formatAndBits.format.longCount(PackedInts.VERSION_CURRENT, reader.maxDoc(), formatAndBits.bitsPerValue) * 8L;
final long uniqueValuesSize = values.ramBytesUsed();
final long ordinalsSize = build.getMemorySizeInBytes();
if (uniqueValuesSize + ordinalsSize < singleValuesSize) {
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
} else {
final PackedInts.Mutable sValues = PackedInts.getMutable(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
if (missingValue != 0) {
sValues.fill(0, sValues.size(), missingValue);
}
for (int i = 0; i < reader.maxDoc(); i++) {
final long ord = ordinals.getOrd(i);
if (ord != Ordinals.MISSING_ORDINAL) {
sValues.set(i, values.get(ord - 1) - minValue);
}
}
if (set == null) {
data = new PackedArrayAtomicFieldData.Single(sValues, minValue, reader.maxDoc(), ordinals.getNumOrds());
} else {
data = new PackedArrayAtomicFieldData.SingleSparse(sValues, minValue, reader.maxDoc(), missingValue, ordinals.getNumOrds());
}
if (formatHint == null) {
formatHint = chooseStorageFormat(reader, values, build, ordinals, missingValue, valuesDelta, acceptableOverheadRatio, pageSize);
}
} else {
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
logger.trace("single value format for field [{}] set to [{}]", getFieldNames().fullName(), formatHint);
switch (formatHint) {
case PACKED:
int bitsRequired = valuesDelta < 0 ? 64 : PackedInts.bitsRequired(valuesDelta);
final PackedInts.Mutable sValues = PackedInts.getMutable(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
if (missingValue != 0) {
missingValue -= minValue;
sValues.fill(0, sValues.size(), missingValue);
}
for (int i = 0; i < reader.maxDoc(); i++) {
final long ord = ordinals.getOrd(i);
if (ord != Ordinals.MISSING_ORDINAL) {
sValues.set(i, values.get(ord - 1) - minValue);
}
}
if (docsWithValues == null) {
data = new PackedArrayAtomicFieldData.Single(sValues, minValue, reader.maxDoc(), ordinals.getNumOrds());
} else {
data = new PackedArrayAtomicFieldData.SingleSparse(sValues, minValue, reader.maxDoc(), missingValue, ordinals.getNumOrds());
}
break;
case PAGED:
final AppendingDeltaPackedLongBuffer dpValues = new AppendingDeltaPackedLongBuffer(reader.maxDoc() / pageSize + 1, pageSize, acceptableOverheadRatio);
long lastValue = 0;
for (int i = 0; i < reader.maxDoc(); i++) {
final long ord = ordinals.getOrd(i);
if (ord != Ordinals.MISSING_ORDINAL) {
lastValue = values.get(ord - 1);
}
dpValues.add(lastValue);
}
dpValues.freeze();
if (docsWithValues == null) {
data = new PackedArrayAtomicFieldData.PagedSingle(dpValues, reader.maxDoc(), ordinals.getNumOrds());
} else {
data = new PackedArrayAtomicFieldData.PagedSingleSparse(dpValues, reader.maxDoc(), docsWithValues, ordinals.getNumOrds());
}
break;
case ORDINALS:
data = new PackedArrayAtomicFieldData.WithOrdinals(values, reader.maxDoc(), build);
break;
default:
throw new ElasticsearchException("unknown memory format: " + formatHint);
}
}
success = true;
@ -210,6 +237,93 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
}
protected CommonSettings.MemoryStorageFormat chooseStorageFormat(AtomicReader reader, MonotonicAppendingLongBuffer values, Ordinals build, Docs ordinals,
long missingValue, long valuesDelta, float acceptableOverheadRatio, int pageSize) {
CommonSettings.MemoryStorageFormat format;// estimate format
// valuesDelta can be negative if the difference between max and min values overflows the positive side of longs.
int bitsRequired = valuesDelta < 0 ? 64 : PackedInts.bitsRequired(valuesDelta);
PackedInts.FormatAndBits formatAndBits = PackedInts.fastestFormatAndBits(reader.maxDoc(), bitsRequired, acceptableOverheadRatio);
// there's sweet spot where due to low unique value count, using ordinals will consume less memory
final long singleValuesSize = formatAndBits.format.longCount(PackedInts.VERSION_CURRENT, reader.maxDoc(), formatAndBits.bitsPerValue) * 8L;
final long ordinalsSize = build.getMemorySizeInBytes() + values.ramBytesUsed();
// estimate the memory signature of paged packing
long pagedSingleValuesSize = (reader.maxDoc() / pageSize + 1) * RamUsageEstimator.NUM_BYTES_OBJECT_REF; // array of pages
int pageIndex = 0;
long pageMinOrdinal = Long.MAX_VALUE;
long pageMaxOrdinal = Long.MIN_VALUE;
for (int i = 1; i < reader.maxDoc(); ++i, pageIndex = (pageIndex + 1) % pageSize) {
long ordinal = ordinals.getOrd(i);
if (ordinal != Ordinals.MISSING_ORDINAL) {
pageMaxOrdinal = Math.max(ordinal, pageMaxOrdinal);
pageMinOrdinal = Math.min(ordinal, pageMinOrdinal);
}
if (pageIndex == pageSize - 1) {
// end of page, we now know enough to estimate memory usage
if (pageMaxOrdinal == Long.MAX_VALUE) {
// empty page - will use the null reader which just stores size
pagedSingleValuesSize += RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT);
} else {
long pageMinValue = values.get(pageMinOrdinal - 1);
long pageMaxValue = values.get(pageMaxOrdinal - 1);
long pageDelta = pageMaxValue - pageMinValue;
if (pageDelta != 0) {
bitsRequired = valuesDelta < 0 ? 64 : PackedInts.bitsRequired(pageDelta);
formatAndBits = PackedInts.fastestFormatAndBits(pageSize, bitsRequired, acceptableOverheadRatio);
pagedSingleValuesSize += formatAndBits.format.longCount(PackedInts.VERSION_CURRENT, pageSize, formatAndBits.bitsPerValue) * RamUsageEstimator.NUM_BYTES_LONG;
pagedSingleValuesSize += RamUsageEstimator.NUM_BYTES_LONG; // min value per page storage
} else {
// empty page
pagedSingleValuesSize += RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT);
}
}
pageMinOrdinal = Long.MAX_VALUE;
pageMaxOrdinal = Long.MIN_VALUE;
}
}
if (pageIndex > 0) {
// last page estimation
pageIndex++;
if (pageMaxOrdinal == Long.MAX_VALUE) {
// empty page - will use the null reader which just stores size
pagedSingleValuesSize += RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT);
} else {
long pageMinValue = values.get(pageMinOrdinal - 1);
long pageMaxValue = values.get(pageMaxOrdinal - 1);
long pageDelta = pageMaxValue - pageMinValue;
if (pageDelta != 0) {
bitsRequired = valuesDelta < 0 ? 64 : PackedInts.bitsRequired(pageDelta);
formatAndBits = PackedInts.fastestFormatAndBits(pageSize, bitsRequired, acceptableOverheadRatio);
pagedSingleValuesSize += formatAndBits.format.longCount(PackedInts.VERSION_CURRENT, pageSize, formatAndBits.bitsPerValue) * RamUsageEstimator.NUM_BYTES_LONG;
pagedSingleValuesSize += RamUsageEstimator.NUM_BYTES_LONG; // min value per page storage
} else {
// empty page
pagedSingleValuesSize += RamUsageEstimator.alignObjectSize(RamUsageEstimator.NUM_BYTES_OBJECT_HEADER + RamUsageEstimator.NUM_BYTES_INT);
}
}
}
if (ordinalsSize < singleValuesSize) {
if (ordinalsSize < pagedSingleValuesSize) {
format = CommonSettings.MemoryStorageFormat.ORDINALS;
} else {
format = CommonSettings.MemoryStorageFormat.PAGED;
}
} else {
if (pagedSingleValuesSize < singleValuesSize) {
format = CommonSettings.MemoryStorageFormat.PAGED;
} else {
format = CommonSettings.MemoryStorageFormat.PACKED;
}
}
return format;
}
@Override
public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, SortMode sortMode) {
return new LongValuesComparatorSource(this, missingValue, sortMode);
@ -266,6 +380,7 @@ public class PackedArrayIndexFieldData extends AbstractIndexFieldData<AtomicNume
* Adjust the breaker when no terms were actually loaded, but the field
* data takes up space regardless. For instance, when ordinals are
* used.
*
* @param actualUsed bytes actually used
*/
public void adjustForNoTerms(long actualUsed) {

View File

@ -0,0 +1,257 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.search.aggregations;
import com.google.common.collect.Lists;
import jsr166y.ThreadLocalRandom;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.node.Node;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
/**
*
*/
public class TimeDataHistogramAggregationBenchmark {
static long COUNT = SizeValue.parseSizeValue("5m").singles();
static long TIME_PERIOD = 24 * 3600 * 1000;
static int BATCH = 100;
static int QUERY_WARMUP = 50;
static int QUERY_COUNT = 500;
static IndexFieldData.CommonSettings.MemoryStorageFormat MEMORY_FORMAT = IndexFieldData.CommonSettings.MemoryStorageFormat.PAGED;
static double ACCEPTABLE_OVERHEAD_RATIO = 0.5;
static float MATCH_PERCENTAGE = 0.1f;
static Client client;
public static void main(String[] args) throws Exception {
Settings settings = settingsBuilder()
.put("index.refresh_interval", "-1")
.put("gateway.type", "local")
.put("node.local", true)
.put(SETTING_NUMBER_OF_SHARDS, 1)
.put(SETTING_NUMBER_OF_REPLICAS, 0)
.build();
String clusterName = TimeDataHistogramAggregationBenchmark.class.getSimpleName();
Node[] nodes = new Node[1];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = nodeBuilder().clusterName(clusterName)
.settings(settingsBuilder().put(settings).put("name", "node" + i))
.node();
}
client = nodes[0].client();
Thread.sleep(10000);
try {
client.admin().indices().create(createIndexRequest("test")).actionGet();
StopWatch stopWatch = new StopWatch().start();
System.out.println("--> Indexing [" + COUNT + "] ...");
long ITERS = COUNT / BATCH;
long i = 1;
int counter = 0;
long[] currentTimeInMillis1 = new long[]{System.currentTimeMillis()};
long[] currentTimeInMillis2 = new long[]{System.currentTimeMillis()};
long startTimeInMillis = currentTimeInMillis1[0];
long averageMillisChange = TIME_PERIOD / COUNT * 2;
long backwardSkew = Math.max(1, (long) (averageMillisChange * 0.1));
long bigOutOfOrder = 1;
for (; i <= ITERS; i++) {
BulkRequestBuilder request = client.prepareBulk();
for (int j = 0; j < BATCH; j++) {
counter++;
XContentBuilder builder = jsonBuilder().startObject();
builder.field("id", Integer.toString(counter));
// move forward in time and sometimes a little bit back (delayed delivery)
long diff = ThreadLocalRandom.current().nextLong(2 * averageMillisChange + 2 * backwardSkew) - backwardSkew;
long[] currentTime = counter % 2 == 0 ? currentTimeInMillis1 : currentTimeInMillis2;
currentTime[0] += diff;
if (ThreadLocalRandom.current().nextLong(100) <= bigOutOfOrder) {
builder.field("l_value", currentTime[0] - 60000); // 1m delays
} else {
builder.field("l_value", currentTime[0]);
}
builder.endObject();
request.add(Requests.indexRequest("test").type("type1").id(Integer.toString(counter))
.source(builder));
}
BulkResponse response = request.execute().actionGet();
if (response.hasFailures()) {
System.err.println("--> failures...");
}
if (((i * BATCH) % 10000) == 0) {
System.out.println("--> Indexed " + (i * BATCH) + " took " + stopWatch.stop().lastTaskTime());
stopWatch.start();
}
}
System.out.println("--> Indexing took " + stopWatch.totalTime() + ", TPS " + (((double) (COUNT)) / stopWatch.totalTime().secondsFrac()));
System.out.println("Time range 1: " + (currentTimeInMillis1[0] - startTimeInMillis) / 1000.0 / 3600 + " hours");
System.out.println("Time range 2: " + (currentTimeInMillis2[0] - startTimeInMillis) / 1000.0 / 3600 + " hours");
System.out.println("--> optimizing index");
client.admin().indices().prepareOptimize().setMaxNumSegments(1).get();
} catch (IndexAlreadyExistsException e) {
System.out.println("--> Index already exists, ignoring indexing phase, waiting for green");
ClusterHealthResponse clusterHealthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().setTimeout("10m").execute().actionGet();
if (clusterHealthResponse.isTimedOut()) {
System.err.println("--> Timed out waiting for cluster health");
}
}
client.admin().indices().prepareRefresh().execute().actionGet();
COUNT = client.prepareCount().setQuery(matchAllQuery()).execute().actionGet().getCount();
System.out.println("--> Number of docs in index: " + COUNT);
// load with the reverse options to make sure jit doesn't optimize one away
setMapping(ACCEPTABLE_OVERHEAD_RATIO, MEMORY_FORMAT.equals(IndexFieldData.CommonSettings.MemoryStorageFormat.PACKED) ? IndexFieldData.CommonSettings.MemoryStorageFormat.PAGED : IndexFieldData.CommonSettings.MemoryStorageFormat.PACKED);
warmUp("hist_l", "l_value", MATCH_PERCENTAGE);
setMapping(ACCEPTABLE_OVERHEAD_RATIO, MEMORY_FORMAT);
warmUp("hist_l", "l_value", MATCH_PERCENTAGE);
List<StatsResult> stats = Lists.newArrayList();
stats.add(measureAgg("hist_l", "l_value", MATCH_PERCENTAGE));
NodesStatsResponse nodeStats = client.admin().cluster().prepareNodesStats(nodes[0].settings().get("name")).clear()
.setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.FieldData)).get();
System.out.println("------------------ SUMMARY -------------------------------");
System.out.println("docs: " + COUNT);
System.out.println("match percentage: " + MATCH_PERCENTAGE);
System.out.println("memory format hint: " + MEMORY_FORMAT);
System.out.println("acceptable_overhead_ratio: " + ACCEPTABLE_OVERHEAD_RATIO);
System.out.println("field data: " + nodeStats.getNodes()[0].getIndices().getFieldData().getMemorySize());
System.out.format(Locale.ROOT, "%25s%10s%10s\n", "name", "took", "millis");
for (StatsResult stat : stats) {
System.out.format(Locale.ROOT, "%25s%10s%10d\n", stat.name, TimeValue.timeValueMillis(stat.took), (stat.took / QUERY_COUNT));
}
System.out.println("------------------ SUMMARY -------------------------------");
for (Node node : nodes) {
node.close();
}
}
protected static void setMapping(double acceptableOverheadRatio, IndexFieldData.CommonSettings.MemoryStorageFormat fielddataStorageFormat) throws IOException {
XContentBuilder mapping = JsonXContent.contentBuilder();
mapping.startObject().startObject("type1").startObject("properties").startObject("l_value")
.field("type", "long")
.startObject("fielddata")
.field("acceptable_transient_overhead_ratio", acceptableOverheadRatio)
.field("acceptable_overhead_ratio", acceptableOverheadRatio)
.field(IndexFieldData.CommonSettings.SETTING_MEMORY_STORAGE_HINT, fielddataStorageFormat.name().toLowerCase(Locale.ROOT))
.endObject()
.endObject().endObject().endObject().endObject();
client.admin().indices().preparePutMapping("test").setType("type1").setSource(mapping).get();
}
static class StatsResult {
final String name;
final long took;
StatsResult(String name, long took) {
this.name = name;
this.took = took;
}
}
private static SearchResponse doTermsAggsSearch(String name, String field, float matchPercentage) {
SearchResponse response = client.prepareSearch()
.setSearchType(SearchType.COUNT)
.setQuery(QueryBuilders.constantScoreQuery(FilterBuilders.scriptFilter("random()<matchP").addParam("matchP", matchPercentage).cache(true)))
.addAggregation(AggregationBuilders.histogram(name).field(field).interval(3600 * 1000)).get();
if (response.getHits().totalHits() < COUNT * matchPercentage * 0.7) {
System.err.println("--> warning - big deviation from expected count: " + response.getHits().totalHits() + " expected: " + COUNT * matchPercentage);
}
return response;
}
private static StatsResult measureAgg(String name, String field, float matchPercentage) {
long totalQueryTime;// LM VALUE
System.out.println("--> Running (" + name + ")...");
totalQueryTime = 0;
long previousCount = 0;
for (int j = 0; j < QUERY_COUNT; j++) {
SearchResponse searchResponse = doTermsAggsSearch(name, field, matchPercentage);
if (previousCount == 0) {
previousCount = searchResponse.getHits().getTotalHits();
} else if (searchResponse.getHits().totalHits() != previousCount) {
System.err.println("*** HIT COUNT CHANGE -> CACHE EXPIRED? ***");
}
totalQueryTime += searchResponse.getTookInMillis();
}
System.out.println("--> Histogram aggregations (" + field + "): " + (totalQueryTime / QUERY_COUNT) + "ms");
return new StatsResult(name, totalQueryTime);
}
private static void warmUp(String name, String field, float matchPercentage) {
System.out.println("--> Warmup (" + name + ")...");
client.admin().indices().prepareClearCache().setFieldDataCache(true).execute().actionGet();
// run just the child query, warm up first
for (int j = 0; j < QUERY_WARMUP; j++) {
SearchResponse searchResponse = doTermsAggsSearch(name, field, matchPercentage);
if (j == 0) {
System.out.println("--> Loading (" + field + "): took: " + searchResponse.getTook());
}
}
System.out.println("--> Warmup (" + name + ") DONE");
}
}

View File

@ -24,9 +24,12 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.*;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.fielddata.fieldcomparator.SortMode;
import org.junit.Test;
import java.util.Locale;
import static org.hamcrest.Matchers.equalTo;
/**
@ -35,6 +38,16 @@ public abstract class AbstractNumericFieldDataTests extends AbstractFieldDataImp
protected abstract FieldDataType getFieldDataType();
protected ImmutableSettings.Builder getFieldDataSettings() {
ImmutableSettings.Builder builder = ImmutableSettings.builder();
IndexFieldData.CommonSettings.MemoryStorageFormat[] formats = IndexFieldData.CommonSettings.MemoryStorageFormat.values();
int i = randomInt(formats.length);
if (i < formats.length) {
builder.put(IndexFieldData.CommonSettings.SETTING_MEMORY_STORAGE_HINT, formats[i].name().toLowerCase(Locale.ROOT));
}
return builder;
}
@Test
public void testSingleValueAllSetNumber() throws Exception {
fillSingleValueAllSet();
@ -118,7 +131,7 @@ public abstract class AbstractNumericFieldDataTests extends AbstractFieldDataImp
assertThat(1, equalTo(doubleValues.setDocument(2)));
assertThat(doubleValues.nextValue(), equalTo(3d));
IndexSearcher searcher = new IndexSearcher(readerContext.reader());
TopFieldDocs topDocs;
@ -236,7 +249,7 @@ public abstract class AbstractNumericFieldDataTests extends AbstractFieldDataImp
assertThat(1, equalTo(doubleValues.setDocument(2)));
assertThat(doubleValues.nextValue(), equalTo(3d));
}
@Test
@ -268,7 +281,7 @@ public abstract class AbstractNumericFieldDataTests extends AbstractFieldDataImp
assertThat(0, equalTo(doubleValues.setDocument(1)));
assertThat(0, equalTo(doubleValues.setDocument(2)));
}
}
protected void fillAllMissing() throws Exception {

View File

@ -31,7 +31,7 @@ public class DoubleFieldDataTests extends AbstractNumericFieldDataTests {
@Override
protected FieldDataType getFieldDataType() {
return new FieldDataType("double");
return new FieldDataType("double", getFieldDataSettings());
}
protected String one() {

View File

@ -30,7 +30,7 @@ public class FloatFieldDataTests extends AbstractNumericFieldDataTests {
@Override
protected FieldDataType getFieldDataType() {
return new FieldDataType("float");
return new FieldDataType("float", getFieldDataSettings());
}
protected String one() {

View File

@ -26,7 +26,6 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.Term;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.fielddata.plain.PackedArrayAtomicFieldData;
import org.elasticsearch.index.merge.Merges;
import org.joda.time.DateTimeZone;
@ -46,7 +45,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
@Override
protected FieldDataType getFieldDataType() {
// we don't want to optimize the type so it will always be a long...
return new FieldDataType("long", ImmutableSettings.builder());
return new FieldDataType("long", getFieldDataSettings());
}
protected void add2SingleValuedDocumentsAndDeleteOneOfThem() throws Exception {
@ -86,13 +85,13 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
private static long getFirst(LongValues values, int docId) {
final int numValues = values.setDocument(docId);
assertThat(numValues , is(1));
assertThat(numValues, is(1));
return values.nextValue();
}
private static double getFirst(DoubleValues values, int docId) {
final int numValues = values.setDocument(docId);
assertThat(numValues , is(1));
assertThat(numValues, is(1));
return values.nextValue();
}
@ -242,6 +241,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return 1;
}
@Override
public long nextValue(Random r) {
return 1 + r.nextInt(16);
@ -251,6 +251,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return 1;
}
@Override
public long nextValue(Random r) {
// somewhere in-between 2010 and 2012
@ -261,6 +262,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return r.nextInt(3);
}
@Override
public long nextValue(Random r) {
// somewhere in-between 2010 and 2012
@ -271,6 +273,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return r.nextInt(3);
}
@Override
public long nextValue(Random r) {
return 3 + r.nextInt(8);
@ -280,6 +283,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return r.nextFloat() < 0.1f ? 1 : 0;
}
@Override
public long nextValue(Random r) {
return r.nextLong();
@ -289,6 +293,7 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return r.nextFloat() < 0.1f ? 1 + r.nextInt(5) : 0;
}
@Override
public long nextValue(Random r) {
return r.nextLong();
@ -298,12 +303,15 @@ public class LongFieldDataTests extends AbstractNumericFieldDataTests {
public int numValues(Random r) {
return 1 + r.nextInt(3);
}
@Override
public long nextValue(Random r) {
return r.nextLong();
}
};
public abstract int numValues(Random r);
public abstract long nextValue(Random r);
}