Mapping: allow the _source mapping to specify a format to convert the stored source to (if needed), closes #1639.
parent 7d757a2c2e
commit da433df217
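In short: the `_source` mapping gains a `format` option, and source documents are transcoded to that format at index time when they arrive in a different one. A minimal sketch of such a mapping, built the same way the tests at the bottom of this diff build theirs (the type name "type" is a placeholder):

    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;

    public class SourceFormatMappingExample {
        public static void main(String[] args) throws Exception {
            // A _source mapping that forces stored source to JSON, regardless
            // of the content type the document was indexed with; "compress"
            // may be combined with it (see testJsonFormatCompressed below).
            XContentBuilder mapping = XContentFactory.jsonBuilder().startObject()
                    .startObject("type")
                        .startObject("_source")
                            .field("format", "json")
                            .field("compress", true)
                        .endObject()
                    .endObject()
                    .endObject();
            System.out.println(mapping.string());
        }
    }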
XContentType.java

@@ -21,8 +21,6 @@ package org.elasticsearch.common.xcontent;
 /**
  * The content type of {@link org.elasticsearch.common.xcontent.XContent}.
- *
- *
  */
 public enum XContentType {

@@ -34,6 +32,11 @@ public enum XContentType {
         public String restContentType() {
             return "application/json; charset=UTF-8";
         }
+
+        @Override
+        public String shortName() {
+            return "json";
+        }
     },
     /**
      * The jackson based smile binary format. Fast and compact binary format.

@@ -43,6 +46,11 @@ public enum XContentType {
         public String restContentType() {
             return "application/smile";
         }
+
+        @Override
+        public String shortName() {
+            return "smile";
+        }
     };

     public static XContentType fromRestContentType(String contentType) {

@@ -71,4 +79,6 @@ public enum XContentType {
     }

     public abstract String restContentType();
+
+    public abstract String shortName();
 }
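For reference, what the two accessors on each constant return; a minimal sketch based only on the constant bodies shown above (shortName() is the method this commit adds):

    import org.elasticsearch.common.xcontent.XContentType;

    public class XContentTypeExample {
        public static void main(String[] args) {
            System.out.println(XContentType.JSON.shortName());        // json
            System.out.println(XContentType.JSON.restContentType());  // application/json; charset=UTF-8
            System.out.println(XContentType.SMILE.shortName());       // smile
            System.out.println(XContentType.SMILE.restContentType()); // application/smile
        }
    }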
JsonXContentGenerator.java

@@ -267,6 +267,10 @@ public class JsonXContentGenerator implements XContentGenerator {

     @Override
     public void copyCurrentStructure(XContentParser parser) throws IOException {
+        // the start of the parser: advance to the first token if it has not been read yet
+        if (parser.currentToken() == null) {
+            parser.nextToken();
+        }
         if (parser instanceof JsonXContentParser) {
             generator.copyCurrentStructure(((JsonXContentParser) parser).parser);
         } else {
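That null-token guard lets callers hand in a freshly created parser, which is exactly what SourceFieldMapper (next file) does when transcoding source between formats. A minimal sketch of that transcoding pattern, using only factory calls that appear in this diff; the helper name and the ByteArrayOutputStream are illustrative, not part of the commit:

    import java.io.ByteArrayOutputStream;
    import org.elasticsearch.common.xcontent.XContentBuilder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentType;

    public class TranscodeSketch {
        // Convert a source document to the target format: sniff the actual
        // type, parse it, and replay the structure into a builder of the
        // target type via copyCurrentStructure.
        static byte[] convert(byte[] data, XContentType target) throws Exception {
            XContentType actual = XContentFactory.xContentType(data, 0, data.length);
            if (actual == target) {
                return data; // already in the desired format
            }
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            XContentBuilder builder = XContentFactory.contentBuilder(target, out);
            builder.copyCurrentStructure(XContentFactory.xContent(actual).createParser(data, 0, data.length));
            builder.close();
            return out.toByteArray();
        }
    }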
SourceFieldMapper.java

@@ -19,6 +19,7 @@

 package org.elasticsearch.index.mapper.internal;

+import com.google.common.base.Objects;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Fieldable;

@@ -27,9 +28,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.compress.lzf.LZF;
 import org.elasticsearch.common.compress.lzf.LZFDecoder;
-import org.elasticsearch.common.io.stream.CachedStreamOutput;
-import org.elasticsearch.common.io.stream.LZFStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.*;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.document.ResetFieldSelector;
 import org.elasticsearch.common.unit.ByteSizeValue;

@@ -46,6 +45,7 @@ import java.util.List;
 import java.util.Map;

 import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue;
+import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue;
 import static org.elasticsearch.index.mapper.MapperBuilders.source;

 /**
@@ -61,6 +61,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
         public static final String NAME = SourceFieldMapper.NAME;
         public static final boolean ENABLED = true;
         public static final long COMPRESS_THRESHOLD = -1;
+        public static final String FORMAT = null; // default format is to use the one provided
         public static final Field.Index INDEX = Field.Index.NO;
         public static final Field.Store STORE = Field.Store.YES;
         public static final boolean OMIT_NORMS = true;

@@ -77,6 +78,8 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In

         private Boolean compress = null;

+        private String format = Defaults.FORMAT;
+
         private String[] includes = Defaults.INCLUDES;
         private String[] excludes = Defaults.EXCLUDES;

@@ -99,6 +102,11 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
             return this;
         }

+        public Builder format(String format) {
+            this.format = format;
+            return this;
+        }
+
         public Builder includes(String[] includes) {
             this.includes = includes;
             return this;

@@ -111,7 +119,7 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In

         @Override
         public SourceFieldMapper build(BuilderContext context) {
-            return new SourceFieldMapper(name, enabled, compress, compressThreshold, includes, excludes);
+            return new SourceFieldMapper(name, enabled, format, compress, compressThreshold, includes, excludes);
         }
     }

@@ -135,6 +143,8 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
                     builder.compressThreshold(ByteSizeValue.parseBytesSizeValue(fieldNode.toString()).bytes());
                     builder.compress(true);
                 }
+            } else if ("format".equals(fieldName)) {
+                builder.format(nodeStringValue(fieldNode, null));
             } else if (fieldName.equals("includes")) {
                 List<Object> values = (List<Object>) fieldNode;
                 String[] includes = new String[values.size()];
@@ -166,11 +176,15 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In

     private String[] excludes;

+    private String format;
+
+    private XContentType formatContentType;
+
     public SourceFieldMapper() {
-        this(Defaults.NAME, Defaults.ENABLED, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES);
+        this(Defaults.NAME, Defaults.ENABLED, Defaults.FORMAT, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES);
     }

-    protected SourceFieldMapper(String name, boolean enabled, Boolean compress, long compressThreshold, String[] includes, String[] excludes) {
+    protected SourceFieldMapper(String name, boolean enabled, String format, Boolean compress, long compressThreshold, String[] includes, String[] excludes) {
         super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
                 Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
         this.enabled = enabled;

@@ -178,6 +192,8 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
         this.compressThreshold = compressThreshold;
         this.includes = includes;
         this.excludes = excludes;
+        this.format = format;
+        this.formatContentType = format == null ? null : XContentType.fromRestContentType(format);
     }

     public boolean enabled() {
@@ -239,7 +255,11 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
                 } else {
                     streamOutput = cachedEntry.cachedBytes();
                 }
-                XContentBuilder builder = XContentFactory.contentBuilder(mapTuple.v1(), streamOutput).map(filteredSource);
+                XContentType contentType = formatContentType;
+                if (contentType == null) {
+                    contentType = mapTuple.v1();
+                }
+                XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource);
                 builder.close();

                 data = cachedEntry.bytes().copiedByteArray();
@@ -250,17 +270,71 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
         } else if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) {
             if (compressThreshold == -1 || dataLength > compressThreshold) {
                 CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
-                LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
-                streamOutput.writeBytes(data, dataOffset, dataLength);
-                streamOutput.flush();
-                // we copy over the byte array, since we need to push back the cached entry
-                // TODO, if we had a handle into when we are done with parsing, we could push back then and not copy over the bytes
-                data = cachedEntry.bytes().copiedByteArray();
-                dataOffset = 0;
-                dataLength = data.length;
-                CachedStreamOutput.pushEntry(cachedEntry);
-                // update the data in the context, so it can be compressed and stored compressed outside...
-                context.source(data, dataOffset, dataLength);
+                try {
+                    XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength);
+                    if (formatContentType != null && formatContentType != contentType) {
+                        XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedLZFBytes());
+                        builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength));
+                        builder.close();
+                    } else {
+                        LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
+                        streamOutput.writeBytes(data, dataOffset, dataLength);
+                        streamOutput.flush();
+                    }
+                    // we copy over the byte array, since we need to push back the cached entry
+                    // TODO, if we had a handle into when we are done with parsing, we could push back then and not copy over the bytes
+                    data = cachedEntry.bytes().copiedByteArray();
+                    dataOffset = 0;
+                    dataLength = data.length;
+                    // update the data in the context, so it can be compressed and stored compressed outside...
+                    context.source(data, dataOffset, dataLength);
+                } finally {
+                    CachedStreamOutput.pushEntry(cachedEntry);
+                }
             }
+        } else if (formatContentType != null) {
+            // see if we need to convert the content type
+            if (LZF.isCompressed(data, dataOffset, dataLength)) {
+                BytesStreamInput siBytes = new BytesStreamInput(data, dataOffset, dataLength, false);
+                LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes);
+                XContentType contentType = XContentFactory.xContentType(siLzf);
+                siLzf.resetToBufferStart();
+                if (contentType != formatContentType) {
+                    // we need to reread and store back, compressed...
+                    CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
+                    try {
+                        LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes();
+                        XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput);
+                        builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(siLzf));
+                        builder.close();
+                        data = cachedEntry.bytes().copiedByteArray();
+                        dataOffset = 0;
+                        dataLength = data.length;
+                        // update the data in the context, so we store it in the translog in this format
+                        context.source(data, dataOffset, dataLength);
+                    } finally {
+                        CachedStreamOutput.pushEntry(cachedEntry);
+                    }
+                }
+            } else {
+                XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength);
+                if (contentType != formatContentType) {
+                    // we need to reread and store back
+                    CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry();
+                    try {
+                        XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes());
+                        builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength));
+                        builder.close();
+                        data = cachedEntry.bytes().copiedByteArray();
+                        dataOffset = 0;
+                        dataLength = data.length;
+                        // update the data in the context, so we store it in the translog in this format
+                        context.source(data, dataOffset, dataLength);
+                    } finally {
+                        CachedStreamOutput.pushEntry(cachedEntry);
+                    }
+                }
+            }
         }
         return new Field(names().indexName(), data, dataOffset, dataLength);
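The compressed branch above leaves LZF-compressed bytes in the configured format; the tests at the end of this diff verify that by decompressing and sniffing the type. A minimal sketch of that verification, using only calls that appear in this commit (the helper name is illustrative):

    import org.elasticsearch.common.compress.lzf.LZF;
    import org.elasticsearch.common.compress.lzf.LZFDecoder;
    import org.elasticsearch.common.xcontent.XContentFactory;
    import org.elasticsearch.common.xcontent.XContentType;

    public class StoredSourceCheck {
        // Returns the content type of a stored _source field, decompressing
        // first if the bytes are LZF-compressed (mirrors testJsonFormatCompressed).
        static XContentType sniffStoredType(byte[] source) throws Exception {
            if (LZF.isCompressed(source, 0, source.length)) {
                source = LZFDecoder.decode(source, 0, source.length);
            }
            return XContentFactory.xContentType(source);
        }
    }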
@@ -321,6 +395,9 @@ public class SourceFieldMapper extends AbstractFieldMapper<byte[]> implements In
         if (enabled != Defaults.ENABLED) {
             builder.field("enabled", enabled);
         }
+        if (!Objects.equal(format, Defaults.FORMAT)) {
+            builder.field("format", format);
+        }
         if (compress != null) {
             builder.field("compress", compress);
         }
DefaultSourceMappingTests.java

@@ -20,9 +20,14 @@
 package org.elasticsearch.test.unit.index.mapper.source;

 import org.apache.lucene.document.Fieldable;
+import org.elasticsearch.common.compress.lzf.LZF;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.mapper.*;
+import org.elasticsearch.index.mapper.DocumentMapper;
+import org.elasticsearch.index.mapper.MapperParsingException;
+import org.elasticsearch.index.mapper.MapperService;
+import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.test.unit.index.mapper.MapperTests;
 import org.testng.annotations.Test;
@@ -36,6 +41,73 @@ import static org.hamcrest.Matchers.equalTo;
  */
 public class DefaultSourceMappingTests {

+    @Test
+    public void testNoFormat() throws Exception {
+        String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
+                .startObject("_source").endObject()
+                .endObject().endObject().string();
+
+        DocumentMapper documentMapper = MapperTests.newParser().parse(mapping);
+        ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON));
+
+        documentMapper = MapperTests.newParser().parse(mapping);
+        doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.SMILE));
+    }
+
+    @Test
+    public void testJsonFormat() throws Exception {
+        String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
+                .startObject("_source").field("format", "json").endObject()
+                .endObject().endObject().string();
+
+        DocumentMapper documentMapper = MapperTests.newParser().parse(mapping);
+        ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON));
+
+        documentMapper = MapperTests.newParser().parse(mapping);
+        doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON));
+    }
+
+    @Test
+    public void testJsonFormatCompressed() throws Exception {
+        String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")
+                .startObject("_source").field("format", "json").field("compress", true).endObject()
+                .endObject().endObject().string();
+
+        DocumentMapper documentMapper = MapperTests.newParser().parse(mapping);
+        ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
+        byte[] uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength());
+        assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON));
+
+        documentMapper = MapperTests.newParser().parse(mapping);
+        doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject()
+                .field("field", "value")
+                .endObject().copiedBytes());
+
+        assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true));
+        uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength());
+        assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON));
+    }
+
     @Test
     public void testIncludeExclude() throws Exception {
         String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")