From c63626b537955d0309c6613642e990b54efd3cbf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 25 Aug 2014 16:55:40 +0200 Subject: [PATCH] [SNAPSHOT] Add BWC layer to .si / segments_N hashing Due to additional safety added in #7351 we compute now a strong hash for .si and segments_N files which are compared during snapshot / restore. Old snapshots don't have this hash which can cause unnecessary copying of large amount of data. This commit adds the ability to fetch this hash from the blob store if needed. Closes #7434 --- .../BlobStoreIndexShardRepository.java | 42 ++- .../org/elasticsearch/index/store/Store.java | 28 +- .../BasicBackwardsCompatibilityTest.java | 108 -------- .../SharedClusterSnapshotRestoreTests.java | 66 +++++ .../SnapshotBackwardsCompatibilityTest.java | 242 ++++++++++++++++++ 5 files changed, 371 insertions(+), 115 deletions(-) create mode 100644 src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 9b17068c921..e8845eb2d98 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.*; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.SnapshotId; @@ -422,6 +423,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements long indexTotalFilesSize = 0; ArrayList filesToSnapshot = newArrayList(); final Store.MetadataSnapshot metadata; + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { metadata = store.getMetadata(snapshotIndexCommit); } catch (IOException e) { @@ -436,7 +438,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements final StoreFileMetaData md = metadata.get(fileName); boolean snapshotRequired = false; BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName); - + try { + // in 1.4.0 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); + } catch (Throwable e) { + logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs snapshotRequired = true; @@ -677,6 +687,25 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } } + /** + * This is a BWC layer to ensure we update the snapshots metdata with the corresponding hashes before we compare them. + * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the + * comparison of the files on a per-segment / per-commit level. + */ + private static final void maybeRecalculateMetadataHash(ImmutableBlobContainer blobContainer, FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws IOException { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.name())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + byte[] bytes = blobContainer.readBlobFully(fileInfo.physicalName()); + final BytesRef spare = new BytesRef(bytes); + Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare); + } + } + + } + /** * Context for restore operations */ @@ -728,8 +757,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements final List filesToRecover = Lists.newArrayList(); final Map snapshotMetaData = new HashMap<>(); final Map fileInfos = new HashMap<>(); - for (final FileInfo fileInfo : snapshot.indexFiles()) { + try { + // in 1.4.0 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); + } catch (Throwable e) { + // if the index is broken we might not be able to read it + logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); fileInfos.put(fileInfo.metadata().name(), fileInfo); } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 9fac24e0fc0..9867dae0565 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -550,7 +550,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException { final String checksum; - BytesRef fileHash = new BytesRef(); + final BytesRef fileHash = new BytesRef(); try (IndexInput in = directory.openInput(file, IOContext.READONCE)) { try { if (in.length() < CodecUtil.footerLength()) { @@ -558,10 +558,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length()); } if (readFileAsHash) { - final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB - fileHash.bytes = new byte[len]; - in.readBytes(fileHash.bytes, 0, len); - fileHash.length = len; + hashFile(fileHash, in); } checksum = digestToString(CodecUtil.retrieveChecksum(in)); @@ -573,6 +570,27 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } } + /** + * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB + */ + public static void hashFile(BytesRef fileHash, IndexInput in) throws IOException { + final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB + fileHash.offset = 0; + fileHash.grow(len); + fileHash.length = len; + in.readBytes(fileHash.bytes, 0, len); + } + + /** + * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB + */ + public static void hashFile(BytesRef fileHash, BytesRef source) throws IOException { + final int len = Math.min(1024 * 1024, source.length); // for safety we limit this to 1MB + fileHash.offset = 0; + fileHash.grow(len); + fileHash.length = len; + System.arraycopy(source.bytes, source.offset, fileHash.bytes, 0, len); + } @Override public Iterator iterator() { diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java index efccea75b97..74a0ffd9f37 100644 --- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java @@ -450,114 +450,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa return client().admin().cluster().prepareState().get().getState().nodes().masterNode().getVersion(); } - @Test - @TestLogging("index.snapshots:TRACE,index.shard.service:TRACE") - public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException { - logger.info("--> creating repository"); - assertAcked(client().admin().cluster().preparePutRepository("test-repo") - .setType("fs").setSettings(ImmutableSettings.settingsBuilder() - .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000)))); - String[] indicesBefore = new String[randomIntBetween(2,5)]; - String[] indicesAfter = new String[randomIntBetween(2,5)]; - for (int i = 0; i < indicesBefore.length; i++) { - indicesBefore[i] = "index_before_" + i; - createIndex(indicesBefore[i]); - } - for (int i = 0; i < indicesAfter.length; i++) { - indicesAfter[i] = "index_after_" + i; - createIndex(indicesAfter[i]); - } - String[] indices = new String[indicesBefore.length + indicesAfter.length]; - System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length); - System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length); - ensureYellow(); - logger.info("--> indexing some data"); - IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)]; - for (int i = 0; i < buildersBefore.length; i++) { - buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); - } - IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)]; - for (int i = 0; i < buildersAfter.length; i++) { - buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); - } - indexRandom(true, buildersBefore); - indexRandom(true, buildersAfter); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - long[] counts = new long[indices.length]; - for (int i = 0; i < indices.length; i++) { - counts[i] = client().prepareCount(indices[i]).get().getCount(); - } - - logger.info("--> snapshot subset of indices before upgrage"); - CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); - - logger.info("--> delete some data from indices that were already snapshotted"); - int howMany = randomIntBetween(1, buildersBefore.length); - - for (int i = 0; i < howMany; i++) { - IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore); - IndexRequest request = indexRequestBuilder.request(); - client().prepareDelete(request.index(), request.type(), request.id()).get(); - } - refresh(); - final long numDocs = client().prepareCount(indices).get().getCount(); - assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length))); - - - client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); - backwardsCluster().allowOnAllNodes(indices); - logClusterState(); - boolean upgraded; - do { - logClusterState(); - CountResponse countResponse = client().prepareCount().get(); - assertHitCount(countResponse, numDocs); - upgraded = backwardsCluster().upgradeOneNode(); - ensureYellow(); - countResponse = client().prepareCount().get(); - assertHitCount(countResponse, numDocs); - } while (upgraded); - client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); - - logger.info("--> close indices"); - - client().admin().indices().prepareClose("index_before_*").get(); - - logger.info("--> restore all indices from the snapshot"); - RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - - ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - for (int i = 0; i < indices.length; i++) { - assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); - } - - logger.info("--> snapshot subset of indices after upgrade"); - createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - - // Test restore after index deletion - logger.info("--> delete indices"); - String index = RandomPicks.randomFrom(getRandom(), indices); - cluster().wipeIndices(index); - logger.info("--> restore one index after deletion"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - for (int i = 0; i < indices.length; i++) { - assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); - } - } - @Test public void testDeleteByQuery() throws ExecutionException, InterruptedException { createIndex("test"); diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index fea86834105..4e6adce0b34 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -37,8 +37,10 @@ import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.SnapshotMetaData; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; @@ -55,6 +57,8 @@ import org.junit.Test; import java.io.File; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.collect.Lists.newArrayList; @@ -1255,6 +1259,68 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { logger.info("--> done"); } + public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException { + Client client = client(); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE)) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + + // only one shard + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1))); + ensureGreen(); + logger.info("--> indexing"); + + final int numdocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i); + } + indexRandom(true, builders); + flushAndRefresh(); + assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get()); + + CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), greaterThan(1)); + } + } + + CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files + } + } + + client().prepareDelete("test", "doc", "1").get(); + CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file + } + } + } + private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException { return awaitBusy(new Predicate() { @Override diff --git a/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java new file mode 100644 index 00000000000..6ed8e616164 --- /dev/null +++ b/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import com.carrotsearch.randomizedtesting.LifecycleScope; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest { + + @Test + public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException { + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + String[] indicesBefore = new String[randomIntBetween(2,5)]; + String[] indicesAfter = new String[randomIntBetween(2,5)]; + for (int i = 0; i < indicesBefore.length; i++) { + indicesBefore[i] = "index_before_" + i; + createIndex(indicesBefore[i]); + } + for (int i = 0; i < indicesAfter.length; i++) { + indicesAfter[i] = "index_after_" + i; + createIndex(indicesAfter[i]); + } + String[] indices = new String[indicesBefore.length + indicesAfter.length]; + System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length); + System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length); + ensureYellow(); + logger.info("--> indexing some data"); + IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)]; + for (int i = 0; i < buildersBefore.length; i++) { + buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); + } + IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)]; + for (int i = 0; i < buildersAfter.length; i++) { + buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); + } + indexRandom(true, buildersBefore); + indexRandom(true, buildersAfter); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + long[] counts = new long[indices.length]; + for (int i = 0; i < indices.length; i++) { + counts[i] = client().prepareCount(indices[i]).get().getCount(); + } + + logger.info("--> snapshot subset of indices before upgrage"); + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some data from indices that were already snapshotted"); + int howMany = randomIntBetween(1, buildersBefore.length); + + for (int i = 0; i < howMany; i++) { + IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore); + IndexRequest request = indexRequestBuilder.request(); + client().prepareDelete(request.index(), request.type(), request.id()).get(); + } + refresh(); + final long numDocs = client().prepareCount(indices).get().getCount(); + assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length))); + + + client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); + backwardsCluster().allowOnAllNodes(indices); + logClusterState(); + boolean upgraded; + do { + logClusterState(); + CountResponse countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + upgraded = backwardsCluster().upgradeOneNode(); + ensureYellow(); + countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + } while (upgraded); + client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); + + logger.info("--> close indices"); + + client().admin().indices().prepareClose("index_before_*").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureYellow(); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + for (int i = 0; i < indices.length; i++) { + assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); + } + + logger.info("--> snapshot subset of indices after upgrade"); + createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + // Test restore after index deletion + logger.info("--> delete indices"); + String index = RandomPicks.randomFrom(getRandom(), indices); + cluster().wipeIndices(index); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureYellow(); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + for (int i = 0; i < indices.length; i++) { + assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); + } + } + + public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException, IOException { + Client client = client(); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE).getAbsoluteFile()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + + // only one shard + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + )); + ensureYellow(); + logger.info("--> indexing"); + + final int numDocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i); + } + indexRandom(true, builders); + flushAndRefresh(); + assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get()); + + CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), greaterThan(1)); + } + } + if (frequently()) { + logger.info("--> upgrade"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); + backwardsCluster().allowOnAllNodes("test"); + logClusterState(); + boolean upgraded; + do { + logClusterState(); + CountResponse countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + upgraded = backwardsCluster().upgradeOneNode(); + ensureYellow(); + countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + } while (upgraded); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); + } + if (randomBoolean()) { + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1,2))).get(); + } + + CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files + } + } + + client().prepareDelete("test", "doc", "1").get(); + CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file + } + } + } +}