support compression of the source field

kimchy 2010-08-15 17:57:14 +03:00
parent ee33ee457a
commit 14237317fc
11 changed files with 263 additions and 13 deletions
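For orientation before the diffs: this commit adds a "compress" flag to the _source mapping. A minimal sketch of turning it on, mirroring the test at the bottom of this commit (the "test" index and "type1" type names are simply the ones that test uses):

    // Sketch only: mark _source as LZF-compressed via the new "compress" mapping flag.
    String mapping = XContentFactory.contentTextBuilder(XContentType.JSON).startObject()
            .startObject("type1")
                .startObject("_source").field("compress", true).endObject()
            .endObject().endObject().string();
    client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();
    // Documents indexed afterwards store their source compressed; get and search
    // responses decompress it lazily, as the changes below show.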

View File

@@ -23,6 +23,7 @@ import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -135,6 +136,16 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
* The source of the document, if it exists.
*/
public byte[] source() {
if (source == null) {
return null;
}
if (LZFDecoder.isCompressed(source)) {
try {
this.source = LZFDecoder.decode(source);
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
}
return this.source;
}
@@ -152,7 +163,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
if (source == null) {
return null;
}
return Unicode.fromBytes(source);
return Unicode.fromBytes(source());
}
/**
@@ -166,6 +177,7 @@ public class GetResponse implements ActionResponse, Streamable, Iterable<GetFiel
if (sourceAsMap != null) {
return sourceAsMap;
}
byte[] source = source();
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source).createParser(source);

View File

@@ -157,7 +157,7 @@ public class TransportGetAction extends TransportSingleOperationAction<GetReques
byte[] source = null;
Fieldable sourceField = doc.getFieldable(documentMapper.sourceMapper().names().indexName());
if (sourceField != null) {
source = documentMapper.sourceMapper().value(sourceField);
source = documentMapper.sourceMapper().nativeValue(sourceField);
doc.removeField(documentMapper.sourceMapper().names().indexName());
}
return source;

View File

@@ -48,6 +48,10 @@ public class LZFDecoder {
private LZFDecoder() {
}
public static boolean isCompressed(final byte[] buffer) {
return buffer.length >= 2 && buffer[0] == LZFChunk.BYTE_Z && buffer[1] == LZFChunk.BYTE_V;
}
public static byte[] decode(final byte[] sourceBuffer) throws IOException {
byte[] result = new byte[calculateUncompressedSize(sourceBuffer)];
decode(sourceBuffer, result);

View File

@@ -30,6 +30,8 @@
package org.elasticsearch.common.compress.lzf;
import org.elasticsearch.common.thread.ThreadLocals;
import java.io.IOException;
import java.io.OutputStream;
@@ -71,6 +73,45 @@ public class LZFEncoder {
} while (left > 0);
}
public static ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>> cachedEncoder = new ThreadLocal<ThreadLocals.CleanableValue<ChunkEncoder>>() {
@Override protected ThreadLocals.CleanableValue<ChunkEncoder> initialValue() {
return new ThreadLocals.CleanableValue<ChunkEncoder>(new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN));
}
};
public static byte[] encodeWithCache(byte[] data, int length) throws IOException {
int left = length;
ChunkEncoder enc = cachedEncoder.get().get();
int chunkLen = Math.min(LZFChunk.MAX_CHUNK_LEN, left);
LZFChunk first = enc.encodeChunk(data, 0, chunkLen);
left -= chunkLen;
// shortcut: if it all fit in, no need to coalesce:
if (left < 1) {
return first.getData();
}
// otherwise need to get other chunks:
int resultBytes = first.length();
int inputOffset = chunkLen;
LZFChunk last = first;
do {
chunkLen = Math.min(left, LZFChunk.MAX_CHUNK_LEN);
LZFChunk chunk = enc.encodeChunk(data, inputOffset, chunkLen);
inputOffset += chunkLen;
left -= chunkLen;
resultBytes += chunk.length();
last.setNext(chunk);
last = chunk;
} while (left > 0);
// and then coalesce returns into single contiguous byte array
byte[] result = new byte[resultBytes];
int ptr = 0;
for (; first != null; first = first.next()) {
ptr = first.copyTo(result, ptr);
}
return result;
}
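// A minimal usage sketch (illustrative, not part of this file's diff), assuming only the
// LZFEncoder/LZFDecoder methods shown in this commit: source bytes are compressed once with
// the cached encoder, and readers check the two-byte ZV header before decoding lazily,
// which is exactly what GetResponse and InternalSearchHit do further down.
public static byte[] roundTrip(byte[] original) throws IOException {
    byte[] compressed = LZFEncoder.encodeWithCache(original, original.length);
    if (LZFDecoder.isCompressed(compressed)) {
        return LZFDecoder.decode(compressed);
    }
    return compressed;
}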
/**
* Method for compressing given input data using LZF encoding and
* block structure (compatible with lzf command line utility).

View File

@@ -26,7 +26,7 @@ public class LZFInputStream extends InputStream {
public static int EOF_FLAG = -1;
/* stream to be decompressed */
private final InputStream inputStream;
private InputStream inputStream;
/* the current buffer of compressed bytes */
private final byte[] compressedBytes = new byte[LZFChunk.MAX_CHUNK_LEN];
@@ -87,6 +87,12 @@ public class LZFInputStream extends InputStream {
inputStream.close();
}
public void reset(InputStream is) {
this.inputStream = is;
bufferLength = 0;
bufferPosition = 0;
}
/**
* Fill the uncompressed bytes buffer by reading the underlying inputStream.
*

View File

@@ -21,12 +21,13 @@ package org.elasticsearch.index.mapper;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.Fieldable;
import org.elasticsearch.common.util.concurrent.ThreadSafe;
/**
* A mapper that maps the actual source of a generated document.
*
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
@ThreadSafe
public interface SourceFieldMapper extends FieldMapper<byte[]>, InternalMapper {
@@ -38,6 +39,16 @@ public interface SourceFieldMapper extends FieldMapper<byte[]>, InternalMapper {
*/
boolean enabled();
/**
* Is the source field compressed or not?
*/
boolean compressed();
/**
* Returns the native source value; if it is compressed, the compressed value is returned.
*/
byte[] nativeValue(Fieldable field);
byte[] value(Document document);
/**

View File

@@ -224,6 +224,8 @@ public class XContentDocumentMapperParser extends AbstractIndexComponent impleme
Object fieldNode = entry.getValue();
if (fieldName.equals("enabled")) {
builder.enabled(nodeBooleanValue(fieldNode));
} else if (fieldName.equals("compress") && fieldNode != null) {
builder.compress(nodeBooleanValue(fieldNode));
}
}
return builder;

View File

@@ -20,6 +20,9 @@
package org.elasticsearch.index.mapper.xcontent;
import org.apache.lucene.document.*;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.compress.lzf.LZFEncoder;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.index.mapper.MergeMappingException;
@@ -47,6 +50,8 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
private boolean enabled = Defaults.ENABLED;
private Boolean compress = null;
public Builder() {
super(Defaults.NAME);
}
@@ -56,23 +61,31 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
return this;
}
public Builder compress(boolean compress) {
this.compress = compress;
return this;
}
@Override public XContentSourceFieldMapper build(BuilderContext context) {
return new XContentSourceFieldMapper(name, enabled);
return new XContentSourceFieldMapper(name, enabled, compress);
}
}
private final boolean enabled;
private Boolean compress;
private final SourceFieldSelector fieldSelector;
protected XContentSourceFieldMapper() {
this(Defaults.NAME, Defaults.ENABLED);
this(Defaults.NAME, Defaults.ENABLED, null);
}
protected XContentSourceFieldMapper(String name, boolean enabled) {
protected XContentSourceFieldMapper(String name, boolean enabled, Boolean compress) {
super(new Names(name, name, name, name), Defaults.INDEX, Defaults.STORE, Defaults.TERM_VECTOR, Defaults.BOOST,
Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
this.enabled = enabled;
this.compress = compress;
this.fieldSelector = new SourceFieldSelector(names.indexName());
}
@@ -80,6 +93,10 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
return this.enabled;
}
@Override public boolean compressed() {
return compress != null && compress;
}
public FieldSelector fieldSelector() {
return this.fieldSelector;
}
@@ -88,7 +105,11 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
if (!enabled) {
return null;
}
return new Field(names.indexName(), context.source(), store);
byte[] data = context.source();
if (compress != null && compress) {
data = LZFEncoder.encodeWithCache(data, data.length);
}
return new Field(names.indexName(), data, store);
}
@Override public byte[] value(Document document) {
@@ -96,10 +117,25 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
return field == null ? null : value(field);
}
@Override public byte[] value(Fieldable field) {
@Override public byte[] nativeValue(Fieldable field) {
return field.getBinaryValue();
}
@Override public byte[] value(Fieldable field) {
byte[] value = field.getBinaryValue();
if (value == null) {
return value;
}
if (LZFDecoder.isCompressed(value)) {
try {
return LZFDecoder.decode(value);
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
}
return value;
}
@Override public byte[] valueFromString(String value) {
return null;
}
@@ -136,10 +172,18 @@ public class XContentSourceFieldMapper extends XContentFieldMapper<byte[]> imple
builder.startObject(contentType());
builder.field("name", name());
builder.field("enabled", enabled);
if (compress != null) {
builder.field("compress", compress);
}
builder.endObject();
}
@Override public void merge(XContentMapper mergeWith, MergeContext mergeContext) throws MergeMappingException {
// do nothing here, no merging, but also no exception
XContentSourceFieldMapper sourceMergeWith = (XContentSourceFieldMapper) mergeWith;
if (!mergeContext.mergeFlags().simulate()) {
if (sourceMergeWith.compress != null) {
this.compress = sourceMergeWith.compress;
}
}
}
}

View File

@@ -168,7 +168,7 @@ public class FetchPhase implements SearchPhase {
private byte[] extractSource(Document doc, DocumentMapper documentMapper) {
Fieldable sourceField = doc.getFieldable(SourceFieldMapper.NAME);
if (sourceField != null) {
return documentMapper.sourceMapper().value(sourceField);
return documentMapper.sourceMapper().nativeValue(sourceField);
}
return null;
}

View File

@@ -23,6 +23,7 @@ import org.apache.lucene.search.Explanation;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.common.Unicode;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.compress.lzf.LZFDecoder;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.trove.TIntObjectHashMap;
@@ -126,7 +127,17 @@ public class InternalSearchHit implements SearchHit {
}
@Override public byte[] source() {
return source;
if (source == null) {
return null;
}
if (LZFDecoder.isCompressed(source)) {
try {
this.source = LZFDecoder.decode(source);
} catch (IOException e) {
throw new ElasticSearchParseException("failed to decompress source", e);
}
}
return this.source;
}
@Override public boolean isSourceEmpty() {
@@ -141,7 +152,7 @@ public class InternalSearchHit implements SearchHit {
if (source == null) {
return null;
}
return Unicode.fromBytes(source);
return Unicode.fromBytes(source());
}
@SuppressWarnings({"unchecked"})
@@ -152,6 +163,7 @@ public class InternalSearchHit implements SearchHit {
if (sourceAsMap != null) {
return sourceAsMap;
}
byte[] source = source();
XContentParser parser = null;
try {
parser = XContentFactory.xContent(source).createParser(source);

View File

@@ -0,0 +1,118 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.search.compress;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.builder.XContentBuilder;
import org.elasticsearch.index.query.xcontent.QueryBuilders;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.IOException;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
/**
* @author kimchy (shay.banon)
*/
public class SearchSourceCompressTests extends AbstractNodesTests {
private Client client;
@BeforeClass public void createNodes() throws Exception {
startNode("node1");
startNode("node2");
client = getClient();
}
@AfterClass public void closeNodes() {
client.close();
closeAllNodes();
}
protected Client getClient() {
return client("node1");
}
@Test public void testSourceFieldCompressed() throws IOException {
verifySource(true);
}
@Test public void testSourceFieldPlainExplicit() throws IOException {
verifySource(false);
}
@Test public void testSourceFieldPlain() throws IOException {
verifySource(null);
}
private void verifySource(Boolean compress) throws IOException {
try {
client.admin().indices().prepareDelete("test").execute().actionGet();
} catch (Exception e) {
// ignore
}
client.admin().indices().prepareCreate("test").execute().actionGet();
client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
String mapping = XContentFactory.contentTextBuilder(XContentType.JSON).startObject().startObject("type1")
.startObject("_source").field("compress", compress).endObject()
.endObject().endObject().string();
client.admin().indices().preparePutMapping().setType("type1").setSource(mapping).execute().actionGet();
for (int i = 1; i < 100; i++) {
client.prepareIndex("test", "type1", Integer.toString(i)).setSource(buildSource(i)).execute().actionGet();
}
client.prepareIndex("test", "type1", Integer.toString(10000)).setSource(buildSource(10000)).execute().actionGet();
client.admin().indices().prepareRefresh().execute().actionGet();
for (int i = 1; i < 100; i++) {
GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(i)).execute().actionGet();
assertThat(getResponse.source(), equalTo(buildSource(i).copiedBytes()));
}
GetResponse getResponse = client.prepareGet("test", "type1", Integer.toString(10000)).execute().actionGet();
assertThat(getResponse.source(), equalTo(buildSource(10000).copiedBytes()));
for (int i = 1; i < 100; i++) {
SearchResponse searchResponse = client.prepareSearch().setQuery(QueryBuilders.termQuery("_id", Integer.toString(i))).execute().actionGet();
assertThat(searchResponse.hits().getTotalHits(), equalTo(1l));
assertThat(searchResponse.hits().getAt(0).source(), equalTo(buildSource(i).copiedBytes()));
}
}
private XContentBuilder buildSource(int count) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
StringBuilder sb = new StringBuilder();
for (int j = 0; j < count; j++) {
sb.append("value").append(j).append(' ');
}
builder.field("field", sb.toString());
return builder.endObject();
}
}