Merge pull request #13840 from s1monw/source_handler_testability
Start making RecoverySourceHandler unittestable
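The point of the change is to give RecoverySourceHandler seams a unit test can drive without a transport layer: sendFiles() now takes a factory that produces an OutputStream per file, and failEngine(IOException) is a protected hook a test can override. A minimal sketch of how that seam is exercised, mirroring the RecoverySourceHandlerTests added in this commit (the sketch's own class and method names are illustrative, not part of the diff):

package org.elasticsearch.indices.recovery;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.Function;

import org.apache.lucene.store.IOContext;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;

final class SendFilesSketch {
    // Streams every file of the source store into the target store through the new
    // sendFiles() seam; the OutputStream factory stands in for the network channel
    // used in production, so the copy can be verified against a plain Store.
    static void copyFiles(RecoverySourceHandler handler, Store source, Store target,
                          StoreFileMetaData[] files) throws Throwable {
        Function<StoreFileMetaData, OutputStream> factory = md -> {
            try {
                return new IndexOutputOutputStream(target.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        };
        handler.sendFiles(source, files, factory);
    }
}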
commit fc8e452a37
@@ -19,6 +19,7 @@
package org.elasticsearch.common.util;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;

@@ -131,7 +132,7 @@ public class CancellableThreads {

public interface Interruptable {
public void run() throws InterruptedException;
void run() throws InterruptedException;
}

public static class ExecutionCancelledException extends ElasticsearchException {
@@ -1544,4 +1544,5 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
return estimatedSize;
}
}

}
@@ -33,19 +33,15 @@ import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.CancellableThreads.Interruptable;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.RecoveryEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.*;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.index.translog.Translog;

@@ -55,14 +51,15 @@ import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.StreamSupport;

/**
@@ -83,6 +80,8 @@ public class RecoverySourceHandler {
private final TransportService transportService;

protected final RecoveryResponse response;
private final TransportRequestOptions requestOptions;

private final CancellableThreads cancellableThreads = new CancellableThreads() {
@Override
protected void onCancel(String reason, @Nullable Throwable suppressedException) {

@@ -99,7 +98,6 @@ public class RecoverySourceHandler {
}
};

public RecoverySourceHandler(final IndexShard shard, final StartRecoveryRequest request, final RecoverySettings recoverySettings,
final TransportService transportService, final ESLogger logger) {
this.shard = shard;

@@ -111,6 +109,11 @@ public class RecoverySourceHandler {
this.shardId = this.request.shardId().id();

this.response = new RecoveryResponse();
this.requestOptions = TransportRequestOptions.options()
.withCompress(recoverySettings.compress())
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout());

}

/**
@@ -216,7 +219,10 @@ public class RecoverySourceHandler {
}
totalSize += md.length();
}
for (StoreFileMetaData md : Iterables.concat(diff.different, diff.missing)) {
List<StoreFileMetaData> phase1Files = new ArrayList<>(diff.different.size() + diff.missing.size());
phase1Files.addAll(diff.different);
phase1Files.addAll(diff.missing);
for (StoreFileMetaData md : phase1Files) {
if (request.metadataSnapshot().asMap().containsKey(md.name())) {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering [{}], exists in local store, but is different: remote [{}], local [{}]",
indexName, shardId, request.targetNode(), md.name(), request.metadataSnapshot().asMap().get(md.name()), md);
@@ -235,215 +241,69 @@ public class RecoverySourceHandler {
logger.trace("[{}][{}] recovery [phase1] to {}: recovering_files [{}] with total_size [{}], reusing_files [{}] with total_size [{}]",
indexName, shardId, request.targetNode(), response.phase1FileNames.size(),
new ByteSizeValue(totalSize), response.phase1ExistingFileNames.size(), new ByteSizeValue(existingTotalSize));
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
translogView.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
cancellableThreads.execute(() -> {
RecoveryFilesInfoRequest recoveryInfoFilesRequest = new RecoveryFilesInfoRequest(request.recoveryId(), request.shardId(),
response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames, response.phase1ExistingFileSizes,
translogView.totalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILES_INFO, recoveryInfoFilesRequest,
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});

// This latch will be used to wait until all files have been transferred to the target node
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final CopyOnWriteArrayList<Throwable> exceptions = new CopyOnWriteArrayList<>();
final AtomicReference<Throwable> corruptedEngine = new AtomicReference<>();
int fileIndex = 0;
ThreadPoolExecutor pool;

// How many bytes we've copied since we last called RateLimiter.pause
final AtomicLong bytesSinceLastPause = new AtomicLong();

for (final String name : response.phase1FileNames) {
long fileSize = response.phase1FileSizes.get(fileIndex);

// Files are split into two categories, files that are "small"
// (under 5mb) and other files. Small files are transferred
// using a separate thread pool dedicated to small files.
final Function<StoreFileMetaData, OutputStream> outputStreamFactories = (md) -> new RecoveryOutputStream(md, bytesSinceLastPause, translogView);
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
cancellableThreads.execute(() -> {
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
// the files after they have been renamed.
//
// The idea behind this is that while we are transferring an
// older, large index, a user may create a new index, but that
// index will not be able to recover until the large index
// finishes, by using two different thread pools we can allow
// tiny files (like segments for a brand new index) to be
// recovered while ongoing large segment recoveries are
// happening. It also allows these pools to be configured
// separately.
if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
}

pool.execute(new AbstractRunnable() {
@Override
public void onFailure(Throwable t) {
// we either got rejected or the store can't be incremented / we are canceled
logger.debug("Failed to transfer file [" + name + "] on recovery");
}

@Override
public void onAfter() {
// Signify this file has completed by decrementing the latch
latch.countDown();
}

@Override
protected void doRun() {
cancellableThreads.checkForCancel();
store.incRef();
final StoreFileMetaData md = recoverySourceMetadata.get(name);
try (final IndexInput indexInput = store.directory().openInput(name, IOContext.READONCE)) {
final int BUFFER_SIZE = (int) Math.max(1, recoverySettings.fileChunkSize().bytes()); // at least one!
final byte[] buf = new byte[BUFFER_SIZE];
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
}

final long len = indexInput.length();
long readCount = 0;
final TransportRequestOptions requestOptions = TransportRequestOptions.options()
.withCompress(shouldCompressRequest)
.withType(TransportRequestOptions.Type.RECOVERY)
.withTimeout(recoverySettings.internalActionTimeout());

while (readCount < len) {
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(shard.shardId());
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
try {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata1 = store.getMetadata(snapshot);
StoreFileMetaData[] metadata =
StreamSupport.stream(recoverySourceMetadata1.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
@Override
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
return Long.compare(o1.length(), o2.length()); // check small files first
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
final long position = indexInput.getFilePointer();

// Pause using the rate limiter, if desired, to throttle the recovery
RateLimiter rl = recoverySettings.rateLimiter();
long throttleTimeInNanos = 0;
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(toRead);
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
throttleTimeInNanos = rl.pause(bytes);
shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
}
}
indexInput.readBytes(buf, 0, toRead, false);
final BytesArray content = new BytesArray(buf, 0, toRead);
readCount += toRead;
final boolean lastChunk = readCount == len;
final RecoveryFileChunkRequest fileChunkRequest = new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position,
content, lastChunk, translogView.totalOperations(), throttleTimeInNanos);
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
fileChunkRequest, requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});

}
} catch (Throwable e) {
final Throwable corruptIndexException;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(e)) != null) {
});
for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
shard.engine().failEngine("recovery", corruptIndexException);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
if (corruptedEngine.compareAndSet(null, corruptIndexException) == false) {
// if we are not the first exception, add ourselves as suppressed to the main one:
corruptedEngine.get().addSuppressed(e);
}
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(e);
exceptions.add(0, exception); // last exception first
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode(), md);

throw corruptIndexException;
}
} else {
exceptions.add(0, e); // last exceptions first
}
} finally {
store.decRef();

}
}
});
fileIndex++;
}

cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Wait for all files that need to be transferred to finish transferring
latch.await();
}
});

if (corruptedEngine.get() != null) {
shard.engine().failEngine("recovery", corruptedEngine.get());
throw corruptedEngine.get();
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}

cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
// Send the CLEAN_FILES request, which takes all of the files that
// were transferred and renames them from their temporary file
// names to the actual file names. It also writes checksums for
// the files after they have been renamed.
//
// Once the files have been renamed, any other files that are not
// related to this recovery (out of date segments, for example)
// are deleted
try {
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.CLEAN_FILES,
new RecoveryCleanFilesRequest(request.recoveryId(), shard.shardId(), recoverySourceMetadata, translogView.totalOperations()),
TransportRequestOptions.options().withTimeout(recoverySettings.internalActionTimeout()),
EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
} catch (RemoteTransportException remoteException) {
final IOException corruptIndexException;
// we realized that after the index was copied and we wanted to finalize the recovery
// the index was corrupted:
// - maybe due to a broken segments file on an empty index (transferred with no checksum)
// - maybe due to old segments without checksums or length only checks
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(remoteException)) != null) {
try {
final Store.MetadataSnapshot recoverySourceMetadata = store.getMetadata(snapshot);
StoreFileMetaData[] metadata =
StreamSupport.stream(recoverySourceMetadata.spliterator(), false).toArray(size -> new StoreFileMetaData[size]);
ArrayUtil.timSort(metadata, new Comparator<StoreFileMetaData>() {
@Override
public int compare(StoreFileMetaData o1, StoreFileMetaData o2) {
return Long.compare(o1.length(), o2.length()); // check small files first
}
});
for (StoreFileMetaData md : metadata) {
logger.debug("{} checking integrity for file {} after remove corruption exception", shard.shardId(), md);
if (store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
shard.engine().failEngine("recovery", corruptIndexException);
logger.warn("{} Corrupted file detected {} checksum mismatch", shard.shardId(), md);
throw corruptIndexException;
}
}
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
}
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
} catch (IOException ex) {
remoteException.addSuppressed(ex);
throw remoteException;
}
// corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(remoteException);
logger.warn("{} Remote file corruption during finalization on node {}, recovering {}. local checksum OK",
corruptIndexException, shard.shardId(), request.targetNode());
throw exception;
} else {
throw remoteException;
}
}
});
@@ -460,6 +320,8 @@ public class RecoverySourceHandler {
}
}


protected void prepareTargetForTranslog(final Translog.View translogView) {
StopWatch stopWatch = new StopWatch().start();
logger.trace("{} recovery [phase1] to {}: prepare remote engine for translog", request.shardId(), request.targetNode());
@@ -603,14 +465,11 @@ public class RecoverySourceHandler {
// recoverySettings.rateLimiter().pause(size);
// }

cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
cancellableThreads.execute(() -> {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (logger.isTraceEnabled()) {
logger.trace("[{}][{}] sent batch of [{}][{}] (total: [{}]) translog operations to {}",
@@ -631,14 +490,11 @@ public class RecoverySourceHandler {
}
// send the leftover
if (!operations.isEmpty()) {
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
cancellableThreads.execute(() -> {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});

}
@@ -667,4 +523,165 @@ public class RecoverySourceHandler {
'}';
}

final class RecoveryOutputStream extends OutputStream {
private final StoreFileMetaData md;
private final AtomicLong bytesSinceLastPause;
private final Translog.View translogView;
private long position = 0;

RecoveryOutputStream(StoreFileMetaData md, AtomicLong bytesSinceLastPause, Translog.View translogView) {
this.md = md;
this.bytesSinceLastPause = bytesSinceLastPause;
this.translogView = translogView;
}

@Override
public final void write(int b) throws IOException {
write(new byte[]{(byte) b}, 0, 1);
}

@Override
public final void write(byte[] b, int offset, int length) throws IOException {
sendNextChunk(position, new BytesArray(b, offset, length), md.length() == position + length);
position += length;
assert md.length() >= position : "length: " + md.length() + " but positions was: " + position;
}

private void sendNextChunk(long position, BytesArray content, boolean lastChunk) throws IOException {
cancellableThreads.execute(() -> {
// Pause using the rate limiter, if desired, to throttle the recovery
final long throttleTimeInNanos;
final RateLimiter rl = recoverySettings.rateLimiter();
if (rl != null) {
long bytes = bytesSinceLastPause.addAndGet(content.length());
if (bytes > rl.getMinPauseCheckBytes()) {
// Time to pause
bytesSinceLastPause.addAndGet(-bytes);
try {
throttleTimeInNanos = rl.pause(bytes);
shard.recoveryStats().addThrottleTime(throttleTimeInNanos);
} catch (IOException e) {
throw new ElasticsearchException("failed to pause recovery", e);
}
} else {
throttleTimeInNanos = 0;
}
} else {
throttleTimeInNanos = 0;
}
// Actually send the file chunk to the target node, waiting for it to complete
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK,
new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, lastChunk,
translogView.totalOperations(),
/* we send totalOperations with every request since we collect stats on the target and that way we can
 * see how many translog ops we accumulate while copying files across the network. A future optimization
 * would be in to restart file copy again (new deltas) if we have too many translog ops are piling up.
 */
throttleTimeInNanos), requestOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
});
if (shard.state() == IndexShardState.CLOSED) { // check if the shard got closed on us
throw new IndexShardClosedException(request.shardId());
}
}
}

void sendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) throws Throwable {
store.incRef();
try {
Future[] runners = asyncSendFiles(store, files, outputStreamFactory);
IOException corruptedEngine = null;
final List<Throwable> exceptions = new ArrayList<>();
for (int i = 0; i < runners.length; i++) {
StoreFileMetaData md = files[i];
try {
runners[i].get();
} catch (ExecutionException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t.getCause());
} catch (InterruptedException t) {
corruptedEngine = handleExecutionException(store, corruptedEngine, exceptions, md, t);
}
}
if (corruptedEngine != null) {
failEngine(corruptedEngine);
throw corruptedEngine;
} else {
ExceptionsHelper.rethrowAndSuppress(exceptions);
}
} finally {
store.decRef();
}
}

private IOException handleExecutionException(Store store, IOException corruptedEngine, List<Throwable> exceptions, StoreFileMetaData md, Throwable t) {
logger.debug("Failed to transfer file [" + md + "] on recovery");
final IOException corruptIndexException;
final boolean checkIntegrity = corruptedEngine == null;
if ((corruptIndexException = ExceptionsHelper.unwrapCorruption(t)) != null) {
if (checkIntegrity && store.checkIntegrityNoException(md) == false) { // we are corrupted on the primary -- fail!
logger.warn("{} Corrupted file detected {} checksum mismatch", shardId, md);
corruptedEngine = corruptIndexException;
} else { // corruption has happened on the way to replica
RemoteTransportException exception = new RemoteTransportException("File corruption occurred on recovery but checksums are ok", null);
exception.addSuppressed(t);
if (checkIntegrity) {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum OK",
corruptIndexException, shardId, request.targetNode(), md);
} else {
logger.warn("{} Remote file corruption on node {}, recovering {}. local checksum are skipped",
corruptIndexException, shardId, request.targetNode(), md);
}
exceptions.add(exception);

}
} else {
exceptions.add(t);
}
return corruptedEngine;
}

protected void failEngine(IOException cause) {
shard.engine().failEngine("recovery", cause);
}

Future<Void>[] asyncSendFiles(Store store, StoreFileMetaData[] files, Function<StoreFileMetaData, OutputStream> outputStreamFactory) {
store.incRef();
try {
final Future<Void>[] futures = new Future[files.length];
for (int i = 0; i < files.length; i++) {
final StoreFileMetaData md = files[i];
long fileSize = md.length();

// Files are split into two categories, files that are "small"
// (under 5mb) and other files. Small files are transferred
// using a separate thread pool dedicated to small files.
//
// The idea behind this is that while we are transferring an
// older, large index, a user may create a new index, but that
// index will not be able to recover until the large index
// finishes, by using two different thread pools we can allow
// tiny files (like segments for a brand new index) to be
// recovered while ongoing large segment recoveries are
// happening. It also allows these pools to be configured
// separately.
ThreadPoolExecutor pool;
if (fileSize > RecoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
Future<Void> future = pool.submit(() -> {
try (final OutputStream outputStream = outputStreamFactory.apply(md);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
Streams.copy(new InputStreamIndexInput(indexInput, md.length()), outputStream);
}
return null;
});
futures[i] = future;
}
return futures;
} finally {
store.decRef();
}
}
}
@@ -20,14 +20,9 @@ package org.elasticsearch.index.store;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import java.nio.charset.StandardCharsets;
import org.apache.lucene.codecs.CodecUtil;

import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;

@@ -71,6 +66,7 @@ import org.elasticsearch.monitor.fs.FsInfo;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.CorruptionUtils;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.elasticsearch.test.transport.MockTransportService;

@@ -83,12 +79,9 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -649,54 +642,7 @@ public class CorruptedFileIT extends ESIntegTestCase {
}
}
pruneOldDeleteGenerations(files);
Path fileToCorrupt = null;
if (!files.isEmpty()) {
fileToCorrupt = RandomPicks.randomFrom(getRandom(), files);
try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
long checksumBeforeCorruption;
try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
}
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(randomIntBetween(0, (int) Math.min(Integer.MAX_VALUE, raf.size() - 1)));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);

// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("Corrupting file for shard {} -- flipping at position {} from {} to {} file: {}", shardRouting, filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
}
long checksumAfterCorruption;
long actualChecksumAfterCorruption;
try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
assertThat(input.getFilePointer(), is(0l));
input.seek(input.length() - 8); // one long is the checksum... 8 bytes
checksumAfterCorruption = input.getChecksum();
actualChecksumAfterCorruption = input.readLong();
}
// we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
// in the checksum which is ok though....
StringBuilder msg = new StringBuilder();
msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
msg.append(" after: [").append(checksumAfterCorruption).append("]");
msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
logger.info(msg.toString());
assumeTrue("Checksum collision - " + msg.toString(),
checksumAfterCorruption != checksumBeforeCorruption // collision
|| actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
}
}
assertThat("no file corrupted", fileToCorrupt, notNullValue());
CorruptionUtils.corruptFile(getRandom(), files.toArray(new Path[0]));
return shardRouting;
}
@@ -0,0 +1,227 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.indices.recovery;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.*;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetaData;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.DummyShardLock;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.CorruptionUtils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.is;
public class RecoverySourceHandlerTests extends ESTestCase {

private final ShardId shardId = new ShardId(new Index("index"), 1);
private final NodeSettingsService service = new NodeSettingsService(Settings.EMPTY);

public void testSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Store store = newStore(createTempDir());
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger);
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
Store targetStore = newStore(createTempDir());
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
try {
return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
Store.MetadataSnapshot targetStoreMetadata = targetStore.getMetadata();
Store.RecoveryDiff recoveryDiff = targetStoreMetadata.recoveryDiff(metadata);
assertEquals(metas.size(), recoveryDiff.identical.size());
assertEquals(0, recoveryDiff.different.size());
assertEquals(0, recoveryDiff.missing.size());
IndexReader reader = DirectoryReader.open(targetStore.directory());
assertEquals(numDocs, reader.maxDoc());
IOUtils.close(reader, writer, store, targetStore, recoverySettings);
}

public void testHandleCorruptedIndexOnSendSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir();
Store store = newStore(tempDir);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
writer.close();

Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
CorruptionUtils.corruptFile(getRandom(), FileSystemUtils.files(tempDir, (p) ->
(p.getFileName().toString().equals("write.lock") ||
Files.isDirectory(p)) == false));
Store targetStore = newStore(createTempDir());
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
try {
return new IndexOutputOutputStream(targetStore.createVerifyingOutput(md.name(), md, IOContext.DEFAULT));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
fail("corrupted index");
} catch (IOException ex) {
assertNotNull(ExceptionsHelper.unwrapCorruption(ex));
}
assertTrue(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
}

public void testHandleExceptinoOnSendSendFiles() throws Throwable {
final RecoverySettings recoverySettings = new RecoverySettings(Settings.EMPTY, service);
StartRecoveryRequest request = new StartRecoveryRequest(shardId,
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT),
randomBoolean(), null, RecoveryState.Type.STORE, randomLong());
Path tempDir = createTempDir();
Store store = newStore(tempDir);
AtomicBoolean failedEngine = new AtomicBoolean(false);
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger) {
@Override
protected void failEngine(IOException cause) {
assertFalse(failedEngine.get());
failedEngine.set(true);
}
};
Directory dir = store.directory();
RandomIndexWriter writer = new RandomIndexWriter(random(), dir, newIndexWriterConfig());
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
Document document = new Document();
document.add(new StringField("id", Integer.toString(i), Field.Store.YES));
document.add(newField("field", randomUnicodeOfCodepointLengthBetween(1, 10), TextField.TYPE_STORED));
writer.addDocument(document);
}
writer.commit();
writer.close();

Store.MetadataSnapshot metadata = store.getMetadata();
List<StoreFileMetaData> metas = new ArrayList<>();
for (StoreFileMetaData md : metadata) {
metas.add(md);
}
final boolean throwCorruptedIndexException = randomBoolean();
Store targetStore = newStore(createTempDir());
try {
handler.sendFiles(store, metas.toArray(new StoreFileMetaData[0]), (md) -> {
if (throwCorruptedIndexException) {
throw new RuntimeException(new CorruptIndexException("foo", "bar"));
} else {
throw new RuntimeException("boom");
}
});
fail("exception index");
} catch (RuntimeException ex) {
assertNull(ExceptionsHelper.unwrapCorruption(ex));
if (throwCorruptedIndexException) {
assertEquals(ex.getMessage(), "[File corruption occurred on recovery but checksums are ok]");
} else {
assertEquals(ex.getMessage(), "boom");
}
} catch (CorruptIndexException ex) {
fail("not expected here");
}
assertFalse(failedEngine.get());
IOUtils.close(store, targetStore, recoverySettings);
}

private Store newStore(Path path) throws IOException {
DirectoryService directoryService = new DirectoryService(shardId, Settings.EMPTY) {
@Override
public long throttleTimeInNanos() {
return 0;
}

@Override
public Directory newDirectory() throws IOException {
return RecoverySourceHandlerTests.newFSDirectory(path);
}
};
return new Store(shardId, Settings.EMPTY, directoryService, new DummyShardLock(shardId));
}

}
@@ -0,0 +1,100 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.test;

import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.*;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.ESLoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;

import static org.apache.lucene.util.LuceneTestCase.assumeTrue;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

public final class CorruptionUtils {
private static ESLogger logger = ESLoggerFactory.getLogger("test");
private CorruptionUtils() {}

/**
 * Corrupts a random file at a random position
 */
public static void corruptFile(Random random, Path... files) throws IOException {
assertTrue("files must be non-empty", files.length > 0);
final Path fileToCorrupt = RandomPicks.randomFrom(random, files);
assertTrue(fileToCorrupt + " is not a file", Files.isRegularFile(fileToCorrupt));
try (Directory dir = FSDirectory.open(fileToCorrupt.toAbsolutePath().getParent())) {
long checksumBeforeCorruption;
try (IndexInput input = dir.openInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
checksumBeforeCorruption = CodecUtil.retrieveChecksum(input);
}
try (FileChannel raf = FileChannel.open(fileToCorrupt, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
// read
raf.position(random.nextInt((int) Math.min(Integer.MAX_VALUE, raf.size())));
long filePointer = raf.position();
ByteBuffer bb = ByteBuffer.wrap(new byte[1]);
raf.read(bb);
bb.flip();

// corrupt
byte oldValue = bb.get(0);
byte newValue = (byte) (oldValue + 1);
bb.put(0, newValue);

// rewrite
raf.position(filePointer);
raf.write(bb);
logger.info("Corrupting file -- flipping at position {} from {} to {} file: {}", filePointer, Integer.toHexString(oldValue), Integer.toHexString(newValue), fileToCorrupt.getFileName());
}
long checksumAfterCorruption;
long actualChecksumAfterCorruption;
try (ChecksumIndexInput input = dir.openChecksumInput(fileToCorrupt.getFileName().toString(), IOContext.DEFAULT)) {
assertThat(input.getFilePointer(), is(0l));
input.seek(input.length() - 8); // one long is the checksum... 8 bytes
checksumAfterCorruption = input.getChecksum();
actualChecksumAfterCorruption = input.readLong();
}
// we need to add assumptions here that the checksums actually really don't match there is a small chance to get collisions
// in the checksum which is ok though....
StringBuilder msg = new StringBuilder();
msg.append("Checksum before: [").append(checksumBeforeCorruption).append("]");
msg.append(" after: [").append(checksumAfterCorruption).append("]");
msg.append(" checksum value after corruption: ").append(actualChecksumAfterCorruption).append("]");
msg.append(" file: ").append(fileToCorrupt.getFileName()).append(" length: ").append(dir.fileLength(fileToCorrupt.getFileName().toString()));
logger.info(msg.toString());
assumeTrue("Checksum collision - " + msg.toString(),
checksumAfterCorruption != checksumBeforeCorruption // collision
|| actualChecksumAfterCorruption != checksumBeforeCorruption); // checksum corrupted
assertThat("no file corrupted", fileToCorrupt, notNullValue());
}
}

}
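
A typical call site for the helper above, as used by CorruptedFileIT earlier in this diff (the random source and the Path list are whatever the calling test already has at hand):

CorruptionUtils.corruptFile(getRandom(), files.toArray(new Path[0]));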