[RECOVERY] Handle corruptions during recovery finalization

Today we sometimes have to transfer files without verifying the checksum
ie. if the file had an old alder32 checksum but was using random access
while writing such that we can only verify they files length. We will likely
not detect corruptions there and with the new checks during recovery finalization
we might run into corrupt index exceptions in that stage. This causes
the primary to be failed as well since we don't handle the exception today. This commit
adds better handling and a test for this scenario.
This commit is contained in:
Simon Willnauer 2015-02-06 21:09:37 +00:00
parent c9480783b3
commit 09eb8d1383
3 changed files with 118 additions and 5 deletions

View File

@ -19,6 +19,9 @@
package org.elasticsearch.indices.recovery; package org.elasticsearch.indices.recovery;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.IndexOutput; import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
@ -398,6 +401,14 @@ public class RecoveryTarget extends AbstractComponent {
Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot(); Store.MetadataSnapshot sourceMetaData = request.sourceMetaSnapshot();
try { try {
store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData); store.cleanupAndVerify("recovery CleanFilesRequestHandler", sourceMetaData);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
// this means we transferred files from the remote that have not be checksummed and they are
// broken. We have to clean up this shard entirely, remove all files and bubble it up to the
// source shard since this index might be broken there as well? The Source can handle this and checks
// its content on disk if possible.
store.deleteContent(); // clean up and delete all files
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
} catch (Exception ex) { } catch (Exception ex) {
throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex); throw new RecoveryFailedException(recoveryStatus.state(), "failed to clean after recovery", ex);
} }

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Lists;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.IOContext; import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ArrayUtil;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
@ -41,6 +42,7 @@ import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.ArrayUtils;
import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@ -62,6 +64,9 @@ import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -345,13 +350,49 @@ public final class ShardRecoveryHandler implements Engine.RecoveryHandler {
// Once the files have been renamed, any other files that are not // Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example) // related to this recovery (out of date segments, for example)
// are deleted // are deleted
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES, try {
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata), transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()), new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet(); TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
StoreFileMetaData[] metadata = Iterables.toArray(recoverySourceMetadata, StoreFileMetaData.class);
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
@Override
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
return Long.compare(o1.length(), o2.length()); // check small files first
}
});
for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
throw corruptIndexException;
}
}
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
}
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
} else {
throw remoteException;
}
}
} }
}); });
stopWatch.stop(); stopWatch.stop();
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime()); logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis(); response.phase1Time = stopWatch.totalTime().millis();

View File

@ -20,6 +20,7 @@ package org.elasticsearch.index.store;
import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists; import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.Lists;
import com.carrotsearch.randomizedtesting.LifecycleScope; import com.carrotsearch.randomizedtesting.LifecycleScope;
import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
@ -82,6 +83,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.*;
@ -298,6 +300,65 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
assertThat(corruptedFile, notNullValue()); assertThat(corruptedFile, notNullValue());
} }
/**
* This test triggers a corrupt index exception during finalization size if an empty commit point is transferred
* during recovery we don't know the version of the segments_N file because it has no segments we can take it from.
* This simulates recoveries from old indices or even without checksums and makes sure if we fail during finalization
* we also check if the primary is ok. Without the relevant checks this test fails with a RED cluster
*/
public void testCorruptionOnNetworkLayerFinalizingRecovery() throws ExecutionException, InterruptedException, IOException {
internalCluster().ensureAtLeastNumDataNodes(2);
NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
List<NodeStats> dataNodeStats = new ArrayList<>();
for (NodeStats stat : nodeStats.getNodes()) {
if (stat.getNode().isDataNode()) {
dataNodeStats.add(stat);
}
}
assertThat(dataNodeStats.size(), greaterThanOrEqualTo(2));
Collections.shuffle(dataNodeStats, getRandom());
NodeStats primariesNode = dataNodeStats.get(0);
NodeStats unluckyNode = dataNodeStats.get(1);
assertAcked(prepareCreate("test").setSettings(ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(EngineConfig.INDEX_FAIL_ON_CORRUPTION_SETTING, true)
.put("index.routing.allocation.include._name", primariesNode.getNode().name())
.put(EnableAllocationDecider.INDEX_ROUTING_REBALANCE_ENABLE, EnableAllocationDecider.Rebalance.NONE)
));
ensureGreen(); // allocated with empty commit
final AtomicBoolean corrupt = new AtomicBoolean(true);
final CountDownLatch hasCorrupted = new CountDownLatch(1);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().name()));
mockTransportService.addDelegate(internalCluster().getInstance(Discovery.class, unluckyNode.getNode().name()).localNode(), new MockTransportService.DelegateTransport(mockTransportService.original()) {
@Override
public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
if (corrupt.get() && action.equals(RecoveryTarget.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
byte[] array = req.content().array();
int i = randomIntBetween(0, req.content().length() - 1);
array[i] = (byte) ~array[i]; // flip one byte in the content
hasCorrupted.countDown();
}
super.sendRequest(node, requestId, action, request, options);
}
});
}
Settings build = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "1")
.put("index.routing.allocation.include._name", primariesNode.getNode().name() + "," + unluckyNode.getNode().name()).build();
client().admin().indices().prepareUpdateSettings("test").setSettings(build).get();
client().admin().cluster().prepareReroute().get();
hasCorrupted.await();
corrupt.set(false);
ensureGreen();
}
/** /**
* Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way * Tests corruption that happens on the network layer and that the primary does not get affected by corruption that happens on the way
* to the replica. The file on disk stays uncorrupted * to the replica. The file on disk stays uncorrupted