LUCENE-10627: Using ByteBuffersDataInput to reduce memory copies when compressing data (#987)

luyuncheng 2022-08-02 00:34:41 +08:00 committed by GitHub
parent 04e4f317cb
commit 34154736c6
35 changed files with 851 additions and 66 deletions

View File

@@ -106,6 +106,8 @@ Optimizations
   field that is indexed with terms and SORTED or SORTED_SET doc values.
   (Adrien Grand)
 
+* LUCENE-10627: Using ByteBuffersDataInput to reduce memory copies when compressing data. (luyuncheng)
+
 Bug Fixes
 ---------------------
 
 * LUCENE-10663: Fix KnnVectorQuery explain with multiple segments. (Shiming Li)
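At its core, the change swaps the byte[]-oriented Compressor entry point for one that reads from a ByteBuffersDataInput, so writers can hand the compressor their buffered pages directly instead of flattening them into a temporary array first. A minimal before/after sketch of the signature change, paraphrased from the diffs below:

// Before: callers had to materialize a contiguous byte[] (often via toArrayCopy()).
public abstract void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException;

// After: the compressor pulls bytes straight from the buffered pages.
public abstract void compress(ByteBuffersDataInput buffersInput, DataOutput out)
    throws IOException;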

View File

@@ -20,6 +20,7 @@ module org.apache.lucene.backward_codecs {
   requires org.apache.lucene.core;
   exports org.apache.lucene.backward_codecs;
+  exports org.apache.lucene.backward_codecs.compressing;
   exports org.apache.lucene.backward_codecs.lucene40.blocktree;
   exports org.apache.lucene.backward_codecs.lucene50;
   exports org.apache.lucene.backward_codecs.lucene50.compressing;

View File

@@ -0,0 +1,297 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
import java.io.IOException;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.DataOutput;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.compress.LZ4;
/**
* A compression mode. Tells how much effort should be spent on compression and decompression of
* stored fields.
*
* @lucene.experimental
*/
public abstract class CompressionMode {
/**
* A compression mode that trades compression ratio for speed. Although the compression ratio
* might remain high, compression and decompression are very fast. Use this mode with indices that
* have a high update rate but should be able to load documents from disk quickly.
*/
public static final CompressionMode FAST =
new CompressionMode() {
@Override
public Compressor newCompressor() {
return new LZ4FastCompressor();
}
@Override
public Decompressor newDecompressor() {
return LZ4_DECOMPRESSOR;
}
@Override
public String toString() {
return "FAST";
}
};
/**
* A compression mode that trades speed for compression ratio. Although compression and
* decompression might be slow, this compression mode should provide a good compression ratio.
* This mode might be interesting if/when your index size is much bigger than your OS cache.
*/
public static final CompressionMode HIGH_COMPRESSION =
new CompressionMode() {
@Override
public Compressor newCompressor() {
// notes:
// 3 is the highest level that doesn't have lazy match evaluation
// 6 is the default, higher than that is just a waste of cpu
return new DeflateCompressor(6);
}
@Override
public Decompressor newDecompressor() {
return new DeflateDecompressor();
}
@Override
public String toString() {
return "HIGH_COMPRESSION";
}
};
/**
* This compression mode is similar to {@link #FAST} but it spends more time compressing in order
* to improve the compression ratio. This compression mode is best used with indices that have a
* low update rate but should be able to load documents from disk quickly.
*/
public static final CompressionMode FAST_DECOMPRESSION =
new CompressionMode() {
@Override
public Compressor newCompressor() {
return new LZ4HighCompressor();
}
@Override
public Decompressor newDecompressor() {
return LZ4_DECOMPRESSOR;
}
@Override
public String toString() {
return "FAST_DECOMPRESSION";
}
};
/** Sole constructor. */
protected CompressionMode() {}
/** Create a new {@link Compressor} instance. */
public abstract Compressor newCompressor();
/** Create a new {@link Decompressor} instance. */
public abstract Decompressor newDecompressor();
private static final Decompressor LZ4_DECOMPRESSOR =
new Decompressor() {
@Override
public void decompress(
DataInput in, int originalLength, int offset, int length, BytesRef bytes)
throws IOException {
assert offset + length <= originalLength;
// add 7 padding bytes, this is not necessary but can help decompression run faster
if (bytes.bytes.length < originalLength + 7) {
bytes.bytes = new byte[ArrayUtil.oversize(originalLength + 7, 1)];
}
final int decompressedLength = LZ4.decompress(in, offset + length, bytes.bytes, 0);
if (decompressedLength > originalLength) {
throw new CorruptIndexException(
"Corrupted: lengths mismatch: " + decompressedLength + " > " + originalLength, in);
}
bytes.offset = offset;
bytes.length = length;
}
@Override
public Decompressor clone() {
return this;
}
};
private static final class LZ4FastCompressor extends Compressor {
private final LZ4.FastCompressionHashTable ht;
LZ4FastCompressor() {
ht = new LZ4.FastCompressionHashTable();
}
@Override
public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
LZ4.compress(bytes, off, len, out, ht);
}
@Override
public void close() throws IOException {
// no-op
}
}
private static final class LZ4HighCompressor extends Compressor {
private final LZ4.HighCompressionHashTable ht;
LZ4HighCompressor() {
ht = new LZ4.HighCompressionHashTable();
}
@Override
public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
LZ4.compress(bytes, off, len, out, ht);
}
@Override
public void close() throws IOException {
// no-op
}
}
private static final class DeflateDecompressor extends Decompressor {
byte[] compressed;
DeflateDecompressor() {
compressed = new byte[0];
}
@Override
public void decompress(DataInput in, int originalLength, int offset, int length, BytesRef bytes)
throws IOException {
assert offset + length <= originalLength;
if (length == 0) {
bytes.length = 0;
return;
}
final int compressedLength = in.readVInt();
// pad with extra "dummy byte": see javadocs for using Inflater(true)
// we do it for compliance, but it has been unnecessary in zlib for years.
final int paddedLength = compressedLength + 1;
compressed = ArrayUtil.growNoCopy(compressed, paddedLength);
in.readBytes(compressed, 0, compressedLength);
compressed[compressedLength] = 0; // explicitly set dummy byte to 0
final Inflater decompressor = new Inflater(true);
try {
// extra "dummy byte"
decompressor.setInput(compressed, 0, paddedLength);
bytes.offset = bytes.length = 0;
bytes.bytes = ArrayUtil.growNoCopy(bytes.bytes, originalLength);
try {
bytes.length = decompressor.inflate(bytes.bytes, bytes.length, originalLength);
} catch (DataFormatException e) {
throw new IOException(e);
}
if (!decompressor.finished()) {
throw new CorruptIndexException(
"Invalid decoder state: needsInput="
+ decompressor.needsInput()
+ ", needsDict="
+ decompressor.needsDictionary(),
in);
}
} finally {
decompressor.end();
}
if (bytes.length != originalLength) {
throw new CorruptIndexException(
"Lengths mismatch: " + bytes.length + " != " + originalLength, in);
}
bytes.offset = offset;
bytes.length = length;
}
@Override
public Decompressor clone() {
return new DeflateDecompressor();
}
}
private static class DeflateCompressor extends Compressor {
final Deflater compressor;
byte[] compressed;
boolean closed;
DeflateCompressor(int level) {
compressor = new Deflater(level, true);
compressed = new byte[64];
}
@Override
public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
compressor.reset();
compressor.setInput(bytes, off, len);
compressor.finish();
if (compressor.needsInput()) {
// no output
assert len == 0 : len;
out.writeVInt(0);
return;
}
int totalCount = 0;
for (; ; ) {
final int count =
compressor.deflate(compressed, totalCount, compressed.length - totalCount);
totalCount += count;
assert totalCount <= compressed.length;
if (compressor.finished()) {
break;
} else {
compressed = ArrayUtil.grow(compressed);
}
}
out.writeVInt(totalCount);
out.writeBytes(compressed, totalCount);
}
@Override
public void close() throws IOException {
if (closed == false) {
compressor.end();
closed = true;
}
}
}
}
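For orientation, the sketch below round-trips a small payload through CompressionMode.FAST using this legacy byte[]-based API kept for older codecs. The payload, buffer sizing, and variable names are illustrative assumptions, not part of the patch; ByteArrayDataInput/ByteArrayDataOutput come from org.apache.lucene.store.

// Minimal round-trip through the legacy byte[]-based API (illustrative sketch).
byte[] data = "the quick brown fox".getBytes(java.nio.charset.StandardCharsets.UTF_8);
Compressor compressor = CompressionMode.FAST.newCompressor();
byte[] buf = new byte[data.length * 2 + 16]; // generous worst-case guess, as in the tests
ByteArrayDataOutput out = new ByteArrayDataOutput(buf);
compressor.compress(data, 0, data.length, out);

Decompressor decompressor = CompressionMode.FAST.newDecompressor();
BytesRef restored = new BytesRef();
decompressor.decompress(
    new ByteArrayDataInput(buf, 0, out.getPosition()), data.length, 0, data.length, restored);
// restored now frames a window equal to the original bytes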

View File

@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
import java.io.Closeable;
import java.io.IOException;
import org.apache.lucene.store.DataOutput;
/** A data compressor. */
public abstract class Compressor implements Closeable {
/** Sole constructor, typically called from sub-classes. */
protected Compressor() {}
/**
* Compress bytes into <code>out</code>. It is the responsibility of the compressor to add all
* necessary information so that a {@link Decompressor} will know when to stop decompressing bytes
* from the stream.
*/
public abstract void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException;
}
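The framing requirement in the javadoc above (the compressor must record enough information for the decompressor to know where to stop) is easiest to see in a degenerate implementation. The class below is a hypothetical illustration, not code from this commit:

// Hypothetical pass-through compressor: a vInt length prefix is the only
// framing a matching Decompressor would need to know when to stop reading.
Compressor passThrough =
    new Compressor() {
      @Override
      public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
        out.writeVInt(len); // framing: record how many raw bytes follow
        out.writeBytes(bytes, off, len);
      }

      @Override
      public void close() throws IOException {} // nothing to release
    };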

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
import java.io.IOException;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.util.BytesRef;
/** A decompressor. */
public abstract class Decompressor implements Cloneable {
/** Sole constructor, typically called from sub-classes. */
protected Decompressor() {}
/**
* Decompress bytes that were stored between offsets <code>offset</code> and <code>offset+length
* </code> in the original stream from the compressed stream <code>in</code> to <code>bytes</code>
* . After returning, the length of <code>bytes</code> (<code>bytes.length</code>) must be equal
* to <code>length</code>. Implementations of this method are free to resize <code>bytes</code>
* depending on their needs.
*
* @param in the input that stores the compressed stream
* @param originalLength the length of the original data (before compression)
* @param offset bytes before this offset do not need to be decompressed
* @param length bytes after <code>offset+length</code> do not need to be decompressed
* @param bytes a {@link org.apache.lucene.util.BytesRef} where to store the decompressed data
*/
public abstract void decompress(
DataInput in, int originalLength, int offset, int length, BytesRef bytes) throws IOException;
@Override
public abstract Decompressor clone();
}
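In other words, a decompressor may inflate more than was asked for, but on return the BytesRef must frame exactly the requested window. A hedged caller-side sketch, assuming a decompressor and a positioned DataInput are already in scope:

// Ask for bytes [10, 110) of a chunk whose original length was 4096.
BytesRef window = new BytesRef();
decompressor.decompress(in, 4096, 10, 100, window);
assert window.length == 100; // only this window is guaranteed valid
byte[] copy =
    ArrayUtil.copyOfSubArray(window.bytes, window.offset, window.offset + window.length);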

View File

@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
import org.apache.lucene.index.FieldInfo;
import org.apache.lucene.index.MergeState;
/**
* Computes which segments have identical field name to number mappings, which allows stored fields
* and term vectors in this codec to be bulk-merged.
*
* @lucene.internal
*/
public class MatchingReaders {
/**
* {@link org.apache.lucene.index.SegmentReader}s that have identical field name/number mapping,
* so their stored fields and term vectors may be bulk merged.
*/
public final boolean[] matchingReaders;
/** How many {@link #matchingReaders} are set. */
final int count;
/** Sole constructor */
public MatchingReaders(MergeState mergeState) {
// If the i'th reader is a SegmentReader and has
// identical fieldName -> number mapping, then this
// array will be non-null at position i:
int numReaders = mergeState.maxDocs.length;
int matchedCount = 0;
matchingReaders = new boolean[numReaders];
// If this reader is a SegmentReader, and all of its
// field name -> number mappings match the "merged"
// FieldInfos, then we can do a bulk copy of the
// stored fields:
nextReader:
for (int i = 0; i < numReaders; i++) {
for (FieldInfo fi : mergeState.fieldInfos[i]) {
FieldInfo other = mergeState.mergeFieldInfos.fieldInfo(fi.number);
if (other == null || !other.name.equals(fi.name)) {
continue nextReader;
}
}
matchingReaders[i] = true;
matchedCount++;
}
this.count = matchedCount;
if (mergeState.infoStream.isEnabled("SM")) {
mergeState.infoStream.message(
"SM", "merge store matchedCount=" + count + " vs " + numReaders);
if (count != numReaders) {
mergeState.infoStream.message("SM", "" + (numReaders - count) + " non-bulk merges");
}
}
}
}

View File

@@ -0,0 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** Compressing helper classes. */
package org.apache.lucene.backward_codecs.compressing;

View File

@@ -18,15 +18,15 @@ package org.apache.lucene.backward_codecs.lucene50;
 import java.io.IOException;
 import java.util.Objects;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Compressor;
+import org.apache.lucene.backward_codecs.compressing.Decompressor;
 import org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsFormat;
 import org.apache.lucene.backward_codecs.packed.LegacyDirectMonotonicWriter;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.StoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
-import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;

View File

@@ -17,12 +17,12 @@
 package org.apache.lucene.backward_codecs.lucene50.compressing;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.backward_codecs.packed.LegacyDirectMonotonicWriter;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.SegmentInfo;

View File

@@ -19,11 +19,11 @@ package org.apache.lucene.backward_codecs.lucene50.compressing;
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.Arrays;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Decompressor;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsReader;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfo;

View File

@@ -17,12 +17,12 @@
 package org.apache.lucene.backward_codecs.lucene50.compressing;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsFormat;
 import org.apache.lucene.codecs.TermVectorsFormat;
 import org.apache.lucene.codecs.TermVectorsReader;
 import org.apache.lucene.codecs.TermVectorsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;

View File

@@ -19,12 +19,12 @@ package org.apache.lucene.backward_codecs.lucene50.compressing;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Decompressor;
 import org.apache.lucene.backward_codecs.packed.LegacyPackedInts;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.TermVectorsReader;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.index.BaseTermsEnum;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.index.FieldInfo;

View File

@@ -20,9 +20,9 @@ import java.io.IOException;
 import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
-import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Compressor;
+import org.apache.lucene.backward_codecs.compressing.Decompressor;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;

View File

@@ -17,10 +17,10 @@
 package org.apache.lucene.backward_codecs.lucene87;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Compressor;
+import org.apache.lucene.backward_codecs.compressing.Decompressor;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
-import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.index.CorruptIndexException;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.DataInput;

View File

@@ -18,11 +18,11 @@ package org.apache.lucene.backward_codecs.lucene87;
 import java.io.IOException;
 import java.util.Objects;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsReader;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.FieldInfos;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.index.StoredFieldVisitor;

View File

@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.BytesRef;
public abstract class AbstractTestCompressionMode extends LuceneTestCase {
CompressionMode mode;
static byte[] randomArray(Random random) {
int bigsize = TEST_NIGHTLY ? 192 * 1024 : 33 * 1024;
final int max = random.nextBoolean() ? random.nextInt(4) : random.nextInt(255);
final int length = random.nextBoolean() ? random.nextInt(20) : random.nextInt(bigsize);
return randomArray(random, length, max);
}
static byte[] randomArray(Random random, int length, int max) {
final byte[] arr = new byte[length];
for (int i = 0; i < arr.length; ++i) {
arr[i] = (byte) RandomNumbers.randomIntBetween(random, 0, max);
}
return arr;
}
byte[] compress(byte[] decompressed, int off, int len) throws IOException {
Compressor compressor = mode.newCompressor();
return compress(compressor, decompressed, off, len);
}
static byte[] compress(Compressor compressor, byte[] decompressed, int off, int len)
throws IOException {
byte[] compressed = new byte[len * 2 + 16]; // should be enough
ByteArrayDataOutput out = new ByteArrayDataOutput(compressed);
compressor.compress(decompressed, off, len, out);
final int compressedLen = out.getPosition();
return ArrayUtil.copyOfSubArray(compressed, 0, compressedLen);
}
byte[] decompress(byte[] compressed, int originalLength) throws IOException {
Decompressor decompressor = mode.newDecompressor();
return decompress(decompressor, compressed, originalLength);
}
static byte[] decompress(Decompressor decompressor, byte[] compressed, int originalLength)
throws IOException {
final BytesRef bytes = new BytesRef();
decompressor.decompress(
new ByteArrayDataInput(compressed), originalLength, 0, originalLength, bytes);
return BytesRef.deepCopyOf(bytes).bytes;
}
byte[] decompress(byte[] compressed, int originalLength, int offset, int length)
throws IOException {
Decompressor decompressor = mode.newDecompressor();
final BytesRef bytes = new BytesRef();
decompressor.decompress(
new ByteArrayDataInput(compressed), originalLength, offset, length, bytes);
return BytesRef.deepCopyOf(bytes).bytes;
}
public void testDecompress() throws IOException {
Random random = random();
final int iterations = atLeast(random, 3);
for (int i = 0; i < iterations; ++i) {
final byte[] decompressed = randomArray(random);
final int off = random.nextBoolean() ? 0 : TestUtil.nextInt(random, 0, decompressed.length);
final int len =
random.nextBoolean()
? decompressed.length - off
: TestUtil.nextInt(random, 0, decompressed.length - off);
final byte[] compressed = compress(decompressed, off, len);
final byte[] restored = decompress(compressed, len);
assertArrayEquals(ArrayUtil.copyOfSubArray(decompressed, off, off + len), restored);
}
}
public void testPartialDecompress() throws IOException {
Random random = random();
final int iterations = atLeast(random, 3);
for (int i = 0; i < iterations; ++i) {
final byte[] decompressed = randomArray(random);
final byte[] compressed = compress(decompressed, 0, decompressed.length);
final int offset, length;
if (decompressed.length == 0) {
offset = length = 0;
} else {
offset = random.nextInt(decompressed.length);
length = random.nextInt(decompressed.length - offset);
}
final byte[] restored = decompress(compressed, decompressed.length, offset, length);
assertArrayEquals(ArrayUtil.copyOfSubArray(decompressed, offset, offset + length), restored);
}
}
public byte[] test(byte[] decompressed) throws IOException {
return test(decompressed, 0, decompressed.length);
}
public byte[] test(byte[] decompressed, int off, int len) throws IOException {
final byte[] compressed = compress(decompressed, off, len);
final byte[] restored = decompress(compressed, len);
assertEquals(len, restored.length);
return compressed;
}
public void testEmptySequence() throws IOException {
test(new byte[0]);
}
public void testShortSequence() throws IOException {
test(new byte[] {(byte) random().nextInt(256)});
}
public void testIncompressible() throws IOException {
final byte[] decompressed = new byte[RandomNumbers.randomIntBetween(random(), 20, 256)];
for (int i = 0; i < decompressed.length; ++i) {
decompressed[i] = (byte) i;
}
test(decompressed);
}
public void testConstant() throws IOException {
final byte[] decompressed = new byte[TestUtil.nextInt(random(), 1, 10000)];
Arrays.fill(decompressed, (byte) random().nextInt());
test(decompressed);
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
public class TestFastCompressionMode extends AbstractTestCompressionMode {
@Override
public void setUp() throws Exception {
super.setUp();
mode = CompressionMode.FAST;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
public class TestFastDecompressionMode extends AbstractTestCompressionMode {
@Override
public void setUp() throws Exception {
super.setUp();
mode = CompressionMode.FAST_DECOMPRESSION;
}
}

View File

@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.backward_codecs.compressing;
public class TestHighCompressionMode extends AbstractTestCompressionMode {
@Override
public void setUp() throws Exception {
super.setUp();
mode = CompressionMode.HIGH_COMPRESSION;
}
}

View File

@@ -17,10 +17,10 @@
 package org.apache.lucene.backward_codecs.lucene50;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50RWCompressingStoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsFormat;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;

View File

@@ -30,11 +30,11 @@ import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50Com
 import static org.apache.lucene.backward_codecs.lucene50.compressing.Lucene50CompressingStoredFieldsReader.VERSION_CURRENT;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Compressor;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexableField;

View File

@@ -36,11 +36,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
+import org.apache.lucene.backward_codecs.compressing.Compressor;
 import org.apache.lucene.backward_codecs.store.EndiannessReverserUtil;
 import org.apache.lucene.codecs.CodecUtil;
 import org.apache.lucene.codecs.TermVectorsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
-import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.index.FieldInfo;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.SegmentInfo;

View File

@@ -17,8 +17,8 @@
 package org.apache.lucene.backward_codecs.lucene50.compressing;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.StoredFieldsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;

View File

@@ -17,8 +17,8 @@
 package org.apache.lucene.backward_codecs.lucene50.compressing;
 
 import java.io.IOException;
+import org.apache.lucene.backward_codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.TermVectorsWriter;
-import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.index.SegmentInfo;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;

View File

@@ -21,6 +21,7 @@ import java.util.zip.DataFormatException;
 import java.util.zip.Deflater;
 import java.util.zip.Inflater;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.util.ArrayUtil;
@@ -155,8 +156,11 @@ public abstract class CompressionMode {
     }
 
     @Override
-    public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
-      LZ4.compress(bytes, off, len, out, ht);
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int len = (int) buffersInput.size();
+      byte[] bytes = new byte[len];
+      buffersInput.readBytes(bytes, 0, len);
+      LZ4.compress(bytes, 0, len, out, ht);
     }
 
     @Override
@@ -174,8 +178,11 @@ public abstract class CompressionMode {
     }
 
     @Override
-    public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
-      LZ4.compress(bytes, off, len, out, ht);
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int len = (int) buffersInput.size();
+      byte[] bytes = new byte[len];
+      buffersInput.readBytes(bytes, 0, len);
+      LZ4.compress(bytes, 0, len, out, ht);
     }
 
     @Override
@@ -257,9 +264,13 @@ public abstract class CompressionMode {
     }
 
     @Override
-    public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int len = (int) buffersInput.size();
+      byte[] bytes = new byte[len];
+      buffersInput.readBytes(bytes, 0, len);
       compressor.reset();
-      compressor.setInput(bytes, off, len);
+      compressor.setInput(bytes, 0, len);
       compressor.finish();
 
       if (compressor.needsInput()) {
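These implementations still drain the input into a scratch byte[], because LZ4 and Deflater operate on arrays; the saving is on the caller side, which no longer makes its own flat copy of the buffered pages. A hedged sketch of the new call shape, with compressor, fieldsStream, and someBytes as illustrative stand-ins:

// Buffer writes into pages, then hand the compressor a view over those pages.
ByteBuffersDataOutput buffered = new ByteBuffersDataOutput();
buffered.writeBytes(someBytes, 0, someBytes.length);
ByteBuffersDataInput view = buffered.toDataInput(); // view over pages, no flat copy
compressor.compress(view, fieldsStream);            // compressor reads directly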

View File

@@ -18,6 +18,7 @@ package org.apache.lucene.codecs.compressing;
 import java.io.Closeable;
 import java.io.IOException;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.DataOutput;
 
 /** A data compressor. */
@@ -31,5 +32,6 @@ public abstract class Compressor implements Closeable {
    * necessary information so that a {@link Decompressor} will know when to stop decompressing bytes
    * from the stream.
    */
-  public abstract void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException;
+  public abstract void compress(ByteBuffersDataInput buffersInput, DataOutput out)
+      throws IOException;
 }
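Callers that still hold a plain byte[] can adapt it to the new signature by wrapping the array, which is exactly what the updated tests later in this diff do; a condensed sketch:

// Wrap an existing array region as a ByteBuffersDataInput (no data copied).
ByteBuffer bb = ByteBuffer.wrap(bytes);
ByteBuffersDataInput input = new ByteBuffersDataInput(Arrays.asList(bb)).slice(off, len);
compressor.compress(input, out);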

View File

@@ -24,6 +24,7 @@ import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.util.ArrayUtil;
@@ -163,10 +164,12 @@ public final class DeflateWithPresetDictCompressionMode extends CompressionMode
     final Deflater compressor;
     byte[] compressed;
     boolean closed;
+    byte[] buffer;
 
     DeflateWithPresetDictCompressor(int level) {
       compressor = new Deflater(level, true);
       compressed = new byte[64];
+      buffer = BytesRef.EMPTY_BYTES;
     }
 
     private void doCompress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
@@ -198,22 +201,26 @@ public final class DeflateWithPresetDictCompressionMode extends CompressionMode
     }
 
     @Override
-    public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int len = (int) (buffersInput.size() - buffersInput.position());
       final int dictLength = len / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR);
       final int blockLength = (len - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
       out.writeVInt(dictLength);
       out.writeVInt(blockLength);
-      final int end = off + len;
       // Compress the dictionary first
       compressor.reset();
-      doCompress(bytes, off, dictLength, out);
+      buffer = ArrayUtil.growNoCopy(buffer, dictLength + blockLength);
+      buffersInput.readBytes(buffer, 0, dictLength);
+      doCompress(buffer, 0, dictLength, out);
       // And then sub blocks
-      for (int start = off + dictLength; start < end; start += blockLength) {
+      for (int start = dictLength; start < len; start += blockLength) {
         compressor.reset();
-        compressor.setDictionary(bytes, off, dictLength);
-        doCompress(bytes, start, Math.min(blockLength, off + len - start), out);
+        compressor.setDictionary(buffer, 0, dictLength);
+        int l = Math.min(blockLength, len - start);
+        buffersInput.readBytes(buffer, dictLength, l);
+        doCompress(buffer, dictLength, l, out);
       }
     }
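Both preset-dictionary modes split each chunk into a small dictionary followed by sub-blocks compressed against it, so the scratch buffer never needs more than dictLength + blockLength bytes. A worked example of the partition arithmetic, using illustrative values (the real NUM_SUB_BLOCKS and DICT_SIZE_FACTOR constants are defined elsewhere in these classes):

// Illustrative only: the constant values here are assumptions.
int numSubBlocks = 10, dictSizeFactor = 6;
int len = 120_000;                                                      // chunk size
int dictLength = len / (numSubBlocks * dictSizeFactor);                 // = 2_000
int blockLength = (len - dictLength + numSubBlocks - 1) / numSubBlocks; // = 11_800
// 10 sub-blocks of at most 11_800 bytes cover the remaining 118_000 bytes,
// and the scratch buffer holds at most 2_000 + 11_800 bytes at a time.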

View File

@@ -21,6 +21,7 @@ import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.index.CorruptIndexException;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
@@ -167,23 +168,23 @@ public final class LZ4WithPresetDictCompressionMode extends CompressionMode {
     }
 
     @Override
-    public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
+    public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+      final int len = (int) (buffersInput.size() - buffersInput.position());
       final int dictLength = len / (NUM_SUB_BLOCKS * DICT_SIZE_FACTOR);
       final int blockLength = (len - dictLength + NUM_SUB_BLOCKS - 1) / NUM_SUB_BLOCKS;
       buffer = ArrayUtil.growNoCopy(buffer, dictLength + blockLength);
       out.writeVInt(dictLength);
       out.writeVInt(blockLength);
-      final int end = off + len;
       compressed.reset();
       // Compress the dictionary first
-      System.arraycopy(bytes, off, buffer, 0, dictLength);
+      buffersInput.readBytes(buffer, 0, dictLength);
       doCompress(buffer, 0, dictLength, out);
       // And then sub blocks
-      for (int start = off + dictLength; start < end; start += blockLength) {
-        int l = Math.min(blockLength, off + len - start);
-        System.arraycopy(bytes, start, buffer, dictLength, l);
+      for (int start = dictLength; start < len; start += blockLength) {
+        int l = Math.min(blockLength, len - start);
+        buffersInput.readBytes(buffer, dictLength, l);
         doCompress(buffer, dictLength, l, out);
       }

View File

@@ -35,6 +35,7 @@ import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.IndexableField;
 import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
@@ -245,23 +246,18 @@ public final class Lucene90CompressingStoredFieldsWriter extends StoredFieldsWri
     final boolean sliced = bufferedDocs.size() >= 2 * chunkSize;
     final boolean dirtyChunk = force;
     writeHeader(docBase, numBufferedDocs, numStoredFields, lengths, sliced, dirtyChunk);
+    ByteBuffersDataInput bytebuffers = bufferedDocs.toDataInput();
 
     // compress stored fields to fieldsStream.
     //
     // TODO: do we need to slice it since we already have the slices in the buffer? Perhaps
     // we should use max-block-bits restriction on the buffer itself, then we won't have to check it
     // here.
-    byte[] content = bufferedDocs.toArrayCopy();
-    bufferedDocs.reset();
-
     if (sliced) {
-      // big chunk, slice it
-      for (int compressed = 0; compressed < content.length; compressed += chunkSize) {
-        compressor.compress(
-            content, compressed, Math.min(chunkSize, content.length - compressed), fieldsStream);
+      // big chunk, slice it; ByteBuffersDataInput slices avoid a memory copy
+      final int capacity = (int) bytebuffers.size();
+      for (int compressed = 0; compressed < capacity; compressed += chunkSize) {
+        int l = Math.min(chunkSize, capacity - compressed);
+        ByteBuffersDataInput bbdi = bytebuffers.slice(compressed, l);
+        compressor.compress(bbdi, fieldsStream);
       }
     } else {
-      compressor.compress(content, 0, content.length, fieldsStream);
+      compressor.compress(bytebuffers, fieldsStream);
     }
 
     // reset
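The slices handed to the compressor are views over the same underlying pages, so chunking a large buffer no longer allocates a per-chunk byte[]. A small sketch of that property, under the assumption that slice() only duplicates buffer positions:

// A slice is a window, not a copy: slicing itself allocates no new byte[].
ByteBuffersDataOutput o = new ByteBuffersDataOutput();
o.writeBytes(new byte[1 << 20], 0, 1 << 20); // 1 MiB of buffered data
ByteBuffersDataInput whole = o.toDataInput();
ByteBuffersDataInput firstHalf = whole.slice(0, 1 << 19);
// firstHalf.size() == 512 * 1024; reading it advances only the slice.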

View File

@@ -41,6 +41,7 @@ import org.apache.lucene.index.Fields;
 import org.apache.lucene.index.IndexFileNames;
 import org.apache.lucene.index.MergeState;
 import org.apache.lucene.index.SegmentInfo;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.ByteBuffersDataOutput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.Directory;
@@ -422,11 +423,9 @@ public final class Lucene90CompressingTermVectorsWriter extends TermVectorsWrite
     flushPayloadLengths();
 
     // compress terms and payloads and write them to the output
-    //
-    // TODO: We could compress in the slices we already have in the buffer (min/max slice
-    // can be set on the buffer itself).
-    byte[] content = termSuffixes.toArrayCopy();
-    compressor.compress(content, 0, content.length, vectorsStream);
+    // using ByteBuffersDataInput to reduce memory copies
+    ByteBuffersDataInput content = termSuffixes.toDataInput();
+    compressor.compress(content, vectorsStream);
   }
 
   // reset

View File

@@ -31,6 +31,7 @@ import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.Decompressor;
 import org.apache.lucene.codecs.lucene90.compressing.Lucene90CompressingStoredFieldsFormat;
 import org.apache.lucene.document.StoredField;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.store.Directory;
@@ -50,9 +51,9 @@ final class SortingStoredFieldsConsumer extends StoredFieldsConsumer {
         public void close() throws IOException {}
 
         @Override
-        public void compress(byte[] bytes, int off, int len, DataOutput out)
+        public void compress(ByteBuffersDataInput buffersInput, DataOutput out)
             throws IOException {
-          out.writeBytes(bytes, off, len);
+          out.copyBytes(buffersInput, buffersInput.size());
         }
       };

View File

@@ -18,10 +18,12 @@ package org.apache.lucene.codecs.compressing;
 import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 import org.apache.lucene.store.ByteArrayDataInput;
 import org.apache.lucene.store.ByteArrayDataOutput;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.ArrayUtil;
@@ -53,9 +55,13 @@ public abstract class AbstractTestCompressionMode extends LuceneTestCase {
   static byte[] compress(Compressor compressor, byte[] decompressed, int off, int len)
       throws IOException {
-    byte[] compressed = new byte[len * 2 + 16]; // should be enough
+    byte[] compressed = new byte[len * 3 + 16]; // should be enough
+    ByteBuffer bb = ByteBuffer.wrap(decompressed);
+    ByteBuffersDataInput input = new ByteBuffersDataInput(Arrays.asList(bb)).slice(off, len);
     ByteArrayDataOutput out = new ByteArrayDataOutput(compressed);
-    compressor.compress(decompressed, off, len, out);
+    compressor.compress(input, out);
     final int compressedLen = out.getPosition();
     return ArrayUtil.copyOfSubArray(compressed, 0, compressedLen);
   }

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.compressing;
import org.apache.lucene.codecs.lucene90.DeflateWithPresetDictCompressionMode;
public class TestDeflateWithPresetDictCompressionMode extends AbstractTestCompressionMode {
@Override
public void setUp() throws Exception {
super.setUp();
mode = new DeflateWithPresetDictCompressionMode();
}
}

View File

@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.codecs.compressing;
import org.apache.lucene.codecs.lucene90.LZ4WithPresetDictCompressionMode;
public class TestLZ4WithPresetDictCompressionMode extends AbstractTestCompressionMode {
@Override
public void setUp() throws Exception {
super.setUp();
mode = new LZ4WithPresetDictCompressionMode();
}
}

View File

@@ -20,6 +20,7 @@ import java.io.IOException;
 import org.apache.lucene.codecs.compressing.CompressionMode;
 import org.apache.lucene.codecs.compressing.Compressor;
 import org.apache.lucene.codecs.compressing.Decompressor;
+import org.apache.lucene.store.ByteBuffersDataInput;
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
 import org.apache.lucene.tests.codecs.compressing.CompressingCodec;
@@ -76,8 +77,8 @@ public class DummyCompressingCodec extends CompressingCodec {
       new Compressor() {
 
         @Override
-        public void compress(byte[] bytes, int off, int len, DataOutput out) throws IOException {
-          out.writeBytes(bytes, off, len);
+        public void compress(ByteBuffersDataInput buffersInput, DataOutput out) throws IOException {
+          out.copyBytes(buffersInput, buffersInput.size());
         }
 
         @Override