From 09eb8d1383b02e54bea030c1e1f7c9173828f2b3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Fri, 6 Feb 2015 21:09:37 +0000 Subject: [PATCH] [RECOVERY] Handle corruptions during recovery finalization Today we sometimes have to transfer files without verifying the checksum ie. if the file had an old alder32 checksum but was using random access while writing such that we can only verify they files length. We will likely not detect corruptions there and with the new checks during recovery finalization we might run into corrupt index exceptions in that stage. This causes the primary to be failed as well since we don't handle the exception today. This commit adds better handling and a test for this scenario. --- .../indices/recovery/RecoveryTarget.java | 11 ++++ .../recovery/ShardRecoveryHandler.java | 51 ++++++++++++++-- .../index/store/CorruptedFileTest.java | 61 +++++++++++++++++++ 3 files changed, 118 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 07260f0ab37..6f8efa5ebfc 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -19,6 +19,9 @@ package org.elasticsearch.indices.recovery; +import org.apache.lucene.index.CorruptIndexException; +import org.apache.lucene.index.IndexFormatTooNewException; +import org.apache.lucene.index.IndexFormatTooOldException; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.IndexOutput; import org.elasticsearch.ElasticsearchException; @@ -398,6 +401,14 @@ public class RecoveryTarget extends AbstractComponent { Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot(); try { store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); + } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) { + // this is a fatal exception at this stage. + // this means we transferred files from the remote that have not be checksummed and they are + // broken. We have to clean up this shard entirely, remove all files and bubble it up to the + // source shard since this index might be broken there as well? The Source can handle this and checks + // its content on disk if possible. + store.deleteContent(); // clean up and delete all files + throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); } catch (Exception ex) { throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); } diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java index a8c87cef1b7..26f4fd4b39d 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java +++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java @@ -24,6 +24,7 @@ import com.google.common.collect.Lists; import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IndexInput; +import org.apache.lucene.util.ArrayUtil; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterService; @@ -41,6 +42,7 @@ import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.ArrayUtils; import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -62,6 +64,9 @@ import org.elasticsearch.transport.RemoteTransportException; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; @@ -345,13 +350,49 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler { // Once the files have been renamed, any other files that are not // related to this recovery (out of date segments, for example) // are deleted - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, - new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata), - TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), - EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + try { + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, + new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata), + TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), + EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); + } catch (RemoteTransportException remoteException) { + final IOException corruptIndexException; + // we realized that after the index was copied and we wanted to finalize the recovery + // the index was corrupted: + // - maybe due to a broken segments file on an empty index (transferred with no checksum) + // - maybe due to old segments without checksums or length only checks + if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) { + try { + final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot); + StoreFileMetaData[] metadata = Iterables.toArray(recoverySourceMetadata, StoreFileMetaData.class); + ArrayUtil.timSort(metadata, new Comparator() { + @Override + public int compare(StoreFileMetaData o1, StoreFileMetaData o2) { + return Long.compare(o1.length(), o2.length()); // check small files first + } + }); + for (StoreFileMetaData md : metadata) { + logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md); + if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! + logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); + throw corruptIndexException; + } + } + } catch (IOException ex) { + remoteException.addSuppressed(ex); + throw remoteException; + } + // corruption has happened on the way to replica + RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null); + exception.addSuppressed(remoteException); + logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK", + corruptIndexException, shard.shardId(), request.targetNode()); + } else { + throw remoteException; + } + } } }); - stopWatch.stop(); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime()); response.phase1Time = stopWatch.totalTime().millis(); diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java index f5cbf80aee1..dbb6ede06d4 100644 --- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java +++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java @@ -20,6 +20,7 @@ package org.elasticsearch.index.store; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists; import com.carrotsearch.randomizedtesting.LifecycleScope; +import com.carrotsearch.randomizedtesting.annotations.Repeat; import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.google.common.base.Charsets; import com.google.common.base.Predicate; @@ -82,6 +83,7 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; @@ -298,6 +300,65 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest { assertThat(corruptedFile, notNullValue()); } + /** + * This test triggers a corrupt index exception during finalization size if an empty commit point is transferred + * during recovery we don't know the version of the segments_N file because it has no segments we can take it from. + * This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization + * we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster + */ + public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException { + internalCluster().ensureAtLeastNumDataNodes(2); + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get(); + List dataNodeStats = new ArrayList<>(); + for (NodeStats stat : nodeStats.getNodes()) { + if (stat.getNode().isDataNode()) { + dataNodeStats.add(stat); + } + } + + assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2)); + Collections.shuffle(dataNodeStats, getRandom()); + NodeStats primariesNode = dataNodeStats.get(0); + NodeStats unluckyNode = dataNodeStats.get(1); + assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true) + .put("index.routing.allocation.include._name", primariesNode.getNode().name()) + .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE) + + )); + ensureGreen(); // allocated with empty commit + final AtomicBoolean corrupt = new AtomicBoolean(true); + final CountDownLatch hasCorrupted = new CountDownLatch(1); + for (NodeStats dataNode : dataNodeStats) { + MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name())); + mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) { + + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException { + if (corrupt.get() && action.equals(RecoveryTarget.Actions.FILE_CHUNK)) { + RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request; + byte[] array = req.content().array(); + int i = randomIntBetween(0, req.content().length() - 1); + array[i] = (byte) ~array[i]; // flip one byte in the content + hasCorrupted.countDown(); + } + super.sendRequest(node, requestId, action, request, options); + } + }); + } + + Settings build = ImmutableSettings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1") + .put("index.routing.allocation.include._name", primariesNode.getNode().name() + "," + unluckyNode.getNode().name()).build(); + client().admin().indices().prepareUpdateSettings("test").setSettings(build).get(); + client().admin().cluster().prepareReroute().get(); + hasCorrupted.await(); + corrupt.set(false); + ensureGreen(); + } + /** * Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way * to the replica. The file on disk stays uncorrupted