From da433df217e2239fe874de00bd4b360f4427dc75 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Thu, 26 Jan 2012 00:18:46 +0200 Subject: [PATCH] Mapping: _source mapping to allow for format to convert to (if needed), closes #1639. --- .../common/xcontent/XContentType.java | 14 ++- .../xcontent/json/JsonXContentGenerator.java | 4 + .../mapper/internal/SourceFieldMapper.java | 113 +++++++++++++++--- .../source/DefaultSourceMappingTests.java | 74 +++++++++++- 4 files changed, 184 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/xcontent/XContentType.java b/src/main/java/org/elasticsearch/common/xcontent/XContentType.java index e656c74474e..173e8a335bc 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/XContentType.java +++ b/src/main/java/org/elasticsearch/common/xcontent/XContentType.java @@ -21,8 +21,6 @@ package org.elasticsearch.common.xcontent; /** * The content type of {@link org.elasticsearch.common.xcontent.XContent}. - * - * */ public enum XContentType { @@ -34,6 +32,11 @@ public enum XContentType { public String restContentType() { return "application/json; charset=UTF-8"; } + + @Override + public String shortName() { + return "json"; + } }, /** * The jackson based smile binary format. Fast and compact binary format. @@ -43,6 +46,11 @@ public enum XContentType { public String restContentType() { return "application/smile"; } + + @Override + public String shortName() { + return "smile"; + } }; public static XContentType fromRestContentType(String contentType) { @@ -71,4 +79,6 @@ public enum XContentType { } public abstract String restContentType(); + + public abstract String shortName(); } diff --git a/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java b/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java index 2b7c6757aec..d5545241eb6 100644 --- a/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java +++ b/src/main/java/org/elasticsearch/common/xcontent/json/JsonXContentGenerator.java @@ -267,6 +267,10 @@ public class JsonXContentGenerator implements XContentGenerator { @Override public void copyCurrentStructure(XContentParser parser) throws IOException { + // the start of the parser + if (parser.currentToken() == null) { + parser.nextToken(); + } if (parser instanceof JsonXContentParser) { generator.copyCurrentStructure(((JsonXContentParser) parser).parser); } else { diff --git a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java index 34ac7473b86..e6be08d1828 100644 --- a/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java +++ b/src/main/java/org/elasticsearch/index/mapper/internal/SourceFieldMapper.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.mapper.internal; +import com.google.common.base.Objects; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.Fieldable; @@ -27,9 +28,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.compress.lzf.LZF; import org.elasticsearch.common.compress.lzf.LZFDecoder; -import org.elasticsearch.common.io.stream.CachedStreamOutput; -import org.elasticsearch.common.io.stream.LZFStreamOutput; -import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.*; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.lucene.document.ResetFieldSelector; import org.elasticsearch.common.unit.ByteSizeValue; @@ -46,6 +45,7 @@ import java.util.List; import java.util.Map; import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeBooleanValue; +import static org.elasticsearch.common.xcontent.support.XContentMapValues.nodeStringValue; import static org.elasticsearch.index.mapper.MapperBuilders.source; /** @@ -61,6 +61,7 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In public static final String NAME = SourceFieldMapper.NAME; public static final boolean ENABLED = true; public static final long COMPRESS_THRESHOLD = -1; + public static final String FORMAT = null; // default format is to use the one provided public static final Field.Index INDEX = Field.Index.NO; public static final Field.Store STORE = Field.Store.YES; public static final boolean OMIT_NORMS = true; @@ -77,6 +78,8 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In private Boolean compress = null; + private String format = Defaults.FORMAT; + private String[] includes = Defaults.INCLUDES; private String[] excludes = Defaults.EXCLUDES; @@ -99,6 +102,11 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In return this; } + public Builder format(String format) { + this.format = format; + return this; + } + public Builder includes(String[] includes) { this.includes = includes; return this; @@ -111,7 +119,7 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In @Override public SourceFieldMapper build(BuilderContext context) { - return new SourceFieldMapper(name, enabled, compress, compressThreshold, includes, excludes); + return new SourceFieldMapper(name, enabled, format, compress, compressThreshold, includes, excludes); } } @@ -135,6 +143,8 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In builder.compressThreshold(ByteSizeValue.parseBytesSizeValue(fieldNode.toString()).bytes()); builder.compress(true); } + } else if ("format".equals(fieldName)) { + builder.format(nodeStringValue(fieldNode, null)); } else if (fieldName.equals("includes")) { List values = (List) fieldNode; String[] includes = new String[values.size()]; @@ -166,11 +176,15 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In private String[] excludes; + private String format; + + private XContentType formatContentType; + public SourceFieldMapper() { - this(Defaults.NAME, Defaults.ENABLED, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES); + this(Defaults.NAME, Defaults.ENABLED, Defaults.FORMAT, null, -1, Defaults.INCLUDES, Defaults.EXCLUDES); } - protected SourceFieldMapper(String name, boolean enabled, Boolean compress, long compressThreshold, String[] includes, String[] excludes) { + protected SourceFieldMapper(String name, boolean enabled, String format, Boolean compress, long compressThreshold, String[] includes, String[] excludes) { super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER); this.enabled = enabled; @@ -178,6 +192,8 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In this.compressThreshold = compressThreshold; this.includes = includes; this.excludes = excludes; + this.format = format; + this.formatContentType = format == null ? null : XContentType.fromRestContentType(format); } public boolean enabled() { @@ -239,7 +255,11 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In } else { streamOutput = cachedEntry.cachedBytes(); } - XContentBuilder builder = XContentFactory.contentBuilder(mapTuple.v1(), streamOutput).map(filteredSource); + XContentType contentType = formatContentType; + if (contentType == null) { + contentType = mapTuple.v1(); + } + XContentBuilder builder = XContentFactory.contentBuilder(contentType, streamOutput).map(filteredSource); builder.close(); data = cachedEntry.bytes().copiedByteArray(); @@ -250,17 +270,71 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In } else if (compress != null && compress && !LZF.isCompressed(data, dataOffset, dataLength)) { if (compressThreshold == -1 || dataLength > compressThreshold) { CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); - LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); - streamOutput.writeBytes(data, dataOffset, dataLength); - streamOutput.flush(); - // we copy over the byte array, since we need to push back the cached entry - // TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes - data = cachedEntry.bytes().copiedByteArray(); - dataOffset = 0; - dataLength = data.length; - CachedStreamOutput.pushEntry(cachedEntry); - // update the data in the context, so it can be compressed and stored compressed outside... - context.source(data, dataOffset, dataLength); + try { + XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength); + if (formatContentType != null && formatContentType != contentType) { + XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedLZFBytes()); + builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength)); + builder.close(); + } else { + LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + streamOutput.writeBytes(data, dataOffset, dataLength); + streamOutput.flush(); + } + // we copy over the byte array, since we need to push back the cached entry + // TODO, we we had a handle into when we are done with parsing, then we push back then and not copy over bytes + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + // update the data in the context, so it can be compressed and stored compressed outside... + context.source(data, dataOffset, dataLength); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + } else if (formatContentType != null) { + // see if we need to convert the content type + if (LZF.isCompressed(data, dataOffset, dataLength)) { + BytesStreamInput siBytes = new BytesStreamInput(data, dataOffset, dataLength, false); + LZFStreamInput siLzf = CachedStreamInput.cachedLzf(siBytes); + XContentType contentType = XContentFactory.xContentType(siLzf); + siLzf.resetToBufferStart(); + if (contentType != formatContentType) { + // we need to reread and store back, compressed.... + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + LZFStreamOutput streamOutput = cachedEntry.cachedLZFBytes(); + XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, streamOutput); + builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(siLzf)); + builder.close(); + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + // update the data in the context, so we store it in the translog in this format + context.source(data, dataOffset, dataLength); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } + } else { + XContentType contentType = XContentFactory.xContentType(data, dataOffset, dataLength); + if (contentType != formatContentType) { + // we need to reread and store back + // we need to reread and store back, compressed.... + CachedStreamOutput.Entry cachedEntry = CachedStreamOutput.popEntry(); + try { + XContentBuilder builder = XContentFactory.contentBuilder(formatContentType, cachedEntry.cachedBytes()); + builder.copyCurrentStructure(XContentFactory.xContent(contentType).createParser(data, dataOffset, dataLength)); + builder.close(); + data = cachedEntry.bytes().copiedByteArray(); + dataOffset = 0; + dataLength = data.length; + // update the data in the context, so we store it in the translog in this format + context.source(data, dataOffset, dataLength); + } finally { + CachedStreamOutput.pushEntry(cachedEntry); + } + } } } return new Field(names().indexName(), data, dataOffset, dataLength); @@ -321,6 +395,9 @@ public class SourceFieldMapper extends AbstractFieldMapper implements In if (enabled != Defaults.ENABLED) { builder.field("enabled", enabled); } + if (!Objects.equal(format, Defaults.FORMAT)) { + builder.field("format", format); + } if (compress != null) { builder.field("compress", compress); } diff --git a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java index 364b79946d9..476e9e7b5f6 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/mapper/source/DefaultSourceMappingTests.java @@ -20,9 +20,14 @@ package org.elasticsearch.test.unit.index.mapper.source; import org.apache.lucene.document.Fieldable; +import org.elasticsearch.common.compress.lzf.LZF; +import org.elasticsearch.common.compress.lzf.LZFDecoder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.index.mapper.*; +import org.elasticsearch.index.mapper.DocumentMapper; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.test.unit.index.mapper.MapperTests; import org.testng.annotations.Test; @@ -36,6 +41,73 @@ import static org.hamcrest.Matchers.equalTo; */ public class DefaultSourceMappingTests { + @Test + public void testNoFormat() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source").endObject() + .endObject().endObject().string(); + + DocumentMapper documentMapper = MapperTests.newParser().parse(mapping); + ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON)); + + documentMapper = MapperTests.newParser().parse(mapping); + doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.SMILE)); + } + + @Test + public void testJsonFormat() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source").field("format", "json").endObject() + .endObject().endObject().string(); + + DocumentMapper documentMapper = MapperTests.newParser().parse(mapping); + ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON)); + + documentMapper = MapperTests.newParser().parse(mapping); + doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(XContentFactory.xContentType(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(XContentType.JSON)); + } + + @Test + public void testJsonFormatCompressed() throws Exception { + String mapping = XContentFactory.jsonBuilder().startObject().startObject("type") + .startObject("_source").field("format", "json").field("compress", true).endObject() + .endObject().endObject().string(); + + DocumentMapper documentMapper = MapperTests.newParser().parse(mapping); + ParsedDocument doc = documentMapper.parse("type", "1", XContentFactory.jsonBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); + byte[] uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength()); + assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON)); + + documentMapper = MapperTests.newParser().parse(mapping); + doc = documentMapper.parse("type", "1", XContentFactory.smileBuilder().startObject() + .field("field", "value") + .endObject().copiedBytes()); + + assertThat(LZF.isCompressed(doc.source(), doc.sourceOffset(), doc.sourceLength()), equalTo(true)); + uncompressed = LZFDecoder.decode(doc.source(), doc.sourceOffset(), doc.sourceLength()); + assertThat(XContentFactory.xContentType(uncompressed), equalTo(XContentType.JSON)); + } + @Test public void testIncludeExclude() throws Exception { String mapping = XContentFactory.jsonBuilder().startObject().startObject("type")