Start making RecoverySourceHandler unittestable

This commit shuffels and rewrites some code in RecoverySourceHandler to make it
simpler and more unittestable. This commit doesn't change all parts of this class
neither is it fully tested yet. It's an important part of the infrastrucutre so I started
to make it better tested but I don't want to change everything in one go since it makes
review simpler and more detailed. Future commits will continue cleaning up the class and
add more tests.
This commit is contained in:
Simon Willnauer 2015-09-29 10:57:44 +02:00
parent f0a8c10c87
commit e38f4cf01e
6 changed files with 576 additions and 284 deletions

View File

@ -19,6 +19,7 @@
package org.elasticsearch.common.util; package org.elasticsearch.common.util;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -131,7 +132,7 @@ public class CancellableThreads {
public interface Interruptable { public interface Interruptable {
public void run() throws InterruptedException; void run() throws InterruptedException;
} }
public static class ExecutionCancelledException extends ElasticsearchException { public static class ExecutionCancelledException extends ElasticsearchException {

View File

@ -1544,4 +1544,5 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return estimatedSize; return estimatedSize;
} }
} }
} }

View File

@ -33,19 +33,15 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable; import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException; import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException; import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData; import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
@ -55,14 +51,15 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Comparator; import java.util.Comparator;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function;
import java.util.stream.StreamSupport; import java.util.stream.StreamSupport;
/** /**
@ -83,6 +80,8 @@ public class RecoverySourceHandler {
private final TransportService transportService; private final TransportService transportService;
protected final RecoveryResponse response; protected final RecoveryResponse response;
private final TransportRequestOptions requestOptions;
private final CancellableThreads cancellableThreads = new CancellableThreads() { private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override @Override
protected void onCancel(String reason, @Nullable Throwable suppressedException) { protected void onCancel(String reason, @Nullable Throwable suppressedException) {
@ -99,7 +98,6 @@ public class RecoverySourceHandler {
} }
}; };
public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings, public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final ESLogger logger) { final TransportService transportService, final ESLogger logger) {
this.shard = shard; this.shard = shard;
@ -111,6 +109,11 @@ public class RecoverySourceHandler {
this.shardId = this.request.shardId().id(); this.shardId = this.request.shardId().id();
this.response = new RecoveryResponse(); this.response = new RecoveryResponse();
this.requestOptions = TransportRequestOptions.options()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout());
} }
/** /**
@ -216,7 +219,10 @@ public class RecoverySourceHandler {
} }
totalSize += md.length(); totalSize += md.length();
} }
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) { List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) { if (request.metadataSnapshot().asMap().containsKey(md.name())) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]", logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md); indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
@ -235,215 +241,69 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]", logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
indexName, shardId, request.targetNode(), response.phase1FileNames.size(), indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize)); new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(new Interruptable() { cancellableThreads.execute(() -> {
@Override RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
public void run() throws InterruptedException { response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(), translogView.totalOperations());
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes, transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
translogView.totalOperations()); TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}); });
// This latch will be used to wait until all files have been transferred to the target node
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
int fileIndex = 0;
ThreadPoolExecutor pool;
// How many bytes we've copied since we last called RateLimiter.pause // How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong(); final AtomicLong bytesSinceLastPause = new AtomicLong();
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
for (final String name : response.phase1FileNames) { sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
long fileSize = response.phase1FileSizes.get(fileIndex); cancellableThreads.execute(() -> {
// Send the CLEAN_FILES request, which takes all of the files that
// Files are split into two categories, files that are "small" // were transferred and renames them from their temporary file
// (under 5mb) and other files. Small files are transferred // names to the actual file names. It also writes checksums for
// using a separate thread pool dedicated to small files. // the files after they have been renamed.
// //
// The idea behind this is that while we are transferring an // Once the files have been renamed, any other files that are not
// older, large index, a user may create a new index, but that // related to this recovery (out of date segments, for example)
// index will not be able to recover until the large index // are deleted
// finishes, by using two different thread pools we can allow try {
// tiny files (like segments for a brand new index) to be transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
// recovered while ongoing large segment recoveries are new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
// happening. It also allows these pools to be configured TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
// separately. EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) { } catch (RemoteTransportException remoteException) {
pool = recoverySettings.concurrentStreamPool(); final IOException corruptIndexException;
} else { // we realized that after the index was copied and we wanted to finalize the recovery
pool = recoverySettings.concurrentSmallFileStreamPool(); // the index was corrupted:
} // - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
pool.execute(new AbstractRunnable() { if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
@Override try {
public void onFailure(Throwable t) { final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
// we either got rejected or the store can't be incremented / we are canceled StoreFileMetaData[] metadata =
logger.debug("Failed to transfer file [" + name + "] on recovery"); StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
} ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
@Override
@Override public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
public void onAfter() { return Long.compare(o1.length(), o2.length()); // check small files first
// Signify this file has completed by decrementing the latch
latch.countDown();
}
@Override
protected void doRun() {
cancellableThreads.checkForCancel();
store.incRef();
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
final int BUFFER_SIZE = (int) Math.max(1, recoverySettings.fileChunkSize().bytes()); // at least one!
final byte[] buf = new byte[BUFFER_SIZE];
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
}
final long len = indexInput.length();
long readCount = 0;
final TransportRequestOptions requestOptions = TransportRequestOptions.options()
.withCompress(shouldCompressRequest)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout());
while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
} }
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE; });
final long position = indexInput.getFilePointer(); for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
// Pause using the rate limiter, if desired, to throttle the recovery
RateLimiter rl = recoverySettings.rateLimiter();
long throttleTimeInNanos = 0;
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(toRead);
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
throttleTimeInNanos = rl.pause(bytes);
shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
indexInput.readBytes(buf, 0, toRead, false);
final BytesArray content = new BytesArray(buf, 0, toRead);
readCount += toRead;
final boolean lastChunk = readCount == len;
final RecoveryFileChunkRequest fileChunkRequest = new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position,
content, lastChunk, translogView.totalOperations(), throttleTimeInNanos);
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
fileChunkRequest, requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
}
} catch (Throwable e) {
final Throwable corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail! if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
shard.engine().failEngine("recovery", corruptIndexException);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md); logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) { throw corruptIndexException;
// if we are not the first exception, add ourselves as suppressed to the main one:
corruptedEngine.get().addSuppressed(e);
}
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode(), md);
} }
} else {
exceptions.add(0, e); // last exceptions first
} }
} finally { } catch (IOException ex) {
store.decRef(); remoteException.addSuppressed(ex);
}
}
});
fileIndex++;
}
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Wait for all files that need to be transferred to finish transferring
latch.await();
}
});
if (corruptedEngine.get() != null) {
shard.engine().failEngine("recovery", corruptedEngine.get());
throw corruptedEngine.get();
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
// the files after they have been renamed.
//
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
try {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
StoreFileMetaData[] metadata =
StreamSupport.stream(recoverySourceMetadata.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
@Override
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
return Long.compare(o1.length(), o2.length()); // check small files first
}
});
for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
shard.engine().failEngine("recovery", corruptIndexException);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
throw corruptIndexException;
}
}
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
}
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException; throw remoteException;
} }
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException;
} }
} }
}); });
@ -460,6 +320,8 @@ public class RecoverySourceHandler {
} }
} }
protected void prepareTargetForTranslog(final Translog.View translogView) { protected void prepareTargetForTranslog(final Translog.View translogView) {
StopWatch stopWatch = new StopWatch().start(); StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode()); logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
@ -603,14 +465,11 @@ public class RecoverySourceHandler {
// recoverySettings.rateLimiter().pause(size); // recoverySettings.rateLimiter().pause(size);
// } // }
cancellableThreads.execute(new Interruptable() { cancellableThreads.execute(() -> {
@Override final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
public void run() throws InterruptedException { request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations()); recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}); });
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}", logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
@ -631,14 +490,11 @@ public class RecoverySourceHandler {
} }
// send the leftover // send the leftover
if (!operations.isEmpty()) { if (!operations.isEmpty()) {
cancellableThreads.execute(new Interruptable() { cancellableThreads.execute(() -> {
@Override RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
public void run() throws InterruptedException { request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest( transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations()); recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
}); });
} }
@ -667,4 +523,165 @@ public class RecoverySourceHandler {
'}'; '}';
} }
final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final AtomicLong bytesSinceLastPause;
private final Translog.View translogView;
private long position = 0;
RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) {
this.md = md;
this.bytesSinceLastPause = bytesSinceLastPause;
this.translogView = translogView;
}
@Override
public final void write(int b) throws IOException {
write(new byte[]{(byte) b}, 0, 1);
}
@Override
public final void write(byte[] b, int offset, int length) throws IOException {
sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
position += length;
assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
}
private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
final RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
try {
throttleTimeInNanos = rl.pause(bytes);
shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
} catch (IOException e) {
throw new ElasticsearchException("failed to pause recovery", e);
}
} else {
throttleTimeInNanos = 0;
}
} else {
throttleTimeInNanos = 0;
}
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk,
translogView.totalOperations(),
/* we send totalOperations with every request since we collect stats on the target and that way we can
* see how many translog ops we accumulate while copying files across the network. A future optimization
* would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
*/
throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
}
}
}
void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Throwable {
store.incRef();
try {
Future[] runners = asyncSendFiles(store, files, outputStreamFactory);
IOException corruptedEngine = null;
final List<Throwable> exceptions = new ArrayList<>();
for (int i = 0; i < runners.length; i++) {
StoreFileMetaData md = files[i];
try {
runners[i].get();
} catch (ExecutionException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause());
} catch (InterruptedException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t);
}
}
if (corruptedEngine != null) {
failEngine(corruptedEngine);
throw corruptedEngine;
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
} finally {
store.decRef();
}
}
private IOException handleExecutionException(Store store, IOException corruptedEngine, List<Throwable> exceptions, StoreFileMetaData md, Throwable t) {
logger.debug("Failed to transfer file [" + md + "] on recovery");
final IOException corruptIndexException;
final boolean checkIntegrity = corruptedEngine == null;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) {
if (checkIntegrity && store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
corruptedEngine = corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(t);
if (checkIntegrity) {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shardId, request.targetNode(), md);
} else {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum are skipped",
corruptIndexException, shardId, request.targetNode(), md);
}
exceptions.add(exception);
}
} else {
exceptions.add(t);
}
return corruptedEngine;
}
protected void failEngine(IOException cause) {
shard.engine().failEngine("recovery", cause);
}
Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) {
store.incRef();
try {
final Future<Void>[] futures = new Future[files.length];
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
long fileSize = md.length();
// Files are split into two categories, files that are "small"
// (under 5mb) and other files. Small files are transferred
// using a separate thread pool dedicated to small files.
//
// The idea behind this is that while we are transferring an
// older, large index, a user may create a new index, but that
// index will not be able to recover until the large index
// finishes, by using two different thread pools we can allow
// tiny files (like segments for a brand new index) to be
// recovered while ongoing large segment recoveries are
// happening. It also allows these pools to be configured
// separately.
ThreadPoolExecutor pool;
if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
try (final OutputStream outputStream = outputStreamFactory.apply(md);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
}
return null;
});
futures[i] = future;
}
return futures;
} finally {
store.decRef();
}
}
} }

View File

@ -20,14 +20,9 @@ package org.elasticsearch.index.store;
import com.carrotsearch.randomizedtesting.generators.RandomPicks; import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames; import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@ -71,6 +66,7 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.InternalTestCluster; import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.test.transport.MockTransportService;
@ -83,12 +79,9 @@ import org.junit.Test;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream; import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream; import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -649,54 +642,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
} }
} }
pruneOldDeleteGenerations(files); pruneOldDeleteGenerations(files);
Path fileToCorrupt = null; CorruptionUtils.corruptFile(getRandom(), files.toArray(new Path[0]));
if (!files.isEmpty()) {
fileToCorrupt = RandomPicks.randomFrom(getRandom(), files);
try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
long checksumBeforeCorruption;
try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
}
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.size() - 1)));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();
// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);
// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("Corrupting file for shard {} -- flipping at position {} from {} to {} file: {}", shardRouting, filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
}
long checksumAfterCorruption;
long actualChecksumAfterCorruption;
try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
assertThat(input.getFilePointer(), is(0l));
input.seek(input.length() - 8); // one long is the checksum... 8 bytes
checksumAfterCorruption = input.getChecksum();
actualChecksumAfterCorruption = input.readLong();
}
// we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
// in the checksum which is ok though....
StringBuilder msg = new StringBuilder();
msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
msg.append(" after: [").append(checksumAfterCorruption).append("]");
msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
logger.info(msg.toString());
assumeTrue("Checksum collision - " + msg.toString(),
checksumAfterCorruption != checksumBeforeCorruption // collision
|| actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
}
}
assertThat("no file corrupted", fileToCorrupt, notNullValue());
return shardRouting; return shardRouting;
} }

View File

@ -0,0 +1,227 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.recovery;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.CorruptionUtils;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.Matchers.is;
public class RecoverySourceHandlerTests extends ESTestCase {
private final ShardId shardId = new ShardId(new Index("index"), 1);
private final NodeSettingsService service = new NodeSettingsService(Settings.EMPTY);
public void testSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger);
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
Store targetStore = newStore(createTempDir());
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
try {
return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata();
Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
assertEquals(metas.size(), recoveryDiff.identical.size());
assertEquals(0, recoveryDiff.different.size());
assertEquals(0, recoveryDiff.missing.size());
IndexReader reader = DirectoryReader.open(targetStore.directory());
assertEquals(numDocs, reader.maxDoc());
IOUtils.close(reader, writer, store, targetStore, recoverySettings);
}
public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir();
Store store = newStore(tempDir);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
CorruptionUtils.corruptFile(getRandom(), FileSystemUtils.files(tempDir, (p) ->
(p.getFileName().toString().equals("write.lock") ||
Files.isDirectory(p)) == false));
Store targetStore = newStore(createTempDir());
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
try {
return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
fail("corrupted index");
} catch (IOException ex) {
assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
}
assertTrue(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
}
public void testHandleExceptinoOnSendSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir();
Store store = newStore(tempDir);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
writer.close();
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
final boolean throwCorruptedIndexException = randomBoolean();
Store targetStore = newStore(createTempDir());
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
if (throwCorruptedIndexException) {
throw new RuntimeException(new CorruptIndexException("foo", "bar"));
} else {
throw new RuntimeException("boom");
}
});
fail("exception index");
} catch (RuntimeException ex) {
assertNull(ExceptionsHelper.unwrapCorruption(ex));
if (throwCorruptedIndexException) {
assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]");
} else {
assertEquals(ex.getMessage(), "boom");
}
} catch (CorruptIndexException ex) {
fail("not expected here");
}
assertFalse(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
}
private Store newStore(Path path) throws IOException {
DirectoryService directoryService = new DirectoryService(shardId, Settings.EMPTY) {
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override
public Directory newDirectory() throws IOException {
return RecoverySourceHandlerTests.newFSDirectory(path);
}
};
return new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;
import static org.apache.lucene.util.LuceneTestCase.assumeTrue;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public final class CorruptionUtils {
private static ESLogger logger = ESLoggerFactory.getLogger("test");
private CorruptionUtils() {}
/**
* Corrupts a random file at a random position
*/
public static void corruptFile(Random random, Path... files) throws IOException {
assertTrue("files must be non-empty", files.length > 0);
final Path fileToCorrupt = RandomPicks.randomFrom(random, files);
assertTrue(fileToCorrupt + " is not a file", Files.isRegularFile(fileToCorrupt));
try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
long checksumBeforeCorruption;
try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
}
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(random.nextInt((int) Math.min(Integer.MAX_VALUE, raf.size())));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();
// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);
// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("Corrupting file -- flipping at position {} from {} to {} file: {}", filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
}
long checksumAfterCorruption;
long actualChecksumAfterCorruption;
try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
assertThat(input.getFilePointer(), is(0l));
input.seek(input.length() - 8); // one long is the checksum... 8 bytes
checksumAfterCorruption = input.getChecksum();
actualChecksumAfterCorruption = input.readLong();
}
// we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
// in the checksum which is ok though....
StringBuilder msg = new StringBuilder();
msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
msg.append(" after: [").append(checksumAfterCorruption).append("]");
msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
logger.info(msg.toString());
assumeTrue("Checksum collision - " + msg.toString(),
checksumAfterCorruption != checksumBeforeCorruption // collision
|| actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
assertThat("no file corrupted", fileToCorrupt, notNullValue());
}
}
}