diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f0b91e50edb..1fc092690f6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -122,7 +122,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation } } - SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) + SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id()) .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); long version; Engine.IndexingOperation op; @@ -231,7 +231,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { - SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()) + SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id()) .routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java 
b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java index f5d0938af75..cb2b6ed41e6 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/IndexRequest.java @@ -299,6 +299,18 @@ public class IndexRequest extends ShardReplicationOperationRequest { return source; } + public byte[] underlyingSource() { + return this.source; + } + + public int underlyingSourceOffset() { + return this.sourceOffset; + } + + public int underlyingSourceLength() { + return this.sourceLength; + } + /** * Index the Map as a {@link org.elasticsearch.client.Requests#INDEX_CONTENT_TYPE}. * diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 731585d5b35..f1f457dc866 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -169,7 +169,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi } IndexShard indexShard = indexShard(shardRequest); - SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) + SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); long version; Engine.IndexingOperation op; @@ -224,7 +224,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) { IndexShard indexShard = indexShard(shardRequest); IndexRequest request = 
shardRequest.request; - SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()) + SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id()) .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()); if (request.opType() == IndexRequest.OpType.INDEX) { Engine.Index index = indexShard.prepareIndex(sourceToParse) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java index c0e690e7b46..10fcedad4c9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/mlt/TransportMoreLikeThisAction.java @@ -208,7 +208,7 @@ public class TransportMoreLikeThisAction extends BaseAction documents, Analyzer analyzer, byte[] source, boolean mappersAdded) { + public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) { this.uid = uid; this.id = id; this.type = type; @@ -64,6 +66,8 @@ public class ParsedDocument { this.timestamp = timestamp; this.documents = documents; this.source = source; + this.sourceOffset = sourceOffset; + this.sourceLength = sourceLength; this.analyzer = analyzer; this.mappersAdded = mappersAdded; } @@ -104,6 +108,14 @@ public class ParsedDocument { return this.source; } + public int sourceOffset() { + return this.sourceOffset; + } + + public int sourceLength() { + return this.sourceLength; + } + public ParsedDocument parent(String parent) { this.parent = parent; return this; diff --git 
a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java index f20a750ebcd..5c59c0c74b4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceToParse.java @@ -30,11 +30,17 @@ public class SourceToParse { return new SourceToParse(source); } + public static SourceToParse source(byte[] source, int offset, int length) { + return new SourceToParse(source, offset, length); + } + public static SourceToParse source(XContentParser parser) { return new SourceToParse(parser); } private final byte[] source; + private final int sourceOffset; + private final int sourceLength; private final XContentParser parser; @@ -53,10 +59,21 @@ public class SourceToParse { public SourceToParse(XContentParser parser) { this.parser = parser; this.source = null; + this.sourceOffset = 0; + this.sourceLength = 0; } public SourceToParse(byte[] source) { this.source = source; + this.sourceOffset = 0; + this.sourceLength = source.length; + this.parser = null; + } + + public SourceToParse(byte[] source, int offset, int length) { + this.source = source; + this.sourceOffset = offset; + this.sourceLength = length; this.parser = null; } @@ -68,6 +85,14 @@ public class SourceToParse { return this.source; } + public int sourceOffset() { + return this.sourceOffset; + } + + public int sourceLength() { + return this.sourceLength; + } + public String type() { return this.type; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SizeFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SizeFieldMapper.java index 47d312b99d3..4599c521b02 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SizeFieldMapper.java +++ 
b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SizeFieldMapper.java @@ -131,7 +131,7 @@ public class SizeFieldMapper extends IntegerFieldMapper implements RootMapper { if (!enabled) { return null; } - return new CustomIntegerNumericField(this, context.source().length); + return new CustomIntegerNumericField(this, context.sourceLength()); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java index 6ffc395e87f..3a8e398078e 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java @@ -26,7 +26,8 @@ import org.elasticsearch.ElasticSearchParseException; import org.elasticsearch.common.Strings; import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.compress.lzf.LZFDecoder; -import org.elasticsearch.common.compress.lzf.LZFEncoder; +import org.elasticsearch.common.io.stream.CachedStreamOutput; +import org.elasticsearch.common.io.stream.LZFStreamOutput; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.document.ResetFieldSelector; import org.elasticsearch.common.unit.ByteSizeValue; @@ -178,13 +179,25 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In return null; } byte[] data = context.source(); - if (compress != null && compress && !LZF.isCompressed(data)) { - if (compressThreshold == -1 || data.length > compressThreshold) { - data = LZFEncoder.encode(data, data.length); - context.source(data); + int dataOffset = context.sourceOffset(); + int dataLength = context.sourceLength(); + if (compress != null && compress && 
!LZF.isCompressed(data, dataOffset, dataLength)) { + if (compressThreshold == -1 || dataLength > compressThreshold) { + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + streamOutput.writeBytes(data, dataOffset, dataLength); + streamOutput.flush(); + // we copy over the byte array, since we need to push back the cached entry + // TODO, if we had a handle into when we are done with parsing, then we could push back then and not copy over bytes + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + CachedStreamOutput.pushEntry(cachedEntry); + // update the data in the context, so it can be compressed and stored compressed outside... + context.source(data, dataOffset, dataLength); } } - return new Field(names().indexName(), data); + return new Field(names().indexName(), data, dataOffset, dataLength); } public byte[] value(Document document) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 9f8aa033b84..211bf17f970 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -532,13 +532,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I switch (operation.opType()) { case CREATE: Translog.Create create = (Translog.Create) operation; - engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id()) + engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id()) .routing(create.routing()).parent(create.parent()).timestamp(create.timestamp())).version(create.version()) 
.origin(Engine.Operation.Origin.RECOVERY)); break; case SAVE: Translog.Index index = (Translog.Index) operation; - engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id()) + engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id()) .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp())).version(index.version()) .origin(Engine.Operation.Origin.RECOVERY)); break; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java index 62836280194..1548e7e3d94 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -244,6 +244,8 @@ public interface Translog extends IndexShardComponent { private String id; private String type; private byte[] source; + private int sourceOffset; + private int sourceLength; private String routing; private String parent; private long timestamp; @@ -253,7 +255,11 @@ public interface Translog extends IndexShardComponent { } public Create(Engine.Create create) { - this(create.type(), create.id(), create.source()); + this.id = create.id(); + this.type = create.type(); + this.source = create.source(); + this.sourceOffset = create.sourceOffset(); + this.sourceLength = create.sourceLength(); this.routing = create.routing(); this.parent = create.parent(); this.timestamp = create.timestamp(); @@ -264,6 +270,8 @@ public interface Translog extends IndexShardComponent { this.id = id; this.type = type; this.source = source; + this.sourceOffset = 0; + this.sourceLength = source.length; } @Override public Type opType() { @@ -282,6 +290,14 @@ public interface Translog extends IndexShardComponent { return this.source; } + public int sourceOffset() { + return this.sourceOffset; + } + + public int 
sourceLength() { + return this.sourceLength; + } + public String type() { return this.type; } @@ -334,7 +350,9 @@ public interface Translog extends IndexShardComponent { int version = in.readVInt(); // version id = in.readUTF(); type = in.readUTF(); - source = new byte[in.readVInt()]; + sourceOffset = 0; + sourceLength = in.readVInt(); + source = new byte[sourceLength]; in.readFully(source); if (version >= 1) { if (in.readBoolean()) { @@ -358,8 +376,8 @@ public interface Translog extends IndexShardComponent { out.writeVInt(4); // version out.writeUTF(id); out.writeUTF(type); - out.writeVInt(source.length); - out.writeBytes(source); + out.writeVInt(sourceLength); + out.writeBytes(source, sourceOffset, sourceLength); if (routing == null) { out.writeBoolean(false); } else { @@ -382,6 +400,8 @@ public interface Translog extends IndexShardComponent { private String type; private long version; private byte[] source; + private int sourceOffset; + private int sourceLength; private String routing; private String parent; private long timestamp; @@ -390,7 +410,11 @@ public interface Translog extends IndexShardComponent { } public Index(Engine.Index index) { - this(index.type(), index.id(), index.source()); + this.id = index.id(); + this.type = index.type(); + this.source = index.source(); + this.sourceOffset = index.sourceOffset(); + this.sourceLength = index.sourceLength(); this.routing = index.routing(); this.parent = index.parent(); this.version = index.version(); @@ -401,6 +425,8 @@ public interface Translog extends IndexShardComponent { this.type = type; this.id = id; this.source = source; + this.sourceOffset = 0; + this.sourceLength = source.length; } @Override public Type opType() { @@ -435,6 +461,14 @@ public interface Translog extends IndexShardComponent { return this.source; } + public int sourceOffset() { + return this.sourceOffset; + } + + public int sourceLength() { + return this.sourceLength; + } + public long version() { return this.version; } @@ -471,7 +505,9 
@@ public interface Translog extends IndexShardComponent { int version = in.readVInt(); // version id = in.readUTF(); type = in.readUTF(); - source = new byte[in.readVInt()]; + sourceOffset = 0; + sourceLength = in.readVInt(); + source = new byte[sourceLength]; in.readFully(source); if (version >= 1) { if (in.readBoolean()) { @@ -495,8 +531,8 @@ public interface Translog extends IndexShardComponent { out.writeVInt(4); // version out.writeUTF(id); out.writeUTF(type); - out.writeVInt(source.length); - out.writeBytes(source); + out.writeVInt(sourceLength); + out.writeBytes(source, sourceOffset, sourceLength); if (routing == null) { out.writeBoolean(false); } else {