diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index 4ae71bee5c6..19c58c31398 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.index; +import org.apache.lucene.util.UnicodeUtil; import org.elasticsearch.ElasticSearchGenerationException; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.action.ActionRequestValidationException; @@ -34,6 +35,7 @@ import org.elasticsearch.util.xcontent.builder.BinaryXContentBuilder; import org.elasticsearch.util.xcontent.builder.XContentBuilder; import java.io.IOException; +import java.util.Arrays; import java.util.Map; import static org.elasticsearch.action.Actions.*; @@ -102,10 +104,13 @@ public class IndexRequest extends ShardReplicationOperationRequest { private String type; private String id; - private byte[] source; - private OpType opType = OpType.INDEX; - private transient XContentBuilder sourceBuilder; + private byte[] source; + private int sourceOffset; + private int sourceLength; + private boolean sourceUnsafe; + + private OpType opType = OpType.INDEX; public IndexRequest() { } @@ -138,12 +143,19 @@ public class IndexRequest extends ShardReplicationOperationRequest { if (type == null) { validationException = addValidationError("type is missing", validationException); } - if (source == null && sourceBuilder == null) { + if (source == null) { validationException = addValidationError("source is missing", validationException); } return validationException; } + /** + * Before we fork on a local thread, make sure we copy over the bytes if they are unsafe + */ + @Override protected void beforeLocalFork() { + source(); + } + /** * Sets the index the index operation will happen on. */ @@ -203,12 +215,9 @@ public class IndexRequest extends ShardReplicationOperationRequest { * The source of the JSON document to index. */ byte[] source() { - if (source == null && sourceBuilder != null) { - try { - source = sourceBuilder.copiedBytes(); - } catch (IOException e) { - throw new ElasticSearchGenerationException("Failed to build source", e); - } + if (sourceUnsafe) { + source = Arrays.copyOfRange(source, sourceOffset, sourceLength); + sourceUnsafe = false; } return source; } @@ -223,7 +232,7 @@ public class IndexRequest extends ShardReplicationOperationRequest { } /** - * Writes the JSON as the provided content type. + * Writes the Map as the provided content type. * * @param source The map to index */ @@ -231,21 +240,24 @@ public class IndexRequest extends ShardReplicationOperationRequest { try { BinaryXContentBuilder builder = XContentFactory.contentBinaryBuilder(contentType); builder.map(source); - this.source = builder.copiedBytes(); + return source(builder); } catch (IOException e) { throw new ElasticSearchGenerationException("Failed to generate [" + source + "]", e); } - return this; } /** - * Sets the JSON source to index. + * Sets the document source to index. * *

Note, its preferable to either set it using {@link #source(org.elasticsearch.util.xcontent.builder.XContentBuilder)} * or using the {@link #source(byte[])}. */ @Required public IndexRequest source(String source) { - this.source = Unicode.fromStringAsBytes(source); + UnicodeUtil.UTF8Result result = Unicode.fromStringAsUtf8(source); + this.source = result.result; + this.sourceOffset = 0; + this.sourceLength = result.length; + this.sourceUnsafe = true; return this; } @@ -253,15 +265,51 @@ public class IndexRequest extends ShardReplicationOperationRequest { * Sets the content source to index. */ @Required public IndexRequest source(XContentBuilder sourceBuilder) { - this.sourceBuilder = sourceBuilder; + try { + source = sourceBuilder.unsafeBytes(); + sourceOffset = 0; + sourceLength = sourceBuilder.unsafeBytesLength(); + sourceUnsafe = true; + } catch (IOException e) { + throw new ElasticSearchGenerationException("Failed to generate [" + sourceBuilder + "]", e); + } return this; } /** - * Sets the JSON source to index. + * Sets the document to index in bytes form. */ - @Required public IndexRequest source(byte[] source) { + public IndexRequest source(byte[] source) { + return source(source, 0, source.length); + } + + /** + * Sets the document to index in bytes form (assumed to be safe to be used from different + * threads). + * + * @param source The source to index + * @param offset The offset in the byte array + * @param length The length of the data + * @return + */ + @Required public IndexRequest source(byte[] source, int offset, int length) { + return source(source, offset, length, false); + } + + /** + * Sets the document to index in bytes form. + * + * @param source The source to index + * @param offset The offset in the byte array + * @param length The length of the data + * @param unsafe Is the byte array safe to be used form a different thread + * @return + */ + @Required public IndexRequest source(byte[] source, int offset, int length, boolean unsafe) { this.source = source; + this.sourceOffset = offset; + this.sourceLength = length; + this.sourceUnsafe = unsafe; return this; } @@ -319,8 +367,13 @@ public class IndexRequest extends ShardReplicationOperationRequest { if (in.readBoolean()) { id = in.readUTF(); } - source = new byte[in.readVInt()]; + + sourceUnsafe = false; + sourceOffset = 0; + sourceLength = in.readVInt(); + source = new byte[sourceLength]; in.readFully(source); + opType = OpType.fromId(in.readByte()); } @@ -333,17 +386,12 @@ public class IndexRequest extends ShardReplicationOperationRequest { out.writeBoolean(true); out.writeUTF(id); } - if (source != null) { - out.writeVInt(source.length); - out.writeBytes(source); - } else { - out.writeVInt(sourceBuilder.unsafeBytesLength()); - out.writeBytes(sourceBuilder.unsafeBytes(), 0, sourceBuilder.unsafeBytesLength()); - } + out.writeVInt(sourceLength); + out.writeBytes(source, sourceOffset, sourceLength); out.writeByte(opType.id()); } @Override public String toString() { - return "[" + index + "][" + type + "][" + id + "], source[" + Unicode.fromBytes(source) + "]"; + return "[" + index + "][" + type + "][" + id + "], source[" + Unicode.fromBytes(source, sourceOffset, sourceLength) + "]"; } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java index 86fa6102842..355ddb1b908 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/ShardReplicationOperationRequest.java @@ -107,4 +107,11 @@ public abstract class ShardReplicationOperationRequest implements ActionRequest timeout.writeTo(out); out.writeUTF(index); } + + /** + * Called before the request gets forked into a local thread. + */ + protected void beforeLocalFork() { + + } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index 307c55ba8d3..3066b050963 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -246,6 +246,7 @@ public abstract class TransportShardReplicationOperationAction