extends BlobStoreForm
}
/**
- * Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
+ * Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
*
* The blob will be compressed and checksum will be written if required.
*
@@ -131,20 +130,12 @@ public class ChecksumBlobStoreFormat extends BlobStoreForm
* @param name blob name
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
- String blobName = blobName(name);
- String tempBlobName = tempBlobName(name);
- writeBlob(obj, blobContainer, tempBlobName);
- try {
- blobContainer.move(tempBlobName, blobName);
- } catch (IOException ex) {
- // Move failed - try cleaning up
- try {
- blobContainer.deleteBlob(tempBlobName);
- } catch (Exception e) {
- ex.addSuppressed(e);
+ final String blobName = blobName(name);
+ writeTo(obj, blobName, bytesArray -> {
+ try (InputStream stream = bytesArray.streamInput()) {
+ blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
}
- throw ex;
- }
+ });
}
/**
@@ -157,51 +148,35 @@ public class ChecksumBlobStoreFormat extends BlobStoreForm
* @param name blob name
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
- String blobName = blobName(name);
- writeBlob(obj, blobContainer, blobName);
+ final String blobName = blobName(name);
+ writeTo(obj, blobName, bytesArray -> {
+ try (InputStream stream = bytesArray.streamInput()) {
+ blobContainer.writeBlob(blobName, stream, bytesArray.length());
+ }
+ });
}
- /**
- * Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
- *
- * The blob will be compressed and checksum will be written if required.
- *
- * @param obj object to be serialized
- * @param blobContainer blob container
- * @param blobName blob name
- */
- protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
- BytesReference bytes = write(obj);
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+ private void writeTo(final T obj, final String blobName, final CheckedConsumer consumer) throws IOException {
+ final BytesReference bytes = write(obj);
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
- try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
+ try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
- } }) {
+ }
+ }) {
bytes.writeTo(indexOutputOutputStream);
}
CodecUtil.writeFooter(indexOutput);
}
- BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
- try (InputStream stream = bytesArray.streamInput()) {
- blobContainer.writeBlob(blobName, stream, bytesArray.length());
- }
+ consumer.accept(new BytesArray(outputStream.toByteArray()));
}
}
- /**
- * Returns true if the blob is a leftover temporary blob.
- *
- * The temporary blobs might be left after failed atomic write operation.
- */
- public boolean isTempBlobName(String blobName) {
- return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
- }
-
protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
@@ -222,10 +197,4 @@ public class ChecksumBlobStoreFormat extends BlobStoreForm
builder.endObject();
}
}
-
-
- protected String tempBlobName(String name) {
- return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
- }
-
}
diff --git a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
index 8677370fc99..242e0887473 100644
--- a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
+++ b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
@@ -39,6 +39,7 @@ import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Locale;
import java.util.Objects;
import java.util.function.LongSupplier;
@@ -121,6 +122,50 @@ public interface DocValueFormat extends NamedWriteable {
}
};
+ DocValueFormat BINARY = new DocValueFormat() {
+
+ @Override
+ public String getWriteableName() {
+ return "binary";
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ }
+
+ @Override
+ public Object format(long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object format(double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String format(BytesRef value) {
+ return Base64.getEncoder()
+ .withoutPadding()
+ .encodeToString(Arrays.copyOfRange(value.bytes, value.offset, value.offset + value.length));
+ }
+
+ @Override
+ public long parseLong(String value, boolean roundUp, LongSupplier now) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double parseDouble(String value, boolean roundUp, LongSupplier now) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BytesRef parseBytesRef(String value) {
+ return new BytesRef(Base64.getDecoder().decode(value));
+ }
+ };
+
final class DateTime implements DocValueFormat {
public static final String NAME = "date_time";
diff --git a/server/src/main/java/org/elasticsearch/search/MultiValueMode.java b/server/src/main/java/org/elasticsearch/search/MultiValueMode.java
index b2ee4b8ffbd..eaaa5f74fa4 100644
--- a/server/src/main/java/org/elasticsearch/search/MultiValueMode.java
+++ b/server/src/main/java/org/elasticsearch/search/MultiValueMode.java
@@ -411,29 +411,10 @@ public enum MultiValueMode implements Writeable {
*
* Allowed Modes: SUM, AVG, MEDIAN, MIN, MAX
*/
- public NumericDocValues select(final SortedNumericDocValues values, final long missingValue) {
+ public NumericDocValues select(final SortedNumericDocValues values) {
final NumericDocValues singleton = DocValues.unwrapSingleton(values);
if (singleton != null) {
- return new AbstractNumericDocValues() {
-
- private long value;
-
- @Override
- public boolean advanceExact(int target) throws IOException {
- this.value = singleton.advanceExact(target) ? singleton.longValue() : missingValue;
- return true;
- }
-
- @Override
- public int docID() {
- return singleton.docID();
- }
-
- @Override
- public long longValue() throws IOException {
- return this.value;
- }
- };
+ return singleton;
} else {
return new AbstractNumericDocValues() {
@@ -441,8 +422,11 @@ public enum MultiValueMode implements Writeable {
@Override
public boolean advanceExact(int target) throws IOException {
- this.value = values.advanceExact(target) ? pick(values) : missingValue;
- return true;
+ if (values.advanceExact(target)) {
+ value = pick(values);
+ return true;
+ }
+ return false;
}
@Override
@@ -476,7 +460,7 @@ public enum MultiValueMode implements Writeable {
*/
public NumericDocValues select(final SortedNumericDocValues values, final long missingValue, final BitSet parentDocs, final DocIdSetIterator childDocs, int maxDoc) throws IOException {
if (parentDocs == null || childDocs == null) {
- return select(DocValues.emptySortedNumeric(maxDoc), missingValue);
+ return FieldData.replaceMissing(DocValues.emptyNumeric(), missingValue);
}
return new AbstractNumericDocValues() {
@@ -529,23 +513,10 @@ public enum MultiValueMode implements Writeable {
*
* Allowed Modes: SUM, AVG, MEDIAN, MIN, MAX
*/
- public NumericDoubleValues select(final SortedNumericDoubleValues values, final double missingValue) {
+ public NumericDoubleValues select(final SortedNumericDoubleValues values) {
final NumericDoubleValues singleton = FieldData.unwrapSingleton(values);
if (singleton != null) {
- return new NumericDoubleValues() {
- private double value;
-
- @Override
- public boolean advanceExact(int doc) throws IOException {
- this.value = singleton.advanceExact(doc) ? singleton.doubleValue() : missingValue;
- return true;
- }
-
- @Override
- public double doubleValue() throws IOException {
- return this.value;
- }
- };
+ return singleton;
} else {
return new NumericDoubleValues() {
@@ -553,8 +524,11 @@ public enum MultiValueMode implements Writeable {
@Override
public boolean advanceExact(int target) throws IOException {
- value = values.advanceExact(target) ? pick(values) : missingValue;
- return true;
+ if (values.advanceExact(target)) {
+ value = pick(values);
+ return true;
+ }
+ return false;
}
@Override
@@ -583,7 +557,7 @@ public enum MultiValueMode implements Writeable {
*/
public NumericDoubleValues select(final SortedNumericDoubleValues values, final double missingValue, final BitSet parentDocs, final DocIdSetIterator childDocs, int maxDoc) throws IOException {
if (parentDocs == null || childDocs == null) {
- return select(FieldData.emptySortedNumericDoubles(), missingValue);
+ return FieldData.replaceMissing(FieldData.emptyNumericDouble(), missingValue);
}
return new NumericDoubleValues() {
diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java
index 66ea407f42a..869dfe995ed 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -645,6 +645,7 @@ public class SearchModule {
registerValueFormat(DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH);
registerValueFormat(DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP);
registerValueFormat(DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW);
+ registerValueFormat(DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY);
}
/**
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/max/MaxAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/max/MaxAggregator.java
index 8ef4d0b7e29..ff76e6637ba 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/max/MaxAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/max/MaxAggregator.java
@@ -72,7 +72,7 @@ public class MaxAggregator extends NumericMetricsAggregator.SingleValue {
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
- final NumericDoubleValues values = MultiValueMode.MAX.select(allValues, Double.NEGATIVE_INFINITY);
+ final NumericDoubleValues values = MultiValueMode.MAX.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {
@Override
diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/min/MinAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/min/MinAggregator.java
index f355f55139c..e4b371514bd 100644
--- a/server/src/main/java/org/elasticsearch/search/aggregations/metrics/min/MinAggregator.java
+++ b/server/src/main/java/org/elasticsearch/search/aggregations/metrics/min/MinAggregator.java
@@ -71,7 +71,7 @@ public class MinAggregator extends NumericMetricsAggregator.SingleValue {
}
final BigArrays bigArrays = context.bigArrays();
final SortedNumericDoubleValues allValues = valuesSource.doubleValues(ctx);
- final NumericDoubleValues values = MultiValueMode.MIN.select(allValues, Double.POSITIVE_INFINITY);
+ final NumericDoubleValues values = MultiValueMode.MIN.select(allValues);
return new LeafBucketCollectorBase(sub, allValues) {
@Override
diff --git a/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java b/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
index 1d488a58857..cfa5a240dea 100644
--- a/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
+++ b/server/src/main/java/org/elasticsearch/search/sort/GeoDistanceSortBuilder.java
@@ -41,6 +41,7 @@ import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentParser.Token;
+import org.elasticsearch.index.fielddata.FieldData;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldData.XFieldComparatorSource.Nested;
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
@@ -637,7 +638,7 @@ public class GeoDistanceSortBuilder extends SortBuilder
localPoints);
final NumericDoubleValues selectedValues;
if (nested == null) {
- selectedValues = finalSortMode.select(distanceValues, Double.POSITIVE_INFINITY);
+ selectedValues = FieldData.replaceMissing(finalSortMode.select(distanceValues), Double.POSITIVE_INFINITY);
} else {
final BitSet rootDocs = nested.rootDocs(context);
final DocIdSetIterator innerDocs = nested.innerDocs(context);
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
index c15c0a1be7f..e8bd14b640d 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
@@ -102,6 +102,6 @@ public class ClusterUpdateSettingsResponseTests extends AbstractStreamableXConte
public void testOldSerialisation() throws IOException {
ClusterUpdateSettingsResponse original = createTestInstance();
- assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_7_0_0_alpha1));
+ assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
index c3ff4511815..903accac6ab 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
@@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.indices.rollover;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
import org.elasticsearch.Version;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -132,6 +134,6 @@ public class RolloverResponseTests extends AbstractStreamableXContentTestCase 0, docs.advanceExact(doc));
diff --git a/server/src/test/java/org/elasticsearch/index/query/MatchPhrasePrefixQueryBuilderTests.java b/server/src/test/java/org/elasticsearch/index/query/MatchPhrasePrefixQueryBuilderTests.java
index 8b706e552bc..e5da5d7f971 100644
--- a/server/src/test/java/org/elasticsearch/index/query/MatchPhrasePrefixQueryBuilderTests.java
+++ b/server/src/test/java/org/elasticsearch/index/query/MatchPhrasePrefixQueryBuilderTests.java
@@ -119,6 +119,7 @@ public class MatchPhrasePrefixQueryBuilderTests extends AbstractQueryTestCase 0);
MatchPhrasePrefixQueryBuilder matchQuery = new MatchPhrasePrefixQueryBuilder(DATE_FIELD_NAME, "three term phrase");
matchQuery.analyzer("whitespace");
expectThrows(IllegalArgumentException.class, () -> matchQuery.doToQuery(createShardContext()));
diff --git a/server/src/test/java/org/elasticsearch/index/store/ByteSizeCachingDirectoryTests.java b/server/src/test/java/org/elasticsearch/index/store/ByteSizeCachingDirectoryTests.java
new file mode 100644
index 00000000000..25d783d2531
--- /dev/null
+++ b/server/src/test/java/org/elasticsearch/index/store/ByteSizeCachingDirectoryTests.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.store;
+
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.store.FilterDirectory;
+import org.apache.lucene.store.IOContext;
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+
+public class ByteSizeCachingDirectoryTests extends ESTestCase {
+
+ private static class LengthCountingDirectory extends FilterDirectory {
+
+ int numFileLengthCalls;
+
+ LengthCountingDirectory(Directory in) {
+ super(in);
+ }
+
+ @Override
+ public long fileLength(String name) throws IOException {
+ numFileLengthCalls++;
+ return super.fileLength(name);
+ }
+ }
+
+ public void testBasics() throws IOException {
+ try (Directory dir = newDirectory()) {
+ try (IndexOutput out = dir.createOutput("quux", IOContext.DEFAULT)) {
+ out.writeBytes(new byte[11], 11);
+ }
+ LengthCountingDirectory countingDir = new LengthCountingDirectory(dir);
+
+ ByteSizeCachingDirectory cachingDir = new ByteSizeCachingDirectory(countingDir, new TimeValue(0));
+ assertEquals(11, cachingDir.estimateSizeInBytes());
+ assertEquals(11, cachingDir.estimateSizeInBytes());
+ assertEquals(1, countingDir.numFileLengthCalls);
+
+ try (IndexOutput out = cachingDir.createOutput("foo", IOContext.DEFAULT)) {
+ out.writeBytes(new byte[5], 5);
+
+ cachingDir.estimateSizeInBytes();
+ // +2 because there are 3 files
+ assertEquals(3, countingDir.numFileLengthCalls);
+ // An index output is open so no caching
+ cachingDir.estimateSizeInBytes();
+ assertEquals(5, countingDir.numFileLengthCalls);
+ }
+
+ assertEquals(16, cachingDir.estimateSizeInBytes());
+ assertEquals(7, countingDir.numFileLengthCalls);
+ assertEquals(16, cachingDir.estimateSizeInBytes());
+ assertEquals(7, countingDir.numFileLengthCalls);
+
+ try (IndexOutput out = cachingDir.createTempOutput("bar", "baz", IOContext.DEFAULT)) {
+ out.writeBytes(new byte[4], 4);
+
+ cachingDir.estimateSizeInBytes();
+ assertEquals(10, countingDir.numFileLengthCalls);
+ // An index output is open so no caching
+ cachingDir.estimateSizeInBytes();
+ assertEquals(13, countingDir.numFileLengthCalls);
+ }
+
+ assertEquals(20, cachingDir.estimateSizeInBytes());
+ // +3 because there are 3 files
+ assertEquals(16, countingDir.numFileLengthCalls);
+ assertEquals(20, cachingDir.estimateSizeInBytes());
+ assertEquals(16, countingDir.numFileLengthCalls);
+
+ cachingDir.deleteFile("foo");
+
+ assertEquals(15, cachingDir.estimateSizeInBytes());
+ // +2 because there are 2 files now
+ assertEquals(18, countingDir.numFileLengthCalls);
+ assertEquals(15, cachingDir.estimateSizeInBytes());
+ assertEquals(18, countingDir.numFileLengthCalls);
+ }
+ }
+
+}
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
index 94bd8e80898..a543e87adcb 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ -46,16 +46,13 @@ import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
-import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -103,7 +100,7 @@ public class FlushIT extends ESIntegTestCase {
}
}
- public void testSyncedFlush() throws ExecutionException, InterruptedException, IOException {
+ public void testSyncedFlush() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(2);
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)).get();
ensureGreen();
@@ -246,16 +243,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(indexResult.getFailure(), nullValue());
}
- private String syncedFlushDescription(ShardsSyncedFlushResult result) {
- String detail = result.shardResponses().entrySet().stream()
- .map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
- .collect(Collectors.joining(","));
- return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
- result.totalShards(), result.failed(), result.failureReason(), detail);
- }
-
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
- @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -281,7 +268,6 @@ public class FlushIT extends ESIntegTestCase {
indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
}
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
@@ -297,8 +283,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
- @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -315,11 +299,9 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
@@ -328,7 +310,6 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
@@ -344,7 +325,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
index 987f69b6587..8a8d57295a5 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
@@ -29,6 +29,9 @@ import org.elasticsearch.test.InternalTestCluster;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.test.ESTestCase.assertBusy;
/** Utils for SyncedFlush */
public class SyncedFlushUtil {
@@ -40,21 +43,31 @@ public class SyncedFlushUtil {
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
- public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
+ public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception {
+ /*
+ * When the last indexing operation is completed, we will fire a global checkpoint sync.
+ * Since a global checkpoint sync request is a replication request, it will acquire an index
+ * shard permit on the primary when executing. If this happens at the same time while we are
+ * issuing the synced-flush, the synced-flush request will fail as it thinks there are
+ * in-flight operations. We can avoid such situation by continuing issuing another synced-flush
+ * if the synced-flush failed due to the ongoing operations on the primary.
+ */
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
- logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
- service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
- LatchedListener listener = new LatchedListener<>();
- service.attemptSyncedFlush(shardId, listener);
- try {
+ AtomicReference> listenerHolder = new AtomicReference<>();
+ assertBusy(() -> {
+ LatchedListener listener = new LatchedListener<>();
+ listenerHolder.set(listener);
+ service.attemptSyncedFlush(shardId, listener);
listener.latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (listener.result != null && listener.result.failureReason() != null
+ && listener.result.failureReason().contains("ongoing operations on primary")) {
+ throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry
+ }
+ });
+ if (listenerHolder.get().error != null) {
+ throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
}
- if (listener.error != null) {
- throw ExceptionsHelper.convertToElastic(listener.error);
- }
- return listener.result;
+ return listenerHolder.get().result;
}
public static final class LatchedListener implements ActionListener {
diff --git a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
index e5cfbf98b3d..01906279474 100644
--- a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
@@ -44,6 +44,7 @@ public class DocValueFormatTests extends ESTestCase {
entries.add(new Entry(DocValueFormat.class, DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW));
+ entries.add(new Entry(DocValueFormat.class, DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY));
NamedWriteableRegistry registry = new NamedWriteableRegistry(entries);
BytesStreamOutput out = new BytesStreamOutput();
@@ -82,6 +83,11 @@ public class DocValueFormatTests extends ESTestCase {
out.writeNamedWriteable(DocValueFormat.RAW);
in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
assertSame(DocValueFormat.RAW, in.readNamedWriteable(DocValueFormat.class));
+
+ out = new BytesStreamOutput();
+ out.writeNamedWriteable(DocValueFormat.BINARY);
+ in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
+ assertSame(DocValueFormat.BINARY, in.readNamedWriteable(DocValueFormat.class));
}
public void testRawFormat() {
@@ -96,6 +102,14 @@ public class DocValueFormatTests extends ESTestCase {
assertEquals("abc", DocValueFormat.RAW.format(new BytesRef("abc")));
}
+ public void testBinaryFormat() {
+ assertEquals("", DocValueFormat.BINARY.format(new BytesRef()));
+ assertEquals("KmQ", DocValueFormat.BINARY.format(new BytesRef(new byte[] {42, 100})));
+
+ assertEquals(new BytesRef(), DocValueFormat.BINARY.parseBytesRef(""));
+ assertEquals(new BytesRef(new byte[] {42, 100}), DocValueFormat.BINARY.parseBytesRef("KmQ"));
+ }
+
public void testBooleanFormat() {
assertEquals(false, DocValueFormat.BOOLEAN.format(0));
assertEquals(true, DocValueFormat.BOOLEAN.format(1));
diff --git a/server/src/test/java/org/elasticsearch/search/MultiValueModeTests.java b/server/src/test/java/org/elasticsearch/search/MultiValueModeTests.java
index d9eb4501326..b64f6ee0ee3 100644
--- a/server/src/test/java/org/elasticsearch/search/MultiValueModeTests.java
+++ b/server/src/test/java/org/elasticsearch/search/MultiValueModeTests.java
@@ -151,54 +151,55 @@ public class MultiValueModeTests extends ESTestCase {
}
private void verifySortedNumeric(Supplier supplier, int maxDoc) throws IOException {
- for (long missingValue : new long[] { 0, randomLong() }) {
- for (MultiValueMode mode : MultiValueMode.values()) {
- SortedNumericDocValues values = supplier.get();
- final NumericDocValues selected = mode.select(values, missingValue);
- for (int i = 0; i < maxDoc; ++i) {
- assertTrue(selected.advanceExact(i));
- final long actual = selected.longValue();
+ for (MultiValueMode mode : MultiValueMode.values()) {
+ SortedNumericDocValues values = supplier.get();
+ final NumericDocValues selected = mode.select(values);
+ for (int i = 0; i < maxDoc; ++i) {
+ Long actual = null;
+ if (selected.advanceExact(i)) {
+ actual = selected.longValue();
verifyLongValueCanCalledMoreThanOnce(selected, actual);
+ }
- long expected = 0;
- if (values.advanceExact(i) == false) {
- expected = missingValue;
+
+ Long expected = null;
+ if (values.advanceExact(i)) {
+ int numValues = values.docValueCount();
+ if (mode == MultiValueMode.MAX) {
+ expected = Long.MIN_VALUE;
+ } else if (mode == MultiValueMode.MIN) {
+ expected = Long.MAX_VALUE;
} else {
- int numValues = values.docValueCount();
- if (mode == MultiValueMode.MAX) {
- expected = Long.MIN_VALUE;
+ expected = 0L;
+ }
+ for (int j = 0; j < numValues; ++j) {
+ if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
+ expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
- expected = Long.MAX_VALUE;
- }
- for (int j = 0; j < numValues; ++j) {
- if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
- expected += values.nextValue();
- } else if (mode == MultiValueMode.MIN) {
- expected = Math.min(expected, values.nextValue());
- } else if (mode == MultiValueMode.MAX) {
- expected = Math.max(expected, values.nextValue());
- }
- }
- if (mode == MultiValueMode.AVG) {
- expected = numValues > 1 ? Math.round((double)expected/(double)numValues) : expected;
- } else if (mode == MultiValueMode.MEDIAN) {
- int value = numValues/2;
- if (numValues % 2 == 0) {
- for (int j = 0; j < value - 1; ++j) {
- values.nextValue();
- }
- expected = Math.round(((double) values.nextValue() + values.nextValue())/2.0);
- } else {
- for (int j = 0; j < value; ++j) {
- values.nextValue();
- }
- expected = values.nextValue();
- }
+ expected = Math.min(expected, values.nextValue());
+ } else if (mode == MultiValueMode.MAX) {
+ expected = Math.max(expected, values.nextValue());
+ }
+ }
+ if (mode == MultiValueMode.AVG) {
+ expected = numValues > 1 ? Math.round((double)expected/(double)numValues) : expected;
+ } else if (mode == MultiValueMode.MEDIAN) {
+ int value = numValues/2;
+ if (numValues % 2 == 0) {
+ for (int j = 0; j < value - 1; ++j) {
+ values.nextValue();
+ }
+ expected = Math.round(((double) values.nextValue() + values.nextValue())/2.0);
+ } else {
+ for (int j = 0; j < value; ++j) {
+ values.nextValue();
+ }
+ expected = values.nextValue();
}
}
-
- assertEquals(mode.toString() + " docId=" + i, expected, actual);
}
+
+ assertEquals(mode.toString() + " docId=" + i, expected, actual);
}
}
}
@@ -326,54 +327,54 @@ public class MultiValueModeTests extends ESTestCase {
}
private void verifySortedNumericDouble(Supplier supplier, int maxDoc) throws IOException {
- for (long missingValue : new long[] { 0, randomLong() }) {
- for (MultiValueMode mode : MultiValueMode.values()) {
- SortedNumericDoubleValues values = supplier.get();
- final NumericDoubleValues selected = mode.select(values, missingValue);
- for (int i = 0; i < maxDoc; ++i) {
- assertTrue(selected.advanceExact(i));
- final double actual = selected.doubleValue();
+ for (MultiValueMode mode : MultiValueMode.values()) {
+ SortedNumericDoubleValues values = supplier.get();
+ final NumericDoubleValues selected = mode.select(values);
+ for (int i = 0; i < maxDoc; ++i) {
+ Double actual = null;
+ if (selected.advanceExact(i)) {
+ actual = selected.doubleValue();
verifyDoubleValueCanCalledMoreThanOnce(selected, actual);
+ }
- double expected = 0.0;
- if (values.advanceExact(i) == false) {
- expected = missingValue;
+ Double expected = null;
+ if (values.advanceExact(i)) {
+ int numValues = values.docValueCount();
+ if (mode == MultiValueMode.MAX) {
+ expected = Double.NEGATIVE_INFINITY;
+ } else if (mode == MultiValueMode.MIN) {
+ expected = Double.POSITIVE_INFINITY;
} else {
- int numValues = values.docValueCount();
- if (mode == MultiValueMode.MAX) {
- expected = Long.MIN_VALUE;
+ expected = 0d;
+ }
+ for (int j = 0; j < numValues; ++j) {
+ if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
+ expected += values.nextValue();
} else if (mode == MultiValueMode.MIN) {
- expected = Long.MAX_VALUE;
- }
- for (int j = 0; j < numValues; ++j) {
- if (mode == MultiValueMode.SUM || mode == MultiValueMode.AVG) {
- expected += values.nextValue();
- } else if (mode == MultiValueMode.MIN) {
- expected = Math.min(expected, values.nextValue());
- } else if (mode == MultiValueMode.MAX) {
- expected = Math.max(expected, values.nextValue());
- }
- }
- if (mode == MultiValueMode.AVG) {
- expected = expected/numValues;
- } else if (mode == MultiValueMode.MEDIAN) {
- int value = numValues/2;
- if (numValues % 2 == 0) {
- for (int j = 0; j < value - 1; ++j) {
- values.nextValue();
- }
- expected = (values.nextValue() + values.nextValue())/2.0;
- } else {
- for (int j = 0; j < value; ++j) {
- values.nextValue();
- }
- expected = values.nextValue();
- }
+ expected = Math.min(expected, values.nextValue());
+ } else if (mode == MultiValueMode.MAX) {
+ expected = Math.max(expected, values.nextValue());
+ }
+ }
+ if (mode == MultiValueMode.AVG) {
+ expected = expected/numValues;
+ } else if (mode == MultiValueMode.MEDIAN) {
+ int value = numValues/2;
+ if (numValues % 2 == 0) {
+ for (int j = 0; j < value - 1; ++j) {
+ values.nextValue();
+ }
+ expected = (values.nextValue() + values.nextValue())/2.0;
+ } else {
+ for (int j = 0; j < value; ++j) {
+ values.nextValue();
+ }
+ expected = values.nextValue();
}
}
-
- assertEquals(mode.toString() + " docId=" + i, expected, actual, 0.1);
}
+
+ assertEquals(mode.toString() + " docId=" + i, expected, actual);
}
}
}
diff --git a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
index a8a2669ef9b..ab5387b6e3f 100644
--- a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
+++ b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
@@ -19,6 +19,7 @@
package org.elasticsearch.search.fields;
+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -700,7 +701,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(fields.get("test_field").getValue(), equalTo("foobar"));
}
- public void testFieldsPulledFromFieldData() throws Exception {
+ public void testDocValueFields() throws Exception {
createIndex("test");
String mapping = Strings
@@ -744,6 +745,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.endObject()
.startObject("binary_field")
.field("type", "binary")
+ .field("doc_values", true) // off by default on binary fields
.endObject()
.startObject("ip_field")
.field("type", "ip")
@@ -766,6 +768,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.field("double_field", 6.0d)
.field("date_field", Joda.forPattern("dateOptionalTime").printer().print(date))
.field("boolean_field", true)
+ .field("binary_field", new byte[] {42, 100})
.field("ip_field", "::1")
.endObject()).execute().actionGet();
@@ -782,6 +785,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.addDocValueField("double_field")
.addDocValueField("date_field")
.addDocValueField("boolean_field")
+ .addDocValueField("binary_field")
.addDocValueField("ip_field");
SearchResponse searchResponse = builder.execute().actionGet();
@@ -790,7 +794,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
Set fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
- "ip_field")));
+ "binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@@ -802,6 +806,8 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
+ assertThat(searchResponse.getHits().getAt(0).getFields().get("binary_field").getValue(),
+ equalTo(new BytesRef(new byte[] {42, 100})));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
@@ -815,6 +821,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.addDocValueField("double_field", "use_field_mapping")
.addDocValueField("date_field", "use_field_mapping")
.addDocValueField("boolean_field", "use_field_mapping")
+ .addDocValueField("binary_field", "use_field_mapping")
.addDocValueField("ip_field", "use_field_mapping");
searchResponse = builder.execute().actionGet();
@@ -823,7 +830,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
- "ip_field")));
+ "binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@@ -836,6 +843,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
+ assertThat(searchResponse.getHits().getAt(0).getFields().get("binary_field").getValue(), equalTo("KmQ"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 45110ee6a2d..23c56688e00 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -94,7 +94,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
Thread.sleep(100);
}
- fail("Timeout!!!");
+ fail("Timeout waiting for node [" + node + "] to be blocked");
}
public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
index 65926234d45..70be72989cf 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
@@ -224,52 +224,16 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
IOException writeBlobException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
@Override
- public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
- throw new IOException("Exception thrown in writeBlob() for " + blobName);
+ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
+ throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});
- assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
+ assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
assertEquals(0, writeBlobException.getSuppressed().length);
}
- {
- IOException moveException = expectThrows(IOException.class, () -> {
- BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
- @Override
- public void move(String sourceBlobName, String targetBlobName) throws IOException {
- throw new IOException("Exception thrown in move() for " + sourceBlobName);
- }
- };
- checksumFormat.writeAtomic(blobObj, wrapper, name);
- });
- assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
- assertEquals(0, moveException.getSuppressed().length);
- }
- {
- IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
- BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
- @Override
- public void move(String sourceBlobName, String targetBlobName) throws IOException {
- throw new IOException("Exception thrown in move() for " + sourceBlobName);
- }
-
- @Override
- public void deleteBlob(String blobName) throws IOException {
- throw new IOException("Exception thrown in deleteBlob() for " + blobName);
- }
- };
- checksumFormat.writeAtomic(blobObj, wrapper, name);
- });
-
- assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
- assertEquals(1, moveThenDeleteException.getSuppressed().length);
-
- final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
- assertTrue(suppressedThrowable instanceof IOException);
- assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
- }
}
protected BlobStore createTestBlobStore() throws IOException {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
index 56a4a279cab..089955d140f 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
@@ -53,11 +53,21 @@ public class BlobContainerWrapper implements BlobContainer {
delegate.writeBlob(blobName, inputStream, blobSize);
}
+ @Override
+ public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+ delegate.writeBlobAtomic(blobName, inputStream, blobSize);
+ }
+
@Override
public void deleteBlob(String blobName) throws IOException {
delegate.deleteBlob(blobName);
}
+ @Override
+ public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
+ delegate.deleteBlobIgnoringIfNotExists(blobName);
+ }
+
@Override
public Map listBlobs() throws IOException {
return delegate.listBlobs();
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 3a5b068cd89..5fa884adbfe 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -19,6 +19,28 @@
package org.elasticsearch.snapshots.mockstore;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
+import org.apache.lucene.index.CorruptIndexException;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@@ -29,31 +51,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import com.carrotsearch.randomizedtesting.RandomizedContext;
-import org.apache.lucene.index.CorruptIndexException;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.metadata.RepositoryMetaData;
-import org.elasticsearch.common.blobstore.BlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.plugins.RepositoryPlugin;
-import org.elasticsearch.repositories.Repository;
-import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.fs.FsRepository;
-import org.elasticsearch.snapshots.SnapshotId;
-
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@@ -325,6 +327,12 @@ public class MockRepository extends FsRepository {
super.deleteBlob(blobName);
}
+ @Override
+ public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
+ maybeIOExceptionOrBlock(blobName);
+ super.deleteBlobIgnoringIfNotExists(blobName);
+ }
+
@Override
public Map listBlobs() throws IOException {
maybeIOExceptionOrBlock("");
@@ -365,6 +373,31 @@ public class MockRepository extends FsRepository {
maybeIOExceptionOrBlock(blobName);
}
}
+
+ @Override
+ public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+ final Random random = RandomizedContext.current().getRandom();
+ if (random.nextBoolean()) {
+ if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
+ // Simulate a failure between the write and move operation in FsBlobContainer
+ final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
+ super.writeBlob(tempBlobName, inputStream, blobSize);
+ maybeIOExceptionOrBlock(blobName);
+ final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
+ fsBlobContainer.move(tempBlobName, blobName);
+ } else {
+ // Atomic write since it is potentially supported
+ // by the delegating blob container
+ maybeIOExceptionOrBlock(blobName);
+ super.writeBlobAtomic(blobName, inputStream, blobSize);
+ }
+ } else {
+ // Simulate a non-atomic write since many blob container
+ // implementations does not support atomic write
+ maybeIOExceptionOrBlock(blobName);
+ super.writeBlob(blobName, inputStream, blobSize);
+ }
+ }
}
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
index 743be6d1bcb..df2024de445 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
@@ -158,7 +158,11 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
try (InputStream stream = bytesArray.streamInput()) {
- container.writeBlob(blobName, stream, bytesArray.length());
+ if (randomBoolean()) {
+ container.writeBlob(blobName, stream, bytesArray.length());
+ } else {
+ container.writeBlobAtomic(blobName, stream, bytesArray.length());
+ }
}
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
index 99e6a10ad92..40c694cedb7 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
@@ -411,7 +411,8 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
// auto-generate license if no licenses ever existed or if the current license is basic and
// needs extended or if the license signature needs to be updated. this will trigger a subsequent cluster changed event
if (currentClusterState.getNodes().isLocalNodeElectedMaster() &&
- (noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) || LicenseUtils.signatureNeedsUpdate(currentLicense))) {
+ (noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) ||
+ LicenseUtils.signatureNeedsUpdate(currentLicense, currentClusterState.nodes()))) {
registerOrUpdateSelfGeneratedLicense();
}
} else if (logger.isDebugEnabled()) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
index 8fcdc05bcf9..4c8a558682b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
@@ -6,8 +6,12 @@
package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.rest.RestStatus;
+import java.util.stream.StreamSupport;
+
public class LicenseUtils {
public static final String EXPIRED_FEATURE_METADATA = "es.license.expired.feature";
@@ -42,8 +46,25 @@ public class LicenseUtils {
* Checks if the signature of a self generated license with older version needs to be
* recreated with the new key
*/
- public static boolean signatureNeedsUpdate(License license) {
+ public static boolean signatureNeedsUpdate(License license, DiscoveryNodes currentNodes) {
+ assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
+
return ("basic".equals(license.type()) || "trial".equals(license.type())) &&
- (license.version() < License.VERSION_CRYPTO_ALGORITHMS);
+ // only upgrade signature when all nodes are ready to deserialize the new signature
+ (license.version() < License.VERSION_CRYPTO_ALGORITHMS &&
+ compatibleLicenseVersion(currentNodes) == License.VERSION_CRYPTO_ALGORITHMS
+ );
+ }
+
+ public static int compatibleLicenseVersion(DiscoveryNodes currentNodes) {
+ assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
+
+ if (StreamSupport.stream(currentNodes.spliterator(), false)
+ .allMatch(node -> node.getVersion().onOrAfter(Version.V_6_4_0))) {
+ // License.VERSION_CRYPTO_ALGORITHMS was introduced in 6.4.0
+ return License.VERSION_CRYPTO_ALGORITHMS;
+ } else {
+ return License.VERSION_START_DATE;
+ }
}
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
index 0bc49d517cd..fb9b167d3db 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.license;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -26,8 +27,8 @@ import static org.elasticsearch.license.CryptUtils.decrypt;
class SelfGeneratedLicense {
- public static License create(License.Builder specBuilder) {
- return create(specBuilder, License.VERSION_CURRENT);
+ public static License create(License.Builder specBuilder, DiscoveryNodes currentNodes) {
+ return create(specBuilder, LicenseUtils.compatibleLicenseVersion(currentNodes));
}
public static License create(License.Builder specBuilder, int version) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
index 0cf949a6990..468f1799a07 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
@@ -73,7 +73,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
if (request.isAcknowledged() == false && currentLicense != null) {
Map ackMessages = LicenseService.getAckMessages(selfGeneratedLicense, currentLicense);
if (ackMessages.isEmpty() == false) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
index 5c5c03151ba..2bf0555fde1 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
@@ -82,7 +82,7 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(request.getType())
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData newLicensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicensesMetaData);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
index 13d6326f3ce..c2d53bd0716 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
@@ -61,7 +61,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
"]. Must be trial or basic.");
}
return updateWithLicense(currentState, type);
- } else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense())) {
+ } else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense(), currentState.nodes())) {
return updateLicenseSignature(currentState, currentLicensesMetaData);
} else if (LicenseUtils.licenseNeedsExtended(currentLicensesMetaData.getLicense())) {
return extendBasic(currentState, currentLicensesMetaData);
@@ -87,7 +87,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(type)
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
Version trialVersion = currentLicenseMetaData.getMostRecentTrialVersion();
LicensesMetaData newLicenseMetadata = new LicensesMetaData(selfGeneratedLicense, trialVersion);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicenseMetadata);
@@ -120,7 +120,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(currentLicense.issueDate())
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentLicense.version());
Version trialVersion = currentLicenseMetadata.getMostRecentTrialVersion();
return new LicensesMetaData(selfGeneratedLicense, trialVersion);
}
@@ -141,7 +141,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(type)
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData licensesMetaData;
if ("trial".equals(type)) {
licensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
index 2a237f090e2..5405af013af 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
@@ -104,7 +104,7 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
.issueDate(dateMath("now-10h", now))
.type("basic")
.expiryDate(dateMath("now-2h", now));
- License license = SelfGeneratedLicense.create(builder);
+ License license = SelfGeneratedLicense.create(builder, License.VERSION_CURRENT);
XPackLicenseState licenseState = new XPackLicenseState(Settings.EMPTY);
setInitialState(license, licenseState, Settings.EMPTY);
@@ -125,4 +125,4 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
assertEquals(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS, licenseMetaData.getLicense().expiryDate());
assertEquals(uid, licenseMetaData.getLicense().uid());
}
-}
\ No newline at end of file
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
index d7cf5ab50fb..d07be0fd3c7 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
@@ -111,7 +111,7 @@ public class LicenseSerializationTests extends ESTestCase {
.issueDate(now)
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License license = SelfGeneratedLicense.create(specBuilder);
+ License license = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
license.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(License.REST_VIEW_MODE, "true")));
builder.flush();
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
index f3ed04ed22d..d7799959f6c 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
@@ -95,7 +95,7 @@ public class LicensesMetaDataSerializationTests extends ESTestCase {
.issueDate(issueDate)
.type(randomBoolean() ? "trial" : "basic")
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- final License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ final License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
LicensesMetaData licensesMetaData = new LicensesMetaData(trialLicense, Version.CURRENT);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
index aa27dbdcb49..4e061623ccd 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
@@ -34,7 +34,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.type(randomBoolean() ? "trial" : "basic")
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
assertThat(SelfGeneratedLicense.verify(trialLicense), equalTo(true));
}
@@ -47,7 +47,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.maxNodes(5)
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
final String originalSignature = trialLicense.signature();
License tamperedLicense = License.builder().fromLicenseSpec(trialLicense, originalSignature)
.expiryDate(System.currentTimeMillis() + TimeValue.timeValueHours(5).getMillis())
@@ -70,7 +70,8 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
License pre20TrialLicense = specBuilder.build();
- License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"));
+ License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"),
+ License.VERSION_CURRENT);
assertThat(SelfGeneratedLicense.verify(license), equalTo(true));
}
diff --git a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java
index 9dcc2e482d0..850efb95da3 100644
--- a/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java
+++ b/x-pack/plugin/rollup/src/main/java/org/elasticsearch/xpack/rollup/action/TransportRollupSearchAction.java
@@ -39,6 +39,7 @@ import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
@@ -66,6 +67,7 @@ import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
@@ -271,91 +273,38 @@ public class TransportRollupSearchAction extends TransportAction rewritten.add(rewriteQuery(query, jobCaps)));
+ ((DisMaxQueryBuilder) builder).innerQueries().forEach(query -> rewritten.add(rewriteQuery(query, jobCaps)));
return rewritten;
- } else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME) || builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
+ } else if (builder.getWriteableName().equals(RangeQueryBuilder.NAME)) {
+ RangeQueryBuilder range = (RangeQueryBuilder) builder;
+ String fieldName = range.fieldName();
+ // Many range queries don't include the timezone because the default is UTC, but the query
+ // builder will return null so we need to set it here
+ String timeZone = range.timeZone() == null ? DateTimeZone.UTC.toString() : range.timeZone();
- String fieldName = builder.getWriteableName().equals(RangeQueryBuilder.NAME)
- ? ((RangeQueryBuilder)builder).fieldName()
- : ((TermQueryBuilder)builder).fieldName();
-
- List incorrectTimeZones = new ArrayList<>();
- List rewrittenFieldName = jobCaps.stream()
- // We only care about job caps that have the query's target field
- .filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
- .map(caps -> {
- RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
- return fieldCaps.getAggs().stream()
- // For now, we only allow filtering on grouping fields
- .filter(agg -> {
- String type = (String)agg.get(RollupField.AGG);
-
- // If the cap is for a date_histo, and the query is a range, the timezones need to match
- if (type.equals(DateHistogramAggregationBuilder.NAME) && builder instanceof RangeQueryBuilder) {
- String timeZone = ((RangeQueryBuilder)builder).timeZone();
-
- // Many range queries don't include the timezone because the default is UTC, but the query
- // builder will return null so we need to set it here
- if (timeZone == null) {
- timeZone = DateTimeZone.UTC.toString();
- }
- boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
- .equalsIgnoreCase(timeZone);
- if (matchingTZ == false) {
- incorrectTimeZones.add((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()));
- }
- return matchingTZ;
- }
- // Otherwise just make sure it's one of the three groups
- return type.equals(TermsAggregationBuilder.NAME)
- || type.equals(DateHistogramAggregationBuilder.NAME)
- || type.equals(HistogramAggregationBuilder.NAME);
- })
- // Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
- .map(agg -> {
- if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
- return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
- } else {
- return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
- }
- })
- .collect(Collectors.toList());
- })
- .distinct()
- .collect(ArrayList::new, List::addAll, List::addAll);
-
- if (rewrittenFieldName.isEmpty()) {
- if (incorrectTimeZones.isEmpty()) {
- throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
- + "] query is not available in selected rollup indices, cannot query.");
- } else {
- throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builder.getWriteableName()
- + "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
- + incorrectTimeZones);
- }
+ String rewrittenFieldName = rewriteFieldName(jobCaps, RangeQueryBuilder.NAME, fieldName, timeZone);
+ RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName)
+ .from(range.from())
+ .to(range.to())
+ .includeLower(range.includeLower())
+ .includeUpper(range.includeUpper());
+ if (range.timeZone() != null) {
+ rewritten.timeZone(range.timeZone());
}
-
- if (rewrittenFieldName.size() > 1) {
- throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
- fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldName, ",") + "].");
+ if (range.format() != null) {
+ rewritten.format(range.format());
}
-
- //Note: instanceof here to make casting checks happier
- if (builder instanceof RangeQueryBuilder) {
- RangeQueryBuilder rewritten = new RangeQueryBuilder(rewrittenFieldName.get(0));
- RangeQueryBuilder original = (RangeQueryBuilder)builder;
- rewritten.from(original.from());
- rewritten.to(original.to());
- if (original.timeZone() != null) {
- rewritten.timeZone(original.timeZone());
- }
- rewritten.includeLower(original.includeLower());
- rewritten.includeUpper(original.includeUpper());
- return rewritten;
- } else {
- return new TermQueryBuilder(rewrittenFieldName.get(0), ((TermQueryBuilder)builder).value());
- }
-
+ return rewritten;
+ } else if (builder.getWriteableName().equals(TermQueryBuilder.NAME)) {
+ TermQueryBuilder term = (TermQueryBuilder) builder;
+ String fieldName = term.fieldName();
+ String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
+ return new TermQueryBuilder(rewrittenFieldName, term.value());
+ } else if (builder.getWriteableName().equals(TermsQueryBuilder.NAME)) {
+ TermsQueryBuilder terms = (TermsQueryBuilder) builder;
+ String fieldName = terms.fieldName();
+ String rewrittenFieldName = rewriteFieldName(jobCaps, TermQueryBuilder.NAME, fieldName, null);
+ return new TermsQueryBuilder(rewrittenFieldName, terms.values());
} else if (builder.getWriteableName().equals(MatchAllQueryBuilder.NAME)) {
// no-op
return builder;
@@ -364,6 +313,64 @@ public class TransportRollupSearchAction extends TransportAction jobCaps,
+ String builderName,
+ String fieldName,
+ String timeZone) {
+ List incompatibleTimeZones = timeZone == null ? Collections.emptyList() : new ArrayList<>();
+ List rewrittenFieldNames = jobCaps.stream()
+ // We only care about job caps that have the query's target field
+ .filter(caps -> caps.getFieldCaps().keySet().contains(fieldName))
+ .map(caps -> {
+ RollupJobCaps.RollupFieldCaps fieldCaps = caps.getFieldCaps().get(fieldName);
+ return fieldCaps.getAggs().stream()
+ // For now, we only allow filtering on grouping fields
+ .filter(agg -> {
+ String type = (String)agg.get(RollupField.AGG);
+
+ // If the cap is for a date_histo, and the query is a range, the timezones need to match
+ if (type.equals(DateHistogramAggregationBuilder.NAME) && timeZone != null) {
+ boolean matchingTZ = ((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()))
+ .equalsIgnoreCase(timeZone);
+ if (matchingTZ == false) {
+ incompatibleTimeZones.add((String)agg.get(DateHistoGroupConfig.TIME_ZONE.getPreferredName()));
+ }
+ return matchingTZ;
+ }
+ // Otherwise just make sure it's one of the three groups
+ return type.equals(TermsAggregationBuilder.NAME)
+ || type.equals(DateHistogramAggregationBuilder.NAME)
+ || type.equals(HistogramAggregationBuilder.NAME);
+ })
+ // Rewrite the field name to our convention (e.g. "foo" -> "date_histogram.foo.timestamp")
+ .map(agg -> {
+ if (agg.get(RollupField.AGG).equals(DateHistogramAggregationBuilder.NAME)) {
+ return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.TIMESTAMP);
+ } else {
+ return RollupField.formatFieldName(fieldName, (String)agg.get(RollupField.AGG), RollupField.VALUE);
+ }
+ })
+ .collect(Collectors.toList());
+ })
+ .distinct()
+ .collect(ArrayList::new, List::addAll, List::addAll);
+ if (rewrittenFieldNames.isEmpty()) {
+ if (incompatibleTimeZones.isEmpty()) {
+ throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ + "] query is not available in selected rollup indices, cannot query.");
+ } else {
+ throw new IllegalArgumentException("Field [" + fieldName + "] in [" + builderName
+ + "] query was found in rollup indices, but requested timezone is not compatible. Options include: "
+ + incompatibleTimeZones);
+ }
+ } else if (rewrittenFieldNames.size() > 1) {
+ throw new IllegalArgumentException("Ambiguous field name resolution when mapping to rolled fields. Field name [" +
+ fieldName + "] was mapped to: [" + Strings.collectionToDelimitedString(rewrittenFieldNames, ",") + "].");
+ } else {
+ return rewrittenFieldNames.get(0);
+ }
+ }
+
static RollupSearchContext separateIndices(String[] indices, ImmutableOpenMap indexMetaData) {
if (indices.length == 0) {
diff --git a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java
index d9d3e672a0a..ed21585c7dc 100644
--- a/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java
+++ b/x-pack/plugin/rollup/src/test/java/org/elasticsearch/xpack/rollup/action/SearchActionTests.java
@@ -25,9 +25,11 @@ import org.elasticsearch.index.query.ConstantScoreQueryBuilder;
import org.elasticsearch.index.query.DisMaxQueryBuilder;
import org.elasticsearch.index.query.MatchAllQueryBuilder;
import org.elasticsearch.index.query.MatchPhraseQueryBuilder;
+import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
+import org.elasticsearch.index.query.TermsQueryBuilder;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.script.ScriptService;
@@ -61,6 +63,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -153,7 +156,7 @@ public class SearchActionTests extends ESTestCase {
"compatible. Options include: [UTC]"));
}
- public void testTerms() {
+ public void testTermQuery() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
group.setTerms(ConfigTestHelpers.getTerms().setFields(Collections.singletonList("foo")).build());
@@ -166,6 +169,23 @@ public class SearchActionTests extends ESTestCase {
assertThat(((TermQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
}
+ public void testTermsQuery() {
+ RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
+ GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
+ group.setTerms(ConfigTestHelpers.getTerms().setFields(Collections.singletonList("foo")).build());
+ job.setGroupConfig(group.build());
+ RollupJobCaps cap = new RollupJobCaps(job.build());
+ Set caps = new HashSet<>();
+ caps.add(cap);
+ QueryBuilder original = new TermsQueryBuilder("foo", Arrays.asList("bar", "baz"));
+ QueryBuilder rewritten =
+ TransportRollupSearchAction.rewriteQuery(original, caps);
+ assertThat(rewritten, instanceOf(TermsQueryBuilder.class));
+ assertNotSame(rewritten, original);
+ assertThat(((TermsQueryBuilder)rewritten).fieldName(), equalTo("foo.terms.value"));
+ assertThat(((TermsQueryBuilder)rewritten).values(), equalTo(Arrays.asList("bar", "baz")));
+ }
+
public void testCompounds() {
RollupJobConfig.Builder job = ConfigTestHelpers.getRollupJob("foo");
GroupConfig.Builder group = ConfigTestHelpers.getGroupConfig();
diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
index 6d12b6472f1..d2e35999096 100644
--- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
+++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.security;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@@ -17,7 +16,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
@@ -112,7 +110,6 @@ import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
import org.elasticsearch.xpack.core.security.authc.DefaultAuthenticationFailureHandler;
import org.elasticsearch.xpack.core.security.authc.Realm;
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
-import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
@@ -934,7 +931,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
if (enabled) {
return new ValidateTLSOnJoin(XPackSettings.TRANSPORT_SSL_ENABLED.get(settings),
DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))
- .andThen(new ValidateUpgradedSecurityIndex());
+ .andThen(new ValidateUpgradedSecurityIndex())
+ .andThen(new ValidateLicenseCanBeDeserialized());
}
return null;
}
@@ -971,6 +969,17 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
}
}
+ static final class ValidateLicenseCanBeDeserialized implements BiConsumer {
+ @Override
+ public void accept(DiscoveryNode node, ClusterState state) {
+ License license = LicenseService.getLicense(state.metaData());
+ if (license != null && license.version() >= License.VERSION_CRYPTO_ALGORITHMS && node.getVersion().before(Version.V_6_4_0)) {
+ throw new IllegalStateException("node " + node + " is on version [" + node.getVersion() +
+ "] that cannot deserialize the license format [" + license.version() + "], upgrade node to at least 6.4.0");
+ }
+ }
+ }
+
@Override
public void reloadSPI(ClassLoader loader) {
securityExtensions.addAll(SecurityExtension.loadExtensions(loader));
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
index 190c8703955..b1d8d4b67bf 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
@@ -28,6 +28,7 @@ import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackSettings;
@@ -278,6 +279,19 @@ public class SecurityTests extends ESTestCase {
}
}
+ public void testJoinValidatorForLicenseDeserialization() throws Exception {
+ DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+ VersionUtils.randomVersionBetween(random(), null, Version.V_6_3_0));
+ MetaData.Builder builder = MetaData.builder();
+ License license = TestUtils.generateSignedLicense(null,
+ randomIntBetween(License.VERSION_CRYPTO_ALGORITHMS, License.VERSION_CURRENT), -1, TimeValue.timeValueHours(24));
+ TestUtils.putLicense(builder, license);
+ ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(builder.build()).build();
+ IllegalStateException e = expectThrows(IllegalStateException.class,
+ () -> new Security.ValidateLicenseCanBeDeserialized().accept(node, state));
+ assertThat(e.getMessage(), containsString("cannot deserialize the license format"));
+ }
+
public void testIndexJoinValidator_Old_And_Rolling() throws Exception {
createComponents(Settings.EMPTY);
BiConsumer joinValidator = security.getJoinValidator();
@@ -345,7 +359,7 @@ public class SecurityTests extends ESTestCase {
.nodes(discoveryNodes).build();
joinValidator.accept(node, clusterState);
}
-
+
public void testGetFieldFilterSecurityEnabled() throws Exception {
createComponents(Settings.EMPTY);
Function> fieldFilter = security.getFieldFilter();
diff --git a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
index 81c54353a2d..18b10712803 100644
--- a/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
+++ b/x-pack/qa/ml-native-tests/src/test/java/org/elasticsearch/xpack/ml/integration/ForecastIT.java
@@ -239,8 +239,6 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
throw e;
}
- closeJob(job.getId());
-
List forecastStats = getForecastStats();
assertThat(forecastStats.size(), equalTo(1));
ForecastRequestStats forecastRequestStats = forecastStats.get(0);
@@ -248,6 +246,21 @@ public class ForecastIT extends MlNativeAutodetectIntegTestCase {
assertThat(forecastRequestStats.getRecordCount(), equalTo(8000L));
assertThat(forecasts.size(), equalTo(8000));
+
+ // run forecast a 2nd time
+ try {
+ String forecastId = forecast(job.getId(), TimeValue.timeValueHours(1), null);
+
+ waitForecastToFinish(job.getId(), forecastId);
+ } catch (ElasticsearchStatusException e) {
+ if (e.getMessage().contains("disk space")) {
+ throw new ElasticsearchStatusException(
+ "Test likely fails due to insufficient disk space on test machine, please free up space.", e.status(), e);
+ }
+ throw e;
+ }
+
+ closeJob(job.getId());
}
private void createDataWithLotsOfClientIps(TimeValue bucketSpan, Job.Builder job) throws IOException {