when compression is enabled, use the compressed byte buffer to write it to the translog (less data to write), and handle recovery by being able to parse also compressed source
This commit is contained in:
parent
f2eae5b605
commit
49deb80d03
|
@ -39,11 +39,6 @@ public interface SourceFieldMapper extends FieldMapper<byte[]>, InternalMapper {
|
|||
*/
|
||||
boolean enabled();
|
||||
|
||||
/**
|
||||
* Is the source field compressed or not?
|
||||
*/
|
||||
boolean compressed();
|
||||
|
||||
/**
|
||||
* Returns the native source value, if its compressed, then the compressed value is returned.
|
||||
*/
|
||||
|
|
|
@ -121,6 +121,11 @@ public class ParseContext {
|
|||
return this.source;
|
||||
}
|
||||
|
||||
// only should be used by SourceFieldMapper to update with a compressed source
|
||||
public void source(byte[] source) {
|
||||
this.source = source;
|
||||
}
|
||||
|
||||
public ContentPath path() {
|
||||
return this.path;
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
|||
|
||||
private boolean enabled = Defaults.ENABLED;
|
||||
|
||||
private long compressThreshold = -1;
|
||||
private long compressThreshold = Defaults.COMPRESS_THRESHOLD;
|
||||
|
||||
private Boolean compress = null;
|
||||
|
||||
|
@ -95,7 +95,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
|||
super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
|
||||
Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
|
||||
this.enabled = enabled;
|
||||
this.compress = compress;
|
||||
this.compress = true;
|
||||
this.compressThreshold = compressThreshold;
|
||||
this.fieldSelector = new SourceFieldSelector(names.indexName());
|
||||
}
|
||||
|
@ -104,10 +104,6 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
|||
return this.enabled;
|
||||
}
|
||||
|
||||
@Override public boolean compressed() {
|
||||
return compress != null && compress;
|
||||
}
|
||||
|
||||
public FieldSelector fieldSelector() {
|
||||
return this.fieldSelector;
|
||||
}
|
||||
|
@ -117,9 +113,10 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements or
|
|||
return null;
|
||||
}
|
||||
byte[] data = context.source();
|
||||
if (compress != null && compress) {
|
||||
if (compress != null && compress && !LZFDecoder.isCompressed(data)) {
|
||||
if (compressThreshold == -1 || data.length > compressThreshold) {
|
||||
data = LZFEncoder.encodeWithCache(data, data.length);
|
||||
context.source(data);
|
||||
}
|
||||
}
|
||||
return new Field(names.indexName(), data, store);
|
||||
|
|
|
@ -25,6 +25,10 @@ import org.apache.lucene.search.Filter;
|
|||
import org.elasticsearch.common.Preconditions;
|
||||
import org.elasticsearch.common.collect.ImmutableMap;
|
||||
import org.elasticsearch.common.compress.CompressedString;
|
||||
import org.elasticsearch.common.compress.lzf.LZFDecoder;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamInput;
|
||||
import org.elasticsearch.common.io.stream.CachedStreamInput;
|
||||
import org.elasticsearch.common.io.stream.LZFStreamInput;
|
||||
import org.elasticsearch.common.lucene.search.TermFilter;
|
||||
import org.elasticsearch.common.thread.ThreadLocals;
|
||||
import org.elasticsearch.common.xcontent.*;
|
||||
|
@ -374,7 +378,15 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
|
||||
XContentParser parser = null;
|
||||
try {
|
||||
parser = XContentFactory.xContent(source.source()).createParser(source.source());
|
||||
if (LZFDecoder.isCompressed(source.source())) {
|
||||
BytesStreamInput siBytes = new BytesStreamInput(source.source());
|
||||
LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
|
||||
XContentType contentType = XContentFactory.xContentType(siLzf);
|
||||
siLzf.resetToBufferStart();
|
||||
parser = XContentFactory.xContent(contentType).createParser(siLzf);
|
||||
} else {
|
||||
parser = XContentFactory.xContent(source.source()).createParser(source.source());
|
||||
}
|
||||
context.reset(parser, new Document(), type, source.source(), listener);
|
||||
|
||||
// will result in START_OBJECT
|
||||
|
@ -442,7 +454,7 @@ public class XContentDocumentMapper implements DocumentMapper, ToXContent {
|
|||
}
|
||||
}
|
||||
ParsedDocument doc = new ParsedDocument(context.uid(), context.id(), context.type(), source.routing(), context.doc(), context.analyzer(),
|
||||
source.source(), context.mappersAdded()).parent(source.parent());
|
||||
context.source(), context.mappersAdded()).parent(source.parent());
|
||||
// reset the context to free up memory
|
||||
context.reset(null, null, null, null, null);
|
||||
return doc;
|
||||
|
|
Loading…
Reference in New Issue