diff --git a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java index 9b17068c921..e8845eb2d98 100644 --- a/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java +++ b/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardRepository.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.*; +import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.IOUtils; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.metadata.SnapshotId; @@ -422,6 +423,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements long indexTotalFilesSize = 0; ArrayList filesToSnapshot = newArrayList(); final Store.MetadataSnapshot metadata; + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { metadata = store.getMetadata(snapshotIndexCommit); } catch (IOException e) { @@ -436,7 +438,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements final StoreFileMetaData md = metadata.get(fileName); boolean snapshotRequired = false; BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName); - + try { + // in 1.4.0 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata); + } catch (Throwable e) { + logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) { // commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs snapshotRequired = true; @@ -677,6 +687,25 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements } } + /** + * This is a BWC layer to ensure we update the snapshots metdata with the corresponding hashes before we compare them. + * The new logic for StoreFileMetaData reads the entire .si and segments.n files to strengthen the + * comparison of the files on a per-segment / per-commit level. + */ + private static final void maybeRecalculateMetadataHash(ImmutableBlobContainer blobContainer, FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws IOException { + final StoreFileMetaData metadata; + if (fileInfo != null && (metadata = snapshot.get(fileInfo.name())) != null) { + if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) { + // we have a hash - check if our repo has a hash too otherwise we have + // to calculate it. + byte[] bytes = blobContainer.readBlobFully(fileInfo.physicalName()); + final BytesRef spare = new BytesRef(bytes); + Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare); + } + } + + } + /** * Context for restore operations */ @@ -728,8 +757,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements final List filesToRecover = Lists.newArrayList(); final Map snapshotMetaData = new HashMap<>(); final Map fileInfos = new HashMap<>(); - for (final FileInfo fileInfo : snapshot.indexFiles()) { + try { + // in 1.4.0 we added additional hashes for .si / segments_N files + // to ensure we don't double the space in the repo since old snapshots + // don't have this hash we try to read that hash from the blob store + // in a bwc compatible way. + maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata); + } catch (Throwable e) { + // if the index is broken we might not be able to read it + logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata()); + } snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata()); fileInfos.put(fileInfo.metadata().name(), fileInfo); } diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index 9fac24e0fc0..9867dae0565 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -550,7 +550,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException { final String checksum; - BytesRef fileHash = new BytesRef(); + final BytesRef fileHash = new BytesRef(); try (IndexInput in = directory.openInput(file, IOContext.READONCE)) { try { if (in.length() < CodecUtil.footerLength()) { @@ -558,10 +558,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length()); } if (readFileAsHash) { - final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB - fileHash.bytes = new byte[len]; - in.readBytes(fileHash.bytes, 0, len); - fileHash.length = len; + hashFile(fileHash, in); } checksum = digestToString(CodecUtil.retrieveChecksum(in)); @@ -573,6 +570,27 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex } } + /** + * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB + */ + public static void hashFile(BytesRef fileHash, IndexInput in) throws IOException { + final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB + fileHash.offset = 0; + fileHash.grow(len); + fileHash.length = len; + in.readBytes(fileHash.bytes, 0, len); + } + + /** + * Computes a strong hash value for small files. Note that this method should only be used for files < 1MB + */ + public static void hashFile(BytesRef fileHash, BytesRef source) throws IOException { + final int len = Math.min(1024 * 1024, source.length); // for safety we limit this to 1MB + fileHash.offset = 0; + fileHash.grow(len); + fileHash.length = len; + System.arraycopy(source.bytes, source.offset, fileHash.bytes, 0, len); + } @Override public Iterator iterator() { diff --git a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java index efccea75b97..74a0ffd9f37 100644 --- a/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java +++ b/src/test/java/org/elasticsearch/bwcompat/BasicBackwardsCompatibilityTest.java @@ -450,114 +450,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa return client().admin().cluster().prepareState().get().getState().nodes().masterNode().getVersion(); } - @Test - @TestLogging("index.snapshots:TRACE,index.shard.service:TRACE") - public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException { - logger.info("--> creating repository"); - assertAcked(client().admin().cluster().preparePutRepository("test-repo") - .setType("fs").setSettings(ImmutableSettings.settingsBuilder() - .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000)))); - String[] indicesBefore = new String[randomIntBetween(2,5)]; - String[] indicesAfter = new String[randomIntBetween(2,5)]; - for (int i = 0; i < indicesBefore.length; i++) { - indicesBefore[i] = "index_before_" + i; - createIndex(indicesBefore[i]); - } - for (int i = 0; i < indicesAfter.length; i++) { - indicesAfter[i] = "index_after_" + i; - createIndex(indicesAfter[i]); - } - String[] indices = new String[indicesBefore.length + indicesAfter.length]; - System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length); - System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length); - ensureYellow(); - logger.info("--> indexing some data"); - IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)]; - for (int i = 0; i < buildersBefore.length; i++) { - buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); - } - IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)]; - for (int i = 0; i < buildersAfter.length; i++) { - buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); - } - indexRandom(true, buildersBefore); - indexRandom(true, buildersAfter); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - long[] counts = new long[indices.length]; - for (int i = 0; i < indices.length; i++) { - counts[i] = client().prepareCount(indices[i]).get().getCount(); - } - - logger.info("--> snapshot subset of indices before upgrage"); - CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - - assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); - - logger.info("--> delete some data from indices that were already snapshotted"); - int howMany = randomIntBetween(1, buildersBefore.length); - - for (int i = 0; i < howMany; i++) { - IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore); - IndexRequest request = indexRequestBuilder.request(); - client().prepareDelete(request.index(), request.type(), request.id()).get(); - } - refresh(); - final long numDocs = client().prepareCount(indices).get().getCount(); - assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length))); - - - client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); - backwardsCluster().allowOnAllNodes(indices); - logClusterState(); - boolean upgraded; - do { - logClusterState(); - CountResponse countResponse = client().prepareCount().get(); - assertHitCount(countResponse, numDocs); - upgraded = backwardsCluster().upgradeOneNode(); - ensureYellow(); - countResponse = client().prepareCount().get(); - assertHitCount(countResponse, numDocs); - } while (upgraded); - client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); - - logger.info("--> close indices"); - - client().admin().indices().prepareClose("index_before_*").get(); - - logger.info("--> restore all indices from the snapshot"); - RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - - ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - for (int i = 0; i < indices.length; i++) { - assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); - } - - logger.info("--> snapshot subset of indices after upgrade"); - createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get(); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); - - // Test restore after index deletion - logger.info("--> delete indices"); - String index = RandomPicks.randomFrom(getRandom(), indices); - cluster().wipeIndices(index); - logger.info("--> restore one index after deletion"); - restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet(); - assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - ensureYellow(); - assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); - for (int i = 0; i < indices.length; i++) { - assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); - } - } - @Test public void testDeleteByQuery() throws ExecutionException, InterruptedException { createIndex("test"); diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index fea86834105..4e6adce0b34 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -37,8 +37,10 @@ import org.elasticsearch.action.admin.indices.flush.FlushResponse; import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse; import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse; import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.SnapshotMetaData; import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider; @@ -55,6 +57,8 @@ import org.junit.Test; import java.io.File; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static com.google.common.collect.Lists.newArrayList; @@ -1255,6 +1259,68 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { logger.info("--> done"); } + public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException { + Client client = client(); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE)) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + + // only one shard + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1))); + ensureGreen(); + logger.info("--> indexing"); + + final int numdocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i); + } + indexRandom(true, builders); + flushAndRefresh(); + assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get()); + + CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), greaterThan(1)); + } + } + + CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files + } + } + + client().prepareDelete("test", "doc", "1").get(); + CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file + } + } + } + private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException { return awaitBusy(new Predicate() { @Override diff --git a/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java b/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java new file mode 100644 index 00000000000..6ed8e616164 --- /dev/null +++ b/src/test/java/org/elasticsearch/snapshots/SnapshotBackwardsCompatibilityTest.java @@ -0,0 +1,242 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import com.carrotsearch.randomizedtesting.LifecycleScope; +import com.carrotsearch.randomizedtesting.generators.RandomPicks; +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; +import org.elasticsearch.action.count.CountResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.ExecutionException; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.lessThan; + +public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest { + + @Test + public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException { + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + String[] indicesBefore = new String[randomIntBetween(2,5)]; + String[] indicesAfter = new String[randomIntBetween(2,5)]; + for (int i = 0; i < indicesBefore.length; i++) { + indicesBefore[i] = "index_before_" + i; + createIndex(indicesBefore[i]); + } + for (int i = 0; i < indicesAfter.length; i++) { + indicesAfter[i] = "index_after_" + i; + createIndex(indicesAfter[i]); + } + String[] indices = new String[indicesBefore.length + indicesAfter.length]; + System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length); + System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length); + ensureYellow(); + logger.info("--> indexing some data"); + IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)]; + for (int i = 0; i < buildersBefore.length; i++) { + buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); + } + IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)]; + for (int i = 0; i < buildersAfter.length; i++) { + buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } "); + } + indexRandom(true, buildersBefore); + indexRandom(true, buildersAfter); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + long[] counts = new long[indices.length]; + for (int i = 0; i < indices.length; i++) { + counts[i] = client().prepareCount(indices[i]).get().getCount(); + } + + logger.info("--> snapshot subset of indices before upgrage"); + CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + + logger.info("--> delete some data from indices that were already snapshotted"); + int howMany = randomIntBetween(1, buildersBefore.length); + + for (int i = 0; i < howMany; i++) { + IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore); + IndexRequest request = indexRequestBuilder.request(); + client().prepareDelete(request.index(), request.type(), request.id()).get(); + } + refresh(); + final long numDocs = client().prepareCount(indices).get().getCount(); + assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length))); + + + client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); + backwardsCluster().allowOnAllNodes(indices); + logClusterState(); + boolean upgraded; + do { + logClusterState(); + CountResponse countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + upgraded = backwardsCluster().upgradeOneNode(); + ensureYellow(); + countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + } while (upgraded); + client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); + + logger.info("--> close indices"); + + client().admin().indices().prepareClose("index_before_*").get(); + + logger.info("--> restore all indices from the snapshot"); + RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + + ensureYellow(); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + for (int i = 0; i < indices.length; i++) { + assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); + } + + logger.info("--> snapshot subset of indices after upgrade"); + createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + // Test restore after index deletion + logger.info("--> delete indices"); + String index = RandomPicks.randomFrom(getRandom(), indices); + cluster().wipeIndices(index); + logger.info("--> restore one index after deletion"); + restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet(); + assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); + ensureYellow(); + assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length))); + for (int i = 0; i < indices.length; i++) { + assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount())); + } + } + + public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException, IOException { + Client client = client(); + + logger.info("--> creating repository"); + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("fs").setSettings(ImmutableSettings.settingsBuilder() + .put("location", newTempDir(LifecycleScope.SUITE).getAbsoluteFile()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000)))); + + // only one shard + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + )); + ensureYellow(); + logger.info("--> indexing"); + + final int numDocs = randomIntBetween(10, 100); + IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; + for (int i = 0; i < builders.length; i++) { + builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i); + } + indexRandom(true, builders); + flushAndRefresh(); + assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get()); + + CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), greaterThan(1)); + } + } + if (frequently()) { + logger.info("--> upgrade"); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get(); + backwardsCluster().allowOnAllNodes("test"); + logClusterState(); + boolean upgraded; + do { + logClusterState(); + CountResponse countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + upgraded = backwardsCluster().upgradeOneNode(); + ensureYellow(); + countResponse = client().prepareCount().get(); + assertHitCount(countResponse, numDocs); + } while (upgraded); + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get(); + } + if (randomBoolean()) { + client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1,2))).get(); + } + + CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files + } + } + + client().prepareDelete("test", "doc", "1").get(); + CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get(); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards())); + assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS)); + { + SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0); + List shards = snapshotStatus.getShards(); + for (SnapshotIndexShardStatus status : shards) { + assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file + } + } + } +}