recovery chunk to use bytes ref
This commit is contained in:
parent
cf73e18146
commit
c02dc8f4f8
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.indices.recovery;
|
||||
|
||||
import org.elasticsearch.common.BytesHolder;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -37,20 +38,18 @@ class RecoveryFileChunkRequest implements Streamable {
|
|||
private long position;
|
||||
private long length;
|
||||
private String checksum;
|
||||
private byte[] content;
|
||||
private int contentLength;
|
||||
private BytesHolder content;
|
||||
|
||||
RecoveryFileChunkRequest() {
|
||||
}
|
||||
|
||||
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, byte[] content, int contentLength) {
|
||||
RecoveryFileChunkRequest(ShardId shardId, String name, long position, long length, String checksum, BytesHolder content) {
|
||||
this.shardId = shardId;
|
||||
this.name = name;
|
||||
this.position = position;
|
||||
this.length = length;
|
||||
this.checksum = checksum;
|
||||
this.content = content;
|
||||
this.contentLength = contentLength;
|
||||
}
|
||||
|
||||
public ShardId shardId() {
|
||||
|
@ -74,14 +73,10 @@ class RecoveryFileChunkRequest implements Streamable {
|
|||
return length;
|
||||
}
|
||||
|
||||
public byte[] content() {
|
||||
public BytesHolder content() {
|
||||
return content;
|
||||
}
|
||||
|
||||
public int contentLength() {
|
||||
return contentLength;
|
||||
}
|
||||
|
||||
public RecoveryFileChunkRequest readFileChunk(StreamInput in) throws IOException {
|
||||
RecoveryFileChunkRequest request = new RecoveryFileChunkRequest();
|
||||
request.readFrom(in);
|
||||
|
@ -97,9 +92,7 @@ class RecoveryFileChunkRequest implements Streamable {
|
|||
if (in.readBoolean()) {
|
||||
checksum = in.readUTF();
|
||||
}
|
||||
contentLength = in.readVInt();
|
||||
content = new byte[contentLength];
|
||||
in.readFully(content);
|
||||
content = in.readBytesReference();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -114,8 +107,7 @@ class RecoveryFileChunkRequest implements Streamable {
|
|||
out.writeBoolean(true);
|
||||
out.writeUTF(checksum);
|
||||
}
|
||||
out.writeVInt(contentLength);
|
||||
out.writeBytes(content, 0, contentLength);
|
||||
out.writeBytesHolder(content);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
|
|||
import com.google.common.collect.Sets;
|
||||
import org.apache.lucene.store.IndexInput;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.BytesHolder;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -148,7 +149,8 @@ public class RecoverySource extends AbstractComponent {
|
|||
}
|
||||
|
||||
indexInput.readBytes(buf, 0, toRead, false);
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
|
||||
BytesHolder content = new BytesHolder(buf, 0, toRead);
|
||||
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), content),
|
||||
TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
|
||||
readCount += toRead;
|
||||
}
|
||||
|
|
|
@ -59,8 +59,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
|
|||
* <p/>
|
||||
* <p>Note, it can be safely assumed that there will only be a single recovery per shard (index+id) and
|
||||
* not several of them (since we don't allocate several shard replicas to the same node).
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class RecoveryTarget extends AbstractComponent {
|
||||
|
||||
|
@ -534,10 +532,10 @@ public class RecoveryTarget extends AbstractComponent {
|
|||
synchronized (indexOutput) {
|
||||
try {
|
||||
if (recoverySettings.rateLimiter() != null) {
|
||||
recoverySettings.rateLimiter().pause(request.contentLength());
|
||||
recoverySettings.rateLimiter().pause(request.content().length());
|
||||
}
|
||||
indexOutput.writeBytes(request.content(), request.contentLength());
|
||||
onGoingRecovery.currentFilesSize.addAndGet(request.contentLength());
|
||||
indexOutput.writeBytes(request.content().bytes(), request.content().offset(), request.content().length());
|
||||
onGoingRecovery.currentFilesSize.addAndGet(request.length());
|
||||
if (indexOutput.getFilePointer() == request.length()) {
|
||||
// we are done
|
||||
indexOutput.close();
|
||||
|
|
Loading…
Reference in New Issue