Stored Fields Compression, closes #2037.

Compressing the stored fields file (the .fdt file) directly yields better compression of the overall index size, especially when indexing (and storing) small documents. Compression is considerably more effective than compressing each document on its own (as happens when `compress` is set on the `_source` mapper). The downside is that more data needs to be decompressed when loading documents.

The setting that controls it is `index.store.compress.stored_fields`, set to `true` (it defaults to `false`); it can be changed dynamically using the update settings API. This allows compression to be enabled at a later stage (e.g. for old time-based indices), after which the index can be optimized to make sure the existing stored fields actually get compressed.
Shay Banon 2012-06-20 05:31:34 +02:00
parent fbf4c70af9
commit b009c9c652
19 changed files with 1476 additions and 77 deletions
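For illustration, turning the setting on for an existing index and then forcing a merge might look roughly like the sketch below. It uses the Java client; the index name is arbitrary, and the exact builder method names (prepareUpdateSettings, prepareOptimize, setMaxNumSegments) are assumed from the client API of this era and may differ slightly.

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public class EnableStoredFieldsCompression {

    // Sketch: flip the dynamic setting, then optimize so existing segments are
    // rewritten and their .fdt stored fields files end up compressed as well.
    public static void enable(Client client, String index) {
        client.admin().indices().prepareUpdateSettings(index)
                .setSettings(ImmutableSettings.settingsBuilder()
                        .put("index.store.compress.stored_fields", true)
                        .build())
                .execute().actionGet();

        client.admin().indices().prepareOptimize(index)
                .setMaxNumSegments(1)
                .execute().actionGet();
    }
}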

View File

@@ -0,0 +1,214 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import org.apache.lucene.store.IndexInput;
import java.io.EOFException;
import java.io.IOException;
/**
*/
public abstract class CompressedIndexInput extends IndexInput {
private IndexInput in;
private int version;
private long uncompressedLength;
private long[] offsets;
private boolean closed;
protected byte[] uncompressed;
private int position = 0;
private int valid = 0;
private long headerLength;
private int currentOffsetIdx;
private long currentOffset;
private long currentOffsetFilePointer;
private long metaDataPosition;
public CompressedIndexInput(IndexInput in) throws IOException {
super("compressed(" + in.toString() + ")");
this.in = in;
readHeader(in);
this.version = in.readInt();
metaDataPosition = in.readLong();
headerLength = in.getFilePointer();
in.seek(metaDataPosition);
this.uncompressedLength = in.readVLong();
int size = in.readVInt();
offsets = new long[size];
for (int i = 0; i < offsets.length; i++) {
offsets[i] = in.readVLong();
}
this.currentOffsetIdx = -1;
this.currentOffset = 0;
this.currentOffsetFilePointer = 0;
in.seek(headerLength);
}
/**
* Method is overridden to report number of bytes that can now be read
* from decoded data buffer, without reading bytes from the underlying
* stream.
* Never throws an exception; returns number of bytes available without
* further reads from underlying source; -1 if stream has been closed, or
* 0 if an actual read (and possible blocking) is needed to find out.
*/
public int available() throws IOException {
// if closed, return -1;
if (closed) {
return -1;
}
int left = (valid - position);
return (left <= 0) ? 0 : left;
}
@Override
public byte readByte() throws IOException {
if (!readyBuffer()) {
throw new EOFException();
}
return uncompressed[position++];
}
public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException {
if (length < 1) {
return 0;
}
if (!readyBuffer()) {
return -1;
}
// First let's read however much data we happen to have...
int chunkLength = Math.min(valid - position, length);
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
position += chunkLength;
if (chunkLength == length || !fullRead) {
return chunkLength;
}
// Need more data, then
int totalRead = chunkLength;
do {
offset += chunkLength;
if (!readyBuffer()) {
break;
}
chunkLength = Math.min(valid - position, (length - totalRead));
System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
position += chunkLength;
totalRead += chunkLength;
} while (totalRead < length);
return totalRead;
}
@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */);
if (result < len) {
throw new EOFException();
}
}
@Override
public long getFilePointer() {
return currentOffsetFilePointer + position;
}
@Override
public void seek(long pos) throws IOException {
int idx = (int) (pos / uncompressed.length);
if (idx >= offsets.length) {
// set the next "readyBuffer" to EOF
currentOffsetIdx = idx;
position = 0;
valid = 0;
return;
}
// TODO: optimize so we won't have to readyBuffer on seek, can keep the position around, and set it on readyBuffer in this case
long pointer = offsets[idx];
if (pointer != currentOffset) {
in.seek(pointer);
position = 0;
valid = 0;
currentOffsetIdx = idx - 1; // we are going to increase it in readyBuffer...
readyBuffer();
}
position = (int) (pos % uncompressed.length);
}
@Override
public long length() {
return uncompressedLength;
}
@Override
public void close() throws IOException {
position = valid = 0;
if (!closed) {
closed = true;
doClose();
in.close();
}
}
protected abstract void doClose() throws IOException;
protected boolean readyBuffer() throws IOException {
if (position < valid) {
return true;
}
if (closed) {
return false;
}
// we reached the end...
if (currentOffsetIdx + 1 >= offsets.length) {
return false;
}
valid = uncompress(in, uncompressed);
if (valid < 0) {
return false;
}
currentOffsetIdx++;
currentOffset = offsets[currentOffsetIdx];
currentOffsetFilePointer = currentOffset - headerLength;
position = 0;
return (position < valid);
}
protected abstract void readHeader(IndexInput in) throws IOException;
/**
* Uncompress the data into the out array, returning the size uncompressed
*/
protected abstract int uncompress(IndexInput in, byte[] out) throws IOException;
@Override
public Object clone() {
CompressedIndexInput cloned = (CompressedIndexInput) super.clone();
cloned.position = 0;
cloned.valid = 0;
cloned.in = (IndexInput) cloned.in.clone();
return cloned;
}
}

View File

@@ -0,0 +1,216 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import gnu.trove.iterator.TLongIterator;
import gnu.trove.list.array.TLongArrayList;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
/**
*/
public abstract class CompressedIndexOutput extends IndexOutput {
final IndexOutput out;
protected byte[] uncompressed;
private int position = 0;
private long uncompressedPosition;
private boolean closed;
private final long metaDataPointer;
private TLongArrayList offsets = new TLongArrayList();
public CompressedIndexOutput(IndexOutput out) throws IOException {
this.out = out;
writeHeader(out);
out.writeInt(0); // version
metaDataPointer = out.getFilePointer();
out.writeLong(-1); // the pointer to the end of the file metadata
}
public IndexOutput underlying() {
return this.out;
}
@Override
public void writeByte(byte b) throws IOException {
if (position >= uncompressed.length) {
flushBuffer();
}
uncompressedPosition++;
uncompressed[position++] = b;
}
@Override
public void writeBytes(byte[] input, int offset, int length) throws IOException {
// ES, check if length is 0, and don't write in this case
if (length == 0) {
return;
}
final int BUFFER_LEN = uncompressed.length;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
if (free >= length) {
System.arraycopy(input, offset, uncompressed, position, length);
position += length;
uncompressedPosition += length;
return;
}
// fill partial input as much as possible and flush
if (position > 0) {
System.arraycopy(input, offset, uncompressed, position, free);
position += free;
uncompressedPosition += free;
flushBuffer();
offset += free;
length -= free;
}
// then write intermediate full block, if any, without copying:
while (length >= BUFFER_LEN) {
offsets.add(out.getFilePointer());
compress(input, offset, BUFFER_LEN, out);
offset += BUFFER_LEN;
length -= BUFFER_LEN;
uncompressedPosition += BUFFER_LEN;
}
// and finally, copy leftovers in input, if any
if (length > 0) {
System.arraycopy(input, offset, uncompressed, 0, length);
}
position = length;
uncompressedPosition += length;
}
@Override
public void copyBytes(DataInput input, long length) throws IOException {
final int BUFFER_LEN = uncompressed.length;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
if (free >= length) {
input.readBytes(uncompressed, position, (int) length, false);
position += length;
uncompressedPosition += length;
return;
}
// fill partial input as much as possible and flush
if (position > 0) {
input.readBytes(uncompressed, position, free, false);
position += free;
uncompressedPosition += free;
flushBuffer();
length -= free;
}
// then write intermediate full block, if any, without copying:
// Note, if we supported flushing buffers not on "chunkSize", then
// we could have flushed up to the rest of non compressed data in the input
// and then copy compressed segments. This means though that we need to
// store the compressed sizes of each segment on top of the offsets, and
// CompressedIndexInput#seek would be more costly, since it can't do (pos / chunk)
// to get the index...
while (length >= BUFFER_LEN) {
offsets.add(out.getFilePointer());
input.readBytes(uncompressed, 0, BUFFER_LEN);
compress(uncompressed, 0, BUFFER_LEN, out);
length -= BUFFER_LEN;
uncompressedPosition += BUFFER_LEN;
}
// and finally, copy leftovers in input, if any
if (length > 0) {
input.readBytes(uncompressed, 0, (int) length, false);
}
position = (int) length;
uncompressedPosition += length;
}
@Override
public void flush() throws IOException {
// ignore flush, we always want to flush on actual block size
//flushBuffer();
out.flush();
}
@Override
public void close() throws IOException {
if (!closed) {
flushBuffer();
// write metadata, and update pointer
long metaDataPointerValue = out.getFilePointer();
// length uncompressed
out.writeVLong(uncompressedPosition);
// compressed pointers
out.writeVInt(offsets.size());
for (TLongIterator it = offsets.iterator(); it.hasNext(); ) {
out.writeVLong(it.next());
}
out.seek(metaDataPointer);
out.writeLong(metaDataPointerValue);
closed = true;
doClose();
out.close();
}
}
protected abstract void doClose() throws IOException;
@Override
public long getFilePointer() {
return uncompressedPosition;
}
@Override
public void seek(long pos) throws IOException {
throw new IOException("seek not supported on compressed output");
}
@Override
public long length() throws IOException {
return uncompressedPosition;
}
private void flushBuffer() throws IOException {
if (position > 0) {
offsets.add(out.getFilePointer());
compress(uncompressed, 0, position, out);
position = 0;
}
}
protected abstract void writeHeader(IndexOutput out) throws IOException;
/**
* Compresses the data into the output
*/
protected abstract void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException;
}

View File

@@ -19,6 +19,8 @@
package org.elasticsearch.common.compress;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.jboss.netty.buffer.ChannelBuffer;
@@ -35,6 +37,8 @@ public interface Compressor {
boolean isCompressed(ChannelBuffer buffer);
+boolean isCompressed(IndexInput in) throws IOException;
/**
* Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}.
*/
@@ -48,4 +52,8 @@ public interface Compressor {
CompressedStreamInput streamInput(StreamInput in) throws IOException;
CompressedStreamOutput streamOutput(StreamOutput out) throws IOException;
+CompressedIndexInput indexInput(IndexInput in) throws IOException;
+CompressedIndexOutput indexOutput(IndexOutput out) throws IOException;
}

View File

@@ -20,6 +20,7 @@
package org.elasticsearch.common.compress;
import com.google.common.collect.ImmutableMap;
+import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.MapBuilder;
@@ -90,6 +91,16 @@ public class CompressorFactory {
return null;
}
+@Nullable
+public static Compressor compressor(IndexInput in) throws IOException {
+for (Compressor compressor : compressors) {
+if (compressor.isCompressed(in)) {
+return compressor;
+}
+}
+return null;
+}
public static Compressor compressor(String type) {
return compressorsByType.get(type);
}

View File

@@ -0,0 +1,74 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.LZFChunk;
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import java.io.IOException;
import java.util.Arrays;
/**
*/
public class LZFCompressedIndexInput extends CompressedIndexInput {
private final ChunkDecoder decoder;
// scratch area buffer
private byte[] inputBuffer;
public LZFCompressedIndexInput(IndexInput in, ChunkDecoder decoder) throws IOException {
super(in);
this.decoder = decoder;
this.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN];
this.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
}
@Override
protected void readHeader(IndexInput in) throws IOException {
byte[] header = new byte[LZFCompressor.LUCENE_HEADER.length];
in.readBytes(header, 0, header.length, false);
if (!Arrays.equals(header, LZFCompressor.LUCENE_HEADER)) {
throw new IOException("wrong lzf compressed header [" + Arrays.toString(header) + "]");
}
}
@Override
protected int uncompress(IndexInput in, byte[] out) throws IOException {
return decoder.decodeChunk(new InputStreamIndexInput(in, Long.MAX_VALUE), inputBuffer, out);
}
@Override
protected void doClose() throws IOException {
// nothing to do here...
}
@Override
public Object clone() {
LZFCompressedIndexInput cloned = (LZFCompressedIndexInput) super.clone();
cloned.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN];
System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressed.length);
cloned.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN];
return cloned;
}
}

View File

@@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.lucene.store.OutputStreamIndexOutput;
import java.io.IOException;
/**
*/
public class LZFCompressedIndexOutput extends CompressedIndexOutput {
private final BufferRecycler recycler;
private final ChunkEncoder encoder;
public LZFCompressedIndexOutput(IndexOutput out) throws IOException {
super(out);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN);
this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
}
@Override
protected void writeHeader(IndexOutput out) throws IOException {
out.writeBytes(LZFCompressor.LUCENE_HEADER, LZFCompressor.LUCENE_HEADER.length);
}
@Override
protected void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException {
encoder.encodeAndWriteChunk(data, offset, len, new OutputStreamIndexOutput(out));
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseOutputBuffer(buf);
}
encoder.close();
}
}

View File

@@ -34,7 +34,7 @@ public class LZFCompressedStreamInput extends CompressedStreamInput {
private final BufferRecycler recycler;
-private final com.ning.compress.lzf.ChunkDecoder decoder;
+private final ChunkDecoder decoder;
// scratch area buffer
private byte[] inputBuffer;

View File

@@ -32,7 +32,7 @@ import java.io.IOException;
public class LZFCompressedStreamOutput extends CompressedStreamOutput {
private final BufferRecycler recycler;
-private final com.ning.compress.lzf.ChunkEncoder encoder;
+private final ChunkEncoder encoder;
public LZFCompressedStreamOutput(StreamOutput out) throws IOException {
super(out);

View File

@@ -23,9 +23,9 @@ import com.ning.compress.lzf.ChunkDecoder;
import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFEncoder;
import com.ning.compress.lzf.util.ChunkDecoderFactory;
-import org.elasticsearch.common.compress.CompressedStreamInput;
-import org.elasticsearch.common.compress.CompressedStreamOutput;
-import org.elasticsearch.common.compress.Compressor;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.common.compress.*;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
@@ -37,6 +37,8 @@ import java.io.IOException;
*/
public class LZFCompressor implements Compressor {
+static final byte[] LUCENE_HEADER = {'L', 'Z', 'F', 0};
public static final String TYPE = "lzf";
private ChunkDecoder decoder;
@@ -68,6 +70,23 @@ public class LZFCompressor implements Compressor {
(buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_COMPRESSED || buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_NON_COMPRESSED);
}
+@Override
+public boolean isCompressed(IndexInput in) throws IOException {
+long currentPointer = in.getFilePointer();
+// since we have some metadata before the first compressed header, we check on our specific header
+if (in.length() - currentPointer < (LUCENE_HEADER.length)) {
+return false;
+}
+for (int i = 0; i < LUCENE_HEADER.length; i++) {
+if (in.readByte() != LUCENE_HEADER[i]) {
+in.seek(currentPointer);
+return false;
+}
+}
+in.seek(currentPointer);
+return true;
+}
@Override
public byte[] uncompress(byte[] data, int offset, int length) throws IOException {
return decoder.decode(data, offset, length);
@@ -88,4 +107,13 @@ public class LZFCompressor implements Compressor {
return new LZFCompressedStreamOutput(out);
}
+@Override
+public CompressedIndexInput indexInput(IndexInput in) throws IOException {
+return new LZFCompressedIndexInput(in, decoder);
+}
+@Override
+public CompressedIndexOutput indexOutput(IndexOutput out) throws IOException {
+return new LZFCompressedIndexOutput(out);
+}
}
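To see how these pieces fit together, here is a small round-trip sketch in the spirit of the tests added later in this commit: data is written through the compressor's IndexOutput wrapper into a RAMDirectory and read back through the IndexInput wrapper. The file name and the values written are arbitrary.

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;

public class CompressedIndexRoundTrip {

    public static void main(String[] args) throws Exception {
        Compressor compressor = CompressorFactory.defaultCompressor(); // LZF by default
        Directory dir = new RAMDirectory();

        // write side: buffers up to the LZF chunk size, writes compressed chunks,
        // and records the chunk offsets in the trailing metadata block on close()
        IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
        out.writeInt(42);
        out.writeString("stored fields payload");
        out.close();

        // read side: the LZF Lucene header written by writeHeader() is what lets
        // CompressorFactory.compressor(in) detect compressed files; here we wrap directly
        IndexInput in = compressor.indexInput(dir.openInput("test"));
        System.out.println(in.readInt());    // 42
        System.out.println(in.readString()); // stored fields payload
        in.close();
    }
}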

View File

@@ -0,0 +1,103 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.store;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.OpenBufferedIndexOutput;
import java.io.IOException;
import java.util.zip.Checksum;
/**
*/
public class BufferedChecksumIndexOutput extends OpenBufferedIndexOutput {
private final IndexOutput out;
private final Checksum digest;
public BufferedChecksumIndexOutput(IndexOutput out, Checksum digest) {
// we add 8 to be bigger than the default BufferIndexOutput buffer size so any flush will go directly
// to the output without being copied over to the delegate buffer
super(OpenBufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64);
this.out = out;
this.digest = digest;
}
public Checksum digest() {
return digest;
}
public IndexOutput underlying() {
return this.out;
}
// don't override it, base class method simple reads from input and writes to this output
// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
// delegate.copyBytes(input, numBytes);
// }
@Override
public void close() throws IOException {
super.close();
out.close();
}
@Override
protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
out.writeBytes(b, offset, len);
digest.update(b, offset, len);
}
// don't override it, base class method simple reads from input and writes to this output
// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
// delegate.copyBytes(input, numBytes);
// }
@Override
public void flush() throws IOException {
super.flush();
out.flush();
}
@Override
public void seek(long pos) throws IOException {
// seek might be called on files, which means that the checksum is not file checksum
// but a checksum of the bytes written to this stream, which is the same for each
// type of file in lucene
super.seek(pos);
out.seek(pos);
}
@Override
public long length() throws IOException {
return out.length();
}
@Override
public void setLength(long length) throws IOException {
out.setLength(length);
}
@Override
public String toString() {
return out.toString();
}
}

View File

@@ -0,0 +1,97 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.store;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
import java.util.zip.Checksum;
/**
*/
public class ChecksumIndexOutput extends IndexOutput {
private final IndexOutput out;
private final Checksum digest;
public ChecksumIndexOutput(IndexOutput out, Checksum digest) {
this.out = out;
this.digest = digest;
}
public Checksum digest() {
return digest;
}
@Override
public void writeByte(byte b) throws IOException {
out.writeByte(b);
digest.update(b);
}
@Override
public void setLength(long length) throws IOException {
out.setLength(length);
}
// don't override copyBytes, since we need to read it and compute it
// @Override
// public void copyBytes(DataInput input, long numBytes) throws IOException {
// super.copyBytes(input, numBytes);
// }
@Override
public String toString() {
return out.toString();
}
@Override
public void writeBytes(byte[] b, int offset, int length) throws IOException {
out.writeBytes(b, offset, length);
digest.update(b, offset, length);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
@Override
public long getFilePointer() {
return out.getFilePointer();
}
@Override
public void seek(long pos) throws IOException {
out.seek(pos);
}
@Override
public long length() throws IOException {
return out.length();
}
}

View File

@@ -0,0 +1,61 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.lucene.store;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
import java.io.OutputStream;
/**
*/
public class OutputStreamIndexOutput extends OutputStream {
private final IndexOutput out;
public OutputStreamIndexOutput(IndexOutput out) {
this.out = out;
}
@Override
public void write(int b) throws IOException {
out.writeByte((byte) b);
}
@Override
public void write(byte[] b) throws IOException {
out.writeBytes(b, b.length);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.writeBytes(b, off, len);
}
@Override
public void flush() throws IOException {
out.flush();
}
@Override
public void close() throws IOException {
out.close();
}
}

View File

@@ -637,7 +637,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
try {
// we create an output with no checksum, this is because the pure binary data of the file is not
// the checksum (because of seek). We will create the checksum file once copying is done
-indexOutput = store.createOutputWithNoChecksum(fileInfo.physicalName());
+indexOutput = store.createOutputRaw(fileInfo.physicalName());
} catch (IOException e) {
failures.add(e);
latch.countDown();
@@ -752,7 +752,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo
IndexInput indexInput = null;
try {
-indexInput = dir.openInput(fileInfo.physicalName());
+indexInput = indexShard.store().openInputRaw(fileInfo.physicalName());
indexInput.seek(partNumber * chunkBytes);
InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes);

View File

@@ -24,13 +24,20 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.store.*;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
+import org.elasticsearch.common.compress.CompressedIndexOutput;
+import org.elasticsearch.common.compress.Compressor;
+import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.Directories;
+import org.elasticsearch.common.lucene.store.BufferedChecksumIndexOutput;
+import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
+import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
@@ -43,12 +50,29 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.Adler32;
+import java.util.zip.Checksum;
/**
*/
public class Store extends AbstractIndexShardComponent {
+static {
+IndexMetaData.addDynamicSettings(
+"index.store.compress.stored_fields"
+);
+}
+class ApplySettings implements IndexSettingsService.Listener {
+@Override
+public void onRefreshSettings(Settings settings) {
+boolean compressedStoredFields = settings.getAsBoolean("index.store.compress.stored_fields", Store.this.compressedStoredFields);
+if (compressedStoredFields != Store.this.compressedStoredFields) {
+logger.info("updating [compress.stored_fields] from [{}] to [{}]", Store.this.compressedStoredFields, compressedStoredFields);
+Store.this.compressedStoredFields = compressedStoredFields;
+}
+}
+}
static final String CHECKSUMS_PREFIX = "_checksums-";
public static final boolean isChecksum(String name) {
@@ -57,6 +81,8 @@ public class Store extends AbstractIndexShardComponent {
private final IndexStore indexStore;
+private final IndexSettingsService indexSettingsService;
private final DirectoryService directoryService;
private final StoreDirectory directory;
@@ -69,13 +95,22 @@ public class Store extends AbstractIndexShardComponent {
private final boolean sync;
+private volatile boolean compressedStoredFields;
+private final ApplySettings applySettings = new ApplySettings();
@Inject
-public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException {
+public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, IndexSettingsService indexSettingsService, DirectoryService directoryService) throws IOException {
super(shardId, indexSettings);
this.indexStore = indexStore;
+this.indexSettingsService = indexSettingsService;
this.directoryService = directoryService;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(directoryService.build());
+this.compressedStoredFields = componentSettings.getAsBoolean("compress.stored_fields", false);
+indexSettingsService.addListener(applySettings);
}
public Directory directory() {
@@ -218,7 +253,7 @@ public class Store extends AbstractIndexShardComponent {
checksums.put(metaData.name(), metaData.checksum());
}
}
-IndexOutput output = directory.createOutput(checksumName, false);
+IndexOutput output = directory.createOutput(checksumName, true);
output.writeInt(0); // version
output.writeStringStringMap(checksums);
output.close();
@@ -242,11 +277,26 @@ public class Store extends AbstractIndexShardComponent {
}
public void close() throws IOException {
+indexSettingsService.removeListener(applySettings);
directory.close();
}
-public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
-return directory.createOutput(name, false);
-}
+/**
+* Creates a raw output, no checksum is computed, and no compression even if enabled.
+*/
+public IndexOutput createOutputRaw(String name) throws IOException {
+return directory.createOutput(name, true);
+}
+/**
+* Opens an index input in raw form, with no decompression, for example.
+*/
+public IndexInput openInputRaw(String name) throws IOException {
+StoreFileMetaData metaData = filesMetadata.get(name);
+if (metaData == null) {
+throw new FileNotFoundException(name);
+}
+return metaData.directory().openInput(name);
+}
public void writeChecksum(String name, String checksum) throws IOException {
@@ -387,10 +437,10 @@
@Override
public IndexOutput createOutput(String name) throws IOException {
-return createOutput(name, true);
+return createOutput(name, false);
}
-public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException {
+public IndexOutput createOutput(String name, boolean raw) throws IOException {
Directory directory = null;
if (isChecksum(name)) {
directory = delegates[0];
@@ -420,7 +470,26 @@
StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null, directory);
filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap();
files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]);
-return new StoreIndexOutput(metaData, out, name, computeChecksum);
+boolean computeChecksum = !raw;
+if (computeChecksum) {
+// don't compute checksum for segment based files
+if ("segments.gen".equals(name) || name.startsWith("segments")) {
+computeChecksum = false;
+}
+}
+if (!raw && compressedStoredFields && name.endsWith(".fdt")) {
+if (computeChecksum) {
+// with compression, there is no need for buffering when doing checksums
+// since we have buffering on the compressed index output
+out = new ChecksumIndexOutput(out, new Adler32());
+}
+out = CompressorFactory.defaultCompressor().indexOutput(out);
+} else {
+if (computeChecksum) {
+out = new BufferedChecksumIndexOutput(out, new Adler32());
+}
+}
+return new StoreIndexOutput(metaData, out, name);
}
}
@@ -430,7 +499,30 @@ public class Store extends AbstractIndexShardComponent {
if (metaData == null) {
throw new FileNotFoundException(name);
}
-return metaData.directory().openInput(name);
+IndexInput in = metaData.directory().openInput(name);
+if (name.endsWith(".fdt")) {
+Compressor compressor = CompressorFactory.compressor(in);
+if (compressor != null) {
+in = compressor.indexInput(in);
+}
+}
+return in;
}
+@Override
+public IndexInput openInput(String name, int bufferSize) throws IOException {
+StoreFileMetaData metaData = filesMetadata.get(name);
+if (metaData == null) {
+throw new FileNotFoundException(name);
+}
+IndexInput in = metaData.directory().openInput(name, bufferSize);
+if (name.endsWith(".fdt")) {
+Compressor compressor = CompressorFactory.compressor(in);
+if (compressor != null) {
+in = compressor.indexInput(in);
+}
+}
+return in;
+}
@Override
@@ -449,15 +541,6 @@ public class Store extends AbstractIndexShardComponent {
return delegates[0].makeLock(name);
}
-@Override
-public IndexInput openInput(String name, int bufferSize) throws IOException {
-StoreFileMetaData metaData = filesMetadata.get(name);
-if (metaData == null) {
-throw new FileNotFoundException(name);
-}
-return metaData.directory().openInput(name, bufferSize);
-}
@Override
public void clearLock(String name) throws IOException {
delegates[0].clearLock(name);
@@ -524,49 +607,32 @@
}
}
-class StoreIndexOutput extends OpenBufferedIndexOutput {
+class StoreIndexOutput extends IndexOutput {
private final StoreFileMetaData metaData;
-private final IndexOutput delegate;
+private final IndexOutput out;
private final String name;
-private final Checksum digest;
-StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name, boolean computeChecksum) {
-// we add 8 to be bigger than the default BufferIndexOutput buffer size so any flush will go directly
-// to the output without being copied over to the delegate buffer
-super(OpenBufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64);
+StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name) {
this.metaData = metaData;
-this.delegate = delegate;
+this.out = delegate;
this.name = name;
-if (computeChecksum) {
-if ("segments.gen".equals(name)) {
-// no need to create checksum for segments.gen since its not snapshot to recovery
-this.digest = null;
-} else if (name.startsWith("segments")) {
-// don't compute checksum for segments files, so pure Lucene can open this directory
-// and since we, in any case, always recover the segments files
-this.digest = null;
-} else {
-// this.digest = new CRC32();
-// adler is faster, and we compare on length as well, should be enough to check for difference
-// between files
-this.digest = new Adler32();
-}
-} else {
-this.digest = null;
-}
}
@Override
public void close() throws IOException {
-super.close();
-delegate.close();
+out.close();
String checksum = null;
-if (digest != null) {
-checksum = Long.toString(digest.getValue(), Character.MAX_RADIX);
+IndexOutput underlying = out;
+if (out instanceof CompressedIndexOutput) {
+underlying = ((CompressedIndexOutput) out).underlying();
+}
+if (underlying instanceof BufferedChecksumIndexOutput) {
+checksum = Long.toString(((BufferedChecksumIndexOutput) underlying).digest().getValue(), Character.MAX_RADIX);
+} else if (underlying instanceof ChecksumIndexOutput) {
+checksum = Long.toString(((ChecksumIndexOutput) underlying).digest().getValue(), Character.MAX_RADIX);
}
synchronized (mutex) {
StoreFileMetaData md = new StoreFileMetaData(name, metaData.directory().fileLength(name), metaData.directory().fileModified(name), checksum, metaData.directory());
@@ -576,41 +642,48 @@
}
}
@Override
-protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
-delegate.writeBytes(b, offset, len);
-if (digest != null) {
-digest.update(b, offset, len);
-}
+public void copyBytes(DataInput input, long numBytes) throws IOException {
+out.copyBytes(input, numBytes);
}
-// don't override it, base class method simple reads from input and writes to this output
-// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
-// delegate.copyBytes(input, numBytes);
-// }
+@Override
+public long getFilePointer() {
+return out.getFilePointer();
+}
+@Override
+public void writeByte(byte b) throws IOException {
+out.writeByte(b);
+}
+@Override
+public void writeBytes(byte[] b, int offset, int length) throws IOException {
+out.writeBytes(b, offset, length);
+}
@Override
public void flush() throws IOException {
-super.flush();
-delegate.flush();
+out.flush();
}
@Override
public void seek(long pos) throws IOException {
-// seek might be called on files, which means that the checksum is not file checksum
-// but a checksum of the bytes written to this stream, which is the same for each
-// type of file in lucene
-super.seek(pos);
-delegate.seek(pos);
+out.seek(pos);
}
@Override
public long length() throws IOException {
-return delegate.length();
+return out.length();
}
@Override
public void setLength(long length) throws IOException {
-delegate.setLength(length);
+out.setLength(length);
}
+@Override
+public String toString() {
+return out.toString();
+}
}
}
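The raw variants above are what the gateway and recovery changes that follow rely on: a file is streamed between stores byte for byte, with no checksumming and no (de)compression in the path. A hypothetical helper doing the same thing could look like this sketch (the class name and buffer size are arbitrary; openInputRaw and createOutputRaw are the methods added above):

import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.index.store.Store;

import java.io.IOException;

public class RawStoreCopy {

    // Copies one file from a source store to a target store without decompressing
    // the source or re-checksumming/compressing the target.
    public static void copyRaw(Store source, Store target, String fileName) throws IOException {
        IndexInput in = source.openInputRaw(fileName);
        IndexOutput out = target.createOutputRaw(fileName);
        try {
            byte[] buffer = new byte[16 * 1024];
            long remaining = in.length();
            while (remaining > 0) {
                int toRead = (int) Math.min(buffer.length, remaining);
                in.readBytes(buffer, 0, toRead);
                out.writeBytes(buffer, toRead);
                remaining -= toRead;
            }
        } finally {
            out.close();
            in.close();
        }
    }
}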

View File

@@ -134,7 +134,7 @@ public class RecoverySource extends AbstractComponent {
final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes();
byte[] buf = new byte[BUFFER_SIZE];
StoreFileMetaData md = shard.store().metaData(name);
-indexInput = snapshot.getDirectory().openInput(name);
+indexInput = shard.store().openInputRaw(name);
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {

View File

@@ -520,7 +520,7 @@ public class RecoveryTarget extends AbstractComponent {
name = name + "." + onGoingRecovery.startTime;
}
-indexOutput = shard.store().createOutputWithNoChecksum(name);
+indexOutput = shard.store().createOutputRaw(name);
onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput);
} else {

View File

@@ -0,0 +1,128 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.compress;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.File;
import java.io.IOException;
/**
*/
public class LuceneCompressionBenchmark {
public static void main(String[] args) throws Exception {
final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
final Compressor compressor = CompressorFactory.defaultCompressor();
File testFile = new File("target/test/compress/lucene");
FileSystemUtils.deleteRecursively(testFile);
testFile.mkdirs();
FSDirectory uncompressedDir = new NIOFSDirectory(new File(testFile, "uncompressed"));
IndexWriter uncompressedWriter = new IndexWriter(uncompressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
FSDirectory compressedDir = new NIOFSDirectory(new File(testFile, "compressed")) {
@Override
public IndexOutput createOutput(String name) throws IOException {
if (name.endsWith(".fdt")) {
return compressor.indexOutput(super.createOutput(name));
}
return super.createOutput(name);
}
@Override
public IndexInput openInput(String name) throws IOException {
if (name.endsWith(".fdt")) {
IndexInput in = super.openInput(name);
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return super.openInput(name);
}
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
if (name.endsWith(".fdt")) {
IndexInput in = super.openInput(name, bufferSize);
// in case the override called openInput(String)
if (in instanceof CompressedIndexInput) {
return in;
}
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return super.openInput(name, bufferSize);
}
};
IndexWriter compressedWriter = new IndexWriter(compressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
System.out.println("feeding data...");
TestData testData = new TestData();
while (testData.next() && testData.getTotalSize() < MAX_SIZE) {
// json
XContentBuilder builder = XContentFactory.jsonBuilder();
testData.current(builder);
builder.close();
Document doc = new Document();
doc.add(new Field("_source", builder.underlyingBytes(), 0, builder.underlyingBytesLength()));
uncompressedWriter.addDocument(doc);
compressedWriter.addDocument(doc);
}
System.out.println("optimizing...");
uncompressedWriter.forceMerge(1);
compressedWriter.forceMerge(1);
uncompressedWriter.waitForMerges();
compressedWriter.waitForMerges();
System.out.println("done");
uncompressedWriter.close();
compressedWriter.close();
uncompressedDir.close();
compressedDir.close();
}
}

View File

@@ -0,0 +1,322 @@
package org.elasticsearch.test.unit.common.compress;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.MapFieldSelector;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RAMDirectory;
import org.elasticsearch.common.RandomStringGenerator;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.SizeValue;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.EOFException;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class CompressIndexInputOutputTests {
private Compressor compressor;
@BeforeClass
public void buildCompressor() {
this.compressor = CompressorFactory.defaultCompressor();
}
@Test
public void empty() throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
out.close();
IndexInput in = compressor.indexInput(dir.openInput("test"));
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well
}
in.seek(100);
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well
}
}
@Test
public void simple() throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
long pos1 = out.getFilePointer();
out.writeInt(1);
long pos2 = out.getFilePointer();
out.writeString("test1");
long pos3 = out.getFilePointer();
String largeString = RandomStringGenerator.random(0xFFFF + 5);
out.writeString(largeString);
long pos4 = out.getFilePointer();
out.writeInt(2);
long pos5 = out.getFilePointer();
out.writeString("test2");
out.close();
IndexInput in = compressor.indexInput(dir.openInput("test"));
assertThat(in.readInt(), equalTo(1));
assertThat(in.readString(), equalTo("test1"));
assertThat(in.readString(), equalTo(largeString));
assertThat(in.readInt(), equalTo(2));
assertThat(in.readString(), equalTo("test2"));
in.seek(pos3);
assertThat(in.readString(), equalTo(largeString));
in.seek(pos2);
assertThat(in.readString(), equalTo("test1"));
in.seek(pos5);
assertThat(in.readString(), equalTo("test2"));
in.seek(pos1);
assertThat(in.readInt(), equalTo(1));
in.seek(0);
byte[] full = new byte[(int) in.length()];
in.readBytes(full, 0, full.length);
in.close();
}
@Test
public void seek1Compressed() throws Exception {
seek1(true);
}
@Test
public void seek1UnCompressed() throws Exception {
seek1(false);
}
private void seek1(boolean compressed) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressed ? compressor.indexOutput(dir.createOutput("test")) : dir.createOutput("test");
long pos1 = out.getFilePointer();
out.writeVInt(4);
out.writeInt(1);
long pos2 = out.getFilePointer();
out.writeVInt(8);
long posX = out.getFilePointer();
out.writeInt(2);
out.writeInt(3);
long pos3 = out.getFilePointer();
out.writeVInt(4);
out.writeInt(4);
out.close();
//IndexInput in = dir.openInput("test");
IndexInput in = compressed ? compressor.indexInput(dir.openInput("test")) : dir.openInput("test");
in.seek(pos2);
// now "skip"
int numBytes = in.readVInt();
assertThat(in.getFilePointer(), equalTo(posX));
in.seek(in.getFilePointer() + numBytes);
assertThat(in.readVInt(), equalTo(4));
assertThat(in.readInt(), equalTo(4));
}
@Test
public void copyBytes() throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
long pos1 = out.getFilePointer();
out.writeInt(1);
long pos2 = out.getFilePointer();
assertThat(pos2, equalTo(4l));
out.writeString("test1");
long pos3 = out.getFilePointer();
String largeString = RandomStringGenerator.random(0xFFFF + 5);
out.writeString(largeString);
long pos4 = out.getFilePointer();
out.writeInt(2);
long pos5 = out.getFilePointer();
out.writeString("test2");
assertThat(out.length(), equalTo(out.getFilePointer()));
long length = out.length();
out.close();
CompressedIndexOutput out2 = compressor.indexOutput(dir.createOutput("test2"));
out2.writeString("mergeStart");
long startMergePos = out2.getFilePointer();
CompressedIndexInput testInput = compressor.indexInput(dir.openInput("test"));
assertThat(testInput.length(), equalTo(length));
out2.copyBytes(testInput, testInput.length());
long endMergePos = out2.getFilePointer();
out2.writeString("mergeEnd");
out2.close();
IndexInput in = compressor.indexInput(dir.openInput("test2"));
assertThat(in.readString(), equalTo("mergeStart"));
assertThat(in.readInt(), equalTo(1));
assertThat(in.readString(), equalTo("test1"));
assertThat(in.readString(), equalTo(largeString));
assertThat(in.readInt(), equalTo(2));
assertThat(in.readString(), equalTo("test2"));
assertThat(in.readString(), equalTo("mergeEnd"));
in.seek(pos1);
assertThat(in.readString(), equalTo("mergeStart"));
in.seek(endMergePos);
assertThat(in.readString(), equalTo("mergeEnd"));
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well, we reached the end...
}
}
@Test
public void lucene() throws Exception {
final AtomicBoolean compressed = new AtomicBoolean(true);
Directory dir = new RAMDirectory() {
@Override
public IndexOutput createOutput(String name) throws IOException {
if (compressed.get() && name.endsWith(".fdt")) {
return compressor.indexOutput(super.createOutput(name));
}
return super.createOutput(name);
}
@Override
public IndexInput openInput(String name) throws IOException {
if (name.endsWith(".fdt")) {
IndexInput in = super.openInput(name);
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return super.openInput(name);
}
@Override
public IndexInput openInput(String name, int bufferSize) throws IOException {
if (name.endsWith(".fdt")) {
IndexInput in = super.openInput(name, bufferSize);
// in case the override called openInput(String)
if (in instanceof CompressedIndexInput) {
return in;
}
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return super.openInput(name, bufferSize);
}
};
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
writer.addDocument(createDoc(1, (int) SizeValue.parseSizeValue("100b").singles()));
writer.addDocument(createDoc(2, (int) SizeValue.parseSizeValue("5k").singles()));
writer.commit();
writer.addDocument(createDoc(3, (int) SizeValue.parseSizeValue("2k").singles()));
writer.addDocument(createDoc(4, (int) SizeValue.parseSizeValue("1k").singles()));
writer.commit();
verify(writer);
writer.forceMerge(1);
writer.waitForMerges();
verify(writer);
compressed.set(false);
writer.addDocument(createDoc(5, (int) SizeValue.parseSizeValue("2k").singles()));
writer.addDocument(createDoc(6, (int) SizeValue.parseSizeValue("1k").singles()));
verify(writer);
writer.forceMerge(1);
writer.waitForMerges();
verify(writer);
}
private void verify(IndexWriter writer) throws Exception {
IndexReader reader = IndexReader.open(writer, true);
for (int i = 0; i < reader.maxDoc(); i++) {
if (reader.isDeleted(i)) {
continue;
}
Document document = reader.document(i);
checkDoc(document);
document = reader.document(i, new MapFieldSelector("id", "field", "count"));
checkDoc(document);
}
for (int i = 0; i < 100; i++) {
int doc = ThreadLocalRandom.current().nextInt(reader.maxDoc());
if (reader.isDeleted(doc)) {
continue;
}
Document document = reader.document(doc);
checkDoc(document);
document = reader.document(doc, new MapFieldSelector("id", "field", "count"));
checkDoc(document);
}
}
private void checkDoc(Document document) {
String id = document.get("id");
String field = document.get("field");
int count = 0;
int idx = 0;
while (true) {
int oldIdx = idx;
idx = field.indexOf(' ', oldIdx);
if (idx == -1) {
break;
}
count++;
assertThat(field.substring(oldIdx, idx), equalTo(id));
idx++;
}
assertThat(count, equalTo(Integer.parseInt(document.get("count"))));
}
private Document createDoc(int id, int size) {
Document doc = new Document();
doc.add(new Field("id", Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED));
doc.add(new Field("size", Integer.toString(size), Field.Store.YES, Field.Index.NOT_ANALYZED));
doc.add(new Field("skip", RandomStringGenerator.random(50), Field.Store.YES, Field.Index.NO));
StringBuilder sb = new StringBuilder();
int count = 0;
while (true) {
count++;
sb.append(id);
sb.append(" ");
if (sb.length() >= size) {
break;
}
}
doc.add(new Field("count", Integer.toString(count), Field.Store.YES, Field.Index.NOT_ANALYZED));
doc.add(new Field("field", sb.toString(), Field.Store.YES, Field.Index.NOT_ANALYZED));
doc.add(new Field("skip", RandomStringGenerator.random(50), Field.Store.YES, Field.Index.NO));
return doc;
}
}

View File

@@ -107,11 +107,11 @@ public abstract class AbstractSimpleEngineTests {
}
protected Store createStore() throws IOException {
-return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
+return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Store createStoreReplica() throws IOException {
-return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
+return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Translog createTranslog() {