diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java b/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java
new file mode 100644
index 00000000000..9fe1505ed96
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/compress/CompressedIndexInput.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.compress;
+
+import org.apache.lucene.store.IndexInput;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+/**
+ */
+public abstract class CompressedIndexInput extends IndexInput {
+
+    private IndexInput in;
+
+    private int version;
+    private long uncompressedLength;
+    private long[] offsets;
+
+    private boolean closed;
+
+    protected byte[] uncompressed;
+    private int position = 0;
+    private int valid = 0;
+    private long headerLength;
+    private int currentOffsetIdx;
+    private long currentOffset;
+    private long currentOffsetFilePointer;
+    private long metaDataPosition;
+
+    public CompressedIndexInput(IndexInput in) throws IOException {
+        super("compressed(" + in.toString() + ")");
+        this.in = in;
+        readHeader(in);
+        this.version = in.readInt();
+        metaDataPosition = in.readLong();
+        headerLength = in.getFilePointer();
+        in.seek(metaDataPosition);
+        this.uncompressedLength = in.readVLong();
+        int size = in.readVInt();
+        offsets = new long[size];
+        for (int i = 0; i < offsets.length; i++) {
+            offsets[i] = in.readVLong();
+        }
+        this.currentOffsetIdx = -1;
+        this.currentOffset = 0;
+        this.currentOffsetFilePointer = 0;
+        in.seek(headerLength);
+    }
+
+    /**
+     * Reports the number of bytes that can now be read from the decoded
+     * data buffer, without reading more bytes from the underlying
+     * stream.
+     * Returns the number of bytes available without further reads from
+     * the underlying source; -1 if the stream has been closed, or
+     * 0 if an actual read (and possible blocking) is needed to find out.
+     */
+    public int available() throws IOException {
+        // a closed stream reports -1
+        if (closed) {
+            return -1;
+        }
+        int left = (valid - position);
+        return (left <= 0) ? 0 : left;
+    }
+
+    @Override
+    public byte readByte() throws IOException {
+        if (!readyBuffer()) {
+            throw new EOFException();
+        }
+        return uncompressed[position++];
+    }
+
+    public int read(byte[] buffer, int offset, int length, boolean fullRead) throws IOException {
+        if (length < 1) {
+            return 0;
+        }
+        if (!readyBuffer()) {
+            return -1;
+        }
+        // First let's read however much data we happen to have...
+        int chunkLength = Math.min(valid - position, length);
+        System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
+        position += chunkLength;
+
+        if (chunkLength == length || !fullRead) {
+            return chunkLength;
+        }
+        // Need more data, then
+        int totalRead = chunkLength;
+        do {
+            offset += chunkLength;
+            if (!readyBuffer()) {
+                break;
+            }
+            chunkLength = Math.min(valid - position, (length - totalRead));
+            System.arraycopy(uncompressed, position, buffer, offset, chunkLength);
+            position += chunkLength;
+            totalRead += chunkLength;
+        } while (totalRead < length);
+
+        return totalRead;
+    }
+
+    @Override
+    public void readBytes(byte[] b, int offset, int len) throws IOException {
+        int result = read(b, offset, len, true /* we want to have full reads, that's the contract... */);
+        if (result < len) {
+            throw new EOFException();
+        }
+    }
+
+    @Override
+    public long getFilePointer() {
+        return currentOffsetFilePointer + position;
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        int idx = (int) (pos / uncompressed.length);
+        if (idx >= offsets.length) {
+            // set the next "readyBuffer" to EOF
+            currentOffsetIdx = idx;
+            position = 0;
+            valid = 0;
+            return;
+        }
+
+        // TODO: optimize so we won't have to readyBuffer on seek, can keep the position around, and set it on readyBuffer in this case
+        long pointer = offsets[idx];
+        if (pointer != currentOffset) {
+            in.seek(pointer);
+            position = 0;
+            valid = 0;
+            currentOffsetIdx = idx - 1; // we are going to increase it in readyBuffer...
+            readyBuffer();
+        }
+        position = (int) (pos % uncompressed.length);
+    }
+
+    @Override
+    public long length() {
+        return uncompressedLength;
+    }
+
+    @Override
+    public void close() throws IOException {
+        position = valid = 0;
+        if (!closed) {
+            closed = true;
+            doClose();
+            in.close();
+        }
+    }
+
+    protected abstract void doClose() throws IOException;
+
+    protected boolean readyBuffer() throws IOException {
+        if (position < valid) {
+            return true;
+        }
+        if (closed) {
+            return false;
+        }
+        // we reached the end...
+        if (currentOffsetIdx + 1 >= offsets.length) {
+            return false;
+        }
+        valid = uncompress(in, uncompressed);
+        if (valid < 0) {
+            return false;
+        }
+        currentOffsetIdx++;
+        currentOffset = offsets[currentOffsetIdx];
+        currentOffsetFilePointer = currentOffset - headerLength;
+        position = 0;
+        return (position < valid);
+    }
+
+    protected abstract void readHeader(IndexInput in) throws IOException;
+
+    /**
+     * Uncompress the data into the out array, returning the size uncompressed
+     */
+    protected abstract int uncompress(IndexInput in, byte[] out) throws IOException;
+
+    @Override
+    public Object clone() {
+        CompressedIndexInput cloned = (CompressedIndexInput) super.clone();
+        cloned.position = 0;
+        cloned.valid = 0;
+        cloned.in = (IndexInput) cloned.in.clone();
+        return cloned;
+    }
+}
diff --git a/src/main/java/org/elasticsearch/common/compress/CompressedIndexOutput.java b/src/main/java/org/elasticsearch/common/compress/CompressedIndexOutput.java
new file mode 100644
index 00000000000..01e47d0854a
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/compress/CompressedIndexOutput.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.compress; + +import gnu.trove.iterator.TLongIterator; +import gnu.trove.list.array.TLongArrayList; +import org.apache.lucene.store.DataInput; +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; + +/** + */ +public abstract class CompressedIndexOutput extends IndexOutput { + + final IndexOutput out; + + protected byte[] uncompressed; + private int position = 0; + + private long uncompressedPosition; + + private boolean closed; + + private final long metaDataPointer; + private TLongArrayList offsets = new TLongArrayList(); + + public CompressedIndexOutput(IndexOutput out) throws IOException { + this.out = out; + writeHeader(out); + out.writeInt(0); // version + metaDataPointer = out.getFilePointer(); + out.writeLong(-1); // the pointer to the end of the file metadata + } + + public IndexOutput underlying() { + return this.out; + } + + @Override + public void writeByte(byte b) throws IOException { + if (position >= uncompressed.length) { + flushBuffer(); + } + uncompressedPosition++; + uncompressed[position++] = b; + } + + @Override + public void writeBytes(byte[] input, int offset, int length) throws IOException { + // ES, check if length is 0, and don't write in this case + if (length == 0) { + return; + } + final int BUFFER_LEN = uncompressed.length; + + // simple case first: buffering only (for trivially short writes) + int free = BUFFER_LEN - position; + if (free >= length) { + System.arraycopy(input, offset, uncompressed, position, length); + position += length; + uncompressedPosition += length; + return; + } + // fill partial input as much as possible and flush + if (position > 0) { + System.arraycopy(input, offset, uncompressed, position, free); + position += free; + uncompressedPosition += free; + flushBuffer(); + offset += free; + length -= free; + } + + // then write intermediate full block, if any, without copying: + while (length >= BUFFER_LEN) { + offsets.add(out.getFilePointer()); + compress(input, offset, BUFFER_LEN, out); + offset += BUFFER_LEN; + length -= BUFFER_LEN; + uncompressedPosition += BUFFER_LEN; + } + + // and finally, copy leftovers in input, if any + if (length > 0) { + System.arraycopy(input, offset, uncompressed, 0, length); + } + position = length; + uncompressedPosition += length; + } + + @Override + public void copyBytes(DataInput input, long length) throws IOException { + final int BUFFER_LEN = uncompressed.length; + + // simple case first: buffering only (for trivially short writes) + int free = BUFFER_LEN - position; + if (free >= length) { + input.readBytes(uncompressed, position, (int) length, false); + position += length; + uncompressedPosition += length; + return; + } + // fill partial input as much as possible and flush + if (position > 0) { + input.readBytes(uncompressed, position, free, false); + position += free; + uncompressedPosition += free; + flushBuffer(); + length -= free; + } + + // then write intermediate full block, if any, without copying: + + // Note, if we supported flushing buffers not on "chunkSize", then + // we could have flushed up to the rest of non 
compressed data in the input + // and then copy compressed segments. This means though that we need to + // store the compressed sizes of each segment on top of the offsets, and + // CompressedIndexInput#seek would be more costly, since it can't do (pos / chunk) + // to get the index... + + while (length >= BUFFER_LEN) { + offsets.add(out.getFilePointer()); + input.readBytes(uncompressed, 0, BUFFER_LEN); + compress(uncompressed, 0, BUFFER_LEN, out); + length -= BUFFER_LEN; + uncompressedPosition += BUFFER_LEN; + } + + // and finally, copy leftovers in input, if any + if (length > 0) { + input.readBytes(uncompressed, 0, (int) length, false); + } + position = (int) length; + uncompressedPosition += length; + } + + @Override + public void flush() throws IOException { + // ignore flush, we always want to flush on actual block size + //flushBuffer(); + out.flush(); + } + + @Override + public void close() throws IOException { + if (!closed) { + flushBuffer(); + + // write metadata, and update pointer + long metaDataPointerValue = out.getFilePointer(); + // length uncompressed + out.writeVLong(uncompressedPosition); + // compressed pointers + out.writeVInt(offsets.size()); + for (TLongIterator it = offsets.iterator(); it.hasNext(); ) { + out.writeVLong(it.next()); + } + out.seek(metaDataPointer); + out.writeLong(metaDataPointerValue); + + closed = true; + doClose(); + out.close(); + } + } + + protected abstract void doClose() throws IOException; + + @Override + public long getFilePointer() { + return uncompressedPosition; + } + + @Override + public void seek(long pos) throws IOException { + throw new IOException("seek not supported on compressed output"); + } + + @Override + public long length() throws IOException { + return uncompressedPosition; + } + + private void flushBuffer() throws IOException { + if (position > 0) { + offsets.add(out.getFilePointer()); + compress(uncompressed, 0, position, out); + position = 0; + } + } + + protected abstract void writeHeader(IndexOutput out) throws IOException; + + /** + * Compresses the data into the output + */ + protected abstract void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException; +} diff --git a/src/main/java/org/elasticsearch/common/compress/Compressor.java b/src/main/java/org/elasticsearch/common/compress/Compressor.java index ebd13d79233..4946a52d0a1 100644 --- a/src/main/java/org/elasticsearch/common/compress/Compressor.java +++ b/src/main/java/org/elasticsearch/common/compress/Compressor.java @@ -19,6 +19,8 @@ package org.elasticsearch.common.compress; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.jboss.netty.buffer.ChannelBuffer; @@ -35,6 +37,8 @@ public interface Compressor { boolean isCompressed(ChannelBuffer buffer); + boolean isCompressed(IndexInput in) throws IOException; + /** * Uncompress the provided data, data can be detected as compressed using {@link #isCompressed(byte[], int, int)}. 
*/ @@ -48,4 +52,8 @@ public interface Compressor { CompressedStreamInput streamInput(StreamInput in) throws IOException; CompressedStreamOutput streamOutput(StreamOutput out) throws IOException; + + CompressedIndexInput indexInput(IndexInput in) throws IOException; + + CompressedIndexOutput indexOutput(IndexOutput out) throws IOException; } diff --git a/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java index cb8195b5e68..15bd262f150 100644 --- a/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +++ b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.compress; import com.google.common.collect.ImmutableMap; +import org.apache.lucene.store.IndexInput; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.MapBuilder; @@ -90,6 +91,16 @@ public class CompressorFactory { return null; } + @Nullable + public static Compressor compressor(IndexInput in) throws IOException { + for (Compressor compressor : compressors) { + if (compressor.isCompressed(in)) { + return compressor; + } + } + return null; + } + public static Compressor compressor(String type) { return compressorsByType.get(type); } diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexInput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexInput.java new file mode 100644 index 00000000000..9f0c415f448 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexInput.java @@ -0,0 +1,74 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress.lzf; + +import com.ning.compress.lzf.ChunkDecoder; +import com.ning.compress.lzf.LZFChunk; +import org.apache.lucene.store.IndexInput; +import org.elasticsearch.common.compress.CompressedIndexInput; +import org.elasticsearch.common.lucene.store.InputStreamIndexInput; + +import java.io.IOException; +import java.util.Arrays; + +/** + */ +public class LZFCompressedIndexInput extends CompressedIndexInput { + + private final ChunkDecoder decoder; + // scratch area buffer + private byte[] inputBuffer; + + public LZFCompressedIndexInput(IndexInput in, ChunkDecoder decoder) throws IOException { + super(in); + + this.decoder = decoder; + this.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN]; + this.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN]; + } + + @Override + protected void readHeader(IndexInput in) throws IOException { + byte[] header = new byte[LZFCompressor.LUCENE_HEADER.length]; + in.readBytes(header, 0, header.length, false); + if (!Arrays.equals(header, LZFCompressor.LUCENE_HEADER)) { + throw new IOException("wrong lzf compressed header [" + Arrays.toString(header) + "]"); + } + } + + @Override + protected int uncompress(IndexInput in, byte[] out) throws IOException { + return decoder.decodeChunk(new InputStreamIndexInput(in, Long.MAX_VALUE), inputBuffer, out); + } + + @Override + protected void doClose() throws IOException { + // nothing to do here... + } + + @Override + public Object clone() { + LZFCompressedIndexInput cloned = (LZFCompressedIndexInput) super.clone(); + cloned.uncompressed = new byte[LZFChunk.MAX_CHUNK_LEN]; + System.arraycopy(uncompressed, 0, cloned.uncompressed, 0, uncompressed.length); + cloned.inputBuffer = new byte[LZFChunk.MAX_CHUNK_LEN]; + return cloned; + } +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexOutput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexOutput.java new file mode 100644 index 00000000000..364098f476b --- /dev/null +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedIndexOutput.java @@ -0,0 +1,64 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.compress.lzf; + +import com.ning.compress.BufferRecycler; +import com.ning.compress.lzf.ChunkEncoder; +import com.ning.compress.lzf.LZFChunk; +import org.apache.lucene.store.IndexOutput; +import org.elasticsearch.common.compress.CompressedIndexOutput; +import org.elasticsearch.common.lucene.store.OutputStreamIndexOutput; + +import java.io.IOException; + +/** + */ +public class LZFCompressedIndexOutput extends CompressedIndexOutput { + + private final BufferRecycler recycler; + private final ChunkEncoder encoder; + + public LZFCompressedIndexOutput(IndexOutput out) throws IOException { + super(out); + this.recycler = BufferRecycler.instance(); + this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN); + this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN); + } + + @Override + protected void writeHeader(IndexOutput out) throws IOException { + out.writeBytes(LZFCompressor.LUCENE_HEADER, LZFCompressor.LUCENE_HEADER.length); + } + + @Override + protected void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException { + encoder.encodeAndWriteChunk(data, offset, len, new OutputStreamIndexOutput(out)); + } + + @Override + protected void doClose() throws IOException { + byte[] buf = uncompressed; + if (buf != null) { + uncompressed = null; + recycler.releaseOutputBuffer(buf); + } + encoder.close(); + } +} diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java index 0334c321441..78b8ae985b5 100644 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamInput.java @@ -34,7 +34,7 @@ public class LZFCompressedStreamInput extends CompressedStreamInput { private final BufferRecycler recycler; - private final com.ning.compress.lzf.ChunkDecoder decoder; + private final ChunkDecoder decoder; // scratch area buffer private byte[] inputBuffer; diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java index a8847547e75..af58fde006d 100644 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressedStreamOutput.java @@ -32,7 +32,7 @@ import java.io.IOException; public class LZFCompressedStreamOutput extends CompressedStreamOutput { private final BufferRecycler recycler; - private final com.ning.compress.lzf.ChunkEncoder encoder; + private final ChunkEncoder encoder; public LZFCompressedStreamOutput(StreamOutput out) throws IOException { super(out); diff --git a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java index 93dec24bf7f..c2ff2300562 100644 --- a/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java +++ b/src/main/java/org/elasticsearch/common/compress/lzf/LZFCompressor.java @@ -23,9 +23,9 @@ import com.ning.compress.lzf.ChunkDecoder; import com.ning.compress.lzf.LZFChunk; import com.ning.compress.lzf.LZFEncoder; import com.ning.compress.lzf.util.ChunkDecoderFactory; -import org.elasticsearch.common.compress.CompressedStreamInput; -import org.elasticsearch.common.compress.CompressedStreamOutput; -import org.elasticsearch.common.compress.Compressor; +import 
org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.elasticsearch.common.compress.*;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.logging.Loggers;
@@ -37,6 +37,8 @@ import java.io.IOException;
  */
 public class LZFCompressor implements Compressor {
 
+    static final byte[] LUCENE_HEADER = {'L', 'Z', 'F', 0};
+
     public static final String TYPE = "lzf";
 
     private ChunkDecoder decoder;
@@ -68,6 +70,23 @@ public class LZFCompressor implements Compressor {
                 (buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_COMPRESSED || buffer.getByte(offset + 2) == LZFChunk.BLOCK_TYPE_NON_COMPRESSED);
     }
 
+    @Override
+    public boolean isCompressed(IndexInput in) throws IOException {
+        long currentPointer = in.getFilePointer();
+        // since we have some metadata before the first compressed header, we check on our specific header
+        if (in.length() - currentPointer < (LUCENE_HEADER.length)) {
+            return false;
+        }
+        for (int i = 0; i < LUCENE_HEADER.length; i++) {
+            if (in.readByte() != LUCENE_HEADER[i]) {
+                in.seek(currentPointer);
+                return false;
+            }
+        }
+        in.seek(currentPointer);
+        return true;
+    }
+
     @Override
     public byte[] uncompress(byte[] data, int offset, int length) throws IOException {
         return decoder.decode(data, offset, length);
@@ -88,4 +107,13 @@
         return new LZFCompressedStreamOutput(out);
     }
 
+    @Override
+    public CompressedIndexInput indexInput(IndexInput in) throws IOException {
+        return new LZFCompressedIndexInput(in, decoder);
+    }
+
+    @Override
+    public CompressedIndexOutput indexOutput(IndexOutput out) throws IOException {
+        return new LZFCompressedIndexOutput(out);
+    }
 }
diff --git a/src/main/java/org/elasticsearch/common/lucene/store/BufferedChecksumIndexOutput.java b/src/main/java/org/elasticsearch/common/lucene/store/BufferedChecksumIndexOutput.java
new file mode 100644
index 00000000000..5ec266345a1
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/lucene/store/BufferedChecksumIndexOutput.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.lucene.store;
+
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.OpenBufferedIndexOutput;
+
+import java.io.IOException;
+import java.util.zip.Checksum;
+
+/**
+ */
+public class BufferedChecksumIndexOutput extends OpenBufferedIndexOutput {
+
+    private final IndexOutput out;
+
+    private final Checksum digest;
+
+    public BufferedChecksumIndexOutput(IndexOutput out, Checksum digest) {
+        // we add 64 to be bigger than the default BufferedIndexOutput buffer size so any flush will go directly
+        // to the output without being copied over to the delegate buffer
+        super(OpenBufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64);
+        this.out = out;
+        this.digest = digest;
+    }
+
+    public Checksum digest() {
+        return digest;
+    }
+
+    public IndexOutput underlying() {
+        return this.out;
+    }
+
+    // don't override it, base class method simply reads from input and writes to this output
+//    @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
+//        delegate.copyBytes(input, numBytes);
+//    }
+
+    @Override
+    public void close() throws IOException {
+        super.close();
+        out.close();
+    }
+
+    @Override
+    protected void flushBuffer(byte[] b, int offset, int len) throws IOException {
+        out.writeBytes(b, offset, len);
+        digest.update(b, offset, len);
+    }
+
+    // don't override it, base class method simply reads from input and writes to this output
+//    @Override public void copyBytes(IndexInput input, long numBytes) throws IOException {
+//        delegate.copyBytes(input, numBytes);
+//    }
+
+    @Override
+    public void flush() throws IOException {
+        super.flush();
+        out.flush();
+    }
+
+    @Override
+    public void seek(long pos) throws IOException {
+        // seek might be called on files, which means that the checksum is not the file checksum
+        // but a checksum of the bytes written to this stream, which is the same for each
+        // type of file in lucene
+        super.seek(pos);
+        out.seek(pos);
+    }
+
+    @Override
+    public long length() throws IOException {
+        return out.length();
+    }
+
+    @Override
+    public void setLength(long length) throws IOException {
+        out.setLength(length);
+    }
+
+    @Override
+    public String toString() {
+        return out.toString();
+    }
+}
diff --git a/src/main/java/org/elasticsearch/common/lucene/store/ChecksumIndexOutput.java b/src/main/java/org/elasticsearch/common/lucene/store/ChecksumIndexOutput.java
new file mode 100644
index 00000000000..5bc37e3c543
--- /dev/null
+++ b/src/main/java/org/elasticsearch/common/lucene/store/ChecksumIndexOutput.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.elasticsearch.common.lucene.store; + +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.util.zip.Checksum; + +/** + */ +public class ChecksumIndexOutput extends IndexOutput { + + private final IndexOutput out; + + private final Checksum digest; + + public ChecksumIndexOutput(IndexOutput out, Checksum digest) { + this.out = out; + this.digest = digest; + } + + public Checksum digest() { + return digest; + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + digest.update(b); + } + + @Override + public void setLength(long length) throws IOException { + out.setLength(length); + } + + // don't override copyBytes, since we need to read it and compute it +// @Override +// public void copyBytes(DataInput input, long numBytes) throws IOException { +// super.copyBytes(input, numBytes); +// } + + + @Override + public String toString() { + return out.toString(); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + out.writeBytes(b, offset, length); + digest.update(b, offset, length); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } + + @Override + public long getFilePointer() { + return out.getFilePointer(); + } + + @Override + public void seek(long pos) throws IOException { + out.seek(pos); + } + + @Override + public long length() throws IOException { + return out.length(); + } +} diff --git a/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java b/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java new file mode 100644 index 00000000000..a56c838a931 --- /dev/null +++ b/src/main/java/org/elasticsearch/common/lucene/store/OutputStreamIndexOutput.java @@ -0,0 +1,61 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.lucene.store; + +import org.apache.lucene.store.IndexOutput; + +import java.io.IOException; +import java.io.OutputStream; + +/** + */ +public class OutputStreamIndexOutput extends OutputStream { + + private final IndexOutput out; + + public OutputStreamIndexOutput(IndexOutput out) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + out.writeByte((byte) b); + } + + @Override + public void write(byte[] b) throws IOException { + out.writeBytes(b, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.writeBytes(b, off, len); + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + @Override + public void close() throws IOException { + out.close(); + } +} diff --git a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java index 13235e592e8..6c43d51fdf3 100644 --- a/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java +++ b/src/main/java/org/elasticsearch/index/gateway/blobstore/BlobStoreIndexShardGateway.java @@ -637,7 +637,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo try { // we create an output with no checksum, this is because the pure binary data of the file is not // the checksum (because of seek). We will create the checksum file once copying is done - indexOutput = store.createOutputWithNoChecksum(fileInfo.physicalName()); + indexOutput = store.createOutputRaw(fileInfo.physicalName()); } catch (IOException e) { failures.add(e); latch.countDown(); @@ -752,7 +752,7 @@ public abstract class BlobStoreIndexShardGateway extends AbstractIndexShardCompo IndexInput indexInput = null; try { - indexInput = dir.openInput(fileInfo.physicalName()); + indexInput = indexShard.store().openInputRaw(fileInfo.physicalName()); indexInput.seek(partNumber * chunkBytes); InputStreamIndexInput is = new ThreadSafeInputStreamIndexInput(indexInput, chunkBytes); diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index bc1236e84dc..6b80acf5878 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -24,13 +24,20 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import jsr166y.ThreadLocalRandom; import org.apache.lucene.store.*; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.MapBuilder; +import org.elasticsearch.common.compress.CompressedIndexOutput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.lucene.Directories; +import org.elasticsearch.common.lucene.store.BufferedChecksumIndexOutput; +import org.elasticsearch.common.lucene.store.ChecksumIndexOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.support.ForceSyncDirectory; @@ 
-43,12 +50,29 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.zip.Adler32;
-import java.util.zip.Checksum;
 
 /**
  */
 public class Store extends AbstractIndexShardComponent {
 
+    static {
+        IndexMetaData.addDynamicSettings(
+                "index.store.compress.stored_fields"
+        );
+    }
+
+    class ApplySettings implements IndexSettingsService.Listener {
+        @Override
+        public void onRefreshSettings(Settings settings) {
+            boolean compressedStoredFields = settings.getAsBoolean("index.store.compress.stored_fields", Store.this.compressedStoredFields);
+            if (compressedStoredFields != Store.this.compressedStoredFields) {
+                logger.info("updating [compress.stored_fields] from [{}] to [{}]", Store.this.compressedStoredFields, compressedStoredFields);
+                Store.this.compressedStoredFields = compressedStoredFields;
+            }
+        }
+    }
+
     static final String CHECKSUMS_PREFIX = "_checksums-";
 
     public static final boolean isChecksum(String name) {
@@ -57,6 +81,8 @@ public class Store extends AbstractIndexShardComponent {
 
     private final IndexStore indexStore;
 
+    private final IndexSettingsService indexSettingsService;
+
     private final DirectoryService directoryService;
 
     private final StoreDirectory directory;
@@ -69,13 +95,22 @@ public class Store extends AbstractIndexShardComponent {
 
     private final boolean sync;
 
+    private volatile boolean compressedStoredFields;
+
+    private final ApplySettings applySettings = new ApplySettings();
+
+
     @Inject
-    public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException {
+    public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, IndexSettingsService indexSettingsService, DirectoryService directoryService) throws IOException {
         super(shardId, indexSettings);
         this.indexStore = indexStore;
+        this.indexSettingsService = indexSettingsService;
         this.directoryService = directoryService;
         this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
         this.directory = new StoreDirectory(directoryService.build());
+
+        this.compressedStoredFields = componentSettings.getAsBoolean("compress.stored_fields", false);
+        indexSettingsService.addListener(applySettings);
     }
 
     public Directory directory() {
@@ -218,7 +253,7 @@ public class Store extends AbstractIndexShardComponent {
                 checksums.put(metaData.name(), metaData.checksum());
             }
         }
-        IndexOutput output = directory.createOutput(checksumName, false);
+        IndexOutput output = directory.createOutput(checksumName, true);
         output.writeInt(0); // version
         output.writeStringStringMap(checksums);
         output.close();
@@ -242,11 +277,26 @@ public class Store extends AbstractIndexShardComponent {
     }
 
     public void close() throws IOException {
+        indexSettingsService.removeListener(applySettings);
         directory.close();
     }
 
-    public IndexOutput createOutputWithNoChecksum(String name) throws IOException {
-        return directory.createOutput(name, false);
+    /**
+     * Creates a raw output, no checksum is computed, and no compression even if enabled.
+     */
+    public IndexOutput createOutputRaw(String name) throws IOException {
+        return directory.createOutput(name, true);
+    }
+
+    /**
+     * Opens an index input in raw form, for example with no decompression.
+ */ + public IndexInput openInputRaw(String name) throws IOException { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + return metaData.directory().openInput(name); } public void writeChecksum(String name, String checksum) throws IOException { @@ -387,10 +437,10 @@ public class Store extends AbstractIndexShardComponent { @Override public IndexOutput createOutput(String name) throws IOException { - return createOutput(name, true); + return createOutput(name, false); } - public IndexOutput createOutput(String name, boolean computeChecksum) throws IOException { + public IndexOutput createOutput(String name, boolean raw) throws IOException { Directory directory = null; if (isChecksum(name)) { directory = delegates[0]; @@ -420,7 +470,26 @@ public class Store extends AbstractIndexShardComponent { StoreFileMetaData metaData = new StoreFileMetaData(name, -1, -1, null, directory); filesMetadata = MapBuilder.newMapBuilder(filesMetadata).put(name, metaData).immutableMap(); files = filesMetadata.keySet().toArray(new String[filesMetadata.size()]); - return new StoreIndexOutput(metaData, out, name, computeChecksum); + boolean computeChecksum = !raw; + if (computeChecksum) { + // don't compute checksum for segment based files + if ("segments.gen".equals(name) || name.startsWith("segments")) { + computeChecksum = false; + } + } + if (!raw && compressedStoredFields && name.endsWith(".fdt")) { + if (computeChecksum) { + // with compression, there is no need for buffering when doing checksums + // since we have buffering on the compressed index output + out = new ChecksumIndexOutput(out, new Adler32()); + } + out = CompressorFactory.defaultCompressor().indexOutput(out); + } else { + if (computeChecksum) { + out = new BufferedChecksumIndexOutput(out, new Adler32()); + } + } + return new StoreIndexOutput(metaData, out, name); } } @@ -430,7 +499,30 @@ public class Store extends AbstractIndexShardComponent { if (metaData == null) { throw new FileNotFoundException(name); } - return metaData.directory().openInput(name); + IndexInput in = metaData.directory().openInput(name); + if (name.endsWith(".fdt")) { + Compressor compressor = CompressorFactory.compressor(in); + if (compressor != null) { + in = compressor.indexInput(in); + } + } + return in; + } + + @Override + public IndexInput openInput(String name, int bufferSize) throws IOException { + StoreFileMetaData metaData = filesMetadata.get(name); + if (metaData == null) { + throw new FileNotFoundException(name); + } + IndexInput in = metaData.directory().openInput(name, bufferSize); + if (name.endsWith(".fdt")) { + Compressor compressor = CompressorFactory.compressor(in); + if (compressor != null) { + in = compressor.indexInput(in); + } + } + return in; } @Override @@ -449,15 +541,6 @@ public class Store extends AbstractIndexShardComponent { return delegates[0].makeLock(name); } - @Override - public IndexInput openInput(String name, int bufferSize) throws IOException { - StoreFileMetaData metaData = filesMetadata.get(name); - if (metaData == null) { - throw new FileNotFoundException(name); - } - return metaData.directory().openInput(name, bufferSize); - } - @Override public void clearLock(String name) throws IOException { delegates[0].clearLock(name); @@ -524,49 +607,32 @@ public class Store extends AbstractIndexShardComponent { } } - class StoreIndexOutput extends OpenBufferedIndexOutput { + class StoreIndexOutput extends IndexOutput { private final StoreFileMetaData metaData; - 
private final IndexOutput delegate; + private final IndexOutput out; private final String name; - private final Checksum digest; - - StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name, boolean computeChecksum) { - // we add 8 to be bigger than the default BufferIndexOutput buffer size so any flush will go directly - // to the output without being copied over to the delegate buffer - super(OpenBufferedIndexOutput.DEFAULT_BUFFER_SIZE + 64); + StoreIndexOutput(StoreFileMetaData metaData, IndexOutput delegate, String name) { this.metaData = metaData; - this.delegate = delegate; + this.out = delegate; this.name = name; - if (computeChecksum) { - if ("segments.gen".equals(name)) { - // no need to create checksum for segments.gen since its not snapshot to recovery - this.digest = null; - } else if (name.startsWith("segments")) { - // don't compute checksum for segments files, so pure Lucene can open this directory - // and since we, in any case, always recover the segments files - this.digest = null; - } else { -// this.digest = new CRC32(); - // adler is faster, and we compare on length as well, should be enough to check for difference - // between files - this.digest = new Adler32(); - } - } else { - this.digest = null; - } } @Override public void close() throws IOException { - super.close(); - delegate.close(); + out.close(); String checksum = null; - if (digest != null) { - checksum = Long.toString(digest.getValue(), Character.MAX_RADIX); + IndexOutput underlying = out; + if (out instanceof CompressedIndexOutput) { + underlying = ((CompressedIndexOutput) out).underlying(); + } + if (underlying instanceof BufferedChecksumIndexOutput) { + checksum = Long.toString(((BufferedChecksumIndexOutput) underlying).digest().getValue(), Character.MAX_RADIX); + } else if (underlying instanceof ChecksumIndexOutput) { + checksum = Long.toString(((ChecksumIndexOutput) underlying).digest().getValue(), Character.MAX_RADIX); } synchronized (mutex) { StoreFileMetaData md = new StoreFileMetaData(name, metaData.directory().fileLength(name), metaData.directory().fileModified(name), checksum, metaData.directory()); @@ -576,41 +642,48 @@ public class Store extends AbstractIndexShardComponent { } @Override - protected void flushBuffer(byte[] b, int offset, int len) throws IOException { - delegate.writeBytes(b, offset, len); - if (digest != null) { - digest.update(b, offset, len); - } + public void copyBytes(DataInput input, long numBytes) throws IOException { + out.copyBytes(input, numBytes); } - // don't override it, base class method simple reads from input and writes to this output -// @Override public void copyBytes(IndexInput input, long numBytes) throws IOException { -// delegate.copyBytes(input, numBytes); -// } + @Override + public long getFilePointer() { + return out.getFilePointer(); + } + + @Override + public void writeByte(byte b) throws IOException { + out.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + out.writeBytes(b, offset, length); + } @Override public void flush() throws IOException { - super.flush(); - delegate.flush(); + out.flush(); } @Override public void seek(long pos) throws IOException { - // seek might be called on files, which means that the checksum is not file checksum - // but a checksum of the bytes written to this stream, which is the same for each - // type of file in lucene - super.seek(pos); - delegate.seek(pos); + out.seek(pos); } @Override public long length() throws IOException { - 
return delegate.length(); + return out.length(); } @Override public void setLength(long length) throws IOException { - delegate.setLength(length); + out.setLength(length); + } + + @Override + public String toString() { + return out.toString(); } } } diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 32c709714e9..2090e59aa1a 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -134,7 +134,7 @@ public class RecoverySource extends AbstractComponent { final int BUFFER_SIZE = (int) recoverySettings.fileChunkSize().bytes(); byte[] buf = new byte[BUFFER_SIZE]; StoreFileMetaData md = shard.store().metaData(name); - indexInput = snapshot.getDirectory().openInput(name); + indexInput = shard.store().openInputRaw(name); long len = indexInput.length(); long readCount = 0; while (readCount < len) { diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index e62b1e4f29b..ab3273a471f 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -520,7 +520,7 @@ public class RecoveryTarget extends AbstractComponent { name = name + "." + onGoingRecovery.startTime; } - indexOutput = shard.store().createOutputWithNoChecksum(name); + indexOutput = shard.store().createOutputRaw(name); onGoingRecovery.openIndexOutputs.put(request.name(), indexOutput); } else { diff --git a/src/test/java/org/elasticsearch/benchmark/compress/LuceneCompressionBenchmark.java b/src/test/java/org/elasticsearch/benchmark/compress/LuceneCompressionBenchmark.java new file mode 100644 index 00000000000..510e90fcf73 --- /dev/null +++ b/src/test/java/org/elasticsearch/benchmark/compress/LuceneCompressionBenchmark.java @@ -0,0 +1,128 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */
+
+package org.elasticsearch.benchmark.compress;
+
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.store.FSDirectory;
+import org.apache.lucene.store.IndexInput;
+import org.apache.lucene.store.IndexOutput;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.elasticsearch.common.compress.CompressedIndexInput;
+import org.elasticsearch.common.compress.Compressor;
+import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.io.FileSystemUtils;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ */
+public class LuceneCompressionBenchmark {
+
+    public static void main(String[] args) throws Exception {
+        final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
+
+        final Compressor compressor = CompressorFactory.defaultCompressor();
+
+        File testFile = new File("target/test/compress/lucene");
+        FileSystemUtils.deleteRecursively(testFile);
+        testFile.mkdirs();
+
+        FSDirectory uncompressedDir = new NIOFSDirectory(new File(testFile, "uncompressed"));
+        IndexWriter uncompressedWriter = new IndexWriter(uncompressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
+
+        FSDirectory compressedDir = new NIOFSDirectory(new File(testFile, "compressed")) {
+            @Override
+            public IndexOutput createOutput(String name) throws IOException {
+                if (name.endsWith(".fdt")) {
+                    return compressor.indexOutput(super.createOutput(name));
+                }
+                return super.createOutput(name);
+            }
+
+            @Override
+            public IndexInput openInput(String name) throws IOException {
+                if (name.endsWith(".fdt")) {
+                    IndexInput in = super.openInput(name);
+                    Compressor compressor1 = CompressorFactory.compressor(in);
+                    if (compressor1 != null) {
+                        return compressor1.indexInput(in);
+                    } else {
+                        return in;
+                    }
+                }
+                return super.openInput(name);
+            }
+
+            @Override
+            public IndexInput openInput(String name, int bufferSize) throws IOException {
+                if (name.endsWith(".fdt")) {
+                    IndexInput in = super.openInput(name, bufferSize);
+                    // in case the override called openInput(String)
+                    if (in instanceof CompressedIndexInput) {
+                        return in;
+                    }
+                    Compressor compressor1 = CompressorFactory.compressor(in);
+                    if (compressor1 != null) {
+                        return compressor1.indexInput(in);
+                    } else {
+                        return in;
+                    }
+                }
+                return super.openInput(name, bufferSize);
+            }
+        };
+
+        IndexWriter compressedWriter = new IndexWriter(compressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
+
+        System.out.println("feeding data...");
+        TestData testData = new TestData();
+        while (testData.next() && testData.getTotalSize() < MAX_SIZE) {
+            // json
+            XContentBuilder builder = XContentFactory.jsonBuilder();
+            testData.current(builder);
+            builder.close();
+            Document doc = new Document();
+            doc.add(new Field("_source", builder.underlyingBytes(), 0, builder.underlyingBytesLength()));
+            uncompressedWriter.addDocument(doc);
+            compressedWriter.addDocument(doc);
+        }
+        System.out.println("optimizing...");
+        uncompressedWriter.forceMerge(1);
+        compressedWriter.forceMerge(1);
+        uncompressedWriter.waitForMerges();
+        compressedWriter.waitForMerges();
+
+        System.out.println("done");
+        uncompressedWriter.close();
+        
compressedWriter.close(); + + compressedDir.close(); + uncompressedDir.close(); + } + +} diff --git a/src/test/java/org/elasticsearch/test/unit/common/compress/CompressIndexInputOutputTests.java b/src/test/java/org/elasticsearch/test/unit/common/compress/CompressIndexInputOutputTests.java new file mode 100644 index 00000000000..fdc86b9c897 --- /dev/null +++ b/src/test/java/org/elasticsearch/test/unit/common/compress/CompressIndexInputOutputTests.java @@ -0,0 +1,322 @@ +package org.elasticsearch.test.unit.common.compress; + +import jsr166y.ThreadLocalRandom; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.MapFieldSelector; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.RAMDirectory; +import org.elasticsearch.common.RandomStringGenerator; +import org.elasticsearch.common.compress.CompressedIndexInput; +import org.elasticsearch.common.compress.CompressedIndexOutput; +import org.elasticsearch.common.compress.Compressor; +import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.lucene.Lucene; +import org.elasticsearch.common.unit.SizeValue; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.EOFException; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +/** + */ +@Test +public class CompressIndexInputOutputTests { + + private Compressor compressor; + + @BeforeClass + public void buildCompressor() { + this.compressor = CompressorFactory.defaultCompressor(); + } + + @Test + public void empty() throws Exception { + Directory dir = new RAMDirectory(); + IndexOutput out = compressor.indexOutput(dir.createOutput("test")); + out.close(); + + IndexInput in = compressor.indexInput(dir.openInput("test")); + try { + in.readByte(); + assert false; + } catch (EOFException e) { + // all is well + } + + in.seek(100); + try { + in.readByte(); + assert false; + } catch (EOFException e) { + // all is well + } + } + + @Test + public void simple() throws Exception { + Directory dir = new RAMDirectory(); + IndexOutput out = compressor.indexOutput(dir.createOutput("test")); + long pos1 = out.getFilePointer(); + out.writeInt(1); + long pos2 = out.getFilePointer(); + out.writeString("test1"); + long pos3 = out.getFilePointer(); + String largeString = RandomStringGenerator.random(0xFFFF + 5); + out.writeString(largeString); + long pos4 = out.getFilePointer(); + out.writeInt(2); + long pos5 = out.getFilePointer(); + out.writeString("test2"); + out.close(); + + IndexInput in = compressor.indexInput(dir.openInput("test")); + assertThat(in.readInt(), equalTo(1)); + assertThat(in.readString(), equalTo("test1")); + assertThat(in.readString(), equalTo(largeString)); + assertThat(in.readInt(), equalTo(2)); + assertThat(in.readString(), equalTo("test2")); + + in.seek(pos3); + assertThat(in.readString(), equalTo(largeString)); + in.seek(pos2); + assertThat(in.readString(), equalTo("test1")); + in.seek(pos5); + assertThat(in.readString(), equalTo("test2")); + in.seek(pos1); + assertThat(in.readInt(), equalTo(1)); + + in.seek(0); + byte[] full = new byte[(int) in.length()]; + 
in.readBytes(full, 0, full.length);
+
+        in.close();
+    }
+
+    @Test
+    public void seek1Compressed() throws Exception {
+        seek1(true);
+    }
+
+    @Test
+    public void seek1UnCompressed() throws Exception {
+        seek1(false);
+    }
+
+    private void seek1(boolean compressed) throws Exception {
+        Directory dir = new RAMDirectory();
+        IndexOutput out = compressed ? compressor.indexOutput(dir.createOutput("test")) : dir.createOutput("test");
+        long pos1 = out.getFilePointer();
+        out.writeVInt(4);
+        out.writeInt(1);
+        long pos2 = out.getFilePointer();
+        out.writeVInt(8);
+        long posX = out.getFilePointer();
+        out.writeInt(2);
+        out.writeInt(3);
+        long pos3 = out.getFilePointer();
+        out.writeVInt(4);
+        out.writeInt(4);
+        out.close();
+
+        //IndexInput in = dir.openInput("test");
+        IndexInput in = compressed ? compressor.indexInput(dir.openInput("test")) : dir.openInput("test");
+        in.seek(pos2);
+        // now "skip"
+        int numBytes = in.readVInt();
+        assertThat(in.getFilePointer(), equalTo(posX));
+        in.seek(in.getFilePointer() + numBytes);
+        assertThat(in.readVInt(), equalTo(4));
+        assertThat(in.readInt(), equalTo(4));
+    }
+
+    @Test
+    public void copyBytes() throws Exception {
+        Directory dir = new RAMDirectory();
+        IndexOutput out = compressor.indexOutput(dir.createOutput("test"));
+        long pos1 = out.getFilePointer();
+        out.writeInt(1);
+        long pos2 = out.getFilePointer();
+        assertThat(pos2, equalTo(4l));
+        out.writeString("test1");
+        long pos3 = out.getFilePointer();
+        String largeString = RandomStringGenerator.random(0xFFFF + 5);
+        out.writeString(largeString);
+        long pos4 = out.getFilePointer();
+        out.writeInt(2);
+        long pos5 = out.getFilePointer();
+        out.writeString("test2");
+        assertThat(out.length(), equalTo(out.getFilePointer()));
+        long length = out.length();
+        out.close();
+
+        CompressedIndexOutput out2 = compressor.indexOutput(dir.createOutput("test2"));
+        out2.writeString("mergeStart");
+        long startMergePos = out2.getFilePointer();
+        CompressedIndexInput testInput = compressor.indexInput(dir.openInput("test"));
+        assertThat(testInput.length(), equalTo(length));
+        out2.copyBytes(testInput, testInput.length());
+        long endMergePos = out2.getFilePointer();
+        out2.writeString("mergeEnd");
+        out2.close();
+
+        IndexInput in = compressor.indexInput(dir.openInput("test2"));
+        assertThat(in.readString(), equalTo("mergeStart"));
+        assertThat(in.readInt(), equalTo(1));
+        assertThat(in.readString(), equalTo("test1"));
+        assertThat(in.readString(), equalTo(largeString));
+        assertThat(in.readInt(), equalTo(2));
+        assertThat(in.readString(), equalTo("test2"));
+        assertThat(in.readString(), equalTo("mergeEnd"));
+
+        in.seek(pos1);
+        assertThat(in.readString(), equalTo("mergeStart"));
+        in.seek(endMergePos);
+        assertThat(in.readString(), equalTo("mergeEnd"));
+
+        try {
+            in.readByte();
+            assert false;
+        } catch (EOFException e) {
+            // all is well, we reached the end...
+        }
+    }
+
+    @Test
+    public void lucene() throws Exception {
+        final AtomicBoolean compressed = new AtomicBoolean(true);
+        Directory dir = new RAMDirectory() {
+
+            @Override
+            public IndexOutput createOutput(String name) throws IOException {
+                if (compressed.get() && name.endsWith(".fdt")) {
+                    return compressor.indexOutput(super.createOutput(name));
+                }
+                return super.createOutput(name);
+            }
+
+            @Override
+            public IndexInput openInput(String name) throws IOException {
+                if (name.endsWith(".fdt")) {
+                    IndexInput in = super.openInput(name);
+                    Compressor compressor1 = CompressorFactory.compressor(in);
+                    if (compressor1 != null) {
+                        return compressor1.indexInput(in);
+                    } else {
+                        return in;
+                    }
+                }
+                return super.openInput(name);
+            }
+
+            @Override
+            public IndexInput openInput(String name, int bufferSize) throws IOException {
+                if (name.endsWith(".fdt")) {
+                    IndexInput in = super.openInput(name, bufferSize);
+                    // in case the override called openInput(String)
+                    if (in instanceof CompressedIndexInput) {
+                        return in;
+                    }
+                    Compressor compressor1 = CompressorFactory.compressor(in);
+                    if (compressor1 != null) {
+                        return compressor1.indexInput(in);
+                    } else {
+                        return in;
+                    }
+                }
+                return super.openInput(name, bufferSize);
+            }
+        };
+
+        IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
+        writer.addDocument(createDoc(1, (int) SizeValue.parseSizeValue("100b").singles()));
+        writer.addDocument(createDoc(2, (int) SizeValue.parseSizeValue("5k").singles()));
+        writer.commit();
+        writer.addDocument(createDoc(3, (int) SizeValue.parseSizeValue("2k").singles()));
+        writer.addDocument(createDoc(4, (int) SizeValue.parseSizeValue("1k").singles()));
+        writer.commit();
+        verify(writer);
+        writer.forceMerge(1);
+        writer.waitForMerges();
+        verify(writer);
+        compressed.set(false);
+        writer.addDocument(createDoc(5, (int) SizeValue.parseSizeValue("2k").singles()));
+        writer.addDocument(createDoc(6, (int) SizeValue.parseSizeValue("1k").singles()));
+        verify(writer);
+        writer.forceMerge(1);
+        writer.waitForMerges();
+        verify(writer);
+    }
+
+    private void verify(IndexWriter writer) throws Exception {
+        IndexReader reader = IndexReader.open(writer, true);
+        for (int i = 0; i < reader.maxDoc(); i++) {
+            if (reader.isDeleted(i)) {
+                continue;
+            }
+            Document document = reader.document(i);
+            checkDoc(document);
+            document = reader.document(i, new MapFieldSelector("id", "field", "count"));
+            checkDoc(document);
+        }
+        for (int i = 0; i < 100; i++) {
+            int doc = ThreadLocalRandom.current().nextInt(reader.maxDoc());
+            if (reader.isDeleted(doc)) {
+                continue;
+            }
+            Document document = reader.document(doc);
+            checkDoc(document);
+            document = reader.document(doc, new MapFieldSelector("id", "field", "count"));
+            checkDoc(document);
+        }
+    }
+
+    private void checkDoc(Document document) {
+        String id = document.get("id");
+        String field = document.get("field");
+        int count = 0;
+        int idx = 0;
+        while (true) {
+            int oldIdx = idx;
+            idx = field.indexOf(' ', oldIdx);
+            if (idx == -1) {
+                break;
+            }
+            count++;
+            assertThat(field.substring(oldIdx, idx), equalTo(id));
+            idx++;
+        }
+        assertThat(count, equalTo(Integer.parseInt(document.get("count"))));
+    }
+
+    private Document createDoc(int id, int size) {
+        Document doc = new Document();
+        doc.add(new Field("id", Integer.toString(id), Field.Store.YES, Field.Index.NOT_ANALYZED));
+        doc.add(new Field("size", Integer.toString(size), Field.Store.YES, Field.Index.NOT_ANALYZED));
+        doc.add(new Field("skip", RandomStringGenerator.random(50),
Field.Store.YES, Field.Index.NO)); + StringBuilder sb = new StringBuilder(); + int count = 0; + while (true) { + count++; + sb.append(id); + sb.append(" "); + if (sb.length() >= size) { + break; + } + } + doc.add(new Field("count", Integer.toString(count), Field.Store.YES, Field.Index.NOT_ANALYZED)); + doc.add(new Field("field", sb.toString(), Field.Store.YES, Field.Index.NOT_ANALYZED)); + doc.add(new Field("skip", RandomStringGenerator.random(50), Field.Store.YES, Field.Index.NO)); + return doc; + } +} diff --git a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java index 14ede18be8a..45ef7404c58 100644 --- a/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java +++ b/src/test/java/org/elasticsearch/test/unit/index/engine/AbstractSimpleEngineTests.java @@ -107,11 +107,11 @@ public abstract class AbstractSimpleEngineTests { } protected Store createStore() throws IOException { - return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); + return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS)); } protected Store createStoreReplica() throws IOException { - return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS)); + return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS)); } protected Translog createTranslog() {
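
Reviewer notes (sketches, not part of the patch):

The on-disk layout written by CompressedIndexOutput, and read back in CompressedIndexInput's constructor, is: the compressor header, a version int, a long pointer to the trailing metadata, the compressed chunks themselves, and finally the metadata block (uncompressed length as a vlong, chunk count as a vint, one vlong file offset per chunk). Because every chunk except the last holds exactly uncompressed.length bytes of raw data, seek() can map an uncompressed position to a chunk with plain division. A minimal sketch of that arithmetic; the helper names are hypothetical, and CHUNK_SIZE stands in for uncompressed.length (LZFChunk.MAX_CHUNK_LEN in the LZF implementation):

    // Sketch of the position-to-chunk mapping used by CompressedIndexInput#seek.
    static final int CHUNK_SIZE = 0xFFFF; // assumed chunk size (LZFChunk.MAX_CHUNK_LEN)

    static int chunkIndex(long uncompressedPos) {
        return (int) (uncompressedPos / CHUNK_SIZE); // which offsets[] entry to seek the raw input to
    }

    static int positionInChunk(long uncompressedPos) {
        return (int) (uncompressedPos % CHUNK_SIZE); // position in the decoded buffer afterwards
    }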
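Wiring the wrappers up mirrors the simple() test above: any Lucene IndexOutput/IndexInput pair can be run through the Compressor interface. A minimal usage sketch (file name arbitrary):

    Directory dir = new RAMDirectory();
    Compressor compressor = CompressorFactory.defaultCompressor();

    // write through the compressing wrapper
    IndexOutput out = compressor.indexOutput(dir.createOutput("f"));
    out.writeString("hello");
    out.close();

    // read back through the decompressing wrapper
    IndexInput in = compressor.indexInput(dir.openInput("f"));
    String value = in.readString(); // "hello"
    in.close();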
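Detection is content-based rather than name-based: CompressorFactory.compressor(IndexInput) probes each registered Compressor for its magic bytes ('L', 'Z', 'F', 0 for LZF), which is what lets StoreDirectory open pre-existing uncompressed .fdt files transparently. A condensed sketch of the probe in LZFCompressor#isCompressed, with the file-pointer restore hoisted into a finally block (behavior-equivalent to the loop in the patch):

    long start = in.getFilePointer();
    if (in.length() - start < LUCENE_HEADER.length) {
        return false; // too short to hold the magic bytes; pointer never moved
    }
    try {
        for (byte expected : LUCENE_HEADER) {
            if (in.readByte() != expected) {
                return false;
            }
        }
        return true;
    } finally {
        in.seek(start); // always leave the input where we found it
    }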