diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java
index a989aabca4f..966dc891802 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/get/GetResponse.java
@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchParseException;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -135,6 +136,16 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetField> {
      * The source of the document if exists.
      */
     public byte[] source() {
+        if (source == null) {
+            return null;
+        }
+        if (LZFDecoder.isCompressed(source)) {
+            try {
+                this.source = LZFDecoder.decode(source);
+            } catch (IOException e) {
+                throw new ElasticSearchParseException("failed to decompress source", e);
+            }
+        }
         return this.source;
     }
 
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFDecoder.java
@@ -42,6 +42,10 @@ public class LZFDecoder {
 
+    public static boolean isCompressed(final byte[] buffer) {
+        return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
+    }
+
     public static byte[] decode(final byte[] sourceBuffer) throws IOException {
         byte[] result = new byte[calculateUncompressedSize(sourceBuffer)];
         decode(sourceBuffer, result);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
index 34432160b43..0235c20b1e3 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFEncoder.java
@@ -30,6 +30,8 @@
 
 package org.elasticsearch.common.compress.lzf;
 
+import org.elasticsearch.common.thread.ThreadLocals;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -71,6 +73,45 @@ public class LZFEncoder {
         } while (left > 0);
     }
 
+    public static ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>> cachedEncoder = new ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>>() {
+        @Override protected ThreadLocals.CleanableValue<ChunkEncoder> initialValue() {
+            return new ThreadLocals.CleanableValue<ChunkEncoder>(new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN));
+        }
+    };
+
+    public static byte[] encodeWithCache(byte[] data, int length) throws IOException {
+        int left = length;
+        ChunkEncoder enc = cachedEncoder.get().get();
+        int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
+        LZFChunk first = enc.encodeChunk(data, 0, chunkLen);
+        left -= chunkLen;
+        // shortcut: if it all fit in, no need to coalesce:
+        if (left < 1) {
+            return first.getData();
+        }
+        // otherwise need to get other chunks:
+        int resultBytes = first.length();
+        int inputOffset = chunkLen;
+        LZFChunk last = first;
+
+        do {
+            chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
+            LZFChunk chunk = enc.encodeChunk(data, inputOffset, chunkLen);
+            inputOffset += chunkLen;
+            left -= chunkLen;
+            resultBytes += chunk.length();
+            last.setNext(chunk);
+            last = chunk;
+        } while (left > 0);
+        // and then coalesce returns into single contiguous byte array
+        byte[] result = new byte[resultBytes];
+        int ptr = 0;
+        for (; first != null; first = first.next()) {
+            ptr = first.copyTo(result, ptr);
+        }
+        return result;
+    }
+
     /**
      * Method for compressing given input data using LZF encoding and
     * block structure (compatible with lzf command line utility).
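For orientation, a minimal round-trip sketch of the encode/decode API introduced above; the class name and sample payload are illustrative, not part of the patch:

    import org.elasticsearch.common.compress.lzf.LZFDecoder;
    import org.elasticsearch.common.compress.lzf.LZFEncoder;

    public class LZFRoundTrip {
        public static void main(String[] args) throws Exception {
            byte[] source = "{\"field\":\"value value value\"}".getBytes("UTF-8");
            // compress using the per-thread cached ChunkEncoder added above
            byte[] compressed = LZFEncoder.encodeWithCache(source, source.length);
            // readers sniff the two-byte ZV signature to detect compressed values
            if (LZFDecoder.isCompressed(compressed)) {
                byte[] restored = LZFDecoder.decode(compressed);
                assert java.util.Arrays.equals(source, restored);
            }
        }
    }

The thread-local ChunkEncoder avoids re-allocating encoder state on every indexed document, which is why the mapper below calls encodeWithCache rather than a plain encode.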
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
index 768852c0999..e767b436d13 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/compress/lzf/LZFInputStream.java
@@ -26,7 +26,7 @@ public class LZFInputStream extends InputStream {
     public static int EOF_FLAG = -1;
 
     /* stream to be decompressed */
-    private final InputStream inputStream;
+    private InputStream inputStream;
 
     /* the current buffer of compressed bytes */
     private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
@@ -87,6 +87,12 @@ public class LZFInputStream extends InputStream {
         inputStream.close();
     }
 
+    public void reset(InputStream is) {
+        this.inputStream = is;
+        bufferLength = 0;
+        bufferPosition = 0;
+    }
+
     /**
      * Fill the uncompressed bytes buffer by reading the underlying inputStream.
      *
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
index 3dca6bb3e24..b79ceed62f4 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/SourceFieldMapper.java
@@ -21,12 +21,13 @@ package org.elasticsearch.index.mapper;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.FieldSelector;
+import org.apache.lucene.document.Fieldable;
 import org.elasticsearch.common.util.concurrent.ThreadSafe;
 
 /**
  * A mapper that maps the actual source of a generated document.
  *
- * @author kimchy (Shay Banon)
+ * @author kimchy (shay.banon)
  */
 @ThreadSafe
 public interface SourceFieldMapper extends FieldMapper<byte[]>, InternalMapper {
@@ -38,6 +39,16 @@ public interface SourceFieldMapper extends FieldMapper<byte[]>, InternalMapper {
      */
     boolean enabled();
 
+    /**
+     * Is the source field compressed or not?
+     */
+    boolean compressed();
+
+    /**
+     * Returns the native source value; if it is compressed, the compressed value is returned.
+     */
+    byte[] nativeValue(Fieldable field);
+
     byte[] value(Document document);
 
     /**
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java
index 86defa5a838..1acbd54b696 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java
@@ -224,6 +224,8 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
             Object fieldNode = entry.getValue();
             if (fieldName.equals("enabled")) {
                 builder.enabled(nodeBooleanValue(fieldNode));
+            } else if (fieldName.equals("compress") && fieldNode != null) {
+                builder.compress(nodeBooleanValue(fieldNode));
             }
         }
         return builder;
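With the parser change above, compress sits next to enabled under the _source object in the mapping. A sketch of switching it on, built the same way the integration test below does (index/type names illustrative, assuming a connected Client named client):

    // enable _source compression for type1 via a put-mapping request
    String mapping = XContentFactory.contentTextBuilder(XContentType.JSON)
            .startObject().startObject("type1")
                .startObject("_source").field("compress", true).endObject()
            .endObject().endObject().string();
    client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();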
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentSourceFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentSourceFieldMapper.java
index c1fd8352c33..995547de93f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentSourceFieldMapper.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentSourceFieldMapper.java
@@ -20,6 +20,9 @@ package org.elasticsearch.index.mapper.xcontent;
 
 import org.apache.lucene.document.*;
+import org.elasticsearch.ElasticSearchParseException;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
+import org.elasticsearch.common.compress.lzf.LZFEncoder;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.xcontent.builder.XContentBuilder;
 import org.elasticsearch.index.mapper.MergeMappingException;
@@ -47,6 +50,8 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
 
         private boolean enabled = Defaults.ENABLED;
 
+        private Boolean compress = null;
+
         public Builder() {
             super(Defaults.NAME);
         }
@@ -56,23 +61,31 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
             return this;
         }
 
+        public Builder compress(boolean compress) {
+            this.compress = compress;
+            return this;
+        }
+
         @Override public XContentSourceFieldMapper build(BuilderContext context) {
-            return new XContentSourceFieldMapper(name, enabled);
+            return new XContentSourceFieldMapper(name, enabled, compress);
         }
     }
 
     private final boolean enabled;
 
+    private Boolean compress;
+
     private final SourceFieldSelector fieldSelector;
 
     protected XContentSourceFieldMapper() {
-        this(Defaults.NAME, Defaults.ENABLED);
+        this(Defaults.NAME, Defaults.ENABLED, null);
     }
 
-    protected XContentSourceFieldMapper(String name, boolean enabled) {
+    protected XContentSourceFieldMapper(String name, boolean enabled, Boolean compress) {
         super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
                 Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
         this.enabled = enabled;
+        this.compress = compress;
         this.fieldSelector = new SourceFieldSelector(names.indexName());
     }
 
@@ -80,6 +93,10 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
         return this.enabled;
     }
 
+    @Override public boolean compressed() {
+        return compress != null && compress;
+    }
+
     public FieldSelector fieldSelector() {
         return this.fieldSelector;
     }
 
@@ -88,7 +105,11 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
         if (!enabled) {
             return null;
         }
-        return new Field(names.indexName(), context.source(), store);
+        byte[] data = context.source();
+        if (compress != null && compress) {
+            data = LZFEncoder.encodeWithCache(data, data.length);
+        }
+        return new Field(names.indexName(), data, store);
     }
 
     @Override public byte[] value(Document document) {
@@ -96,10 +117,25 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
         return field == null ? null : value(field);
     }
 
-    @Override public byte[] value(Fieldable field) {
+    @Override public byte[] nativeValue(Fieldable field) {
         return field.getBinaryValue();
     }
 
+    @Override public byte[] value(Fieldable field) {
+        byte[] value = field.getBinaryValue();
+        if (value == null) {
+            return value;
+        }
+        if (LZFDecoder.isCompressed(value)) {
+            try {
+                return LZFDecoder.decode(value);
+            } catch (IOException e) {
+                throw new ElasticSearchParseException("failed to decompress source", e);
+            }
+        }
+        return value;
+    }
+
     @Override public byte[] valueFromString(String value) {
         return null;
     }
@@ -136,10 +172,18 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
         builder.startObject(contentType());
         builder.field("name", name());
         builder.field("enabled", enabled);
+        if (compress != null) {
+            builder.field("compress", compress);
+        }
         builder.endObject();
     }
 
     @Override public void merge(XContentMapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
-        // do nothing here, no merging, but also no exception
+        XContentSourceFieldMapper sourceMergeWith = (XContentSourceFieldMapper) mergeWith;
+        if (!mergeContext.mergeFlags().simulate()) {
+            if (sourceMergeWith.compress != null) {
+                this.compress = sourceMergeWith.compress;
+            }
+        }
     }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
index 399d5942613..77cb46dbea3 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/fetch/FetchPhase.java
@@ -168,7 +168,7 @@ public class FetchPhase implements SearchPhase {
     private byte[] extractSource(Document doc, DocumentMapper documentMapper) {
         Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME);
         if (sourceField != null) {
-            return documentMapper.sourceMapper().value(sourceField);
+            return documentMapper.sourceMapper().nativeValue(sourceField);
         }
         return null;
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
index 0ff12a68abc..be966cf1f9e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/internal/InternalSearchHit.java
@@ -23,6 +23,7 @@ import org.apache.lucene.search.Explanation;
 import org.elasticsearch.ElasticSearchParseException;
 import org.elasticsearch.common.Unicode;
 import org.elasticsearch.common.collect.ImmutableMap;
+import org.elasticsearch.common.compress.lzf.LZFDecoder;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.trove.TIntObjectHashMap;
@@ -126,7 +127,17 @@ public class InternalSearchHit implements SearchHit {
     }
 
     @Override public byte[] source() {
-        return source;
+        if (source == null) {
+            return null;
+        }
+        if (LZFDecoder.isCompressed(source)) {
+            try {
+                this.source = LZFDecoder.decode(source);
+            } catch (IOException e) {
+                throw new ElasticSearchParseException("failed to decompress source", e);
+            }
+        }
+        return this.source;
     }
 
     @Override public boolean isSourceEmpty() {
@@ -141,7 +152,7 @@ public class InternalSearchHit implements SearchHit {
         if (source == null) {
             return null;
         }
-        return Unicode.fromBytes(source);
+        return Unicode.fromBytes(source());
     }
 
     @SuppressWarnings({"unchecked"})
@@ -152,6 +163,7 @@ public class InternalSearchHit implements SearchHit {
         if (sourceAsMap != null) {
             return sourceAsMap;
         }
+        byte[] source = source();
         XContentParser parser = null;
         try {
             parser = XContentFactory.xContent(source).createParser(source);
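The hand-off above is worth spelling out: FetchPhase now extracts the stored field with nativeValue(), so hits travel over the wire still compressed, and InternalSearchHit.source() decodes and caches lazily on first access, which is why sourceAsString() and sourceAsMap() now route through it. A caller-side sketch of the resulting contract (class and method names are illustrative):

    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.search.SearchHit;

    public class ReadSideSketch {
        // a hit's source() always yields decoded JSON bytes, whether or not
        // the _source field was stored LZF-compressed
        static String firstHitSource(SearchResponse response) {
            SearchHit hit = response.hits().getAt(0);
            return hit.sourceAsString(); // decodes once, caches in the hit
        }
    }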
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/compress/SearchSourceCompressTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/compress/SearchSourceCompressTests.java
new file mode 100644
index 00000000000..82df9f16843
--- /dev/null
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/search/compress/SearchSourceCompressTests.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.test.integration.search.compress;
+
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.common.xcontent.builder.XContentBuilder;
+import org.elasticsearch.index.query.xcontent.QueryBuilders;
+import org.elasticsearch.test.integration.AbstractNodesTests;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.MatcherAssert.*;
+import static org.hamcrest.Matchers.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class SearchSourceCompressTests extends AbstractNodesTests {
+
+    private Client client;
+
+    @BeforeClass public void createNodes() throws Exception {
+        startNode("node1");
+        startNode("node2");
+        client = getClient();
+    }
+
+    @AfterClass public void closeNodes() {
+        client.close();
+        closeAllNodes();
+    }
+
+    protected Client getClient() {
+        return client("node1");
+    }
+
+    @Test public void testSourceFieldCompressed() throws IOException {
+        verifySource(true);
+    }
+
+    @Test public void testSourceFieldPlainExplicit() throws IOException {
+        verifySource(false);
+    }
+
+    @Test public void testSourceFieldPlain() throws IOException {
+        verifySource(null);
+    }
+
+    private void verifySource(Boolean compress) throws IOException {
+        try {
+            client.admin().indices().prepareDelete("test").execute().actionGet();
+        } catch (Exception e) {
+            // ignore
+        }
+        client.admin().indices().prepareCreate("test").execute().actionGet();
+        client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
+
+        String mapping = XContentFactory.contentTextBuilder(XContentType.JSON).startObject().startObject("type1")
+                .startObject("_source").field("compress", compress).endObject()
+                .endObject().endObject().string();
+
+        client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();
+
+        for (int i = 1; i < 100; i++) {
+            client.prepareIndex("test", "type1", Integer.toString(i)).setSource(buildSource(i)).execute().actionGet();
+        }
+        client.prepareIndex("test", "type1", Integer.toString(10000)).setSource(buildSource(10000)).execute().actionGet();
+
+        client.admin().indices().prepareRefresh().execute().actionGet();
+
+        for (int i = 1; i < 100; i++) {
+            GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).execute().actionGet();
+            assertThat(getResponse.source(), equalTo(buildSource(i).copiedBytes()));
+        }
+        GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(10000)).execute().actionGet();
+        assertThat(getResponse.source(), equalTo(buildSource(10000).copiedBytes()));
+
+        for (int i = 1; i < 100; i++) {
+            SearchResponse searchResponse = client.prepareSearch().setQuery(QueryBuilders.termQuery("_id", Integer.toString(i))).execute().actionGet();
+            assertThat(searchResponse.hits().getTotalHits(), equalTo(1l));
+            assertThat(searchResponse.hits().getAt(0).source(), equalTo(buildSource(i).copiedBytes()));
+        }
+    }
+
+    private XContentBuilder buildSource(int count) throws IOException {
+        XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
+        StringBuilder sb = new StringBuilder();
+        for (int j = 0; j < count; j++) {
+            sb.append("value").append(j).append(' ');
+        }
+        builder.field("field", sb.toString());
+        return builder.endObject();
+    }
+}
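The oversized document in the test (id 10000, roughly 80KB of text) exercises the multi-chunk path in encodeWithCache: inputs larger than LZFChunk.MAX_CHUNK_LEN are encoded as a chain of chunks and coalesced into one contiguous array. A standalone sketch of that boundary, assuming LZFChunk.MAX_CHUNK_LEN is publicly accessible as in the upstream LZF code (sizes and class name illustrative):

    import org.elasticsearch.common.compress.lzf.LZFChunk;
    import org.elasticsearch.common.compress.lzf.LZFDecoder;
    import org.elasticsearch.common.compress.lzf.LZFEncoder;

    public class MultiChunkRoundTrip {
        public static void main(String[] args) throws Exception {
            // force at least two chunks by exceeding the max chunk length
            byte[] big = new byte[LZFChunk.MAX_CHUNK_LEN * 2 + 17];
            for (int i = 0; i < big.length; i++) {
                big[i] = (byte) ('a' + (i % 17)); // mildly compressible pattern
            }
            byte[] compressed = LZFEncoder.encodeWithCache(big, big.length);
            byte[] restored = LZFDecoder.decode(compressed);
            assert restored.length == big.length;
        }
    }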