* Add support for LZF compression on binary fields.

Authored by Jeremie BORDIER on 2011-10-25 16:41:14 +02:00; committed by Shay Banon
parent 4bbf29834e
commit 28c9595af3
1 changed file with 107 additions and 12 deletions


@@ -19,19 +19,29 @@
 package org.elasticsearch.index.mapper.core;
 
-import org.apache.lucene.document.Field;
-import org.apache.lucene.document.Fieldable;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.mapper.Mapper;
-import org.elasticsearch.index.mapper.MapperParsingException;
-import org.elasticsearch.index.mapper.ParseContext;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
+import static org.elasticsearch.index.mapper.MapperBuilders.*;
+import static org.elasticsearch.index.mapper.core.TypeParsers.*;
 
 import java.io.IOException;
 import java.util.Map;
 
-import static org.elasticsearch.index.mapper.MapperBuilders.*;
-import static org.elasticsearch.index.mapper.core.TypeParsers.*;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.Fieldable;
+import org.elasticsearch.ElasticSearchParseException;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.compress.lzf.LZF;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.io.stream.CachedStreamOutput;
+import org.elasticsearch.common.io.stream.LZFStreamOutput;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.mapper.Mapper;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.mapper.MergeContext;
+import org.elasticsearch.index.mapper.MergeMappingException;
+import org.elasticsearch.index.mapper.ParseContext;
 
 /**
  * @author kimchy (shay.banon)
@@ -40,19 +50,37 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
 
     public static final String CONTENT_TYPE = "binary";
 
+    public static class Defaults extends AbstractFieldMapper.Defaults {
+        public static final long COMPRESS_THRESHOLD = -1;
+    }
+
     public static class Builder extends AbstractFieldMapper.Builder<Builder, BinaryFieldMapper> {
 
+        private Boolean compress = null;
+
+        private long compressThreshold = Defaults.COMPRESS_THRESHOLD;
+
         public Builder(String name) {
             super(name);
             builder = this;
         }
 
+        public Builder compress(boolean compress) {
+            this.compress = compress;
+            return this;
+        }
+
+        public Builder compressThreshold(long compressThreshold) {
+            this.compressThreshold = compressThreshold;
+            return this;
+        }
+
         @Override public Builder indexName(String indexName) {
             return super.indexName(indexName);
         }
 
         @Override public BinaryFieldMapper build(BuilderContext context) {
-            return new BinaryFieldMapper(buildNames(context));
+            return new BinaryFieldMapper(buildNames(context), compress, compressThreshold);
         }
     }
@@ -60,16 +88,50 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
         @Override public Mapper.Builder parse(String name, Map<String, Object> node, ParserContext parserContext) throws MapperParsingException {
             BinaryFieldMapper.Builder builder = binaryField(name);
             parseField(builder, name, node, parserContext);
+            for (Map.Entry<String, Object> entry : node.entrySet()) {
+                String fieldName = Strings.toUnderscoreCase(entry.getKey());
+                Object fieldNode = entry.getValue();
+                if (fieldName.equals("compress") && fieldNode != null) {
+                    builder.compress(nodeBooleanValue(fieldNode));
+                } else if (fieldName.equals("compress_threshold") && fieldNode != null) {
+                    if (fieldNode instanceof Number) {
+                        builder.compressThreshold(((Number) fieldNode).longValue());
+                        builder.compress(true);
+                    } else {
+                        builder.compressThreshold(ByteSizeValue.parseBytesSizeValue(fieldNode.toString()).bytes());
+                        builder.compress(true);
+                    }
+                }
+            }
             return builder;
         }
     }
 
-    protected BinaryFieldMapper(Names names) {
+    private Boolean compress;
+
+    private long compressThreshold;
+
+    protected BinaryFieldMapper(Names names, Boolean compress, long compressThreshold) {
         super(names, Field.Index.NO, Field.Store.YES, Field.TermVector.NO, 1.0f, true, true, null, null);
+        this.compress = compress;
+        this.compressThreshold = compressThreshold;
+    }
+
+    @Override
+    public Object valueForSearch(Fieldable field) {
+        return value(field);
     }
 
     @Override public byte[] value(Fieldable field) {
-        return field.getBinaryValue();
+        byte[] value = field.getBinaryValue();
+        if (value != null && LZF.isCompressed(value)) {
+            try {
+                return LZFDecoder.decode(value);
+            } catch (IOException e) {
+                throw new ElasticSearchParseException("failed to decompress source", e);
+            }
+        }
+        return value;
     }
 
     @Override public byte[] valueFromString(String value) {
@@ -90,6 +152,20 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
             return null;
         } else {
             value = context.parser().binaryValue();
+            if (compress != null && compress && !LZF.isCompressed(value, 0, value.length)) {
+                if (compressThreshold == -1 || value.length > compressThreshold) {
+                    CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
+                    LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
+                    streamOutput.writeBytes(value, 0, value.length);
+                    streamOutput.flush();
+                    // we copy over the byte array, since we need to push back the cached entry
+                    // TODO, if we had a handle into when we are done with parsing, then we could push back then and not copy over bytes
+                    value = cachedEntry.bytes().copiedByteArray();
+                    CachedStreamOutput.pushEntry(cachedEntry);
+                    // update the data in the context, so it can be compressed and stored compressed outside...
+                    context.source(value, 0, value.length);
+                }
+            }
         }
         if (value == null) {
             return null;
@@ -107,7 +183,26 @@ public class BinaryFieldMapper extends AbstractFieldMapper<byte[]> {
         if (!names.name().equals(names.indexNameClean())) {
             builder.field("index_name", names.indexNameClean());
         }
+        if (compress != null) {
+            builder.field("compress", compress);
+        }
+        if (compressThreshold != -1) {
+            builder.field("compress_threshold", new ByteSizeValue(compressThreshold).toString());
+        }
         builder.endObject();
         return builder;
     }
+
+    @Override public void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
+        BinaryFieldMapper sourceMergeWith = (BinaryFieldMapper) mergeWith;
+        if (!mergeContext.mergeFlags().simulate()) {
+            if (sourceMergeWith.compress != null) {
+                this.compress = sourceMergeWith.compress;
+            }
+            if (sourceMergeWith.compressThreshold != -1) {
+                this.compressThreshold = sourceMergeWith.compressThreshold;
+            }
+        }
+    }
 }
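
For context (not part of the commit): a minimal sketch of a mapping that exercises the options handled by the new TypeParser logic above. The type name "doc", the field name "blob", and the "5kb" threshold are illustrative assumptions; the "type": "binary", "compress", and "compress_threshold" keys, and the byte-size string form, come directly from the parse() and toXContent() changes in this diff.

import java.io.IOException;

import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

public class BinaryCompressMappingExample {
    public static void main(String[] args) throws IOException {
        // Hypothetical mapping for a binary field "blob": "compress" turns on LZF
        // compression, "compress_threshold" restricts it to values larger than 5kb.
        XContentBuilder mapping = XContentFactory.jsonBuilder()
                .startObject()
                    .startObject("doc")
                        .startObject("properties")
                            .startObject("blob")
                                .field("type", "binary")
                                .field("compress", true)
                                .field("compress_threshold", "5kb")
                            .endObject()
                        .endObject()
                    .endObject()
                .endObject();
        System.out.println(mapping.string());
    }
}

With a mapping like this, the parsing path above LZF-compresses stored values larger than the threshold, and value(Fieldable) transparently decompresses them at read time, so callers keep seeing the original bytes.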