mirror of
synced 2025-03-09 14:34:43 +00:00
[RECOVERY] Handle corruptions during recovery finalization
Today we sometimes have to transfer files without verifying the checksum ie. if the file had an old alder32 checksum but was using random access while writing such that we can only verify they files length. We will likely not detect corruptions there and with the new checks during recovery finalization we might run into corrupt index exceptions in that stage. This causes the primary to be failed as well since we don't handle the exception today. This commit adds better handling and a test for this scenario.
This commit is contained in:
@ -19,6 +19,9 @@
package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticsearchException;
@ -398,6 +401,14 @@ public class RecoveryTarget extends AbstractComponent {
Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
store.deleteContent(); // clean up and delete all files
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
} catch (Exception ex) {
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService;
@ -41,6 +42,7 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -62,6 +64,9 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
@ -345,13 +350,49 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
try {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
} catch (RemoteTransportException remoteException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
StoreFileMetaData[] metadata = Iterables.toArray(recoverySourceMetadata, StoreFileMetaData.class);
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
return Long.compare(o1.length(), o2.length()); // check small files first
for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
throw corruptIndexException;
} catch (IOException ex) {
throw remoteException;
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
} else {
throw remoteException;
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
@ -20,6 +20,7 @@ package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets;
import com.google.common.base.Predicate;
@ -82,6 +83,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@ -298,6 +300,65 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertThat(corruptedFile, notNullValue());
* This test triggers a corrupt index exception during finalization size if an empty commit point is transferred
* during recovery we don't know the version of the segments_N file because it has no segments we can take it from.
* This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization
* we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster
public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException {
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
Collections.shuffle(dataNodeStats, getRandom());
NodeStats primariesNode = dataNodeStats.get(0);
NodeStats unluckyNode = dataNodeStats.get(1);
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
ensureGreen(); // allocated with empty commit
final AtomicBoolean corrupt = new AtomicBoolean(true);
final CountDownLatch hasCorrupted = new CountDownLatch(1);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (corrupt.get() && action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
byte[] array = req.content().array();
int i = randomIntBetween(0, req.content().length() - 1);
array[i] = (byte) ~array[i]; // flip one byte in the content
super.sendRequest(node, requestId, action, request, options);
Settings build = ImmutableSettings.builder()
.put("index.routing.allocation.include._name", primariesNode.getNode().name() + "," + unluckyNode.getNode().name()).build();
* Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way
* to the replica. The file on disk stays uncorrupted
Reference in New Issue
Block a user