[RECOVERY] Mark last file chunk to fail fast if payload is truncated

Today we rely on the metadata length of the file we are recoverying
to indicate when the last chunk was received. Yet, this might hide bugs
on the compression layer if payloads are truncated. We should indicate
if the last chunk is send to make sure we validate checksums
accordingly if possible.

Closes #7830
This commit is contained in:
Simon Willnauer 2014-09-23 09:50:06 +02:00
parent 5533495171
commit 6c8aa5fa6c
5 changed files with 31 additions and 12 deletions

View File

@ -275,6 +275,15 @@ public class InternalEngine extends AbstractIndexShardComponent implements Engin
mergeScheduler.addListener(throttle);
} catch (IOException e) {
maybeFailEngine(e, "start");
if (this.indexWriter != null) {
try {
IndexWriter pending = indexWriter;
indexWriter = null;
pending.rollback();
} catch (IOException e1) {
e.addSuppressed(e1);
}
}
throw new EngineCreationFailureException(shardId, "failed to create engine", e);
}

View File

@ -35,7 +35,7 @@ import java.io.IOException;
*
*/
public final class RecoveryFileChunkRequest extends TransportRequest { // public for testing
private boolean lastChunk;
private long recoveryId;
private ShardId shardId;
private long position;
@ -45,12 +45,13 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
RecoveryFileChunkRequest() {
}
public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content) {
public RecoveryFileChunkRequest(long recoveryId, ShardId shardId, StoreFileMetaData metaData, long position, BytesReference content, boolean lastChunk) {
this.recoveryId = recoveryId;
this.shardId = shardId;
this.metaData = metaData;
this.position = position;
this.content = content;
this.lastChunk = lastChunk;
}
public long recoveryId() {
@ -82,12 +83,6 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
return content;
}
public RecoveryFileChunkRequest readFileChunk(StreamInput in) throws IOException {
RecoveryFileChunkRequest request = new RecoveryFileChunkRequest();
request.readFrom(in);
return request;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
@ -104,6 +99,11 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
writtenBy = Lucene.parseVersionLenient(versionString, null);
}
metaData = new StoreFileMetaData(name, length, checksum, writtenBy);
if (in.getVersion().onOrAfter(org.elasticsearch.Version.V_1_4_0_Beta1)) {
lastChunk = in.readBoolean();
} else {
lastChunk = false;
}
}
@Override
@ -119,6 +119,9 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_1_3_0)) {
out.writeOptionalString(metaData.writtenBy() == null ? null : metaData.writtenBy().toString());
}
if (out.getVersion().onOrAfter(org.elasticsearch.Version.V_1_4_0_Beta1)) {
out.writeBoolean(lastChunk);
}
}
@Override
@ -131,4 +134,11 @@ public final class RecoveryFileChunkRequest extends TransportRequest { // publi
public StoreFileMetaData metadata() {
return metaData;
}
/**
* Returns <code>true</code> if this chunk is the last chunk in the stream.
*/
public boolean lastChunk() {
return lastChunk;
}
}

View File

@ -223,9 +223,9 @@ public class RecoverySource extends AbstractComponent {
indexInput.readBytes(buf, 0, toRead, false);
BytesArray content = new BytesArray(buf, 0, toRead);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead;
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.recoveryId(), request.shardId(), md, position, content, readCount == len),
TransportRequestOptions.options().withCompress(shouldCompressRequest).withType(TransportRequestOptions.Type.RECOVERY).withTimeout(internalActionTimeout), EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
} catch (Throwable e) {
final CorruptIndexException corruptIndexException;

View File

@ -594,7 +594,7 @@ public class RecoveryTarget extends AbstractComponent {
if (file != null) {
file.updateRecovered(request.length());
}
if (indexOutput.getFilePointer() == request.length()) {
if (indexOutput.getFilePointer() >= request.length() || request.lastChunk()) {
Store.verify(indexOutput);
// we are done
indexOutput.close();

View File

@ -339,7 +339,7 @@ public class CorruptedFileTest extends ElasticsearchIntegrationTest {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
if (truncate && req.length() > 1) {
BytesArray array = new BytesArray(req.content().array(), req.content().arrayOffset(), (int)req.length()-1);
request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array);
request = new RecoveryFileChunkRequest(req.recoveryId(), req.shardId(), req.metadata(), req.position(), array, req.lastChunk());
} else {
byte[] array = req.content().array();
int i = randomIntBetween(0, req.content().length() - 1);