diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
index 07260f0ab37..6f8efa5ebfc 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
@@ -19,6 +19,9 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.index.IndexFormatTooNewException;
+import org.apache.lucene.index.IndexFormatTooOldException;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.IndexOutput;
 import org.elasticsearch.ElasticsearchException;
@@ -398,6 +401,14 @@ public class RecoveryTarget extends AbstractComponent {
             Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
             try {
                 store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
+            } catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
+                // this is a fatal exception at this stage.
+                // this means we transferred files from the remote that have not been checksummed and they are
+                // broken. We have to clean up this shard entirely, remove all files and bubble it up to the
+                // source shard since this index might be broken there as well? The Source can handle this and checks
+                // its content on disk if possible.
+                store.deleteContent(); // clean up and delete all files
+                throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
             } catch (Exception ex) {
                 throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
             }
diff --git a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
index a8c87cef1b7..26f4fd4b39d 100644
--- a/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
+++ b/src/main/java/org/elasticsearch/indices/recovery/ShardRecoveryHandler.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.util.ArrayUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.cluster.ClusterService;
@@ -41,6 +42,7 @@ import org.elasticsearch.common.compress.CompressorFactory;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.ArrayUtils;
 import org.elasticsearch.common.util.CancellableThreads;
 import org.elasticsearch.common.util.CancellableThreads.Interruptable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -62,6 +64,9 @@ import org.elasticsearch.transport.RemoteTransportException;
 import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicReference;
@@ -345,13 +350,51 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
                     // Once the files have been renamed, any other files that are not
                     // related to this recovery (out of date segments, for example)
                     // are deleted
-                    transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
-                            new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
-                            TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
-                            EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                    try {
+                        transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
+                                new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
+                                TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
+                                EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
+                    } catch (RemoteTransportException remoteException) {
+                        final IOException corruptIndexException;
+                        // we realized that after the index was copied and we wanted to finalize the recovery
+                        // the index was corrupted:
+                        //   - maybe due to a broken segments file on an empty index (transferred with no checksum)
+                        //   - maybe due to old segments without checksums or length only checks
+                        if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
+                            try {
+                                final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
+                                StoreFileMetaData[] metadata = Iterables.toArray(recoverySourceMetadata, StoreFileMetaData.class);
+                                ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
+                                    @Override
+                                    public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
+                                        return Long.compare(o1.length(), o2.length()); // check small files first
+                                    }
+                                });
+                                for (StoreFileMetaData md : metadata) {
+                                    logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
+                                    if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
+                                        logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
+                                        throw corruptIndexException;
+                                    }
+                                }
+                            } catch (IOException ex) {
+                                remoteException.addSuppressed(ex);
+                                throw remoteException;
+                            }
+                            // corruption has happened on the way to replica
+                            RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
+                            exception.addSuppressed(remoteException);
+                            logger.warn("{} Remote file corruption during finalization of recovery on node {}. local checksum OK",
+                                    corruptIndexException, shard.shardId(), request.targetNode());
+                            // every local file passed its integrity check, so still fail this recovery attempt instead of silently continuing
+                            throw exception;
+                        } else {
+                            throw remoteException;
+                        }
+                    }
                 }
             });
-
             stopWatch.stop();
             logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
             response.phase1Time = stopWatch.totalTime().millis();
diff --git a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
index f5cbf80aee1..dbb6ede06d4 100644
--- a/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
+++ b/src/test/java/org/elasticsearch/index/store/CorruptedFileTest.java
@@ -20,6 +20,7 @@ package org.elasticsearch.index.store;
 
 import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
 import com.carrotsearch.randomizedtesting.LifecycleScope;
+import com.carrotsearch.randomizedtesting.annotations.Repeat;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import com.google.common.base.Charsets;
 import com.google.common.base.Predicate;
@@ -82,6 +83,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@@ -298,6 +300,65 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
         assertThat(corruptedFile, notNullValue());
     }
 
+    /**
+     * This test triggers a corrupt index exception during finalization since if an empty commit point is transferred
+     * during recovery we don't know the version of the segments_N file because it has no segments we can take it from.
+     * This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization
+     * we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster
+     */
+    public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
+        List<NodeStats> dataNodeStats = new ArrayList<>();
+        for (NodeStats stat : nodeStats.getNodes()) {
+            if (stat.getNode().isDataNode()) {
+                dataNodeStats.add(stat);
+            }
+        }
+
+        assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
+        Collections.shuffle(dataNodeStats, getRandom());
+        NodeStats primariesNode = dataNodeStats.get(0);
+        NodeStats unluckyNode = dataNodeStats.get(1);
+        assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)
+                .put("index.routing.allocation.include._name", primariesNode.getNode().name())
+                .put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
+
+        ));
+        ensureGreen(); // allocated with empty commit
+        final AtomicBoolean corrupt = new AtomicBoolean(true);
+        final CountDownLatch hasCorrupted = new CountDownLatch(1);
+        for (NodeStats dataNode : dataNodeStats) {
+            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
+            mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
+
+                @Override
+                public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+                    if (corrupt.get() && action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
+                        RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
+                        byte[] array = req.content().array();
+                        int i = randomIntBetween(0, req.content().length() - 1);
+                        array[i] = (byte) ~array[i]; // flip one byte in the content
+                        hasCorrupted.countDown();
+                    }
+                    super.sendRequest(node, requestId, action, request, options);
+                }
+            });
+        }
+
+        Settings build = ImmutableSettings.builder()
+                .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
+                .put("index.routing.allocation.include._name", primariesNode.getNode().name() + "," + unluckyNode.getNode().name()).build();
+        client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
+        client().admin().cluster().prepareReroute().get();
+        hasCorrupted.await();
+        corrupt.set(false);
+        ensureGreen();
+    }
+
     /**
      * Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way
      * to the replica. The file on disk stays uncorrupted