add offset and length all the way to the source parsing level

Shay Banon 2011-08-31 22:29:04 +03:00
parent bc1dd108d1
commit 3b9da384c3
13 changed files with 157 additions and 29 deletions
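
In short: the indexing pipeline used to pass a bare byte[] source around, so any component that held a slice of a larger buffer (for example, one document inside a bulk request body) first had to copy that slice into a fresh array. This commit threads an explicit (source, offset, length) triple from the transport actions down through SourceToParse, ParseContext, ParsedDocument, the engine operations, and the translog. A minimal sketch of the pattern; the ByteSlice name is hypothetical and not part of the commit:

public final class ByteSlice {
    private final byte[] bytes;
    private final int offset;
    private final int length;

    public ByteSlice(byte[] bytes, int offset, int length) {
        this.bytes = bytes;
        this.offset = offset;
        this.length = length;
    }

    // whole-array convenience, mirroring the single-argument overloads
    // the commit keeps for existing callers
    public ByteSlice(byte[] bytes) {
        this(bytes, 0, bytes.length);
    }

    public byte[] bytes() { return bytes; }

    public int offset() { return offset; }

    public int length() { return length; }
}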

View File: TransportShardBulkAction.java

@@ -122,7 +122,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
}
}
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
long version;
Engine.IndexingOperation op;
@@ -231,7 +231,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id())
SourceToParse sourceToParse = SourceToParse.source(indexRequest.underlyingSource(), indexRequest.underlyingSourceOffset(), indexRequest.underlyingSourceLength()).type(indexRequest.type()).id(indexRequest.id())
.routing(indexRequest.routing()).parent(indexRequest.parent()).timestamp(indexRequest.timestamp());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse).version(indexRequest.version()).origin(Engine.Operation.Origin.REPLICA);

View File: IndexRequest.java

@@ -299,6 +299,18 @@ public class IndexRequest extends ShardReplicationOperationRequest {
return source;
}
public byte[] underlyingSource() {
return this.source;
}
public int underlyingSourceOffset() {
return this.sourceOffset;
}
public int underlyingSourceLength() {
return this.sourceLength;
}
/**
* Index the Map as a {@link org.elasticsearch.client.Requests#INDEX_CONTENT_TYPE}.
*
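
A note on the accessors added above: source() presumably keeps returning a standalone array for external consumers, while underlyingSource() exposes the backing buffer as-is, paired with underlyingSourceOffset() and underlyingSourceLength(). A sketch of that assumed contract; the copy-on-slice body below is an assumption, only the method names and fields come from this diff:

public byte[] source() {
    if (sourceOffset == 0 && sourceLength == source.length) {
        return source; // exact fit: safe to hand out the backing array
    }
    byte[] copy = new byte[sourceLength];
    System.arraycopy(source, sourceOffset, copy, 0, sourceLength);
    return copy; // standalone copy of just this request's bytes
}

public byte[] underlyingSource() {
    return this.source; // raw backing buffer, never copied
}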

View File: TransportIndexAction.java

@@ -169,7 +169,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
}
IndexShard indexShard = indexShard(shardRequest);
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
long version;
Engine.IndexingOperation op;
@@ -224,7 +224,7 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
@Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id())
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp());
if (request.opType() == IndexRequest.OpType.INDEX) {
Engine.Index index = indexShard.prepareIndex(sourceToParse)

View File: TransportMoreLikeThisAction.java

@@ -208,7 +208,7 @@ public class TransportMoreLikeThisAction extends BaseAction<MoreLikeThisRequest,
if (getResponse.source() == null) {
return;
}
docMapper.parse(SourceToParse.source(getResponse.source()).type(request.type()).id(request.id()), new DocumentMapper.ParseListenerAdapter() {
docMapper.parse(SourceToParse.source(getResponse.sourceRef().bytes(), getResponse.sourceRef().offset(), getResponse.sourceRef().length()).type(request.type()).id(request.id()), new DocumentMapper.ParseListenerAdapter() {
@Override public boolean beforeFieldAdded(FieldMapper fieldMapper, Fieldable field, Object parseContext) {
if (fieldMapper instanceof InternalMapper) {
return true;

View File: Engine.java

@@ -400,6 +400,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.source();
}
public int sourceOffset() {
return this.doc.sourceOffset();
}
public int sourceLength() {
return this.doc.sourceLength();
}
public UidField uidField() {
return (UidField) doc.rootDoc().getFieldable(UidFieldMapper.NAME);
}
@@ -522,6 +530,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
return this.doc.source();
}
public int sourceOffset() {
return this.doc.sourceOffset();
}
public int sourceLength() {
return this.doc.sourceLength();
}
public UidField uidField() {
return (UidField) doc.rootDoc().getFieldable(UidFieldMapper.NAME);
}

View File: DocumentMapper.java

@@ -415,14 +415,14 @@ public class DocumentMapper implements ToXContent {
XContentParser parser = source.parser();
try {
if (parser == null) {
if (LZF.isCompressed(source.source())) {
BytesStreamInput siBytes = new BytesStreamInput(source.source());
if (LZF.isCompressed(source.source(), source.sourceOffset(), source.sourceLength())) {
BytesStreamInput siBytes = new BytesStreamInput(source.source(), source.sourceOffset(), source.sourceLength());
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
XContentType contentType = XContentFactory.xContentType(siLzf);
siLzf.resetToBufferStart();
parser = XContentFactory.xContent(contentType).createParser(siLzf);
} else {
parser = XContentFactory.xContent(source.source()).createParser(source.source());
parser = XContentFactory.xContent(source.source(), source.sourceOffset(), source.sourceLength()).createParser(source.source(), source.sourceOffset(), source.sourceLength());
}
}
context.reset(parser, new Document(), source, listener);
@@ -487,7 +487,7 @@ public class DocumentMapper implements ToXContent {
Collections.reverse(context.docs());
}
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), source.timestamp(), context.docs(), context.analyzer(),
context.source(), context.mappersAdded()).parent(source.parent());
context.source(), context.sourceOffset(), context.sourceLength(), context.mappersAdded()).parent(source.parent());
// reset the context to free up memory
context.reset(null, null, null, null);
return doc;
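
The parse path above now sniffs LZF compression and content type against the slice instead of the whole backing array. As a rough illustration, first-byte sniffing over a (bytes, offset, length) window looks like this (simplified; not the actual XContentFactory logic):

// Stand-in for content-type detection on a slice; the real code also
// recognises SMILE and other formats.
static boolean looksLikeJson(byte[] bytes, int offset, int length) {
    for (int i = offset; i < offset + length; i++) {
        byte b = bytes[i];
        if (b == ' ' || b == '\t' || b == '\r' || b == '\n') {
            continue; // skip leading whitespace
        }
        return b == '{' || b == '['; // JSON starts with an object or array
    }
    return false; // empty or all-whitespace slice
}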

View File: ParseContext.java

@@ -57,6 +57,8 @@ public class ParseContext {
private SourceToParse sourceToParse;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private String id;
@@ -97,6 +99,8 @@ public class ParseContext {
this.id = null;
this.sourceToParse = source;
this.source = source == null ? null : sourceToParse.source();
this.sourceOffset = source == null ? 0 : sourceToParse.sourceOffset();
this.sourceLength = source == null ? 0 : sourceToParse.sourceLength();
this.path.reset();
this.mappersAdded = false;
this.listener = listener == null ? DocumentMapper.ParseListener.EMPTY : listener;
@@ -136,9 +140,19 @@ public class ParseContext {
return source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
// should only be used by SourceFieldMapper to update the context with the compressed source
public void source(byte[] source) {
public void source(byte[] source, int offset, int length) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
}
public ContentPath path() {

View File: ParsedDocument.java

@@ -47,16 +47,18 @@ public class ParsedDocument {
private final Analyzer analyzer;
private final byte[] source;
private final int sourceOffset;
private final int sourceLength;
private boolean mappersAdded;
private String parent;
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, Document document, Analyzer analyzer, byte[] source, boolean mappersAdded) {
this(uid, id, type, routing, timestamp, Arrays.asList(document), analyzer, source, mappersAdded);
this(uid, id, type, routing, timestamp, Arrays.asList(document), analyzer, source, 0, source.length, mappersAdded);
}
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List<Document> documents, Analyzer analyzer, byte[] source, boolean mappersAdded) {
public ParsedDocument(String uid, String id, String type, String routing, long timestamp, List<Document> documents, Analyzer analyzer, byte[] source, int sourceOffset, int sourceLength, boolean mappersAdded) {
this.uid = uid;
this.id = id;
this.type = type;
@@ -64,6 +66,8 @@ public class ParsedDocument {
this.timestamp = timestamp;
this.documents = documents;
this.source = source;
this.sourceOffset = sourceOffset;
this.sourceLength = sourceLength;
this.analyzer = analyzer;
this.mappersAdded = mappersAdded;
}
@@ -104,6 +108,14 @@ public class ParsedDocument {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public ParsedDocument parent(String parent) {
this.parent = parent;
return this;

View File: SourceToParse.java

@@ -30,11 +30,17 @@ public class SourceToParse {
return new SourceToParse(source);
}
public static SourceToParse source(byte[] source, int offset, int length) {
return new SourceToParse(source, offset, length);
}
public static SourceToParse source(XContentParser parser) {
return new SourceToParse(parser);
}
private final byte[] source;
private final int sourceOffset;
private final int sourceLength;
private final XContentParser parser;
@@ -53,10 +59,21 @@ public class SourceToParse {
public SourceToParse(XContentParser parser) {
this.parser = parser;
this.source = null;
this.sourceOffset = 0;
this.sourceLength = 0;
}
public SourceToParse(byte[] source) {
this.source = source;
this.sourceOffset = 0;
this.sourceLength = source.length;
this.parser = null;
}
public SourceToParse(byte[] source, int offset, int length) {
this.source = source;
this.sourceOffset = offset;
this.sourceLength = length;
this.parser = null;
}
@@ -68,6 +85,14 @@ public class SourceToParse {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public String type() {
return this.type;
}
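
With the new overload, a caller that holds a larger shared buffer can hand a sub-range straight to parsing. A hypothetical usage, with buffer contents and bounds invented for illustration:

// Two documents packed into one buffer, as a bulk body might be.
byte[] buffer = "{\"user\":\"kimchy\"}{\"user\":\"shay\"}".getBytes();
int docOffset = 0;
int docLength = 17; // byte length of the first JSON object
SourceToParse sourceToParse = SourceToParse.source(buffer, docOffset, docLength)
        .type("tweet")
        .id("1");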

View File: SizeFieldMapper.java

@@ -131,7 +131,7 @@ public class SizeFieldMapper extends IntegerFieldMapper implements RootMapper {
if (!enabled) {
return null;
}
return new CustomIntegerNumericField(this, context.source().length);
return new CustomIntegerNumericField(this, context.sourceLength());
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
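
This one-line change also appears to matter for correctness once the source can be a slice: the length of the backing array no longer equals the document's byte count, so _size must come from sourceLength(). With hypothetical numbers:

byte[] backing = new byte[8192];  // pooled buffer holding several documents
int offset = 1024, length = 300;  // one document's slice within it
// context.source().length would report 8192 (the whole buffer)
// context.sourceLength() reports 300 (the actual document size)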

View File: SourceFieldMapper.java

@@ -26,7 +26,8 @@ import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.lzf.LZF;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.io.stream.CachedStreamOutput;
import org.elasticsearch.common.io.stream.LZFStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.document.ResetFieldSelector;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -178,13 +179,25 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
return null;
}
byte[] data = context.source();
if (compress != null && compress && !LZF.isCompressed(data)) {
if (compressThreshold == -1 || data.length > compressThreshold) {
data = LZFEncoder.encode(data, data.length);
context.source(data);
int dataOffset = context.sourceOffset();
int dataLength = context.sourceLength();
if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) {
if (compressThreshold == -1 || dataLength > compressThreshold) {
CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
streamOutput.writeBytes(data, dataOffset, dataLength);
streamOutput.flush();
// we copy the byte array over, since we need to push the cached entry back into the pool
// TODO: if we had a handle on when parsing completes, we could push the entry back then and avoid copying the bytes
data = cachedEntry.bytes().copiedByteArray();
dataOffset = 0;
dataLength = data.length;
CachedStreamOutput.pushEntry(cachedEntry);
// update the data in the context, so the source is stored in its compressed form outside of this mapper as well
context.source(data, dataOffset, dataLength);
}
}
return new Field(names().indexName(), data);
return new Field(names().indexName(), data, dataOffset, dataLength);
}
public byte[] value(Document document) {
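
A design note on the hunk above: LZFEncoder.encode(data, data.length) can only compress a whole array starting at index 0, which is why the offset-aware path switches to a stream that accepts (buf, off, len) directly. The same shape with a JDK stream standing in for the pooled LZFStreamOutput; GZIP is used purely as an analogy, not what Elasticsearch does here:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

static byte[] compressSlice(byte[] buf, int off, int len) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
        gzip.write(buf, off, len); // streams take the slice without a copy
    }
    return out.toByteArray(); // compressed copy: offset 0, tight length
}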

View File: InternalIndexShard.java

@@ -532,13 +532,13 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
engine.create(prepareCreate(source(create.source()).type(create.type()).id(create.id())
engine.create(prepareCreate(source(create.source(), create.sourceOffset(), create.sourceLength()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp())).version(create.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;
case SAVE:
Translog.Index index = (Translog.Index) operation;
engine.index(prepareIndex(source(index.source()).type(index.type()).id(index.id())
engine.index(prepareIndex(source(index.source(), index.sourceOffset(), index.sourceLength()).type(index.type()).id(index.id())
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp())).version(index.version())
.origin(Engine.Operation.Origin.RECOVERY));
break;

View File: Translog.java

@@ -244,6 +244,8 @@ public interface Translog extends IndexShardComponent {
private String id;
private String type;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private String routing;
private String parent;
private long timestamp;
@@ -253,7 +255,11 @@ public interface Translog extends IndexShardComponent {
}
public Create(Engine.Create create) {
this(create.type(), create.id(), create.source());
this.id = create.id();
this.type = create.type();
this.source = create.source();
this.sourceOffset = create.sourceOffset();
this.sourceLength = create.sourceLength();
this.routing = create.routing();
this.parent = create.parent();
this.timestamp = create.timestamp();
@@ -264,6 +270,8 @@ public interface Translog extends IndexShardComponent {
this.id = id;
this.type = type;
this.source = source;
this.sourceOffset = 0;
this.sourceLength = source.length;
}
@Override public Type opType() {
@@ -282,6 +290,14 @@ public interface Translog extends IndexShardComponent {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public String type() {
return this.type;
}
@@ -334,7 +350,9 @@ public interface Translog extends IndexShardComponent {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
source = new byte[in.readVInt()];
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
if (version >= 1) {
if (in.readBoolean()) {
@@ -358,8 +376,8 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(4); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
out.writeBytes(source);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
if (routing == null) {
out.writeBoolean(false);
} else {
@@ -382,6 +400,8 @@ public interface Translog extends IndexShardComponent {
private String type;
private long version;
private byte[] source;
private int sourceOffset;
private int sourceLength;
private String routing;
private String parent;
private long timestamp;
@@ -390,7 +410,11 @@ public interface Translog extends IndexShardComponent {
}
public Index(Engine.Index index) {
this(index.type(), index.id(), index.source());
this.id = index.id();
this.type = index.type();
this.source = index.source();
this.sourceOffset = index.sourceOffset();
this.sourceLength = index.sourceLength();
this.routing = index.routing();
this.parent = index.parent();
this.version = index.version();
@@ -401,6 +425,8 @@ public interface Translog extends IndexShardComponent {
this.type = type;
this.id = id;
this.source = source;
this.sourceOffset = 0;
this.sourceLength = source.length;
}
@Override public Type opType() {
@@ -435,6 +461,14 @@ public interface Translog extends IndexShardComponent {
return this.source;
}
public int sourceOffset() {
return this.sourceOffset;
}
public int sourceLength() {
return this.sourceLength;
}
public long version() {
return this.version;
}
@@ -471,7 +505,9 @@ public interface Translog extends IndexShardComponent {
int version = in.readVInt(); // version
id = in.readUTF();
type = in.readUTF();
source = new byte[in.readVInt()];
sourceOffset = 0;
sourceLength = in.readVInt();
source = new byte[sourceLength];
in.readFully(source);
if (version >= 1) {
if (in.readBoolean()) {
@@ -495,8 +531,8 @@ public interface Translog extends IndexShardComponent {
out.writeVInt(4); // version
out.writeUTF(id);
out.writeUTF(type);
out.writeVInt(source.length);
out.writeBytes(source);
out.writeVInt(sourceLength);
out.writeBytes(source, sourceOffset, sourceLength);
if (routing == null) {
out.writeBoolean(false);
} else {
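
Taken together, the translog changes leave the wire format untouched: the writer now emits the slice's length and bytes instead of the whole array's, and the reader still materialises a tight array, so replayed operations always carry offset 0. A minimal round-trip sketch with JDK data streams standing in for StreamOutput/StreamInput, and a plain int in place of the vInt encoding:

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// write side: only the [offset, offset + length) window hits the wire
static void writeSource(DataOutputStream out, byte[] src, int off, int len) throws IOException {
    out.writeInt(len);        // writeVInt(sourceLength) in the real code
    out.write(src, off, len); // writeBytes(source, sourceOffset, sourceLength)
}

// read side: reconstructs a tight array, hence sourceOffset = 0 on replay
static byte[] readSource(DataInputStream in) throws IOException {
    byte[] src = new byte[in.readInt()];
    in.readFully(src);
    return src;
}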