extends BlobStoreFormat<T>
}
/**
- * Writes blob in atomic manner with resolving the blob name using {@link #blobName} and {@link #tempBlobName} methods.
+ * Writes blob in atomic manner with resolving the blob name using {@link #blobName} method.
*
* The blob will be compressed and checksum will be written if required.
*
@@ -131,20 +130,12 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
* @param name blob name
*/
public void writeAtomic(T obj, BlobContainer blobContainer, String name) throws IOException {
- String blobName = blobName(name);
- String tempBlobName = tempBlobName(name);
- writeBlob(obj, blobContainer, tempBlobName);
- try {
- blobContainer.move(tempBlobName, blobName);
- } catch (IOException ex) {
- // Move failed - try cleaning up
- try {
- blobContainer.deleteBlob(tempBlobName);
- } catch (Exception e) {
- ex.addSuppressed(e);
+ final String blobName = blobName(name);
+ writeTo(obj, blobName, bytesArray -> {
+ try (InputStream stream = bytesArray.streamInput()) {
+ blobContainer.writeBlobAtomic(blobName, stream, bytesArray.length());
}
- throw ex;
- }
+ });
}
/**
@@ -157,51 +148,35 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
* @param name blob name
*/
public void write(T obj, BlobContainer blobContainer, String name) throws IOException {
- String blobName = blobName(name);
- writeBlob(obj, blobContainer, blobName);
+ final String blobName = blobName(name);
+ writeTo(obj, blobName, bytesArray -> {
+ try (InputStream stream = bytesArray.streamInput()) {
+ blobContainer.writeBlob(blobName, stream, bytesArray.length());
+ }
+ });
}
- /**
- * Writes blob in atomic manner without resolving the blobName using using {@link #blobName} method.
- *
- * The blob will be compressed and checksum will be written if required.
- *
- * @param obj object to be serialized
- * @param blobContainer blob container
- * @param blobName blob name
- */
- protected void writeBlob(T obj, BlobContainer blobContainer, String blobName) throws IOException {
- BytesReference bytes = write(obj);
- try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
+ private void writeTo(final T obj, final String blobName, final CheckedConsumer<BytesArray, IOException> consumer) throws IOException {
+ final BytesReference bytes = write(obj);
+ try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
final String resourceDesc = "ChecksumBlobStoreFormat.writeBlob(blob=\"" + blobName + "\")";
- try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, byteArrayOutputStream, BUFFER_SIZE)) {
+ try (OutputStreamIndexOutput indexOutput = new OutputStreamIndexOutput(resourceDesc, blobName, outputStream, BUFFER_SIZE)) {
CodecUtil.writeHeader(indexOutput, codec, VERSION);
try (OutputStream indexOutputOutputStream = new IndexOutputOutputStream(indexOutput) {
@Override
public void close() throws IOException {
// this is important since some of the XContentBuilders write bytes on close.
// in order to write the footer we need to prevent closing the actual index input.
- } }) {
+ }
+ }) {
bytes.writeTo(indexOutputOutputStream);
}
CodecUtil.writeFooter(indexOutput);
}
- BytesArray bytesArray = new BytesArray(byteArrayOutputStream.toByteArray());
- try (InputStream stream = bytesArray.streamInput()) {
- blobContainer.writeBlob(blobName, stream, bytesArray.length());
- }
+ consumer.accept(new BytesArray(outputStream.toByteArray()));
}
}
- /**
- * Returns true if the blob is a leftover temporary blob.
- *
- * The temporary blobs might be left after failed atomic write operation.
- */
- public boolean isTempBlobName(String blobName) {
- return blobName.startsWith(ChecksumBlobStoreFormat.TEMP_FILE_PREFIX);
- }
-
protected BytesReference write(T obj) throws IOException {
try (BytesStreamOutput bytesStreamOutput = new BytesStreamOutput()) {
if (compress) {
@@ -222,10 +197,4 @@ public class ChecksumBlobStoreFormat<T extends ToXContent> extends BlobStoreFormat<T> {
builder.endObject();
}
}
-
-
- protected String tempBlobName(String name) {
- return TEMP_FILE_PREFIX + String.format(Locale.ROOT, blobNameFormat, name);
- }
-
}
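
Context for the change above: writeAtomic() no longer does its own temp-blob-plus-move dance and instead relies on BlobContainer#writeBlobAtomic. For filesystem-backed containers that contract is typically satisfied by writing to a temporary file and renaming it into place, the same write-then-move pattern MockRepository simulates further down with FsBlobContainer.tempBlobName and move. The following is a minimal, self-contained sketch of that idea using plain java.nio; the class and method names are illustrative, not the actual FsBlobContainer implementation.

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public final class AtomicBlobWriteSketch {

        /**
         * Streams the blob to a temporary file in the target directory, then atomically
         * renames it onto its final name so readers never observe a half-written blob.
         */
        public static void writeBlobAtomic(Path directory, String blobName, InputStream in) throws IOException {
            final Path tempFile = Files.createTempFile(directory, "pending-", ".tmp");
            try {
                Files.copy(in, tempFile, StandardCopyOption.REPLACE_EXISTING);
                // ATOMIC_MOVE throws instead of silently falling back to a non-atomic copy
                Files.move(tempFile, directory.resolve(blobName), StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                try {
                    Files.deleteIfExists(tempFile); // best-effort cleanup, mirroring the removed move-failure handling
                } catch (IOException cleanup) {
                    e.addSuppressed(cleanup);
                }
                throw e;
            }
        }
    }

The rename either fully succeeds or leaves nothing behind under the final name, which is what the removed move/deleteBlob error handling in writeAtomic used to approximate by hand.
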
diff --git a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
index 8677370fc99..242e0887473 100644
--- a/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
+++ b/server/src/main/java/org/elasticsearch/search/DocValueFormat.java
@@ -39,6 +39,7 @@ import java.text.DecimalFormatSymbols;
import java.text.NumberFormat;
import java.text.ParseException;
import java.util.Arrays;
+import java.util.Base64;
import java.util.Locale;
import java.util.Objects;
import java.util.function.LongSupplier;
@@ -121,6 +122,50 @@ public interface DocValueFormat extends NamedWriteable {
}
};
+ DocValueFormat BINARY = new DocValueFormat() {
+
+ @Override
+ public String getWriteableName() {
+ return "binary";
+ }
+
+ @Override
+ public void writeTo(StreamOutput out) throws IOException {
+ }
+
+ @Override
+ public Object format(long value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object format(double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String format(BytesRef value) {
+ return Base64.getEncoder()
+ .withoutPadding()
+ .encodeToString(Arrays.copyOfRange(value.bytes, value.offset, value.offset + value.length));
+ }
+
+ @Override
+ public long parseLong(String value, boolean roundUp, LongSupplier now) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public double parseDouble(String value, boolean roundUp, LongSupplier now) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BytesRef parseBytesRef(String value) {
+ return new BytesRef(Base64.getDecoder().decode(value));
+ }
+ };
+
final class DateTime implements DocValueFormat {
public static final String NAME = "date_time";
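
Side note on the BINARY format added above: it is plain unpadded Base64 in both directions, which is where the "KmQ" literals in DocValueFormatTests and SearchFieldsIT below come from. A standalone round trip with the JDK encoder (the class and method names here are only for illustration):

    import java.util.Arrays;
    import java.util.Base64;

    public final class BinaryFormatRoundTrip {
        public static void main(String[] args) {
            byte[] original = new byte[] {42, 100};
            // Standard (non-URL) Base64 encoder with padding stripped, as in DocValueFormat.BINARY.format(BytesRef)
            String encoded = Base64.getEncoder().withoutPadding().encodeToString(original);
            System.out.println(encoded); // KmQ
            // The JDK decoder accepts unpadded input, so parseBytesRef can decode it back unchanged
            byte[] decoded = Base64.getDecoder().decode(encoded);
            System.out.println(Arrays.equals(original, decoded)); // true
        }
    }
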
diff --git a/server/src/main/java/org/elasticsearch/search/SearchModule.java b/server/src/main/java/org/elasticsearch/search/SearchModule.java
index 66ea407f42a..869dfe995ed 100644
--- a/server/src/main/java/org/elasticsearch/search/SearchModule.java
+++ b/server/src/main/java/org/elasticsearch/search/SearchModule.java
@@ -645,6 +645,7 @@ public class SearchModule {
registerValueFormat(DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH);
registerValueFormat(DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP);
registerValueFormat(DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW);
+ registerValueFormat(DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY);
}
/**
diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
index c15c0a1be7f..e8bd14b640d 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/settings/ClusterUpdateSettingsResponseTests.java
@@ -102,6 +102,6 @@ public class ClusterUpdateSettingsResponseTests extends AbstractStreamableXContentTestCase<ClusterUpdateSettingsResponse> {
public void testOldSerialisation() throws IOException {
ClusterUpdateSettingsResponse original = createTestInstance();
- assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_7_0_0_alpha1));
+ assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));
}
}
diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
index c3ff4511815..903accac6ab 100644
--- a/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
+++ b/server/src/test/java/org/elasticsearch/action/admin/indices/rollover/RolloverResponseTests.java
@@ -19,6 +19,8 @@
package org.elasticsearch.action.admin.indices.rollover;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
+
import org.elasticsearch.Version;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
@@ -132,6 +134,6 @@ public class RolloverResponseTests extends AbstractStreamableXContentTestCase<RolloverResponse> {
 public void testOldSerialisation() throws IOException {
 RolloverResponse original = createTestInstance();
- assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_7_0_0_alpha1));
+ assertSerialization(original, VersionUtils.randomVersionBetween(random(), Version.V_6_0_0, Version.V_6_4_0));
 }
 }
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
--- a/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/FlushIT.java
@@ ... @@ public class FlushIT extends ESIntegTestCase {
- private String syncedFlushDescription(ShardsSyncedFlushResult result) {
- String detail = result.shardResponses().entrySet().stream()
- .map(e -> "Shard [" + e.getKey() + "], result [" + e.getValue() + "]")
- .collect(Collectors.joining(","));
- return String.format(Locale.ROOT, "Total shards: [%d], failed: [%s], reason: [%s], detail: [%s]",
- result.totalShards(), result.failed(), result.failureReason(), detail);
- }
-
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
- @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testSyncedFlushSkipOutOfSyncReplicas() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -281,7 +268,6 @@ public class FlushIT extends ESIntegTestCase {
indexDoc(IndexShardTestCase.getEngine(outOfSyncReplica), "extra_" + i);
}
final ShardsSyncedFlushResult partialResult = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Partial seal: {}", syncedFlushDescription(partialResult));
assertThat(partialResult.totalShards(), equalTo(numberOfReplicas + 1));
assertThat(partialResult.successfulShards(), equalTo(numberOfReplicas));
assertThat(partialResult.shardResponses().get(outOfSyncReplica.routingEntry()).failureReason, equalTo(
@@ -297,8 +283,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(fullResult.successfulShards(), equalTo(numberOfReplicas + 1));
}
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/29392")
- @TestLogging("_root:DEBUG,org.elasticsearch.indices.flush:TRACE")
public void testDoNotRenewSyncedFlushWhenAllSealed() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(between(2, 3));
final int numberOfReplicas = internalCluster().numDataNodes() - 1;
@@ -315,11 +299,9 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult firstSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("First seal: {}", syncedFlushDescription(firstSeal));
assertThat(firstSeal.successfulShards(), equalTo(numberOfReplicas + 1));
// Do not renew synced-flush
final ShardsSyncedFlushResult secondSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Second seal: {}", syncedFlushDescription(secondSeal));
assertThat(secondSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(secondSeal.syncId(), equalTo(firstSeal.syncId()));
// Shards were updated, renew synced flush.
@@ -328,7 +310,6 @@ public class FlushIT extends ESIntegTestCase {
index("test", "doc", Integer.toString(i));
}
final ShardsSyncedFlushResult thirdSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Third seal: {}", syncedFlushDescription(thirdSeal));
assertThat(thirdSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(thirdSeal.syncId(), not(equalTo(firstSeal.syncId())));
// Manually remove or change sync-id, renew synced flush.
@@ -344,7 +325,6 @@ public class FlushIT extends ESIntegTestCase {
assertThat(shard.commitStats().syncId(), nullValue());
}
final ShardsSyncedFlushResult forthSeal = SyncedFlushUtil.attemptSyncedFlush(logger, internalCluster(), shardId);
- logger.info("Forth seal: {}", syncedFlushDescription(forthSeal));
assertThat(forthSeal.successfulShards(), equalTo(numberOfReplicas + 1));
assertThat(forthSeal.syncId(), not(equalTo(thirdSeal.syncId())));
}
diff --git a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
index 987f69b6587..8a8d57295a5 100644
--- a/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
+++ b/server/src/test/java/org/elasticsearch/indices/flush/SyncedFlushUtil.java
@@ -29,6 +29,9 @@ import org.elasticsearch.test.InternalTestCluster;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.test.ESTestCase.assertBusy;
/** Utils for SyncedFlush */
public class SyncedFlushUtil {
@@ -40,21 +43,31 @@ public class SyncedFlushUtil {
/**
* Blocking version of {@link SyncedFlushService#attemptSyncedFlush(ShardId, ActionListener)}
*/
- public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) {
+ public static ShardsSyncedFlushResult attemptSyncedFlush(Logger logger, InternalTestCluster cluster, ShardId shardId) throws Exception {
+ /*
+ * When the last indexing operation is completed, we will fire a global checkpoint sync.
+ * Since a global checkpoint sync request is a replication request, it will acquire an index
+ * shard permit on the primary when executing. If this happens while we are issuing the
+ * synced-flush, the synced-flush request will fail because it thinks there are in-flight
+ * operations. We can avoid such a situation by issuing another synced-flush whenever the
+ * previous one failed due to ongoing operations on the primary.
+ */
SyncedFlushService service = cluster.getInstance(SyncedFlushService.class);
- logger.debug("Issue synced-flush on node [{}], shard [{}], cluster state [{}]",
- service.nodeName(), shardId, cluster.clusterService(service.nodeName()).state());
- LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
- service.attemptSyncedFlush(shardId, listener);
- try {
+ AtomicReference<LatchedListener<ShardsSyncedFlushResult>> listenerHolder = new AtomicReference<>();
+ assertBusy(() -> {
+ LatchedListener<ShardsSyncedFlushResult> listener = new LatchedListener<>();
+ listenerHolder.set(listener);
+ service.attemptSyncedFlush(shardId, listener);
listener.latch.await();
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
+ if (listener.result != null && listener.result.failureReason() != null
+ && listener.result.failureReason().contains("ongoing operations on primary")) {
+ throw new AssertionError(listener.result.failureReason()); // cause the assert busy to retry
+ }
+ });
+ if (listenerHolder.get().error != null) {
+ throw ExceptionsHelper.convertToElastic(listenerHolder.get().error);
}
- if (listener.error != null) {
- throw ExceptionsHelper.convertToElastic(listener.error);
- }
- return listener.result;
+ return listenerHolder.get().result;
}
public static final class LatchedListener<T> implements ActionListener<T> {
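
The retry in attemptSyncedFlush above leans on ESTestCase.assertBusy: throwing an AssertionError from the supplied block makes assertBusy wait and run the block again until it passes or the timeout is hit. A simplified sketch of that retry contract, assuming nothing beyond the JDK (this is not the real ESTestCase implementation):

    import java.util.concurrent.TimeUnit;

    public final class RetryOnAssertionErrorSketch {

        /**
         * Simplified version of the retry contract the test relies on: run the check,
         * and if it throws an AssertionError, wait briefly and try again until the timeout.
         */
        static void assertBusy(Runnable check, long timeout, TimeUnit unit) throws InterruptedException {
            final long deadline = System.nanoTime() + unit.toNanos(timeout);
            AssertionError lastFailure = null;
            do {
                try {
                    check.run();
                    return; // the check passed, stop retrying
                } catch (AssertionError e) {
                    lastFailure = e;
                    Thread.sleep(50);
                }
            } while (System.nanoTime() < deadline);
            throw lastFailure != null ? lastFailure : new AssertionError("timed out");
        }

        public static void main(String[] args) throws InterruptedException {
            long start = System.currentTimeMillis();
            // Fails for the first ~200ms, exercising a couple of retries before it passes
            assertBusy(() -> {
                if (System.currentTimeMillis() - start < 200) {
                    throw new AssertionError("not ready yet");
                }
            }, 5, TimeUnit.SECONDS);
            System.out.println("check eventually passed");
        }
    }
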
diff --git a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
index e5cfbf98b3d..01906279474 100644
--- a/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
+++ b/server/src/test/java/org/elasticsearch/search/DocValueFormatTests.java
@@ -44,6 +44,7 @@ public class DocValueFormatTests extends ESTestCase {
entries.add(new Entry(DocValueFormat.class, DocValueFormat.GEOHASH.getWriteableName(), in -> DocValueFormat.GEOHASH));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.IP.getWriteableName(), in -> DocValueFormat.IP));
entries.add(new Entry(DocValueFormat.class, DocValueFormat.RAW.getWriteableName(), in -> DocValueFormat.RAW));
+ entries.add(new Entry(DocValueFormat.class, DocValueFormat.BINARY.getWriteableName(), in -> DocValueFormat.BINARY));
NamedWriteableRegistry registry = new NamedWriteableRegistry(entries);
BytesStreamOutput out = new BytesStreamOutput();
@@ -82,6 +83,11 @@ public class DocValueFormatTests extends ESTestCase {
out.writeNamedWriteable(DocValueFormat.RAW);
in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
assertSame(DocValueFormat.RAW, in.readNamedWriteable(DocValueFormat.class));
+
+ out = new BytesStreamOutput();
+ out.writeNamedWriteable(DocValueFormat.BINARY);
+ in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry);
+ assertSame(DocValueFormat.BINARY, in.readNamedWriteable(DocValueFormat.class));
}
public void testRawFormat() {
@@ -96,6 +102,14 @@ public class DocValueFormatTests extends ESTestCase {
assertEquals("abc", DocValueFormat.RAW.format(new BytesRef("abc")));
}
+ public void testBinaryFormat() {
+ assertEquals("", DocValueFormat.BINARY.format(new BytesRef()));
+ assertEquals("KmQ", DocValueFormat.BINARY.format(new BytesRef(new byte[] {42, 100})));
+
+ assertEquals(new BytesRef(), DocValueFormat.BINARY.parseBytesRef(""));
+ assertEquals(new BytesRef(new byte[] {42, 100}), DocValueFormat.BINARY.parseBytesRef("KmQ"));
+ }
+
public void testBooleanFormat() {
assertEquals(false, DocValueFormat.BOOLEAN.format(0));
assertEquals(true, DocValueFormat.BOOLEAN.format(1));
diff --git a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
index a8a2669ef9b..ab5387b6e3f 100644
--- a/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
+++ b/server/src/test/java/org/elasticsearch/search/fields/SearchFieldsIT.java
@@ -19,6 +19,7 @@
package org.elasticsearch.search.fields;
+import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -700,7 +701,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(fields.get("test_field").getValue(), equalTo("foobar"));
}
- public void testFieldsPulledFromFieldData() throws Exception {
+ public void testDocValueFields() throws Exception {
createIndex("test");
String mapping = Strings
@@ -744,6 +745,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.endObject()
.startObject("binary_field")
.field("type", "binary")
+ .field("doc_values", true) // off by default on binary fields
.endObject()
.startObject("ip_field")
.field("type", "ip")
@@ -766,6 +768,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.field("double_field", 6.0d)
.field("date_field", Joda.forPattern("dateOptionalTime").printer().print(date))
.field("boolean_field", true)
+ .field("binary_field", new byte[] {42, 100})
.field("ip_field", "::1")
.endObject()).execute().actionGet();
@@ -782,6 +785,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.addDocValueField("double_field")
.addDocValueField("date_field")
.addDocValueField("boolean_field")
+ .addDocValueField("binary_field")
.addDocValueField("ip_field");
SearchResponse searchResponse = builder.execute().actionGet();
@@ -790,7 +794,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
Set<String> fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
- "ip_field")));
+ "binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@@ -802,6 +806,8 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
+ assertThat(searchResponse.getHits().getAt(0).getFields().get("binary_field").getValue(),
+ equalTo(new BytesRef(new byte[] {42, 100})));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
@@ -815,6 +821,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
.addDocValueField("double_field", "use_field_mapping")
.addDocValueField("date_field", "use_field_mapping")
.addDocValueField("boolean_field", "use_field_mapping")
+ .addDocValueField("binary_field", "use_field_mapping")
.addDocValueField("ip_field", "use_field_mapping");
searchResponse = builder.execute().actionGet();
@@ -823,7 +830,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
fields = new HashSet<>(searchResponse.getHits().getAt(0).getFields().keySet());
assertThat(fields, equalTo(newHashSet("byte_field", "short_field", "integer_field", "long_field",
"float_field", "double_field", "date_field", "boolean_field", "text_field", "keyword_field",
- "ip_field")));
+ "binary_field", "ip_field")));
assertThat(searchResponse.getHits().getAt(0).getFields().get("byte_field").getValue().toString(), equalTo("1"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("short_field").getValue().toString(), equalTo("2"));
@@ -836,6 +843,7 @@ public class SearchFieldsIT extends ESIntegTestCase {
assertThat(searchResponse.getHits().getAt(0).getFields().get("boolean_field").getValue(), equalTo((Object) true));
assertThat(searchResponse.getHits().getAt(0).getFields().get("text_field").getValue(), equalTo("foo"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("keyword_field").getValue(), equalTo("foo"));
+ assertThat(searchResponse.getHits().getAt(0).getFields().get("binary_field").getValue(), equalTo("KmQ"));
assertThat(searchResponse.getHits().getAt(0).getFields().get("ip_field").getValue(), equalTo("::1"));
builder = client().prepareSearch().setQuery(matchAllQuery())
diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 70591e06d10..c25cad61e07 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -100,7 +100,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
}
Thread.sleep(100);
}
- fail("Timeout!!!");
+ fail("Timeout waiting for node [" + node + "] to be blocked");
}
public SnapshotInfo waitForCompletion(String repository, String snapshotName, TimeValue timeout) throws InterruptedException {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
index 65926234d45..70be72989cf 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java
@@ -224,52 +224,16 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
IOException writeBlobException = expectThrows(IOException.class, () -> {
BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
@Override
- public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
- throw new IOException("Exception thrown in writeBlob() for " + blobName);
+ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobSize) throws IOException {
+ throw new IOException("Exception thrown in writeBlobAtomic() for " + blobName);
}
};
checksumFormat.writeAtomic(blobObj, wrapper, name);
});
- assertEquals("Exception thrown in writeBlob() for pending-" + name, writeBlobException.getMessage());
+ assertEquals("Exception thrown in writeBlobAtomic() for " + name, writeBlobException.getMessage());
assertEquals(0, writeBlobException.getSuppressed().length);
}
- {
- IOException moveException = expectThrows(IOException.class, () -> {
- BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
- @Override
- public void move(String sourceBlobName, String targetBlobName) throws IOException {
- throw new IOException("Exception thrown in move() for " + sourceBlobName);
- }
- };
- checksumFormat.writeAtomic(blobObj, wrapper, name);
- });
- assertEquals("Exception thrown in move() for pending-" + name, moveException.getMessage());
- assertEquals(0, moveException.getSuppressed().length);
- }
- {
- IOException moveThenDeleteException = expectThrows(IOException.class, () -> {
- BlobContainer wrapper = new BlobContainerWrapper(blobContainer) {
- @Override
- public void move(String sourceBlobName, String targetBlobName) throws IOException {
- throw new IOException("Exception thrown in move() for " + sourceBlobName);
- }
-
- @Override
- public void deleteBlob(String blobName) throws IOException {
- throw new IOException("Exception thrown in deleteBlob() for " + blobName);
- }
- };
- checksumFormat.writeAtomic(blobObj, wrapper, name);
- });
-
- assertEquals("Exception thrown in move() for pending-" + name, moveThenDeleteException.getMessage());
- assertEquals(1, moveThenDeleteException.getSuppressed().length);
-
- final Throwable suppressedThrowable = moveThenDeleteException.getSuppressed()[0];
- assertTrue(suppressedThrowable instanceof IOException);
- assertEquals("Exception thrown in deleteBlob() for pending-" + name, suppressedThrowable.getMessage());
- }
}
protected BlobStore createTestBlobStore() throws IOException {
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
index 56a4a279cab..089955d140f 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java
@@ -53,11 +53,21 @@ public class BlobContainerWrapper implements BlobContainer {
delegate.writeBlob(blobName, inputStream, blobSize);
}
+ @Override
+ public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+ delegate.writeBlobAtomic(blobName, inputStream, blobSize);
+ }
+
@Override
public void deleteBlob(String blobName) throws IOException {
delegate.deleteBlob(blobName);
}
+ @Override
+ public void deleteBlobIgnoringIfNotExists(final String blobName) throws IOException {
+ delegate.deleteBlobIgnoringIfNotExists(blobName);
+ }
+
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
return delegate.listBlobs();
diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 3a5b068cd89..5fa884adbfe 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -19,6 +19,28 @@
package org.elasticsearch.snapshots.mockstore;
+import com.carrotsearch.randomizedtesting.RandomizedContext;
+import org.apache.lucene.index.CorruptIndexException;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.metadata.RepositoryMetaData;
+import org.elasticsearch.common.blobstore.BlobContainer;
+import org.elasticsearch.common.blobstore.BlobMetaData;
+import org.elasticsearch.common.blobstore.BlobPath;
+import org.elasticsearch.common.blobstore.BlobStore;
+import org.elasticsearch.common.blobstore.fs.FsBlobContainer;
+import org.elasticsearch.common.io.PathUtils;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Setting.Property;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.NamedXContentRegistry;
+import org.elasticsearch.env.Environment;
+import org.elasticsearch.plugins.RepositoryPlugin;
+import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.fs.FsRepository;
+import org.elasticsearch.snapshots.SnapshotId;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
@@ -29,31 +51,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
-import com.carrotsearch.randomizedtesting.RandomizedContext;
-import org.apache.lucene.index.CorruptIndexException;
-import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.metadata.RepositoryMetaData;
-import org.elasticsearch.common.blobstore.BlobContainer;
-import org.elasticsearch.common.blobstore.BlobMetaData;
-import org.elasticsearch.common.blobstore.BlobPath;
-import org.elasticsearch.common.blobstore.BlobStore;
-import org.elasticsearch.common.io.PathUtils;
-import org.elasticsearch.common.settings.Setting;
-import org.elasticsearch.common.settings.Setting.Property;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.env.Environment;
-import org.elasticsearch.plugins.RepositoryPlugin;
-import org.elasticsearch.repositories.Repository;
-import org.elasticsearch.repositories.IndexId;
-import org.elasticsearch.repositories.fs.FsRepository;
-import org.elasticsearch.snapshots.SnapshotId;
-
public class MockRepository extends FsRepository {
public static class Plugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
@@ -325,6 +327,12 @@ public class MockRepository extends FsRepository {
super.deleteBlob(blobName);
}
+ @Override
+ public void deleteBlobIgnoringIfNotExists(String blobName) throws IOException {
+ maybeIOExceptionOrBlock(blobName);
+ super.deleteBlobIgnoringIfNotExists(blobName);
+ }
+
@Override
public Map<String, BlobMetaData> listBlobs() throws IOException {
maybeIOExceptionOrBlock("");
@@ -365,6 +373,31 @@ public class MockRepository extends FsRepository {
maybeIOExceptionOrBlock(blobName);
}
}
+
+ @Override
+ public void writeBlobAtomic(final String blobName, final InputStream inputStream, final long blobSize) throws IOException {
+ final Random random = RandomizedContext.current().getRandom();
+ if (random.nextBoolean()) {
+ if ((delegate() instanceof FsBlobContainer) && (random.nextBoolean())) {
+ // Simulate a failure between the write and move operation in FsBlobContainer
+ final String tempBlobName = FsBlobContainer.tempBlobName(blobName);
+ super.writeBlob(tempBlobName, inputStream, blobSize);
+ maybeIOExceptionOrBlock(blobName);
+ final FsBlobContainer fsBlobContainer = (FsBlobContainer) delegate();
+ fsBlobContainer.move(tempBlobName, blobName);
+ } else {
+ // Atomic write since it is potentially supported
+ // by the delegating blob container
+ maybeIOExceptionOrBlock(blobName);
+ super.writeBlobAtomic(blobName, inputStream, blobSize);
+ }
+ } else {
+ // Simulate a non-atomic write since many blob container
+ // implementations do not support atomic writes
+ maybeIOExceptionOrBlock(blobName);
+ super.writeBlob(blobName, inputStream, blobSize);
+ }
+ }
}
}
}
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
index 743be6d1bcb..df2024de445 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/ESBlobStoreContainerTestCase.java
@@ -158,7 +158,11 @@ public abstract class ESBlobStoreContainerTestCase extends ESTestCase {
protected void writeBlob(final BlobContainer container, final String blobName, final BytesArray bytesArray) throws IOException {
try (InputStream stream = bytesArray.streamInput()) {
- container.writeBlob(blobName, stream, bytesArray.length());
+ if (randomBoolean()) {
+ container.writeBlob(blobName, stream, bytesArray.length());
+ } else {
+ container.writeBlobAtomic(blobName, stream, bytesArray.length());
+ }
}
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
index 99e6a10ad92..40c694cedb7 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseService.java
@@ -411,7 +411,8 @@ public class LicenseService extends AbstractLifecycleComponent implements Cluste
// auto-generate license if no licenses ever existed or if the current license is basic and
// needs extended or if the license signature needs to be updated. this will trigger a subsequent cluster changed event
if (currentClusterState.getNodes().isLocalNodeElectedMaster() &&
- (noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) || LicenseUtils.signatureNeedsUpdate(currentLicense))) {
+ (noLicense || LicenseUtils.licenseNeedsExtended(currentLicense) ||
+ LicenseUtils.signatureNeedsUpdate(currentLicense, currentClusterState.nodes()))) {
registerOrUpdateSelfGeneratedLicense();
}
} else if (logger.isDebugEnabled()) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
index 8fcdc05bcf9..4c8a558682b 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/LicenseUtils.java
@@ -6,8 +6,12 @@
package org.elasticsearch.license;
import org.elasticsearch.ElasticsearchSecurityException;
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.rest.RestStatus;
+import java.util.stream.StreamSupport;
+
public class LicenseUtils {
public static final String EXPIRED_FEATURE_METADATA = "es.license.expired.feature";
@@ -42,8 +46,25 @@ public class LicenseUtils {
* Checks if the signature of a self generated license with older version needs to be
* recreated with the new key
*/
- public static boolean signatureNeedsUpdate(License license) {
+ public static boolean signatureNeedsUpdate(License license, DiscoveryNodes currentNodes) {
+ assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
+
return ("basic".equals(license.type()) || "trial".equals(license.type())) &&
- (license.version() < License.VERSION_CRYPTO_ALGORITHMS);
+ // only upgrade signature when all nodes are ready to deserialize the new signature
+ (license.version() < License.VERSION_CRYPTO_ALGORITHMS &&
+ compatibleLicenseVersion(currentNodes) == License.VERSION_CRYPTO_ALGORITHMS
+ );
+ }
+
+ public static int compatibleLicenseVersion(DiscoveryNodes currentNodes) {
+ assert License.VERSION_CRYPTO_ALGORITHMS == License.VERSION_CURRENT : "update this method when adding a new version";
+
+ if (StreamSupport.stream(currentNodes.spliterator(), false)
+ .allMatch(node -> node.getVersion().onOrAfter(Version.V_6_4_0))) {
+ // License.VERSION_CRYPTO_ALGORITHMS was introduced in 6.4.0
+ return License.VERSION_CRYPTO_ALGORITHMS;
+ } else {
+ return License.VERSION_START_DATE;
+ }
}
}
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
index 0bc49d517cd..fb9b167d3db 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/SelfGeneratedLicense.java
@@ -5,6 +5,7 @@
*/
package org.elasticsearch.license;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
@@ -26,8 +27,8 @@ import static org.elasticsearch.license.CryptUtils.decrypt;
class SelfGeneratedLicense {
- public static License create(License.Builder specBuilder) {
- return create(specBuilder, License.VERSION_CURRENT);
+ public static License create(License.Builder specBuilder, DiscoveryNodes currentNodes) {
+ return create(specBuilder, LicenseUtils.compatibleLicenseVersion(currentNodes));
}
public static License create(License.Builder specBuilder, int version) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
index 0cf949a6990..468f1799a07 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartBasicClusterTask.java
@@ -73,7 +73,7 @@ public class StartBasicClusterTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
if (request.isAcknowledged() == false && currentLicense != null) {
Map<String, String[]> ackMessages = LicenseService.getAckMessages(selfGeneratedLicense, currentLicense);
if (ackMessages.isEmpty() == false) {
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
index 5c5c03151ba..2bf0555fde1 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartTrialClusterTask.java
@@ -82,7 +82,7 @@ public class StartTrialClusterTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(request.getType())
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData newLicensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicensesMetaData);
return ClusterState.builder(currentState).metaData(mdBuilder).build();
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
index 13d6326f3ce..c2d53bd0716 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/license/StartupSelfGeneratedLicenseTask.java
@@ -61,7 +61,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
"]. Must be trial or basic.");
}
return updateWithLicense(currentState, type);
- } else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense())) {
+ } else if (LicenseUtils.signatureNeedsUpdate(currentLicensesMetaData.getLicense(), currentState.nodes())) {
return updateLicenseSignature(currentState, currentLicensesMetaData);
} else if (LicenseUtils.licenseNeedsExtended(currentLicensesMetaData.getLicense())) {
return extendBasic(currentState, currentLicensesMetaData);
@@ -87,7 +87,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(type)
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
Version trialVersion = currentLicenseMetaData.getMostRecentTrialVersion();
LicensesMetaData newLicenseMetadata = new LicensesMetaData(selfGeneratedLicense, trialVersion);
mdBuilder.putCustom(LicensesMetaData.TYPE, newLicenseMetadata);
@@ -120,7 +120,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(currentLicense.issueDate())
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentLicense.version());
Version trialVersion = currentLicenseMetadata.getMostRecentTrialVersion();
return new LicensesMetaData(selfGeneratedLicense, trialVersion);
}
@@ -141,7 +141,7 @@ public class StartupSelfGeneratedLicenseTask extends ClusterStateUpdateTask {
.issueDate(issueDate)
.type(type)
.expiryDate(expiryDate);
- License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder);
+ License selfGeneratedLicense = SelfGeneratedLicense.create(specBuilder, currentState.nodes());
LicensesMetaData licensesMetaData;
if ("trial".equals(type)) {
licensesMetaData = new LicensesMetaData(selfGeneratedLicense, Version.CURRENT);
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
index 2a237f090e2..5405af013af 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseRegistrationTests.java
@@ -104,7 +104,7 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
.issueDate(dateMath("now-10h", now))
.type("basic")
.expiryDate(dateMath("now-2h", now));
- License license = SelfGeneratedLicense.create(builder);
+ License license = SelfGeneratedLicense.create(builder, License.VERSION_CURRENT);
XPackLicenseState licenseState = new XPackLicenseState(Settings.EMPTY);
setInitialState(license, licenseState, Settings.EMPTY);
@@ -125,4 +125,4 @@ public class LicenseRegistrationTests extends AbstractLicenseServiceTestCase {
assertEquals(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS, licenseMetaData.getLicense().expiryDate());
assertEquals(uid, licenseMetaData.getLicense().uid());
}
-}
\ No newline at end of file
+}
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
index d7cf5ab50fb..d07be0fd3c7 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicenseSerializationTests.java
@@ -111,7 +111,7 @@ public class LicenseSerializationTests extends ESTestCase {
.issueDate(now)
.type("basic")
.expiryDate(LicenseService.BASIC_SELF_GENERATED_LICENSE_EXPIRATION_MILLIS);
- License license = SelfGeneratedLicense.create(specBuilder);
+ License license = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
XContentBuilder builder = XContentFactory.contentBuilder(XContentType.JSON);
license.toXContent(builder, new ToXContent.MapParams(Collections.singletonMap(License.REST_VIEW_MODE, "true")));
builder.flush();
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
index f3ed04ed22d..d7799959f6c 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/LicensesMetaDataSerializationTests.java
@@ -95,7 +95,7 @@ public class LicensesMetaDataSerializationTests extends ESTestCase {
.issueDate(issueDate)
.type(randomBoolean() ? "trial" : "basic")
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- final License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ final License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
LicensesMetaData licensesMetaData = new LicensesMetaData(trialLicense, Version.CURRENT);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
index aa27dbdcb49..4e061623ccd 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/license/SelfGeneratedLicenseTests.java
@@ -34,7 +34,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.type(randomBoolean() ? "trial" : "basic")
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
assertThat(SelfGeneratedLicense.verify(trialLicense), equalTo(true));
}
@@ -47,7 +47,7 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.maxNodes(5)
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
- License trialLicense = SelfGeneratedLicense.create(specBuilder);
+ License trialLicense = SelfGeneratedLicense.create(specBuilder, License.VERSION_CURRENT);
final String originalSignature = trialLicense.signature();
License tamperedLicense = License.builder().fromLicenseSpec(trialLicense, originalSignature)
.expiryDate(System.currentTimeMillis() + TimeValue.timeValueHours(5).getMillis())
@@ -70,7 +70,8 @@ public class SelfGeneratedLicenseTests extends ESTestCase {
.issueDate(issueDate)
.expiryDate(issueDate + TimeValue.timeValueHours(2).getMillis());
License pre20TrialLicense = specBuilder.build();
- License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"));
+ License license = SelfGeneratedLicense.create(License.builder().fromPre20LicenseSpec(pre20TrialLicense).type("trial"),
+ License.VERSION_CURRENT);
assertThat(SelfGeneratedLicense.verify(license), equalTo(true));
}
diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
index 6d12b6472f1..d2e35999096 100644
--- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
+++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/Security.java
@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.security;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
-import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
@@ -17,7 +16,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.bootstrap.BootstrapCheck;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
@@ -112,7 +110,6 @@ import org.elasticsearch.xpack.core.security.authc.AuthenticationServiceField;
import org.elasticsearch.xpack.core.security.authc.DefaultAuthenticationFailureHandler;
import org.elasticsearch.xpack.core.security.authc.Realm;
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
-import org.elasticsearch.xpack.core.security.authc.TokenMetaData;
import org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken;
import org.elasticsearch.xpack.core.security.authz.AuthorizationServiceField;
import org.elasticsearch.xpack.core.security.authz.RoleDescriptor;
@@ -934,7 +931,8 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
if (enabled) {
return new ValidateTLSOnJoin(XPackSettings.TRANSPORT_SSL_ENABLED.get(settings),
DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings))
- .andThen(new ValidateUpgradedSecurityIndex());
+ .andThen(new ValidateUpgradedSecurityIndex())
+ .andThen(new ValidateLicenseCanBeDeserialized());
}
return null;
}
@@ -971,6 +969,17 @@ public class Security extends Plugin implements ActionPlugin, IngestPlugin, Netw
}
}
+ static final class ValidateLicenseCanBeDeserialized implements BiConsumer<DiscoveryNode, ClusterState> {
+ @Override
+ public void accept(DiscoveryNode node, ClusterState state) {
+ License license = LicenseService.getLicense(state.metaData());
+ if (license != null && license.version() >= License.VERSION_CRYPTO_ALGORITHMS && node.getVersion().before(Version.V_6_4_0)) {
+ throw new IllegalStateException("node " + node + " is on version [" + node.getVersion() +
+ "] that cannot deserialize the license format [" + license.version() + "], upgrade node to at least 6.4.0");
+ }
+ }
+ }
+
@Override
public void reloadSPI(ClassLoader loader) {
securityExtensions.addAll(SecurityExtension.loadExtensions(loader));
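
The new ValidateLicenseCanBeDeserialized check above is chained onto the existing join validators with BiConsumer#andThen, so a joining node has to pass each check in order. A tiny standalone sketch of that composition, with String and Integer standing in for DiscoveryNode and the license version taken from ClusterState (not the actual Security types):

    import java.util.function.BiConsumer;

    public final class JoinValidatorChaining {
        public static void main(String[] args) {
            BiConsumer<String, Integer> validateTls = (node, licenseVersion) ->
                System.out.println("TLS check passed for " + node);
            BiConsumer<String, Integer> validateLicenseVersion = (node, licenseVersion) -> {
                if (licenseVersion < 4) {
                    throw new IllegalStateException("node " + node
                        + " cannot deserialize the license format [" + licenseVersion + "]");
                }
            };
            // andThen runs the validators in order; the first one that throws aborts the join
            BiConsumer<String, Integer> joinValidator = validateTls.andThen(validateLicenseVersion);
            joinValidator.accept("node-1", 4); // passes both checks
            joinValidator.accept("node-2", 3); // throws IllegalStateException
        }
    }
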
diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
index 190c8703955..b1d8d4b67bf 100644
--- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
+++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/SecurityTests.java
@@ -28,6 +28,7 @@ import org.elasticsearch.license.TestUtils;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xpack.core.XPackSettings;
@@ -278,6 +279,19 @@ public class SecurityTests extends ESTestCase {
}
}
+ public void testJoinValidatorForLicenseDeserialization() throws Exception {
+ DiscoveryNode node = new DiscoveryNode("foo", buildNewFakeTransportAddress(),
+ VersionUtils.randomVersionBetween(random(), null, Version.V_6_3_0));
+ MetaData.Builder builder = MetaData.builder();
+ License license = TestUtils.generateSignedLicense(null,
+ randomIntBetween(License.VERSION_CRYPTO_ALGORITHMS, License.VERSION_CURRENT), -1, TimeValue.timeValueHours(24));
+ TestUtils.putLicense(builder, license);
+ ClusterState state = ClusterState.builder(ClusterName.DEFAULT).metaData(builder.build()).build();
+ IllegalStateException e = expectThrows(IllegalStateException.class,
+ () -> new Security.ValidateLicenseCanBeDeserialized().accept(node, state));
+ assertThat(e.getMessage(), containsString("cannot deserialize the license format"));
+ }
+
public void testIndexJoinValidator_Old_And_Rolling() throws Exception {
createComponents(Settings.EMPTY);
BiConsumer<DiscoveryNode, ClusterState> joinValidator = security.getJoinValidator();
@@ -345,7 +359,7 @@ public class SecurityTests extends ESTestCase {
.nodes(discoveryNodes).build();
joinValidator.accept(node, clusterState);
}
-
+
public void testGetFieldFilterSecurityEnabled() throws Exception {
createComponents(Settings.EMPTY);
Function<String, Predicate<String>> fieldFilter = security.getFieldFilter();