[SNAPSHOT] Add BWC layer to .si / segments_N hashing
Due to additional safety added in #7351 we compute now a strong hash for .si and segments_N files which are compared during snapshot / restore. Old snapshots don't have this hash which can cause unnecessary copying of large amount of data. This commit adds the ability to fetch this hash from the blob store if needed. Closes #7434
This commit is contained in:
parent
10197936da
commit
c63626b537
|
@ -24,6 +24,7 @@ import com.google.common.collect.Iterables;
|
|||
import com.google.common.collect.Lists;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.apache.lucene.store.*;
|
||||
import org.apache.lucene.util.BytesRef;
|
||||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotId;
|
||||
|
@ -422,6 +423,7 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
long indexTotalFilesSize = 0;
|
||||
ArrayList<FileInfo> filesToSnapshot = newArrayList();
|
||||
final Store.MetadataSnapshot metadata;
|
||||
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
|
||||
try {
|
||||
metadata = store.getMetadata(snapshotIndexCommit);
|
||||
} catch (IOException e) {
|
||||
|
@ -436,7 +438,15 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
final StoreFileMetaData md = metadata.get(fileName);
|
||||
boolean snapshotRequired = false;
|
||||
BlobStoreIndexShardSnapshot.FileInfo fileInfo = snapshots.findPhysicalIndexFile(fileName);
|
||||
|
||||
try {
|
||||
// in 1.4.0 we added additional hashes for .si / segments_N files
|
||||
// to ensure we don't double the space in the repo since old snapshots
|
||||
// don't have this hash we try to read that hash from the blob store
|
||||
// in a bwc compatible way.
|
||||
maybeRecalculateMetadataHash(blobContainer, fileInfo, metadata);
|
||||
} catch (Throwable e) {
|
||||
logger.warn("{} Can't calculate hash from blob for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
|
||||
}
|
||||
if (fileInfo == null || !fileInfo.isSame(md) || !snapshotFileExistsInBlobs(fileInfo, blobs)) {
|
||||
// commit point file does not exists in any commit point, or has different length, or does not fully exists in the listed blobs
|
||||
snapshotRequired = true;
|
||||
|
@ -677,6 +687,25 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This is a BWC layer to ensure we update the snapshots metdata with the corresponding hashes before we compare them.
|
||||
* The new logic for StoreFileMetaData reads the entire <tt>.si</tt> and <tt>segments.n</tt> files to strengthen the
|
||||
* comparison of the files on a per-segment / per-commit level.
|
||||
*/
|
||||
private static final void maybeRecalculateMetadataHash(ImmutableBlobContainer blobContainer, FileInfo fileInfo, Store.MetadataSnapshot snapshot) throws IOException {
|
||||
final StoreFileMetaData metadata;
|
||||
if (fileInfo != null && (metadata = snapshot.get(fileInfo.name())) != null) {
|
||||
if (metadata.hash().length > 0 && fileInfo.metadata().hash().length == 0) {
|
||||
// we have a hash - check if our repo has a hash too otherwise we have
|
||||
// to calculate it.
|
||||
byte[] bytes = blobContainer.readBlobFully(fileInfo.physicalName());
|
||||
final BytesRef spare = new BytesRef(bytes);
|
||||
Store.MetadataSnapshot.hashFile(fileInfo.metadata().hash(), spare);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Context for restore operations
|
||||
*/
|
||||
|
@ -728,8 +757,17 @@ public class BlobStoreIndexShardRepository extends AbstractComponent implements
|
|||
final List<FileInfo> filesToRecover = Lists.newArrayList();
|
||||
final Map<String, StoreFileMetaData> snapshotMetaData = new HashMap<>();
|
||||
final Map<String, FileInfo> fileInfos = new HashMap<>();
|
||||
|
||||
for (final FileInfo fileInfo : snapshot.indexFiles()) {
|
||||
try {
|
||||
// in 1.4.0 we added additional hashes for .si / segments_N files
|
||||
// to ensure we don't double the space in the repo since old snapshots
|
||||
// don't have this hash we try to read that hash from the blob store
|
||||
// in a bwc compatible way.
|
||||
maybeRecalculateMetadataHash(blobContainer, fileInfo, recoveryTargetMetadata);
|
||||
} catch (Throwable e) {
|
||||
// if the index is broken we might not be able to read it
|
||||
logger.warn("{} Can't calculate hash from blog for file [{}] [{}]", e, shardId, fileInfo.physicalName(), fileInfo.metadata());
|
||||
}
|
||||
snapshotMetaData.put(fileInfo.metadata().name(), fileInfo.metadata());
|
||||
fileInfos.put(fileInfo.metadata().name(), fileInfo);
|
||||
}
|
||||
|
|
|
@ -550,7 +550,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
|
||||
private static void checksumFromLuceneFile(Directory directory, String file, ImmutableMap.Builder<String, StoreFileMetaData> builder, ESLogger logger, Version version, boolean readFileAsHash) throws IOException {
|
||||
final String checksum;
|
||||
BytesRef fileHash = new BytesRef();
|
||||
final BytesRef fileHash = new BytesRef();
|
||||
try (IndexInput in = directory.openInput(file, IOContext.READONCE)) {
|
||||
try {
|
||||
if (in.length() < CodecUtil.footerLength()) {
|
||||
|
@ -558,10 +558,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
throw new CorruptIndexException("Can't retrieve checksum from file: " + file + " file length must be >= " + CodecUtil.footerLength() + " but was: " + in.length());
|
||||
}
|
||||
if (readFileAsHash) {
|
||||
final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB
|
||||
fileHash.bytes = new byte[len];
|
||||
in.readBytes(fileHash.bytes, 0, len);
|
||||
fileHash.length = len;
|
||||
hashFile(fileHash, in);
|
||||
}
|
||||
checksum = digestToString(CodecUtil.retrieveChecksum(in));
|
||||
|
||||
|
@ -573,6 +570,27 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
|
||||
*/
|
||||
public static void hashFile(BytesRef fileHash, IndexInput in) throws IOException {
|
||||
final int len = (int)Math.min(1024 * 1024, in.length()); // for safety we limit this to 1MB
|
||||
fileHash.offset = 0;
|
||||
fileHash.grow(len);
|
||||
fileHash.length = len;
|
||||
in.readBytes(fileHash.bytes, 0, len);
|
||||
}
|
||||
|
||||
/**
|
||||
* Computes a strong hash value for small files. Note that this method should only be used for files < 1MB
|
||||
*/
|
||||
public static void hashFile(BytesRef fileHash, BytesRef source) throws IOException {
|
||||
final int len = Math.min(1024 * 1024, source.length); // for safety we limit this to 1MB
|
||||
fileHash.offset = 0;
|
||||
fileHash.grow(len);
|
||||
fileHash.length = len;
|
||||
System.arraycopy(source.bytes, source.offset, fileHash.bytes, 0, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Iterator<StoreFileMetaData> iterator() {
|
||||
|
|
|
@ -450,114 +450,6 @@ public class BasicBackwardsCompatibilityTest extends ElasticsearchBackwardsCompa
|
|||
return client().admin().cluster().prepareState().get().getState().nodes().masterNode().getVersion();
|
||||
}
|
||||
|
||||
@Test
|
||||
@TestLogging("index.snapshots:TRACE,index.shard.service:TRACE")
|
||||
public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000))));
|
||||
String[] indicesBefore = new String[randomIntBetween(2,5)];
|
||||
String[] indicesAfter = new String[randomIntBetween(2,5)];
|
||||
for (int i = 0; i < indicesBefore.length; i++) {
|
||||
indicesBefore[i] = "index_before_" + i;
|
||||
createIndex(indicesBefore[i]);
|
||||
}
|
||||
for (int i = 0; i < indicesAfter.length; i++) {
|
||||
indicesAfter[i] = "index_after_" + i;
|
||||
createIndex(indicesAfter[i]);
|
||||
}
|
||||
String[] indices = new String[indicesBefore.length + indicesAfter.length];
|
||||
System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length);
|
||||
System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length);
|
||||
ensureYellow();
|
||||
logger.info("--> indexing some data");
|
||||
IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)];
|
||||
for (int i = 0; i < buildersBefore.length; i++) {
|
||||
buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
|
||||
}
|
||||
IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)];
|
||||
for (int i = 0; i < buildersAfter.length; i++) {
|
||||
buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
|
||||
}
|
||||
indexRandom(true, buildersBefore);
|
||||
indexRandom(true, buildersAfter);
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
long[] counts = new long[indices.length];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
counts[i] = client().prepareCount(indices[i]).get().getCount();
|
||||
}
|
||||
|
||||
logger.info("--> snapshot subset of indices before upgrage");
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
|
||||
logger.info("--> delete some data from indices that were already snapshotted");
|
||||
int howMany = randomIntBetween(1, buildersBefore.length);
|
||||
|
||||
for (int i = 0; i < howMany; i++) {
|
||||
IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore);
|
||||
IndexRequest request = indexRequestBuilder.request();
|
||||
client().prepareDelete(request.index(), request.type(), request.id()).get();
|
||||
}
|
||||
refresh();
|
||||
final long numDocs = client().prepareCount(indices).get().getCount();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length)));
|
||||
|
||||
|
||||
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
|
||||
backwardsCluster().allowOnAllNodes(indices);
|
||||
logClusterState();
|
||||
boolean upgraded;
|
||||
do {
|
||||
logClusterState();
|
||||
CountResponse countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
upgraded = backwardsCluster().upgradeOneNode();
|
||||
ensureYellow();
|
||||
countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
} while (upgraded);
|
||||
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
|
||||
|
||||
logger.info("--> close indices");
|
||||
|
||||
client().admin().indices().prepareClose("index_before_*").get();
|
||||
|
||||
logger.info("--> restore all indices from the snapshot");
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
|
||||
ensureYellow();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount()));
|
||||
}
|
||||
|
||||
logger.info("--> snapshot subset of indices after upgrade");
|
||||
createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
// Test restore after index deletion
|
||||
logger.info("--> delete indices");
|
||||
String index = RandomPicks.randomFrom(getRandom(), indices);
|
||||
cluster().wipeIndices(index);
|
||||
logger.info("--> restore one index after deletion");
|
||||
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
ensureYellow();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDeleteByQuery() throws ExecutionException, InterruptedException {
|
||||
createIndex("test");
|
||||
|
|
|
@ -37,8 +37,10 @@ import org.elasticsearch.action.admin.indices.flush.FlushResponse;
|
|||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||
import org.elasticsearch.action.count.CountResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MappingMetaData;
|
||||
import org.elasticsearch.cluster.metadata.SnapshotMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
|
||||
|
@ -55,6 +57,8 @@ import org.junit.Test;
|
|||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static com.google.common.collect.Lists.newArrayList;
|
||||
|
@ -1255,6 +1259,68 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
|
|||
logger.info("--> done");
|
||||
}
|
||||
|
||||
public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put("location", newTempDir(LifecycleScope.SUITE))
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000))));
|
||||
|
||||
// only one shard
|
||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)));
|
||||
ensureGreen();
|
||||
logger.info("--> indexing");
|
||||
|
||||
final int numdocs = randomIntBetween(10, 100);
|
||||
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
|
||||
for (int i = 0; i < builders.length; i++) {
|
||||
builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i);
|
||||
}
|
||||
indexRandom(true, builders);
|
||||
flushAndRefresh();
|
||||
assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), greaterThan(1));
|
||||
}
|
||||
}
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files
|
||||
}
|
||||
}
|
||||
|
||||
client().prepareDelete("test", "doc", "1").get();
|
||||
CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean waitForIndex(final String index, TimeValue timeout) throws InterruptedException {
|
||||
return awaitBusy(new Predicate<Object>() {
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.snapshots;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.LifecycleScope;
|
||||
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotIndexShardStatus;
|
||||
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
|
||||
import org.elasticsearch.action.count.CountResponse;
|
||||
import org.elasticsearch.action.index.IndexRequest;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.test.ElasticsearchBackwardsCompatIntegrationTest;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
|
||||
public class SnapshotBackwardsCompatibilityTest extends ElasticsearchBackwardsCompatIntegrationTest {
|
||||
|
||||
@Test
|
||||
public void testSnapshotAndRestore() throws ExecutionException, InterruptedException, IOException {
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put("location", newTempDir(LifecycleScope.SUITE).getAbsolutePath())
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000))));
|
||||
String[] indicesBefore = new String[randomIntBetween(2,5)];
|
||||
String[] indicesAfter = new String[randomIntBetween(2,5)];
|
||||
for (int i = 0; i < indicesBefore.length; i++) {
|
||||
indicesBefore[i] = "index_before_" + i;
|
||||
createIndex(indicesBefore[i]);
|
||||
}
|
||||
for (int i = 0; i < indicesAfter.length; i++) {
|
||||
indicesAfter[i] = "index_after_" + i;
|
||||
createIndex(indicesAfter[i]);
|
||||
}
|
||||
String[] indices = new String[indicesBefore.length + indicesAfter.length];
|
||||
System.arraycopy(indicesBefore, 0, indices, 0, indicesBefore.length);
|
||||
System.arraycopy(indicesAfter, 0, indices, indicesBefore.length, indicesAfter.length);
|
||||
ensureYellow();
|
||||
logger.info("--> indexing some data");
|
||||
IndexRequestBuilder[] buildersBefore = new IndexRequestBuilder[randomIntBetween(10, 200)];
|
||||
for (int i = 0; i < buildersBefore.length; i++) {
|
||||
buildersBefore[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "foo", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
|
||||
}
|
||||
IndexRequestBuilder[] buildersAfter = new IndexRequestBuilder[randomIntBetween(10, 200)];
|
||||
for (int i = 0; i < buildersAfter.length; i++) {
|
||||
buildersAfter[i] = client().prepareIndex(RandomPicks.randomFrom(getRandom(), indicesBefore), "bar", Integer.toString(i)).setSource("{ \"foo\" : \"bar\" } ");
|
||||
}
|
||||
indexRandom(true, buildersBefore);
|
||||
indexRandom(true, buildersAfter);
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
long[] counts = new long[indices.length];
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
counts[i] = client().prepareCount(indices[i]).get().getCount();
|
||||
}
|
||||
|
||||
logger.info("--> snapshot subset of indices before upgrage");
|
||||
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).setIndices("index_before_*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
assertThat(client().admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-snap-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
|
||||
logger.info("--> delete some data from indices that were already snapshotted");
|
||||
int howMany = randomIntBetween(1, buildersBefore.length);
|
||||
|
||||
for (int i = 0; i < howMany; i++) {
|
||||
IndexRequestBuilder indexRequestBuilder = RandomPicks.randomFrom(getRandom(), buildersBefore);
|
||||
IndexRequest request = indexRequestBuilder.request();
|
||||
client().prepareDelete(request.index(), request.type(), request.id()).get();
|
||||
}
|
||||
refresh();
|
||||
final long numDocs = client().prepareCount(indices).get().getCount();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), lessThan((long) (buildersBefore.length + buildersAfter.length)));
|
||||
|
||||
|
||||
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
|
||||
backwardsCluster().allowOnAllNodes(indices);
|
||||
logClusterState();
|
||||
boolean upgraded;
|
||||
do {
|
||||
logClusterState();
|
||||
CountResponse countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
upgraded = backwardsCluster().upgradeOneNode();
|
||||
ensureYellow();
|
||||
countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
} while (upgraded);
|
||||
client().admin().indices().prepareUpdateSettings(indices).setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
|
||||
|
||||
logger.info("--> close indices");
|
||||
|
||||
client().admin().indices().prepareClose("index_before_*").get();
|
||||
|
||||
logger.info("--> restore all indices from the snapshot");
|
||||
RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-1").setWaitForCompletion(true).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
|
||||
ensureYellow();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount()));
|
||||
}
|
||||
|
||||
logger.info("--> snapshot subset of indices after upgrade");
|
||||
createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices("index_*").get();
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||
|
||||
// Test restore after index deletion
|
||||
logger.info("--> delete indices");
|
||||
String index = RandomPicks.randomFrom(getRandom(), indices);
|
||||
cluster().wipeIndices(index);
|
||||
logger.info("--> restore one index after deletion");
|
||||
restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setWaitForCompletion(true).setIndices(index).execute().actionGet();
|
||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||
ensureYellow();
|
||||
assertThat(client().prepareCount(indices).get().getCount(), equalTo((long) (buildersBefore.length + buildersAfter.length)));
|
||||
for (int i = 0; i < indices.length; i++) {
|
||||
assertThat(counts[i], equalTo(client().prepareCount(indices[i]).get().getCount()));
|
||||
}
|
||||
}
|
||||
|
||||
public void testSnapshotMoreThanOnce() throws ExecutionException, InterruptedException, IOException {
|
||||
Client client = client();
|
||||
|
||||
logger.info("--> creating repository");
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
|
||||
.setType("fs").setSettings(ImmutableSettings.settingsBuilder()
|
||||
.put("location", newTempDir(LifecycleScope.SUITE).getAbsoluteFile())
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(100, 1000))));
|
||||
|
||||
// only one shard
|
||||
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
));
|
||||
ensureYellow();
|
||||
logger.info("--> indexing");
|
||||
|
||||
final int numDocs = randomIntBetween(10, 100);
|
||||
IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs];
|
||||
for (int i = 0; i < builders.length; i++) {
|
||||
builders[i] = client().prepareIndex("test", "doc", Integer.toString(i)).setSource("foo", "bar" + i);
|
||||
}
|
||||
indexRandom(true, builders);
|
||||
flushAndRefresh();
|
||||
assertNoFailures(client().admin().indices().prepareOptimize("test").setForce(true).setFlush(true).setWaitForMerge(true).setMaxNumSegments(1).get());
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponseFirst = client.admin().cluster().prepareCreateSnapshot("test-repo", "test").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseFirst.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseFirst.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), greaterThan(1));
|
||||
}
|
||||
}
|
||||
if (frequently()) {
|
||||
logger.info("--> upgrade");
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "none")).get();
|
||||
backwardsCluster().allowOnAllNodes("test");
|
||||
logClusterState();
|
||||
boolean upgraded;
|
||||
do {
|
||||
logClusterState();
|
||||
CountResponse countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
upgraded = backwardsCluster().upgradeOneNode();
|
||||
ensureYellow();
|
||||
countResponse = client().prepareCount().get();
|
||||
assertHitCount(countResponse, numDocs);
|
||||
} while (upgraded);
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE, "all")).get();
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(ImmutableSettings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, randomIntBetween(1,2))).get();
|
||||
}
|
||||
|
||||
CreateSnapshotResponse createSnapshotResponseSecond = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-1").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseSecond.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseSecond.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-1").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-1").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(1)); // we flush before the snapshot such that we have to process the segments_N files
|
||||
}
|
||||
}
|
||||
|
||||
client().prepareDelete("test", "doc", "1").get();
|
||||
CreateSnapshotResponse createSnapshotResponseThird = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-2").setWaitForCompletion(true).setIndices("test").get();
|
||||
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||
assertThat(createSnapshotResponseThird.getSnapshotInfo().successfulShards(), equalTo(createSnapshotResponseThird.getSnapshotInfo().totalShards()));
|
||||
assertThat(client.admin().cluster().prepareGetSnapshots("test-repo").setSnapshots("test-2").get().getSnapshots().get(0).state(), equalTo(SnapshotState.SUCCESS));
|
||||
{
|
||||
SnapshotStatus snapshotStatus = client.admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-2").get().getSnapshots().get(0);
|
||||
List<SnapshotIndexShardStatus> shards = snapshotStatus.getShards();
|
||||
for (SnapshotIndexShardStatus status : shards) {
|
||||
assertThat(status.getStats().getProcessedFiles(), equalTo(2)); // we flush before the snapshot such that we have to process the segments_N files plus the .del file
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue