mirror of
synced 2025-03-24 17:09:48 +00:00
Merge branch 'master' into index-lifecycle
This commit is contained in:
@ -505,8 +505,6 @@
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]settings[/\\]ClusterSettingsIT.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]shards[/\\]ClusterSearchShardsIT.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]cluster[/\\]structure[/\\]RoutingIteratorTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreContainerTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]blobstore[/\\]FsBlobStoreTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]breaker[/\\]MemoryCircuitBreakerTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]geo[/\\]ShapeBuilderTests.java" checks="LineLength" />
<suppress files="server[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]common[/\\]hash[/\\]MessageDigestsTests.java" checks="LineLength" />
@ -40,94 +40,3 @@ better. For instance if a user searches for two words `foo` and `bar`, a match
across different chapters is probably very poor, while a match within the same
paragraph is likely good.
=== Avoid sparsity
The data-structures behind Lucene, which Elasticsearch relies on in order to
index and store data, work best with dense data, ie. when all documents have the
same fields. This is especially true for fields that have norms enabled (which
is the case for `text` fields by default) or doc values enabled (which is the
case for numerics, `date`, `ip` and `keyword` by default).
The reason is that Lucene internally identifies documents with so-called doc
ids, which are integers between 0 and the total number of documents in the
index. These doc ids are used for communication between the internal APIs of
Lucene: for instance searching on a term with a `match` query produces an
iterator of doc ids, and these doc ids are then used to retrieve the value of
the `norm` in order to compute a score for these documents. The way this `norm`
lookup is implemented currently is by reserving one byte for each document.
The `norm` value for a given doc id can then be retrieved by reading the
byte at index `doc_id`. While this is very efficient and helps Lucene quickly
have access to the `norm` values of every document, this has the drawback that
documents that do not have a value will also require one byte of storage.
In practice, this means that if an index has `M` documents, norms will require
`M` bytes of storage *per field*, even for fields that only appear in a small
fraction of the documents of the index. Although slightly more complex with doc
values due to the fact that doc values have multiple ways that they can be
encoded depending on the type of field and on the actual data that the field
stores, the problem is very similar. In case you wonder: `fielddata`, which was
used in Elasticsearch pre-2.0 before being replaced with doc values, also
suffered from this issue, except that the impact was only on the memory
footprint since `fielddata` was not explicitly materialized on disk.
Note that even though the most notable impact of sparsity is on storage
requirements, it also has an impact on indexing speed and search speed since
these bytes for documents that do not have a field still need to be written
at index time and skipped over at search time.
It is totally fine to have a minority of sparse fields in an index. But beware
that if sparsity becomes the rule rather than the exception, then the index
will not be as efficient as it could be.
This section mostly focused on `norms` and `doc values` because those are the
two features that are most affected by sparsity. Sparsity also affect the
efficiency of the inverted index (used to index `text`/`keyword` fields) and
dimensional points (used to index `geo_point` and numerics) but to a lesser
Here are some recommendations that can help avoid sparsity:
==== Avoid putting unrelated data in the same index
You should avoid putting documents that have totally different structures into
the same index in order to avoid sparsity. It is often better to put these
documents into different indices, you could also consider giving fewer shards
to these smaller indices since they will contain fewer documents overall.
Note that this advice does not apply in the case that you need to use
parent/child relations between your documents since this feature is only
supported on documents that live in the same index.
==== Normalize document structures
Even if you really need to put different kinds of documents in the same index,
maybe there are opportunities to reduce sparsity. For instance if all documents
in the index have a timestamp field but some call it `timestamp` and others
call it `creation_date`, it would help to rename it so that all documents have
the same field name for the same data.
==== Avoid types
Types might sound like a good way to store multiple tenants in a single index.
They are not: given that types store everything in a single index, having
multiple types that have different fields in a single index will also cause
problems due to sparsity as described above. If your types do not have very
similar mappings, you might want to consider moving them to a dedicated index.
==== Disable `norms` and `doc_values` on sparse fields
If none of the above recommendations apply in your case, you might want to
check whether you actually need `norms` and `doc_values` on your sparse fields.
`norms` can be disabled if producing scores is not necessary on a field, this is
typically true for fields that are only used for filtering. `doc_values` can be
disabled on fields that are neither used for sorting nor for aggregations.
Beware that this decision should not be made lightly since these parameters
cannot be changed on a live index, so you would have to reindex if you realize
that you need `norms` or `doc_values`.
@ -341,7 +341,18 @@ Which yields:
// TESTRESPONSE[s/"took": 12/"took" : $body.took/]
// TESTRESPONSE[s/OzrdjxNtQGaqs4DmioFw9A/$body.hits.hits.0._node/]
You might have noticed that a significant part of the script depends on
WARNING: While scripted similarities provide a lot of flexibility, there is
a set of rules that they need to satisfy. Failing to do so could make
Elasticsearch silently return wrong top hits or fail with internal errors at
search time:
- Returned scores must be positive.
- All other variables remaining equal, scores must not decrease when
`doc.freq` increases.
- All other variables remaining equal, scores must not increase when
`doc.length` increases.
You might have noticed that a significant part of the above script depends on
statistics that are the same for every document. It is possible to make the
above slightly more efficient by providing an `weight_script` which will
compute the document-independent part of the score and will be available
@ -506,7 +517,6 @@ GET /index/_search?explain=true
Type name: `scripted`
@ -135,6 +135,6 @@ PUT my_index/_doc/1
<1> The `my_float` field is added as a <<number,`double`>> field.
<1> The `my_float` field is added as a <<number,`float`>> field.
<2> The `my_integer` field is added as a <<number,`long`>> field.
@ -46,11 +46,22 @@ name as an existing template, it will replace the old version.
==== `match_mapping_type`
The `match_mapping_type` matches on the datatype detected by
<<dynamic-field-mapping,dynamic field mapping>>, in other words, the datatype
that Elasticsearch thinks the field should have. Only the following datatypes
can be automatically detected: `boolean`, `date`, `double`, `long`, `object`,
`string`. It also accepts `*` to match all datatypes.
The `match_mapping_type` is the datatype detected by the json parser. Since
JSON doesn't allow to distinguish a `long` from an `integer` or a `double` from
a `float`, it will always choose the wider datatype, ie. `long` for integers
and `double` for floating-point numbers.
The following datatypes may be automatically detected:
- `boolean` when `true` or `false` are encountered.
- `date` when <<date-detection,date detection>> is enabled and a string is
found that matches any of the configured date formats.
- `double` for numbers with a decimal part.
- `long` for numbers without a decimal part.
- `object` for objects, also called hashes.
- `string` for character strings.
`*` may also be used in order to match all datatypes.
For example, if we wanted to map all integer fields as `integer` instead of
`long`, and all `string` fields as both `text` and `keyword`, we
@ -233,26 +233,10 @@ states that:
* `news` must not be present
* `quick` and `brown` are optional -- their presence increases the relevance
The familiar operators `AND`, `OR` and `NOT` (also written `&&`, `||` and `!`)
are also supported. However, the effects of these operators can be more
complicated than is obvious at first glance. `NOT` takes precedence over
`AND`, which takes precedence over `OR`. While the `+` and `-` only affect
the term to the right of the operator, `AND` and `OR` can affect the terms to
the left and right.
Rewriting the above query using `AND`, `OR` and `NOT` demonstrates the
`quick OR brown AND fox AND NOT news`::
This is incorrect, because `brown` is now a required term.
`(quick OR brown) AND fox AND NOT news`::
This is incorrect because at least one of `quick` or `brown` is now required
and the search for those terms would be scored differently from the original
The familiar boolean operators `AND`, `OR` and `NOT` (also written `&&`, `||`
and `!`) are also supported but beware that they do not honor the usual
precedence rules, so parentheses should be used whenever multiple operators are
used together. For instance the previous query could be rewritten as:
`((quick AND fox) OR (brown AND fox) OR fox) AND NOT news`::
@ -270,7 +254,6 @@ would look like this:
===== Grouping
@ -47,7 +47,7 @@ public abstract class MultiValuesSource <VS extends ValuesSource> {
if (ordinal > names.length) {
throw new IndexOutOfBoundsException("ValuesSource array index " + ordinal + " out of bounds");
return multiValueMode.select(values[ordinal].doubleValues(ctx), Double.NEGATIVE_INFINITY);
return multiValueMode.select(values[ordinal].doubleValues(ctx));
@ -54,7 +54,7 @@ class DateMethodValueSource extends FieldDataValueSource {
public FunctionValues getValues(Map context, LeafReaderContext leaf) throws IOException {
AtomicNumericFieldData leafData = (AtomicNumericFieldData) fieldData.load(leaf);
final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues(), 0d);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues());
return new DoubleDocValues(this) {
public double doubleVal(int docId) throws IOException {
@ -56,7 +56,7 @@ class DateObjectValueSource extends FieldDataValueSource {
public FunctionValues getValues(Map context, LeafReaderContext leaf) throws IOException {
AtomicNumericFieldData leafData = (AtomicNumericFieldData) fieldData.load(leaf);
MutableDateTime joda = new MutableDateTime(0, DateTimeZone.UTC);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues(), 0d);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues());
return new DoubleDocValues(this) {
public double doubleVal(int docId) throws IOException {
@ -68,7 +68,7 @@ class FieldDataValueSource extends ValueSource {
@SuppressWarnings("rawtypes") // ValueSource uses a rawtype
public FunctionValues getValues(Map context, LeafReaderContext leaf) throws IOException {
AtomicNumericFieldData leafData = (AtomicNumericFieldData) fieldData.load(leaf);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues(), 0d);
NumericDoubleValues docValues = multiValueMode.select(leafData.getDoubleValues());
return new DoubleDocValues(this) {
public double doubleVal(int doc) throws IOException {
@ -131,6 +131,7 @@ public class RatedRequestsTests extends ESTestCase {
public void testXContentParsingIsNotLenient() throws IOException {
RatedRequest testItem = createTestItem(randomBoolean());
XContentType xContentType = randomFrom(XContentType.values());
@ -37,17 +37,19 @@ final class Netty4SizeHeaderFrameDecoder extends ByteToMessageDecoder {
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
BytesReference networkBytes = Netty4Utils.toBytesReference(in);
int messageLength = TcpTransport.readMessageLength(networkBytes) + HEADER_SIZE;
// If the message length is -1, we have not read a complete header. If the message length is
// greater than the network bytes available, we have not read a complete frame.
if (messageLength != -1 && messageLength <= networkBytes.length()) {
final ByteBuf message = in.skipBytes(HEADER_SIZE);
// 6 bytes would mean it is a ping. And we should ignore.
if (messageLength != 6) {
int messageLength = TcpTransport.readMessageLength(networkBytes);
// If the message length is -1, we have not read a complete header.
if (messageLength != -1) {
int messageLengthWithHeader = messageLength + HEADER_SIZE;
// If the message length is greater than the network bytes available, we have not read a complete frame.
if (messageLengthWithHeader <= networkBytes.length()) {
final ByteBuf message = in.skipBytes(HEADER_SIZE);
// 6 bytes would mean it is a ping. And we should ignore.
if (messageLengthWithHeader != 6) {
} catch (IllegalArgumentException ex) {
throw new TooLongFrameException(ex);
@ -63,7 +63,7 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
state = ClusterState.readFrom(in, null);
explanations = RoutingExplanations.readFrom(in);
@ -76,7 +76,7 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
RoutingExplanations.writeTo(explanations, out);
@ -68,7 +68,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
transientSettings = Settings.readSettingsFromStream(in);
persistentSettings = Settings.readSettingsFromStream(in);
@ -89,7 +89,7 @@ public class ClusterUpdateSettingsResponse extends AcknowledgedResponse {
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
Settings.writeSettingsToStream(transientSettings, out);
Settings.writeSettingsToStream(persistentSettings, out);
@ -115,7 +115,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
public void readFrom(StreamInput in) throws IOException {
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
oldIndex = in.readString();
newIndex = in.readString();
@ -144,7 +144,7 @@ public final class RolloverResponse extends ShardsAcknowledgedResponse implement
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
@ -74,6 +74,29 @@ public interface BlobContainer {
void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException;
* Reads blob content from the input stream and writes it to the container in a new blob with the given name,
* using an atomic write operation if the implementation supports it. When the BlobContainer implementation
* does not provide a specific implementation of writeBlobAtomic(String, InputStream, long), then
* the {@link #writeBlob(String, InputStream, long)} method is used.
* This method assumes the container does not already contain a blob of the same blobName. If a blob by the
* same name already exists, the operation will fail and an {@link IOException} will be thrown.
* @param blobName
* The name of the blob to write the contents of the input stream to.
* @param inputStream
* The input stream from which to retrieve the bytes to write to the blob.
* @param blobSize
* The size of the blob to be written, in bytes. It is implementation dependent whether
* this value is used in writing the blob to the repository.
* @throws FileAlreadyExistsException if a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
default void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
writeBlob(blobName, inputStream, blobSize);
* Deletes a blob with giving name, if the blob exists. If the blob does not exist,
* this method throws a NoSuchFileException.
@ -19,11 +19,12 @@
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.io.Streams;
import java.io.BufferedInputStream;
@ -56,8 +57,9 @@ import static java.util.Collections.unmodifiableMap;
public class FsBlobContainer extends AbstractBlobContainer {
protected final FsBlobStore blobStore;
private static final String TEMP_FILE_PREFIX = "pending-";
protected final FsBlobStore blobStore;
protected final Path path;
public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
@ -131,6 +133,48 @@ public class FsBlobContainer extends AbstractBlobContainer {
IOUtils.fsync(path, true);
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
final String tempBlob = tempBlobName(blobName);
final Path tempBlobPath = path.resolve(tempBlob);
try {
try (OutputStream outputStream = Files.newOutputStream(tempBlobPath, StandardOpenOption.CREATE_NEW)) {
Streams.copy(inputStream, outputStream);
IOUtils.fsync(tempBlobPath, false);
final Path blobPath = path.resolve(blobName);
// If the target file exists then Files.move() behaviour is implementation specific
// the existing file might be replaced or this method fails by throwing an IOException.
if (Files.exists(blobPath)) {
throw new FileAlreadyExistsException("blob [" + blobPath + "] already exists, cannot overwrite");
Files.move(tempBlobPath, blobPath, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException ex) {
try {
} catch (IOException e) {
throw ex;
} finally {
IOUtils.fsync(path, true);
public static String tempBlobName(final String blobName) {
return "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
* Returns true if the blob is a leftover temporary blob.
* The temporary blobs might be left after failed atomic write operation.
public static boolean isTempBlobName(final String blobName) {
return blobName.startsWith(TEMP_FILE_PREFIX);
public void move(String source, String target) throws IOException {
Path sourcePath = path.resolve(source);
@ -64,6 +64,11 @@ public abstract class SingleObjectCache<T>{
return cached;
/** Return the potentially stale cached entry. */
protected final T getNoRefresh() {
return cached;
* Returns a new instance to cache
@ -291,38 +291,6 @@ public enum FieldData {
return DocValues.unwrapSingleton(values) == null;
* Returns whether the provided values *might* be multi-valued. There is no
* guarantee that this method will return {@code false} in the single-valued case.
public static boolean isMultiValued(SortedNumericDocValues values) {
return DocValues.unwrapSingleton(values) == null;
* Returns whether the provided values *might* be multi-valued. There is no
* guarantee that this method will return {@code false} in the single-valued case.
public static boolean isMultiValued(SortedNumericDoubleValues values) {
return unwrapSingleton(values) == null;
* Returns whether the provided values *might* be multi-valued. There is no
* guarantee that this method will return {@code false} in the single-valued case.
public static boolean isMultiValued(SortedBinaryDocValues values) {
return unwrapSingleton(values) != null;
* Returns whether the provided values *might* be multi-valued. There is no
* guarantee that this method will return {@code false} in the single-valued case.
public static boolean isMultiValued(MultiGeoPointValues values) {
return unwrapSingleton(values) == null;
* Return a {@link String} representation of the provided values. That is
* typically used for scripts or for the `map` execution mode of terms aggs.
@ -555,4 +523,63 @@ public enum FieldData {
* Return a {@link NumericDocValues} instance that has a value for every
* document, returns the same value as {@code values} if there is a value
* for the current document and {@code missing} otherwise.
public static NumericDocValues replaceMissing(NumericDocValues values, long missing) {
return new AbstractNumericDocValues() {
private long value;
public int docID() {
return values.docID();
public boolean advanceExact(int target) throws IOException {
if (values.advanceExact(target)) {
value = values.longValue();
} else {
value = missing;
return true;
public long longValue() throws IOException {
return value;
* Return a {@link NumericDoubleValues} instance that has a value for every
* document, returns the same value as {@code values} if there is a value
* for the current document and {@code missing} otherwise.
public static NumericDoubleValues replaceMissing(NumericDoubleValues values, double missing) {
return new NumericDoubleValues() {
private double value;
public boolean advanceExact(int target) throws IOException {
if (values.advanceExact(target)) {
value = values.doubleValue();
} else {
value = missing;
return true;
public double doubleValue() throws IOException {
return value;
@ -27,6 +27,7 @@ import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
@ -71,7 +72,7 @@ public class DoubleValuesComparatorSource extends IndexFieldData.XFieldComparato
final SortedNumericDoubleValues values = getValues(context);
final NumericDoubleValues selectedValues;
if (nested == null) {
selectedValues = sortMode.select(values, dMissingValue);
selectedValues = FieldData.replaceMissing(sortMode.select(values), dMissingValue);
} else {
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
@ -25,6 +25,7 @@ import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.NumericDoubleValues;
@ -63,7 +64,7 @@ public class FloatValuesComparatorSource extends IndexFieldData.XFieldComparator
final SortedNumericDoubleValues values = indexFieldData.load(context).getDoubleValues();
final NumericDoubleValues selectedValues;
if (nested == null) {
selectedValues = sortMode.select(values, dMissingValue);
selectedValues = FieldData.replaceMissing(sortMode.select(values), dMissingValue);
} else {
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
@ -26,6 +26,7 @@ import org.apache.lucene.search.FieldComparator;
import org.apache.lucene.search.SortField;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.search.MultiValueMode;
@ -62,7 +63,7 @@ public class LongValuesComparatorSource extends IndexFieldData.XFieldComparatorS
final SortedNumericDocValues values = indexFieldData.load(context).getLongValues();
final NumericDocValues selectedValues;
if (nested == null) {
selectedValues = sortMode.select(values, dMissingValue);
selectedValues = FieldData.replaceMissing(sortMode.select(values), dMissingValue);
} else {
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
@ -40,6 +40,8 @@ import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.plain.BytesBinaryDVIndexFieldData;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.query.QueryShardException;
import org.elasticsearch.search.DocValueFormat;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.Base64;
@ -104,6 +106,10 @@ public class BinaryFieldMapper extends FieldMapper {
public DocValueFormat docValueFormat(String format, DateTimeZone timeZone) {
return DocValueFormat.BINARY;
public BytesReference valueForDisplay(Object value) {
@ -40,6 +40,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
import org.elasticsearch.index.fielddata.MultiGeoPointValues;
@ -354,7 +355,7 @@ public abstract class DecayFunctionBuilder<DFB extends DecayFunctionBuilder<DFB>
protected NumericDoubleValues distance(LeafReaderContext context) {
final MultiGeoPointValues geoPointValues = fieldData.load(context).getGeoPointValues();
return mode.select(new SortingNumericDoubleValues() {
return FieldData.replaceMissing(mode.select(new SortingNumericDoubleValues() {
public boolean advanceExact(int docId) throws IOException {
if (geoPointValues.advanceExact(docId)) {
@ -372,7 +373,7 @@ public abstract class DecayFunctionBuilder<DFB extends DecayFunctionBuilder<DFB>
return false;
}, 0.0);
}), 0);
@ -436,7 +437,7 @@ public abstract class DecayFunctionBuilder<DFB extends DecayFunctionBuilder<DFB>
protected NumericDoubleValues distance(LeafReaderContext context) {
final SortedNumericDoubleValues doubleValues = fieldData.load(context).getDoubleValues();
return mode.select(new SortingNumericDoubleValues() {
return FieldData.replaceMissing(mode.select(new SortingNumericDoubleValues() {
public boolean advanceExact(int docId) throws IOException {
if (doubleValues.advanceExact(docId)) {
@ -451,7 +452,7 @@ public abstract class DecayFunctionBuilder<DFB extends DecayFunctionBuilder<DFB>
return false;
}, 0.0);
}), 0);
@ -0,0 +1,183 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.lucene.store.FilterIndexOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
final class ByteSizeCachingDirectory extends FilterDirectory {
private static class SizeAndModCount {
final long size;
final long modCount;
final boolean pendingWrite;
SizeAndModCount(long length, long modCount, boolean pendingWrite) {
this.size = length;
this.modCount = modCount;
this.pendingWrite = pendingWrite;
private static long estimateSizeInBytes(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
try {
estimatedSize += directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
// calling Files.size, you can also sometimes hit AccessDeniedException
return estimatedSize;
private final SingleObjectCache<SizeAndModCount> size;
// Both these variables need to be accessed under `this` lock.
private long modCount = 0;
private long numOpenOutputs = 0;
ByteSizeCachingDirectory(Directory in, TimeValue refreshInterval) {
size = new SingleObjectCache<SizeAndModCount>(refreshInterval, new SizeAndModCount(0L, -1L, true)) {
protected SizeAndModCount refresh() {
// It is ok for the size of the directory to be more recent than
// the mod count, we would just recompute the size of the
// directory on the next call as well. However the opposite
// would be bad as we would potentially have a stale cache
// entry for a long time. So we fetch the values of modCount and
// numOpenOutputs BEFORE computing the size of the directory.
final long modCount;
final boolean pendingWrite;
synchronized(ByteSizeCachingDirectory.this) {
modCount = ByteSizeCachingDirectory.this.modCount;
pendingWrite = ByteSizeCachingDirectory.this.numOpenOutputs != 0;
final long size;
try {
// Compute this OUTSIDE of the lock
size = estimateSizeInBytes(getDelegate());
} catch (IOException e) {
throw new UncheckedIOException(e);
return new SizeAndModCount(size, modCount, pendingWrite);
protected boolean needsRefresh() {
if (super.needsRefresh() == false) {
// The size was computed recently, don't recompute
return false;
SizeAndModCount cached = getNoRefresh();
if (cached.pendingWrite) {
// The cached entry was generated while there were pending
// writes, so the size might be stale: recompute.
return true;
synchronized(ByteSizeCachingDirectory.this) {
// If there are pending writes or if new files have been
// written/deleted since last time: recompute
return numOpenOutputs != 0 || cached.modCount != modCount;
/** Return the cumulative size of all files in this directory. */
long estimateSizeInBytes() throws IOException {
try {
return size.getOrRefresh().size;
} catch (UncheckedIOException e) {
// we wrapped in the cache and unwrap here
throw e.getCause();
public IndexOutput createOutput(String name, IOContext context) throws IOException {
return wrapIndexOutput(super.createOutput(name, context));
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) throws IOException {
return wrapIndexOutput(super.createTempOutput(prefix, suffix, context));
private IndexOutput wrapIndexOutput(IndexOutput out) {
synchronized (this) {
return new FilterIndexOutput(out.toString(), out) {
public void writeBytes(byte[] b, int length) throws IOException {
// Don't write to atomicXXX here since it might be called in
// tight loops and memory barriers are costly
super.writeBytes(b, length);
public void writeByte(byte b) throws IOException {
// Don't write to atomicXXX here since it might be called in
// tight loops and memory barriers are costly
public void close() throws IOException {
// Close might cause some data to be flushed from in-memory buffers, so
// increment the modification counter too.
try {
} finally {
synchronized (this) {
public void deleteFile(String name) throws IOException {
try {
} finally {
synchronized (this) {
@ -50,7 +50,6 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefBuilder;
import org.apache.lucene.util.Version;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
@ -67,7 +66,6 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.SingleObjectCache;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.RefCounted;
import org.elasticsearch.common.util.iterable.Iterables;
@ -91,7 +89,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.ArrayList;
@ -146,7 +143,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ReentrantReadWriteLock metadataLock = new ReentrantReadWriteLock();
private final ShardLock shardLock;
private final OnClose onClose;
private final SingleObjectCache<StoreStats> statsCache;
private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@ -164,12 +160,13 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
OnClose onClose) throws IOException {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.directory = new StoreDirectory(directoryService.newDirectory(), Loggers.getLogger("index.store.deletes", settings, shardId));
Directory dir = directoryService.newDirectory();
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(dir, refreshInterval);
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", settings, shardId));
this.shardLock = shardLock;
this.onClose = onClose;
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
this.statsCache = new StoreStatsCache(refreshInterval, directory);
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
assert onClose != null;
assert shardLock != null;
@ -377,7 +374,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public StoreStats stats() throws IOException {
return statsCache.getOrRefresh();
return new StoreStats(directory.estimateSize());
@ -731,11 +728,16 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final Logger deletesLogger;
StoreDirectory(Directory delegateDirectory, Logger deletesLogger) {
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
this.deletesLogger = deletesLogger;
/** Estimate the cumulative size of all files in this directory in bytes. */
long estimateSize() throws IOException {
return ((ByteSizeCachingDirectory) getDelegate()).estimateSizeInBytes();
public void close() {
assert false : "Nobody should close this directory except of the Store itself";
@ -1428,38 +1430,6 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private static class StoreStatsCache extends SingleObjectCache<StoreStats> {
private final Directory directory;
StoreStatsCache(TimeValue refreshInterval, Directory directory) throws IOException {
super(refreshInterval, new StoreStats(estimateSize(directory)));
this.directory = directory;
protected StoreStats refresh() {
try {
return new StoreStats(estimateSize(directory));
} catch (IOException ex) {
throw new ElasticsearchException("failed to refresh store stats", ex);
private static long estimateSize(Directory directory) throws IOException {
long estimatedSize = 0;
String[] files = directory.listAll();
for (String file : files) {
try {
estimatedSize += directory.fileLength(file);
} catch (NoSuchFileException | FileNotFoundException | AccessDeniedException e) {
// ignore, the file is not there no more; on Windows, if one thread concurrently deletes a file while
// calling Files.size, you can also sometimes hit AccessDeniedException
return estimatedSize;
* creates an empty lucene index and a corresponding empty translog. Any existing data will be deleted.
@ -19,7 +19,6 @@
package org.elasticsearch.indices.flush;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.Assertions;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -502,18 +501,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
if (indexShard.routingEntry().primary() == false) {
throw new IllegalStateException("[" + request.shardId() +"] expected a primary shard");
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
int opCount = indexShard.getActiveOperationsCount();
// Need to snapshot the debug info twice as it's updated concurrently with the permit count.
if (Assertions.ENABLED) {
if (logger.isTraceEnabled()) {
logger.trace("in flight operations {}, acquirers {}", indexShard.getActiveOperationsCount(), indexShard.getActiveOperations());
return new InFlightOpsResponse(opCount);
@ -50,6 +50,7 @@ import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
@ -555,10 +556,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
String blobName = "master.dat";
BytesArray bytes = new BytesArray(testBytes);
try (InputStream stream = bytes.streamInput()) {
testContainer.writeBlob(blobName + "-temp", stream, bytes.length());
testContainer.writeBlobAtomic(blobName, stream, bytes.length());
// Make sure that move is supported
testContainer.move(blobName + "-temp", blobName);
return seed;
} catch (IOException exp) {
@ -774,18 +773,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void writeAtomic(final String blobName, final BytesReference bytesRef) throws IOException {
final String tempBlobName = "pending-" + blobName + "-" + UUIDs.randomBase64UUID();
try (InputStream stream = bytesRef.streamInput()) {
snapshotsBlobContainer.writeBlob(tempBlobName, stream, bytesRef.length());
snapshotsBlobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// temporary blob creation or move failed - try cleaning up
try {
} catch (IOException e) {
throw ex;
snapshotsBlobContainer.writeBlobAtomic(blobName, stream, bytesRef.length());
@ -955,7 +944,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// Delete temporary index files first, as we might otherwise fail in the next step creating the new index file if an earlier
// attempt to write an index file with this generation failed mid-way after creating the temporary file.
for (final String blobName : blobs.keySet()) {
if (indexShardSnapshotsFormat.isTempBlobName(blobName)) {
if (FsBlobContainer.isTempBlobName(blobName)) {
try {
} catch (IOException e) {
@ -23,6 +23,7 @@ import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
@ -52,8 +53,6 @@ import java.util.Locale;
public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
private static final String TEMP_FILE_PREFIX = "pending-";
private static final XContentType DEFAULT_X_CONTENT_TYPE = XContentType.SMILE;
// The format version
@ -120,7 +119,7 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
* Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
@ -131,20 +130,12 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
* @param name blob name
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
String tempBlobName = tempBlobName(name);
writeBlob(obj, blobContainer, tempBlobName);
try {
blobContainer.move(tempBlobName, blobName);
} catch (IOException ex) {
// Move failed - try cleaning up
try {
} catch (Exception e) {
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
throw ex;
@ -157,51 +148,35 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
* @param name blob name
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
String blobName = blobName(name);
writeBlob(obj, blobContainer, blobName);
final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
* Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
* <p>
* The blob will be compressed and checksum will be written if required.
* @param obj object to be serialized
* @param blobContainer blob container
* @param blobName blob name
protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
BytesReference bytes = write(obj);
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
final BytesReference bytes = write(obj);
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
} }) {
}) {
BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length());
consumer.accept(new BytesArray(outputStream.toByteArray()));
* Returns true if the blob is a leftover temporary blob.
* The temporary blobs might be left after failed atomic write operation.
public boolean isTempBlobName(String blobName) {
return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
@ -222,10 +197,4 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreForm
protected String tempBlobName(String name) {
return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
@ -39,6 +39,7 @@ import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Arrays;
import java.util.Base64;
import java.util.Locale;
import java.util.Objects;
import java.util.function.LongSupplier;
@ -121,6 +122,50 @@ public interface DocValueFormat extends NamedWriteable {
DocValueFormat BINARY = new DocValueFormat() {
public String getWriteableName() {
return "binary";
public void writeTo(StreamOutput out) throws IOException {
public Object format(long value) {
throw new UnsupportedOperationException();
public Object format(double value) {
throw new UnsupportedOperationException();
public String format(BytesRef value) {
return Base64.getEncoder()
.encodeToString(Arrays.copyOfRange(value.bytes, value.offset, value.offset + value.length));
public long parseLong(String value, boolean roundUp, LongSupplier now) {
throw new UnsupportedOperationException();
public double parseDouble(String value, boolean roundUp, LongSupplier now) {
throw new UnsupportedOperationException();
public BytesRef parseBytesRef(String value) {
return new BytesRef(Base64.getDecoder().decode(value));
final class DateTime implements DocValueFormat {
public static final String NAME = "date_time";
@ -411,29 +411,10 @@ public enum MultiValueMode implements Writeable {
* Allowed Modes: SUM, AVG, MEDIAN, MIN, MAX
public NumericDocValues select(final SortedNumericDocValues values, final long missingValue) {
public NumericDocValues select(final SortedNumericDocValues values) {
final NumericDocValues singleton = DocValues.unwrapSingleton(values);
if (singleton != null) {
return new AbstractNumericDocValues() {
private long value;
public boolean advanceExact(int target) throws IOException {
this.value = singleton.advanceExact(target) ? singleton.longValue() : missingValue;
return true;
public int docID() {
return singleton.docID();
public long longValue() throws IOException {
return this.value;
return singleton;
} else {
return new AbstractNumericDocValues() {
@ -441,8 +422,11 @@ public enum MultiValueMode implements Writeable {
public boolean advanceExact(int target) throws IOException {
this.value = values.advanceExact(target) ? pick(values) : missingValue;
return true;
if (values.advanceExact(target)) {
value = pick(values);
return true;
return false;
@ -476,7 +460,7 @@ public enum MultiValueMode implements Writeable {
public NumericDocValues select(final SortedNumericDocValues values, final long missingValue, final BitSet parentDocs, final DocIdSetIterator childDocs, int maxDoc) throws IOException {
if (parentDocs == null || childDocs == null) {
return select(DocValues.emptySortedNumeric(maxDoc), missingValue);
return FieldData.replaceMissing(DocValues.emptyNumeric(), missingValue);
return new AbstractNumericDocValues() {
@ -529,23 +513,10 @@ public enum MultiValueMode implements Writeable {
* Allowed Modes: SUM, AVG, MEDIAN, MIN, MAX
public NumericDoubleValues select(final SortedNumericDoubleValues values, final double missingValue) {
public NumericDoubleValues select(final SortedNumericDoubleValues values) {
final NumericDoubleValues singleton = FieldData.unwrapSingleton(values);
if (singleton != null) {
return new NumericDoubleValues() {
private double value;
public boolean advanceExact(int doc) throws IOException {
this.value = singleton.advanceExact(doc) ? singleton.doubleValue() : missingValue;
return true;
public double doubleValue() throws IOException {
return this.value;
return singleton;
} else {
return new NumericDoubleValues() {
@ -553,8 +524,11 @@ public enum MultiValueMode implements Writeable {
public boolean advanceExact(int target) throws IOException {
value = values.advanceExact(target) ? pick(values) : missingValue;
return true;
if (values.advanceExact(target)) {
value = pick(values);
return true;
return false;
@ -583,7 +557,7 @@ public enum MultiValueMode implements Writeable {
public NumericDoubleValues select(final SortedNumericDoubleValues values, final double missingValue, final BitSet parentDocs, final DocIdSetIterator childDocs, int maxDoc) throws IOException {
if (parentDocs == null || childDocs == null) {
return select(FieldData.emptySortedNumericDoubles(), missingValue);
return FieldData.replaceMissing(FieldData.emptyNumericDouble(), missingValue);
return new NumericDoubleValues() {
@ -645,6 +645,7 @@ public class SearchModule {
registerValueFormat(DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH);
registerValueFormat(DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP);
registerValueFormat(DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW);
registerValueFormat(DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY);
@ -72,7 +72,7 @@ public class MaxAggregator extends NumericMetricsAggregator.SingleValue {
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues, Double.NEGATIVE_INFINITY);
final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {
@ -71,7 +71,7 @@ public class MinAggregator extends NumericMetricsAggregator.SingleValue {
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues, Double.POSITIVE_INFINITY);
final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {
@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
@ -637,7 +638,7 @@ public class GeoDistanceSortBuilder extends SortBuilder<GeoDistanceSortBuilder>
final NumericDoubleValues selectedValues;
if (nested == null) {
selectedValues = finalSortMode.select(distanceValues, Double.POSITIVE_INFINITY);
selectedValues = FieldData.replaceMissing(finalSortMode.select(distanceValues), Double.POSITIVE_INFINITY);
} else {
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
@ -102,6 +102,6 @@ public class ClusterUpdateSettingsResponseTests extends AbstractStreamableXConte
public void testOldSerialisation() throws IOException {
ClusterUpdateSettingsResponse original = createTestInstance();
assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_7_0_0_alpha1));
assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));
@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.indices.rollover;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import org.elasticsearch.Version;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@ -132,6 +134,6 @@ public class RolloverResponseTests extends AbstractStreamableXContentTestCase<Ro
public void testOldSerialisation() throws IOException {
RolloverResponse original = createTestInstance();
assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_7_0_0_alpha1));
assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));
@ -0,0 +1,40 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.blobstore.fs;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
public class FsBlobContainerTests extends ESTestCase {
public void testTempBlobName() {
final String blobName = randomAlphaOfLengthBetween(1, 20);
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
assertThat(tempBlobName, startsWith("pending-"));
assertThat(tempBlobName, containsString(blobName));
public void testIsTempBlobName() {
final String tempBlobName = FsBlobContainer.tempBlobName(randomAlphaOfLengthBetween(1, 20));
assertThat(FsBlobContainer.isTempBlobName(tempBlobName), is(true));
@ -16,23 +16,27 @@
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.blobstore;
package org.elasticsearch.common.blobstore.fs;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.ESBlobStoreContainerTestCase;
import java.io.IOException;
import java.nio.file.Path;
public class FsBlobStoreContainerTests extends ESBlobStoreContainerTestCase {
protected BlobStore newBlobStore() throws IOException {
Path tempDir = createTempDir();
Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
return new FsBlobStore(settings, tempDir);
final Settings settings;
if (randomBoolean()) {
settings = Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
} else {
settings = Settings.EMPTY;
return new FsBlobStore(settings, createTempDir());
@ -16,10 +16,12 @@
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.common.blobstore;
package org.elasticsearch.common.blobstore.fs;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.common.blobstore.fs.FsBlobStore;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
@ -32,10 +34,15 @@ import java.nio.file.Path;
public class FsBlobStoreTests extends ESBlobStoreTestCase {
protected BlobStore newBlobStore() throws IOException {
Path tempDir = createTempDir();
Settings settings = randomBoolean() ? Settings.EMPTY : Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
return new FsBlobStore(settings, tempDir);
final Settings settings;
if (randomBoolean()) {
settings = Settings.builder().put("buffer_size", new ByteSizeValue(randomIntBetween(1, 100), ByteSizeUnit.KB)).build();
} else {
settings = Settings.EMPTY;
return new FsBlobStore(settings, createTempDir());
public void testReadOnly() throws Exception {
@ -137,4 +137,91 @@ public class FieldDataTests extends ESTestCase {
assertEquals(valueBits, asMultiLongs.nextValue());
assertSame(multiValues, FieldData.sortableLongBitsToDoubles(asMultiLongs));
private static NumericDocValues asNumericDocValues(Long... values) {
return new AbstractNumericDocValues() {
int docID = -1;
public int docID() {
return docID;
public boolean advanceExact(int target) throws IOException {
docID = target;
return target < values.length && values[target] != null;
public long longValue() throws IOException {
return values[docID];
public void testReplaceMissingLongs() throws IOException {
final NumericDocValues values = asNumericDocValues(null, 3L, 2L, null, 5L, null);
final NumericDocValues replaced = FieldData.replaceMissing(values, 4);
assertEquals(4L, replaced.longValue());
assertEquals(3L, replaced.longValue());
assertEquals(2L, replaced.longValue());
assertEquals(4L, replaced.longValue());
assertEquals(5L, replaced.longValue());
assertEquals(4L, replaced.longValue());
private static NumericDoubleValues asNumericDoubleValues(Double... values) {
return new NumericDoubleValues() {
int docID = -1;
public boolean advanceExact(int target) throws IOException {
docID = target;
return target < values.length && values[target] != null;
public double doubleValue() throws IOException {
return values[docID];
public void testReplaceMissingDoubles() throws IOException {
final NumericDoubleValues values = asNumericDoubleValues(null, 1.3, 1.2, null, 1.5, null);
final NumericDoubleValues replaced = FieldData.replaceMissing(values, 1.4);
assertEquals(1.4, replaced.doubleValue(), 0d);
assertEquals(1.3, replaced.doubleValue(), 0d);
assertEquals(1.2, replaced.doubleValue(), 0d);
assertEquals(1.4, replaced.doubleValue(), 0d);
assertEquals(1.5, replaced.doubleValue(), 0d);
assertEquals(1.4, replaced.doubleValue(), 0d);
@ -18,6 +18,7 @@
package org.elasticsearch.index.fielddata.ordinals;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.SortedDocValues;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.util.packed.PackedInts;
@ -261,7 +262,7 @@ public class MultiOrdinalsTests extends ESTestCase {
assertThat(docs.getValueCount(), equalTo(maxOrd));
assertThat(FieldData.isMultiValued(docs), equalTo(true));
for (int doc = 0; doc < ordinalPlan.length; ++doc) {
long[] ords = ordinalPlan[doc];
assertEquals(ords.length > 0, docs.advanceExact(doc));
@ -119,6 +119,7 @@ public class MatchPhrasePrefixQueryBuilderTests extends AbstractQueryTestCase<Ma
public void testPhraseOnFieldWithNoTerms() {
assumeTrue("test runs only when at least a type is registered", getCurrentTypes().length > 0);
MatchPhrasePrefixQueryBuilder matchQuery = new MatchPhrasePrefixQueryBuilder(DATE_FIELD_NAME, "three term phrase");
expectThrows(IllegalArgumentException.class, () -> matchQuery.doToQuery(createShardContext()));
@ -0,0 +1,102 @@
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.elasticsearch.index.store;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
public class ByteSizeCachingDirectoryTests extends ESTestCase {
private static class LengthCountingDirectory extends FilterDirectory {
int numFileLengthCalls;
LengthCountingDirectory(Directory in) {
public long fileLength(String name) throws IOException {
return super.fileLength(name);
public void testBasics() throws IOException {
try (Directory dir = newDirectory()) {
try (IndexOutput out = dir.createOutput("quux", IOContext.DEFAULT)) {
out.writeBytes(new byte[11], 11);
LengthCountingDirectory countingDir = new LengthCountingDirectory(dir);
ByteSizeCachingDirectory cachingDir = new ByteSizeCachingDirectory(countingDir, new TimeValue(0));
assertEquals(11, cachingDir.estimateSizeInBytes());
assertEquals(11, cachingDir.estimateSizeInBytes());
assertEquals(1, countingDir.numFileLengthCalls);
try (IndexOutput out = cachingDir.createOutput("foo", IOContext.DEFAULT)) {
out.writeBytes(new byte[5], 5);
// +2 because there are 3 files
assertEquals(3, countingDir.numFileLengthCalls);
// An index output is open so no caching
assertEquals(5, countingDir.numFileLengthCalls);
assertEquals(16, cachingDir.estimateSizeInBytes());
assertEquals(7, countingDir.numFileLengthCalls);
assertEquals(16, cachingDir.estimateSizeInBytes());
assertEquals(7, countingDir.numFileLengthCalls);
try (IndexOutput out = cachingDir.createTempOutput("bar", "baz", IOContext.DEFAULT)) {
out.writeBytes(new byte[4], 4);
assertEquals(10, countingDir.numFileLengthCalls);
// An index output is open so no caching
assertEquals(13, countingDir.numFileLengthCalls);
assertEquals(20, cachingDir.estimateSizeInBytes());
// +3 because there are 3 files
assertEquals(16, countingDir.numFileLengthCalls);
assertEquals(20, cachingDir.estimateSizeInBytes());
assertEquals(16, countingDir.numFileLengthCalls);
assertEquals(15, cachingDir.estimateSizeInBytes());
// +2 because there are 2 files now
assertEquals(18, countingDir.numFileLengthCalls);
assertEquals(15, cachingDir.estimateSizeInBytes());
assertEquals(18, countingDir.numFileLengthCalls);
@ -46,16 +46,13 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@ -103,7 +100,7 @@ public class FlushIT extends ESIntegTestCase {
public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
public void testSyncedFlush() throws Exception {
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();
@ -246,16 +243,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(indexResult.getFailure(), nullValue());
private String syncedFlushDescription(ShardsSyncedFlushResult result) {
String detail = result.shardResponses().entrySet().stream()
.map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
result.totalShards(), result.failed(), result.failureReason(), detail);
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@ -281,7 +268,6 @@ public class FlushIT extends ESIntegTestCase {
indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
@ -297,8 +283,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@ -315,11 +299,9 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
@ -328,7 +310,6 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
@ -344,7 +325,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(shard.commitStats().syncId(), nullValue());
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
@ -29,6 +29,9 @@ import org.elasticsearch.test.InternalTestCluster;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.ESTestCase.assertBusy;
/** Utils for SyncedFlush */
public class SyncedFlushUtil {
@ -40,21 +43,31 @@ public class SyncedFlushUtil {
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception {
* When the last indexing operation is completed, we will fire a global checkpoint sync.
* Since a global checkpoint sync request is a replication request, it will acquire an index
* shard permit on the primary when executing. If this happens at the same time while we are
* issuing the synced-flush, the synced-flush request will fail as it thinks there are
* in-flight operations. We can avoid such situation by continuing issuing another synced-flush
* if the synced-flush failed due to the ongoing operations on the primary.
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
service.attemptSyncedFlush(shardId, listener);
try {
AtomicReference<LatchedListener<ShardsSyncedFlushResult>> listenerHolder = new AtomicReference<>();
assertBusy(() -> {
LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
service.attemptSyncedFlush(shardId, listener);
} catch (InterruptedException e) {
if (listener.result != null && listener.result.failureReason() != null
&& listener.result.failureReason().contains("ongoing operations on primary")) {
throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry
if (listenerHolder.get().error != null) {
throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
if (listener.error != null) {
throw ExceptionsHelper.convertToElastic(listener.error);
return listener.result;
return listenerHolder.get().result;
public static final class LatchedListener<T> implements ActionListener<T> {
@ -44,6 +44,7 @@ public class DocValueFormatTests extends ESTestCase {
entries.add(new Entry(DocValueFormat.class, DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY));
NamedWriteableRegistry registry = new NamedWriteableRegistry(entries);
BytesStreamOutput out = new BytesStreamOutput();
@ -82,6 +83,11 @@ public class DocValueFormatTests extends ESTestCase {
in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
assertSame(DocValueFormat.RAW, in.readNamedWriteable(DocValueFormat.class));
out = new BytesStreamOutput();
in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
assertSame(DocValueFormat.BINARY, in.readNamedWriteable(DocValueFormat.class));
public void testRawFormat() {
@ -96,6 +102,14 @@ public class DocValueFormatTests extends ESTestCase {
assertEquals("abc", DocValueFormat.RAW.format(new BytesRef("abc")));
public void testBinaryFormat() {
assertEquals("", DocValueFormat.BINARY.format(new BytesRef()));
assertEquals("KmQ", DocValueFormat.BINARY.format(new BytesRef(new byte[] {42, 100})));
assertEquals(new BytesRef(), DocValueFormat.BINARY.parseBytesRef(""));
assertEquals(new BytesRef(new byte[] {42, 100}), DocValueFormat.BINARY.parseBytesRef("KmQ"));
public void testBooleanFormat() {
assertEquals(false, DocValueFormat.BOOLEAN.format(0));
assertEquals(true, DocValueFormat.BOOLEAN.format(1));
@ -151,54 +151,55 @@ public class MultiValueModeTests extends ESTestCase {
private void verifySortedNumeric(Supplier<SortedNumericDocValues> supplier, int maxDoc) throws IOException {
for (long missingValue : new long[] { 0, randomLong() }) {
for (MultiValueMode mode : MultiValueMode.values()) {
SortedNumericDocValues values = supplier.get();
final NumericDocValues selected = mode.select(values, missingValue);
for (int i = 0; i < maxDoc; ++i) {
final long actual = selected.longValue();
for (MultiValueMode mode : MultiValueMode.values()) {
SortedNumericDocValues values = supplier.get();
final NumericDocValues selected = mode.select(values);
for (int i = 0; i < maxDoc; ++i) {
Long actual = null;
if (selected.advanceExact(i)) {
actual = selected.longValue();
verifyLongValueCanCalledMoreThanOnce(selected, actual);
long expected = 0;
if (values.advanceExact(i) == false) {
expected = missingValue;
Long expected = null;
if (values.advanceExact(i)) {
int numValues = values.docValueCount();
if (mode == MultiValueMode.MAX) {
expected = Long.MIN_VALUE;
} else if (mode == MultiValueMode.MIN) {
expected = Long.MAX_VALUE;
} else {
int numValues = values.docValueCount();
if (mode == MultiValueMode.MAX) {
expected = Long.MIN_VALUE;
expected = 0L;
for (int j = 0; j < numValues; ++j) {
if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
expected = Long.MAX_VALUE;
for (int j = 0; j < numValues; ++j) {
if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
expected = Math.min(expected, values.nextValue());
} else if (mode == MultiValueMode.MAX) {
expected = Math.max(expected, values.nextValue());
if (mode == MultiValueMode.AVG) {
expected = numValues > 1 ? Math.round((double)expected/(double)numValues) : expected;
} else if (mode == MultiValueMode.MEDIAN) {
int value = numValues/2;
if (numValues % 2 == 0) {
for (int j = 0; j < value - 1; ++j) {
expected = Math.round(((double) values.nextValue() + values.nextValue())/2.0);
} else {
for (int j = 0; j < value; ++j) {
expected = values.nextValue();
expected = Math.min(expected, values.nextValue());
} else if (mode == MultiValueMode.MAX) {
expected = Math.max(expected, values.nextValue());
if (mode == MultiValueMode.AVG) {
expected = numValues > 1 ? Math.round((double)expected/(double)numValues) : expected;
} else if (mode == MultiValueMode.MEDIAN) {
int value = numValues/2;
if (numValues % 2 == 0) {
for (int j = 0; j < value - 1; ++j) {
expected = Math.round(((double) values.nextValue() + values.nextValue())/2.0);
} else {
for (int j = 0; j < value; ++j) {
expected = values.nextValue();
assertEquals(mode.toString() + " docId=" + i, expected, actual);
assertEquals(mode.toString() + " docId=" + i, expected, actual);
@ -326,54 +327,54 @@ public class MultiValueModeTests extends ESTestCase {
private void verifySortedNumericDouble(Supplier<SortedNumericDoubleValues> supplier, int maxDoc) throws IOException {
for (long missingValue : new long[] { 0, randomLong() }) {
for (MultiValueMode mode : MultiValueMode.values()) {
SortedNumericDoubleValues values = supplier.get();
final NumericDoubleValues selected = mode.select(values, missingValue);
for (int i = 0; i < maxDoc; ++i) {
final double actual = selected.doubleValue();
for (MultiValueMode mode : MultiValueMode.values()) {
SortedNumericDoubleValues values = supplier.get();
final NumericDoubleValues selected = mode.select(values);
for (int i = 0; i < maxDoc; ++i) {
Double actual = null;
if (selected.advanceExact(i)) {
actual = selected.doubleValue();
verifyDoubleValueCanCalledMoreThanOnce(selected, actual);
double expected = 0.0;
if (values.advanceExact(i) == false) {
expected = missingValue;
Double expected = null;
if (values.advanceExact(i)) {
int numValues = values.docValueCount();
if (mode == MultiValueMode.MAX) {
expected = Double.NEGATIVE_INFINITY;
} else if (mode == MultiValueMode.MIN) {
expected = Double.POSITIVE_INFINITY;
} else {
int numValues = values.docValueCount();
if (mode == MultiValueMode.MAX) {
expected = Long.MIN_VALUE;
expected = 0d;
for (int j = 0; j < numValues; ++j) {
if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
expected = Long.MAX_VALUE;
for (int j = 0; j < numValues; ++j) {
if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
expected = Math.min(expected, values.nextValue());
} else if (mode == MultiValueMode.MAX) {
expected = Math.max(expected, values.nextValue());
if (mode == MultiValueMode.AVG) {
expected = expected/numValues;
} else if (mode == MultiValueMode.MEDIAN) {
int value = numValues/2;
if (numValues % 2 == 0) {
for (int j = 0; j < value - 1; ++j) {
expected = (values.nextValue() + values.nextValue())/2.0;
} else {
for (int j = 0; j < value; ++j) {
expected = values.nextValue();
expected = Math.min(expected, values.nextValue());
} else if (mode == MultiValueMode.MAX) {
expected = Math.max(expected, values.nextValue());
if (mode == MultiValueMode.AVG) {
expected = expected/numValues;
} else if (mode == MultiValueMode.MEDIAN) {
int value = numValues/2;
if (numValues % 2 == 0) {
for (int j = 0; j < value - 1; ++j) {
expected = (values.nextValue() + values.nextValue())/2.0;
} else {
for (int j = 0; j < value; ++j) {
expected = values.nextValue();
assertEquals(mode.toString() + " docId=" + i, expected, actual, 0.1);
assertEquals(mode.toString() + " docId=" + i, expected, actual);
@ -19,6 +19,7 @@
package org.elasticsearch.search.fields;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@ -700,7 +701,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(fields.get("test_field").getValue(), equalTo("foobar"));
public void testFieldsPulledFromFieldData() throws Exception {
public void testDocValueFields() throws Exception {
String mapping = Strings
@ -744,6 +745,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.field("type", "binary")
.field("doc_values", true) // off by default on binary fields
.field("type", "ip")
@ -766,6 +768,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.field("double_field", 6.0d)
.field("date_field", Joda.forPattern("dateOptionalTime").printer().print(date))
.field("boolean_field", true)
.field("binary_field", new byte[] {42, 100})
.field("ip_field", "::1")
@ -782,6 +785,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
SearchResponse searchResponse = builder.execute().actionGet();
@ -790,7 +794,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
Set<String> fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
"binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@ -802,6 +806,8 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
equalTo(new BytesRef(new byte[] {42, 100})));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
@ -815,6 +821,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.addDocValueField("double_field", "use_field_mapping")
.addDocValueField("date_field", "use_field_mapping")
.addDocValueField("boolean_field", "use_field_mapping")
.addDocValueField("binary_field", "use_field_mapping")
.addDocValueField("ip_field", "use_field_mapping");
searchResponse = builder.execute().actionGet();
@ -823,7 +830,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
"binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@ -836,6 +843,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("binary_field").getValue(), equalTo("KmQ"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
@ -94,7 +94,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
fail("Timeout waiting for node [" + node + "] to be blocked");
public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
@ -224,52 +224,16 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
IOException writeBlobException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new IOException("Exception thrown in writeBlob() for " + blobName);
public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
checksumFormat.writeAtomic(blobObj, wrapper, name);
assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
assertEquals(0, writeBlobException.getSuppressed().length);
IOException moveException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
public void move(String sourceBlobName, String targetBlobName) throws IOException {
throw new IOException("Exception thrown in move() for " + sourceBlobName);
checksumFormat.writeAtomic(blobObj, wrapper, name);
assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
assertEquals(0, moveException.getSuppressed().length);
IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
public void move(String sourceBlobName, String targetBlobName) throws IOException {
throw new IOException("Exception thrown in move() for " + sourceBlobName);
public void deleteBlob(String blobName) throws IOException {
throw new IOException("Exception thrown in deleteBlob() for " + blobName);
checksumFormat.writeAtomic(blobObj, wrapper, name);
assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
assertEquals(1, moveThenDeleteException.getSuppressed().length);
final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
assertTrue(suppressedThrowable instanceof IOException);
assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
protected BlobStore createTestBlobStore() throws IOException {
@ -53,11 +53,21 @@ public class BlobContainerWrapper implements BlobContainer {
delegate.writeBlob(blobName, inputStream, blobSize);
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
delegate.writeBlobAtomic(blobName, inputStream, blobSize);
public void deleteBlob(String blobName) throws IOException {
public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
public Map<String, BlobMetaData> listBlobs() throws IOException {
return delegate.listBlobs();
@ -19,6 +19,28 @@
package org.elasticsearch.snapshots.mockstore;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@ -29,31 +51,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import com.carrotsearch.randomizedtesting.RandomizedContext;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.io.PathUtils;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@ -325,6 +327,12 @@ public class MockRepository extends FsRepository {
public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
public Map<String, BlobMetaData> listBlobs() throws IOException {
@ -365,6 +373,31 @@ public class MockRepository extends FsRepository {
public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
final Random random = RandomizedContext.current().getRandom();
if (random.nextBoolean()) {
if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
// Simulate a failure between the write and move operation in FsBlobContainer
final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
super.writeBlob(tempBlobName, inputStream, blobSize);
final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
fsBlobContainer.move(tempBlobName, blobName);
} else {
// Atomic write since it is potentially supported
// by the delegating blob container
super.writeBlobAtomic(blobName, inputStream, blobSize);
} else {
// Simulate a non-atomic write since many blob container
// implementations does not support atomic write
super.writeBlob(blobName, inputStream, blobSize);
@ -158,7 +158,11 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
try (InputStream stream = bytesArray.streamInput()) {
container.writeBlob(blobName, stream, bytesArray.length());
if (randomBoolean()) {
container.writeBlob(blobName, stream, bytesArray.length());
} else {
container.writeBlobAtomic(blobName, stream, bytesArray.length());
@ -411,7 +411,8 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
// auto-generate license if no licenses ever existed or if the current license is basic and
// needs extended or if the license signature needs to be updated. this will trigger a subsequent cluster changed event
if (currentClusterState.getNodes().isLocalNodeElectedMaster() &&
(noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) || LicenseUtils.signatureNeedsUpdate(currentLicense))) {
(noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) ||
LicenseUtils.signatureNeedsUpdate(currentLicense, currentClusterState.nodes()))) {
} else if (logger.isDebugEnabled()) {
@ -6,8 +6,12 @@
package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.rest.RestStatus;
import java.util.stream.StreamSupport;
public class LicenseUtils {
public static final String EXPIRED_FEATURE_METADATA = "es.license.expired.feature";
@ -42,8 +46,25 @@ public class LicenseUtils {
* Checks if the signature of a self generated license with older version needs to be
* recreated with the new key
public static boolean signatureNeedsUpdate(License license) {
public static boolean signatureNeedsUpdate(License license, DiscoveryNodes currentNodes) {
assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
return ("basic".equals(license.type()) || "trial".equals(license.type())) &&
(license.version() < License.VERSION_CRYPTO_ALGORITHMS);
// only upgrade signature when all nodes are ready to deserialize the new signature
(license.version() < License.VERSION_CRYPTO_ALGORITHMS &&
compatibleLicenseVersion(currentNodes) == License.VERSION_CRYPTO_ALGORITHMS
public static int compatibleLicenseVersion(DiscoveryNodes currentNodes) {
assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
if (StreamSupport.stream(currentNodes.spliterator(), false)
.allMatch(node -> node.getVersion().onOrAfter(Version.V_6_4_0))) {
// License.VERSION_CRYPTO_ALGORITHMS was introduced in 6.4.0
} else {
return License.VERSION_START_DATE;
@ -5,6 +5,7 @@
package org.elasticsearch.license;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@ -26,8 +27,8 @@ import static org.elasticsearch.license.CryptUtils.decrypt;
class SelfGeneratedLicense {
public static License create(License.Builder specBuilder) {
return create(specBuilder, License.VERSION_CURRENT);
public static License create(License.Builder specBuilder, DiscoveryNodes currentNodes) {
return create(specBuilder, LicenseUtils.compatibleLicenseVersion(currentNodes));
public static License create(License.Builder specBuilder, int version) {
@ -73,7 +73,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
if (request.isAcknowledged() == false && currentLicense != null) {
Map<String, String[]> ackMessages = LicenseService.getAckMessages(selfGeneratedLicense, currentLicense);
if (ackMessages.isEmpty() == false) {
@ -82,7 +82,7 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData newLicensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicensesMetaData);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
@ -61,7 +61,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
"]. Must be trial or basic.");
return updateWithLicense(currentState, type);
} else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense())) {
} else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense(), currentState.nodes())) {
return updateLicenseSignature(currentState, currentLicensesMetaData);
} else if (LicenseUtils.licenseNeedsExtended(currentLicensesMetaData.getLicense())) {
return extendBasic(currentState, currentLicensesMetaData);
@ -87,7 +87,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
Version trialVersion = currentLicenseMetaData.getMostRecentTrialVersion();
LicensesMetaData newLicenseMetadata = new LicensesMetaData(selfGeneratedLicense, trialVersion);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicenseMetadata);
@ -120,7 +120,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentLicense.version());
Version trialVersion = currentLicenseMetadata.getMostRecentTrialVersion();
return new LicensesMetaData(selfGeneratedLicense, trialVersion);
@ -141,7 +141,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData licensesMetaData;
if ("trial".equals(type)) {
licensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
@ -104,7 +104,7 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
.issueDate(dateMath("now-10h", now))
.expiryDate(dateMath("now-2h", now));
License license = SelfGeneratedLicense.create(builder);
License license = SelfGeneratedLicense.create(builder, License.VERSION_CURRENT);
XPackLicenseState licenseState = new XPackLicenseState(Settings.EMPTY);
setInitialState(license, licenseState, Settings.EMPTY);
@ -125,4 +125,4 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
assertEquals(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS, licenseMetaData.getLicense().expiryDate());
assertEquals(uid, licenseMetaData.getLicense().uid());
@ -111,7 +111,7 @@ public class LicenseSerializationTests extends ESTestCase {
License license = SelfGeneratedLicense.create(specBuilder);
License license = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
license.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(License.REST_VIEW_MODE, "true")));
@ -95,7 +95,7 @@ public class LicensesMetaDataSerializationTests extends ESTestCase {
.type(randomBoolean() ? "trial" : "basic")
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
final License trialLicense = SelfGeneratedLicense.create(specBuilder);
final License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
LicensesMetaData licensesMetaData = new LicensesMetaData(trialLicense, Version.CURRENT);
XContentBuilder builder = XContentFactory.jsonBuilder();
@ -34,7 +34,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.type(randomBoolean() ? "trial" : "basic")
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
License trialLicense = SelfGeneratedLicense.create(specBuilder);
License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
assertThat(SelfGeneratedLicense.verify(trialLicense), equalTo(true));
@ -47,7 +47,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
License trialLicense = SelfGeneratedLicense.create(specBuilder);
License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
final String originalSignature = trialLicense.signature();
License tamperedLicense = License.builder().fromLicenseSpec(trialLicense, originalSignature)
.expiryDate(System.currentTimeMillis() + TimeValue.timeValueHours(5).getMillis())
@ -70,7 +70,8 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
License pre20TrialLicense = specBuilder.build();
License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"));
License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"),
assertThat(SelfGeneratedLicense.verify(license), equalTo(true));
@ -39,6 +39,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@ -66,6 +67,7 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@ -271,91 +273,38 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
rewriteQuery(((BoostingQueryBuilder)builder).positiveQuery(), jobCaps));
} else if (builder.getWriteableName().equals(DisMaxQueryBuilder.NAME)) {
DisMaxQueryBuilder rewritten = new DisMaxQueryBuilder();
((DisMaxQueryBuilder)builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
((DisMaxQueryBuilder) builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
return rewritten;
} else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME) || builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
} else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME)) {
RangeQueryBuilder range = (RangeQueryBuilder) builder;
String fieldName = range.fieldName();
// Many range queries don't include the timezone because the default is UTC, but the query
// builder will return null so we need to set it here
String timeZone = range.timeZone() == null ? DateTimeZone.UTC.toString() : range.timeZone();
String fieldName = builder.getWriteableName().equals(RangeQueryBuilder.NAME)
? ((RangeQueryBuilder)builder).fieldName()
: ((TermQueryBuilder)builder).fieldName();
List<String> incorrectTimeZones = new ArrayList<>();
List<String> rewrittenFieldName = jobCaps.stream()
// We only care about job caps that have the query's target field
.filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
.map(caps -> {
RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
return fieldCaps.getAggs().stream()
// For now, we only allow filtering on grouping fields
.filter(agg -> {
String type = (String)agg.get(RollupField.AGG);
// If the cap is for a date_histo, and the query is a range, the timezones need to match
if (type.equals(DateHistogramAggregationBuilder.NAME) && builder instanceof RangeQueryBuilder) {
String timeZone = ((RangeQueryBuilder)builder).timeZone();
// Many range queries don't include the timezone because the default is UTC, but the query
// builder will return null so we need to set it here
if (timeZone == null) {
timeZone = DateTimeZone.UTC.toString();
boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
if (matchingTZ == false) {
return matchingTZ;
// Otherwise just make sure it's one of the three groups
return type.equals(TermsAggregationBuilder.NAME)
|| type.equals(DateHistogramAggregationBuilder.NAME)
|| type.equals(HistogramAggregationBuilder.NAME);
// Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
.map(agg -> {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
} else {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
.collect(ArrayList::new, List::addAll, List::addAll);
if (rewrittenFieldName.isEmpty()) {
if (incorrectTimeZones.isEmpty()) {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
+ "] query is not available in selected rollup indices, cannot query.");
} else {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
+ "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ incorrectTimeZones);
String rewrittenFieldName = rewriteFieldName(jobCaps, RangeQueryBuilder.NAME, fieldName, timeZone);
RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName)
if (range.timeZone() != null) {
if (rewrittenFieldName.size() > 1) {
throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldName, ",") + "].");
if (range.format() != null) {
//Note: instanceof here to make casting checks happier
if (builder instanceof RangeQueryBuilder) {
RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName.get(0));
RangeQueryBuilder original = (RangeQueryBuilder)builder;
if (original.timeZone() != null) {
return rewritten;
} else {
return new TermQueryBuilder(rewrittenFieldName.get(0), ((TermQueryBuilder)builder).value());
return rewritten;
} else if (builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
TermQueryBuilder term = (TermQueryBuilder) builder;
String fieldName = term.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermQueryBuilder(rewrittenFieldName, term.value());
} else if (builder.getWriteableName().equals(TermsQueryBuilder.NAME)) {
TermsQueryBuilder terms = (TermsQueryBuilder) builder;
String fieldName = terms.fieldName();
String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
return new TermsQueryBuilder(rewrittenFieldName, terms.values());
} else if (builder.getWriteableName().equals(MatchAllQueryBuilder.NAME)) {
// no-op
return builder;
@ -364,6 +313,64 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
private static String rewriteFieldName(Set<RollupJobCaps> jobCaps,
String builderName,
String fieldName,
String timeZone) {
List<String> incompatibleTimeZones = timeZone == null ? Collections.emptyList() : new ArrayList<>();
List<String> rewrittenFieldNames = jobCaps.stream()
// We only care about job caps that have the query's target field
.filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
.map(caps -> {
RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
return fieldCaps.getAggs().stream()
// For now, we only allow filtering on grouping fields
.filter(agg -> {
String type = (String)agg.get(RollupField.AGG);
// If the cap is for a date_histo, and the query is a range, the timezones need to match
if (type.equals(DateHistogramAggregationBuilder.NAME) && timeZone != null) {
boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
if (matchingTZ == false) {
return matchingTZ;
// Otherwise just make sure it's one of the three groups
return type.equals(TermsAggregationBuilder.NAME)
|| type.equals(DateHistogramAggregationBuilder.NAME)
|| type.equals(HistogramAggregationBuilder.NAME);
// Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
.map(agg -> {
if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
} else {
return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
.collect(ArrayList::new, List::addAll, List::addAll);
if (rewrittenFieldNames.isEmpty()) {
if (incompatibleTimeZones.isEmpty()) {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query is not available in selected rollup indices, cannot query.");
} else {
throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ incompatibleTimeZones);
} else if (rewrittenFieldNames.size() > 1) {
throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldNames, ",") + "].");
} else {
return rewrittenFieldNames.get(0);
static RollupSearchContext separateIndices(String[] indices, ImmutableOpenMap<String, IndexMetaData> indexMetaData) {
if (indices.length == 0) {
@ -25,9 +25,11 @@ import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.script.ScriptService;
@ -61,6 +63,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -153,7 +156,7 @@ public class SearchActionTests extends ESTestCase {
"compatible. Options include: [UTC]"));
public void testTerms() {
public void testTermQuery() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
@ -166,6 +169,23 @@ public class SearchActionTests extends ESTestCase {
assertThat(((TermQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
public void testTermsQuery() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
RollupJobCaps cap = new RollupJobCaps(job.build());
Set<RollupJobCaps> caps = new HashSet<>();
QueryBuilder original = new TermsQueryBuilder("foo", Arrays.asList("bar", "baz"));
QueryBuilder rewritten =
TransportRollupSearchAction.rewriteQuery(original, caps);
assertThat(rewritten, instanceOf(TermsQueryBuilder.class));
assertNotSame(rewritten, original);
assertThat(((TermsQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
assertThat(((TermsQueryBuilder)rewritten).values(), equalTo(Arrays.asList("bar", "baz")));
public void testCompounds() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
@ -7,7 +7,6 @@ package org.elasticsearch.xpack.security;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@ -17,7 +16,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
@ -112,7 +110,6 @@ import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
import org.elasticsearch.xpack.core.security.authc.DefaultAuthenticationFailureHandler;
import org.elasticsearch.xpack.core.security.authc.Realm;
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
@ -934,7 +931,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
if (enabled) {
return new ValidateTLSOnJoin(XPackSettings.TRANSPORT_SSL_ENABLED.get(settings),
.andThen(new ValidateUpgradedSecurityIndex());
.andThen(new ValidateUpgradedSecurityIndex())
.andThen(new ValidateLicenseCanBeDeserialized());
return null;
@ -971,6 +969,17 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
static final class ValidateLicenseCanBeDeserialized implements BiConsumer<DiscoveryNode, ClusterState> {
public void accept(DiscoveryNode node, ClusterState state) {
License license = LicenseService.getLicense(state.metaData());
if (license != null && license.version() >= License.VERSION_CRYPTO_ALGORITHMS && node.getVersion().before(Version.V_6_4_0)) {
throw new IllegalStateException("node " + node + " is on version [" + node.getVersion() +
"] that cannot deserialize the license format [" + license.version() + "], upgrade node to at least 6.4.0");
public void reloadSPI(ClassLoader loader) {
@ -28,6 +28,7 @@ import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackSettings;
@ -278,6 +279,19 @@ public class SecurityTests extends ESTestCase {
public void testJoinValidatorForLicenseDeserialization() throws Exception {
DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
VersionUtils.randomVersionBetween(random(), null, Version.V_6_3_0));
MetaData.Builder builder = MetaData.builder();
License license = TestUtils.generateSignedLicense(null,
randomIntBetween(License.VERSION_CRYPTO_ALGORITHMS, License.VERSION_CURRENT), -1, TimeValue.timeValueHours(24));
TestUtils.putLicense(builder, license);
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(builder.build()).build();
IllegalStateException e = expectThrows(IllegalStateException.class,
() -> new Security.ValidateLicenseCanBeDeserialized().accept(node, state));
assertThat(e.getMessage(), containsString("cannot deserialize the license format"));
public void testIndexJoinValidator_Old_And_Rolling() throws Exception {
BiConsumer<DiscoveryNode, ClusterState> joinValidator = security.getJoinValidator();
@ -345,7 +359,7 @@ public class SecurityTests extends ESTestCase {
joinValidator.accept(node, clusterState);
public void testGetFieldFilterSecurityEnabled() throws Exception {
Function<String, Predicate<String>> fieldFilter = security.getFieldFilter();
@ -239,8 +239,6 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
throw e;
List<ForecastRequestStats> forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(1));
ForecastRequestStats forecastRequestStats = forecastStats.get(0);
@ -248,6 +246,21 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
assertThat(forecastRequestStats.getRecordCount(), equalTo(8000L));
assertThat(forecasts.size(), equalTo(8000));
// run forecast a 2nd time
try {
String forecastId = forecast(job.getId(), TimeValue.timeValueHours(1), null);
waitForecastToFinish(job.getId(), forecastId);
} catch (ElasticsearchStatusException e) {
if (e.getMessage().contains("disk space")) {
throw new ElasticsearchStatusException(
"Test likely fails due to insufficient disk space on test machine, please free up space.", e.status(), e);
throw e;
private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {
Reference in New Issue
Block a user