Remove the settings option for index store compression; compression is now always enabled

closes #2577
Shay Banon 2013-01-23 13:11:48 +01:00
parent 2880cd0172
commit d969e61999
14 changed files with 17 additions and 1205 deletions
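The settings removed here are index.store.compress.stored and index.store.compress.tv (see the Store changes below; both defaulted to false and were registered as dynamic settings). A minimal sketch of how these now-removed settings used to be supplied, assuming the ImmutableSettings builder of this era of the codebase; it is illustrative only and not part of the diff. After this commit, stored fields (.fdt) and term vectors (.tvf) are compressed by the Lucene codec instead.

import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

// Illustrative sketch (not part of this commit): the legacy, now-removed knobs.
// ImmutableSettings is assumed to be the settings builder used in this era.
public class LegacyStoreCompressionSettings {
    public static Settings legacy() {
        return ImmutableSettings.settingsBuilder()
                .put("index.store.compress.stored", true) // compress stored fields (.fdt)
                .put("index.store.compress.tv", true)     // compress term vectors (.tvf)
                .build(); // after this commit these keys are simply ignored
    }
}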

View File

@@ -1,164 +0,0 @@
package org.elasticsearch.common.compress;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.apache.lucene.store.*;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
import java.io.IOException;
import java.util.Collection;
/**
*/
public class CompressedDirectory extends Directory implements ForceSyncDirectory {
private final Directory dir;
private final Compressor compressor;
private final boolean actualLength;
private final ImmutableSet<String> compressExtensions;
private final ImmutableSet<String> decompressExtensions;
private volatile boolean compress = true;
public CompressedDirectory(Directory dir, Compressor compressor, boolean actualLength, String... extensions) {
this(dir, compressor, actualLength, extensions, extensions);
}
public CompressedDirectory(Directory dir, Compressor compressor, boolean actualLength, String[] compressExtensions, String[] decompressExtensions) {
this.dir = dir;
this.actualLength = actualLength;
this.compressor = compressor;
this.compressExtensions = ImmutableSet.copyOf(compressExtensions);
this.decompressExtensions = ImmutableSet.copyOf(decompressExtensions);
this.lockFactory = dir.getLockFactory();
}
@Override
public String[] listAll() throws IOException {
return dir.listAll();
}
public void setCompress(boolean compress) {
this.compress = compress;
}
/**
* Utility method to return a file's extension.
*/
public static String getExtension(String name) {
int i = name.lastIndexOf('.');
if (i == -1) {
return "";
}
return name.substring(i + 1, name.length());
}
@Override
public boolean fileExists(String name) throws IOException {
return dir.fileExists(name);
}
@Override
public void deleteFile(String name) throws IOException {
dir.deleteFile(name);
}
/**
* Returns the actual file size, so it will work with the compound file format
* when compressed. It's the only one that really uses it for offsets...
*/
@Override
public long fileLength(String name) throws IOException {
if (actualLength && decompressExtensions.contains(getExtension(name))) {
// LUCENE 4 UPGRADE: Is this the right IOContext?
IndexInput in = openInput(name, IOContext.READONCE);
try {
return in.length();
} finally {
IOUtils.close(in);
}
}
return dir.fileLength(name);
}
@Override
public void sync(Collection<String> names) throws IOException {
dir.sync(names);
}
@Override
public void forceSync(String name) throws IOException {
if (dir instanceof ForceSyncDirectory) {
((ForceSyncDirectory) dir).forceSync(name);
} else {
dir.sync(ImmutableList.of(name));
}
}
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
if (decompressExtensions.contains(getExtension(name))) {
IndexInput in = dir.openInput(name, context);
Compressor compressor1 = CompressorFactory.compressor(in);
if (compressor1 != null) {
return compressor1.indexInput(in);
} else {
return in;
}
}
return dir.openInput(name, context);
}
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
if (compress && compressExtensions.contains(getExtension(name))) {
return compressor.indexOutput(dir.createOutput(name, context));
}
return dir.createOutput(name, context);
}
// can't override this one; copy() needs to open files with the correct compression
// @Override
// public void copy(Directory to, String src, String dest) throws IOException {
// dir.copy(to, src, dest);
// }
@Override
public void close() throws IOException {
dir.close();
}
@Override
public void setLockFactory(LockFactory lockFactory) throws IOException {
dir.setLockFactory(lockFactory);
}
@Override
public LockFactory getLockFactory() {
return dir.getLockFactory();
}
@Override
public String getLockID() {
return dir.getLockID();
}
@Override
public Lock makeLock(String name) {
return dir.makeLock(name);
}
@Override
public void clearLock(String name) throws IOException {
dir.clearLock(name);
}
@Override
public String toString() {
return "compressed(" + compressExtensions + "):" + dir.toString();
}
}
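Before this removal the wrapper above was constructed directly, as the benchmark and stress test deleted further down in this commit do. A minimal usage sketch along those lines (assuming the Lucene 4 NIOFSDirectory(File) constructor that those benchmarks use), not part of the diff:

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.lzf.LZFCompressor;

import java.io.File;
import java.io.IOException;

public class CompressedDirectoryUsage {
    public static Directory open(File path) throws IOException {
        // Only files with the listed extensions (.fdt stored fields, .tvf term vectors)
        // go through the LZF compressor; everything else is delegated untouched.
        CompressedDirectory dir = new CompressedDirectory(
                new NIOFSDirectory(path),
                new LZFCompressor(),
                false,          // actualLength=false: fileLength() reports the on-disk (compressed) size
                "fdt", "tvf");
        // Compression of newly created outputs can be toggled at runtime; existing
        // files are still transparently decompressed on read via openInput().
        dir.setCompress(true);
        return dir;
    }
}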

View File

@@ -26,7 +26,9 @@ import java.io.EOFException;
import java.io.IOException;
/**
* @deprecated Used only for backward compatibility, to read old compressed files, since we now use codec-based compression
*/
@Deprecated
public abstract class CompressedIndexInput<T extends CompressorContext> extends IndexInput {
private IndexInput in;

View File

@@ -1,220 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress;
import gnu.trove.iterator.TLongIterator;
import gnu.trove.list.array.TLongArrayList;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IndexOutput;
import java.io.IOException;
/**
*/
public abstract class CompressedIndexOutput<T extends CompressorContext> extends IndexOutput {
final IndexOutput out;
protected final T context;
protected byte[] uncompressed;
protected int uncompressedLength;
private int position = 0;
private long uncompressedPosition;
private boolean closed;
private final long metaDataPointer;
// need to have a growing segment long array list here...
private TLongArrayList offsets = new TLongArrayList();
public CompressedIndexOutput(IndexOutput out, T context) throws IOException {
this.out = out;
this.context = context;
writeHeader(out);
out.writeInt(0); // version
metaDataPointer = out.getFilePointer();
out.writeLong(-1); // the pointer to the end of the file metadata
}
public IndexOutput underlying() {
return this.out;
}
@Override
public void writeByte(byte b) throws IOException {
if (position >= uncompressedLength) {
flushBuffer();
}
uncompressedPosition++;
uncompressed[position++] = b;
}
@Override
public void writeBytes(byte[] input, int offset, int length) throws IOException {
// ES, check if length is 0, and don't write in this case
if (length == 0) {
return;
}
final int BUFFER_LEN = uncompressedLength;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
if (free >= length) {
System.arraycopy(input, offset, uncompressed, position, length);
position += length;
uncompressedPosition += length;
return;
}
// fill partial input as much as possible and flush
if (position > 0) {
System.arraycopy(input, offset, uncompressed, position, free);
position += free;
uncompressedPosition += free;
flushBuffer();
offset += free;
length -= free;
}
// then write intermediate full block, if any, without copying:
while (length >= BUFFER_LEN) {
offsets.add(out.getFilePointer());
compress(input, offset, BUFFER_LEN, out);
offset += BUFFER_LEN;
length -= BUFFER_LEN;
uncompressedPosition += BUFFER_LEN;
}
// and finally, copy leftovers in input, if any
if (length > 0) {
System.arraycopy(input, offset, uncompressed, 0, length);
}
position = length;
uncompressedPosition += length;
}
@Override
public void copyBytes(DataInput input, long length) throws IOException {
final int BUFFER_LEN = uncompressedLength;
// simple case first: buffering only (for trivially short writes)
int free = BUFFER_LEN - position;
if (free >= length) {
input.readBytes(uncompressed, position, (int) length, false);
position += length;
uncompressedPosition += length;
return;
}
// fill partial input as much as possible and flush
if (position > 0) {
input.readBytes(uncompressed, position, free, false);
position += free;
uncompressedPosition += free;
flushBuffer();
length -= free;
}
// then write intermediate full block, if any, without copying:
// Note: if we supported flushing buffers at sizes other than "chunkSize", we
// could flush the remaining uncompressed data from the input and then copy the
// already-compressed segments directly. That would, however, require storing the
// compressed size of each segment in addition to the offsets, and
// CompressedIndexInput#seek would become more costly, since it could no longer
// do (pos / chunk) to get the block index...
while (length >= BUFFER_LEN) {
offsets.add(out.getFilePointer());
input.readBytes(uncompressed, 0, BUFFER_LEN);
compress(uncompressed, 0, BUFFER_LEN, out);
length -= BUFFER_LEN;
uncompressedPosition += BUFFER_LEN;
}
// and finally, copy leftovers in input, if any
if (length > 0) {
input.readBytes(uncompressed, 0, (int) length, false);
}
position = (int) length;
uncompressedPosition += length;
}
@Override
public void flush() throws IOException {
// ignore flush, we always want to flush on actual block size
//flushBuffer();
out.flush();
}
@Override
public void close() throws IOException {
if (!closed) {
flushBuffer();
// write metadata, and update pointer
long metaDataPointerValue = out.getFilePointer();
// length uncompressed
out.writeVLong(uncompressedPosition);
// compressed pointers
out.writeVInt(offsets.size());
for (TLongIterator it = offsets.iterator(); it.hasNext(); ) {
out.writeVLong(it.next());
}
out.seek(metaDataPointer);
out.writeLong(metaDataPointerValue);
closed = true;
doClose();
out.close();
}
}
protected abstract void doClose() throws IOException;
@Override
public long getFilePointer() {
return uncompressedPosition;
}
@Override
public void seek(long pos) throws IOException {
throw new IOException("seek not supported on compressed output");
}
@Override
public long length() throws IOException {
return uncompressedPosition;
}
private void flushBuffer() throws IOException {
if (position > 0) {
offsets.add(out.getFilePointer());
compress(uncompressed, 0, position, out);
position = 0;
}
}
protected abstract void writeHeader(IndexOutput out) throws IOException;
/**
* Compresses the data into the output
*/
protected abstract void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException;
}
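As the copyBytes comment above notes, blocks are flushed at a fixed chunk size so the matching CompressedIndexInput can map an uncompressed position to a block by plain division over the offsets table written in close(). A hypothetical illustration of that lookup (names are invented for the example; this is not the actual CompressedIndexInput code):

class ChunkSeekMathSketch {
    // Hypothetical: 'offsets' is the block-offset table written by close() above,
    // 'chunkSize' is the fixed uncompressed block length (uncompressedLength in this class).
    static long compressedBlockPointer(long uncompressedPos, long[] offsets, int chunkSize) {
        int block = (int) (uncompressedPos / chunkSize);  // which compressed block holds the position
        long withinBlock = uncompressedPos % chunkSize;   // remaining offset inside that block once decompressed
        assert block < offsets.length && withinBlock < chunkSize;
        return offsets[block];                            // file pointer where that compressed block starts
    }
}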

View File

@@ -20,7 +20,6 @@
package org.elasticsearch.common.compress;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -59,7 +58,8 @@ public interface Compressor {
CompressedStreamOutput streamOutput(StreamOutput out) throws IOException;
/**
* @deprecated Used for backward compatibility, since we now use the Lucene compressed codec.
*/
CompressedIndexInput indexInput(IndexInput in) throws IOException;
CompressedIndexOutput indexOutput(IndexOutput out) throws IOException;
}

View File

@@ -30,6 +30,7 @@ import java.util.Arrays;
/**
*/
@Deprecated
public class LZFCompressedIndexInput extends CompressedIndexInput<LZFCompressorContext> {
private final ChunkDecoder decoder;

View File

@@ -1,65 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.compress.lzf;
import com.ning.compress.BufferRecycler;
import com.ning.compress.lzf.ChunkEncoder;
import com.ning.compress.lzf.LZFChunk;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.lucene.store.OutputStreamIndexOutput;
import java.io.IOException;
/**
*/
public class LZFCompressedIndexOutput extends CompressedIndexOutput<LZFCompressorContext> {
private final BufferRecycler recycler;
private final ChunkEncoder encoder;
public LZFCompressedIndexOutput(IndexOutput out) throws IOException {
super(out, LZFCompressorContext.INSTANCE);
this.recycler = BufferRecycler.instance();
this.uncompressed = this.recycler.allocOutputBuffer(LZFChunk.MAX_CHUNK_LEN);
this.uncompressedLength = LZFChunk.MAX_CHUNK_LEN;
this.encoder = new ChunkEncoder(LZFChunk.MAX_CHUNK_LEN);
}
@Override
protected void writeHeader(IndexOutput out) throws IOException {
out.writeBytes(LZFCompressor.LUCENE_HEADER, LZFCompressor.LUCENE_HEADER.length);
}
@Override
protected void compress(byte[] data, int offset, int len, IndexOutput out) throws IOException {
encoder.encodeAndWriteChunk(data, offset, len, new OutputStreamIndexOutput(out));
}
@Override
protected void doClose() throws IOException {
byte[] buf = uncompressed;
if (buf != null) {
uncompressed = null;
recycler.releaseOutputBuffer(buf);
}
encoder.close();
}
}

View File

@@ -24,9 +24,11 @@ import com.ning.compress.lzf.LZFChunk;
import com.ning.compress.lzf.LZFEncoder;
import com.ning.compress.lzf.util.ChunkDecoderFactory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.*;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedStreamInput;
import org.elasticsearch.common.compress.CompressedStreamOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.Loggers;
@@ -137,9 +139,4 @@ public class LZFCompressor implements Compressor {
public CompressedIndexInput indexInput(IndexInput in) throws IOException {
return new LZFCompressedIndexInput(in, decoder);
}
@Override
public CompressedIndexOutput indexOutput(IndexOutput out) throws IOException {
return new LZFCompressedIndexOutput(out);
}
}

View File

@@ -24,10 +24,8 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.store.*;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
@@ -37,7 +35,6 @@ import org.elasticsearch.common.lucene.store.ChecksumIndexOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.ForceSyncDirectory;
@@ -55,30 +52,6 @@ import java.util.zip.Adler32;
*/
public class Store extends AbstractIndexShardComponent {
static {
IndexMetaData.addDynamicSettings(
"index.store.compress.stored",
"index.store.compress.tv"
);
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
boolean compressStored = settings.getAsBoolean("index.store.compress.stored", Store.this.compressStored);
if (compressStored != Store.this.compressStored) {
logger.info("updating [index.store.compress.stored] from [{}] to [{}]", Store.this.compressStored, compressStored);
Store.this.compressStored = compressStored;
}
boolean compressTv = settings.getAsBoolean("index.store.compress.tv", Store.this.compressTv);
if (compressTv != Store.this.compressTv) {
logger.info("updating [index.store.compress.tv] from [{}] to [{}]", Store.this.compressTv, compressTv);
Store.this.compressTv = compressTv;
}
}
}
static final String CHECKSUMS_PREFIX = "_checksums-";
public static final boolean isChecksum(String name) {
@@ -87,8 +60,6 @@ public class Store extends AbstractIndexShardComponent {
private final IndexStore indexStore;
private final IndexSettingsService indexSettingsService;
private final DirectoryService directoryService;
private final StoreDirectory directory;
@@ -101,27 +72,13 @@ public class Store extends AbstractIndexShardComponent {
private final boolean sync;
private volatile boolean compressStored;
private volatile boolean compressTv;
private final ApplySettings applySettings = new ApplySettings();
@Inject
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, IndexSettingsService indexSettingsService, DirectoryService directoryService) throws IOException {
public Store(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore, DirectoryService directoryService) throws IOException {
super(shardId, indexSettings);
this.indexStore = indexStore;
this.indexSettingsService = indexSettingsService;
this.directoryService = directoryService;
this.sync = componentSettings.getAsBoolean("sync", true); // TODO we don't really need to fsync when using shared gateway...
this.directory = new StoreDirectory(directoryService.build());
this.compressStored = componentSettings.getAsBoolean("compress.stored", false);
this.compressTv = componentSettings.getAsBoolean("compress.tv", false);
logger.debug("using compress.stored [{}], compress.tv [{}]", compressStored, compressTv);
indexSettingsService.addListener(applySettings);
}
public IndexStore indexStore() {
@@ -292,7 +249,6 @@ public class Store extends AbstractIndexShardComponent {
}
public void close() throws IOException {
indexSettingsService.removeListener(applySettings);
directory.close();
}
@@ -477,17 +433,8 @@ public class Store extends AbstractIndexShardComponent {
computeChecksum = false;
}
}
if (!raw && ((compressStored && name.endsWith(".fdt")) || (compressTv && name.endsWith(".tvf")))) {
if (computeChecksum) {
// with compression, there is no need for buffering when doing checksums
// since we have buffering on the compressed index output
out = new ChecksumIndexOutput(out, new Adler32());
}
out = CompressorFactory.defaultCompressor().indexOutput(out);
} else {
if (computeChecksum) {
out = new BufferedChecksumIndexOutput(out, new Adler32());
}
if (computeChecksum) {
out = new BufferedChecksumIndexOutput(out, new Adler32());
}
return new StoreIndexOutput(metaData, out, name);
}
@@ -500,6 +447,7 @@ public class Store extends AbstractIndexShardComponent {
throw new FileNotFoundException(name);
}
IndexInput in = metaData.directory().openInput(name, context);
// Only for backward compatibility, since we now use Lucene codec compression
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
Compressor compressor = CompressorFactory.compressor(in);
if (compressor != null) {
@@ -515,6 +463,7 @@ public class Store extends AbstractIndexShardComponent {
if (metaData == null) {
throw new FileNotFoundException(name);
}
// Only for backward compatibility, since we now use Lucene codec compression
if (name.endsWith(".fdt") || name.endsWith(".tvf")) {
// rely on the slicer from the base class that uses an input, since they might be compressed...
// note, it seems like slicers are only used in compound file format..., so not relevant for now
@@ -613,9 +562,6 @@ public class Store extends AbstractIndexShardComponent {
out.close();
String checksum = null;
IndexOutput underlying = out;
if (out instanceof CompressedIndexOutput) {
underlying = ((CompressedIndexOutput) out).underlying();
}
if (underlying instanceof BufferedChecksumIndexOutput) {
checksum = Long.toString(((BufferedChecksumIndexOutput) underlying).digest().getValue(), Character.MAX_RADIX);
} else if (underlying instanceof ChecksumIndexOutput) {

View File

@@ -1,87 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.compress;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.File;
/**
*/
public class LuceneCompressionBenchmark {
public static void main(String[] args) throws Exception {
final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
final boolean WITH_TV = true;
File testFile = new File("target/test/compress/lucene");
FileSystemUtils.deleteRecursively(testFile);
testFile.mkdirs();
FSDirectory uncompressedDir = new NIOFSDirectory(new File(testFile, "uncompressed"));
IndexWriter uncompressedWriter = new IndexWriter(uncompressedDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
Directory compressedLzfDir = new CompressedDirectory(new NIOFSDirectory(new File(testFile, "compressed_lzf")), new LZFCompressor(), false, "fdt", "tvf");
IndexWriter compressedLzfWriter = new IndexWriter(compressedLzfDir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
System.out.println("feeding data...");
TestData testData = new TestData();
while (testData.next() && testData.getTotalSize() < MAX_SIZE) {
// json
XContentBuilder builder = XContentFactory.jsonBuilder();
testData.current(builder);
builder.close();
Document doc = new Document();
doc.add(new Field("_source", builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length()));
if (WITH_TV) {
Field field = new Field("text", builder.string(), Field.Store.NO, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
doc.add(field);
}
uncompressedWriter.addDocument(doc);
compressedLzfWriter.addDocument(doc);
}
System.out.println("optimizing...");
uncompressedWriter.forceMerge(1);
compressedLzfWriter.forceMerge(1);
uncompressedWriter.waitForMerges();
compressedLzfWriter.waitForMerges();
System.out.println("done");
uncompressedWriter.close();
compressedLzfWriter.close();
compressedLzfDir.close();
uncompressedDir.close();
}
}

View File

@@ -1,93 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.compress;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.File;
import java.io.FileOutputStream;
/**
*/
public class PureCompressionBenchmark {
public static void main(String[] args) throws Exception {
final long MAX_SIZE = ByteSizeValue.parseBytesSizeValue("50mb").bytes();
File testFile = new File("target/test/compress/pure");
FileSystemUtils.deleteRecursively(testFile);
testFile.mkdirs();
FileOutputStream rawJson = new FileOutputStream(new File(testFile, "raw_json"));
FileOutputStream rawSmile = new FileOutputStream(new File(testFile, "raw_smile"));
FileOutputStream compressedByDocJson = new FileOutputStream(new File(testFile, "compressed_by_doc_json"));
FileOutputStream compressedByDocSmile = new FileOutputStream(new File(testFile, "compressed_by_doc_smile"));
Compressor compressor = CompressorFactory.defaultCompressor();
StreamOutput compressedJson = compressor.streamOutput(new OutputStreamStreamOutput(new FileOutputStream(new File(testFile, "compressed_json"))));
StreamOutput compressedSmile = compressor.streamOutput(new OutputStreamStreamOutput(new FileOutputStream(new File(testFile, "compressed_smile"))));
TestData testData = new TestData();
while (testData.next() && testData.getTotalSize() < MAX_SIZE) {
{
// json
XContentBuilder builder = XContentFactory.jsonBuilder();
testData.current(builder);
rawJson.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
compressedJson.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
byte[] compressed = compressor.compress(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
compressedByDocJson.write(compressed);
builder.close();
}
{
// smile
XContentBuilder builder = XContentFactory.smileBuilder();
testData.current(builder);
rawSmile.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
compressedSmile.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
byte[] compressed = compressor.compress(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
compressedByDocSmile.write(compressed);
builder.close();
}
}
rawJson.close();
rawSmile.close();
compressedJson.close();
compressedSmile.close();
compressedByDocJson.close();
compressedByDocSmile.close();
}
}

View File

@@ -1,87 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.benchmark.compress;
import org.elasticsearch.common.compress.bzip2.CBZip2InputStream;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.Date;
/**
*/
public class TestData {
private BufferedReader reader;
private String line;
private String id;
private String type;
private String text;
private long totalSize;
public TestData() throws IOException {
URL url = new URL("http://downloads.dbpedia.org/3.0/en/longabstract_en.nt.bz2");
BufferedInputStream stream = new BufferedInputStream(url.openStream());
// read two bytes for the header...
stream.read();
stream.read();
reader = new BufferedReader(new InputStreamReader(new CBZip2InputStream(stream)));
}
public long getTotalSize() {
return totalSize;
}
public boolean next() throws Exception {
line = reader.readLine();
if (line == null) {
return false;
}
totalSize += line.length();
int endId = line.indexOf(' ');
id = line.substring(0, endId);
int endType = line.indexOf(' ', endId + 1);
type = line.substring(endId + 1, endType);
text = line.substring(endType + 1);
return true;
}
public String currentText() {
return text;
}
/**
*/
public XContentBuilder current(XContentBuilder builder) throws Exception {
builder.startObject();
builder.field("id", id);
builder.field("type", type);
builder.field("text", text);
builder.field("time", new Date());
builder.endObject();
return builder;
}
}

View File

@@ -1,102 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.stress.compress;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory;
import org.elasticsearch.benchmark.compress.TestData;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.FastByteArrayOutputStream;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.joda.time.DateTime;
import java.io.File;
import java.io.PrintStream;
/**
*/
public class LuceneCompressionStressTest {
public static void main(String[] args) throws Exception {
final boolean USE_COMPOUND = false;
final Compressor compressor = CompressorFactory.defaultCompressor();
File testFile = new File("target/bench/compress/lucene");
FileSystemUtils.deleteRecursively(testFile);
testFile.mkdirs();
Directory dir = new CompressedDirectory(new NIOFSDirectory(new File(testFile, "compressed")), compressor, false, "fdt", "tvf");
TieredMergePolicy mergePolicy = new TieredMergePolicy();
mergePolicy.setUseCompoundFile(USE_COMPOUND);
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER).setMergePolicy(mergePolicy));
System.out.println("feeding data...");
TestData testData = new TestData();
long count = 0;
long round = 0;
while (true) {
// json
XContentBuilder builder = XContentFactory.jsonBuilder();
testData.current(builder);
builder.close();
Document doc = new Document();
doc.add(new Field("_source", builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length()));
if (true) {
Field field = new Field("text", builder.string(), Field.Store.NO, Field.Index.ANALYZED, Field.TermVector.WITH_POSITIONS_OFFSETS);
doc.add(field);
}
writer.addDocument(doc);
if ((++count % 10000) == 0) {
writer.commit();
++round;
System.out.println(DateTime.now() + "[" + round + "] closing");
writer.close(true);
System.out.println(DateTime.now() + "[" + round + "] closed");
CheckIndex checkIndex = new CheckIndex(dir);
FastByteArrayOutputStream os = new FastByteArrayOutputStream();
PrintStream out = new PrintStream(os);
checkIndex.setInfoStream(out);
out.flush();
CheckIndex.Status status = checkIndex.checkIndex();
if (!status.clean) {
System.out.println("check index [failure]\n" + new String(os.bytes().toBytes()));
} else {
System.out.println(DateTime.now() + "[" + round + "] checked");
}
mergePolicy = new TieredMergePolicy();
mergePolicy.setUseCompoundFile(USE_COMPOUND);
writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER).setMergePolicy(mergePolicy));
}
}
}
}

View File

@@ -1,316 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.common.compress;
import jsr166y.ThreadLocalRandom;
import org.apache.lucene.document.*;
import org.apache.lucene.index.*;
import org.apache.lucene.store.*;
import org.apache.lucene.util.Bits;
import org.elasticsearch.common.RandomStringGenerator;
import org.elasticsearch.common.compress.CompressedDirectory;
import org.elasticsearch.common.compress.CompressedIndexInput;
import org.elasticsearch.common.compress.CompressedIndexOutput;
import org.elasticsearch.common.compress.Compressor;
import org.elasticsearch.common.compress.lzf.LZFCompressor;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.unit.SizeValue;
import org.testng.annotations.Test;
import java.io.EOFException;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class CompressIndexInputOutputTests {
@Test
public void testLZF() throws Exception {
testCompressor(new LZFCompressor());
}
private void testCompressor(Compressor compressor) throws Exception {
empty(compressor);
simple(compressor);
seek1Compressed(compressor);
seek1UnCompressed(compressor);
copyBytes(compressor);
lucene(compressor);
}
private void empty(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test", IOContext.DEFAULT));
out.close();
IndexInput in = compressor.indexInput(dir.openInput("test", IOContext.DEFAULT));
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well
}
in.seek(100);
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well
}
}
private void simple(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test", IOContext.DEFAULT));
long pos1 = out.getFilePointer();
out.writeInt(1);
long pos2 = out.getFilePointer();
out.writeString("test1");
long pos3 = out.getFilePointer();
String largeString = RandomStringGenerator.random(0xFFFF + 5);
out.writeString(largeString);
long pos4 = out.getFilePointer();
out.writeInt(2);
long pos5 = out.getFilePointer();
out.writeString("test2");
out.close();
IndexInput in = compressor.indexInput(dir.openInput("test", IOContext.DEFAULT));
assertThat(in.readInt(), equalTo(1));
assertThat(in.readString(), equalTo("test1"));
assertThat(in.readString(), equalTo(largeString));
assertThat(in.readInt(), equalTo(2));
assertThat(in.readString(), equalTo("test2"));
in.seek(pos3);
assertThat(in.readString(), equalTo(largeString));
in.seek(pos2);
assertThat(in.readString(), equalTo("test1"));
in.seek(pos5);
assertThat(in.readString(), equalTo("test2"));
in.seek(pos1);
assertThat(in.readInt(), equalTo(1));
in.seek(0);
byte[] full = new byte[(int) in.length()];
in.readBytes(full, 0, full.length);
in.close();
}
private void seek1Compressed(Compressor compressor) throws Exception {
seek1(true, compressor);
}
private void seek1UnCompressed(Compressor compressor) throws Exception {
seek1(false, compressor);
}
private void seek1(boolean compressed, Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressed ? compressor.indexOutput(dir.createOutput("test", IOContext.DEFAULT)) : dir.createOutput("test", IOContext.DEFAULT);
long pos1 = out.getFilePointer();
out.writeVInt(4);
out.writeInt(1);
long pos2 = out.getFilePointer();
out.writeVInt(8);
long posX = out.getFilePointer();
out.writeInt(2);
out.writeInt(3);
long pos3 = out.getFilePointer();
out.writeVInt(4);
out.writeInt(4);
int size = 50;
long[] positions = new long[size];
String[] data = new String[size];
for (int i = 0; i < 50; i++) {
positions[i] = out.getFilePointer();
data[i] = RandomStringGenerator.random(12345);
out.writeString(data[i]);
}
out.close();
//IndexInput in = dir.openInput("test");
IndexInput in = compressed ? compressor.indexInput(dir.openInput("test", IOContext.DEFAULT)) : dir.openInput("test", IOContext.DEFAULT);
in.seek(pos2);
// now "skip"
int numBytes = in.readVInt();
assertThat(in.getFilePointer(), equalTo(posX));
in.seek(in.getFilePointer() + numBytes);
assertThat(in.readVInt(), equalTo(4));
assertThat(in.readInt(), equalTo(4));
for (int i = 0; i < size; i++) {
in.seek(positions[i]);
assertThat(in.getFilePointer(), equalTo(positions[i]));
assertThat(in.readString(), equalTo(data[i]));
}
}
private void copyBytes(Compressor compressor) throws Exception {
Directory dir = new RAMDirectory();
IndexOutput out = compressor.indexOutput(dir.createOutput("test", IOContext.DEFAULT));
long pos1 = out.getFilePointer();
out.writeInt(1);
long pos2 = out.getFilePointer();
assertThat(pos2, equalTo(4l));
out.writeString("test1");
long pos3 = out.getFilePointer();
String largeString = RandomStringGenerator.random(0xFFFF + 5);
out.writeString(largeString);
long pos4 = out.getFilePointer();
out.writeInt(2);
long pos5 = out.getFilePointer();
out.writeString("test2");
assertThat(out.length(), equalTo(out.getFilePointer()));
long length = out.length();
out.close();
CompressedIndexOutput out2 = compressor.indexOutput(dir.createOutput("test2", IOContext.DEFAULT));
out2.writeString("mergeStart");
long startMergePos = out2.getFilePointer();
CompressedIndexInput testInput = compressor.indexInput(dir.openInput("test", IOContext.DEFAULT));
assertThat(testInput.length(), equalTo(length));
out2.copyBytes(testInput, testInput.length());
long endMergePos = out2.getFilePointer();
out2.writeString("mergeEnd");
out2.close();
IndexInput in = compressor.indexInput(dir.openInput("test2", IOContext.DEFAULT));
assertThat(in.readString(), equalTo("mergeStart"));
assertThat(in.readInt(), equalTo(1));
assertThat(in.readString(), equalTo("test1"));
assertThat(in.readString(), equalTo(largeString));
assertThat(in.readInt(), equalTo(2));
assertThat(in.readString(), equalTo("test2"));
assertThat(in.readString(), equalTo("mergeEnd"));
in.seek(pos1);
assertThat(in.readString(), equalTo("mergeStart"));
in.seek(endMergePos);
assertThat(in.readString(), equalTo("mergeEnd"));
try {
in.readByte();
assert false;
} catch (EOFException e) {
// all is well, we reached the end...
}
}
private void lucene(Compressor compressor) throws Exception {
CompressedDirectory dir = new CompressedDirectory(new RAMDirectory(), compressor, false, "fdt");
IndexWriter writer = new IndexWriter(dir, new IndexWriterConfig(Lucene.VERSION, Lucene.STANDARD_ANALYZER));
writer.addDocument(createDoc(1, (int) SizeValue.parseSizeValue("100b").singles()));
writer.addDocument(createDoc(2, (int) SizeValue.parseSizeValue("5k").singles()));
writer.commit();
writer.addDocument(createDoc(3, (int) SizeValue.parseSizeValue("2k").singles()));
writer.addDocument(createDoc(4, (int) SizeValue.parseSizeValue("1k").singles()));
writer.commit();
verify(writer);
writer.forceMerge(1);
writer.waitForMerges();
verify(writer);
dir.setCompress(false);
writer.addDocument(createDoc(5, (int) SizeValue.parseSizeValue("2k").singles()));
writer.addDocument(createDoc(6, (int) SizeValue.parseSizeValue("1k").singles()));
verify(writer);
writer.forceMerge(1);
writer.waitForMerges();
verify(writer);
writer.close();
}
private void verify(IndexWriter writer) throws Exception {
CheckIndex checkIndex = new CheckIndex(writer.getDirectory());
CheckIndex.Status status = checkIndex.checkIndex();
assertThat(status.clean, equalTo(true));
IndexReader reader = DirectoryReader.open(writer, true);
final Bits liveDocs = MultiFields.getLiveDocs(reader);
for (int i = 0; i < reader.maxDoc(); i++) {
if (liveDocs != null && !liveDocs.get(i)) {
continue;
}
Document document = reader.document(i);
checkDoc(document);
DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor("id", "field", "count");
reader.document(i, visitor);
document = visitor.getDocument();
checkDoc(document);
}
for (int i = 0; i < 100; i++) {
int doc = ThreadLocalRandom.current().nextInt(reader.maxDoc());
if (liveDocs != null && !liveDocs.get(doc)) {
continue;
}
Document document = reader.document(doc);
checkDoc(document);
DocumentStoredFieldVisitor visitor = new DocumentStoredFieldVisitor("id", "field", "count");
reader.document(doc, visitor);
document = visitor.getDocument();
checkDoc(document);
}
}
private void checkDoc(Document document) {
String id = document.get("id");
String field = document.get("field");
int count = 0;
int idx = 0;
while (true) {
int oldIdx = idx;
idx = field.indexOf(' ', oldIdx);
if (idx == -1) {
break;
}
count++;
assertThat(field.substring(oldIdx, idx), equalTo(id));
idx++;
}
assertThat(count, equalTo(Integer.parseInt(document.get("count"))));
}
private Document createDoc(int id, int size) {
Document doc = new Document();
doc.add(new Field("id", Integer.toString(id), StringField.TYPE_STORED));
doc.add(new Field("size", Integer.toString(size), StringField.TYPE_STORED));
doc.add(new Field("skip", RandomStringGenerator.random(50), StoredField.TYPE));
StringBuilder sb = new StringBuilder();
int count = 0;
while (true) {
count++;
sb.append(id);
sb.append(" ");
if (sb.length() >= size) {
break;
}
}
doc.add(new Field("count", Integer.toString(count), StringField.TYPE_STORED));
doc.add(new Field("field", sb.toString(), StringField.TYPE_STORED));
doc.add(new Field("skip", RandomStringGenerator.random(50), StoredField.TYPE));
return doc;
}
}

View File

@@ -122,11 +122,11 @@ public abstract class AbstractSimpleEngineTests {
}
protected Store createStore() throws IOException {
return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS));
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Store createStoreReplica() throws IOException {
return new Store(shardId, EMPTY_SETTINGS, null, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS), new RamDirectoryService(shardId, EMPTY_SETTINGS));
return new Store(shardId, EMPTY_SETTINGS, null, new RamDirectoryService(shardId, EMPTY_SETTINGS));
}
protected Translog createTranslog() {